1 module workspaced.future;
2 
3 import core.time;
4 
5 import std.parallelism;
6 import std.traits : isCallable;
7 
8 class Future(T)
9 {
10 	import core.thread : Fiber, Thread;
11 
12 	static if (!is(T == void))
13 		T value;
14 	Throwable exception;
15 	bool has;
16 	void delegate() _onDone;
17 	private Thread _worker;
18 
19 	/// Sets the onDone callback if no value has been set yet or calls immediately if the value has already been set or was set during setting the callback.
20 	/// Crashes with an assert error if attempting to override an existing callback (i.e. calling this function on the same object twice).
21 	void onDone(void delegate() callback) @property
22 	{
23 		assert(!_onDone);
24 		if (has)
25 			callback();
26 		else
27 		{
28 			bool called;
29 			_onDone = { called = true; callback(); };
30 			if (has && !called)
31 				callback();
32 		}
33 	}
34 
35 	static if (is(T == void))
36 		static Future!void finished()
37 		{
38 			auto ret = new typeof(return);
39 			ret.has = true;
40 			return ret;
41 		}
42 	else
43 		static Future!T fromResult(T value)
44 		{
45 			auto ret = new typeof(return);
46 			ret.value = value;
47 			ret.has = true;
48 			return ret;
49 		}
50 
51 	static Future!T async(T delegate() cb)
52 	{
53 		auto ret = new typeof(return);
54 		ret._worker = new Thread({
55 			try
56 			{
57 				static if (is(T == void))
58 				{
59 					cb();
60 					ret.finish();
61 				}
62 				else
63 					ret.finish(cb());
64 			}
65 			catch (Throwable t)
66 			{
67 				ret.error(t);
68 			}
69 		}).start();
70 		return ret;
71 	}
72 
73 	static Future!T fromError(T)(Throwable error)
74 	{
75 		auto ret = new typeof(return);
76 		ret.error = error;
77 		ret.has = true;
78 		return ret;
79 	}
80 
81 	static if (is(T == void))
82 		void finish()
83 		{
84 			assert(!has);
85 			has = true;
86 			if (_onDone)
87 				_onDone();
88 		}
89 	else
90 		void finish(T value)
91 		{
92 			assert(!has);
93 			this.value = value;
94 			has = true;
95 			if (_onDone)
96 				_onDone();
97 		}
98 
99 	void error(Throwable t)
100 	{
101 		assert(!has);
102 		exception = t;
103 		has = true;
104 		if (_onDone)
105 			_onDone();
106 	}
107 
108 	/// Waits for the result of this future using Thread.sleep
109 	T getBlocking(alias sleepDur = 1.msecs)()
110 	{
111 		while (!has)
112 			Thread.sleep(sleepDur);
113 		if (_worker)
114 		{
115 			_worker.join();
116 			_worker = null;
117 		}
118 		if (exception)
119 			throw exception;
120 		static if (!is(T == void))
121 			return value;
122 	}
123 
124 	/// Waits for the result of this future using Fiber.yield
125 	T getYield()
126 	{
127 		assert(Fiber.getThis() !is null,
128 			"Attempted to getYield without being in a Fiber context");
129 
130 		while (!has)
131 			Fiber.yield();
132 		if (_worker)
133 		{
134 			_worker.join();
135 			_worker = null;
136 		}
137 		if (exception)
138 			throw exception;
139 		static if (!is(T == void))
140 			return value;
141 	}
142 }
143 
144 enum string gthreadsAsyncProxy(string call) = `auto __futureRet = new typeof(return);
145 	gthreads.create({
146 		mixin(traceTask);
147 		try
148 		{
149 			__futureRet.finish(` ~ call ~ `);
150 		}
151 		catch (Throwable t)
152 		{
153 			__futureRet.error(t);
154 		}
155 	});
156 	return __futureRet;
157 `;
158 
159 void create(T)(TaskPool pool, T fun) if (isCallable!T)
160 {
161 	pool.put(task(fun));
162 }