Sample 3: Distributed Event Channel


This sample is based on Sample1, modified to show how remote channels can be connected for distributed event dispatching.

Here the event source (gui_window) lives in one process together with some local event sinks, and it is connected to a second process that contains other event sinks. We use Boost.Asio to set up a TCP socket as the transport for the remote connection.

All major data types are defined in a common header file evt_chan_defs.hpp.

First we need to instantiate the concrete channel class. Unlike Sample1 and Sample2, we make the channel asynchronous, which means that callbacks are not executed directly (in place) by the event-sending thread. Instead, callbacks are scheduled and executed later in an executor. For best integration with asio, Channel provides a special executor (asio_executor) that dispatches asynchronous operations to asio's main thread. Here is our channel type:
typedef channel<string,
        boost_platform,
        mt_synch<boost_platform>,
        asio_executor> evt_chan;
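To make the difference between in-place and deferred callbacks concrete, here is a minimal sketch that uses only the standard library. The names deferred_executor, execute(), run() and deferred_dispatch_demo() are hypothetical and are not part of Channel or asio; the sketch only illustrates the idea that the sender queues a callback and an event loop runs it later.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an executor: callbacks are queued, not run in place.
class deferred_executor {
public:
  // Schedule a callback for later execution.
  void execute(std::function<void()> task) { tasks_.push(std::move(task)); }

  // Run all queued callbacks; this plays the role of the event loop.
  // Returns the number of callbacks executed.
  std::size_t run() {
    std::size_t n = 0;
    while (!tasks_.empty()) {
      std::function<void()> task = std::move(tasks_.front());
      tasks_.pop();
      task();
      ++n;
    }
    return n;
  }

private:
  std::queue<std::function<void()> > tasks_;
};

// Schedules two handlers, lets the "sender" continue, then runs the loop.
// Returns the order in which things happened.
std::vector<std::string> deferred_dispatch_demo() {
  std::vector<std::string> log;
  deferred_executor exec;
  exec.execute([&log] { log.push_back("handler: down"); });
  exec.execute([&log] { log.push_back("handler: up"); });
  log.push_back("sender returned");  // the sender continues before any handler runs
  assert(log.size() == 1);           // no handler has executed yet
  exec.run();
  return log;
}
```

In this sketch the sender's entry is logged first and the handlers follow in scheduling order, which is the behavior the asynchronous channel relies on when its executor hands work to the event loop.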
Channel's implementation depends on some system facilities, such as mutexes, condition variables, and timers. To keep it platform independent, these facilities are defined as nested types inside a "platform" wrapper class, such as the boost_platform used here. mt_synch is the synchronization policy class for thread-safe operation; another class, null_synch<>, contains empty "null" definitions of the synchronization primitives and can be used in single-threaded applications.
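The platform wrapper and synchronization policy idea can be sketched with the standard library alone. Everything below (std_platform, mt_synch_sketch, null_synch_sketch, null_mutex, counter) is a hypothetical illustration and does not reproduce Channel's actual definitions; it only shows how one component can be compiled against either a real or a no-op locking policy.

```cpp
#include <mutex>

// Hypothetical platform wrapper: system facilities exposed as nested types.
struct std_platform {
  typedef std::mutex mutex;
};

// Thread-safe policy: uses the platform's real mutex.
template <typename Platform>
struct mt_synch_sketch {
  typedef typename Platform::mutex mutex;
  typedef std::lock_guard<mutex> scoped_lock;
};

// A mutex with the same interface whose operations do nothing.
struct null_mutex {
  void lock() {}
  void unlock() {}
};

// Single-threaded policy: same nested type names, but no real locking.
struct null_synch_sketch {
  typedef null_mutex mutex;
  typedef std::lock_guard<null_mutex> scoped_lock;
};

// A component written once against the policy interface.
template <typename Synch>
class counter {
public:
  counter() : value_(0) {}

  // Increment under the policy's lock and return the new value.
  int increment() {
    typename Synch::scoped_lock lock(mutex_);
    return ++value_;
  }

private:
  typename Synch::mutex mutex_;
  int value_;
};
```

With this layout, counter<mt_synch_sketch<std_platform> > pays for real locking, while counter<null_synch_sketch> compiles the locking away; the component's own code is identical in both cases.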

Next we define the event ids and the data structure, the same as in Sample1.

Since we are now doing distributed event dispatching, the messages (event data) need to be marshaled and demarshaled across the process boundary. Channel uses Boost.Serialization for message marshaling, so the message class contains the following serialize() method:
struct evt_data {
  string data_;
  evt_data(const char *d) : data_(d) {}
  evt_data() {} //have to define this for marshaling to work
  //serialization function for evt_data
  template <typename Archive>
  void serialize(Archive & ar, const unsigned int version)
  {
    ar & data_;
  }
};
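To show why a single serialize() template covers both directions, here is a minimal round-trip sketch. It does not use Boost.Serialization: sketch_oarchive, sketch_iarchive, marshal() and demarshal() are hypothetical stand-ins, and the length-prefixed text format is an assumption made only for this illustration. evt_data is repeated so that the sketch is self-contained.

```cpp
#include <cstddef>
#include <sstream>
#include <string>

// Hypothetical output archive: operator& writes a length-prefixed string.
class sketch_oarchive {
public:
  explicit sketch_oarchive(std::ostream &os) : os_(os) {}
  sketch_oarchive &operator&(const std::string &s) {
    os_ << s.size() << ' ' << s;
    return *this;
  }
private:
  std::ostream &os_;
};

// Hypothetical input archive: operator& reads the same format back.
class sketch_iarchive {
public:
  explicit sketch_iarchive(std::istream &is) : is_(is) {}
  sketch_iarchive &operator&(std::string &s) {
    std::size_t n = 0;
    is_ >> n;
    is_.get();  // skip the single separator space
    s.resize(n);
    if (n > 0) is_.read(&s[0], static_cast<std::streamsize>(n));
    return *this;
  }
private:
  std::istream &is_;
};

struct evt_data {
  std::string data_;
  evt_data(const char *d) : data_(d) {}
  evt_data() {}  // default constructor is needed to demarshal into
  template <typename Archive>
  void serialize(Archive &ar, const unsigned int /*version*/) {
    ar & data_;  // saves with an output archive, loads with an input archive
  }
};

// Sender side: turn a message into its on-wire text.
std::string marshal(evt_data &msg) {
  std::ostringstream os;
  sketch_oarchive ar(os);
  msg.serialize(ar, 0);
  return os.str();
}

// Receiver side: rebuild a message from the on-wire text.
evt_data demarshal(const std::string &wire) {
  std::istringstream is(wire);
  sketch_iarchive ar(is);
  evt_data msg;
  msg.serialize(ar, 0);
  return msg;
}
```

The same serialize() body is instantiated once per archive type, and the meaning of operator& is decided by the archive. This is also why the default constructor is required: the receiver must create an empty object before loading into it.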

In file evt_chan1.cpp, we first define the event source (gui_window) and handler class (window_handler), the same as in Sample1 and Sample2.
Next we introduce a major addition in this sample: the name binding callback. Since we are connecting channels in two processes and sending events to the remote process, we must not start sending events until the remote process has connected and bound to the event ids; otherwise events will be lost. Channel allows callbacks to be registered on names (named_in and named_out) so that the application is notified when remote peers connect and bind. The following is the binding callback in this sample, in which we start sending events after remote peers connect and bind:
void name_binding_callback(evt_chan::name *n, evt_chan::name::binding_event e) {
  if (n->type_ == evt_chan::name::member_remote && e == evt_chan::name::bind_ev) {
    if (n->id_ == down_id) rmt_down_bound = true;
    if (n->id_ == up_id) rmt_up_bound = true;
    if (n->id_ == close_id) rmt_close_bound = true;
    ///fire sample events after remote peer connected and ids bound
    if (rmt_down_bound && rmt_up_bound && rmt_close_bound) {
      window->up("..Hi there [mouse-left-up] ..");
      window->down(".. a simple window test [mouse-right-down] ..");
      window->close(".. simple window wiered [window-closed] ..");
    }
  }
}
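The gating logic in this callback can be isolated into a small helper. The sketch below is hypothetical (remote_binding_tracker and on_remote_bind() are not part of Channel) and assumes string ids, as in the evt_chan typedef. It also reports readiness only once; whether the sample's callback can be invoked again after all three ids are bound is not stated here, so the guard is a precaution rather than a documented requirement.

```cpp
#include <set>
#include <string>

// Hypothetical helper: tracks which required ids remote peers have bound
// and reports readiness exactly once.
class remote_binding_tracker {
public:
  explicit remote_binding_tracker(const std::set<std::string> &required)
      : pending_(required), fired_(false) {}

  // Record that a remote peer bound `id`. Returns true only on the call
  // that completes the required set, so the caller starts sending once.
  bool on_remote_bind(const std::string &id) {
    pending_.erase(id);
    if (pending_.empty() && !fired_) {
      fired_ = true;
      return true;
    }
    return false;
  }

private:
  std::set<std::string> pending_;  // ids not yet bound by a remote peer
  bool fired_;                     // true once readiness has been reported
};
```

A binding callback could call on_remote_bind(n->id_) for remote bind events and fire the sample events only when it returns true.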

Now the main function in evt_chan1.cpp:
int main(int, char **) {
  try {
First we create the asio io_service and the asio_executor for asynchronous operations.
    boost::asio::io_service io_service;
    asio_executor asio_exec(io_service);
Next we create the local channel and configure it to run all its asynchronous operations in asio_exec.
    evt_chan chan(&asio_exec);
As in Sample1 and Sample2, we create the event source (gui_window) and the handler and bind them to the channel. The name binding callback is also registered when gui_window is created.
    gui_window window(chan);
    window_handler hdlr(chan);
Since event messages will be sent to a remote process, they are marshaled into an on-wire format on the sender side and demarshaled on the receiver side. Channel's marshaling mechanism is based on Boost.Serialization. Here we register the event/message data class (evt_data) with its corresponding event ids in a marshaler registry, which automatically creates a marshaler object for this data class. Later, when the socket connection is set up, we specify which marshaler_registry to use:
    evt_chan::text_marshaler_registry mar_reg;
    std::vector<evt_chan::id_type> ids;
    ids.push_back(down_id);
    ids.push_back(up_id);
    ids.push_back(close_id);
    mar_reg.register_msg_marshaler<evt_data>(ids);
Now we create the asio connector and connect the local channel to the remote channel. Since we are using asio's asynchronous API, we pass in a callback (asio_bind_sock_chan) that tells asio_connector to connect the socket streams to the local channel, including which marshaler_registry to use. Here in evt_chan1.cpp, the asio connector plays the active (connecting) role.
    asio_connector connector(io_service);
    connector.async_connect("localhost", "6666", //remote channel address
        boost::bind(asio_bind_sock_chan<evt_chan, evt_chan::text_marshaler_registry>(),
            boost::ref(chan), boost::ref(mar_reg), _1, _2));
Finally we start the asio main event loop.
    io_service.run();
...
}
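The marshaler registration step in main() associates one data class with several event ids. As a rough mental model only, a registry of this kind can be pictured as a lookup table from id to a marshaling function. The sketch below is hypothetical: sketch_marshaler_registry, register_marshaler() and marshal() are invented for illustration and do not describe how Channel's text_marshaler_registry is implemented.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <vector>

// Hypothetical registry: maps an event id to a function that turns a
// payload into its on-wire form.
class sketch_marshaler_registry {
public:
  typedef std::function<std::string(const std::string &)> marshal_fn;

  // Register one marshaler for several ids, mirroring how the sample
  // registers evt_data for down_id, up_id and close_id in a single call.
  void register_marshaler(const std::vector<std::string> &ids, marshal_fn fn) {
    for (std::size_t i = 0; i < ids.size(); ++i) table_[ids[i]] = fn;
  }

  // Marshal `payload` with the marshaler registered for `id`.
  // Returns false and leaves `wire` untouched if no marshaler is registered.
  bool marshal(const std::string &id, const std::string &payload,
               std::string &wire) const {
    std::map<std::string, marshal_fn>::const_iterator it = table_.find(id);
    if (it == table_.end()) return false;
    wire = it->second(payload);
    return true;
  }

private:
  std::map<std::string, marshal_fn> table_;
};
```

In this model, an id with no registered marshaler cannot be sent over the wire, which is one way to see why the sample registers every id it intends to dispatch remotely.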

In file evt_chan2.cpp, most of the code is similar to evt_chan1.cpp, with one major difference: asio_connector plays the passive (accepting) side of the connection:
  try {
    boost::asio::io_service io_service;
    asio_connector connector(io_service);
    connector.async_accept(6666, //channel published at port 6666
        boost::bind(asio_bind_sock_chan<evt_chan, evt_chan::text_marshaler_registry>(),
            boost::ref(chan), boost::ref(mar_reg), _1, _2));
    io_service.run();
  }

To test, start evt_chan2 first and then start evt_chan1. The trace output should show the events generated in evt_chan1 being received and processed in both the evt_chan1 and evt_chan2 processes.

Here are the complete source listings:
evt_chan_defs.hpp
evt_chan1.cpp
evt_chan2.cpp