1 module upromised.manual_stream;
2 import upromised.stream : Stream;
3 import upromised.promise : DelegatePromiseIterator, Promise, PromiseIterator;
4 
/// In-memory `Stream` whose two directions are driven by hand.
///
/// Each direction is backed by a `DelegatePromiseIterator` of byte slices:
/// the read side is returned by `read()` and fed through the `writeToRead`
/// overloads, while the write side is fed by `write()` / `shutdown()` and
/// exposed through `readFromWrite()`.
class ManualStream : Stream {
private:
    alias ByteIterator = DelegatePromiseIterator!(const(ubyte)[]);

    // Read-side iterator; stays null until read() is called for the first time.
    ByteIterator read_;
    // Write-side iterator; allocated in the constructor, so it is never null.
    ByteIterator write_;

public:
    /// Allocates the write-side iterator. The read side is created on demand by `read()`.
    this() {
        write_ = new ByteIterator;
    }

    /// Returns an already-resolved promise; neither iterator is touched.
    override Promise!void close() nothrow {
        return Promise!void.resolved();
    }

    /// Calls the argument-less `resolve()` on the write side, ignores its
    /// result, and returns an already-resolved promise.
    // NOTE(review): the argument-less resolve() presumably signals end-of-stream
    // to whoever iterates readFromWrite() — confirm against DelegatePromiseIterator.
    override Promise!void shutdown() nothrow {
        write_.resolve();
        return Promise!void.resolved();
    }

    /// Returns the read-side iterator, creating it on the first call.
    /// Every later call returns the same object.
    override PromiseIterator!(const(ubyte)[]) read() nothrow {
        if (read_ !is null) {
            return read_;
        }
        read_ = new ByteIterator;
        return read_;
    }

    /// Pushes `data` into the read side.
    ///
    /// The work runs inside a continuation of an already-resolved promise.
    /// `read()` must have been called beforehand (checked with an assert).
    ///
    /// Returns: the `bool` produced by the iterator's `resolve(data)`, passed through unchanged.
    Promise!bool writeToRead(const(ubyte)[] data) nothrow {
        auto start = Promise!void.resolved();
        return start.then(() {
            assert(read_ !is null);
            auto delivered = read_.resolve(data);
            return delivered.then((keepGoing) => keepGoing);
        });
    }

    /// Calls the argument-less `resolve()` on the read side and returns its promise.
    /// `read()` must have been called beforehand (checked with an assert).
    Promise!bool writeToRead() nothrow {
        assert(read_ !is null);
        return read_.resolve();
    }

    /// Forwards `e` to the read side's `reject` and returns its promise.
    /// `read()` must have been called beforehand (checked with an assert).
    Promise!bool writeToRead(Exception e) nothrow {
        assert(read_ !is null);
        return read_.reject(e);
    }

    /// Pushes `r` into the write side.
    ///
    /// Returns: a `Promise!void` chained on the iterator's `resolve(r)` result,
    /// whose value is discarded.
    override Promise!void write(immutable(ubyte)[] r) nothrow {
        auto consumed = write_.resolve(r);
        return consumed.then((_) {});
    }

    /// Returns the write-side iterator, i.e. the one fed by `write()` and `shutdown()`.
    PromiseIterator!(const(ubyte)[]) readFromWrite() nothrow {
        return write_;
    }
}
// Data pushed with writeToRead() reaches a consumer registered through read().
unittest {
    auto stream = new ManualStream;
    immutable(ubyte)[] payload = cast(immutable(ubyte)[])"Hello world";
    bool received = false;

    stream.read().each((data) {
        assert((cast(const(ubyte)[])data) == "Hello world");
        received = true;
    }).nothrow_();

    // Nothing has been pushed yet, so the callback must not have run.
    assert(!received);

    stream.writeToRead(payload).nothrow_();
    assert(received);
}
// The promise returned by write() stays pending until the consumer of
// readFromWrite() resolves the promise it handed back from its callback.
unittest {
    import upromised.promise : DelegatePromise;

    auto stream = new ManualStream;
    auto gate = new DelegatePromise!bool;
    bool writeDone;
    bool readerRan;

    stream.write(cast(immutable(ubyte)[])"Hello world").then(() {
        writeDone = true;
    }).nothrow_();
    // No consumer attached yet.
    assert(!writeDone);

    stream.readFromWrite().each((data) {
        assert(!readerRan);
        readerRan = true;
        assert(cast(const(ubyte)[])data == "Hello world");
        return gate;
    }).nothrow_();

    // The consumer has seen the chunk but has not released the gate.
    assert(!writeDone);
    assert(readerRan);

    gate.resolve(true);
    assert(writeDone);
}
// A consumer that returns false from its callback can be replaced by a new
// one on the same stream; each consumer sees exactly one chunk.
unittest {
    auto stream = new ManualStream;
    foreach (attempt; 0 .. 2) {
        bool seen = false;

        stream.read().each((_) {
            assert(!seen);
            seen = true;
            return false;
        }).nothrow_;

        assert(!seen);
        stream.writeToRead(cast(const(ubyte)[])"sup").nothrow_();
        assert(seen);
    }
}
// The argument-less writeToRead() finishes the read iteration without
// invoking the per-chunk callback; the continuation receives a true flag.
unittest {
    auto stream = new ManualStream;
    bool finished = false;

    stream.read().each((_) {
        assert(false);
    }).then((eof) {
        assert(eof);
        finished = true;
    }).nothrow_;

    assert(!finished);
    stream.writeToRead();
    assert(finished);
}
// write() delivers one chunk to the consumer of readFromWrite(), and
// shutdown() then finishes the iteration with a true flag.
unittest {
    auto a = new ManualStream;
    int calls = 0;

    a.readFromWrite().each((_) {
        // Check and increment are separate statements so the counter does
        // not depend on a side effect buried inside an assert expression.
        assert(calls == 0);
        calls++;
        return true;
    }).then((eof) {
        assert(eof);
        assert(calls == 1);
        calls++;
    }).nothrow_();

    assert(calls == 0);
    a.write(cast(immutable(ubyte)[])"yada");
    assert(calls == 1);
    a.shutdown();
    assert(calls == 2);
}