/**
 * Minimal promise-based HTTP/1.1 client helpers: request serialization,
 * status-line / header parsing and body decoding (Content-Length and
 * chunked transfer encoding).
 */
module upromised.http;
import std.exception : enforce;
import std.format : format;
import upromised.manual_stream : ManualStream;
import upromised.operations : readAllChunks;
import upromised.promise : DelegatePromiseIterator, Promise, PromiseIterator;
import upromised.stream : Stream;
import upromised.tokenizer : Tokenizer;

/// A single HTTP header as a key/value pair.
struct Header {
    string key;
    string value;
}

/// HTTP request methods; the member name is written verbatim in the request line.
enum Method {
    GET, POST, PUT, DELETE
}

/// Parsed status line of a response.
struct Response {
    /// Numeric status code (second field of the status line).
    int statusCode;
    /// Everything after the status code, re-joined with single spaces.
    string statusLine;
}

/// Result of `Client.fullRequest`: status, headers and a lazily read body.
struct FullResponse {
    Response response;
    Header[] headers;
    PromiseIterator!(const(ubyte)[]) bodyData;
}

/// Reinterprets `input` as a slice of `Y` through a cast, keeping the `inout` qualifier.
private inout(Y)[] as(Y, T)(inout(T) input) {
    return cast(inout(Y)[])input;
}

/// Convenience overload: wraps `chunked` in a new `Tokenizer` and decodes from it.
PromiseIterator!(const(ubyte)[]) decodeChunked(PromiseIterator!(const(ubyte)[]) chunked) nothrow {
    return decodeChunked(new Tokenizer!(const(ubyte))(chunked));
}

/**
 * Decodes a "Transfer-Encoding: chunked" body read through `tokenizer`.
 *
 * Each call to `next` on the returned iterator reads one size line
 * (hexadecimal, terminated by "\r\n") followed by that many payload bytes
 * plus the trailing "\r\n", and yields the payload. After a chunk of size 0
 * has been read, further calls report end-of-stream without reading.
 *
 * NOTE(review): chunk extensions (";name=value" after the size) are not
 * handled; the size line is passed whole to `to!size_t(16)`.
 */
PromiseIterator!(const(ubyte)[]) decodeChunked(Tokenizer!(const(ubyte)) tokenizer) nothrow {
    import std.conv : to;

    // Start in "read one CRLF-terminated line" mode.
    // NOTE(review): exact semantics of the argument-less partialReceive()/limit()
    // calls live in upromised.tokenizer — presumably they reset to defaults.
    tokenizer.partialReceive();
    tokenizer.limit();
    tokenizer.separator("\r\n");

    return new class PromiseIterator!(const(ubyte)[]) {
        // Sentinel: no chunk header parsed yet (same value the former `-1` produced).
        size_t chunkSize = size_t.max;

        override Promise!ItValue next(Promise!bool) nothrow {
            // A zero-sized chunk was already consumed: the body is finished.
            if (chunkSize == 0) {
                return Promise!ItValue.resolved(ItValue(true));
            }

            // `step` alternates between the size line (true) and the payload (false).
            bool step;
            const(ubyte)[] chunkR;
            return tokenizer.read().each((chunk) {
                step = !step;
                if (step) {
                    // Size line: drop the trailing CRLF and parse as hexadecimal.
                    tokenizer.separator();
                    chunkSize = chunk.as!char[0..$-2].to!size_t(16);
                    // Payload plus its terminating CRLF.
                    tokenizer.limit(chunkSize + 2);
                    return true;
                } else {
                    // Payload: restore line mode for the next size line.
                    tokenizer.separator("\r\n");
                    tokenizer.limit();
                    chunkR = chunk[0..$-2];
                    return false;
                }
            }).then((eof) => eof ? ItValue(true) : ItValue(false, chunkR));
        }
    };
}
// Decodes a multi-chunk body fed in 3-byte pieces; data after the last chunk is ignored.
unittest {
    import upromised.operations : toAsyncChunks;

    const(ubyte)[] response;

    (cast(const(ubyte)[])"4\r\nWiki\r\n5\r\npedia\r\nE\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n TRAILING IGNORE")
        .toAsyncChunks(3)
        .decodeChunked
        .readAllChunks
        .then(a => response = a)
        .nothrow_();

    assert(response == "Wikipedia in\r\n\r\nchunks.", "%s unexpected".format([response.as!char]));
}
// A rejection of the underlying iterator is propagated unchanged.
unittest {
    auto err = new Exception("err");
    bool called = false;

    auto rejected = new DelegatePromiseIterator!(const(ubyte)[]);
    rejected.reject(err);
    rejected
        .decodeChunked
        .each((_) {
            assert(false);
        }).then((_) {
            assert(false);
        }).except((Exception e) {
            assert(e is err);
            called = true;
        }).nothrow_();

    assert(called);
}

/**
 * HTTP/1.1 client that serializes requests into `output` and parses
 * responses from the stream attached with `stream`.
 *
 * The send*/fetch* methods are meant to be called in protocol order
 * (request line, headers, body; then status line, headers, body);
 * `fullRequest` does exactly that.
 */
class Client {
private:
    import upromised.tokenizer : Tokenizer;

    // Shared between sending and receiving: set by sendHeaders/fetchHeaders,
    // decremented while a body is transferred, reset by fetchResponse.
    size_t contentLength;
    // Set by fetchHeaders when the response uses chunked transfer encoding.
    bool chunked;
    // Tokenizer over the attached stream's read side; null until `stream` is called.
    Tokenizer!(const(ubyte)) inputTokenizer;

public:
    /// Serialized request bytes; forwarded to the stream once one is attached.
    DelegatePromiseIterator!(immutable(ubyte)[]) output;

    /// Creates a client with no stream attached.
    this() nothrow {
        output = new DelegatePromiseIterator!(immutable(ubyte)[]);
    }

    /// Creates a client and attaches `stream` (the promise returned by `stream` is discarded).
    this(Stream stream) nothrow {
        this();
        this.stream = stream;
    }

    /**
     * Attaches `stream`: its read side feeds the response tokenizer and every
     * item resolved on `output` is written to it. May only be called once.
     *
     * Returns: the promise of the `output` forwarding loop.
     */
    Promise!void stream(Stream stream) nothrow {
        assert(!inputTokenizer);

        inputTokenizer = new Tokenizer!(const(ubyte))(stream.read());

        return output
            .each((data) => stream.write(data))
            .then(() {});
    }

    /// Overload taking array headers and an in-memory body.
    Promise!FullResponse fullRequest(Method method, string uri, Header[] headers, immutable(ubyte)[] bodyData) nothrow {
        import upromised.operations : toAsync, toAsyncChunks;

        return fullRequest(method, uri, headers.toAsync, bodyData.toAsyncChunks);
    }

    /// Overload taking asynchronous headers and an in-memory body.
    Promise!FullResponse fullRequest(Method method, string uri, PromiseIterator!Header headers, immutable(ubyte)[] bodyData) nothrow {
        import upromised.operations : toAsyncChunks;

        return fullRequest(method, uri, headers, bodyData.toAsyncChunks);
    }

    /// Overload taking array headers and an optional asynchronous body.
    Promise!FullResponse fullRequest(Method method, string uri, Header[] headers, PromiseIterator!(immutable(ubyte)[]) bodyData = null) nothrow {
        import upromised.operations : toAsync;

        return fullRequest(method, uri, headers.toAsync, bodyData);
    }

    /**
     * Sends a complete request and reads the response status line and headers.
     *
     * Params:
     *   method   = request method
     *   uri      = request target, written verbatim in the request line
     *   headers  = request headers
     *   bodyData = request body, or null to send no body
     *
     * Returns: a `FullResponse` whose `bodyData` is the iterator from `fetchBody`.
     */
    Promise!FullResponse fullRequest(Method method, string uri, PromiseIterator!Header headers, PromiseIterator!(immutable(ubyte)[]) bodyData = null) nothrow {
        import upromised.operations : readAll;

        FullResponse r;
        return sendRequest(method, uri)
            .then(() => sendHeaders(headers))
            .then(() {
                if (bodyData !is null) {
                    return sendBody(bodyData);
                } else {
                    return sendBody();
                }
            }).then(() => fetchResponse())
            .then((response) => r.response = response)
            .then((_) => fetchHeaders().readAll)
            .then((headers) => r.headers = headers)
            .then((_) => r.bodyData = fetchBody())
            .then((_) => r);
    }

    /// Writes the request line "<METHOD> <uri> HTTP/1.1\r\n" to `output`.
    Promise!void sendRequest(Method method, string uri) nothrow {
        // Starting from a resolved promise keeps `format` (which may throw)
        // inside a promise callback, as this method is nothrow.
        return Promise!void.resolved()
            .then(() => output.resolve("%s %s HTTP/1.1\r\n".format(method, uri).as!ubyte))
            .then((_) {});
    }

    /// Overload taking the headers as an array.
    Promise!void sendHeaders(Header[] headers) nothrow {
        import upromised.operations : toAsync;

        return sendHeaders(headers.toAsync);
    }

    /**
     * Writes each header as "key: value\r\n" to `output`.
     *
     * A Content-Length header (name matched case-insensitively, as HTTP
     * header names are case-insensitive) is remembered so that `sendBody`
     * sends exactly that many bytes instead of using chunked encoding.
     */
    Promise!void sendHeaders(PromiseIterator!Header headers) nothrow {
        import std.conv : to;
        import std.uni : sicmp;

        return headers.each((header) {
            if (sicmp(header.key, "Content-Length") == 0) {
                contentLength = header.value.to!size_t;
            }

            return output.resolve("%s: %s\r\n".format(header.key, header.value).as!ubyte);
        }).then((_) {});
    }

    /// Ends the header section without sending a body.
    Promise!void sendBody() nothrow {
        return output.resolve("\r\n".as!ubyte).then((_) {});
    }

    /// Overload taking the body as an in-memory array.
    Promise!void sendBody(immutable(ubyte)[] data) nothrow {
        import upromised.operations : toAsyncChunks;

        return sendBody(data.toAsyncChunks);
    }

    /**
     * Ends the header section and sends the body.
     *
     * If a positive Content-Length was seen by `sendHeaders`, at most that
     * many bytes of `dataArg` are written. Otherwise a
     * "Transfer-Encoding: chunked" header is appended and every item of
     * `dataArg` is sent as one chunk, followed by the terminating "0" chunk.
     */
    Promise!void sendBody(PromiseIterator!(immutable(ubyte)[]) dataArg) nothrow {
        import upromised.tokenizer : Tokenizer;

        if (contentLength > 0) {
            // The tokenizer limit caps how much is taken from dataArg.
            auto data = new Tokenizer!(immutable(ubyte))(dataArg);
            data.limit(contentLength);
            data.partialReceive(true);

            return output.resolve("\r\n".as!ubyte).then((_) => data.read().each((chunk) {
                contentLength -= chunk.length;

                data.limit(contentLength);
                // Continue only while the write succeeded and bytes remain to be sent.
                return output.resolve(chunk).then((a) => a && contentLength > 0);
            }).then((_) {}));
        } else {
            auto data = dataArg;
            return output.resolve("Transfer-Encoding: chunked\r\n\r\n".as!ubyte).then((_) => data.each((chunk) {
                // One chunk: hexadecimal size line, payload, CRLF.
                return output
                    .resolve("%x\r\n".format(chunk.length).as!ubyte)
                    .then((_) => output.resolve(chunk))
                    .then((_) => output.resolve("\r\n".as!ubyte));
            })).then((_) {
                // Zero-sized chunk terminates the body.
                return output.resolve("0\r\n\r\n".as!ubyte);
            }).then((_) {});
        }
    }

    /**
     * Reads and parses the response status line.
     *
     * Also resets the `contentLength` / `chunked` state left by a previous
     * exchange. The promise is rejected (through `enforce`) when the line has
     * fewer than three space-separated fields or the first one is not "HTTP/1.1".
     */
    Promise!Response fetchResponse() nothrow {
        import std.array : join;
        import std.conv : to;
        import std.string : split;

        contentLength = 0;
        chunked = false;

        inputTokenizer.partialReceive();
        inputTokenizer.limit();
        inputTokenizer.separator("\r\n");

        Response r;
        return inputTokenizer.read().each((response) {
            // Drop the trailing CRLF before splitting.
            const(char)[][] responseParts = response.as!char[0..$-2].split(" ");
            enforce(responseParts.length > 2, "%s should be at least three parts. %s".format(responseParts, [response]));
            enforce(responseParts[0] == "HTTP/1.1");
            r.statusCode = responseParts[1].to!int;
            r.statusLine = responseParts[2..$].join(" ").idup;
            // Only the first line is consumed here.
            return false;
        }).then((_) => r);
    }

    /**
     * Returns an iterator over the response headers; it reports end-of-stream
     * when the empty line that terminates the header section is read.
     *
     * Each line is split at the first ':'; optional whitespace around the
     * value is removed. A line without ':' rejects the promise instead of
     * slicing out of bounds. Content-Length and "Transfer-Encoding: chunked"
     * (both matched case-insensitively) are recorded for `fetchBody`.
     */
    PromiseIterator!Header fetchHeaders() nothrow {
        import std.conv : to;
        import std.string : indexOf, strip;
        import std.uni : sicmp;

        inputTokenizer.partialReceive();
        inputTokenizer.limit();
        inputTokenizer.separator("\r\n");

        return new class PromiseIterator!Header {
            override Promise!ItValue next(Promise!bool) {
                ItValue result;
                return inputTokenizer.read().each((headerLine) {
                    // Blank line: end of the header section.
                    if (headerLine.as!char == "\r\n") {
                        result.eof = true;
                        return false;
                    }

                    // indexOf yields a code-unit index, which is what slicing needs.
                    auto pos = headerLine.as!char.indexOf(':');
                    enforce(pos >= 0, "Malformed header line: %s".format([headerLine.as!char]));
                    Header header = Header(headerLine.as!char[0..pos], headerLine.as!char[pos + 1..$-2].strip);
                    result.value = header;

                    if (sicmp(header.key, "Content-Length") == 0) {
                        contentLength = header.value.to!size_t;
                    }

                    if (sicmp(header.key, "Transfer-Encoding") == 0 && sicmp(header.value, "chunked") == 0) {
                        chunked = true;
                    }

                    // One header per call to next().
                    return false;
                }).then((_) => result);
            }
        };
    }

    /**
     * Returns an iterator over the response body.
     *
     * Uses `decodeChunked` when `fetchHeaders` saw chunked transfer encoding;
     * otherwise yields data until `contentLength` bytes have been delivered
     * (an absent Content-Length therefore yields an empty body).
     */
    PromiseIterator!(const(ubyte)[]) fetchBody() nothrow {
        if (chunked) {
            return decodeChunked(inputTokenizer);
        }

        // Raw mode: no separator, capped at the remaining body length.
        inputTokenizer.partialReceive(true);
        inputTokenizer.limit(contentLength);
        inputTokenizer.separator();

        return new class PromiseIterator!(const(ubyte)[]) {
            override Promise!ItValue next(Promise!bool) {
                if (contentLength == 0) {
                    return Promise!ItValue.resolved(ItValue(true));
                }

                ItValue result;
                return inputTokenizer.read().each((chunk) {
                    contentLength -= chunk.length;
                    inputTokenizer.limit(contentLength);
                    result.value = chunk;
                    return false;
                }).then((eof) => eof ? ItValue(true) : result);
            }
        };
    }

    /// Detaches and returns the input tokenizer, leaving this client without one.
    Tokenizer!(const(ubyte)) release() nothrow {
        import std.algorithm : swap;

        Tokenizer!(const(ubyte)) released;
        inputTokenizer.swap(released);
        return released;
    }
}
// Request line for GET.
unittest {
    const(ubyte)[] request;

    auto a = new ManualStream;
    (new Client(a)).sendRequest(Method.GET, "/sup/moite").then(() => a.shutdown()).nothrow_();
    a.readFromWrite().readAllChunks.then((a) => request = a).nothrow_();

    assert(request == "GET /sup/moite HTTP/1.1\r\n", "%s unexpected".format([cast(const(char)[])request]));
}
// Request line for POST.
unittest {
    const(ubyte)[] request;

    auto a = new ManualStream;
    (new Client(a)).sendRequest(Method.POST, "/sup/moite").then(() => a.shutdown()).nothrow_();
    a.readFromWrite().readAllChunks.then((a) => request = a).nothrow_();

    assert(request == "POST /sup/moite HTTP/1.1\r\n", "%s unexpected".format([cast(const(char)[])request]));
}
// Header serialization.
unittest {
    const(ubyte)[] request;

    auto a = new ManualStream;
    (new Client(a)).sendHeaders([Header("Oi", "Ola"), Header("Oi2", "Ola2")]).then(() => a.shutdown()).nothrow_();
    a.readFromWrite().readAllChunks.then((a) => request = a).nothrow_();

    assert(request == "Oi: Ola\r\nOi2: Ola2\r\n", "%s unexpected".format([cast(const(char)[])request]));
}
// A rejected header iterator rejects sendHeaders with the same exception.
unittest {
    bool called;

    Exception err = new Exception("same");
    auto headers = new DelegatePromiseIterator!Header;
    headers.reject(err);
    (new Client()).sendHeaders(headers).then(() {
        assert(false);
    }).except((Exception err2) {
        assert(err2 is err);
        called = true;
    }).nothrow_();

    assert(called);
}
// Empty body: only the blank line ending the header section is written.
unittest {
    const(ubyte)[] request;

    auto a = new ManualStream;
    (new Client(a)).sendBody().then(() => a.shutdown()).nothrow_();
    a.readFromWrite().readAllChunks.then((a) => request = a).nothrow_();

    assert(request == "\r\n", "%s unexpected".format([cast(const(char)[])request]));
}
// With Content-Length, the body is truncated to the announced size.
unittest {
    import upromised.operations : toAsyncChunks;

    const(ubyte)[] request;

    auto a = new ManualStream;
    auto client = new Client(a);

    client
        .sendRequest(Method.POST, "/supas")
        .then(() => client.sendHeaders([Header("Content-Length", "4")]))
        .then(() => client.sendBody("supasupa".as!ubyte.toAsyncChunks(2)))
        .then(() => a.shutdown())
        .nothrow_();

    a.readFromWrite().readAllChunks.then((a) => request = a).nothrow_();

    assert(request == "POST /supas HTTP/1.1\r\nContent-Length: 4\r\n\r\nsupa", "%s unexpected".format([cast(const(char)[])request]));
}
// Without Content-Length, the body is sent chunked and round-trips through decodeChunked.
unittest {
    import std.algorithm : startsWith, endsWith;
    import upromised.operations : toAsyncChunks;

    const(ubyte)[] request;

    auto a = new ManualStream;
    auto client = new Client(a);

    client
        .sendRequest(Method.POST, "/supas")
        .then(() => client.sendHeaders([Header("Host", "www.supa.com")]))
        .then(() => client.sendBody("supasupa".as!ubyte.toAsyncChunks(2)))
        .then(() => a.shutdown())
        .nothrow_();

    a.readFromWrite().readAllChunks.then((a) => request = a).nothrow_();

    auto prefix = "POST /supas HTTP/1.1\r\nHost: www.supa.com\r\nTransfer-Encoding: chunked\r\n\r\n";
    assert(request.startsWith(prefix), "%s unexpected".format([cast(const(char)[])request]));
    assert(request.endsWith("0\r\n\r\n"), "%s unexpected".format([cast(const(char)[])request]));
    auto bodyChunked = request[prefix.length..$];
    const(ubyte)[] body_;

    bodyChunked
        .toAsyncChunks
        .decodeChunked
        .readAllChunks.then((data) => body_ = data)
        .nothrow_;

    assert(body_ == "supasupa", "%s unexpected".format([cast(const(char)[])request]));
}
// Response with Content-Length: 0 — status line and headers parsed, empty body.
unittest {
    import upromised.operations : toAsyncChunks, readAll;

    string responseData =
        "HTTP/1.1 301 Moved Permanently\r\n" ~
        "Date: Thu, 30 Mar 2017 17:02:29 GMT\r\n" ~
        "Set-Cookie: WMF-Last-Access=30-Mar-2017;Path=/;HttpOnly;secure;Expires=Mon, 01 May 2017 12:00:00 GMT\r\n" ~
        "Set-Cookie: WMF-Last-Access-Global=30-Mar-2017;Path=/;Domain=.wikipedia.org;HttpOnly;secure;Expires=Mon, 01 May 2017 12:00:00 GMT\r\n" ~
        "Location: https://en.wikipedia.org/\r\n" ~
        "Content-Length: 0\r\n\r\n";

    Response response;
    Header[] headers;
    const(ubyte)[] data;

    auto a = new ManualStream;
    auto client = new Client(a);

    client
        .fetchResponse
        .then((responseArg) { response = responseArg; } )
        .then(() => client.fetchHeaders().readAll())
        .then((headersArg) { headers = headersArg; })
        .then(() => client.fetchBody().readAllChunks())
        .then((dataArg) { data = dataArg;})
        .nothrow_();

    responseData
        .as!ubyte
        .toAsyncChunks(13)
        .each((chunk) => a.writeToRead(chunk).then(() {}))
        .then((_) => a.writeToRead())
        .nothrow_();

    assert(response.statusCode == 301);
    assert(response.statusLine == "Moved Permanently");
    assert(headers[0] == Header("Date", "Thu, 30 Mar 2017 17:02:29 GMT"));
    assert(headers[1] == Header("Set-Cookie", "WMF-Last-Access=30-Mar-2017;Path=/;HttpOnly;secure;Expires=Mon, 01 May 2017 12:00:00 GMT"));
    assert(headers[2] == Header("Set-Cookie", "WMF-Last-Access-Global=30-Mar-2017;Path=/;Domain=.wikipedia.org;HttpOnly;secure;Expires=Mon, 01 May 2017 12:00:00 GMT"));
    assert(headers[3] == Header("Location", "https://en.wikipedia.org/"));
    assert(headers[4] == Header("Content-Length", "0"));
    assert(headers.length == 5);
    assert(data.length == 0);
}
// Response with a Content-Length body; the stream is not closed afterwards.
unittest {
    import upromised.operations : toAsyncChunks, readAll;

    string responseData =
        "HTTP/1.1 200 OK\r\n" ~
        "Server: nginx\r\n" ~
        "Date: Thu, 30 Mar 2017 22:32:32 GMT\r\n" ~
        "Content-Type: text/plain; charset=UTF-8\r\n" ~
        "Content-Length: 15\r\n" ~
        "Connection: close\r\n" ~
        "Access-Control-Allow-Origin: *\r\n" ~
        "Access-Control-Allow-Methods: GET\r\n" ~
        "\r\n" ~
        "192.30.253.112\n";

    Response response;
    Header[] headers;
    const(ubyte)[] data;

    auto a = new ManualStream;
    auto client = new Client(a);

    client
        .fetchResponse
        .then((responseArg) { response = responseArg; } )
        .then(() => client.fetchHeaders().readAll())
        .then((headersArg) { headers = headersArg; })
        .then(() => client.fetchBody().readAllChunks())
        .then((dataArg) { data = dataArg;})
        .nothrow_();

    responseData
        .as!ubyte
        .toAsyncChunks(11)
        .each((chunk) => a.writeToRead(chunk).then((){}))
        .nothrow_();

    assert(response.statusCode == 200);
    assert(response.statusLine == "OK");
    assert(headers[0] == Header("Server", "nginx"));
    assert(headers[1] == Header("Date", "Thu, 30 Mar 2017 22:32:32 GMT"));
    assert(headers[2] == Header("Content-Type", "text/plain; charset=UTF-8"));
    assert(headers[3] == Header("Content-Length", "15"));
    assert(headers[4] == Header("Connection", "close"));
    assert(headers[5] == Header("Access-Control-Allow-Origin", "*"));
    assert(headers[6] == Header("Access-Control-Allow-Methods", "GET"));
    assert(headers.length == 7);
    assert(data == "192.30.253.112\n");
}
// Response with a chunked body.
unittest {
    import upromised.operations : toAsyncChunks, readAll;

    string responseData =
        "HTTP/1.1 200 OK\r\n" ~
        "Date: Thu, 30 Mar 2017 23:24:14 GMT\r\n" ~
        "Content-Type: text/plain;charset=utf-8\r\n" ~
        "Transfer-Encoding: chunked\r\n" ~
        "Connection: keep-alive\r\n" ~
        "Pragma: no-cache\r\n" ~
        "Vary: Accept-Encoding\r\n" ~
        "Server: cloudflare-nginx\r\n" ~
        "\r\n" ~
        "14\r\n" ~
        "6\n5\n3\n1\n1\n2\n4\n3\n3\n6\n\r\n" ~
        "0\r\n" ~
        "\r\n";

    Response response;
    Header[] headers;
    const(ubyte)[] data;

    auto a = new ManualStream;
    auto client = new Client(a);

    client
        .fetchResponse
        .then((responseArg) { response = responseArg; } )
        .then(() => client.fetchHeaders().readAll())
        .then((headersArg) { headers = headersArg; })
        .then(() => client.fetchBody().readAllChunks())
        .then((dataArg) { data = dataArg;})
        .nothrow_();

    responseData
        .as!ubyte
        .toAsyncChunks(11)
        .each((chunk) => a.writeToRead(chunk).then((){}))
        .then((_) => a.writeToRead())
        .nothrow_();

    assert(response.statusCode == 200);
    assert(response.statusLine == "OK");
    assert(headers[0] == Header("Date", "Thu, 30 Mar 2017 23:24:14 GMT"));
    assert(headers[1] == Header("Content-Type", "text/plain;charset=utf-8"));
    assert(headers[2] == Header("Transfer-Encoding", "chunked"));
    assert(headers[3] == Header("Connection", "keep-alive"));
    assert(headers[4] == Header("Pragma", "no-cache"));
    assert(headers[5] == Header("Vary", "Accept-Encoding"));
    assert(headers[6] == Header("Server", "cloudflare-nginx"));
    assert(headers.length == 7);
    assert(data == "6\n5\n3\n1\n1\n2\n4\n3\n3\n6\n");
}