1 module upromised.http; 2 import std.exception : enforce; 3 import std.format : format; 4 import upromised.manual_stream : ManualStream; 5 import upromised.operations : readAllChunks; 6 import upromised.promise : DelegatePromiseIterator, Promise, PromiseIterator; 7 import upromised.stream : Stream; 8 import upromised.tokenizer : Tokenizer; 9 10 struct Header { 11 string key; 12 string value; 13 } 14 15 enum Method { 16 GET, POST, PUT, DELETE 17 } 18 19 struct Response { 20 int statusCode; 21 string statusLine; 22 } 23 24 struct FullResponse { 25 Response response; 26 Header[] headers; 27 PromiseIterator!(const(ubyte)[]) bodyData; 28 } 29 30 private inout(Y)[] as(Y, T)(inout(T) input) { 31 return cast(inout(Y)[])input; 32 } 33 34 PromiseIterator!(const(ubyte)[]) decodeChunked(PromiseIterator!(const(ubyte)[]) chunked) nothrow { 35 return decodeChunked(new Tokenizer!(const(ubyte))(chunked)); 36 } 37 38 PromiseIterator!(const(ubyte)[]) decodeChunked(Tokenizer!(const(ubyte)) tokenizer) nothrow { 39 import std.conv : to; 40 41 tokenizer.partialReceive(); 42 tokenizer.limit(); 43 tokenizer.separator("\r\n"); 44 45 return new class PromiseIterator!(const(ubyte)[]) { 46 size_t chunkSize = -1; 47 48 override Promise!ItValue next(Promise!bool) nothrow { 49 if (chunkSize == 0) { 50 return Promise!ItValue.resolved(ItValue(true)); 51 } 52 53 bool step; 54 const(ubyte)[] chunkR; 55 return tokenizer.read().each((chunk) { 56 step = !step; 57 if (step) { 58 tokenizer.separator(); 59 chunkSize = chunk.as!char[0..$-2].to!size_t(16); 60 tokenizer.limit(chunkSize + 2); 61 return true; 62 } else { 63 tokenizer.separator("\r\n"); 64 tokenizer.limit(); 65 chunkR = chunk[0..$-2]; 66 return false; 67 } 68 }).then((eof) => eof ? ItValue(true) : ItValue(false, chunkR)); 69 } 70 }; 71 } 72 unittest { 73 import upromised.operations : toAsyncChunks; 74 75 const(ubyte)[] response; 76 77 (cast(const(ubyte)[])"4\r\nWiki\r\n5\r\npedia\r\nE\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n TRAILING IGNORE") 78 .toAsyncChunks(3) 79 .decodeChunked 80 .readAllChunks 81 .then(a => response = a) 82 .nothrow_(); 83 84 assert(response == "Wikipedia in\r\n\r\nchunks.", "%s unexpected".format([response.as!char])); 85 } 86 unittest { 87 auto err = new Exception("err"); 88 bool called = false; 89 90 auto rejected = new DelegatePromiseIterator!(const(ubyte)[]); 91 rejected.reject(err); 92 rejected 93 .decodeChunked 94 .each((_) { 95 assert(false); 96 }).then((_) { 97 assert(false); 98 }).except((Exception e) { 99 assert(e is err); 100 called = true; 101 }).nothrow_(); 102 103 assert(called); 104 } 105 106 class Client { 107 private: 108 import upromised.tokenizer : Tokenizer; 109 110 size_t contentLength; 111 bool chunked; 112 Tokenizer!(const(ubyte)) inputTokenizer; 113 114 public: 115 DelegatePromiseIterator!(immutable(ubyte)[]) output; 116 117 this() nothrow { 118 output = new DelegatePromiseIterator!(immutable(ubyte)[]); 119 } 120 121 this(Stream stream) nothrow { 122 this(); 123 this.stream = stream; 124 } 125 126 Promise!void stream(Stream stream) nothrow { 127 assert(!inputTokenizer); 128 129 inputTokenizer = new Tokenizer!(const(ubyte))(stream.read()); 130 131 return output 132 .each((data) => stream.write(data)) 133 .then(() {}); 134 } 135 136 Promise!FullResponse fullRequest(Method method, string uri, Header[] headers, immutable(ubyte)[] bodyData) nothrow { 137 import upromised.operations : toAsync, toAsyncChunks; 138 139 return fullRequest(method, uri, headers.toAsync, bodyData.toAsyncChunks); 140 } 141 142 Promise!FullResponse fullRequest(Method method, string uri, PromiseIterator!Header headers, immutable(ubyte)[] bodyData) nothrow { 143 import upromised.operations : toAsyncChunks; 144 145 return fullRequest(method, uri, headers, bodyData.toAsyncChunks); 146 } 147 148 Promise!FullResponse fullRequest(Method method, string uri, Header[] headers, PromiseIterator!(immutable(ubyte)[]) bodyData = null) nothrow { 149 import upromised.operations : toAsync; 150 151 return fullRequest(method, uri, headers.toAsync, bodyData); 152 } 153 154 Promise!FullResponse fullRequest(Method method, string uri, PromiseIterator!Header headers, PromiseIterator!(immutable(ubyte)[]) bodyData = null) nothrow { 155 import upromised.operations : readAll; 156 157 FullResponse r; 158 return sendRequest(method, uri) 159 .then(() => sendHeaders(headers)) 160 .then(() { 161 if (bodyData !is null) { 162 return sendBody(bodyData); 163 } else { 164 return sendBody(); 165 } 166 }).then(() => fetchResponse()) 167 .then((response) => r.response = response) 168 .then((_) => fetchHeaders().readAll) 169 .then((headers) => r.headers = headers) 170 .then((_) => r.bodyData = fetchBody()) 171 .then((_) => r); 172 } 173 174 Promise!void sendRequest(Method method, string uri) nothrow { 175 return Promise!void.resolved() 176 .then(() => output.resolve("%s %s HTTP/1.1\r\n".format(method, uri).as!ubyte)) 177 .then((_) {}); 178 } 179 180 Promise!void sendHeaders(Header[] headers) nothrow { 181 import upromised.operations : toAsync; 182 183 return sendHeaders(headers.toAsync); 184 } 185 186 Promise!void sendHeaders(PromiseIterator!Header headers) nothrow { 187 import std.conv : to; 188 189 return headers.each((header) { 190 if (header.key == "Content-Length") { 191 contentLength = header.value.to!size_t; 192 } 193 194 return output.resolve("%s: %s\r\n".format(header.key, header.value).as!ubyte); 195 }).then((_) {}); 196 } 197 198 Promise!void sendBody() nothrow { 199 return output.resolve("\r\n".as!ubyte).then((_) {}); 200 } 201 202 Promise!void sendBody(immutable(ubyte)[] data) nothrow { 203 import upromised.operations : toAsyncChunks; 204 205 return sendBody(data.toAsyncChunks); 206 } 207 208 Promise!void sendBody(PromiseIterator!(immutable(ubyte)[]) dataArg) nothrow { 209 import upromised.tokenizer : Tokenizer; 210 211 if (contentLength > 0) { 212 auto data = new Tokenizer!(immutable(ubyte))(dataArg); 213 data.limit(contentLength); 214 data.partialReceive(true); 215 216 return output.resolve("\r\n".as!ubyte).then((_) => data.read().each((chunk) { 217 contentLength -= chunk.length; 218 219 data.limit(contentLength); 220 return output.resolve(chunk).then((a) => a && contentLength > 0); 221 }).then((_) {})); 222 } else { 223 auto data = dataArg; 224 return output.resolve("Transfer-Encoding: chunked\r\n\r\n".as!ubyte).then((_) => data.each((chunk) { 225 return output 226 .resolve("%x\r\n".format(chunk.length).as!ubyte) 227 .then((_) => output.resolve(chunk)) 228 .then((_) => output.resolve("\r\n".as!ubyte)); 229 })).then((_) { 230 return output.resolve("0\r\n\r\n".as!ubyte); 231 }).then((_) {}); 232 } 233 } 234 235 Promise!Response fetchResponse() nothrow { 236 import std.array : join; 237 import std.conv : to; 238 import std.string : split; 239 240 contentLength = 0; 241 chunked = false; 242 243 inputTokenizer.partialReceive(); 244 inputTokenizer.limit(); 245 inputTokenizer.separator("\r\n"); 246 247 Response r; 248 return inputTokenizer.read().each((response) { 249 const(char)[][] responseParts = response.as!char[0..$-2].split(" "); 250 enforce(responseParts.length > 2, "%s should be two parts. %s".format(responseParts, [response])); 251 enforce(responseParts[0] == "HTTP/1.1"); 252 r.statusCode = responseParts[1].to!int; 253 r.statusLine = responseParts[2..$].join(" ").idup; 254 return false; 255 }).then((_) => r); 256 } 257 258 PromiseIterator!Header fetchHeaders() nothrow { 259 import std.algorithm : countUntil; 260 import std.conv : to; 261 262 inputTokenizer.partialReceive(); 263 inputTokenizer.limit(); 264 inputTokenizer.separator("\r\n"); 265 266 return new class PromiseIterator!Header { 267 override Promise!ItValue next(Promise!bool) { 268 ItValue result; 269 return inputTokenizer.read().each((headerLine) { 270 if (headerLine.as!char == "\r\n") { 271 result.eof = true; 272 return false; 273 } 274 275 auto pos = headerLine.as!char.countUntil(": "); 276 Header header = Header(headerLine.as!char[0..pos], headerLine.as!char[pos + 2..$-2]); 277 result.value = header; 278 279 if (header.key == "Content-Length") { 280 contentLength = header.value.to!size_t; 281 } 282 283 if (header.key == "Transfer-Encoding" && header.value == "chunked") { 284 chunked = true; 285 } 286 287 return false; 288 }).then((_) => result); 289 } 290 }; 291 } 292 293 PromiseIterator!(const(ubyte)[]) fetchBody() nothrow { 294 if (chunked) { 295 return decodeChunked(inputTokenizer); 296 } 297 298 inputTokenizer.partialReceive(true); 299 inputTokenizer.limit(contentLength); 300 inputTokenizer.separator(); 301 302 return new class PromiseIterator!(const(ubyte)[]) { 303 override Promise!ItValue next(Promise!bool) { 304 if (contentLength == 0) { 305 return Promise!ItValue.resolved(ItValue(true)); 306 } 307 308 ItValue result; 309 return inputTokenizer.read().each((chunk) { 310 contentLength -= chunk.length; 311 inputTokenizer.limit(contentLength); 312 result.value = chunk; 313 return false; 314 }).then((eof) => eof ? ItValue(true) : result); 315 } 316 }; 317 } 318 } 319 unittest { 320 const(ubyte)[] request; 321 322 auto a = new ManualStream; 323 (new Client(a)).sendRequest(Method.GET, "/sup/moite").then(() => a.shutdown()).nothrow_(); 324 a.readFromWrite().readAllChunks.then((a) => request = a).nothrow_(); 325 326 assert(request == "GET /sup/moite HTTP/1.1\r\n", "%s unexpected".format([cast(const(char)[])request])); 327 } 328 unittest { 329 const(ubyte)[] request; 330 331 auto a = new ManualStream; 332 (new Client(a)).sendRequest(Method.POST, "/sup/moite").then(() => a.shutdown()).nothrow_(); 333 a.readFromWrite().readAllChunks.then((a) => request = a).nothrow_(); 334 335 assert(request == "POST /sup/moite HTTP/1.1\r\n", "%s unexpected".format([cast(const(char)[])request])); 336 } 337 unittest { 338 const(ubyte)[] request; 339 340 auto a = new ManualStream; 341 (new Client(a)).sendHeaders([Header("Oi", "Ola"), Header("Oi2", "Ola2")]).then(() => a.shutdown()).nothrow_(); 342 a.readFromWrite().readAllChunks.then((a) => request = a).nothrow_(); 343 344 assert(request == "Oi: Ola\r\nOi2: Ola2\r\n", "%s unexpected".format([cast(const(char)[])request])); 345 } 346 unittest { 347 bool called; 348 349 Exception err = new Exception("same"); 350 auto headers = new DelegatePromiseIterator!Header; 351 headers.reject(err); 352 (new Client()).sendHeaders(headers).then(() { 353 assert(false); 354 }).except((Exception err2) { 355 assert(err2 is err); 356 called = true; 357 }).nothrow_(); 358 359 assert(called); 360 } 361 unittest { 362 const(ubyte)[] request; 363 364 auto a = new ManualStream; 365 (new Client(a)).sendBody().then(() => a.shutdown()).nothrow_(); 366 a.readFromWrite().readAllChunks.then((a) => request = a).nothrow_(); 367 368 assert(request == "\r\n", "%s unexpected".format([cast(const(char)[])request])); 369 } 370 unittest { 371 import upromised.operations : toAsyncChunks; 372 373 const(ubyte)[] request; 374 375 auto a = new ManualStream; 376 auto client = new Client(a); 377 378 client 379 .sendRequest(Method.POST, "/supas") 380 .then(() => client.sendHeaders([Header("Content-Length", "4")])) 381 .then(() => client.sendBody("supasupa".as!ubyte.toAsyncChunks(2))) 382 .then(() => a.shutdown()) 383 .nothrow_(); 384 385 a.readFromWrite().readAllChunks.then((a) => request = a).nothrow_(); 386 387 assert(request == "POST /supas HTTP/1.1\r\nContent-Length: 4\r\n\r\nsupa", "%s unexpected".format([cast(const(char)[])request])); 388 } 389 unittest { 390 import std.algorithm : startsWith, endsWith; 391 import upromised.operations : toAsyncChunks; 392 393 const(ubyte)[] request; 394 395 auto a = new ManualStream; 396 auto client = new Client(a); 397 398 client 399 .sendRequest(Method.POST, "/supas") 400 .then(() => client.sendHeaders([Header("Host", "www.supa.com")])) 401 .then(() => client.sendBody("supasupa".as!ubyte.toAsyncChunks(2))) 402 .then(() => a.shutdown()) 403 .nothrow_(); 404 405 a.readFromWrite().readAllChunks.then((a) => request = a).nothrow_(); 406 407 auto prefix = "POST /supas HTTP/1.1\r\nHost: www.supa.com\r\nTransfer-Encoding: chunked\r\n\r\n"; 408 assert(request.startsWith(prefix), "%s unexpected".format([cast(const(char)[])request])); 409 assert(request.endsWith("0\r\n\r\n"), "%s unexpected".format([cast(const(char)[])request])); 410 auto bodyChunked = request[prefix.length..$]; 411 const(ubyte)[] body_; 412 413 bodyChunked 414 .toAsyncChunks 415 .decodeChunked 416 .readAllChunks.then((data) => body_ = data); 417 418 assert(body_ == "supasupa", "%s unexpected".format([cast(const(char)[])request])); 419 } 420 unittest { 421 import upromised.operations : toAsyncChunks, readAll; 422 423 string responseData = 424 "HTTP/1.1 301 Moved Permanently\r\n" ~ 425 "Date: Thu, 30 Mar 2017 17:02:29 GMT\r\n" ~ 426 "Set-Cookie: WMF-Last-Access=30-Mar-2017;Path=/;HttpOnly;secure;Expires=Mon, 01 May 2017 12:00:00 GMT\r\n" ~ 427 "Set-Cookie: WMF-Last-Access-Global=30-Mar-2017;Path=/;Domain=.wikipedia.org;HttpOnly;secure;Expires=Mon, 01 May 2017 12:00:00 GMT\r\n" ~ 428 "Location: https://en.wikipedia.org/\r\n" ~ 429 "Content-Length: 0\r\n\r\n"; 430 431 Response response; 432 Header[] headers; 433 const(ubyte)[] data; 434 435 auto a = new ManualStream; 436 auto client = new Client(a); 437 438 client 439 .fetchResponse 440 .then((responseArg) { response = responseArg; } ) 441 .then(() => client.fetchHeaders().readAll()) 442 .then((headersArg) { headers = headersArg; }) 443 .then(() => client.fetchBody().readAllChunks()) 444 .then((dataArg) { data = dataArg;}) 445 .nothrow_(); 446 447 responseData 448 .as!ubyte 449 .toAsyncChunks(13) 450 .each((chunk) => a.writeToRead(chunk).then(() {})) 451 .then((_) => a.writeToRead()) 452 .nothrow_(); 453 454 assert(response.statusCode == 301); 455 assert(response.statusLine == "Moved Permanently"); 456 assert(headers[0] == Header("Date", "Thu, 30 Mar 2017 17:02:29 GMT")); 457 assert(headers[1] == Header("Set-Cookie", "WMF-Last-Access=30-Mar-2017;Path=/;HttpOnly;secure;Expires=Mon, 01 May 2017 12:00:00 GMT")); 458 assert(headers[2] == Header("Set-Cookie", "WMF-Last-Access-Global=30-Mar-2017;Path=/;Domain=.wikipedia.org;HttpOnly;secure;Expires=Mon, 01 May 2017 12:00:00 GMT")); 459 assert(headers[3] == Header("Location", "https://en.wikipedia.org/")); 460 assert(headers[4] == Header("Content-Length", "0")); 461 assert(headers.length == 5); 462 assert(data.length == 0); 463 } 464 unittest { 465 import upromised.operations : toAsyncChunks, readAll; 466 467 string responseData = 468 "HTTP/1.1 200 OK\r\n" ~ 469 "Server: nginx\r\n" ~ 470 "Date: Thu, 30 Mar 2017 22:32:32 GMT\r\n" ~ 471 "Content-Type: text/plain; charset=UTF-8\r\n" ~ 472 "Content-Length: 15\r\n" ~ 473 "Connection: close\r\n" ~ 474 "Access-Control-Allow-Origin: *\r\n" ~ 475 "Access-Control-Allow-Methods: GET\r\n" ~ 476 "\r\n" ~ 477 "192.30.253.112\n"; 478 479 Response response; 480 Header[] headers; 481 const(ubyte)[] data; 482 483 auto a = new ManualStream; 484 auto client = new Client(a); 485 486 client 487 .fetchResponse 488 .then((responseArg) { response = responseArg; } ) 489 .then(() => client.fetchHeaders().readAll()) 490 .then((headersArg) { headers = headersArg; }) 491 .then(() => client.fetchBody().readAllChunks()) 492 .then((dataArg) { data = dataArg;}) 493 .nothrow_(); 494 495 responseData 496 .as!ubyte 497 .toAsyncChunks(11) 498 .each((chunk) => a.writeToRead(chunk).then((){})) 499 .nothrow_(); 500 501 assert(response.statusCode == 200); 502 assert(response.statusLine == "OK"); 503 assert(headers[0] == Header("Server", "nginx")); 504 assert(headers[1] == Header("Date", "Thu, 30 Mar 2017 22:32:32 GMT")); 505 assert(headers[2] == Header("Content-Type", "text/plain; charset=UTF-8")); 506 assert(headers[3] == Header("Content-Length", "15")); 507 assert(headers[4] == Header("Connection", "close")); 508 assert(headers[5] == Header("Access-Control-Allow-Origin", "*")); 509 assert(headers[6] == Header("Access-Control-Allow-Methods", "GET")); 510 assert(headers.length == 7); 511 assert(data == "192.30.253.112\n"); 512 } 513 unittest { 514 import upromised.operations : toAsyncChunks, readAll; 515 516 string responseData = 517 "HTTP/1.1 200 OK\r\n" ~ 518 "Date: Thu, 30 Mar 2017 23:24:14 GMT\r\n" ~ 519 "Content-Type: text/plain;charset=utf-8\r\n" ~ 520 "Transfer-Encoding: chunked\r\n" ~ 521 "Connection: keep-alive\r\n" ~ 522 "Pragma: no-cache\r\n" ~ 523 "Vary: Accept-Encoding\r\n" ~ 524 "Server: cloudflare-nginx\r\n" ~ 525 "\r\n" ~ 526 "14\r\n" ~ 527 "6\n5\n3\n1\n1\n2\n4\n3\n3\n6\n\r\n" ~ 528 "0\r\n" ~ 529 "\r\n"; 530 531 Response response; 532 Header[] headers; 533 const(ubyte)[] data; 534 535 auto a = new ManualStream; 536 auto client = new Client(a); 537 538 client 539 .fetchResponse 540 .then((responseArg) { response = responseArg; } ) 541 .then(() => client.fetchHeaders().readAll()) 542 .then((headersArg) { headers = headersArg; }) 543 .then(() => client.fetchBody().readAllChunks()) 544 .then((dataArg) { data = dataArg;}) 545 .nothrow_(); 546 547 responseData 548 .as!ubyte 549 .toAsyncChunks(11) 550 .each((chunk) => a.writeToRead(chunk).then((){})) 551 .then((_) => a.writeToRead()) 552 .nothrow_(); 553 554 assert(response.statusCode == 200); 555 assert(response.statusLine == "OK"); 556 assert(headers[0] == Header("Date", "Thu, 30 Mar 2017 23:24:14 GMT")); 557 assert(headers[1] == Header("Content-Type", "text/plain;charset=utf-8")); 558 assert(headers[2] == Header("Transfer-Encoding", "chunked")); 559 assert(headers[3] == Header("Connection", "keep-alive")); 560 assert(headers[4] == Header("Pragma", "no-cache")); 561 assert(headers[5] == Header("Vary", "Accept-Encoding")); 562 assert(headers[6] == Header("Server", "cloudflare-nginx")); 563 assert(headers.length == 7); 564 assert(data == "6\n5\n3\n1\n1\n2\n4\n3\n3\n6\n"); 565 }