1 module upromised.udp;
2 import deimos.libuv.uv : uv_buf_t, uv_loop_t, uv_udp_getsockname, uv_udp_t;
3 import std.exception : enforce;
4 import std.socket : Address, AddressFamily;
5 import upromised.dns : sockaddr;
6 import upromised.memory : gcrelease, gcretain, getSelf;
7 import upromised.promise : DelegatePromise, Promise, PromiseIterator;
8 import upromised.stream : Datagram, DatagramStream, Interrupted;
9 import upromised.uv : uvCheck, UvError;
10 
11 class UdpSocket : DatagramStream {
12 private:
13 	DelegatePromise!void closePromise;
14 	DelegatePromise!Datagram readPromise;
15 
16 public:
17 	uv_udp_t self;
18 
19 	this(uv_loop_t* loop) {
20 		import deimos.libuv.uv : uv_udp_init;
21 
22 		uv_udp_init(loop, &self).uvCheck();
23 		gcretain(this);
24 	}
25 
26 	void bind(Address addr) {
27 		import deimos.libuv.uv : uv_udp_bind;
28 
29 		uv_udp_bind(&self, addr.name, 0).uvCheck();
30 	}
31 
32 	override Promise!void sendTo(Address dest, immutable(ubyte)[] message) nothrow {
33 		import deimos.libuv.uv : uv_udp_send;
34 
35 		SendPromise r = new SendPromise;
36 		gcretain(r);
37 		r.finall(() => gcrelease(r));
38 		r.dest = dest;
39 		r.data.base = cast(char*)message.ptr;
40 		r.data.len = message.length;
41 
42 		int rc = uv_udp_send(&r.self, &self, &r.data, 1, r.dest.name(), (selfArg, int status) nothrow {
43 			SendPromise self = selfArg.getSelf!SendPromise;
44 			if (status == 0) {
45 				self.resolve();
46 			} else {
47 				self.reject(new UvError(status));
48 			}
49 		});
50 		rc.uvCheck(r);
51 
52 		return r;
53 	}
54 
55 	private class SendPromise : DelegatePromise!void {
56 		import deimos.libuv.uv : uv_udp_send_t;
57 
58 		Address dest;
59 		uv_udp_send_t self;
60 		uv_buf_t data;
61 	}
62 
63 	override PromiseIterator!Datagram recvFrom() nothrow {
64 		import std.algorithm : swap;
65 		import deimos.libuv.uv : uv_udp_recv_stop, uv_udp_recv_start;
66 		import upromised.dns : toAddress;
67 		import upromised.uv_stream : readAlloc, shrinkBuf;
68 
69 		return new class PromiseIterator!Datagram {
70 			override Promise!ItValue next(Promise!bool) {
71 				if (closePromise !is null) {
72 					return Promise!ItValue.resolved(ItValue(true));
73 				}
74 
75 
76 				enforce(readPromise is null, "Already reading");
77 				readPromise = new DelegatePromise!Datagram;
78 				uv_udp_recv_start(&self, &readAlloc, (selfArg, nread, buf, addr, flags) nothrow {
79 					auto self = selfArg.getSelf!UdpSocket;
80 
81 					if (buf.base !is null) gcrelease(buf.base);
82 
83 					if (nread == 0 && addr is null) {
84 						return;
85 					}
86 
87 					uv_udp_recv_stop(&self.self);
88 
89 					if (self.readPromise is null) {
90 						return;
91 					}
92 
93 					if (nread < 0) {
94 						self.readPromise.reject(new UvError(cast(int)nread));
95 						return;
96 					}
97 
98 					auto base = shrinkBuf(buf, nread);
99 					self.readPromise.resolve(Datagram(addr.toAddress, cast(const(ubyte)[])base));
100 				}).uvCheck(readPromise);
101 				
102 				return readPromise.finall(() {
103 					readPromise = null;
104 				}).then((datagram) => datagram.addr is null ? ItValue(true) : ItValue(false, datagram));
105 			}
106 		};
107 	}
108 
109 	override Promise!void close() nothrow {
110 		import deimos.libuv.uv : uv_close;
111 		import upromised.uv : handle;
112 
113 		if (closePromise) return closePromise;
114 		if (readPromise) {
115 			readPromise.reject(new Interrupted);
116 		}
117 
118 		closePromise = new DelegatePromise!void;
119 		uv_close(self.handle, (selfArg) nothrow {
120 			auto self = selfArg.getSelf!UdpSocket;
121 			self.closePromise.resolve();
122 			gcrelease(self);
123 		});
124 		return closePromise;
125 	}
126 
127 	void connect(Address dest) {
128 		connect(dest.name(), dest.nameLen());
129 	}
130 
131 	void connect(sockaddr* name, int nameLen) {
132 		version(Posix) {
133 			import core.sys.posix.netdb : connect;
134 		} else version(Windows) {
135 			import core.sys.windows.winsock2 : connect;
136 		}
137 		import deimos.libuv.uv : uv_os_fd_t, uv_fileno;
138 		import upromised.uv : handle;
139 
140 		uv_os_fd_t fileno;
141 		uv_fileno(self.handle, &fileno).uvCheck;
142 		connect(fileno, name, nameLen);
143 	}
144 
145 	void disconnect() {
146 		import std.socket : parseAddress;
147 		import upromised.dns : sockaddr, sockaddr_in, toAddress;
148 
149 		sockaddr_in storage;
150 		storage.sin_family = AddressFamily.UNSPEC;
151 		connect(cast(sockaddr*)&storage, storage.sizeof);
152 	}
153 
154 	Address sockname() {
155 		import upromised.dns : sockaddr_in6, sockaddr, toAddress;
156 
157 		sockaddr_in6 dataStorage;
158 		sockaddr* data = cast(sockaddr*)&dataStorage;
159 		int len = data.sizeof;
160 		uv_udp_getsockname(&self, data, &len).uvCheck();
161 
162 		return toAddress(data, len);
163 	}
164 }