1 module upromised.uv_stream;
2 import deimos.libuv.uv;
3 import deimos.libuv._d;
4 import std.exception : enforce;
5 import upromised.memory : gcrelease, gcretain, getSelf;
6 import upromised.promise : DelegatePromise, Promise, PromiseIterator;
7 import upromised.stream : Interrupted, Stream;
8 import upromised.uv : uvCheck, UvError;
9 
/**
 * libuv allocation callback: provides the buffer the next read is written into.
 *
 * The block is retained with gcretain; readCb calls gcrelease on it once the
 * read callback fires.
 *
 * Params:
 *   handle = handle the read was started on (unused)
 *   size   = buffer size suggested by libuv
 *   buf    = out: receives the allocated base pointer and its length
 */
extern (C) static void readAlloc(uv_handle_t* handle, size_t size, uv_buf_t* buf) nothrow {
	import core.memory : GC;

	// The buffer only ever carries raw stream bytes, so tell the GC not to
	// scan it for pointers (avoids false pointers keeping garbage alive).
	buf.base = cast(char*)GC.malloc(size, GC.BlkAttr.NO_SCAN);
	gcretain(buf.base);
	buf.len = size;
}
17 
/**
 * Resizes the GC block behind `buf` to `len` bytes and returns it as a slice.
 *
 * Params:
 *   buf = libuv buffer whose `base` was allocated through the GC
 *   len = number of bytes to keep
 *
 * Returns: a slice of exactly `len` chars over the block returned by GC.realloc.
 */
const(char)[] shrinkBuf(const(uv_buf_t)* buf, size_t len) nothrow {
	import core.memory : GC;

	void* original = cast(void*)buf.base;
	const(char)* resized = cast(const(char)*)GC.realloc(original, len);
	return resized[0..len];
}
23 
/**
 * Promise-based Stream implementation on top of a libuv stream handle.
 *
 * SELF is the type of the `self` member; the `stream` and `handle` helpers
 * imported from upromised.uv are applied to it to obtain the raw libuv
 * pointers. The object calls gcretain on itself in the constructor and
 * gcrelease on itself in the uv_close callback.
 */
class UvStream(SELF) : Stream {
private:
	/// Promise returned by close(); non-null once close() has been called.
	DelegatePromise!void closePromise;
	/// Promise of the read currently in flight, or null when none is pending.
	DelegatePromise!(const(ubyte)[]) readPromise;

protected:
	/// Event loop handed to the constructor.
	uv_loop_t* ctx;

public:
	/// Underlying libuv object the stream operations are issued on.
	SELF self;

	/**
	 * Stores the loop and retains this object.
	 *
	 * Params:
	 *   ctx = libuv loop this stream belongs to
	 */
	this(uv_loop_t* ctx) {
		this.ctx = ctx;
		// Released again in the uv_close callback installed by close().
		gcretain(this);
	}

	/**
	 * libuv read callback: settles the pending readPromise with one chunk.
	 *
	 * Params:
	 *   selfSelf = stream handle the read was started on
	 *   nread    = number of bytes read (> 0), 0 when no data is available
	 *              right now, or a negative libuv error code (UV_EOF at end of stream)
	 *   buf      = buffer previously handed out by readAlloc; base may be null
	 */
	private extern (C) static void readCb(uv_stream_t* selfSelf, long nread, inout(uv_buf_t)* buf) nothrow {
		import upromised.uv : stream;

		auto self = getSelf!UvStream(selfSelf);
		// Undo the gcretain done in readAlloc; libuv does not reuse the buffer.
		if (buf.base !is null) gcrelease(buf.base);

		// libuv documents nread == 0 as the equivalent of EAGAIN/EWOULDBLOCK:
		// neither an error nor EOF. Leave the read running and wait for the
		// next callback instead of rejecting with a bogus "error 0".
		if (nread == 0) return;

		// One chunk (or one failure) per next() call: stop until asked again.
		uv_read_stop(self.self.stream);

		// close() rejects and clears the pending read; nothing left to settle.
		if (self.readPromise is null) return;

		if (nread == uv_errno_t.UV_EOF) {
			// A null chunk is turned into the end-of-iteration value by read().
			self.readPromise.resolve(null);
			return;
		}

		if (nread < 0) {
			self.readPromise.reject(new UvError(cast(int)nread));
			return;
		}

		// Trim the buffer to the bytes actually received.
		auto base = shrinkBuf(buf, nread);
		self.readPromise.resolve(cast(ubyte[])base);
	}

	/**
	 * Returns an iterator yielding the chunks read from the stream.
	 *
	 * Each call to the iterator's next() starts a read and resolves with the
	 * next chunk, or with the end-of-iteration value once a null chunk (EOF)
	 * is delivered. Calling next() while a read is still pending throws
	 * ("Already reading").
	 */
	override PromiseIterator!(const(ubyte)[]) read() nothrow {
		import upromised.uv : stream;

		return new class PromiseIterator!(const(ubyte)[]) {
			override Promise!ItValue next(Promise!bool) {
				enforce(readPromise is null, "Already reading");
				readPromise = new DelegatePromise!(const(ubyte)[]);

				// The status of uv_read_start is handed to uvCheck together
				// with the promise.
				uv_read_start(self.stream, &readAlloc, &readCb).uvCheck(readPromise);
				return readPromise.finall(() {
					// Allow the next read once this one has settled.
					readPromise = null;
				}).then((chunk) => chunk ? ItValue(false, chunk) : ItValue(true));
			}
		};
	}

	/**
	 * Queues `data` for writing on the stream.
	 *
	 * Params:
	 *   data = bytes to write; the request references them in place, no copy is made
	 *
	 * Returns: a promise resolved when libuv reports status 0 for the write,
	 *          rejected with UvError otherwise.
	 */
	override Promise!void write(immutable(ubyte)[] data) nothrow {
		import upromised.uv : stream;

		WritePromise r = new WritePromise;
		// Retained until the promise settles; released by the finall handler below.
		gcretain(r);
		r.data.base = cast(char*)data.ptr;
		r.data.len = data.length;
		int err = uv_write(&r.self, self.stream, &r.data, 1, (rSelf, status) nothrow {
			// Recover the promise from the embedded uv_write_t request.
			auto r = getSelf!WritePromise(rSelf);
			if (status == 0) {
				r.resolve();
			} else {
				r.reject(new UvError(status));
			}
		});
		err.uvCheck(r);
		r.finall(() => gcrelease(r));
		return r;
	}
	/// Write request bundled with the promise that reports its outcome.
	private class WritePromise : DelegatePromise!void {
		uv_write_t self;
		uv_buf_t data;
	}

	/**
	 * Issues a uv_shutdown request on the stream.
	 *
	 * Returns: a promise resolved when libuv reports status 0 for the
	 *          shutdown, rejected with UvError otherwise.
	 */
	override Promise!void shutdown() nothrow {
		import upromised.uv : stream;

		ShutdownPromise r = new ShutdownPromise;
		// Retained until the promise settles; released by the finall handler below.
		gcretain(r);
		int err = uv_shutdown(&r.self, self.stream, (rSelf, status) nothrow {
			// Recover the promise from the embedded uv_shutdown_t request.
			auto r = getSelf!ShutdownPromise(rSelf);
			if (status == 0) {
				r.resolve();
			} else {
				r.reject(new UvError(status));
			}
		});
		err.uvCheck(r);
		r.finall(() => gcrelease(r));
		return r;
	}
	/// Shutdown request bundled with the promise that reports its outcome.
	private class ShutdownPromise : DelegatePromise!void {
		uv_shutdown_t self;
	}

	/**
	 * Closes the handle.
	 *
	 * A pending read is rejected with Interrupted. Repeated calls return the
	 * same promise.
	 *
	 * Returns: a promise resolved from the uv_close callback.
	 */
	override Promise!void close() nothrow {
		import std.algorithm : swap;
		import upromised.uv : handle;

		if (closePromise) return closePromise;
		if (readPromise) {
			// Clear the field before rejecting so handlers observe no pending read.
			typeof(readPromise) gone;
			swap(gone, readPromise);
			gone.reject(new Interrupted);
		}

		closePromise = new DelegatePromise!void;
		uv_close(self.handle, (selfSelf) nothrow {
			auto self = getSelf!UvStream(selfSelf);
			self.closePromise.resolve();
			// Balances the gcretain(this) done in the constructor.
			gcrelease(self);
		});
		return closePromise;
	}
}