1 module upromised.http;
2 import std.exception : enforce;
3 import std.format : format;
4 import upromised.manual_stream : ManualStream;
5 import upromised.operations : readAllChunks;
6 import upromised.promise : DelegatePromiseIterator, Promise, PromiseIterator;
7 import upromised.stream : Stream;
8 import upromised.tokenizer : Tokenizer;
9 
/// A single HTTP header: the field name (`key`) and its field value (`value`).
struct Header {
	string key, value;
}
14 
/// Request methods understood by the client; the member name is what gets
/// written into the request line via `%s` formatting.
enum Method {
	GET,
	POST,
	PUT,
	DELETE
}
18 
/// Parsed status line of an HTTP response.
struct Response {
	/// Numeric status code, e.g. 200.
	int statusCode;

	/// Reason phrase that follows the status code, e.g. "OK".
	string statusLine;
}
23 
/// Everything `Client.fullRequest` resolves with: the status line, the
/// collected headers and an iterator over the response body.
struct FullResponse {
	/// Status code and reason phrase.
	Response response;
	/// Response headers in the order they were read.
	Header[] headers;
	/// Body chunks; assigned from `Client.fetchBody()`.
	PromiseIterator!(const(ubyte)[]) bodyData;
}
29 
/// Reinterprets `input` as an array of `Y` through a plain cast, carrying the
/// `inout` qualifier of the argument over to the result. No data is copied.
private inout(Y)[] as(Y, T)(inout(T) input) {
	return cast(typeof(return)) input;
}
33 
/// Convenience overload: wraps `chunked` in a fresh Tokenizer and decodes it
/// with the Tokenizer-based overload.
PromiseIterator!(const(ubyte)[]) decodeChunked(PromiseIterator!(const(ubyte)[]) chunked) nothrow {
	auto tokenizer = new Tokenizer!(const(ubyte))(chunked);
	return decodeChunked(tokenizer);
}
37 
/// Decodes an HTTP chunked transfer-encoded stream read through `tokenizer`.
///
/// Each call to `next` on the returned iterator reads one size line followed
/// by one chunk payload and yields the payload without its trailing CRLF.
/// Once a chunk of size zero has been read, later calls report end of stream.
PromiseIterator!(const(ubyte)[]) decodeChunked(Tokenizer!(const(ubyte)) tokenizer) nothrow {
	import std.conv : to;

	// Initial tokenizer setup for reading a CRLF-terminated size line.
	tokenizer.partialReceive();
	tokenizer.limit();
	tokenizer.separator("\r\n");

	return new class PromiseIterator!(const(ubyte)[]) {
		// Size of the most recently announced chunk; size_t.max until the
		// first size line has been parsed.
		size_t pendingSize = size_t.max;

		override Promise!ItValue next(Promise!bool) nothrow {
			// The zero-length chunk has already been seen: nothing more to yield.
			if (pendingSize == 0) {
				return Promise!ItValue.resolved(ItValue(true));
			}

			// Tokens alternate: size line first, then the chunk payload.
			bool expectSizeLine = true;
			const(ubyte)[] payload;
			return tokenizer.read().each((token) {
				if (expectSizeLine) {
					expectSizeLine = false;
					// Payload is read by length (size + CRLF), not by separator.
					tokenizer.separator();
					pendingSize = token.as!char[0..$-2].to!size_t(16);
					tokenizer.limit(pendingSize + 2);
					return true;
				}

				expectSizeLine = true;
				// Back to line mode for the next size line.
				tokenizer.separator("\r\n");
				tokenizer.limit();
				payload = token[0..$-2];
				return false;
			}).then((eof) => eof ? ItValue(true) : ItValue(false, payload));
		}
	};
}
unittest {
	import upromised.operations : toAsyncChunks;

	// Chunked input fed in 3-byte pieces; bytes after the terminating
	// zero-length chunk must not show up in the decoded output.
	auto encoded = cast(const(ubyte)[])"4\r\nWiki\r\n5\r\npedia\r\nE\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n TRAILING IGNORE";
	const(ubyte)[] decoded;

	encoded
		.toAsyncChunks(3)
		.decodeChunked
		.readAllChunks
		.then(bytes => decoded = bytes)
		.nothrow_();

	assert(decoded == "Wikipedia in\r\n\r\nchunks.", "%s unexpected".format([decoded.as!char]));
}
unittest {
	// A rejection of the source iterator must reach the consumer as the very
	// same exception object, without any value or completion callback firing.
	auto failure = new Exception("err");
	bool sawFailure = false;

	auto source = new DelegatePromiseIterator!(const(ubyte)[]);
	source.reject(failure);
	source
		.decodeChunked
		.each((_) {
			assert(false);
		}).then((_) {
			assert(false);
		}).except((Exception caught) {
			assert(caught is failure);
			sawFailure = true;
		}).nothrow_();

	assert(sawFailure);
}
105 
/**
 * HTTP/1.1 client that serialises requests into `output` and parses responses
 * from the stream attached through `stream()`.
 *
 * `contentLength` and `chunked` are shared scratch state: `sendHeaders` records
 * an outgoing Content-Length, `fetchResponse` resets both, and `fetchHeaders`
 * fills them in again so that `fetchBody` knows how to frame the body.
 */
class Client {
private:
	import upromised.tokenizer : Tokenizer;

	size_t contentLength;
	bool chunked;
	Tokenizer!(const(ubyte)) inputTokenizer;

	/// ASCII case-insensitive equality, used for header field names and
	/// transfer-coding names, which HTTP defines as case-insensitive.
	/// Works on code units, so it never throws on invalid UTF-8 input.
	static bool headerTokenEquals(const(char)[] lhs, const(char)[] rhs) pure nothrow @nogc @safe {
		import std.ascii : toLower;

		if (lhs.length != rhs.length) {
			return false;
		}

		foreach (i, c; lhs) {
			if (toLower(c) != toLower(rhs[i])) {
				return false;
			}
		}

		return true;
	}

public:
	/// Serialised request bytes; everything resolved here is written to the
	/// attached stream by `stream()`.
	DelegatePromiseIterator!(immutable(ubyte)[]) output;

	/// Creates a client with no stream attached; only the send* methods are usable.
	this() nothrow {
		output = new DelegatePromiseIterator!(immutable(ubyte)[]);
	}
	
	/// Creates a client and immediately attaches `stream`.
	this(Stream stream) nothrow {
		this();
		this.stream = stream;
	}

	/// Attaches `stream`: responses are tokenized from `stream.read()` and every
	/// chunk resolved on `output` is forwarded to `stream.write`.
	/// May only be called once (asserts that no tokenizer exists yet).
	Promise!void stream(Stream stream) nothrow {
		assert(!inputTokenizer);
		
		inputTokenizer = new Tokenizer!(const(ubyte))(stream.read());

		return output
			.each((data) => stream.write(data))
			.then(() {});
	}

	/// Overload taking headers as an array and the body as a single buffer.
	Promise!FullResponse fullRequest(Method method, string uri, Header[] headers, immutable(ubyte)[] bodyData) nothrow {
		import upromised.operations : toAsync, toAsyncChunks;

		return fullRequest(method, uri, headers.toAsync, bodyData.toAsyncChunks);
	}

	/// Overload taking asynchronous headers and the body as a single buffer.
	Promise!FullResponse fullRequest(Method method, string uri, PromiseIterator!Header headers, immutable(ubyte)[] bodyData) nothrow {
		import upromised.operations : toAsyncChunks;

		return fullRequest(method, uri, headers, bodyData.toAsyncChunks);
	}

	/// Overload taking headers as an array and an optional asynchronous body.
	Promise!FullResponse fullRequest(Method method, string uri, Header[] headers, PromiseIterator!(immutable(ubyte)[]) bodyData = null) nothrow {
		import upromised.operations : toAsync;
		
		return fullRequest(method, uri, headers.toAsync, bodyData);
	}

	/// Sends request line, headers and (optional) body, then reads the status
	/// line and all headers. The resolved FullResponse carries the body as an
	/// iterator obtained from `fetchBody()`; it is not consumed here.
	Promise!FullResponse fullRequest(Method method, string uri, PromiseIterator!Header headers, PromiseIterator!(immutable(ubyte)[]) bodyData = null) nothrow {
		import upromised.operations : readAll;
		
		FullResponse r;
		return sendRequest(method, uri)
		.then(() => sendHeaders(headers))
		.then(() {
			if (bodyData !is null) {
				return sendBody(bodyData);
			} else {
				return sendBody();
			}
		}).then(() => fetchResponse())
		.then((response) => r.response = response)
		.then((_) => fetchHeaders().readAll)
		.then((headers) => r.headers = headers)
		.then((_) => r.bodyData = fetchBody())
		.then((_) => r);
	}

	/// Writes the request line "<METHOD> <uri> HTTP/1.1\r\n" to `output`.
	Promise!void sendRequest(Method method, string uri) nothrow {
		return Promise!void.resolved()
		.then(() => output.resolve("%s %s HTTP/1.1\r\n".format(method, uri).as!ubyte))
		.then((_) {});
	}

	/// Array convenience overload of `sendHeaders`.
	Promise!void sendHeaders(Header[] headers) nothrow {
		import upromised.operations : toAsync;

		return sendHeaders(headers.toAsync);
	}

	/// Writes each header as "key: value\r\n". A Content-Length header (name
	/// matched case-insensitively) is also remembered so that `sendBody` can
	/// pick between fixed-length and chunked framing.
	Promise!void sendHeaders(PromiseIterator!Header headers) nothrow {
		import std.conv : to;

		return headers.each((header) {
			if (headerTokenEquals(header.key, "Content-Length")) {
				contentLength = header.value.to!size_t;
			}

			return output.resolve("%s: %s\r\n".format(header.key, header.value).as!ubyte);
		}).then((_) {});
	}

	/// Ends the header section without sending a body.
	Promise!void sendBody() nothrow {
		return output.resolve("\r\n".as!ubyte).then((_) {});
	}

	/// Buffer convenience overload of `sendBody`.
	Promise!void sendBody(immutable(ubyte)[] data) nothrow {
		import upromised.operations : toAsyncChunks;

		return sendBody(data.toAsyncChunks);
	}

	/// Ends the header section and sends `dataArg` as the body.
	///
	/// With a non-zero remembered Content-Length the body is written raw and
	/// cut off after exactly that many bytes; otherwise a
	/// "Transfer-Encoding: chunked" header is appended and the body is sent
	/// chunk-encoded, terminated by the zero-length chunk.
	Promise!void sendBody(PromiseIterator!(immutable(ubyte)[]) dataArg) nothrow {
		import upromised.tokenizer : Tokenizer;

		if (contentLength > 0) {
			auto data = new Tokenizer!(immutable(ubyte))(dataArg);
			data.limit(contentLength);
			data.partialReceive(true);

			return output.resolve("\r\n".as!ubyte).then((_) => data.read().each((chunk) {
				contentLength -= chunk.length;

				// Shrink the limit so the tokenizer never hands out more
				// than the bytes still owed.
				data.limit(contentLength);
				return output.resolve(chunk).then((a) => a && contentLength > 0);
			}).then((_) {}));
		} else {
			auto data = dataArg;
			return output.resolve("Transfer-Encoding: chunked\r\n\r\n".as!ubyte).then((_) => data.each((chunk) {
				return output
					.resolve("%x\r\n".format(chunk.length).as!ubyte)
					.then((_) => output.resolve(chunk))
					.then((_) => output.resolve("\r\n".as!ubyte));
			})).then((_) {
				return output.resolve("0\r\n\r\n".as!ubyte);
			}).then((_) {});
		}
	}

	/// Reads and parses the status line. Resets `contentLength` and `chunked`
	/// so that `fetchHeaders` starts from a clean state.
	/// The returned promise fails (via `enforce`) when the line has fewer than
	/// three space-separated parts or does not start with "HTTP/1.1".
	Promise!Response fetchResponse() nothrow {
		import std.array : join;
		import std.conv : to;
		import std.string : split;

		contentLength = 0;
		chunked = false;

		inputTokenizer.partialReceive();
		inputTokenizer.limit();
		inputTokenizer.separator("\r\n");

		Response r;
		return inputTokenizer.read().each((response) {
			const(char)[][] responseParts = response.as!char[0..$-2].split(" ");
			enforce(responseParts.length > 2, "%s should have at least three parts. %s".format(responseParts, [response]));
			enforce(responseParts[0] == "HTTP/1.1");
			r.statusCode = responseParts[1].to!int;
			// The reason phrase may itself contain spaces; glue it back together.
			r.statusLine = responseParts[2..$].join(" ").idup;
			return false;
		}).then((_) => r);
	}

	/// Returns an iterator yielding one Header per header line and reporting
	/// end of iteration at the blank line that closes the header section.
	/// Content-Length and "Transfer-Encoding: chunked" are recorded (matched
	/// case-insensitively) for `fetchBody`. A line without ": " fails the
	/// iteration instead of being sliced out of range.
	PromiseIterator!Header fetchHeaders() nothrow {
		import std.algorithm : countUntil;
		import std.conv : to;

		inputTokenizer.partialReceive();
		inputTokenizer.limit();
		inputTokenizer.separator("\r\n");
		
		return new class PromiseIterator!Header {
			override Promise!ItValue next(Promise!bool) {
				ItValue result;
				return inputTokenizer.read().each((headerLine) {
					auto line = headerLine.as!char;
					if (line == "\r\n") {
						result.eof = true;
						return false;
					}

					auto pos = line.countUntil(": ");
					// countUntil yields -1 when the separator is missing.
					enforce(pos >= 0, "Malformed header line: %s".format([line]));

					// Copy out of the tokenizer's chunk so the stored strings do
					// not alias it (NOTE(review): whether the tokenizer reuses its
					// buffer is not visible from here; the copy is safe either way).
					Header header = Header(line[0..pos].idup, line[pos + 2..$-2].idup);
					result.value = header;

					if (headerTokenEquals(header.key, "Content-Length")) {
						contentLength = header.value.to!size_t;
					}

					if (headerTokenEquals(header.key, "Transfer-Encoding") && headerTokenEquals(header.value, "chunked")) {
						chunked = true;
					}
					
					return false;
				}).then((_) => result);
			}
		};
	}

	/// Returns an iterator over the response body, framed according to what
	/// `fetchHeaders` recorded: chunked decoding when `chunked` is set,
	/// otherwise exactly `contentLength` bytes (none when it is zero).
	PromiseIterator!(const(ubyte)[]) fetchBody() nothrow {
		if (chunked) {
			return decodeChunked(inputTokenizer);
		}
		
		inputTokenizer.partialReceive(true);
		inputTokenizer.limit(contentLength);
		inputTokenizer.separator();

		return new class PromiseIterator!(const(ubyte)[]) {
			override Promise!ItValue next(Promise!bool) {
				if (contentLength == 0) {
					return Promise!ItValue.resolved(ItValue(true));
				}

				ItValue result;
				return inputTokenizer.read().each((chunk) {
					contentLength -= chunk.length;
					// Never read past the end of this response's body.
					inputTokenizer.limit(contentLength);
					result.value = chunk;
					return false;
				}).then((eof) => eof ? ItValue(true) : result);
			}
		};
	}

	/// Hands the input tokenizer over to the caller and leaves this client
	/// without one (the field is swapped with a null reference).
	Tokenizer!(const(ubyte)) release() nothrow {
		import std.algorithm : swap;

		Tokenizer!(const(ubyte)) released;
		inputTokenizer.swap(released);
		return released;
	}
}
unittest {
	// sendRequest on its own must emit exactly the GET request line.
	const(ubyte)[] written;

	auto wire = new ManualStream;
	(new Client(wire)).sendRequest(Method.GET, "/sup/moite").then(() => wire.shutdown()).nothrow_();
	wire.readFromWrite().readAllChunks.then((bytes) => written = bytes).nothrow_();

	assert(written == "GET /sup/moite HTTP/1.1\r\n", "%s unexpected".format([cast(const(char)[])written]));
}
unittest {
	// Same as the GET case, but the method name in the request line is POST.
	const(ubyte)[] written;

	auto wire = new ManualStream;
	(new Client(wire)).sendRequest(Method.POST, "/sup/moite").then(() => wire.shutdown()).nothrow_();
	wire.readFromWrite().readAllChunks.then((bytes) => written = bytes).nothrow_();

	assert(written == "POST /sup/moite HTTP/1.1\r\n", "%s unexpected".format([cast(const(char)[])written]));
}
unittest {
	// Each header is serialised as "key: value\r\n", in the order given.
	const(ubyte)[] written;

	auto wire = new ManualStream;
	(new Client(wire)).sendHeaders([Header("Oi", "Ola"), Header("Oi2", "Ola2")]).then(() => wire.shutdown()).nothrow_();
	wire.readFromWrite().readAllChunks.then((bytes) => written = bytes).nothrow_();

	assert(written == "Oi: Ola\r\nOi2: Ola2\r\n", "%s unexpected".format([cast(const(char)[])written]));
}
unittest {
	// A rejected header iterator must reject sendHeaders with the same exception.
	bool sawFailure;

	Exception failure = new Exception("same");
	auto source = new DelegatePromiseIterator!Header;
	source.reject(failure);
	(new Client()).sendHeaders(source).then(() {
		assert(false);
	}).except((Exception caught) {
		assert(caught is failure);
		sawFailure = true;
	}).nothrow_();

	assert(sawFailure);
}
unittest {
	// With no body, sendBody only writes the blank line ending the headers.
	const(ubyte)[] written;

	auto wire = new ManualStream;
	(new Client(wire)).sendBody().then(() => wire.shutdown()).nothrow_();
	wire.readFromWrite().readAllChunks.then((bytes) => written = bytes).nothrow_();

	assert(written == "\r\n", "%s unexpected".format([cast(const(char)[])written]));
}
unittest {
	import upromised.operations : toAsyncChunks;

	// With Content-Length: 4, only the first four of the eight body bytes
	// offered may be written.
	const(ubyte)[] written;

	auto wire = new ManualStream;
	auto http = new Client(wire);

	http
		.sendRequest(Method.POST, "/supas")
		.then(() => http.sendHeaders([Header("Content-Length", "4")]))
		.then(() => http.sendBody("supasupa".as!ubyte.toAsyncChunks(2)))
		.then(() => wire.shutdown())
		.nothrow_();

	wire.readFromWrite().readAllChunks.then((bytes) => written = bytes).nothrow_();

	assert(written == "POST /supas HTTP/1.1\r\nContent-Length: 4\r\n\r\nsupa", "%s unexpected".format([cast(const(char)[])written]));
}
unittest {
	import std.algorithm : startsWith, endsWith;
	import upromised.operations : toAsyncChunks;

	// Without a Content-Length header the body goes out chunk-encoded; the
	// encoded part is fed back through decodeChunked to check the round trip.
	const(ubyte)[] written;

	auto wire = new ManualStream;
	auto http = new Client(wire);

	http
		.sendRequest(Method.POST, "/supas")
		.then(() => http.sendHeaders([Header("Host", "www.supa.com")]))
		.then(() => http.sendBody("supasupa".as!ubyte.toAsyncChunks(2)))
		.then(() => wire.shutdown())
		.nothrow_();

	wire.readFromWrite().readAllChunks.then((bytes) => written = bytes).nothrow_();

	auto expectedHead = "POST /supas HTTP/1.1\r\nHost: www.supa.com\r\nTransfer-Encoding: chunked\r\n\r\n";
	assert(written.startsWith(expectedHead), "%s unexpected".format([cast(const(char)[])written]));
	assert(written.endsWith("0\r\n\r\n"), "%s unexpected".format([cast(const(char)[])written]));
	auto encodedBody = written[expectedHead.length..$];
	const(ubyte)[] decodedBody;

	encodedBody
		.toAsyncChunks
		.decodeChunked
		.readAllChunks.then((bytes) => decodedBody = bytes)
		.nothrow_;

	assert(decodedBody == "supasupa", "%s unexpected".format([cast(const(char)[])written]));
}
unittest {
	import upromised.operations : toAsyncChunks, readAll;

	// Redirect response with Content-Length: 0, delivered in 13-byte pieces.
	string wireData = 
	"HTTP/1.1 301 Moved Permanently\r\n" ~
	"Date: Thu, 30 Mar 2017 17:02:29 GMT\r\n" ~
	"Set-Cookie: WMF-Last-Access=30-Mar-2017;Path=/;HttpOnly;secure;Expires=Mon, 01 May 2017 12:00:00 GMT\r\n" ~
	"Set-Cookie: WMF-Last-Access-Global=30-Mar-2017;Path=/;Domain=.wikipedia.org;HttpOnly;secure;Expires=Mon, 01 May 2017 12:00:00 GMT\r\n" ~
	"Location: https://en.wikipedia.org/\r\n" ~
	"Content-Length: 0\r\n\r\n";

	Response status;
	Header[] parsedHeaders;
	const(ubyte)[] payload;

	auto wire = new ManualStream;
	auto http = new Client(wire);

	// Set up the reader chain first; it progresses as data is fed below.
	http
		.fetchResponse
		.then((got) { status = got; } )
		.then(() => http.fetchHeaders().readAll())
		.then((got) { parsedHeaders = got; })
		.then(() => http.fetchBody().readAllChunks())
		.then((got) { payload = got;})
		.nothrow_();

	// Feed the response, then signal end of input.
	wireData
		.as!ubyte
		.toAsyncChunks(13)
		.each((piece) => wire.writeToRead(piece).then(() {}))
		.then((_) => wire.writeToRead())
		.nothrow_();

	assert(status.statusCode == 301);
	assert(status.statusLine == "Moved Permanently");
	assert(parsedHeaders[0] == Header("Date", "Thu, 30 Mar 2017 17:02:29 GMT"));
	assert(parsedHeaders[1] == Header("Set-Cookie", "WMF-Last-Access=30-Mar-2017;Path=/;HttpOnly;secure;Expires=Mon, 01 May 2017 12:00:00 GMT"));
	assert(parsedHeaders[2] == Header("Set-Cookie", "WMF-Last-Access-Global=30-Mar-2017;Path=/;Domain=.wikipedia.org;HttpOnly;secure;Expires=Mon, 01 May 2017 12:00:00 GMT"));
	assert(parsedHeaders[3] == Header("Location", "https://en.wikipedia.org/"));
	assert(parsedHeaders[4] == Header("Content-Length", "0"));
	assert(parsedHeaders.length == 5);
	assert(payload.length == 0);
}
unittest {
	import upromised.operations : toAsyncChunks, readAll;

	// Response with a 15-byte fixed-length body, delivered in 11-byte pieces.
	string wireData = 
	"HTTP/1.1 200 OK\r\n" ~
	"Server: nginx\r\n" ~
	"Date: Thu, 30 Mar 2017 22:32:32 GMT\r\n" ~
	"Content-Type: text/plain; charset=UTF-8\r\n" ~
	"Content-Length: 15\r\n" ~
	"Connection: close\r\n" ~
	"Access-Control-Allow-Origin: *\r\n" ~
	"Access-Control-Allow-Methods: GET\r\n" ~
	"\r\n" ~
	"192.30.253.112\n";

	Response status;
	Header[] parsedHeaders;
	const(ubyte)[] payload;

	auto wire = new ManualStream;
	auto http = new Client(wire);

	// Set up the reader chain first; it progresses as data is fed below.
	http
		.fetchResponse
		.then((got) { status = got; } )
		.then(() => http.fetchHeaders().readAll())
		.then((got) { parsedHeaders = got; })
		.then(() => http.fetchBody().readAllChunks())
		.then((got) { payload = got;})
		.nothrow_();

	// No end-of-input signal here: the body is delimited by Content-Length.
	wireData
		.as!ubyte
		.toAsyncChunks(11)
		.each((piece) => wire.writeToRead(piece).then((){}))
		.nothrow_();

	assert(status.statusCode == 200);
	assert(status.statusLine == "OK");
	assert(parsedHeaders[0] == Header("Server", "nginx"));
	assert(parsedHeaders[1] == Header("Date", "Thu, 30 Mar 2017 22:32:32 GMT"));
	assert(parsedHeaders[2] == Header("Content-Type", "text/plain; charset=UTF-8"));
	assert(parsedHeaders[3] == Header("Content-Length", "15"));
	assert(parsedHeaders[4] == Header("Connection", "close"));
	assert(parsedHeaders[5] == Header("Access-Control-Allow-Origin", "*"));
	assert(parsedHeaders[6] == Header("Access-Control-Allow-Methods", "GET"));
	assert(parsedHeaders.length == 7);
	assert(payload == "192.30.253.112\n");
}
unittest {
	import upromised.operations : toAsyncChunks, readAll;

	// Response whose body uses chunked transfer encoding (one 0x14-byte
	// chunk followed by the terminating zero-length chunk).
	string wireData = 
	"HTTP/1.1 200 OK\r\n" ~
	"Date: Thu, 30 Mar 2017 23:24:14 GMT\r\n" ~
	"Content-Type: text/plain;charset=utf-8\r\n" ~
	"Transfer-Encoding: chunked\r\n" ~
	"Connection: keep-alive\r\n" ~
	"Pragma: no-cache\r\n" ~
	"Vary: Accept-Encoding\r\n" ~
	"Server: cloudflare-nginx\r\n" ~
	"\r\n" ~
	"14\r\n" ~
	"6\n5\n3\n1\n1\n2\n4\n3\n3\n6\n\r\n" ~
	"0\r\n" ~
	"\r\n";

	Response status;
	Header[] parsedHeaders;
	const(ubyte)[] payload;

	auto wire = new ManualStream;
	auto http = new Client(wire);

	// Set up the reader chain first; it progresses as data is fed below.
	http
		.fetchResponse
		.then((got) { status = got; } )
		.then(() => http.fetchHeaders().readAll())
		.then((got) { parsedHeaders = got; })
		.then(() => http.fetchBody().readAllChunks())
		.then((got) { payload = got;})
		.nothrow_();

	// Feed the response in 11-byte pieces, then signal end of input.
	wireData
		.as!ubyte
		.toAsyncChunks(11)
		.each((piece) => wire.writeToRead(piece).then((){}))
		.then((_) => wire.writeToRead())
		.nothrow_();

	assert(status.statusCode == 200);
	assert(status.statusLine == "OK");
	assert(parsedHeaders[0] == Header("Date", "Thu, 30 Mar 2017 23:24:14 GMT"));
	assert(parsedHeaders[1] == Header("Content-Type", "text/plain;charset=utf-8"));
	assert(parsedHeaders[2] == Header("Transfer-Encoding", "chunked"));
	assert(parsedHeaders[3] == Header("Connection", "keep-alive"));
	assert(parsedHeaders[4] == Header("Pragma", "no-cache"));
	assert(parsedHeaders[5] == Header("Vary", "Accept-Encoding"));
	assert(parsedHeaders[6] == Header("Server", "cloudflare-nginx"));
	assert(parsedHeaders.length == 7);
	assert(payload == "6\n5\n3\n1\n1\n2\n4\n3\n3\n6\n");
}