module upromised.fiber;
import upromised.promise : DelegatePromise, Promise;
import core.thread : Fiber;

/**
 * A promise whose value is produced by running a delegate.
 *
 * The delegate is not run at construction time. It is run the first time
 * either `then_` is called (in a freshly created Fiber) or `execute` is
 * called directly (in whatever context the caller is in, which is what
 * `await` does). The `running` flag ensures the delegate is started at
 * most once.
 */
class FiberPromise(T) : DelegatePromise!T {
private:
    /// Set once `execute` has started the task; never reset.
    bool running;
    /// The user-supplied work whose outcome resolves this promise.
    T delegate() task;

public:
    /// Stores `task` for later execution; nothing runs yet.
    this(T delegate() task) {
        this.task = task;
    }

    /**
     * Registers `cb` on this promise, starting the task first if it has not
     * been started yet.
     *
     * When the task is not running, a new Fiber is created and called so
     * that `execute` runs inside it. Any Exception escaping the fiber
     * creation or call aborts the process, because this method is nothrow.
     */
    override protected void then_(void delegate(Value) nothrow cb) nothrow {
        import core.stdc.stdlib : abort;

        if (!running) {
            try {
                auto worker = new Fiber(() nothrow {
                    execute();
                });
                worker.call();
            } catch (Exception) {
                abort();
            }
        }

        // Registration happens after the fiber has had its first run.
        super.then_(cb);
    }

    /**
     * Starts the task in the caller's current context, unless it was already
     * started. The outcome produced by `promisifyCall(task)` is forwarded to
     * `resolve`.
     */
    void execute() nothrow {
        import upromised.promise : promisifyCall;

        if (!running) {
            running = true;
            auto outcome = promisifyCall(task);
            outcome.then_((settled) => resolve(settled));
        }
    }
}

/// Wraps `cb` in a `FiberPromise`; the delegate runs lazily (see FiberPromise).
Promise!T async(T)(T delegate() cb) {
    auto wrapped = new FiberPromise!T(cb);
    return wrapped;
}

/**
 * Suspends the current fiber until `a` is settled, then returns its value or
 * throws its exception.
 *
 * If `a` is a `FiberPromise`, its task is executed inline first, in the
 * current context, instead of in a new Fiber. Must be called from inside a
 * Fiber (asserted).
 */
T await(T)(Promise!T a) {
    // Run a nested async inline rather than letting then_ spawn a Fiber.
    if (auto nested = cast(FiberPromise!T) a) {
        nested.execute();
    }

    auto current = Fiber.getThis();
    assert(current !is null);

    bool settledInline;
    Promise!T.Value outcome;

    a.then_((settled) nothrow {
        outcome = settled;
        try {
            if (Fiber.getThis() !is current) {
                // Invoked from outside the awaiting fiber: resume it.
                current.call();
            } else {
                // Invoked synchronously while still inside the awaiting
                // fiber: just record that no yield is needed.
                settledInline = true;
            }
        } catch (Exception) {
            assert(false);
        }
    });

    if (!settledInline) {
        Fiber.yield();
    }

    if (outcome.e !is null) {
        throw outcome.e;
    }

    static if (!is(T == void)) {
        return outcome.value[0];
    }
}

// Async turns the fiber into a promise
unittest {
    int seen;
    async(() {
        return 3;
    }).then((result) {
        seen = result;
    }).nothrow_();
    assert(seen == 3);
}

// Await blocks the fiber until the promise is resolved
unittest {
    int seen;
    auto pending = new DelegatePromise!int;
    async(() {
        return await(pending);
    }).then((result) {
        seen = result;
    }).nothrow_();
    assert(seen == 0);
    pending.resolve(4);
    assert(seen == 4);
}

// Async propagates exceptions
unittest {
    bool handled;
    auto failure = new Exception("msg");
    async(() {
        throw failure;
    }).except((Exception e) {
        assert(e is failure);
        handled = true;
    }).nothrow_();
    assert(handled);
}

// Await propagates exceptions
unittest {
    auto failure = new Exception("msg");
    DelegatePromise!void pending = new DelegatePromise!void;
    bool handled;
    async(() {
        try {
            await(pending);
            assert(false);
        } catch (Exception e) {
            assert(failure is e);
            handled = true;
        }
    }).nothrow_();

    assert(!handled);
    pending.reject(failure);
    assert(handled);
}

// Await works on an already fulfilled Promise
unittest {
    int seen;
    async(() {
        return await(Promise!int.resolved(3));
    }).then((result) {
        seen = result;
    }).nothrow_();

    assert(seen == 3);
}

// Calling then twice on an async yields the same result
unittest {
    auto shared_ = async(() {
        return new int;
    });

    int* first;
    int* second;
    shared_.then((p) {
        first = p;
    }).nothrow_();
    shared_.then((p) {
        second = p;
    }).nothrow_();

    assert(first !is null);
    assert(first is second);
}

// Async inside async
unittest {
    int seen;
    DelegatePromise!int pending = new DelegatePromise!int;
    async(() {
        return await(async(() {
            return await(pending);
        }));
    }).then((result) {
        seen = result;
    }).nothrow_();
    assert(seen == 0);
    pending.resolve(3);
    assert(seen == 3);
}

// Await already fulfilled async
unittest {
    int seen;
    async(() {
        auto inner = async(() {
            return 3;
        });
        inner.then((result) {
            assert(seen == 0);
            seen = result;
        }).nothrow_();
        assert(seen == 3);
        seen = 0;
        int awaited = await(inner);
        assert(seen == 0);
        return awaited;
    }).then((result) {
        seen = result;
    }).nothrow_;
    assert(seen == 3);
}

// Async within async reuses the same fiber
unittest {
    int hits;
    async(() {
        auto outer = Fiber.getThis();
        await(async(() {
            assert(Fiber.getThis() is outer);
            hits++;
        }));
        assert(hits == 1);

        auto parallel = async(() {
            assert(Fiber.getThis() !is outer);
            hits++;
        }).nothrow_;
        assert(hits == 2);
        hits++;
    }).nothrow_();
    assert(hits == 3);
}