1 module upromised.uv_stream;
2 import deimos.libuv.uv;
3 import deimos.libuv._d;
4 import std.exception : enforce;
5 import upromised.memory : gcrelease, gcretain, getSelf;
6 import upromised.promise : DelegatePromise, Promise, PromiseIterator;
7 import upromised.stream : Interrupted, Stream;
8 import upromised.uv : uvCheck, UvError;
9 
/**
 * Allocation callback passed to uv_read_start.
 *
 * Fills `buf` with a freshly GC-allocated region of `size` bytes and pins it
 * with gcretain so that it stays referenced while only libuv holds the pointer.
 * The matching gcrelease is performed by the read callback.
 *
 * Params:
 *   handle = handle requesting the buffer (unused)
 *   size   = number of bytes to allocate
 *   buf    = output descriptor receiving the pointer and length
 */
extern (C) static void readAlloc(uv_handle_t* handle, size_t size, uv_buf_t* buf) nothrow {
	import core.memory : GC;

	version(ARM_HardFloat) {
		// Using GC.malloc causes memory leak on LDC2 for ArmHF.
		// The reason is unknown.
		char* storage = (new char[size]).ptr;
	} else {
		char* storage = cast(char*)GC.malloc(size);
	}

	buf.base = storage;
	buf.len = size;
	gcretain(buf.base);
}
23 
/**
 * Produces a slice holding exactly the first `len` bytes of a read buffer.
 *
 * On ARM_HardFloat builds the bytes are copied into a new array; on every
 * other target the existing allocation is resized through GC.realloc.
 *
 * Params:
 *   buf = buffer descriptor whose `base` points at the received data
 *   len = number of valid bytes at the start of the buffer
 *
 * Returns: a slice of length `len` over the resulting storage.
 */
const(char)[] shrinkBuf(const(uv_buf_t)* buf, size_t len) nothrow {
	import core.memory : GC;

	version(ARM_HardFloat) {
		char[] copy = new char[len];
		copy[] = buf.base[0 .. len];
		return copy[0 .. len];
	} else {
		const(char)* resized = cast(const(char)*)GC.realloc(cast(void*)buf.base, len);
		return resized[0 .. len];
	}
}
34 
/**
 * Promise-based Stream built on a libuv stream handle.
 *
 * SELF is the type of the `self` member. The `stream` and `handle` helpers
 * imported from upromised.uv are applied to `self` to obtain the pointers
 * handed to libuv (their exact definition is outside this file).
 *
 * The constructor pins the object with gcretain; the pin is dropped in the
 * close callback, so an instance stays referenced until close() has completed.
 */
class UvStream(SELF) : Stream {
private:
	/// Promise returned by close(); non-null once close() has been called.
	DelegatePromise!void closePromise;
	/// Promise for the read currently in flight, or null when no read is pending.
	DelegatePromise!(const(ubyte)[]) readPromise;

protected:
	/// Event loop supplied at construction.
	uv_loop_t* ctx;

public:
	/// Underlying libuv object.
	SELF self;

	/**
	 * Stores the loop and pins this object so it is not collected while only
	 * libuv refers to it.
	 */
	this(uv_loop_t* ctx) {
		this.ctx = ctx;
		gcretain(this);
	}
	
	/**
	 * libuv read callback: settles the pending read promise.
	 *
	 * Params:
	 *   selfSelf = stream handle the data arrived on
	 *   nread    = bytes read (> 0), 0 for "nothing available right now",
	 *              UV_EOF at end of stream, or another negative error code
	 *   buf      = buffer previously produced by readAlloc
	 */
	private extern (C) static void readCb(uv_stream_t* selfSelf, long nread, inout(uv_buf_t)* buf) nothrow {
		import upromised.uv : stream;

		auto self = getSelf!UvStream(selfSelf);
		// Undo the pin taken in readAlloc; on success shrinkBuf's result keeps the data alive.
		if (buf.base !is null) gcrelease(buf.base);

		// Defensive: without a pending promise there is nobody to deliver to.
		if (self.readPromise is null) {
			uv_read_stop(self.self.stream);
			return;
		}

		// libuv documents nread == 0 as the equivalent of EAGAIN/EWOULDBLOCK:
		// neither an error nor EOF. Keep reading and leave the promise pending.
		if (nread == 0) return;

		// One callback per next() call: stop until the consumer asks again.
		uv_read_stop(self.self.stream);
		
		if (nread == uv_errno_t.UV_EOF) {
			// A null chunk is how read() signals end of stream.
			self.readPromise.resolve(null);
			return;
		}

		if (nread < 0) {
			self.readPromise.reject(new UvError(cast(int)nread));
			return;
		}

		auto base = shrinkBuf(buf, nread);
		self.readPromise.resolve(cast(const(ubyte)[])base);
	}

	/**
	 * Returns an iterator over incoming chunks.
	 *
	 * Each call to next() starts a read and yields one chunk; the iterator
	 * reports completion when end of stream is reached. Only one read may be
	 * pending at a time: next() throws "Already reading" otherwise.
	 */
	override PromiseIterator!(const(ubyte)[]) read() nothrow {
		import upromised.uv : stream;

		return new class PromiseIterator!(const(ubyte)[]) {
			override Promise!ItValue next(Promise!bool) {
				enforce(readPromise is null, "Already reading");
				readPromise = new DelegatePromise!(const(ubyte)[]);

				uv_read_start(self.stream, &readAlloc, &readCb).uvCheck(readPromise);
				return readPromise.finall(() {
					// Clear the slot whatever the outcome so the next read can start.
					readPromise = null;
				}).then((chunk) => chunk ? ItValue(false, chunk) : ItValue(true));
			}
		};
	}

	/**
	 * Writes `data` to the stream.
	 *
	 * Returns: a promise resolved when libuv reports status 0 for the write
	 * and rejected with UvError for any other status.
	 */
	override Promise!void write(immutable(ubyte)[] data) nothrow {
		import upromised.uv : stream;

		WritePromise r = new WritePromise;
		// Pin the request object while libuv holds a pointer to it.
		gcretain(r);
		r.data.base = cast(char*)data.ptr;
		r.data.len = data.length;
		int err = uv_write(&r.self, self.stream, &r.data, 1, (rSelf, status) nothrow {
			auto r = getSelf!WritePromise(rSelf);
			if (status == 0) {
				r.resolve();
			} else {
				r.reject(new UvError(status));
			}
		});
		err.uvCheck(r);
		r.finall(() => gcrelease(r));
		return r;
	}
	/// Write promise bundled with the libuv request and buffer descriptor it owns.
	private class WritePromise : DelegatePromise!void {
		uv_write_t self;
		uv_buf_t data;
	}

	/**
	 * Issues uv_shutdown on the stream.
	 *
	 * Returns: a promise resolved when libuv reports status 0 and rejected
	 * with UvError for any other status.
	 */
	override Promise!void shutdown() nothrow {
		import upromised.uv : stream;

		ShutdownPromise r = new ShutdownPromise;
		// Pin the request object while libuv holds a pointer to it.
		gcretain(r);
		int err = uv_shutdown(&r.self, self.stream, (rSelf, status) nothrow {
			auto r = getSelf!ShutdownPromise(rSelf);
			if (status == 0) {
				r.resolve();
			} else {
				r.reject(new UvError(status));
			}
		});
		err.uvCheck(r);
		r.finall(() => gcrelease(r));
		return r;
	}
	/// Shutdown promise bundled with the libuv request it owns.
	private class ShutdownPromise : DelegatePromise!void {
		uv_shutdown_t self;
	}

	/**
	 * Closes the handle.
	 *
	 * A pending read is rejected with Interrupted. Repeated calls return the
	 * same promise. The promise resolves from libuv's close callback, which
	 * also drops the pin taken in the constructor.
	 */
	override Promise!void close() nothrow {
		import std.algorithm : swap;
		import upromised.uv : handle;

		if (closePromise) return closePromise;
		if (readPromise) {
			// Detach the promise before rejecting it so the field is already clear
			// when rejection handlers run.
			typeof(readPromise) gone;
			swap(gone, readPromise);
			gone.reject(new Interrupted);
		}

		closePromise = new DelegatePromise!void;
		uv_close(self.handle, (selfSelf) nothrow {
			auto self = getSelf!UvStream(selfSelf);
			self.closePromise.resolve();
			gcrelease(self);
		});
		return closePromise;
	}
}