module upromised.uv_stream;
import deimos.libuv.uv;
import deimos.libuv._d;
import std.exception : enforce;
import upromised.memory : gcrelease, gcretain, getSelf;
import upromised.promise : DelegatePromise, Promise, PromiseIterator;
import upromised.stream : Interrupted, Stream;
import upromised.uv : uvCheck, UvError;

/**
 * Allocation callback passed to uv_read_start.
 *
 * Allocates `size` bytes from the D garbage collector, registers the block
 * with gcretain (presumably so it stays alive while only libuv references
 * it - confirm against upromised.memory) and stores pointer and length in
 * `buf`.
 *
 * Params:
 *   handle = the libuv handle requesting a buffer (unused here)
 *   size   = number of bytes to allocate
 *   buf    = output buffer descriptor filled in by this function
 */
extern (C) static void readAlloc(uv_handle_t* handle, size_t size, uv_buf_t* buf) nothrow {
	import core.memory : GC;

	version(ARM_HardFloat) {
		// Using GC.malloc causes memory leak on LDC2 for ArmHF.
		// The reason is unknown.
		buf.base = (new char[size]).ptr;
	} else {
		buf.base = cast(char*)GC.malloc(size);
	}
	gcretain(buf.base);
	buf.len = size;
}

/**
 * Returns the first `len` bytes of the buffer described by `buf`.
 *
 * On ARM_HardFloat builds the bytes are copied into a freshly allocated
 * array; on every other target the original allocation is resized in place
 * through GC.realloc and the resized block is returned.
 *
 * Params:
 *   buf = buffer previously filled in by readAlloc
 *   len = number of leading bytes to keep
 *
 * Returns: a slice of exactly `len` bytes.
 */
const(char)[] shrinkBuf(const(uv_buf_t)* buf, size_t len) nothrow {
	import core.memory : GC;
	version(ARM_HardFloat) {
		auto r = new char[len];
		r[0..len] = buf.base[0..len];
	} else {
		auto r = cast(const(char)*)GC.realloc(cast(void*)buf.base, len);
	}
	return r[0..len];
}

/**
 * Stream implementation on top of a libuv stream handle.
 *
 * `SELF` is the concrete libuv handle type stored in the `self` member; the
 * `stream` / `handle` helpers imported from upromised.uv are applied to it
 * to obtain the pointers passed to the libuv calls.
 *
 * The constructor calls gcretain(this) and the uv_close callback installed
 * by close() calls gcrelease on the same object.
 */
class UvStream(SELF) : Stream {
private:
	// Promise returned by close(); non-null once close() has been called.
	DelegatePromise!void closePromise;
	// Promise for the read currently in flight, or null when no read is pending.
	DelegatePromise!(const(ubyte)[]) readPromise;

protected:
	uv_loop_t* ctx;

public:
	SELF self;

	/// Stores the loop pointer and retains this object (released in close()).
	this(uv_loop_t* ctx) {
		this.ctx = ctx;
		gcretain(this);
	}

	/**
	 * Read callback passed to uv_read_start.
	 *
	 * Releases the buffer retained by readAlloc, then:
	 *  - nread == 0: returns without touching the pending promise. libuv
	 *    documents a zero-length read as "no data available right now"
	 *    (equivalent to EAGAIN), which is neither an error nor EOF, so the
	 *    read stays active and the promise stays pending.
	 *  - UV_EOF: stops reading and resolves the pending promise with null.
	 *  - any other negative value: stops reading and rejects with UvError.
	 *  - positive value: stops reading and resolves with the received bytes.
	 */
	private extern (C) static void readCb(uv_stream_t* selfSelf, long nread, inout(uv_buf_t)* buf) nothrow {
		import upromised.uv : stream;

		auto self = getSelf!UvStream(selfSelf);
		if (buf.base !is null) gcrelease(buf.base);

		// Zero bytes is not an error and not EOF: keep reading and wait for
		// the next callback instead of rejecting with UvError(0).
		if (nread == 0) return;

		uv_read_stop(self.self.stream);

		// Defensive: close() clears readPromise. Dereferencing null inside
		// this nothrow C callback would crash the process.
		auto pending = self.readPromise;
		if (pending is null) return;

		if (nread == uv_errno_t.UV_EOF) {
			pending.resolve(null);
			return;
		}

		if (nread < 0) {
			pending.reject(new UvError(cast(int)nread));
			return;
		}

		// nread is strictly positive here, so the cast cannot change the value.
		auto base = shrinkBuf(buf, cast(size_t)nread);
		pending.resolve(cast(ubyte[])base);
	}

	/**
	 * Returns an iterator whose next() starts one libuv read and yields the
	 * chunk delivered by readCb.
	 *
	 * next() throws (via enforce) with "Already reading" if a previous read
	 * is still pending. A null chunk (EOF in readCb) is mapped to
	 * ItValue(true); any other chunk to ItValue(false, chunk).
	 */
	override PromiseIterator!(const(ubyte)[]) read() nothrow {
		import upromised.uv : stream;

		return new class PromiseIterator!(const(ubyte)[]) {
			override Promise!ItValue next(Promise!bool) {
				enforce(readPromise is null, "Already reading");
				readPromise = new DelegatePromise!(const(ubyte)[]);

				uv_read_start(self.stream, &readAlloc, &readCb).uvCheck(readPromise);
				// Clear the slot once the read settles so the next call can start a new one.
				return readPromise.finall(() {
					readPromise = null;
				}).then((chunk) => chunk ? ItValue(false, chunk) : ItValue(true));
			}
		};
	}

	/**
	 * Queues `data` for writing with uv_write.
	 *
	 * The request object is retained with gcretain before the call and
	 * released with gcrelease once the returned promise settles.
	 *
	 * Returns: a promise resolved when the write callback reports status 0,
	 * rejected with UvError otherwise. The immediate return code of uv_write
	 * is passed to uvCheck together with the promise.
	 */
	override Promise!void write(immutable(ubyte)[] data) nothrow {
		import upromised.uv : stream;

		WritePromise r = new WritePromise;
		gcretain(r);
		r.data.base = cast(char*)data.ptr;
		r.data.len = data.length;
		int err = uv_write(&r.self, self.stream, &r.data, 1, (rSelf, status) nothrow {
			auto r = getSelf!WritePromise(rSelf);
			if (status == 0) {
				r.resolve();
			} else {
				r.reject(new UvError(status));
			}
		});
		err.uvCheck(r);
		r.finall(() => gcrelease(r));
		return r;
	}
	/// Promise that embeds the uv_write_t request and its buffer descriptor.
	private class WritePromise : DelegatePromise!void {
		uv_write_t self;
		uv_buf_t data;
	}

	/**
	 * Issues uv_shutdown on the stream.
	 *
	 * Returns: a promise resolved when the shutdown callback reports status
	 * 0, rejected with UvError otherwise. The request object is retained
	 * until the promise settles.
	 */
	override Promise!void shutdown() nothrow {
		import upromised.uv : stream;

		ShutdownPromise r = new ShutdownPromise;
		gcretain(r);
		int err = uv_shutdown(&r.self, self.stream, (rSelf, status) nothrow {
			auto r = getSelf!ShutdownPromise(rSelf);
			if (status == 0) {
				r.resolve();
			} else {
				r.reject(new UvError(status));
			}
		});
		err.uvCheck(r);
		r.finall(() => gcrelease(r));
		return r;
	}
	/// Promise that embeds the uv_shutdown_t request.
	private class ShutdownPromise : DelegatePromise!void {
		uv_shutdown_t self;
	}

	/**
	 * Closes the handle with uv_close.
	 *
	 * Calling close() again returns the same promise. A read that is pending
	 * at the time of the call is rejected with Interrupted. The close
	 * callback resolves the promise and releases the object retained by the
	 * constructor.
	 */
	override Promise!void close() nothrow {
		import std.algorithm : swap;
		import upromised.uv : handle;

		if (closePromise) return closePromise;
		if (readPromise) {
			// Detach the pending read before rejecting it so readPromise is
			// already null when the rejection handlers run.
			typeof(readPromise) gone;
			swap(gone, readPromise);
			gone.reject(new Interrupted);
		}

		closePromise = new DelegatePromise!void;
		uv_close(self.handle, (selfSelf) nothrow {
			auto self = getSelf!UvStream(selfSelf);
			self.closePromise.resolve();
			gcrelease(self);
		});
		return closePromise;
	}
}