module upromised.tokenizer;
import std.exception : enforce;
import upromised.promise : DelegatePromiseIterator, Promise, PromiseIterator;
import upromised.stream : Stream;
import upromised : fatal;

/**
 * Finds the first position in `input` where `search` begins, accepting a
 * match that is cut short by the end of `input`.
 *
 * At each position the slice of `input` starting there (at most
 * `search.length` elements long) is compared against the beginning of
 * `search`. A slice shorter than `search` can only occur at the tail of
 * `input`, so a hit there means "`input` ends with a prefix of `search`".
 *
 * Returns: the index of the first full or partial match, or -1 when there is
 * none (including when `input` is empty).
 */
private ptrdiff_t countUntilPartial(const(ubyte)[] input, const(ubyte)[] search) nothrow {
	import std.algorithm : startsWith, min;

	foreach(pos; 0..input.length) {
		if (search.startsWith(input[pos..$.min(pos + search.length)])) return pos;
	}
	return -1;
}

/**
 * Splits the chunks produced by an underlying `PromiseIterator!(T[])` into
 * tokens, according to three settings that may be changed between reads:
 *
 * - `separator`: a token ends right after the first occurrence of the
 *   separator (the separator is included in the token);
 * - `limit`: when no separator match is pending, a token of exactly `limit`
 *   elements is cut as soon as that much data is buffered;
 * - `partialReceive`: buffered data may be handed out before a separator or
 *   the limit is reached.
 */
class Tokenizer(T) {
private:
	alias Underlying = PromiseIterator!(T[]);
	Underlying underlying;
	// Lazily created iterator returned by read().
	Underlying read_;
	// Set from the result of underlying.each() in readOne().
	bool underlyingEof;
	T[] separator_;
	// Data received from `underlying` and not yet handed out.
	T[] buffer;
	size_t limit_;
	bool partialReceive_;

public:
	/// Creates a tokenizer that pulls its data from `underlying`.
	this(Underlying underlying) nothrow {
		this.underlying = underlying;
	}

	/// Sets the separator; calling without argument (or with an empty one) disables it.
	void separator(immutable(void)[] separator = null) nothrow {
		separator_ = cast(immutable(T)[])separator;
	}
	/// Sets the token size limit; 0 disables it.
	void limit(size_t limit = 0) nothrow {
		limit_ = limit;
	}
	/// Enables or disables handing out data before a token is complete.
	void partialReceive(bool partialReceive = false) nothrow {
		partialReceive_ = partialReceive;
	}

	/**
	 * Returns an iterator whose every step is one `readOne()` call. The same
	 * iterator object is returned on every call.
	 *
	 * An empty result from `readOne()` ends the iteration. The check is on
	 * the length, not on the array itself: an array used as a condition in D
	 * tests the pointer, and `readOne()` can return a zero-length slice with
	 * a non-null pointer at EOF (what is left of `buffer` after a token
	 * consumed it exactly), which must not be emitted as a data chunk.
	 */
	PromiseIterator!(T[]) read() nothrow {
		if (read_ is null) {
			read_ = new class PromiseIterator!(T[]) {
				override Promise!ItValue next(Promise!bool) {
					return readOne()
						.then((chunk) => chunk.length > 0 ? ItValue(false, chunk) : ItValue(true));
				}
			};
		}
		return read_;
	}

	/**
	 * Produces the next token according to the current settings.
	 *
	 * Buffered data is examined first; when no token can be cut from it, one
	 * more chunk is requested from the underlying iterator and the method
	 * starts over. Once the underlying iterator reported EOF, whatever is
	 * left in the buffer (possibly nothing) is returned, unless a separator
	 * or limit is active without `partialReceive`, in which case the
	 * returned promise fails with "EOF unexpected".
	 */
	Promise!(T[]) readOne() nothrow {
		ptrdiff_t posClosed = -1;
		if (separator_.length > 0) {
			posClosed = buffer.countUntilPartial(separator_);
		}

		if (posClosed >= 0) {
			auto posOpen = posClosed + separator_.length;
			// Found separator
			if (posOpen <= buffer.length) {
				auto output = buffer[0..posOpen];
				buffer = buffer[posOpen..$];
				return Promise!(T[]).resolved(output);
			}
			// Found part of the separator on end of buffer:
			// hand out what precedes it and keep the partial match buffered
			if (posOpen > buffer.length && partialReceive_ && posClosed > 0) {
				auto output = buffer[0..posClosed];
				buffer = buffer[posClosed..$];
				return Promise!(T[]).resolved(output);
			}
		} else if (limit_ > 0 && buffer.length >= limit_) {
			auto output = buffer[0..limit_];
			buffer = buffer[limit_..$];
			return Promise!(T[]).resolved(output);
		} else if (partialReceive_ && buffer.length > 0) {
			auto output = buffer;
			buffer = null;
			return Promise!(T[]).resolved(output);
		}

		if (underlyingEof) {
			// enforce() is called from a then() callback because this method
			// is nothrow; presumably the failure then surfaces through the
			// returned promise (the unittests observe it via except()).
			return Promise!void.resolved()
				.then(() {
					// Use the same "separator active" test as above (length,
					// not pointer), so an empty non-null separator counts as
					// "no separator" here as well.
					enforce((!limit_ && separator_.length == 0) || partialReceive_, "EOF unexpected");
					auto output = buffer;
					buffer = null;
					return Promise!(T[]).resolved(output);
				});
		}

		// Pull more data, remember whether the source ended, then retry.
		// NOTE(review): returning false from the callback presumably stops
		// each() after a single chunk — confirm against PromiseIterator.each.
		return underlying.each((data) {
			buffer ~= data;
			return false;
		}).then((eof) {
			underlyingEof = eof;
		}).then(() => readOne());
	}
}
// No separator, no limit: everything is delivered as one token at EOF.
unittest {
	auto a = new DelegatePromiseIterator!(const(ubyte)[]);
	auto b = new Tokenizer!(const(ubyte))(a);
	bool called = false;
	bool eof = false;
	b.read().each((data) {
		assert(!called);
		assert(data == "Hello world");
		called = true;
	}).then((_) {
		assert(called);
		assert(!eof);
		eof = true;
	}).nothrow_();
	a.resolve(cast(const(ubyte)[])"Hello world").nothrow_();
	a.resolve();
	assert(eof);
}
// Separator only: tokens include the separator; trailing data at EOF is an error.
unittest {
	auto a = new DelegatePromiseIterator!(const(ubyte)[]);
	auto b = new Tokenizer!(const(ubyte))(a);
	b.separator("\r\n");
	b.limit();
	int call = 0;
	b.read().each((data) {
		switch(call++) {
		case 0:
			assert(data == "\r\n");
			break;
		case 1:
			assert(data == "Hello\r\n");
			break;
		default: assert(false);
		}
	}).except((Exception e) {
		assert(call++ == 2);
		assert(e.msg == "EOF unexpected");
	}).then(() {
		assert(call++ == 3);
	}).nothrow_();
	a.resolve(cast(const(ubyte)[])"\r\nHello\r\nWorld").nothrow_();
	assert(call == 2);
	a.resolve();
	assert(call == 4);
}
// Limit only: fixed-size tokens; an incomplete last token at EOF is an error.
unittest {
	auto a = new DelegatePromiseIterator!(const(ubyte)[]);
	auto b = new Tokenizer!(const(ubyte))(a);
	b.separator();
	b.limit(3);
	int call = 0;
	b.read().each((data) {
		switch(call++) {
		case 0:
			assert(data == "abc");
			break;
		case 1:
			assert(data == "def");
			break;
		default: assert(false);
		}
	}).except((Exception e) {
		assert(call++ == 2);
		assert(e.msg == "EOF unexpected");
	}).then(() {
		assert(call++ == 3);
	}).nothrow_();
	a.resolve(cast(const(ubyte)[])"ab").nothrow_();
	assert(call == 0);
	a.resolve(cast(const(ubyte)[])"cdef").nothrow_();
	assert(call == 2);
	a.resolve(cast(const(ubyte)[])"gh").nothrow_();
	assert(call == 2);
	a.resolve();
	assert(call == 4);
}
// An error from the underlying iterator is propagated unchanged.
unittest {
	auto a = new DelegatePromiseIterator!(const(ubyte)[]);
	auto b = new Tokenizer!(const(ubyte))(a);
	b.separator();
	b.limit(3);
	auto err = new Exception("yada");
	int call = 0;
	b.read().each((data) {
		switch(call++) {
		case 0:
			assert(data == "abc");
			break;
		case 1:
			assert(data == "def");
			break;
		default: assert(false);
		}
	}).then((_) {
		assert(false);
	}).except((Exception e) {
		assert(e is err);
		call++;
	}).nothrow_();
	a.resolve(cast(const(ubyte)[])"abcdefgh").nothrow_();
	assert(call == 2);
	a.reject(err).nothrow_();
	assert(call == 3);
}
// partialReceive, with separator and limit switched between reads.
unittest {
	auto a = new DelegatePromiseIterator!(const(ubyte)[]);
	auto b = new Tokenizer!(const(ubyte))(a);
	b.separator("ABCD");
	b.limit();
	b.partialReceive(true);
	int call = 0;
	b.read().each((data) {
		switch(call++) {
		case 0:
			assert(data == "abc");
			break;
		case 1:
			assert(data == "abcABCD");
			break;
		case 2:
			assert(data == "abc");
			break;
		case 3:
			assert(data == "abc");
			break;
		case 4:
			assert(data == "def");
			break;
		case 5:
			assert(data == "ab");
			break;
		case 6:
			assert(data == "ABab");
			break;
		case 7:
			assert(data == "ABCD");
			break;
		case 8:
			assert(data == "ab");
			break;
		case 9:
			assert(data == "AB");
			break;
		default: assert(false);
		}
	}).then((eof) {
		assert(eof);
		assert(call++ == 10);
	}).nothrow_();
	a.resolve(cast(const(ubyte)[])"abc").nothrow_();
	assert(call == 1);
	a.resolve(cast(const(ubyte)[])"abcABCDabc").nothrow_();
	assert(call == 3);
	b.separator();
	b.limit(3);
	a.resolve(cast(const(ubyte)[])"abcdef").nothrow_();
	assert(call == 5);
	b.separator("ABCD");
	b.limit();
	a.resolve(cast(const(ubyte)[])"abAB").nothrow_();
	assert(call == 6);
	a.resolve(cast(const(ubyte)[])"abAB").nothrow_();
	assert(call == 7);
	a.resolve(cast(const(ubyte)[])"CDabAB").nothrow_();
	assert(call == 9);
	a.resolve();
	assert(call == 11);
}
// partialReceive: a token that consumes the buffer exactly must be followed
// directly by end-of-stream, not by an extra empty chunk.
unittest {
	auto a = new DelegatePromiseIterator!(const(ubyte)[]);
	auto b = new Tokenizer!(const(ubyte))(a);
	b.separator("ABCD");
	b.limit();
	b.partialReceive(true);
	int call = 0;
	b.read().each((data) {
		assert(call++ == 0);
		assert(data == "xABCD");
	}).then((eof) {
		assert(eof);
		assert(call++ == 1);
	}).nothrow_();
	a.resolve(cast(const(ubyte)[])"xABCD").nothrow_();
	assert(call == 1);
	a.resolve();
	assert(call == 2);
}