Join Thread Safe Events Tutorial
Event dispatching is a common design in today's interactive
applications. It is based on the "Observer" design pattern and goes by
several names, such as signals/slots, subject/observers or
distributors. In the common scenario, events represent "interesting"
system changes, and application code subscribes to events by
registering callback functions / methods, which are invoked when the
events happen.
The design and implementation of event dispatching (or Observer) is
more challenging and involved than it first looks, as Andrei
Alexandrescu discusses in his two articles
[http://www.erdani.org/publications/cuj-2005-04.pdf]
[http://www.erdani.org/publications/cuj-2005-06.pdf]. Most event
dispatching is single-threaded, and callbacks are executed by the event
distributing thread. Multithreaded event dispatching involves the
following design dimensions:
- Subscribing to and unsubscribing from events may happen concurrently
from different threads, so the event data structures must be thread
safe.
- While an event is being dispatched to the list of registered callback
functions / methods, callback code could invoke the event interface
again to change / delete an event subscription or to post a new event.
This is difficult to handle with the "synchronous" semantics of normal
method calls. It could lead to deadlock (unless we use a recursive
mutex for the event data structures), and the event dispatching loop
could be corrupted if some subscriptions are deleted during callbacks,
since the loop iterators may become invalid. Join's async ports provide
an alternative way to solve these issues.
- When applications run on multi-core machines, we may want the
callbacks to execute concurrently for better performance.
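The loop-corruption hazard from the second point can be reproduced
without Join. The sketch below is a hypothetical "naive_event" class
(its name, its int subscription ids and its members are assumptions
made for illustration, not part of Join). A recursive mutex avoids the
deadlock when a callback re-enters the API, but a callback that
unsubscribes itself during dispatch still causes the next subscriber
to be skipped.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical naive event, NOT the Join-based design: a recursive mutex
// guards the subscriber list so callbacks may re-enter the API.
class naive_event {
public:
    using callback = std::function<void(int)>;

    int subscribe(callback cb) {
        std::lock_guard<std::recursive_mutex> lock(mtx_);
        subs_.push_back({next_id_, std::move(cb)});
        return next_id_++;
    }

    void unsubscribe(int id) {
        std::lock_guard<std::recursive_mutex> lock(mtx_);
        for (std::size_t i = 0; i < subs_.size(); ++i) {
            if (subs_[i].first == id) {
                subs_.erase(subs_.begin() + i);
                return;
            }
        }
    }

    void post(int value) {
        std::lock_guard<std::recursive_mutex> lock(mtx_);
        for (std::size_t i = 0; i < subs_.size(); ++i) {
            callback cb = subs_[i].second;  // copy: the slot may be erased
            cb(value);
        }
    }

private:
    std::recursive_mutex mtx_;
    std::vector<std::pair<int, callback>> subs_;
    int next_id_ = 0;
};

// Subscriber "A" removes itself while the event is being dispatched.
// Erasing slot 0 shifts "B" into it, so the loop moves on to "C".
std::string run_skip_demo() {
    naive_event ev;
    std::string log;
    int id_a = -1;
    id_a = ev.subscribe([&](int) { log += 'A'; ev.unsubscribe(id_a); });
    ev.subscribe([&](int) { log += 'B'; });
    ev.subscribe([&](int) { log += 'C'; });
    assert(id_a == 0);
    ev.post(1);
    return log;  // "AC": subscriber "B" never saw the event
}
```

Calling run_skip_demo() returns "AC". The lock prevents data races but
not this logical corruption, which is why the designs that follow
queue subscription changes instead of applying them in the middle of a
dispatch.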
Join allows us to experiment with some interesting thread safe
events designs. In the following we explain three thread safe events
designs, each with a different focus.
The first event design targets thread safety and allows callback
functions to invoke any events API method, including adding / deleting
subscriptions and posting new events. We achieve this by exporting the
entire events API as async ports.
The event class is templated by the type of event data V.
template <typename V>
class event: public joint {
public:
Here we define the callback function / method type which takes event
data (with type V) as argument.
typedef boost::function<void(V)> subscriber;
struct subscription_data {
    subscriber callback_;
    subscription_data(subscriber cb) : callback_(cb) {}
};
Here we define the subscription type, which is returned when user code
subscribes to an event by registering a callback. Subscription objects
can be used later to unsubscribe from the event.
typedef boost::shared_ptr<subscription_data> subscription;
private:
Here we define events' internal state: the list of subscriptions /
callbacks.
std::vector<subscription> subs_;
Thread safety and data integrity are maintained by the async port
"ready()", which acts as a lock.
async<void> ready;
Event subscriptions are submitted through the following async port.
async<subscription> subscribe_priv;
public:
Events are posted / submitted through the async port "post()".
async<V> post;
Application code calls subscribe() to attach a callback to the event.
Internally a subscription is created, submitted to the async port
"subscribe_priv()" and returned to the caller for later
un-subscription.
subscription subscribe(subscriber cb) {
    subscription sub(new subscription_data(cb));
    subscribe_priv(sub);
    return sub;
}
The async port "unsubscribe()" is called to detach a subscription /
callback from the event.
async<subscription> unsubscribe;
operator() is defined to allow posting an event in the same way as a
function call: event_obj(data).
void operator()(V v) { post(v); }
In the constructor, we specify the executor that will run the bodies
of the following three async chords. The first chord is for posting and
dispatching events, the second is for attaching callbacks to events,
and the third is for detaching callbacks. Note that the "unsubscribe"
chord is defined last; with a "first match" scheduler, this means that
all pending subscribing messages will always be processed before any
pending un-subscribing messages. All three chords consist purely of
async ports, so their chord bodies are executed asynchronously as
tasks in the executor's thread pool.
event(executor *e) : joint(e) {
    chord(post, ready, &event::post_cb);
    chord(subscribe_priv, ready, &event::sub_cb);
    // define unsubscribe last so subscribe calls will be processed first
    chord(unsubscribe, ready, &event::unsub_cb);
    ready();
}
private:
In the "posting" chord body, we broadcast the event to all callbacks.
At the end we call ready() to let others run (i.e. release the lock).
void post_cb(V post, void_t r) {
    for(size_t i=0; i<subs_.size(); i++)
        subs_[i]->callback_(post);
    ready();
}
In "subscribing" chord body, we add the new subscriber to event's list
of subscribers.
void sub_cb(subscription sub, void_t r) {
    subs_.push_back(sub);
    ready();
}
In "un-subscribing" chord body, we delete the subscription from list.
void unsub_cb(subscription unsub, void_t r) {
    typename std::vector<subscription>::iterator iter;
    if((iter = std::find(subs_.begin(), subs_.end(), unsub)) != subs_.end()) {
        subs_.erase(iter);
    }
    ready();
}
};
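The idea behind this first design, that every API call only submits a
message and a single lock-like token serializes the handling, can be
approximated in standard C++ without Join. The following is a minimal
sketch under that assumption, not the Join implementation: the class
"queued_event", its worker thread and its FIFO message queue are
hypothetical stand-ins for the async ports, the "ready()" token and the
executor. Unlike the "first match" scheduling described above, messages
are handled here strictly in arrival order, and the event data type is
fixed to int for brevity.

```cpp
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for the Join-based class above, standard C++ only.
// Every API call just enqueues a message; one worker handles them in turn.
class queued_event {
public:
    using subscriber = std::function<void(int)>;
    using subscription = std::shared_ptr<subscriber>;

    queued_event() : worker_([this] { run(); }) {}

    ~queued_event() {
        enqueue([this] { done_ = true; });  // handled after earlier messages
        worker_.join();
    }

    subscription subscribe(subscriber cb) {
        auto sub = std::make_shared<subscriber>(std::move(cb));
        enqueue([this, sub] { subs_.push_back(sub); });
        return sub;
    }

    void unsubscribe(subscription sub) {
        enqueue([this, sub] {
            auto it = std::find(subs_.begin(), subs_.end(), sub);
            if (it != subs_.end()) subs_.erase(it);
        });
    }

    void post(int v) {
        enqueue([this, v] {
            // subs_ cannot change during this loop: changes are only queued.
            for (std::size_t i = 0; i < subs_.size(); ++i) (*subs_[i])(v);
        });
    }

private:
    void enqueue(std::function<void()> msg) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            queue_.push_back(std::move(msg));
        }
        cv_.notify_one();
    }

    void run() {
        while (!done_) {
            std::function<void()> msg;
            {
                std::unique_lock<std::mutex> lock(mtx_);
                cv_.wait(lock, [this] { return !queue_.empty(); });
                msg = std::move(queue_.front());
                queue_.pop_front();
            }
            msg();  // runs unlocked, so callbacks may re-enter the API
        }
    }

    std::mutex mtx_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> queue_;
    std::vector<subscription> subs_;  // touched only by the worker thread
    bool done_ = false;               // touched only by the worker thread
    std::thread worker_;              // declared last so it starts last
};

// The callback re-posts and finally unsubscribes itself from the worker.
int run_queue_demo() {
    int total = 0;                     // written only by the worker thread
    std::promise<void> finished;
    queued_event::subscription self;   // declared before ev, so it outlives it
    queued_event ev;
    self = ev.subscribe([&](int v) {
        total += v;
        if (v > 0) {
            ev.post(v - 1);            // re-entrant post
        } else {
            ev.unsubscribe(self);      // re-entrant unsubscribe
            finished.set_value();
        }
    });
    ev.post(3);
    finished.get_future().wait();      // all four callbacks have run
    assert(total == 6);                // 3 + 2 + 1 + 0
    return total;
}
```

Because callbacks only enqueue further messages, the subscriber list
is never modified while it is being traversed, which is the property
the async ports provide in the Join version.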
The second event design modifies the first by letting all attached
callbacks run concurrently as separate tasks in the executor's thread
pool. The only changes concern the API for posting an event. We start
the callback tasks as a chain: the first callback task is started by
calling the async port "post_priv(event_data, callback_task_index)"
with index = 0; the first callback task calls "post_priv()" with
index = 1 to spawn the second callback task and then runs the first
callback function; the second callback task spawns the third callback
task and then runs its own callback function, and so on.
template <typename V>
class event: public joint {
......
private:
......
Here is the async port that spawns a task to run the callback function
indexed by the int argument.
async<tuple<V,int> > post_priv;
public:
......
Here is the API to post an event; internally we call the async port
"post_priv()" to spawn the first callback task.
void operator()(V v) { post(v); }
void post(V v) { post_priv(make_tuple(v, 0)); }
event(executor *e) : joint(e) {
    chord(post_priv, ready, &event::post_cb);
    ......
    ready();
}
private:
Here is the "posting" chord body.
void post_cb(tuple<V,int> post, void_t r) {
V val = tuples::get<0>(post);
size_t num = tuples::get<1>(post);
subscriber cb;
if (num < subs_.size()) {
First we save the callback to be run in this task.
cb = subs_[num]->callback_;
Then we call the async port "post_priv()" to spawn a task to run the
next callback.
//start next callback in another task
if((num+1) < subs_.size()) {
post_priv(make_tuple(val, (num+1)));
}
}
Calling "ready()" allows other chords to run.
ready();
Finally we invoke the callback of this task, if there is one.
if(cb) cb(val);
}
......
};
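The chained spawning used by this second design can also be sketched
in standard C++. The code below is an illustration under stated
assumptions, not Join code: the class "chained_event" is hypothetical,
a std::mutex plays the role of "ready()", and one std::thread per
callback stands in for the executor's tasks. Unlike the asynchronous
"post()" above, this sketch's post() waits for the whole chain to
finish so that the result can be checked deterministically.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical sketch, not Join code: a mutex plays the role of "ready"
// and std::thread stands in for the executor's tasks.
class chained_event {
public:
    using subscriber = std::function<void(int)>;

    void subscribe(subscriber cb) {
        std::lock_guard<std::mutex> lock(ready_);
        subs_.push_back(std::move(cb));
    }

    // Starts the chain with index 0 and waits until every task is done.
    void post(int v) {
        std::thread first([this, v] { run_task(v, 0); });
        first.join();
    }

private:
    void run_task(int v, std::size_t num) {
        subscriber cb;
        std::thread next;
        {
            std::lock_guard<std::mutex> lock(ready_);
            if (num < subs_.size()) {
                cb = subs_[num];             // save this task's callback
                if (num + 1 < subs_.size())  // spawn the next link
                    next = std::thread(
                        [this, v, num] { run_task(v, num + 1); });
            }
        }                                    // "ready" is released here
        if (cb) cb(v);                       // runs outside the lock
        if (next.joinable()) next.join();
    }

    std::mutex ready_;
    std::vector<subscriber> subs_;
};

// Three subscribers add k * v to a shared atomic counter.
int run_chain_demo() {
    chained_event ev;
    std::atomic<int> sum{0};
    for (int k = 1; k <= 3; ++k)
        ev.subscribe([&sum, k](int v) { sum += k * v; });
    ev.post(10);
    assert(sum.load() == 60);                // (1 + 2 + 3) * 10
    return sum.load();
}
```

As in the chord body above, each task copies its callback and starts
its successor while holding the lock, then releases the lock before
running the callback, so callbacks overlap and may themselves call
subscribe().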
The third event design differs from the two designs above. In some
applications we want all callbacks to execute in the event dispatching
thread (perhaps the callbacks use thread-local data that is set up in
the event dispatching thread), so the event posting API must be
synchronous. We may also want a thread safe event design that does not
involve asynchronous tasks (or executors), so we cannot use chords
consisting purely of async ports. However, we still want the API for
subscribing and unsubscribing to be asynchronous, so that callback
functions can call it to change subscriptions. The third event design
satisfies these requirements.
template <typename V>
class event: public joint {
......
private:
std::vector<subscription> subs_;
async<void> ready;
Here we define an async port to mark the state in which no more
subscribing / un-subscribing messages are pending.
async<void> subscription_empty;
Here is a synch port which calls itself recursively to process all
pending subscribing / un-subscribing messages.
synch<void,void> process_subscription;
Here we define the async port to process "subscribing" messages.
async<subscription> subscribe_priv;
public:
Here is the synch port to dispatch events; it invokes all callbacks in
the caller's thread.
synch<void,V> post;
Here is the async port to process "unsubscribing" message.
async<subscription> unsubscribe;
void operator()(V v) { post(v); }
In the constructor, we create four chords to define the event's
concurrent behaviour.
event() {
The first chord is for posting an event synchronously (in the caller's
thread).
chord(post, ready, &event::post_cb);
The second chord is for processing "subscribing" messages.
chord(process_subscription, subscribe_priv, &event::sub_cb);
The third chord is for processing "unsubscribing" messages. It is
defined later so that all pending "subscribing" messages will be
processed before any "unsubscribing" messages.
chord(process_subscription, unsubscribe, &event::unsub_cb);
The fourth chord stops the processing of subscriptions. It is defined
last so that it will run only when no more subscription /
un-subscription messages are pending.
chord(process_subscription, subscription_empty, &event::sub_empty_cb);
ready();
subscription_empty();
}
private:
In "posting" chord, we first call synch port "process_subscription()"
which will call itself recursively and process all pending subscribing
/ unsubscribing messages. Then we dispatch the event to all attached
callbacks.
void post_cb(V post, void_t r) {
    process_subscription();
    for(size_t i=0; i<subs_.size(); i++)
        subs_[i]->callback_(post);
    ready();
}
In "subscribing" chord, we first attach the pending subscriber and then
call "process_subscription()" recursively.
void sub_cb(void_t proc, subscription sub) {
subs_.push_back(sub);
process_subscription();
}
In "unsubscribing" chord, we first detach the subscriber and then call
"process_subscription()" recursively.
void unsub_cb(void_t proc, subscription unsub) {
    typename std::vector<subscription>::iterator iter;
    if((iter = std::find(subs_.begin(), subs_.end(), unsub)) != subs_.end()) {
        subs_.erase(iter);
    }
    process_subscription();
}
Finally, the simple "stopping" chord body ends the recursive calls of
"process_subscription()" and restores the "subscription_empty" state.
void sub_empty_cb(void_t proc, void_t empty) {
    subscription_empty();
}
};
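The behaviour of this third design, synchronous dispatch in the
caller's thread combined with queued subscription changes, can be
approximated in standard C++ as follows. This is a sketch under stated
assumptions rather than the Join implementation: the class
"sync_event" and its members are hypothetical, one mutex stands in for
"ready()", a second one protects the queue of pending requests, and
"apply_pending()" plays the role of "process_subscription()". Pending
requests are applied in arrival order here, whereas the chords above
process all "subscribing" messages first. Callbacks in this sketch
must not call post() themselves, since the dispatch mutex is not
recursive.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical sketch, not Join code. post() runs callbacks in the
// caller's thread; subscribe()/unsubscribe() only queue a request.
class sync_event {
public:
    using subscriber = std::function<void(int)>;
    using subscription = std::shared_ptr<subscriber>;

    subscription subscribe(subscriber cb) {
        auto sub = std::make_shared<subscriber>(std::move(cb));
        std::lock_guard<std::mutex> lock(pending_mtx_);
        pending_.push_back({true, sub});
        return sub;
    }

    void unsubscribe(subscription sub) {
        std::lock_guard<std::mutex> lock(pending_mtx_);
        pending_.push_back({false, std::move(sub)});
    }

    void post(int v) {
        std::lock_guard<std::mutex> ready(ready_mtx_);  // one dispatch at a time
        apply_pending();
        for (std::size_t i = 0; i < subs_.size(); ++i) (*subs_[i])(v);
    }

private:
    // Plays the role of process_subscription(): drain all queued requests.
    void apply_pending() {
        std::vector<std::pair<bool, subscription>> batch;
        {
            std::lock_guard<std::mutex> lock(pending_mtx_);
            batch.swap(pending_);
        }
        for (auto& req : batch) {
            if (req.first) {
                subs_.push_back(req.second);
            } else {
                auto it = std::find(subs_.begin(), subs_.end(), req.second);
                if (it != subs_.end()) subs_.erase(it);
            }
        }
    }

    std::mutex ready_mtx_;
    std::mutex pending_mtx_;
    std::vector<std::pair<bool, subscription>> pending_;
    std::vector<subscription> subs_;  // only touched while ready_mtx_ is held
};

// "once" unsubscribes itself inside its callback; the change takes
// effect at the start of the next post().
int run_sync_demo() {
    sync_event ev;
    int total = 0;
    sync_event::subscription once;
    once = ev.subscribe([&](int v) { total += v; ev.unsubscribe(once); });
    ev.subscribe([&](int v) { total += 10 * v; });
    ev.post(1);  // both callbacks run: total = 1 + 10
    ev.post(2);  // "once" was removed before dispatch: total += 20
    assert(total == 31);
    return total;
}
```

The subscriber list is modified only at the start of post(), never
while callbacks are running, so a callback can safely request
subscription changes during dispatch.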