1 module upromised.udp; 2 import deimos.libuv.uv : uv_buf_t, uv_loop_t, uv_udp_getsockname, uv_udp_t; 3 import std.exception : enforce; 4 import std.socket : Address, AddressFamily; 5 import upromised.dns : sockaddr; 6 import upromised.memory : gcrelease, gcretain, getSelf; 7 import upromised.promise : DelegatePromise, Promise, PromiseIterator; 8 import upromised.stream : Datagram, DatagramStream, Interrupted; 9 import upromised.uv : uvCheck, UvError; 10 11 class UdpSocket : DatagramStream { 12 private: 13 DelegatePromise!void closePromise; 14 DelegatePromise!Datagram readPromise; 15 16 public: 17 uv_udp_t self; 18 19 this(uv_loop_t* loop) { 20 import deimos.libuv.uv : uv_udp_init; 21 22 uv_udp_init(loop, &self).uvCheck(); 23 gcretain(this); 24 } 25 26 void bind(Address addr) { 27 import deimos.libuv.uv : uv_udp_bind; 28 29 uv_udp_bind(&self, addr.name, 0).uvCheck(); 30 } 31 32 override Promise!void sendTo(Address dest, immutable(ubyte)[] message) nothrow { 33 import deimos.libuv.uv : uv_udp_send; 34 35 SendPromise r = new SendPromise; 36 gcretain(r); 37 r.finall(() => gcrelease(r)); 38 r.dest = dest; 39 r.data.base = cast(char*)message.ptr; 40 r.data.len = message.length; 41 42 int rc = uv_udp_send(&r.self, &self, &r.data, 1, r.dest.name(), (selfArg, int status) nothrow { 43 SendPromise self = selfArg.getSelf!SendPromise; 44 if (status == 0) { 45 self.resolve(); 46 } else { 47 self.reject(new UvError(status)); 48 } 49 }); 50 rc.uvCheck(r); 51 52 return r; 53 } 54 55 private class SendPromise : DelegatePromise!void { 56 import deimos.libuv.uv : uv_udp_send_t; 57 58 Address dest; 59 uv_udp_send_t self; 60 uv_buf_t data; 61 } 62 63 override PromiseIterator!Datagram recvFrom() nothrow { 64 import std.algorithm : swap; 65 import deimos.libuv.uv : uv_udp_recv_stop, uv_udp_recv_start; 66 import upromised.dns : toAddress; 67 import upromised.uv_stream : readAlloc, shrinkBuf; 68 69 return new class PromiseIterator!Datagram { 70 override Promise!ItValue next(Promise!bool) { 71 if (closePromise !is null) { 72 return Promise!ItValue.resolved(ItValue(true)); 73 } 74 75 76 enforce(readPromise is null, "Already reading"); 77 readPromise = new DelegatePromise!Datagram; 78 uv_udp_recv_start(&self, &readAlloc, (selfArg, nread, buf, addr, flags) nothrow { 79 auto self = selfArg.getSelf!UdpSocket; 80 81 if (buf.base !is null) gcrelease(buf.base); 82 83 if (nread == 0 && addr is null) { 84 return; 85 } 86 87 uv_udp_recv_stop(&self.self); 88 89 if (self.readPromise is null) { 90 return; 91 } 92 93 if (nread < 0) { 94 self.readPromise.reject(new UvError(cast(int)nread)); 95 return; 96 } 97 98 auto base = shrinkBuf(buf, nread); 99 self.readPromise.resolve(Datagram(addr.toAddress, cast(const(ubyte)[])base)); 100 }).uvCheck(readPromise); 101 102 return readPromise.finall(() { 103 readPromise = null; 104 }).then((datagram) => datagram.addr is null ? ItValue(true) : ItValue(false, datagram)); 105 } 106 }; 107 } 108 109 override Promise!void close() nothrow { 110 import deimos.libuv.uv : uv_close; 111 import upromised.uv : handle; 112 113 if (closePromise) return closePromise; 114 if (readPromise) { 115 readPromise.reject(new Interrupted); 116 } 117 118 closePromise = new DelegatePromise!void; 119 uv_close(self.handle, (selfArg) nothrow { 120 auto self = selfArg.getSelf!UdpSocket; 121 self.closePromise.resolve(); 122 gcrelease(self); 123 }); 124 return closePromise; 125 } 126 127 void connect(Address dest) { 128 connect(dest.name(), dest.nameLen()); 129 } 130 131 void connect(sockaddr* name, int nameLen) { 132 version(Posix) { 133 import core.sys.posix.netdb : connect; 134 } else version(Windows) { 135 import core.sys.windows.winsock2 : connect; 136 } 137 import deimos.libuv.uv : uv_os_fd_t, uv_fileno; 138 import upromised.uv : handle; 139 140 uv_os_fd_t fileno; 141 uv_fileno(self.handle, &fileno).uvCheck; 142 connect(fileno, name, nameLen); 143 } 144 145 void disconnect() { 146 import std.socket : parseAddress; 147 import upromised.dns : sockaddr, sockaddr_in, toAddress; 148 149 sockaddr_in storage; 150 storage.sin_family = AddressFamily.UNSPEC; 151 connect(cast(sockaddr*)&storage, storage.sizeof); 152 } 153 154 Address sockname() { 155 import upromised.dns : sockaddr_in6, sockaddr, toAddress; 156 157 sockaddr_in6 dataStorage; 158 sockaddr* data = cast(sockaddr*)&dataStorage; 159 int len = data.sizeof; 160 uv_udp_getsockname(&self, data, &len).uvCheck(); 161 162 return toAddress(data, len); 163 } 164 }