module upromised.uv_stream;
import deimos.libuv.uv;
import deimos.libuv._d;
import std.exception : enforce;
import upromised.memory : gcrelease, gcretain, getSelf;
import upromised.promise : DelegatePromise, Promise, PromiseIterator;
import upromised.stream : Interrupted, Stream;
import upromised.uv : uvCheck, UvError;

/**
 * libuv allocation callback: hands libuv a GC-allocated buffer of `size`
 * bytes to read into.
 *
 * The buffer is passed to `gcretain` here and to `gcrelease` in
 * `UvStream.readCb` (presumably pinning it while only C code references
 * it -- confirm against upromised.memory).
 *
 * Params:
 *   handle = the handle libuv is about to read from (unused)
 *   size   = buffer size suggested by libuv
 *   buf    = out: receives the buffer pointer and length
 */
extern (C) static void readAlloc(uv_handle_t* handle, size_t size, uv_buf_t* buf) nothrow {
    import core.memory : GC;

    // The buffer only ever holds raw bytes read from the stream, so mark it
    // NO_SCAN: the collector must not treat its contents as pointers.
    buf.base = cast(char*)GC.malloc(size, GC.BlkAttr.NO_SCAN);
    gcretain(buf.base);
    buf.len = size;
}

/**
 * Reallocates the buffer behind `buf` to `len` bytes and returns it as a
 * slice of exactly that length.
 *
 * Params:
 *   buf = buffer previously filled in by `readAlloc`
 *   len = number of bytes to keep
 *
 * Returns: a slice of `len` bytes over the (possibly moved) buffer.
 */
const(char)[] shrinkBuf(const(uv_buf_t)* buf, size_t len) nothrow {
    import core.memory : GC;

    // With the default attribute argument GC.realloc keeps the bits already
    // set on the block.
    auto r = cast(const(char)*)GC.realloc(cast(void*)buf.base, len);
    return r[0..len];
}

/**
 * Promise-based `Stream` implementation on top of a libuv stream handle.
 *
 * `SELF` is the concrete libuv handle type; the handle is stored by value in
 * `self`, and `upromised.uv`'s `stream` / `handle` helpers are used to view
 * it as `uv_stream_t*` / `uv_handle_t*`.
 *
 * The object passes itself to `gcretain` on construction and to `gcrelease`
 * once the close callback has run.
 */
class UvStream(SELF) : Stream {
private:
    /// Set by the first `close()` call; later calls return the same promise.
    DelegatePromise!void closePromise;
    /// Promise of the read currently in flight, or null when idle.
    DelegatePromise!(const(ubyte)[]) readPromise;

protected:
    /// Event loop this stream belongs to.
    uv_loop_t* ctx;

public:
    /// The underlying libuv handle.
    SELF self;

    /**
     * Stores the loop and retains this object; the matching `gcrelease`
     * happens in the callback passed to `uv_close` by `close()`.
     */
    this(uv_loop_t* ctx) {
        this.ctx = ctx;
        gcretain(this);
    }

    /**
     * libuv read callback. Completes the pending `readPromise` with one
     * chunk, with `null` on EOF, or with a `UvError` on failure.
     *
     * Reading is stopped after every completed read, so each call to the
     * iterator's `next` yields at most one chunk.
     */
    private extern (C) static void readCb(uv_stream_t* selfSelf, long nread, inout(uv_buf_t)* buf) nothrow {
        import upromised.uv : stream;

        auto self = getSelf!UvStream(selfSelf);
        if (buf.base !is null) gcrelease(buf.base);

        // libuv reports nread == 0 when there is simply nothing to read right
        // now (like EAGAIN); it is neither an error nor EOF. Drop the unused
        // buffer and keep the read armed so the promise is completed by a
        // later callback.
        if (nread == 0) return;

        uv_read_stop(self.self.stream);

        // Defensive: never dereference a cleared promise inside a nothrow
        // C callback.
        if (self.readPromise is null) return;

        if (nread == uv_errno_t.UV_EOF) {
            // A null chunk is the end-of-stream marker understood by read().
            self.readPromise.resolve(null);
            return;
        }

        if (nread < 0) {
            self.readPromise.reject(new UvError(cast(int)nread));
            return;
        }

        auto base = shrinkBuf(buf, nread);
        self.readPromise.resolve(cast(ubyte[])base);
    }

    /**
     * Returns an iterator over the chunks read from the stream.
     *
     * Each `next` call starts a libuv read and yields the next chunk; a null
     * chunk (EOF) ends the iteration. `next` throws via `enforce` if a read
     * is already in flight.
     */
    override PromiseIterator!(const(ubyte)[]) read() nothrow {
        import upromised.uv : stream;

        return new class PromiseIterator!(const(ubyte)[]) {
            override Promise!ItValue next(Promise!bool) {
                enforce(readPromise is null, "Already reading");
                readPromise = new DelegatePromise!(const(ubyte)[]);

                // The result of uv_read_start is handed to uvCheck together
                // with the promise (presumably rejecting it on failure --
                // confirm against upromised.uv).
                uv_read_start(self.stream, &readAlloc, &readCb).uvCheck(readPromise);
                return readPromise.finall(() {
                    // Mark the stream idle so the next read can start.
                    readPromise = null;
                }).then((chunk) => chunk ? ItValue(false, chunk) : ItValue(true));
            }
        };
    }

    /**
     * Writes `data` to the stream.
     *
     * Returns: a promise resolved when libuv reports status 0 for the write,
     * or rejected with a `UvError` carrying the status otherwise.
     */
    override Promise!void write(immutable(ubyte)[] data) nothrow {
        import upromised.uv : stream;

        WritePromise r = new WritePromise;
        // Retained while libuv holds a pointer to the embedded request;
        // released in the finall handler below.
        gcretain(r);
        r.data.base = cast(char*)data.ptr;
        r.data.len = data.length;
        int err = uv_write(&r.self, self.stream, &r.data, 1, (rSelf, status) nothrow {
            auto r = getSelf!WritePromise(rSelf);
            if (status == 0) {
                r.resolve();
            } else {
                r.reject(new UvError(status));
            }
        });
        err.uvCheck(r);
        r.finall(() => gcrelease(r));
        return r;
    }

    /// Write request plus its buffer descriptor, doubling as the result promise.
    private class WritePromise : DelegatePromise!void {
        uv_write_t self;
        uv_buf_t data;
    }

    /**
     * Issues `uv_shutdown` on the stream.
     *
     * Returns: a promise resolved when libuv reports status 0 for the
     * shutdown, or rejected with a `UvError` carrying the status otherwise.
     */
    override Promise!void shutdown() nothrow {
        import upromised.uv : stream;

        ShutdownPromise r = new ShutdownPromise;
        // Retained while libuv holds a pointer to the embedded request;
        // released in the finall handler below.
        gcretain(r);
        int err = uv_shutdown(&r.self, self.stream, (rSelf, status) nothrow {
            auto r = getSelf!ShutdownPromise(rSelf);
            if (status == 0) {
                r.resolve();
            } else {
                r.reject(new UvError(status));
            }
        });
        err.uvCheck(r);
        r.finall(() => gcrelease(r));
        return r;
    }

    /// Shutdown request doubling as the result promise.
    private class ShutdownPromise : DelegatePromise!void {
        uv_shutdown_t self;
    }

    /**
     * Closes the handle.
     *
     * A read still in flight is rejected with `Interrupted`. Calling
     * `close()` again returns the promise created by the first call.
     *
     * Returns: a promise resolved from the `uv_close` callback.
     */
    override Promise!void close() nothrow {
        import std.algorithm : swap;
        import upromised.uv : handle;

        if (closePromise) return closePromise;
        if (readPromise) {
            // Clear the field before rejecting so handlers that run during
            // the rejection see the stream as idle.
            typeof(readPromise) gone;
            swap(gone, readPromise);
            gone.reject(new Interrupted);
        }

        closePromise = new DelegatePromise!void;
        uv_close(self.handle, (selfSelf) nothrow {
            auto self = getSelf!UvStream(selfSelf);
            self.closePromise.resolve();
            // Pairs with the gcretain in the constructor.
            gcrelease(self);
        });
        return closePromise;
    }
}