1 module upromised.manual_stream; 2 import upromised.stream : Stream; 3 import upromised.promise : DelegatePromiseIterator, Promise, PromiseIterator; 4 5 class ManualStream : Stream { 6 private: 7 DelegatePromiseIterator!(const(ubyte)[]) read_; 8 DelegatePromiseIterator!(const(ubyte)[]) write_; 9 10 public: 11 this() { 12 write_ = new DelegatePromiseIterator!(const(ubyte)[]); 13 } 14 15 override Promise!void close() nothrow { 16 return Promise!void.resolved(); 17 } 18 19 override Promise!void shutdown() nothrow { 20 write_.resolve(); 21 return Promise!void.resolved(); 22 } 23 24 override PromiseIterator!(const(ubyte)[]) read() nothrow { 25 if (read_ is null) { 26 read_ = new DelegatePromiseIterator!(const(ubyte)[]); 27 } 28 return read_; 29 } 30 31 Promise!bool writeToRead(const(ubyte)[] data) nothrow { 32 return Promise!void.resolved().then(() { 33 assert(read_ !is null); 34 return read_.resolve(data).then((cont) { 35 return cont; 36 }); 37 }); 38 } 39 40 Promise!bool writeToRead() nothrow { 41 assert(read_ !is null); 42 return read_.resolve(); 43 } 44 45 Promise!bool writeToRead(Exception e) nothrow { 46 assert(read_ !is null); 47 return read_.reject(e); 48 } 49 50 override Promise!void write(immutable(ubyte)[] r) nothrow { 51 return write_.resolve(r).then((_) {}); 52 } 53 54 PromiseIterator!(const(ubyte)[]) readFromWrite() nothrow { 55 return write_; 56 } 57 } 58 unittest { 59 auto a = new ManualStream; 60 bool called = false; 61 a.read().each((data) { 62 assert((cast(const(ubyte)[])data) == "Hello world"); 63 called = true; 64 }).nothrow_; 65 assert(!called); 66 a.writeToRead(cast(immutable(ubyte)[])"Hello world").nothrow_; 67 assert(called); 68 } 69 unittest { 70 import upromised.promise : DelegatePromise; 71 72 auto a = new ManualStream; 73 auto delayed = new DelegatePromise!bool; 74 bool called1, called2; 75 76 a.write(cast(immutable(ubyte)[])"Hello world").then(() { 77 called1 = true; 78 }).nothrow_; 79 assert(!called1); 80 a.readFromWrite.each((data) { 81 assert(!called2); 82 called2 = true; 83 assert(cast(const(ubyte)[])data == "Hello world"); 84 return delayed; 85 }).nothrow_; 86 assert(!called1); 87 assert(called2); 88 delayed.resolve(true); 89 assert(called1); 90 } 91 unittest { 92 auto a = new ManualStream; 93 foreach(i; 0..2) { 94 bool called = false; 95 a.read().each((_) { 96 assert(!called); 97 called = true; 98 return false; 99 }).nothrow_(); 100 assert(!called); 101 a.writeToRead(cast(const(ubyte)[])"sup").nothrow_; 102 assert(called); 103 } 104 } 105 unittest { 106 auto a = new ManualStream; 107 bool called = false; 108 a.read().each((_) { 109 assert(false); 110 }).then((eof) { 111 assert(eof); 112 called = true; 113 }).nothrow_(); 114 assert(!called); 115 a.writeToRead(); 116 assert(called); 117 } 118 unittest { 119 auto a = new ManualStream; 120 int calls = 0; 121 122 a.readFromWrite().each((_) { 123 assert(calls++ == 0); 124 return true; 125 }).then((eof) { 126 assert(eof); 127 assert(calls++ == 1); 128 }).nothrow_(); 129 130 assert(calls == 0); 131 a.write(cast(immutable(ubyte)[])"yada"); 132 assert(calls == 1); 133 a.shutdown(); 134 assert(calls == 2); 135 }