1 module upromised.tokenizer;
2 import std.exception : enforce;
3 import upromised.promise : DelegatePromiseIterator, Promise, PromiseIterator;
4 import upromised.stream : Stream;
5 import upromised : fatal;
6 
// Finds the first offset in `input` at which `search` either occurs in full
// or, when `input` ends before `search` does, at which the remaining tail of
// `input` is a prefix of `search` (a "partial" match at the end of the data).
//
// Returns that offset, or -1 when no offset qualifies.
private ptrdiff_t countUntilPartial(const(ubyte)[] input, const(ubyte)[] search) nothrow {
    import std.algorithm : startsWith, min;

    for (size_t start = 0; start < input.length; ++start) {
        // Clamp the window so it never runs past the end of `input`; a
        // clamped (shorter) window is what makes a partial match possible.
        auto stop = min(input.length, start + search.length);
        if (search.startsWith(input[start .. stop])) {
            return start;
        }
    }
    return -1;
}
15 
// Re-chunks the `T[]` values produced by an underlying PromiseIterator into
// tokens. A token ends at a separator, at a fixed size limit, or (with
// partial receive enabled) as soon as any data is buffered. The three
// settings may be changed between reads; each `readOne` call consults the
// values current at that moment.
class Tokenizer(T) {
private:
    alias Underlying = PromiseIterator!(T[]);
    Underlying underlying;
    // Lazily created iterator handed out by read().
    Underlying read_;
    // Set once the underlying iterator reported its end.
    bool underlyingEof;
    T[] separator_;
    // Data received from `underlying` that has not been emitted yet.
    T[] buffer;
    size_t limit_;
    bool partialReceive_;

public:
    this(Underlying underlying) nothrow {
        this.underlying = underlying;
    }

    // Sets the separator that terminates a token (the separator is included
    // at the end of the emitted token). A null or empty separator disables
    // separator matching.
    void separator(immutable(void)[] separator = null) nothrow {
        separator_ = cast(immutable(T)[])separator;
    }
    // Sets the token size used when no separator match is present in the
    // buffer. 0 disables the limit.
    void limit(size_t limit = 0) nothrow {
        limit_ = limit;
    }
    // When enabled, buffered data is emitted without waiting for a complete
    // token, and leftover data at end of input is emitted instead of
    // failing with "EOF unexpected".
    void partialReceive(bool partialReceive = false) nothrow {
        partialReceive_ = partialReceive;
    }

    // Returns the iterator of tokens; the same instance is returned on every
    // call. Each step is one readOne() call, and the iteration ends when
    // readOne() resolves to an empty token.
    PromiseIterator!(T[]) read() nothrow {
        if (read_ is null) {
            read_ = new class PromiseIterator!(T[]) {
                override Promise!ItValue next(Promise!bool) {
                    return readOne()
                    // Test the length, not the array's truthiness: an array
                    // converts to bool through its pointer, so an empty
                    // slice left behind by `buffer[posOpen..$]` would be
                    // emitted as a bogus empty token before the end.
                    .then((chunk) => chunk.length > 0 ? ItValue(false, chunk) : ItValue(true));
                }
            };
        }
        return read_;
    }

    // Resolves to the next token, reading from the underlying iterator as
    // needed. Resolves to an empty token once the input ended and nothing is
    // left. Fails with "EOF unexpected" when the input ended while a
    // separator or limit is configured and partial receive is off.
    Promise!(T[]) readOne() nothrow {
        ptrdiff_t posClosed = -1;
        if (separator_.length > 0) {
            posClosed = buffer.countUntilPartial(separator_);
        }

        if (posClosed >= 0) {
            auto posOpen = posClosed + separator_.length;
            // Found separator
            if (posOpen <= buffer.length) {
                auto output = buffer[0..posOpen];
                buffer = buffer[posOpen..$];
                return Promise!(T[]).resolved(output);
            }
            // Found part of the separator on end of buffer: emit what
            // precedes it and keep the possible separator start buffered.
            // With posClosed == 0 there is nothing to emit, so fall through
            // and wait for more data.
            if (posOpen > buffer.length && partialReceive_ && posClosed > 0) {
                auto output = buffer[0..posClosed];
                buffer = buffer[posClosed..$];
                return Promise!(T[]).resolved(output);
            }
            // NOTE(review): the limit below is only consulted when no full
            // or partial separator match exists in the buffer.
        } else if (limit_ > 0 && buffer.length >= limit_) {
            auto output = buffer[0..limit_];
            buffer = buffer[limit_..$];
            return Promise!(T[]).resolved(output);
        } else if (partialReceive_ && buffer.length > 0) {
            auto output = buffer;
            buffer = null;
            return Promise!(T[]).resolved(output);
        }

        if (underlyingEof) {
            // Run inside a promise continuation so the enforce failure is
            // delivered as a rejected promise; this method is nothrow.
            return Promise!void.resolved()
            .then(() {
                // Compare lengths rather than relying on array truthiness:
                // separator("") stores an empty slice whose pointer is not
                // null, which must still count as "no separator".
                enforce((limit_ == 0 && separator_.length == 0) || partialReceive_, "EOF unexpected");
                auto output = buffer;
                buffer = null;
                return Promise!(T[]).resolved(output);
            });
        }

        // Pull exactly one more chunk (the callback returns false after the
        // first one), record whether the source ended, then try again.
        return underlying.each((data) {
            buffer ~= data;
            return false;
        }).then((eof) {
            underlyingEof = eof;
        }).then(() => readOne());
    }
}
// No separator and no limit: the test expects the whole input to arrive as a
// single token and the iteration to complete after the source is resolved
// without a value.
unittest {
    auto source = new DelegatePromiseIterator!(const(ubyte)[]);
    auto tokenizer = new Tokenizer!(const(ubyte))(source);
    bool gotToken = false;
    bool finished = false;
    tokenizer.read().each((token) {
        // Exactly one token is expected.
        assert(!gotToken);
        assert(token == "Hello world");
        gotToken = true;
    }).then((_) {
        assert(gotToken);
        assert(!finished);
        finished = true;
    }).nothrow_();
    source.resolve(cast(const(ubyte)[])"Hello world").nothrow_();
    source.resolve();
    assert(finished);
}
// Separator mode: tokens end with "\r\n"; data left without a terminating
// separator when the source ends is reported as "EOF unexpected".
unittest {
    auto source = new DelegatePromiseIterator!(const(ubyte)[]);
    auto tokenizer = new Tokenizer!(const(ubyte))(source);
    tokenizer.separator("\r\n");
    tokenizer.limit();
    int step = 0;
    tokenizer.read().each((token) {
        const current = step++;
        if (current == 0) {
            assert(token == "\r\n");
        } else if (current == 1) {
            assert(token == "Hello\r\n");
        } else {
            assert(false);
        }
    }).except((Exception e) {
        assert(step++ == 2);
        assert(e.msg == "EOF unexpected");
    }).then(() {
        assert(step++ == 3);
    }).nothrow_();
    // Two complete tokens plus an unterminated "World".
    source.resolve(cast(const(ubyte)[])"\r\nHello\r\nWorld").nothrow_();
    assert(step == 2);
    // Ending the source now triggers the error handler and the final then().
    source.resolve();
    assert(step == 4);
}
// Limit mode: tokens are exactly 3 elements long regardless of how the input
// is chunked; a short remainder at end of input is "EOF unexpected".
unittest {
    auto source = new DelegatePromiseIterator!(const(ubyte)[]);
    auto tokenizer = new Tokenizer!(const(ubyte))(source);
    tokenizer.separator();
    tokenizer.limit(3);
    int step = 0;
    tokenizer.read().each((token) {
        const current = step++;
        if (current == 0) {
            assert(token == "abc");
        } else if (current == 1) {
            assert(token == "def");
        } else {
            assert(false);
        }
    }).except((Exception e) {
        assert(step++ == 2);
        assert(e.msg == "EOF unexpected");
    }).then(() {
        assert(step++ == 3);
    }).nothrow_();
    // Two elements are not enough for a token yet.
    source.resolve(cast(const(ubyte)[])"ab").nothrow_();
    assert(step == 0);
    // Completes "abc" and delivers "def" as well.
    source.resolve(cast(const(ubyte)[])"cdef").nothrow_();
    assert(step == 2);
    // "gh" stays buffered, waiting for a third element.
    source.resolve(cast(const(ubyte)[])"gh").nothrow_();
    assert(step == 2);
    // End of input with the incomplete "gh" pending.
    source.resolve();
    assert(step == 4);
}
// Limit mode with a failing source: the exception used to reject the source
// must reach the consumer unchanged, and the success continuation must not
// run.
unittest {
    auto source = new DelegatePromiseIterator!(const(ubyte)[]);
    auto tokenizer = new Tokenizer!(const(ubyte))(source);
    tokenizer.separator();
    tokenizer.limit(3);
    auto failure = new Exception("yada");
    int step = 0;
    tokenizer.read().each((token) {
        const current = step++;
        if (current == 0) {
            assert(token == "abc");
        } else if (current == 1) {
            assert(token == "def");
        } else {
            assert(false);
        }
    }).then((_) {
        assert(false);
    }).except((Exception e) {
        assert(e is failure);
        step++;
    }).nothrow_();
    // Two full tokens; "gh" remains buffered.
    source.resolve(cast(const(ubyte)[])"abcdefgh").nothrow_();
    assert(step == 2);
    source.reject(failure).nothrow_();
    assert(step == 3);
}
// Partial-receive mode, switching between separator and limit settings while
// the iteration is running.
unittest {
    auto source = new DelegatePromiseIterator!(const(ubyte)[]);
    auto tokenizer = new Tokenizer!(const(ubyte))(source);
    tokenizer.separator("ABCD");
    tokenizer.limit();
    tokenizer.partialReceive(true);

    // Tokens in the order they must be delivered.
    const(ubyte)[][] expected = [
        cast(const(ubyte)[])"abc",
        cast(const(ubyte)[])"abcABCD",
        cast(const(ubyte)[])"abc",
        cast(const(ubyte)[])"abc",
        cast(const(ubyte)[])"def",
        cast(const(ubyte)[])"ab",
        cast(const(ubyte)[])"ABab",
        cast(const(ubyte)[])"ABCD",
        cast(const(ubyte)[])"ab",
        cast(const(ubyte)[])"AB",
    ];

    int step = 0;
    tokenizer.read().each((token) {
        const index = step++;
        // More tokens than expected is a failure.
        if (index >= cast(int) expected.length) assert(false);
        assert(token == expected[index]);
    }).then((eof) {
        assert(eof);
        assert(step++ == 10);
    }).nothrow_();

    // No separator in sight: the data is passed through immediately.
    source.resolve(cast(const(ubyte)[])"abc").nothrow_();
    assert(step == 1);
    // One separator-terminated token, then the remainder.
    source.resolve(cast(const(ubyte)[])"abcABCDabc").nothrow_();
    assert(step == 3);

    // Switch to fixed-size tokens.
    tokenizer.separator();
    tokenizer.limit(3);
    source.resolve(cast(const(ubyte)[])"abcdef").nothrow_();
    assert(step == 5);

    // Back to separator mode. A trailing "AB" may be the start of a
    // separator, so only what precedes it is delivered.
    tokenizer.separator("ABCD");
    tokenizer.limit();
    source.resolve(cast(const(ubyte)[])"abAB").nothrow_();
    assert(step == 6);
    // The held-back "AB" turned out not to start a separator; it is
    // delivered with the following "ab", and the new trailing "AB" is held.
    source.resolve(cast(const(ubyte)[])"abAB").nothrow_();
    assert(step == 7);
    // The held-back "AB" is completed to "ABCD"; then "ab"; "AB" is held.
    source.resolve(cast(const(ubyte)[])"CDabAB").nothrow_();
    assert(step == 9);
    // End of input flushes the held "AB" and finishes the iteration.
    source.resolve();
    assert(step == 11);
}