/// UDP datagram socket built on a libuv `uv_udp_t` handle.
module upromised.udp;

import deimos.libuv.uv : uv_buf_t, uv_loop_t, uv_udp_t;
import std.exception : enforce;
import std.socket : Address;
import upromised.memory : gcrelease, gcretain, getSelf;
import upromised.promise : DelegatePromise, Promise, PromiseIterator;
import upromised.stream : Datagram, DatagramStream, Interrupted;
import upromised.uv : uvCheck, UvError;

/**
 * `DatagramStream` implementation that drives a libuv UDP handle.
 *
 * The constructor calls `gcretain(this)`; the matching `gcrelease` is issued
 * from the `uv_close` callback installed by `close()`.
 */
class UdpSocket : DatagramStream {
private:
    /// Created by the first `close()` call; non-null once closing has begun.
    DelegatePromise!void closePromise;
    /// Promise of the receive currently in flight, null while no read is pending.
    DelegatePromise!Datagram readPromise;

public:
    /// The libuv handle. Callbacks map it back to this object through `getSelf`.
    uv_udp_t self;

    /**
     * Initialises the UDP handle on `loop` and retains this object.
     *
     * Params:
     *   loop = libuv event loop handed to `uv_udp_init`.
     */
    this(uv_loop_t* loop) {
        import deimos.libuv.uv : uv_udp_init;

        // The status is checked before retaining (uvCheck presumably throws on
        // failure - confirm against upromised.uv).
        uvCheck(uv_udp_init(loop, &self));
        gcretain(this);
    }

    /**
     * Binds the socket to a local address via `uv_udp_bind` with flags `0`.
     *
     * Params:
     *   addr = local address to bind to.
     */
    void bind(Address addr) {
        import deimos.libuv.uv : uv_udp_bind;

        auto local = addr.name();
        uvCheck(uv_udp_bind(&self, local, 0));
    }

    /**
     * Queues `message` for delivery to `dest` with `uv_udp_send`.
     *
     * Params:
     *   dest    = destination address.
     *   message = payload; the libuv buffer points straight at this array.
     *
     * Returns: a promise resolved when the send callback reports status `0`
     *          and rejected with a `UvError` for any other status.
     */
    override Promise!void sendTo(Address dest, immutable(ubyte)[] message) nothrow {
        import deimos.libuv.uv : uv_udp_send;

        SendPromise pending = new SendPromise;
        pending.dest = dest;
        pending.data.base = cast(char*) message.ptr;
        pending.data.len = message.length;

        // Retain the request object until its promise has settled.
        gcretain(pending);
        pending.finall(() => gcrelease(pending));

        int rc = uv_udp_send(&pending.self, &self, &pending.data, 1, pending.dest.name,
            (request, int status) nothrow {
                SendPromise promise = getSelf!SendPromise(request);
                if (status != 0) {
                    promise.reject(new UvError(status));
                    return;
                }
                promise.resolve();
            });
        // Presumably rejects `pending` when the call itself failed - confirm
        // against upromised.uv.uvCheck.
        uvCheck(rc, pending);

        return pending;
    }

    /// Send request state: the promise returned by `sendTo` plus the libuv
    /// request, the buffer descriptor and the destination address it uses.
    private class SendPromise : DelegatePromise!void {
        import deimos.libuv.uv : uv_udp_send_t;

        Address dest;
        uv_udp_send_t self;
        uv_buf_t data;
    }

    /**
     * Returns an iterator over incoming datagrams.
     *
     * Each `next` call starts receiving and stops again as soon as one datagram
     * or one error has been delivered. Once `close()` has been called, `next`
     * returns an already resolved `ItValue(true)`. Calling `next` while a read
     * is still pending fails the "Already reading" enforcement.
     */
    override PromiseIterator!Datagram recvFrom() nothrow {
        // NOTE(review): `swap` is not referenced anywhere in this function.
        import std.algorithm : swap;
        import deimos.libuv.uv : uv_udp_recv_stop, uv_udp_recv_start;
        import upromised.dns : toAddress;
        import upromised.uv_stream : readAlloc, shrinkBuf;

        return new class PromiseIterator!Datagram {
            override Promise!ItValue next(Promise!bool) {
                if (closePromise !is null) {
                    return Promise!ItValue.resolved(ItValue(true));
                }

                enforce(readPromise is null, "Already reading");
                readPromise = new DelegatePromise!Datagram;

                int rc = uv_udp_recv_start(&self, &readAlloc, (udp, nread, buf, addr, flags) nothrow {
                    auto owner = getSelf!UdpSocket(udp);

                    // Drop the retain on the receive buffer (presumably taken
                    // by readAlloc - confirm in upromised.uv_stream).
                    if (buf.base !is null) {
                        gcrelease(buf.base);
                    }

                    // Per libuv's uv_udp_recv_cb documentation, zero bytes with
                    // no address means "nothing to read": keep receiving.
                    if (nread == 0 && addr is null) {
                        return;
                    }

                    // One event per next() call: stop before settling.
                    uv_udp_recv_stop(&owner.self);

                    auto waiting = owner.readPromise;
                    if (waiting is null) {
                        return;
                    }

                    if (nread >= 0) {
                        auto payload = shrinkBuf(buf, nread);
                        waiting.resolve(Datagram(toAddress(addr), cast(const(ubyte)[]) payload));
                    } else {
                        waiting.reject(new UvError(cast(int) nread));
                    }
                });
                uvCheck(rc, readPromise);

                // Clear the pending slot whichever way the read settles.
                auto settled = readPromise.finall(() {
                    readPromise = null;
                });
                // A datagram without an address is mapped to ItValue(true).
                return settled.then((datagram) => datagram.addr !is null ? ItValue(false, datagram) : ItValue(true));
            }
        };
    }

    /**
     * Closes the handle.
     *
     * A pending read is rejected with `Interrupted`. Repeated calls return the
     * promise created by the first call.
     *
     * Returns: a promise resolved from the `uv_close` callback, which also
     *          releases this object with `gcrelease`.
     */
    override Promise!void close() nothrow {
        import deimos.libuv.uv : uv_close;
        import upromised.uv : handle;

        if (closePromise) {
            return closePromise;
        }

        if (readPromise) {
            readPromise.reject(new Interrupted);
        }

        closePromise = new DelegatePromise!void;
        uv_close(self.handle, (closing) nothrow {
            auto owner = getSelf!UdpSocket(closing);
            owner.closePromise.resolve();
            gcrelease(owner);
        });
        return closePromise;
    }
}