1 module upromised.utp; 2 3 import std.format : format; 4 import std.socket : Address; 5 import upromised.c.utp; 6 import upromised.loop : Loop; 7 import upromised.memory : gcrelease, gcretain; 8 import upromised.promise : DelegatePromise, DelegatePromiseIterator, Promise, PromiseIterator; 9 import upromised.stream : DatagramStream, Interrupted, Stream; 10 11 private auto noth(alias f, Args...)(Args args) { 12 try { 13 return f(args); 14 } catch(Exception) { 15 import core.stdc.stdlib : abort; 16 abort(); 17 assert(false); 18 } 19 } 20 21 class CumulativeBuffer: PromiseIterator!(const(ubyte)[]) { 22 protected: 23 ubyte[] pending; 24 bool eof; 25 DelegatePromise!ItValue pendingReq; 26 27 void onDrain() { 28 } 29 30 public: 31 size_t pendingLength() const { 32 return pending.length; 33 } 34 35 override Promise!ItValue next(Promise!bool _) { 36 if (pending.length > 0) { 37 auto done = pending; 38 pending = pending[$..$]; 39 return Promise!ItValue.resolved(ItValue(false, done)); 40 } 41 42 if (eof) { 43 return Promise!ItValue.resolved(ItValue(true)); 44 } 45 46 assert(pendingReq is null); 47 pendingReq = new DelegatePromise!ItValue; 48 return pendingReq; 49 } 50 51 void append(const(ubyte)[] data) { 52 if (pendingReq !is null) { 53 assert(pending.length == 0); 54 auto done = pendingReq; 55 pendingReq = null; 56 done.resolve(ItValue(false, data)); 57 return; 58 } 59 60 pending ~= data; 61 } 62 63 void resolve() { 64 this.eof = true; 65 66 if (pendingReq !is null) { 67 assert(pending.length == 0); 68 auto done = pendingReq; 69 pendingReq = null; 70 done.resolve(ItValue(true)); 71 } 72 } 73 } 74 75 class UtpContext { 76 private: 77 int count; 78 79 utp_context* context; 80 DatagramStream underlying; 81 DelegatePromiseIterator!(utp_socket*) sockets; 82 83 public: 84 this(Loop loop, DatagramStream underlying) { 85 import std.datetime : msecs; 86 import upromised.dns : toAddress; 87 import upromised.promise : break_, continue_, do_while; 88 89 context = utp_init(2); 90 assert(context !is null); 91 count++; 92 utp_context_set_userdata(context, cast(void*)cast(Object)this); 93 this.underlying = underlying; 94 95 utp_set_callback(context, UTP_SENDTO, (args) { 96 auto self = cast(UtpContext)cast(Object)utp_context_get_userdata(args.context); 97 auto addr = args.address.toAddress(args.address_len); 98 auto data = (cast(const(ubyte)*)args.buf)[0..args.len].idup; 99 self.underlying.sendTo(addr, data) 100 .except((Exception e) { 101 import std.stdio : stderr; 102 debug stderr.writeln(e); 103 }).nothrow_; 104 return 0; 105 }); 106 107 utp_set_callback(context, UTP_ON_FIREWALL, (args) { 108 auto self = cast(UtpContext)cast(Object)utp_context_get_userdata(args.context); 109 return self.sockets is null; 110 }); 111 112 utp_set_callback(context, UTP_ON_ERROR, (args) { 113 auto socket = cast(UtpStream)cast(Object)utp_get_userdata(args.socket); 114 assert(socket !is null); 115 socket.on_error(args.error_code); 116 return 0; 117 }); 118 119 utp_set_callback(context, UTP_ON_READ, (args) { 120 auto socket = cast(UtpStream)cast(Object)utp_get_userdata(args.socket); 121 assert(socket !is null); 122 socket.on_read((cast(ubyte*)args.buf)[0..args.len]); 123 return 0; 124 }); 125 126 utp_set_callback(context, UTP_GET_READ_BUFFER_SIZE, (args) { 127 auto socket = cast(UtpStream)cast(Object)utp_get_userdata(args.socket); 128 if (socket is null) { 129 return 4096; 130 } 131 132 assert(socket !is null); 133 return socket.recv_buffer(); 134 }); 135 136 utp_set_callback(context, UTP_ON_STATE_CHANGE, (args) { 137 auto socket = cast(UtpStream)cast(Object)utp_get_userdata(args.socket); 138 if (socket is null) { 139 return 0; 140 } 141 socket.on_state(args.state); 142 return 0; 143 }); 144 145 utp_set_callback(context, UTP_ON_ACCEPT, (args) { 146 auto self = cast(UtpContext)cast(Object)utp_context_get_userdata(args.context); 147 assert(self !is null); 148 assert(self.sockets !is null); 149 self.sockets.resolve(args.socket); 150 return 0; 151 }); 152 153 loop.interval(500.msecs).each((_) nothrow { 154 if (this.context is null) return false; 155 utp_check_timeouts(this.context); 156 return true; 157 }).nothrow_; 158 159 auto recv = underlying.recvFrom(); 160 do_while(() { 161 bool raced; 162 loop.sleep(30.msecs) 163 .then(() { 164 if (raced || this.context is null) return; 165 utp_issue_deferred_acks(this.context); 166 }); 167 168 return recv.next() 169 .then((eofValue) { 170 raced = true; 171 if (eofValue.eof) return break_; 172 auto dgram = eofValue.value; 173 utp_process_udp(context, cast(const(byte)*)dgram.message.ptr, dgram.message.length, dgram.addr.name(), dgram.addr.nameLen()); 174 return continue_; 175 }); 176 }).except((Interrupted _) { 177 }).except((Exception e) { 178 import std.stdio; 179 debug stderr.writeln(sockets); 180 if (sockets !is null) { 181 sockets.reject(e); 182 } 183 }).nothrow_(); 184 185 gcretain(this); 186 } 187 188 void inc() nothrow { 189 ++count; 190 } 191 192 Promise!void close() nothrow { 193 return Promise!void.resolved() 194 .then(() { 195 --count; 196 if (count == 0) { 197 return this.underlying.close() 198 .finall(() { 199 utp_destroy(this.context); 200 this.context = null; 201 if (this.sockets !is null) { 202 auto done = this.sockets; 203 this.sockets = null; 204 done.resolve(); 205 } 206 gcrelease(this); 207 }); 208 } else { 209 return Promise!void.resolved(); 210 } 211 }); 212 } 213 214 Promise!UtpStream connect(Address dest) nothrow { 215 return Promise!void.resolved() 216 .then(() => new UtpStream(utp_create_socket(this.context), this)) 217 .then((s) { 218 return s.connect(dest) 219 .failure((Exception _) => s.close()) 220 .then(() => s); 221 }); 222 } 223 224 PromiseIterator!Stream accept() nothrow { 225 assert(sockets is null); 226 sockets = new DelegatePromiseIterator!(utp_socket*); 227 228 return new class PromiseIterator!Stream { 229 override Promise!ItValue next(Promise!bool done) { 230 if (sockets is null) { 231 return Promise!ItValue.resolved(ItValue(true)); 232 } 233 234 return sockets.next(done) 235 .then((socketValue) { 236 if (socketValue.eof) { 237 return ItValue(true); 238 } 239 240 return ItValue(false, new UtpStream(socketValue.value, this.outer)); 241 }); 242 } 243 }; 244 } 245 } 246 247 class UtpStream: Stream { 248 private: 249 utp_socket* socket; 250 UtpContext context; 251 const(ubyte)[] pending_write_buffer; 252 DelegatePromise!void pending_write_promise; 253 CumulativeBuffer read_; 254 DelegatePromise!void connecting; 255 uint recv_len; 256 257 this(utp_socket* socket, UtpContext context) { 258 this.context = context; 259 this.socket = socket; 260 this.context.inc(); 261 gcretain(this); 262 utp_set_userdata(socket, cast(void*)cast(Object)this); 263 recvLen = 4096; 264 read_ = new class CumulativeBuffer { 265 override void onDrain() { 266 utp_read_drained(this.outer.socket); 267 } 268 }; 269 } 270 271 void on_state(int state) { 272 if (state == UTP_STATE_CONNECT && connecting) { 273 auto done = connecting; 274 connecting = null; 275 done.resolve(); 276 } 277 278 if (state == UTP_STATE_CONNECT || state == UTP_STATE_WRITABLE) { 279 do_write(); 280 } 281 282 if (state == UTP_STATE_EOF) { 283 read_.resolve(); 284 } 285 } 286 287 void on_error(int error_code) { 288 if (connecting) { 289 auto done = connecting; 290 connecting = null; 291 done.reject(new Exception("Error %s".noth!format(error_code))); 292 } 293 } 294 295 Promise!void do_write() nothrow { 296 if (pending_write_buffer.length == 0) { 297 return Promise!void.resolved(); 298 } 299 300 auto nwrote = utp_write(this.socket, cast(void*)pending_write_buffer.ptr, pending_write_buffer.length); 301 302 if (nwrote < 0) { 303 if (pending_write_promise is null) pending_write_promise = new DelegatePromise!void; 304 auto done = pending_write_promise; 305 pending_write_promise = null; 306 done.reject(new Exception("Error %s".noth!format(-nwrote))); 307 return done; 308 } 309 310 pending_write_buffer = pending_write_buffer[nwrote..$]; 311 if (pending_write_buffer.length == 0) { 312 if (pending_write_promise !is null) { 313 auto done = pending_write_promise; 314 pending_write_promise = null; 315 done.resolve(); 316 } 317 318 return Promise!void.resolved(); 319 } 320 321 if (pending_write_promise is null) pending_write_promise = new DelegatePromise!void; 322 return pending_write_promise; 323 } 324 325 Promise!void connect(Address addr) { 326 assert(connecting is null); 327 connecting = new DelegatePromise!void; 328 utp_connect(this.socket, addr.name, addr.nameLen); 329 return connecting; 330 } 331 332 void on_read(const(ubyte)[] data) { 333 read_.append(data); 334 } 335 336 int recv_buffer() { 337 return cast(int)read_.pendingLength(); 338 } 339 public: 340 @property void recvLen(int len) { 341 utp_setsockopt(this.socket, UTP_RCVBUF, 4096); 342 } 343 @property int recvLen() { 344 return utp_getsockopt(this.socket, UTP_RCVBUF); 345 } 346 347 override Promise!void shutdown() nothrow { 348 return close(); 349 } 350 351 override Promise!void close() nothrow { 352 return Promise!void.resolved() 353 .then(() { 354 utp_close(this.socket); 355 }).finall(() { 356 if (this.context is null) { 357 return Promise!void.resolved(); 358 } else { 359 auto done = this.context; 360 this.context = null; 361 return done.close(); 362 } 363 }).finall(() { 364 gcrelease(this); 365 }); 366 } 367 368 override PromiseIterator!(const(ubyte)[]) read() nothrow { 369 return read_; 370 } 371 372 override Promise!void write(immutable(ubyte)[] buf) nothrow { 373 assert(pending_write_promise is null, "Paralell writes"); 374 375 pending_write_buffer = buf; 376 377 return do_write(); 378 } 379 }