1 module upromised.loop; 2 import std.socket : Address; 3 import std.datetime : Duration; 4 import upromised.stream : DatagramStream, Stream; 5 import upromised.promise : Promise, PromiseIterator; 6 7 struct TlsContext { 8 private Object self; 9 } 10 11 interface Loop { 12 Promise!(Address[]) resolve(const(char)[] hostname, ushort port) nothrow; 13 Promise!Stream connectTcp(Address[] dns) nothrow; 14 PromiseIterator!Stream listenTcp(Address[] dns) nothrow; 15 string defaultCertificatesPath() nothrow; 16 Promise!TlsContext context(string certificatesPath = null) nothrow; 17 Promise!Stream tlsHandshake(Stream stream, TlsContext context, string hostname = null) nothrow; 18 Promise!DatagramStream udp(Address addr = null) nothrow; 19 PromiseIterator!int interval(Duration) nothrow; 20 void* inner() nothrow; 21 int run(); 22 23 final Promise!void sleep(Duration a) nothrow { 24 return interval(a).each((_) => false).then((){}); 25 } 26 } 27 28 Loop defaultLoop() { 29 import deimos.libuv.uv : uv_default_loop, uv_loop_t; 30 uv_loop_t* loop = uv_default_loop(); 31 return new class Loop { 32 override void* inner() nothrow { 33 return loop; 34 } 35 36 override int run() { 37 import deimos.libuv.uv : uv_run, uv_run_mode; 38 39 return uv_run(loop, uv_run_mode.UV_RUN_DEFAULT); 40 } 41 42 override Promise!(Address[]) resolve(const(char)[] hostname, ushort port) nothrow { 43 import upromised.dns : getAddrinfo; 44 45 return getAddrinfo(loop, hostname, port); 46 } 47 48 override Promise!Stream connectTcp(Address[] dns) nothrow { 49 import upromised.tcp : TcpSocket; 50 51 return Promise!void.resolved() 52 .then(() => new TcpSocket(loop)) 53 .then((socket) { 54 return socket 55 .connect(dns) 56 .except((Exception e) { 57 return socket.close().then(() { 58 throw e; 59 }); 60 }) 61 .then!Stream(() => socket); 62 }); 63 } 64 65 override PromiseIterator!Stream listenTcp(Address[] dns) nothrow { 66 import upromised.tcp : TcpSocket; 67 68 Promise!TcpSocket socket = Promise!void.resolved() 69 .then(() => new TcpSocket(loop)) 70 .then((socket) { 71 socket.bind(dns); 72 return socket; 73 }); 74 Promise!(PromiseIterator!TcpSocket) listen = socket.then((s) => s.listen(128)); 75 76 auto r = new class PromiseIterator!Stream { 77 override Promise!ItValue next(Promise!bool done) { 78 done.then((cont) { 79 if (!cont) { 80 return socket.then((s) => s.close()); 81 } 82 return Promise!void.resolved(); 83 }).nothrow_(); 84 85 return listen 86 .then((self) => self.next(done)) 87 .then((eofConn) => ItValue(eofConn.eof, eofConn.value)); 88 } 89 }; 90 return r; 91 } 92 93 override string defaultCertificatesPath() nothrow { 94 version(hasOpenssl) { 95 import std.algorithm : filter; 96 import std.file : exists; 97 98 auto tries = [ 99 "/etc/ssl/ca-bundle.pem", 100 "/etc/ssl/certs/ca-certificates.crt", 101 "/etc/pki/tls/certs/ca-bundle.crt", 102 "/usr/local/etc/openssl/cert.pem", 103 ].filter!(x => x.exists); 104 105 if (tries.empty) { 106 return null; 107 } 108 109 return tries.front; 110 } else { 111 return null; 112 } 113 } 114 115 override Promise!TlsContext context(string certificatesPath = null) nothrow { 116 version(hasOpenssl) { 117 import upromised.tls : OpensslTlsContext = TlsContext; 118 119 return Promise!void.resolved() 120 .then(() => new OpensslTlsContext) 121 .then((r) { 122 if (certificatesPath) { 123 r.load_verify_locations(certificatesPath); 124 } 125 126 return TlsContext(r); 127 }); 128 } else version(hasSecurity) { 129 return Promise!TlsContext.resolved(TlsContext.init); 130 } else { 131 return Promise!TlsContext.rejected(new Exception("TLS not supported")); 132 } 133 } 134 135 override Promise!Stream tlsHandshake(Stream stream, TlsContext contextUntyped, string hostname) nothrow { 136 version(hasOpenssl) { 137 import upromised.tls : OpensslTlsContext = TlsContext, TlsStream; 138 139 OpensslTlsContext context = cast(OpensslTlsContext)(contextUntyped.self); 140 return Promise!void.resolved() 141 .then(() => new TlsStream(stream, context)) 142 .then((r) { 143 return r 144 .connect(hostname) 145 .then!Stream(() => r); 146 }); 147 } else version(hasSecurity) { 148 import upromised.security : TlsStream; 149 return Promise!void.resolved() 150 .then(() => new TlsStream(stream)) 151 .then((r) { 152 return r 153 .connect(hostname) 154 .then!Stream(() => r); 155 }); 156 } else { 157 return Promise!Stream.rejected(new Exception("TLS not supported")); 158 } 159 } 160 161 override Promise!DatagramStream udp(Address addr) nothrow { 162 import upromised.udp : UdpSocket; 163 164 return Promise!void.resolved() 165 .then(() => new UdpSocket(loop)) 166 .then((udp) { 167 return Promise!void.resolved() 168 .then(() { 169 if (addr !is null) { 170 udp.bind(addr); 171 } 172 }).except((Exception e) { 173 return udp.close().then(() { 174 throw e; 175 }); 176 }) 177 .then(() => cast(DatagramStream)udp); 178 }); 179 } 180 181 override PromiseIterator!int interval(Duration a) nothrow { 182 import upromised.timer : Timer; 183 auto timer = Promise!void.resolved() 184 .then(() => new Timer(loop)); 185 auto timerIterator = timer.then((timer) => timer.start(a, a)); 186 187 return new class PromiseIterator!int { 188 override Promise!ItValue next(Promise!bool done) { 189 done.then((cont) { 190 if (!cont) { 191 return timer.then((timer) => timer.close()); 192 } 193 return Promise!void.resolved(); 194 }); 195 196 return timerIterator.then((iterator) => iterator.next(done)); 197 } 198 }; 199 } 200 }; 201 }