Join Thread Safe Events Tutorial

Event dispatching is a common design in today's interactive applications. Although it is based on the "Observer" design pattern, it has been called with different names, such as signals/slots, subject/observers or distributors. The common use scenario is that we use events to represent "interesting" system changes and application code can subscribe to events by registering callback functions / methods, which will be invoked when events happen.

The design and implementation of event dispatching or Observer is more challenging and involving than how it first looks, just as Andrei Alexandrescu has discussed in his two articles [http://www.erdani.org/publications/cuj-2005-04.pdf] [http://www.erdani.org/publications/cuj-2005-06.pdf]. Most event dispatching is single-threaded and callbacks are executed by the event distributing thread. Multithreaded event dispatching involves the following design dimensions:

Join allow us to experiment with some interesting thread safe events designs. In the following we explain three thread safe events design with different focus.

The first event design targets thread safety and allows callback functions to invoke any events API methods including adding / deleting subscriptions and posting new events. We achieve this by exporting all of events API as async ports.

The event class is templated by the type of event data V.
template <typename V>
class event: public joint {
public:
Here we define the callback function / method type which takes event data (with type V) as argument.
  typedef boost::function<void(V)> subscriber;
  struct subscription_data {
    subscriber callback_;
    subscription_data(subscriber cb) : callback_(cb) {}
    };
Here we define subscriptions which is returned when user code subscribe to a event by registering a callback. Subscription objects can be used to unsubscribe to events.
  typedef boost::shared_ptr<subscription_data> subscription;
private:
Here we define events' internal state: the list of subscriptions / callbacks.
  std::vector<subscription> subs_;
The thread safety and data integrity is maintained by async port "ready()" which is acting as a lock.
  async<void> ready;
Event subscriptions are submited thru the following async port.
  async<subscription> subscribe_priv;
public:
Events are posted / submited thru async port "post()".
  async<V> post;
Application code will call subscribe() to attach a callback to event. Internally a subscription is created, submited to async port "subscribe_priv()" and returned to caller for later un-subscription.
  subscription subscribe(subscriber cb) {
    subscription sub(new subscription_data(cb));
    subscribe_priv(sub);
    return sub;
  }
async port "unsubscribe()" is called to detach a subscription /  callback from event.
  async<subscription> unsubscribe;
operator() is define to allow posting event in similar way as function call: event_obj(data).
  void operator()(V v) { post(v); }
In constructor, we tell which executor to run the following three async chords bodies. The first chord is for posting and dispatching events; the second chord is for attaching callbacks to events and the third chord is for detaching callbacks. Please note that the third "unsubscribe" chord is defined as the last item, that means that with a "first match" scheduler, all pending subscribing messages will always be processed before any pending un-subscribing messages. All three chords consist of pure async ports, so their chord bodies will be executed asynchronously as a task in executor's thread pool.
  event(executor *e) : joint(e) {
    chord(post, ready, &event::post_cb);
    chord(subscribe_priv, ready, &event::sub_cb);
    //define unsubscribe last so subscribe call will be processed first
    chord(unsubscribe, ready, &event::unsub_cb);
    ready();
  }
private:
In "posting" chord body, we broadcast the event to all callbacks. At the end we call ready() to let others run (ie. release the lock).
  void post_cb(V post, void_t r) {
    for(size_t i=0; i<subs_.size(); i++)
      subs_[i]->callback_(post.arg1);
    ready();
  }
In "subscribing" chord body, we add the new subscriber to event's list of subscribers.
  void sub_cb(subscription sub, void_t r) {
    subs_.push_back(sub.arg1);
    ready();
  }
In "un-subscribing" chord body, we delete the subscription from list.
  void unsub_cb(subscription unsub, void_t r) {
    typename std::vector<subscription>::iterator iter;
    if((iter = std::find(subs_.begin(), subs_.end(), unsub.arg1)) != subs_.end()) {
      subs_.erase(iter);
    }
    ready();
  }
};

The second event design modifies the first design by letting all attached callbacks run concurrently as different tasks in executor's thread pool. The only changes are related to the API to post an event. We start these callback tasks as a chain: the first callback task is started by calling async port "post_priv(event_data, callback_task_index)" with the index = 0; the first callback task will call "post_priv()" with index=1 to spawn the second callback task and then run the first callback function; the second callback task will spawn the third callback task and then run its callback function, and so on.

template <typename V>
class event: public joint {
    ......
private:
    ......
Here is the async port to spawn task to run the callback function indexed by the int argument.
  async<tuple<V,int> > post_priv;
public:
    ......
Here are the API to post an event, internally we call async port "post_priv()" to spawn the first callback task.
  void operator()(V v) { post(v); }
  void post(V v) { post_priv(v, 0); }
  event(executor *e) : joint(e) {
    chord(post_priv, ready, &event::post_cb);
    ......
    ready();
  }
private:
Here is the "posting" chord body.
  void post_cb(tuple<V,int> post, void_t r) {
    V val = tuples::get<0>(post);
    size_t num = tuples::get<1>(post);
    subscriber cb;
    if (num < subs_.size()) {
First we save the callback to be run in this task.
      cb = subs_[num]->callback_;
Then we call async port "post_priv()" to spawn task to run the next callback.
      //start next callback in another task
      if((num+1) < subs_.size()) {
        post_priv(make_tuple(val, (num+1)));
      }

Calling "ready()" allow other chords to run.
    ready();
Finally invoke the callback of this task.
    if(cb) cb(val);
  }
    ......
};

The third event design is different from the above two event designs. In some applications, we want all callbacks execute in the event dispatching thread (maybe the callbacks use some thread local data which is specified in the event dispatching thread); so event posting API must be synchronous. Also we may want a thread safe event design which doesn't involve asynchronous tasks (or executors) so we cannot use chords with pure async ports. However we still want the API for subscribing and unsubscribing to be asynchronous so that inside callbacks functions we can call them to change subscriptions. The third event design satisfy these requirements.

template <typename V>
class event: public joint {
......
private:
  std::vector<subscription> subs_;
  async<void> ready;
Here we define a async port to mark the state that no more subscribing / un-subscribing messages are pending to be processed.
  async<void> subscription_empty;
Here is a synch port which calls itself recursively to process all pending subscribing / un-subscribing messages.
  synch<void,void> process_subscription;
Here we define async port to process "subscribing" message.
  async<subscription> subscribe_priv;
public:
Here is the synch port to dispatch events, invoke all callbacks in the caller thread.
  synch<V> post;
Here is the async port to process "unsubscribing" message.
  async<subscription> unsubscribe;
  void operator()(V v) { post(v); }
In constructor, we create four chords to define event's concurrent behaviour.
  event() {
The first chord is for posting a event synchronously (in caller's thread).
    chord(post, ready, &event::post_cb);
The second chord is for processing "subscribing" messages.
    chord(process_subscription, subscribe_priv, &event::sub_cb);
The third chord is for processing "unsubscribing" messages and it is specified later so that all pending "subscribing" messages will be processed before any "unsubscribing" messages.
    chord(process_subscription, unsubscribe, &event::unsub_cb);
The fourth chord is for stopping processing subscriptions when no more subscription messages pending and it is assigned the last so that it will run when no more subscription / un-subscription messages pending.
    chord(process_subscription, subscription_empty, &event::sub_empty_cb);
    ready();
    subscription_empty();
  }
private:
In "posting" chord, we first call synch port "process_subscription()" which will call itself recursively and process all pending subscribing / unsubscribing messages. Then we dispatch the event to all attached callbacks.
  void post_cb(V post, void_t r) {
    process_subscription();
    for(size_t i=0; i<subs_.size(); i++)
      subs_[i]->callback_(post.arg1);
    ready();
  }
In "subscribing" chord, we first attach the pending subscriber and then call "process_subscription()" recursively.
  void sub_cb(void_t proc, subscription sub) {
    subs_.push_back(sub);
    process_subscription();
  }
In "unsubscribing" chord, we first detach the subscriber and then call "process_subscription()" recursively.
  void unsub_cb(void_t proc, subscription unsub) {
    typename std::vector<subscription>::iterator iter;
    if((iter = std::find(subs_.begin(), subs_.end(), unsub)) != subs_.end()) {
      subs_.erase(iter);
    }
    process_subscription();
  }
Finally the simple "stopping" chord to stop the recursive call of "process_subscription()".
  void sub_empty_cb(void_t proc, void_t empty) {
    subscription_empty();
  }
};