1 module upromised.udp;
2 import deimos.libuv.uv : uv_buf_t, uv_loop_t, uv_udp_t;
3 import std.exception : enforce;
4 import std.socket : Address;
5 import upromised.memory : gcrelease, gcretain, getSelf;
6 import upromised.promise : DelegatePromise, Promise, PromiseIterator;
7 import upromised.stream : Datagram, DatagramStream, Interrupted;
8 import upromised.uv : uvCheck, UvError;
9 
10 class UdpSocket : DatagramStream {
11 private:
12 	DelegatePromise!void closePromise;
13 	DelegatePromise!Datagram readPromise;
14 
15 public:
16 	uv_udp_t self;
17 
18 	this(uv_loop_t* loop) {
19 		import deimos.libuv.uv : uv_udp_init;
20 
21 		uv_udp_init(loop, &self).uvCheck();
22 		gcretain(this);
23 	}
24 
25 	void bind(Address addr) {
26 		import deimos.libuv.uv : uv_udp_bind;
27 
28 		uv_udp_bind(&self, addr.name, 0).uvCheck();
29 	}
30 
31 	override Promise!void sendTo(Address dest, immutable(ubyte)[] message) nothrow {
32 		import deimos.libuv.uv : uv_udp_send;
33 
34 		SendPromise r = new SendPromise;
35 		gcretain(r);
36 		r.finall(() => gcrelease(r));
37 		r.dest = dest;
38 		r.data.base = cast(char*)message.ptr;
39 		r.data.len = message.length;
40 
41 		int rc = uv_udp_send(&r.self, &self, &r.data, 1, r.dest.name(), (selfArg, int status) nothrow {
42 			SendPromise self = selfArg.getSelf!SendPromise;
43 			if (status == 0) {
44 				self.resolve();
45 			} else {
46 				self.reject(new UvError(status));
47 			}
48 		});
49 		rc.uvCheck(r);
50 
51 		return r;
52 	}
53 
54 	private class SendPromise : DelegatePromise!void {
55 		import deimos.libuv.uv : uv_udp_send_t;
56 
57 		Address dest;
58 		uv_udp_send_t self;
59 		uv_buf_t data;
60 	}
61 
62 	override PromiseIterator!Datagram recvFrom() nothrow {
63 		import std.algorithm : swap;
64 		import deimos.libuv.uv : uv_udp_recv_stop, uv_udp_recv_start;
65 		import upromised.dns : toAddress;
66 		import upromised.uv_stream : readAlloc, shrinkBuf;
67 
68 		return new class PromiseIterator!Datagram {
69 			override Promise!ItValue next(Promise!bool) {
70 				if (closePromise !is null) {
71 					return Promise!ItValue.resolved(ItValue(true));
72 				}
73 
74 
75 				enforce(readPromise is null, "Already reading");
76 				readPromise = new DelegatePromise!Datagram;
77 				uv_udp_recv_start(&self, &readAlloc, (selfArg, nread, buf, addr, flags) nothrow {
78 					auto self = selfArg.getSelf!UdpSocket;
79 
80 					if (buf.base !is null) gcrelease(buf.base);
81 
82 					if (nread == 0 && addr is null) {
83 						return;
84 					}
85 
86 					uv_udp_recv_stop(&self.self);
87 
88 					if (self.readPromise is null) {
89 						return;
90 					}
91 
92 					if (nread < 0) {
93 						self.readPromise.reject(new UvError(cast(int)nread));
94 						return;
95 					}
96 
97 					auto base = shrinkBuf(buf, nread);
98 					self.readPromise.resolve(Datagram(addr.toAddress, cast(const(ubyte)[])base));
99 				}).uvCheck(readPromise);
100 				
101 				return readPromise.finall(() {
102 					readPromise = null;
103 				}).then((datagram) => datagram.addr is null ? ItValue(true) : ItValue(false, datagram));
104 			}
105 		};
106 	}
107 
108 	override Promise!void close() nothrow {
109 		import deimos.libuv.uv : uv_close;
110 		import upromised.uv : handle;
111 
112 		if (closePromise) return closePromise;
113 		if (readPromise) {
114 			readPromise.reject(new Interrupted);
115 		}
116 
117 		closePromise = new DelegatePromise!void;
118 		uv_close(self.handle, (selfArg) nothrow {
119 			auto self = selfArg.getSelf!UdpSocket;
120 			self.closePromise.resolve();
121 			gcrelease(self);
122 		});
123 		return closePromise;
124 	}
125 }