1 module upromised.tokenizer;
2 import upromised.promise : DelegatePromiseIterator, Promise, PromiseIterator;
3 import upromised.stream : Stream;
4 import upromised : fatal;
5 
/**
 * Finds the first position in `input` where `search` begins to match.
 *
 * A match may be complete (all of `search` is present) or partial: the tail
 * of `input` equals a leading part of `search`, so the rest of `search` could
 * still arrive later.
 *
 * Params:
 *   input  = bytes to scan
 *   search = byte sequence to look for
 * Returns: index of the first complete or partial match, or -1 if there is none.
 */
private ptrdiff_t countUntilPartial(const(ubyte)[] input, const(ubyte)[] search) nothrow {
    import std.algorithm : startsWith;

    for (size_t start = 0; start < input.length; ++start) {
        // Window of at most search.length bytes, clipped at the end of input.
        const remaining = input.length - start;
        const width = remaining < search.length ? remaining : search.length;
        auto window = input[start .. start + width];
        if (search.startsWith(window)) {
            return start;
        }
    }
    return -1;
}
14 
/**
 * Re-chunks the data produced by an underlying `PromiseIterator!(T[])` into
 * tokens.
 *
 * Incoming chunks are appended to an internal buffer. Each token is cut from
 * the front of that buffer according to the settings in force at that moment:
 *
 * - separator: if the buffer contains the separator, the token ends right
 *   after its first occurrence (the separator is part of the token);
 * - limit: if the buffer contains no complete or partial separator match and
 *   holds at least `limit` items, a token of exactly `limit` items is cut;
 * - partialReceive: buffered data is handed out without waiting for a
 *   separator or the limit, except that a partial separator match at the end
 *   of the buffer is held back.
 *
 * When the underlying iterator reports end of stream, whatever is left in the
 * buffer is delivered as the last token.
 */
class Tokenizer(T) {
private:
    alias Underlying = PromiseIterator!(T[]);
    /// Source of raw chunks.
    Underlying underlying;
    /// Iterator handed out by read(); created on first use.
    Underlying read_;
    /// Set from the result of iterating `underlying`; true once it reported EOF.
    bool underlyingEof;
    /// Current separator; empty means "no separator".
    T[] separator_;
    /// Data received from `underlying` and not yet delivered.
    T[] buffer;
    /// Current token size limit; 0 means "no limit".
    size_t limit_;
    /// Whether buffered data may be delivered before a token is complete.
    bool partialReceive_;

public:
    /**
     * Params:
     *   underlying = iterator whose chunks will be tokenized
     */
    this(Underlying underlying) nothrow {
        this.underlying = underlying;
    }

    /**
     * Sets the separator used for subsequent tokens.
     *
     * Params:
     *   separator = raw separator data, reinterpreted as an array of T;
     *               null (the default) disables separator matching
     */
    void separator(immutable(void)[] separator = null) nothrow {
        separator_ = cast(immutable(T)[])separator;
    }

    /**
     * Sets the token size limit used for subsequent tokens.
     *
     * Params:
     *   limit = token length in items; 0 (the default) disables the limit
     */
    void limit(size_t limit = 0) nothrow {
        limit_ = limit;
    }

    /**
     * Enables or disables delivery of incomplete tokens.
     *
     * Params:
     *   partialReceive = true to deliver buffered data as soon as possible
     */
    void partialReceive(bool partialReceive = false) nothrow {
        partialReceive_ = partialReceive;
    }

    /**
     * Returns the iterator that yields tokens. The same instance is returned
     * on every call.
     */
    PromiseIterator!(T[]) read() nothrow {
        if (read_ is null) {
            read_ = new class PromiseIterator!(T[]) {
                override Promise!ItValue next(Promise!bool) {
                    // End of stream is recognised by an empty token. Testing
                    // the array itself (`chunk ? ...`) would only test for
                    // null: a buffer that was fully consumed by slicing is
                    // empty but non-null and would be emitted as a spurious
                    // empty token right before EOF.
                    return readOne()
                    .then((chunk) => chunk.length > 0 ? ItValue(false, chunk) : ItValue(true));
                }
            };
        }
        return read_;
    }

protected:
    /**
     * Produces the next token, reading from `underlying` as often as needed.
     *
     * Returns: a promise for the next token. It resolves with an empty slice
     *          once `underlying` reported EOF and the buffer has been drained.
     */
    Promise!(T[]) readOne() nothrow {
        // Position of the first complete or partial separator match, if any.
        ptrdiff_t posClosed = -1;
        if (separator_.length > 0) {
            posClosed = buffer.countUntilPartial(separator_);
        }

        if (posClosed >= 0) {
            auto posOpen = posClosed + separator_.length;
            // Found separator
            if (posOpen <= buffer.length) {
                auto output = buffer[0..posOpen];
                buffer = buffer[posOpen..$];
                return Promise!(T[]).resolved(output);
            }
            // Found part of the separator on end of buffer: deliver what
            // precedes it and keep the partial match buffered.
            if (posOpen > buffer.length && partialReceive_ && posClosed > 0) {
                auto output = buffer[0..posClosed];
                buffer = buffer[posClosed..$];
                return Promise!(T[]).resolved(output);
            }
        } else if (limit_ > 0 && buffer.length >= limit_) {
            // No separator match: cut a token of exactly limit_ items.
            auto output = buffer[0..limit_];
            buffer = buffer[limit_..$];
            return Promise!(T[]).resolved(output);
        } else if (partialReceive_ && buffer.length > 0) {
            // No separator match and below the limit: deliver everything.
            auto output = buffer;
            buffer = null;
            return Promise!(T[]).resolved(output);
        }

        if (underlyingEof) {
            // Nothing more will arrive: flush the remainder (possibly empty).
            auto output = buffer;
            buffer = null;
            return Promise!(T[]).resolved(output);
        }

        // Not enough data for a token yet: take one more chunk from the
        // source (returning false stops after the first chunk), record
        // whether it hit EOF, then try again.
        return underlying.each((data) {
            buffer ~= data;
            return false;
        }).then((eof) {
            underlyingEof = eof;
        }).then(() => readOne());
    }
}
// With no separator, limit or partialReceive, all data is delivered as a
// single token once the source ends.
unittest {
    auto source = new DelegatePromiseIterator!(const(ubyte)[]);
    auto tokenizer = new Tokenizer!(const(ubyte))(source);
    bool gotChunk = false;
    bool finished = false;
    auto tokens = tokenizer.read();
    tokens.each((data) {
        // Exactly one token is expected.
        assert(!gotChunk);
        assert(data == "Hello world");
        gotChunk = true;
    }).then((_) {
        assert(gotChunk);
        assert(!finished);
        finished = true;
    }).nothrow_();
    source.resolve(cast(const(ubyte)[])"Hello world").nothrow_();
    // Resolving without a value ends the source.
    source.resolve();
    assert(finished);
}
// Separator-delimited tokens: each token includes its separator, and the
// unterminated tail is delivered when the source ends.
unittest {
    static immutable string[] expected = ["\r\n", "Hello\r\n", "World"];
    auto source = new DelegatePromiseIterator!(const(ubyte)[]);
    auto tokenizer = new Tokenizer!(const(ubyte))(source);
    tokenizer.separator("\r\n");
    tokenizer.limit();
    int call = 0;
    tokenizer.read().each((data) {
        const index = call++;
        // Any token beyond the expected ones is a failure.
        assert(index < expected.length);
        assert(cast(const(char)[])data == expected[index]);
    }).then((_) {
        assert(call++ == 3);
    }).nothrow_();
    source.resolve(cast(const(ubyte)[])"\r\nHello\r\nWorld").nothrow_();
    // "World" has no separator yet, so only two tokens were delivered.
    assert(call == 2);
    source.resolve();
    assert(call == 4);
}
// Fixed-size tokens: data is cut into pieces of `limit` items regardless of
// how the source chunks it; the short remainder is delivered at end of source.
unittest {
    static immutable string[] expected = ["abc", "def", "gh"];
    auto source = new DelegatePromiseIterator!(const(ubyte)[]);
    auto tokenizer = new Tokenizer!(const(ubyte))(source);
    tokenizer.separator();
    tokenizer.limit(3);
    int call = 0;

    // Pushes one chunk of data into the source.
    void feed(string text) {
        source.resolve(cast(const(ubyte)[])text).nothrow_();
    }

    tokenizer.read().each((data) {
        const index = call++;
        assert(index < expected.length);
        assert(cast(const(char)[])data == expected[index]);
    }).then((_) {
        assert(call++ == 3);
    }).nothrow_();

    feed("ab");
    assert(call == 0);
    feed("cdef");
    assert(call == 2);
    feed("gh");
    assert(call == 2);
    source.resolve();
    assert(call == 4);
}
// A rejection of the source is forwarded to the consumer after the tokens
// that were already complete have been delivered.
unittest {
    auto source = new DelegatePromiseIterator!(const(ubyte)[]);
    auto tokenizer = new Tokenizer!(const(ubyte))(source);
    tokenizer.separator();
    tokenizer.limit(3);
    auto failure = new Exception("yada");
    int call = 0;
    tokenizer.read().each((data) {
        const index = call++;
        if (index == 0) {
            assert(data == "abc");
        } else if (index == 1) {
            assert(data == "def");
        } else {
            assert(false);
        }
    }).then((_) {
        // The iteration must not complete normally.
        assert(false);
    }).except((Exception e) {
        assert(e is failure);
        call++;
    }).nothrow_();
    source.resolve(cast(const(ubyte)[])"abcdefgh").nothrow_();
    assert(call == 2);
    source.reject(failure).nothrow_();
    assert(call == 3);
}
// partialReceive: data is delivered as soon as it is available, a partial
// separator match at the end of the buffer is held back, and separator/limit
// may be changed between reads.
unittest {
    // Tokens in the order they must be delivered.
    static immutable string[] expected = [
        "abc", "abcABCD", "abc", "abc", "def",
        "ab", "ABab", "ABCD", "ab", "AB",
    ];
    auto source = new DelegatePromiseIterator!(const(ubyte)[]);
    auto tokenizer = new Tokenizer!(const(ubyte))(source);
    tokenizer.separator("ABCD");
    tokenizer.limit();
    tokenizer.partialReceive(true);
    int call = 0;

    // Pushes one chunk of data into the source.
    void feed(string text) {
        source.resolve(cast(const(ubyte)[])text).nothrow_();
    }

    tokenizer.read().each((data) {
        const index = call++;
        assert(index < expected.length);
        assert(cast(const(char)[])data == expected[index]);
    }).then((eof) {
        assert(eof);
        assert(call++ == 10);
    }).nothrow_();

    feed("abc");
    assert(call == 1);
    feed("abcABCDabc");
    assert(call == 3);

    // Switch to fixed-size tokens.
    tokenizer.separator();
    tokenizer.limit(3);
    feed("abcdef");
    assert(call == 5);

    // Back to separator mode; a trailing "AB" is a partial match and is kept.
    tokenizer.separator("ABCD");
    tokenizer.limit();
    feed("abAB");
    assert(call == 6);
    feed("abAB");
    assert(call == 7);
    feed("CDabAB");
    assert(call == 9);

    // End of source flushes the held-back "AB" and then completes.
    source.resolve();
    assert(call == 11);
}