1 module upromised.loop; 2 import std.socket : Address; 3 import std.datetime : Duration; 4 import upromised.stream : DatagramStream, Stream; 5 import upromised.promise : Promise, PromiseIterator, Promisify; 6 7 struct TlsContext { 8 private Object self; 9 } 10 11 interface Loop { 12 Promise!(Address[]) resolve(const(char)[] hostname, ushort port) nothrow; 13 Promise!Stream connectTcp(Address[] dns) nothrow; 14 PromiseIterator!Stream listenTcp(Address[] dns) nothrow; 15 string defaultCertificatesPath() nothrow; 16 Promise!TlsContext context(string certificatesPath = null) nothrow; 17 Promise!Stream tlsHandshake(Stream stream, TlsContext context, string hostname = null) nothrow; 18 Promise!DatagramStream udp(Address addr = null) nothrow; 19 PromiseIterator!int interval(Duration) nothrow; 20 Promise!void work(void delegate()) nothrow; 21 void* inner() nothrow; 22 int run(); 23 24 final Promise!void sleep(Duration a) nothrow { 25 return interval(a).each((_) => false).then((){}); 26 } 27 28 final Promise!T work(T)(T delegate() cb) nothrow 29 if (is(Promisify!T == Promise!T) && !is(T == void)) 30 { 31 T r; 32 return work(() { 33 r = cb(); 34 }).then(() => r); 35 } 36 } 37 38 Loop defaultLoop() { 39 import deimos.libuv.uv : uv_default_loop, uv_loop_t; 40 uv_loop_t* loop = uv_default_loop(); 41 return new class Loop { 42 override void* inner() nothrow { 43 return loop; 44 } 45 46 override int run() { 47 import deimos.libuv.uv : uv_run, uv_run_mode; 48 49 return uv_run(loop, uv_run_mode.UV_RUN_DEFAULT); 50 } 51 52 override Promise!(Address[]) resolve(const(char)[] hostname, ushort port) nothrow { 53 import upromised.dns : getAddrinfo; 54 55 return getAddrinfo(loop, hostname, port); 56 } 57 58 override Promise!Stream connectTcp(Address[] dns) nothrow { 59 import upromised.tcp : TcpSocket; 60 61 return Promise!void.resolved() 62 .then(() => new TcpSocket(loop)) 63 .then((socket) { 64 return socket 65 .connect(dns) 66 .except((Exception e) { 67 return socket.close().then(() { 68 throw e; 69 }); 70 }) 71 .then!Stream(() => socket); 72 }); 73 } 74 75 override PromiseIterator!Stream listenTcp(Address[] dns) nothrow { 76 import upromised.tcp : TcpSocket; 77 78 Promise!TcpSocket socket = Promise!void.resolved() 79 .then(() => new TcpSocket(loop)) 80 .then((socket) { 81 socket.bind(dns); 82 return socket; 83 }); 84 Promise!(PromiseIterator!TcpSocket) listen = socket.then((s) => s.listen(128)); 85 86 auto r = new class PromiseIterator!Stream { 87 override Promise!ItValue next(Promise!bool done) { 88 done.then((cont) { 89 if (!cont) { 90 return socket.then((s) => s.close()); 91 } 92 return Promise!void.resolved(); 93 }).nothrow_(); 94 95 return listen 96 .then((self) => self.next(done)) 97 .then((eofConn) => ItValue(eofConn.eof, eofConn.value)); 98 } 99 }; 100 return r; 101 } 102 103 override string defaultCertificatesPath() nothrow { 104 version(hasOpenssl) { 105 import std.algorithm : filter; 106 import std.file : exists; 107 108 auto tries = [ 109 "/etc/ssl/ca-bundle.pem", 110 "/etc/ssl/certs/ca-certificates.crt", 111 "/etc/pki/tls/certs/ca-bundle.crt", 112 "/usr/local/etc/openssl/cert.pem", 113 ].filter!(x => x.exists); 114 115 if (tries.empty) { 116 return null; 117 } 118 119 return tries.front; 120 } else { 121 return null; 122 } 123 } 124 125 override Promise!TlsContext context(string certificatesPath = null) nothrow { 126 version(hasOpenssl) { 127 import upromised.tls : OpensslTlsContext = TlsContext; 128 129 return Promise!void.resolved() 130 .then(() => new OpensslTlsContext) 131 .then((r) { 132 if (certificatesPath) { 133 r.load_verify_locations(certificatesPath); 134 } 135 136 return TlsContext(r); 137 }); 138 } else version(hasSecurity) { 139 return Promise!TlsContext.resolved(TlsContext.init); 140 } else { 141 return Promise!TlsContext.rejected(new Exception("TLS not supported")); 142 } 143 } 144 145 override Promise!Stream tlsHandshake(Stream stream, TlsContext contextUntyped, string hostname) nothrow { 146 version(hasOpenssl) { 147 import upromised.tls : OpensslTlsContext = TlsContext, TlsStream; 148 149 OpensslTlsContext context = cast(OpensslTlsContext)(contextUntyped.self); 150 return Promise!void.resolved() 151 .then(() => new TlsStream(stream, context)) 152 .then((r) { 153 return r 154 .connect(hostname) 155 .then!Stream(() => r); 156 }); 157 } else version(hasSecurity) { 158 import upromised.security : TlsStream; 159 return Promise!void.resolved() 160 .then(() => new TlsStream(stream)) 161 .then((r) { 162 return r 163 .connect(hostname) 164 .then!Stream(() => r); 165 }); 166 } else { 167 return Promise!Stream.rejected(new Exception("TLS not supported")); 168 } 169 } 170 171 override Promise!DatagramStream udp(Address addr) nothrow { 172 import upromised.udp : UdpSocket; 173 174 return Promise!void.resolved() 175 .then(() => new UdpSocket(loop)) 176 .then((udp) { 177 return Promise!void.resolved() 178 .then(() { 179 if (addr !is null) { 180 udp.bind(addr); 181 } 182 }).except((Exception e) { 183 return udp.close().then(() { 184 throw e; 185 }); 186 }) 187 .then(() => cast(DatagramStream)udp); 188 }); 189 } 190 191 override PromiseIterator!int interval(Duration a) nothrow { 192 import upromised.timer : Timer; 193 auto timer = Promise!void.resolved() 194 .then(() => new Timer(loop)); 195 auto timerIterator = timer.then((timer) => timer.start(a, a)); 196 197 return new class PromiseIterator!int { 198 override Promise!ItValue next(Promise!bool done) { 199 done.then((cont) { 200 if (!cont) { 201 return timer.then((timer) => timer.close()); 202 } 203 return Promise!void.resolved(); 204 }); 205 206 return timerIterator.then((iterator) => iterator.next(done)); 207 } 208 }; 209 } 210 211 override Promise!void work(void delegate() work) { 212 import upromised.uv.work : Work; 213 214 return (new Work(loop)).run(work); 215 } 216 }; 217 }