module upromised.tokenizer;
import upromised.promise : DelegatePromiseIterator, Promise, PromiseIterator;
import upromised.stream : Stream;
import upromised : fatal;

/**
 * Finds the first position in `input` where `search` either occurs in full
 * or starts and is cut short by the end of `input`.
 *
 * For every position `pos` the remainder `input[pos .. min($, pos + search.length)]`
 * is compared against the beginning of `search`.
 *
 * Params:
 *   input  = data to scan
 *   search = pattern to look for; callers in this module only pass a non-empty pattern
 *
 * Returns: the index of the first full or partial match, or -1 when there is none.
 *          The caller distinguishes full from partial by checking whether
 *          `pos + search.length` still fits inside `input`.
 */
private ptrdiff_t countUntilPartial(const(ubyte)[] input, const(ubyte)[] search) nothrow {
	import std.algorithm : startsWith, min;

	foreach(pos; 0..input.length) {
		if (search.startsWith(input[pos..$.min(pos + search.length)])) return pos;
	}
	return -1;
}

/**
 * Re-chunks the arrays produced by an underlying `PromiseIterator!(T[])` into tokens.
 *
 * Incoming data is appended to an internal buffer; `readOne` cuts tokens out of
 * that buffer according to three settings, which may be changed between reads:
 *
 * - `separator`: when non-empty, a token ends right after the first occurrence
 *   of the separator (the separator is part of the token).
 * - `limit`: when non-zero and no (partial) separator is present in the buffer,
 *   a token of exactly `limit` elements is cut as soon as enough data is buffered.
 * - `partialReceive`: when true, buffered data is handed out without waiting for a
 *   separator or for the limit, but never splitting a possible separator prefix
 *   found at the end of the buffer.
 *
 * Once the underlying iterator reports end of stream, the remaining buffered
 * data (if any) is delivered as a last token.
 */
class Tokenizer(T) {
private:
	alias Underlying = PromiseIterator!(T[]);
	Underlying underlying;   // source of raw chunks
	Underlying read_;        // lazily created iterator returned by read()
	bool underlyingEof;      // set once `underlying` reported end of stream
	T[] separator_;          // empty/null means "no separator"
	T[] buffer;              // data received but not yet handed out
	size_t limit_;           // 0 means "no limit"
	bool partialReceive_;

public:
	/// Wraps `underlying`; nothing is read from it until a token is requested.
	this(Underlying underlying) nothrow {
		this.underlying = underlying;
	}

	/// Sets the token separator. Calling it without argument (or with an empty
	/// array) disables separator matching.
	void separator(immutable(void)[] separator = null) nothrow {
		separator_ = cast(immutable(T)[])separator;
	}
	/// Sets the token size used when no separator is found in the buffer.
	/// Calling it without argument (or with 0) disables the limit.
	void limit(size_t limit = 0) nothrow {
		limit_ = limit;
	}
	/// Enables or disables delivery of buffered data before a separator/limit
	/// is reached. Calling it without argument disables it.
	void partialReceive(bool partialReceive = false) nothrow {
		partialReceive_ = partialReceive;
	}

	/**
	 * Returns the iterator of tokens. The iterator is created on first call and
	 * the same instance is returned afterwards.
	 *
	 * Each `next` call delegates to `readOne`; a null chunk from `readOne`
	 * is translated into the end-of-iteration value.
	 */
	PromiseIterator!(T[]) read() nothrow {
		if (read_ is null) {
			read_ = new class PromiseIterator!(T[]) {
				override Promise!ItValue next(Promise!bool) {
					return readOne()
						.then((chunk) => chunk ? ItValue(false, chunk) : ItValue(true));
				}
			};
		}
		return read_;
	}

protected:
	/**
	 * Produces the next token, reading from the underlying iterator as needed.
	 *
	 * Returns: a promise for the token. It resolves to `null` when the
	 *          underlying iterator is at end of stream and nothing is buffered.
	 */
	Promise!(T[]) readOne() nothrow {
		ptrdiff_t posClosed = -1;
		if (separator_.length > 0) {
			posClosed = buffer.countUntilPartial(separator_);
		}

		if (posClosed >= 0) {
			auto posOpen = posClosed + separator_.length;
			// Found separator: the token runs up to and including it
			if (posOpen <= buffer.length) {
				auto output = buffer[0..posOpen];
				buffer = buffer[posOpen..$];
				return Promise!(T[]).resolved(output);
			}
			// Found part of the separator on end of buffer: hand out what
			// precedes it, keep the possible separator prefix buffered.
			// (posOpen > buffer.length is implied here.)
			if (partialReceive_ && posClosed > 0) {
				auto output = buffer[0..posClosed];
				buffer = buffer[posClosed..$];
				return Promise!(T[]).resolved(output);
			}
		} else if (limit_ > 0 && buffer.length >= limit_) {
			auto output = buffer[0..limit_];
			buffer = buffer[limit_..$];
			return Promise!(T[]).resolved(output);
		} else if (partialReceive_ && buffer.length > 0) {
			auto output = buffer;
			buffer = null;
			return Promise!(T[]).resolved(output);
		}

		if (underlyingEof) {
			// Flush the leftovers. The slicing above can leave `buffer` as a
			// zero-length slice with a non-null pointer, which is "true" in a
			// boolean context; normalise it to null so that end of stream is
			// not preceded by a spurious empty token.
			auto output = buffer.length > 0 ? buffer : null;
			buffer = null;
			return Promise!(T[]).resolved(output);
		}

		// Not enough data: take exactly one more chunk (the callback returns
		// false to stop the iteration), remember whether the source ended,
		// then try again.
		return underlying.each((data) {
			buffer ~= data;
			return false;
		}).then((eof) {
			underlyingEof = eof;
		}).then(() => readOne());
	}
}
// No separator, no limit: everything is delivered as one token at end of stream.
unittest {
	auto a = new DelegatePromiseIterator!(const(ubyte)[]);
	auto b = new Tokenizer!(const(ubyte))(a);
	bool called = false;
	bool eof = false;
	b.read().each((data) {
		assert(!called);
		assert(data == "Hello world");
		called = true;
	}).then((_) {
		assert(called);
		assert(!eof);
		eof = true;
	}).nothrow_();
	a.resolve(cast(const(ubyte)[])"Hello world").nothrow_();
	a.resolve();
	assert(eof);
}
// Separator only: tokens include the separator, the unterminated tail comes at end of stream.
unittest {
	auto a = new DelegatePromiseIterator!(const(ubyte)[]);
	auto b = new Tokenizer!(const(ubyte))(a);
	b.separator("\r\n");
	b.limit();
	int call = 0;
	b.read().each((data) {
		switch(call++) {
		case 0:
			assert(data == "\r\n");
			break;
		case 1:
			assert(data == "Hello\r\n");
			break;
		case 2:
			assert(data == "World");
			break;
		default: assert(false);
		}
	}).then((_) {
		assert(call++ == 3);
	}).nothrow_();
	a.resolve(cast(const(ubyte)[])"\r\nHello\r\nWorld").nothrow_();
	assert(call == 2);
	a.resolve();
	assert(call == 4);
}
// Limit only: fixed-size tokens, the short tail comes at end of stream.
unittest {
	auto a = new DelegatePromiseIterator!(const(ubyte)[]);
	auto b = new Tokenizer!(const(ubyte))(a);
	b.separator();
	b.limit(3);
	int call = 0;
	b.read().each((data) {
		switch(call++) {
		case 0:
			assert(data == "abc");
			break;
		case 1:
			assert(data == "def");
			break;
		case 2:
			assert(data == "gh");
			break;
		default: assert(false);
		}
	}).then((_) {
		assert(call++ == 3);
	}).nothrow_();
	a.resolve(cast(const(ubyte)[])"ab").nothrow_();
	assert(call == 0);
	a.resolve(cast(const(ubyte)[])"cdef").nothrow_();
	assert(call == 2);
	a.resolve(cast(const(ubyte)[])"gh").nothrow_();
	assert(call == 2);
	a.resolve();
	assert(call == 4);
}
// A rejection of the underlying iterator is propagated to the reader.
unittest {
	auto a = new DelegatePromiseIterator!(const(ubyte)[]);
	auto b = new Tokenizer!(const(ubyte))(a);
	b.separator();
	b.limit(3);
	auto err = new Exception("yada");
	int call = 0;
	b.read().each((data) {
		switch(call++) {
		case 0:
			assert(data == "abc");
			break;
		case 1:
			assert(data == "def");
			break;
		default: assert(false);
		}
	}).then((_) {
		assert(false);
	}).except((Exception e) {
		assert(e is err);
		call++;
	}).nothrow_();
	a.resolve(cast(const(ubyte)[])"abcdefgh").nothrow_();
	assert(call == 2);
	a.reject(err).nothrow_();
	assert(call == 3);
}
// partialReceive, with separator and limit being switched between reads.
unittest {
	auto a = new DelegatePromiseIterator!(const(ubyte)[]);
	auto b = new Tokenizer!(const(ubyte))(a);
	b.separator("ABCD");
	b.limit();
	b.partialReceive(true);
	int call = 0;
	b.read().each((data) {
		switch(call++) {
		case 0:
			assert(data == "abc");
			break;
		case 1:
			assert(data == "abcABCD");
			break;
		case 2:
			assert(data == "abc");
			break;
		case 3:
			assert(data == "abc");
			break;
		case 4:
			assert(data == "def");
			break;
		case 5:
			assert(data == "ab");
			break;
		case 6:
			assert(data == "ABab");
			break;
		case 7:
			assert(data == "ABCD");
			break;
		case 8:
			assert(data == "ab");
			break;
		case 9:
			assert(data == "AB");
			break;
		default: assert(false);
		}
	}).then((eof) {
		assert(eof);
		assert(call++ == 10);
	}).nothrow_();
	a.resolve(cast(const(ubyte)[])"abc").nothrow_();
	assert(call == 1);
	a.resolve(cast(const(ubyte)[])"abcABCDabc").nothrow_();
	assert(call == 3);
	b.separator();
	b.limit(3);
	a.resolve(cast(const(ubyte)[])"abcdef").nothrow_();
	assert(call == 5);
	b.separator("ABCD");
	b.limit();
	a.resolve(cast(const(ubyte)[])"abAB").nothrow_();
	assert(call == 6);
	a.resolve(cast(const(ubyte)[])"abAB").nothrow_();
	assert(call == 7);
	a.resolve(cast(const(ubyte)[])"CDabAB").nothrow_();
	assert(call == 9);
	a.resolve();
	assert(call == 11);
}
// Regression: input ending exactly on a separator must not produce an empty
// trailing token when the stream ends.
unittest {
	auto a = new DelegatePromiseIterator!(const(ubyte)[]);
	auto b = new Tokenizer!(const(ubyte))(a);
	b.separator("\r\n");
	int call = 0;
	b.read().each((data) {
		switch(call++) {
		case 0:
			assert(data == "Hello\r\n");
			break;
		default: assert(false);
		}
	}).then((_) {
		assert(call++ == 1);
	}).nothrow_();
	a.resolve(cast(const(ubyte)[])"Hello\r\n").nothrow_();
	assert(call == 1);
	a.resolve();
	assert(call == 2);
}