1 module upromised.utp;
2 
3 import std.format : format;
4 import std.socket : Address;
5 import upromised.c.utp;
6 import upromised.loop : Loop;
7 import upromised.memory : gcrelease, gcretain;
8 import upromised.promise : DelegatePromise, DelegatePromiseIterator, Promise, PromiseIterator;
9 import upromised.stream : DatagramStream, Interrupted, Stream;
10 
/**
 * Invokes `callee` with `params` and returns its result.
 *
 * If the call throws an `Exception`, the process is terminated through
 * `abort()` rather than letting the exception propagate to the caller.
 */
private auto noth(alias callee, Params...)(Params params) {
	import core.stdc.stdlib : abort;

	try {
		return callee(params);
	} catch (Exception) {
		// An exception must never leave this wrapper: terminate immediately.
		abort();
		assert(false);
	}
}
20 
/**
 * Byte queue exposed as a `PromiseIterator!(const(ubyte)[])`.
 *
 * Producers push bytes with `append()` and signal end of stream with
 * `resolve()`. A consumer pulls with `next()`: it receives everything that is
 * buffered at once, or a promise that is settled by the next `append()` /
 * `resolve()` when nothing is buffered. Only one outstanding `next()` request
 * is supported at a time.
 */
class CumulativeBuffer: PromiseIterator!(const(ubyte)[]) {
protected:
	// Bytes appended while no reader was waiting.
	ubyte[] pending;
	// Set by `resolve()`; once the buffer is empty, `next()` reports end of stream.
	bool eof;
	// Promise handed out by `next()` while the buffer was empty; null otherwise.
	DelegatePromise!ItValue pendingReq;

	/**
	 * Hook called by `next()` right after the buffered bytes have been handed
	 * to the reader (at that point `pendingLength()` is 0). The default does
	 * nothing; subclasses override it to react to the buffer being emptied.
	 */
	void onDrain() {
	}

public:
	/// Number of bytes currently buffered and not yet handed to a reader.
	size_t pendingLength() const {
		return pending.length;
	}

	/**
	 * Returns the next chunk.
	 *
	 * - Buffered bytes are returned immediately, all at once, and `onDrain()`
	 *   is invoked.
	 * - Otherwise, if `resolve()` was called, an end-of-stream value is returned.
	 * - Otherwise a pending promise is returned; it is settled by the next
	 *   `append()` or `resolve()`.
	 *
	 * The `Promise!bool` argument is not used.
	 */
	override Promise!ItValue next(Promise!bool _) {
		if (pending.length > 0) {
			auto done = pending;
			pending = pending[$..$];
			// Fix: the drain hook was declared but never invoked, so
			// subclasses were never told that the buffer had been emptied.
			onDrain();
			return Promise!ItValue.resolved(ItValue(false, done));
		}

		if (eof) {
			return Promise!ItValue.resolved(ItValue(true));
		}

		assert(pendingReq is null);
		pendingReq = new DelegatePromise!ItValue;
		return pendingReq;
	}

	/**
	 * Adds `data` to the stream.
	 *
	 * If a reader is waiting, it receives a private copy of `data` directly;
	 * otherwise the bytes are copied onto the internal buffer.
	 */
	void append(const(ubyte)[] data) {
		if (pendingReq !is null) {
			assert(pending.length == 0);
			auto done = pendingReq;
			pendingReq = null;
			// Copy before handing out: `data` is only borrowed from the
			// caller, and the buffered path below copies as well.
			done.resolve(ItValue(false, data.dup));
			return;
		}

		pending ~= data;
	}

	/**
	 * Marks end of stream. A waiting reader is settled with an end-of-stream
	 * value right away; bytes still buffered remain readable through `next()`.
	 */
	void resolve() {
		this.eof = true;

		if (pendingReq !is null) {
			assert(pending.length == 0);
			auto done = pendingReq;
			pendingReq = null;
			done.resolve(ItValue(true));
		}
	}
}
74 
/**
 * Wraps a libutp `utp_context` driven by a `DatagramStream`.
 *
 * The constructor installs the libutp callbacks, starts a 500 ms interval
 * that calls `utp_check_timeouts`, and starts a loop that feeds every
 * datagram received from `underlying` into `utp_process_udp`. The instance
 * pins itself with `gcretain` and unpins itself with `gcrelease` once the
 * owner count drops to zero in `close()`.
 */
class UtpContext {
private:
	// Owner count: 1 after construction, incremented by `inc()` (each
	// UtpStream calls it), decremented by `close()`.
	int count;

	// libutp context handle; null once `close()` has destroyed it.
	utp_context* context;
	// Transport used to send and receive the raw datagrams.
	DatagramStream underlying;
	// Queue of accepted raw sockets; non-null only after `accept()` was called.
	DelegatePromiseIterator!(utp_socket*) sockets;

public:
	/**
	 * Creates the libutp context and wires it to `loop` and `underlying`.
	 *
	 * Params:
	 *   loop = event loop providing the timers used for timeouts and deferred acks
	 *   underlying = datagram transport carrying the uTP packets
	 */
	this(Loop loop, DatagramStream underlying) {
		import std.datetime : msecs;
		import upromised.dns : toAddress;
		import upromised.promise : break_, continue_, do_while;

		context = utp_init(2);
		assert(context !is null);
		count++;
		utp_context_set_userdata(context, cast(void*)cast(Object)this);
		this.underlying = underlying;

		// Outgoing packet: copy the payload (the buffer belongs to libutp)
		// and send it through the datagram transport. Failures are only
		// logged in debug builds.
		utp_set_callback(context, UTP_SENDTO, (args) {
			auto self = cast(UtpContext)cast(Object)utp_context_get_userdata(args.context);
			auto addr = args.address.toAddress(args.address_len);
			auto data = (cast(const(ubyte)*)args.buf)[0..args.len].idup;
			self.underlying.sendTo(addr, data)
			.except((Exception e) {
				import std.stdio : stderr;
				debug stderr.writeln(e);
			}).nothrow_;
			return 0;
		});

		// Reports true while `accept()` has not been called (no queue exists).
		utp_set_callback(context, UTP_ON_FIREWALL, (args) {
			auto self = cast(UtpContext)cast(Object)utp_context_get_userdata(args.context);
			return self.sockets is null;
		});

		utp_set_callback(context, UTP_ON_ERROR, (args) {
			auto socket = cast(UtpStream)cast(Object)utp_get_userdata(args.socket);
			// Fix: tolerate sockets without an attached UtpStream, exactly
			// like UTP_ON_STATE_CHANGE does, instead of dereferencing null.
			if (socket is null) {
				return 0;
			}
			socket.on_error(args.error_code);
			return 0;
		});

		utp_set_callback(context, UTP_ON_READ, (args) {
			auto socket = cast(UtpStream)cast(Object)utp_get_userdata(args.socket);
			// Kept as an assert on purpose: silently returning here would
			// drop payload bytes from the stream.
			assert(socket !is null);
			socket.on_read((cast(ubyte*)args.buf)[0..args.len]);
			return 0;
		});

		utp_set_callback(context, UTP_GET_READ_BUFFER_SIZE, (args) {
			auto socket = cast(UtpStream)cast(Object)utp_get_userdata(args.socket);
			if (socket is null) {
				return 4096;
			}

			return socket.recv_buffer();
		});

		utp_set_callback(context, UTP_ON_STATE_CHANGE, (args) {
			auto socket = cast(UtpStream)cast(Object)utp_get_userdata(args.socket);
			if (socket is null) {
				return 0;
			}
			socket.on_state(args.state);
			return 0;
		});

		// Incoming connection: queue the raw socket; `accept()` wraps it in
		// a UtpStream when its iterator is advanced.
		utp_set_callback(context, UTP_ON_ACCEPT, (args) {
			auto self = cast(UtpContext)cast(Object)utp_context_get_userdata(args.context);
			assert(self !is null);
			assert(self.sockets !is null);
			self.sockets.resolve(args.socket);
			return 0;
		});

		// Periodic timeout processing; the interval stops itself once the
		// context has been destroyed.
		loop.interval(500.msecs).each((_) nothrow {
			if (this.context is null) return false;
			utp_check_timeouts(this.context);
			return true;
		}).nothrow_;

		auto recv = underlying.recvFrom();
		do_while(() {
			// Set as soon as a datagram (or EOF) arrives; the 30 ms timer
			// below only issues deferred acks if nothing arrived before it fired.
			bool raced;
			loop.sleep(30.msecs)
			.then(() {
				if (raced || this.context is null) return;
				utp_issue_deferred_acks(this.context);
			});

			return recv.next()
			.then((eofValue) {
				raced = true;
				if (eofValue.eof) return break_;
				// Fix: `close()` destroys the context; never hand a
				// destroyed (null) context to libutp.
				if (this.context is null) return break_;
				auto dgram = eofValue.value;
				utp_process_udp(this.context, cast(const(byte)*)dgram.message.ptr, dgram.message.length, dgram.addr.name(), dgram.addr.nameLen());
				return continue_;
			});
		}).except((Interrupted _) {
		}).except((Exception e) {
			import std.stdio;
			// Fix: log the failure itself (the original printed `sockets`).
			debug stderr.writeln(e);
			if (sockets !is null) {
				sockets.reject(e);
			}
		}).nothrow_();

		gcretain(this);
	}

	/// Registers one more owner; balanced by a later `close()`.
	void inc() nothrow {
		++count;
	}

	/**
	 * Drops one owner. When the last owner is gone, closes the underlying
	 * transport and then destroys the libutp context, ends the accept
	 * iterator (if any) and releases the GC pin.
	 */
	Promise!void close() nothrow {
		return Promise!void.resolved()
		.then(() {
			--count;
			if (count == 0) {
				return this.underlying.close()
				.finall(() {
					utp_destroy(this.context);
					this.context = null;
					if (this.sockets !is null) {
						auto done = this.sockets;
						this.sockets = null;
						done.resolve();
					}
					gcrelease(this);
				});
			} else {
				return Promise!void.resolved();
			}
		});
	}

	/**
	 * Opens a uTP connection to `dest`.
	 *
	 * Returns: a promise for the connected stream. It is rejected if the
	 * context has already been closed or if connecting fails; in the latter
	 * case the half-open stream is closed first.
	 */
	Promise!UtpStream connect(Address dest) nothrow {
		return Promise!void.resolved()
		.then(() {
			// Fix: after `close()` the context is null; creating a socket on
			// it would pass null to libutp and revive the owner count.
			if (this.context is null) {
				throw new Exception("UtpContext is closed");
			}
			return new UtpStream(utp_create_socket(this.context), this);
		})
		.then((s) {
			return s.connect(dest)
			.failure((Exception _) => s.close())
			.then(() => s);
		});
	}

	/**
	 * Starts accepting incoming connections. May be called only once per
	 * context (asserted).
	 *
	 * Returns: an iterator yielding one `Stream` per accepted connection; it
	 * reports end of stream once the context has been closed.
	 */
	PromiseIterator!Stream accept() nothrow {
		assert(sockets is null);
		sockets = new DelegatePromiseIterator!(utp_socket*);

		return new class PromiseIterator!Stream {
			override Promise!ItValue next(Promise!bool done) {
				// `close()` nulls the queue: nothing more will arrive.
				if (sockets is null) {
					return Promise!ItValue.resolved(ItValue(true));
				}

				return sockets.next(done)
				.then((socketValue) {
					if (socketValue.eof) {
						return ItValue(true);
					}

					return ItValue(false, new UtpStream(socketValue.value, this.outer));
				});
			}
		};
	}
}
246 
/**
 * `Stream` implementation on top of a single libutp socket.
 *
 * Instances are created by `UtpContext` (the constructor is private). Each
 * instance registers itself as an owner of its context and pins itself with
 * `gcretain` until `close()` is called.
 */
class UtpStream: Stream {
private:
	// libutp socket handle.
	utp_socket* socket;
	// Owning context; set to null once this stream has released it in `close()`.
	UtpContext context;
	// Bytes of the current `write()` not yet accepted by libutp.
	const(ubyte)[] pending_write_buffer;
	// Promise of the current `write()` while it is incomplete; null otherwise.
	DelegatePromise!void pending_write_promise;
	// Received bytes waiting for the reader.
	CumulativeBuffer read_;
	// Promise returned by `connect()` until the connection is established or fails.
	DelegatePromise!void connecting;
	// Not referenced anywhere in this class.
	uint recv_len;
	// Set by the first `close()` so that later calls are no-ops.
	bool closed;

	/// Wraps `socket`, registers as an owner of `context` and attaches itself as the socket's userdata.
	this(utp_socket* socket, UtpContext context) {
		this.context = context;
		this.socket = socket;
		this.context.inc();
		gcretain(this);
		utp_set_userdata(socket, cast(void*)cast(Object)this);
		recvLen = 4096;
		read_ = new class CumulativeBuffer {
			override void onDrain() {
				// Do not touch the socket after it has been closed.
				if (this.outer.closed) {
					return;
				}
				utp_read_drained(this.outer.socket);
			}
		};
	}

	/// Handles a libutp state change: completes `connect()`, resumes a pending write, or signals EOF to the reader.
	void on_state(int state) {
		if (state == UTP_STATE_CONNECT && connecting) {
			auto done = connecting;
			connecting = null;
			done.resolve();
		}

		if (state == UTP_STATE_CONNECT || state == UTP_STATE_WRITABLE) {
			do_write();
		}

		if (state == UTP_STATE_EOF) {
			read_.resolve();
		}
	}

	/**
	 * Handles a libutp error: rejects the pending `connect()` and the pending
	 * `write()`, if any.
	 *
	 * NOTE(review): a reader waiting on `read()` is not failed here, since
	 * `CumulativeBuffer` offers no way to reject.
	 */
	void on_error(int error_code) {
		if (connecting) {
			auto done = connecting;
			connecting = null;
			done.reject(new Exception("Error %s".noth!format(error_code)));
		}

		// Fix: a write that was in flight used to stay pending forever.
		if (pending_write_promise !is null) {
			auto done = pending_write_promise;
			pending_write_promise = null;
			pending_write_buffer = null;
			done.reject(new Exception("Error %s".noth!format(error_code)));
		}
	}

	/**
	 * Pushes as much of `pending_write_buffer` into libutp as it accepts.
	 *
	 * Returns: a resolved promise when nothing is left to send, a rejected
	 * promise when `utp_write` reports an error, otherwise the promise that
	 * is settled once the remainder has been written by a later call.
	 */
	Promise!void do_write() nothrow {
		if (pending_write_buffer.length == 0) {
			return Promise!void.resolved();
		}

		auto nwrote = utp_write(this.socket, cast(void*)pending_write_buffer.ptr, pending_write_buffer.length);

		if (nwrote < 0) {
			// Fix: drop the failed payload so that a later writable
			// notification does not resubmit data already reported as failed.
			pending_write_buffer = null;
			if (pending_write_promise is null) pending_write_promise = new DelegatePromise!void;
			auto done = pending_write_promise;
			pending_write_promise = null;
			done.reject(new Exception("Error %s".noth!format(-nwrote)));
			return done;
		}

		pending_write_buffer = pending_write_buffer[nwrote..$];
		if (pending_write_buffer.length == 0) {
			if (pending_write_promise !is null) {
				auto done = pending_write_promise;
				pending_write_promise = null;
				done.resolve();
			}

			return Promise!void.resolved();
		}

		if (pending_write_promise is null) pending_write_promise = new DelegatePromise!void;
		return pending_write_promise;
	}

	/**
	 * Starts connecting to `addr`.
	 *
	 * Returns: a promise resolved by `on_state` on `UTP_STATE_CONNECT` and
	 * rejected by `on_error`, or immediately when `utp_connect` itself fails.
	 */
	Promise!void connect(Address addr) {
		assert(connecting is null);
		// Keep a local reference: a callback may settle the promise and null
		// the field before this function returns.
		auto result = new DelegatePromise!void;
		connecting = result;
		auto rc = utp_connect(this.socket, addr.name, addr.nameLen);
		// Fix: the return code used to be ignored, leaving the promise
		// pending forever when the call failed outright.
		if (rc < 0 && connecting is result) {
			connecting = null;
			result.reject(new Exception("Error %s".noth!format(rc)));
		}
		return result;
	}

	/// Forwards bytes received by libutp to the read buffer.
	void on_read(const(ubyte)[] data) {
		read_.append(data);
	}

	/// Number of received bytes not yet consumed by the reader.
	int recv_buffer() {
		return cast(int)read_.pendingLength();
	}
public:
	/// Sets the socket's `UTP_RCVBUF` option to `len`.
	@property void recvLen(int len) {
		// Fix: the argument used to be ignored in favour of a hard-coded 4096.
		utp_setsockopt(this.socket, UTP_RCVBUF, len);
	}
	/// Current value of the socket's `UTP_RCVBUF` option.
	@property int recvLen() {
		return utp_getsockopt(this.socket, UTP_RCVBUF);
	}

	/// Same as `close()`.
	override Promise!void shutdown() nothrow {
		return close();
	}

	/**
	 * Closes the socket, releases this stream's share of the context and
	 * drops the GC pin. Calls after the first one do nothing and return an
	 * already resolved promise.
	 */
	override Promise!void close() nothrow {
		// Fix: `shutdown()` followed by `close()` used to run `utp_close` and
		// `gcrelease` twice.
		if (closed) {
			return Promise!void.resolved();
		}
		closed = true;

		return Promise!void.resolved()
		.then(() {
			utp_close(this.socket);
		}).finall(() {
			if (this.context is null) {
				return Promise!void.resolved();
			} else {
				auto done = this.context;
				this.context = null;
				return done.close();
			}
		}).finall(() {
			gcrelease(this);
		});
	}

	/// Iterator over the received bytes.
	override PromiseIterator!(const(ubyte)[]) read() nothrow {
		return read_;
	}

	/**
	 * Writes `buf`. Only one write may be in flight at a time (asserted).
	 *
	 * Returns: a promise settled once all of `buf` has been handed to libutp
	 * or the write has failed.
	 */
	override Promise!void write(immutable(ubyte)[] buf) nothrow {
		assert(pending_write_promise is null, "Paralell writes");

		pending_write_buffer = buf;

		return do_write();
	}
}