1 module upromised.tcp;
2 import upromised.stream : Stream;
3 import upromised.promise : DelegatePromise, DelegatePromiseIterator, PromiseIterator, Promise;
4 import upromised.memory : getSelf, gcretain, gcrelease;
5 import upromised.uv : uvCheck, UvError;
6 import upromised.uv_stream : UvStream;
7 import upromised : fatal;
8 import std.exception : enforce;
9 import std.format : format;
10 import std.socket : Address;
11 import deimos.libuv.uv;
12 import deimos.libuv._d;
13 
/**
 * Allocation callback with the libuv `uv_alloc_cb` shape: fills `buf` with a
 * freshly malloc'ed region of the requested size.
 *
 * Params:
 *   size = number of bytes to allocate
 *   buf  = output descriptor receiving the buffer pointer and its length
 *
 * When malloc fails, `buf.base` is null and `buf.len` is set to 0, so a null
 * pointer is never advertised together with a non-zero length.
 *
 * Nothing in this function frees the buffer; presumably the consumer of `buf`
 * is responsible for calling free — confirm against the read path.
 */
extern (C) void alloc_buffer(uv_handle_t*, size_t size, uv_buf_t* buf) {
	import core.stdc.stdlib : malloc;
	auto mem = cast(char *)malloc(size);
	buf.base = mem;
	// libuv reports UV_ENOBUFS to the read callback for a null base / zero length,
	// which is the desired outcome of an allocation failure.
	buf.len = mem is null ? 0 : size;
}
19 
/// Reinterprets the address of an IPv4 `sockaddr_in` as a generic `sockaddr*`.
/// The returned pointer aliases `self`; no copy is made.
private sockaddr* upcast(ref sockaddr_in self) nothrow {
    auto ipv4 = &self;
    return cast(sockaddr*) ipv4;
}
23 
/**
 * TCP socket built on `UvStream!uv_tcp_t`, exposing bind / listen / connect
 * with promise-based results.
 */
class TcpSocket : UvStream!uv_tcp_t {
private:
	// Iterator handed out by listen(). It is null while no listen is active and
	// is reset to null when uv_listen fails or when accepting a connection fails.
	DelegatePromiseIterator!TcpSocket listenPromise;

public:
	/// Initializes the underlying `uv_tcp_t` handle on loop `ctx`;
	/// the result of `uv_tcp_init` goes through `uvCheck`.
	this(uv_loop_t* ctx) {
		super(ctx);
		uv_tcp_init(ctx, &self).uvCheck();
	}

	/**
	 * Binds the socket to the first entry of `addr` (remaining entries are ignored).
	 *
	 * Throws: Exception when `addr` is empty; whatever `uvCheck` raises when
	 * `uv_tcp_bind` reports an error.
	 */
	void bind(Address[] addr) {
		enforce(addr.length > 0, "Empty address info");
		uv_tcp_bind(&self, addr[0].name(), 0).uvCheck();
	}

	/**
	 * Starts listening and returns an iterator that is resolved with a new
	 * `TcpSocket` for every accepted connection.
	 *
	 * Params:
	 *   backlog = backlog value forwarded to `uv_listen`
	 *
	 * A failure while accepting rejects the iterator and clears `listenPromise`.
	 * Connection callbacks arriving after that are ignored instead of
	 * dereferencing the cleared iterator.
	 */
	PromiseIterator!TcpSocket listen(int backlog) nothrow {
		import std.algorithm : swap;
		import upromised.uv : stream;

		assert(listenPromise is null);
		listenPromise = new DelegatePromiseIterator!TcpSocket;
		auto err = uv_listen(self.stream, backlog, (selfSelf, status) nothrow {
			// Run the accept logic inside a promise chain so that anything it
			// throws is routed to the except handler below.
			Promise!void.resolved().then(() {
				auto self = getSelf!TcpSocket(selfSelf);
				if (self.listenPromise is null) {
					// An earlier failure already rejected and cleared the
					// iterator; there is nobody left to notify.
					return;
				}
				// Keep the libuv status code in the error, like the other uv calls here.
				status.uvCheck();
				auto conn = new TcpSocket(self.ctx);
				uv_accept(self.self.stream, conn.self.stream).uvCheck();
				self.listenPromise.resolve(conn);
			}).except((Exception e) {
				auto self = getSelf!TcpSocket(selfSelf);
				// Detach the iterator before rejecting it so listenPromise is
				// null again afterwards.
				DelegatePromiseIterator!TcpSocket failed;
				swap(failed, self.listenPromise);
				if (failed !is null) {
					failed.reject(e);
				}
			}).nothrow_();
		});
		// NOTE(review): uvCheck(err, promise) presumably rejects the promise and
		// returns true when err is an error code — confirm in upromised.uv.
		if (err.uvCheck(listenPromise)) {
			// Clear the member so listen() can be called again, and return the
			// iterator that was just handed to uvCheck.
			DelegatePromiseIterator!TcpSocket r;
			swap(r, listenPromise);
			return r;
		}
		return listenPromise;
	}

	/**
	 * Connects to the first entry of `addr` (remaining entries are ignored).
	 *
	 * Returns: a promise resolved when the connect callback reports status 0,
	 * rejected with `UvError(status)` otherwise, or rejected immediately with
	 * "Empty address info" when `addr` is empty.
	 */
	Promise!void connect(Address[] addr) nothrow {
		if (addr.length == 0) {
			return Promise!void.rejected(new Exception("Empty address info"));
		}

		ConnectPromise r = new ConnectPromise;
		// Store the Address in the request object so the pointer returned by
		// name() is still referenced while the connect is pending.
		r.addr = addr[0];
		// Retained here and released in the finall() handler below, i.e. for
		// as long as the request is outstanding.
		gcretain(r);
		scope(failure) gcrelease(r);
		int err = uv_tcp_connect(&r.self, &self, r.addr.name(), (rSelf, status) nothrow {
			auto r = getSelf!ConnectPromise(rSelf);
			if (status == 0) {
				r.resolve();
			} else {
				r.reject(new UvError(status));
			}
		});
		// NOTE(review): presumably rejects r when err is an error code — confirm in upromised.uv.
		err.uvCheck(r);
		r.finall(() => gcrelease(r));
		return r;
	}

	// Promise that also carries the libuv connect request and the target
	// address, so both are reachable from the request pointer in the callback.
	private class ConnectPromise : DelegatePromise!void {
		Address addr;
		uv_connect_t self;
	}
}