Futures/Promises vs. Join: is "future" a form of message passing?
Futures (and Promises) are a simple and elegant tool for programming a specific class of concurrent applications: returning a result from an asynchronous operation or a different thread. In the Boost community there are several proposals for Futures, among which Braddock Gaskill's proposal and Anthony Williams' proposal are the most complete and best known.
Here are the definitions of "Futures" in Braddock's and Anthony's proposals:
1> in terms of Braddock's proposal: a special variable with an undefined value at creation, which can be set later by another thread; any attempt to get the future's value will block if it is not set yet. "future uses a split interface: the promise<T> class interface is used for creating and setting a future value, and the future<T> interface is used for obtaining and using a future value."
2> in terms of Anthony's (N2561) proposal: "... a kind of return buffer that takes a value (or an exception) in one (sub-)thread and provides the value in another (controlling) thread. This buffer provides essentially two interfaces:
* an interface to assign a value as class promise and
* an interface to wait for, query and retrieve the value (or exception) from the buffer as classes unique_future and shared_future."
From both documents, a Future has two interfaces with different behaviours:
* the promise<T> interface is "asynchronous": the thread which sets the future's value does not wait for the future's reader; it just sets the value and goes
* the future<T> interface is "synchronous": the thread which gets/reads the future's value will block waiting if the future is not set yet
We can define these two interfaces with Join as follows:

template <typename T>
class promise_intf {
public:
    async<T> set;
};

template <typename T>
class future_intf {
public:
    synch<T,void> get;
};
A Future class and the synchronization between promise_intf and future_intf can be defined as follows:

template <typename T>
class future : public promise_intf<T>,
               public future_intf<T>,
               public joint
{
public:
    future() {
        chord(get, set, &future::get_result);
    }
private:
    T get_result(void_t g, T s) {
        return s;
    }
};
A manually crafted Future is normally implemented using one mutex and one condition variable. The rule for counting the (mutex, cond) pairs used by a Join class is simple: each joint holds a mutex which is shared by all code in its async/synch ports and chords, each synch<> port holds a condition variable, and async<> ports hold nothing. So the above future class uses exactly one mutex and one condition variable, the same as the manually crafted version.
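For comparison, here is what such a manually crafted future might look like in standard C++, using exactly one mutex and one condition variable. This is a minimal sketch, not from either proposal; the class name manual_future and the demo function are made up for illustration:

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// A minimal hand-crafted future: one mutex plus one condition variable,
// matching the (mutex, cond) count of the Join-based future above.
template <typename T>
class manual_future {
public:
    // promise side: asynchronous, set the value and go
    void set(T v) {
        {
            std::lock_guard<std::mutex> lk(m_);
            value_ = v;
            ready_ = true;
        }
        cv_.notify_all();
    }
    // future side: synchronous, block until the value is available
    T get() {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return ready_; });
        return value_;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    T value_{};
    bool ready_ = false;
};

// Demo: one thread sets the value, another blocks on get().
inline int demo_manual_future() {
    manual_future<int> f;
    std::thread producer([&f] { f.set(42); });
    int r = f.get();
    producer.join();
    return r;
}
```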
To transfer both a result value and an exception, we need to use a result holder and a different promise interface:

template <typename T>
class result {
public:
    shared_ptr<T> value;
    exception_ptr except;
    ...
};

template <typename T>
class promise_intf {
protected:
    async<result<T> > set_result;
public:
    void set(T t) { set_result(result<T>(t)); }
    void fail(exception_ptr e) { set_result(result<T>(e)); }
};
The future class is modified as follows:

template <typename T>
class future : public promise_intf<T>,
               public future_intf<T>,
               public joint
{
public:
    future() {
        chord(get, set_result, &future::get_result);
    }
private:
    T get_result(void_t g, result<T> res) {
        if (res.except)
            rethrow_exception(res.except);
        return *res.value;
    }
};

Again this class uses exactly one mutex and one condition variable.
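This value-or-exception split is exactly what C++11 later standardized as std::promise/std::future. A short sketch of the same behaviour with the standard classes (the demo function name is made up for illustration):

```cpp
#include <future>
#include <stdexcept>
#include <string>
#include <thread>

// Value-or-exception transfer with std::promise/std::future: the setter
// either fulfils the promise with a value or stores an exception, and
// future::get() re-throws that exception in the reading thread.
inline std::string demo_future_exception() {
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();
    std::thread worker([&prom] {
        // the worker fails: store an exception instead of a value
        prom.set_exception(
            std::make_exception_ptr(std::runtime_error("work failed")));
    });
    std::string msg;
    try {
        fut.get();   // re-throws the stored exception here
    } catch (const std::runtime_error& e) {
        msg = e.what();
    }
    worker.join();
    return msg;
}
```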
If we step back from the mental image of treating "future" as "a special kind of variable" for now, we can treat the promise (future-setter) interface as a message sending interface, and the future_intf (future-reader) interface as a message receiving interface. Then "future" becomes a message passing channel between the thread which produces and sets the future's value and the thread which consumes it. Message sending and promise have the same behaviour (asynchronous: send and go); message receiving and future_intf have the same behaviour (blocking wait).
Also the above first Future definition is exactly the same as the simplest message queue defined in Join:

template <typename T>
class queue {
public:
    async<T> send;
    synch<T,void> recv;
    queue() {
        chord(recv, send, &queue::forward);
    }
private:
    T forward(void_t r, T s) {
        return s;
    }
};
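The analogy holds in standard C++ as well. A minimal sketch of such a blocking queue using the standard library (again one mutex and one condition variable; the names blocking_queue and demo_queue are made up for illustration):

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// A minimal blocking message queue: send is asynchronous (enqueue and
// go), recv blocks until a message is available.
template <typename T>
class blocking_queue {
public:
    void send(T v) {
        {
            std::lock_guard<std::mutex> lk(m_);
            q_.push_back(std::move(v));
        }
        cv_.notify_one();
    }
    T recv() {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return !q_.empty(); });
        T v = std::move(q_.front());
        q_.pop_front();
        return v;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<T> q_;
};

inline int demo_queue() {
    blocking_queue<int> q;
    std::thread sender([&q] { q.send(7); q.send(8); });
    int a = q.recv();
    int b = q.recv();
    sender.join();
    return a * 10 + b;   // messages arrive in send order
}
```

Unlike a future, which is normally set once, the queue can carry any number of messages; otherwise the two interfaces behave identically.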
The following code is from Braddock's JobQueue:

template <class T>
future<T> schedule(boost::function<T (void)> const& fn) {
    boost::mutex::scoped_lock lck(mutex_);
    promise<T> prom;                           // create promise
    q_.push_back(future_wrapper<T>(fn, prom)); // queue the job
    condition_.notify_all();                   // wake worker thread(s)
    return future<T>(prom);                    // return a future created from the promise
}
This code presents a very interesting idiom: we first create the Future (inside promise<T>'s constructor), i.e. the message channel. Then we bundle the work item together with prom, the sending interface of the channel, and pass this bundle to the worker thread. (Asynchronously the worker thread will do the work and write the result to the sending interface of the channel.) Finally we return future_intf, the receiving interface of the channel, to the receiving thread. Here we have loose coupling: the worker thread doesn't care where the request comes from, and the receiver doesn't care who sends the result.
Bundling a response channel with a request message is the normal way function calls or RPC calls are implemented in message passing systems. It also shows another intrinsic property of message passing: using channels to define interfaces between modules and threads to achieve loose coupling.
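The same idiom can be sketched in standard C++, with std::promise playing the role of the bundled response channel. This is a hypothetical reconstruction of the JobQueue pattern, not Braddock's actual code; the names request, job_queue and demo_job_queue are made up for illustration:

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <thread>

// A request bundles the work item with its response channel.
struct request {
    std::function<int()> fn;
    std::promise<int> reply;   // sending end of the response channel
};

// schedule() enqueues the work bundled with a promise and returns the
// matching future, the receiving end of the response channel.
class job_queue {
public:
    job_queue() : worker_([this] { run(); }) {}
    ~job_queue() {
        { std::lock_guard<std::mutex> lk(m_); done_ = true; }
        cv_.notify_all();
        worker_.join();
    }
    std::future<int> schedule(std::function<int()> fn) {
        request r{std::move(fn), std::promise<int>()};
        std::future<int> fut = r.reply.get_future();
        { std::lock_guard<std::mutex> lk(m_); q_.push_back(std::move(r)); }
        cv_.notify_one();
        return fut;
    }
private:
    void run() {
        for (;;) {
            std::unique_lock<std::mutex> lk(m_);
            cv_.wait(lk, [this] { return done_ || !q_.empty(); });
            if (q_.empty()) return;   // shutting down and drained
            request r = std::move(q_.front());
            q_.pop_front();
            lk.unlock();
            // the worker neither knows nor cares who sent the request;
            // it just writes the result into the bundled channel
            r.reply.set_value(r.fn());
        }
    }
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<request> q_;
    bool done_ = false;
    std::thread worker_;
};

inline int demo_job_queue() {
    job_queue jq;
    std::future<int> f = jq.schedule([] { return 6 * 7; });
    return f.get();
}
```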
If we can wrap our minds around thinking of "future" as a form of message passing, we gain much freedom from the traditional concept of "future". For example:
* we can simply bundle a reference to an async port (channel) with the request; the worker thread will return the result by invoking the async port when the work item is done.
* the receiving thread can wait for and handle the result in different ways:
. we can block the thread at the "getting" point, the same as the current "future" implementations;
. or if the system is already message passing based (we have a main loop to receive and process messages), and the work item request was probably sent from a message handling function to worker threads, then we can simply create a new chord which adds the response channel (async port) to the set of channels handled by the main loop, and go back to blocking wait at and serving the main loop;
. or if the application is driven by a thread pool, we can create a chord to dispatch the result from the response channel to the thread pool to handle it asynchronously.
Treating Future as message passing also makes it easier to implement wait_any_future and wait_all_future.

template <typename T1, typename T2, typename T3, ...>
class wait_all_future : public joint {
public:
    async<T1> promise1;
    async<T2> promise2;
    async<T3> promise3;
    ...
    synch<tuple<T1,T2,T3,...>, void> wait;
    wait_all_future() {
        chord(wait, promise1, promise2, ..., &wait_all_future::proc);
    }
private:
    tuple<T1,T2,T3,...> proc(void_t w, T1 p1, T2 p2, ...) {
        return make_tuple(p1, p2, ...);
    }
};
Applications can instantiate one wait_all_future instance, bundle its async response channels (promise1, promise2, ...) with different requests to different worker threads, and then call wait(), blocking until all results come back.
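With standard futures the wait-for-all pattern reduces to calling get() on each future in turn; the caller unblocks only once every result has come back. A minimal sketch (demo_wait_all is made up for illustration):

```cpp
#include <future>
#include <tuple>

// wait-for-all with standard futures: launch the work items, then block
// on each future; the function returns only when all results are in.
inline std::tuple<int, int, int> demo_wait_all() {
    std::future<int> f1 = std::async(std::launch::async, [] { return 1; });
    std::future<int> f2 = std::async(std::launch::async, [] { return 2; });
    std::future<int> f3 = std::async(std::launch::async, [] { return 3; });
    // each get() blocks until the corresponding worker has set its value
    return std::make_tuple(f1.get(), f2.get(), f3.get());
}
```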
template <typename T1, typename T2, typename T3, ...>
class wait_any_future : public joint {
public:
    async<T1> promise1;
    async<T2> promise2;
    async<T3> promise3;
    ...
    synch<void,void> wait;
    wait_any_future() {
        chord(wait, promise1, &wait_any_future::proc_future1);
        chord(wait, promise2, &wait_any_future::proc_future2);
        ...
    }
private:
    void proc_future1(void_t w, T1 p1) {
        ... process p1 ...
    }
    void proc_future2(void_t w, T2 p2) {
        ... process p2 ...
    }
    ...
};
Applications can instantiate one wait_any_future instance, bundle its async response channels (promise1, promise2, ...) with different requests to different worker threads, and then call wait() in a loop; results coming back will be processed in the order they arrive.
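The message passing view also makes wait-any straightforward in standard C++: point every worker's response channel at one shared queue and loop on recv(), handling results in arrival order. A sketch under that assumption (result_queue and demo_wait_any are made up for illustration; the standard library itself has no wait-any for futures until std::experimental::when_any):

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Shared response channel: every worker sends its result here.
template <typename T>
class result_queue {
public:
    void send(T v) {
        { std::lock_guard<std::mutex> lk(m_); q_.push_back(std::move(v)); }
        cv_.notify_one();
    }
    T recv() {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return !q_.empty(); });
        T v = std::move(q_.front());
        q_.pop_front();
        return v;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<T> q_;
};

// wait-any as message passing: whichever worker finishes first is
// handled first; the receiver just loops on recv().
inline int demo_wait_any() {
    result_queue<int> results;
    std::vector<std::thread> workers;
    for (int i = 1; i <= 3; ++i)
        workers.emplace_back([&results, i] { results.send(i); });
    int sum = 0;
    for (int i = 0; i < 3; ++i)
        sum += results.recv();   // process results as they arrive
    for (auto& w : workers) w.join();
    return sum;
}
```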