1 module upromised.fiber;
2 import upromised.promise : DelegatePromise, Promise;
3 import core.thread : Fiber;
4 
5 class FiberPromise(T) : DelegatePromise!T {
6 private:
7 	bool running;
8 	T delegate() task;
9 
10 public:
11 	this(T delegate() task) {
12 		this.task = task;
13 	}
14 
15 	override protected void then_(void delegate(Value) nothrow cb) nothrow {
16 		if (!running) {
17 			try {
18 				(new Fiber(() nothrow {
19 					execute();
20 				})).call();
21 			} catch(Exception) {
22 				import core.stdc.stdlib : abort;
23 				abort();
24 			}
25 		}
26 
27 		super.then_(cb);
28 	}
29 
30 	void execute() nothrow {
31 		import upromised.promise : promisifyCall;
32 		
33 		if (running) {
34 			return;
35 		}
36 
37 		running = true;
38 		promisifyCall(task).then_((v) => resolve(v));
39 	}
40 }
41 
42 Promise!T async(T)(T delegate() cb) {
43 	return new FiberPromise!T(cb);
44 }
45 T await(T)(Promise!T a) {
46 	auto otherFiber = cast(FiberPromise!T)a;
47 	if (otherFiber !is null) {
48 		otherFiber.execute();
49 	}
50 
51 	auto self = Fiber.getThis();
52 	assert(self !is null);
53 	
54 	bool already;
55 	Promise!T.Value r;
56 
57 	a.then_((rArg) nothrow {
58 		r = rArg;
59 		try {
60 			if (Fiber.getThis() is self) {
61 				already = true;
62 			} else {
63 				self.call();
64 			}
65 		} catch(Exception) {
66 			assert(false);
67 		}
68 	});
69 
70 	if (!already) {
71 		Fiber.yield();
72 	}
73 
74 	if (r.e) {
75 		throw r.e;
76 	}
77 
78 	static if (!is(T == void)) {
79 		return r.value[0];
80 	}
81 }
82 //Async turns the fiber into a promise
83 unittest {
84 	int value;
85 	async(() {
86 		return 3;
87 	}).then((valueArg) {
88 		value = valueArg;
89 	}).nothrow_();
90 	assert(value == 3);
91 }
92 //Await blocks the fiber until the promise is resolved
93 unittest {
94 	int value;
95 	auto dg = new DelegatePromise!int;
96 	async(() {
97 		return await(dg);
98 	}).then((valueArg) {
99 		value = valueArg;
100 	}).nothrow_();
101 	assert(value == 0);
102 	dg.resolve(4);
103 	assert(value == 4);
104 }
105 //Async propagates exceptions
106 unittest {
107 	bool called;
108 	auto err = new Exception("msg");
109 	async(() {
110 		throw err;
111 	}).except((Exception e) {
112 		assert(e is err);
113 		called = true;
114 	}).nothrow_();
115 	assert(called);
116 }
117 //Await propagates exceptions
118 unittest {
119 	auto err = new Exception("msg");
120 	DelegatePromise!void dg = new DelegatePromise!void;
121 	bool called;
122 	async(() {
123 		try {
124 			await(dg);
125 			assert(false);
126 		} catch(Exception e) {
127 			assert(err is e);
128 			called = true;
129 		}
130 	}).nothrow_();
131 
132 	assert(!called);
133 	dg.reject(err);
134 	assert(called);
135 }
136 //Await works on already fulfiled Promise
137 unittest {
138 	int value;
139 	async(() {
140 		return await(Promise!int.resolved(3));
141 	}).then((valueArg) {
142 		value = valueArg;
143 	}).nothrow_();
144 
145 	assert(value == 3);
146 }
147 //Call then twice on an async will yield the same result
148 unittest {
149 	auto res = async(() {
150 		return new int;
151 	});
152 
153 	int* v1;
154 	int* v2;
155 	res.then((v) {
156 		v1 = v;
157 	}).nothrow_();
158 	res.then((v) {
159 		v2 = v;
160 	}).nothrow_();
161 
162 	assert(v1 !is null);
163 	assert(v1 is v2);
164 }
165 //Async inside async
166 unittest {
167 	int v;
168 	DelegatePromise!int dg = new DelegatePromise!int;
169 	async(() {
170 		return await(async(() {
171 			return await(dg);
172 		}));
173 	}).then((arg) {
174 		v = arg;
175 	}).nothrow_();
176 	assert(v == 0);
177 	dg.resolve(3);
178 	assert(v == 3);
179 }
180 //Await already fulfilled async
181 unittest {
182 	int v;
183 	async(() {
184 		auto other = async(() {
185 			return 3;
186 		});
187 		other.then((arg) {
188 			assert(v == 0);
189 			v = arg;
190 		}).nothrow_();
191 		assert(v == 3);
192 		v = 0;
193 		int v2 = await(other);
194 		assert(v == 0);
195 		return v2;
196 	}).then((arg) {
197 		v = arg;
198 	}).nothrow_;
199 	assert(v == 3);
200 }
201 //Async within async reuses the same fiber
202 unittest {
203 	int called;
204 	async(() {
205 		auto outter = Fiber.getThis();
206 		await(async(() {
207 			assert(Fiber.getThis() is outter);
208 			called++;
209 		}));
210 		assert(called == 1);
211 
212 		auto paralel = async(() {
213 			assert(Fiber.getThis() !is outter);
214 			called++;
215 		}).nothrow_;
216 		assert(called == 2);
217 		called++;
218 	}).nothrow_();
219 	assert(called == 3);
220 }