Sample 3: Distributed Event Channel
This sample is based on Sample1, with changes that show how remote
channels can be connected for distributed event dispatching. The event
source (gui_window) now lives in one process together with some local
event sinks, and that process is connected to a second process which
contains other event sinks. We use Boost.Asio to set up a TCP socket as
the transport for the remote connection.
All major data types are defined in a common header file evt_chan_defs.hpp.
First we need to instantiate the concrete channel class. Unlike Sample1
and Sample2, we make Channel asynchronous, which means that callbacks
are not executed directly (in place) by the event-sending thread.
Instead, callbacks are scheduled and executed later in an executor. For
best integration with asio, Channel provides a special executor
(asio_executor) that dispatches asynchronous operations to asio's main
thread. So here is our channel type:
    typedef channel<string,
                    boost_platform,
                    mt_synch<boost_platform>,
                    asio_executor> evt_chan;
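To illustrate what "scheduled and executed later" means, here is a
minimal standard-library sketch. It is not the Channel or asio
implementation: deferred_executor and toy_channel are hypothetical names
invented for this example, and the sketch is single threaded with no
locking. The point is only that send() queues the callback and returns,
and the callback runs when the loop owner drains the queue.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an executor: tasks are queued by post()
// and only run when the loop owner calls run().
class deferred_executor {
public:
    void post(std::function<void()> task) {
        assert(task != nullptr);  // an empty task would throw when invoked
        tasks_.push(std::move(task));
    }

    // Run everything queued so far (including tasks posted while running).
    // Returns the number of tasks executed.
    std::size_t run() {
        std::size_t count = 0;
        while (!tasks_.empty()) {
            std::function<void()> task = std::move(tasks_.front());
            tasks_.pop();
            task();
            ++count;
        }
        return count;
    }

private:
    std::queue<std::function<void()> > tasks_;
};

// Hypothetical event source: send() never calls a handler in place,
// it only schedules the call on the executor.
class toy_channel {
public:
    typedef std::function<void(const std::string &)> handler;

    explicit toy_channel(deferred_executor &exec) : exec_(exec) {}

    void subscribe(handler h) { handlers_.push_back(std::move(h)); }

    void send(const std::string &msg) {
        for (std::size_t i = 0; i < handlers_.size(); ++i) {
            handler h = handlers_[i];
            exec_.post([h, msg] { h(msg); });
        }
    }

private:
    deferred_executor &exec_;
    std::vector<handler> handlers_;
};
```

In this sketch a handler subscribed to toy_channel sees nothing right
after send(); it is invoked only once run() is called, which plays the
role that io_service.run() plays later in this sample.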
Channel's implementation depends on some system facilities, such as
mutexes, condition variables and timers. To facilitate platform
independence, these facilities are defined as nested types inside a
"platform" wrapper class, such as the boost_platform used here. mt_synch
is the synchronization policy class for thread-safe operations, while
another class, null_synch<>, contains empty "null" definitions of the
synchronization primitives and can be used for single-threaded
applications.
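The policy idea can be sketched with the standard library alone. The
names below (std_platform, toy_mt_synch, toy_null_synch, null_mutex,
counter) are assumptions made up for this illustration and are not
Channel's actual definitions; the sketch only shows how a platform class
with nested types and a synchronization policy can be swapped at compile
time.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical "platform" wrapper: system facilities as nested types.
struct std_platform {
    typedef std::mutex mutex;
};

// No-op lock with the same lock()/unlock() interface as a real mutex.
struct null_mutex {
    void lock() {}
    void unlock() {}
};

// Thread-safe policy: takes its mutex type from the platform.
template <typename Platform>
struct toy_mt_synch {
    typedef typename Platform::mutex mutex;
};

// Single-threaded policy: same nested name, but locking costs nothing.
template <typename Platform>
struct toy_null_synch {
    typedef null_mutex mutex;
};

// A component written once against the policy; whether increment()
// really locks is decided by the Synch template argument.
template <typename Synch>
class counter {
public:
    counter() : value_(0) {}

    int increment() {
        std::lock_guard<typename Synch::mutex> guard(mutex_);
        return ++value_;
    }

private:
    typename Synch::mutex mutex_;
    int value_;
};
```

Here counter<toy_mt_synch<std_platform> > locks a real mutex on every
call, while counter<toy_null_synch<std_platform> > compiles down to a
plain increment.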
Next we define the event ids and data structure, the same as in Sample1.
Since we are now doing distributed event dispatching, the
messages/event_data need to be marshaled/demarshaled across the process
boundary. Channel uses Boost.Serialization for message marshaling, so
the message class contains the following serialize() method:
    struct evt_data {
        string data_;
        evt_data(const char *d) : data_(d) {}
        evt_data() {} // have to define this for marshaling to work
        // serialization function for evt_data
        template <typename Archive>
        void serialize(Archive & ar, const unsigned int version)
        {
            ar & data_;
        }
    };
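The single serialize() template above serves both directions because
each archive type defines operator& differently. The sketch below shows
that mechanism with two toy archives written only for this example
(toy_oarchive, toy_iarchive, marshal and demarshal are hypothetical
names); it does not use Boost.Serialization and its length-prefixed text
format is an assumption, not Channel's wire format.

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <ostream>
#include <sstream>
#include <string>

// Same shape as the evt_data struct shown above.
struct evt_data {
    std::string data_;
    evt_data(const char *d) : data_(d) {}
    evt_data() {}  // needed so a receiver can create an empty object first

    template <typename Archive>
    void serialize(Archive &ar, const unsigned int /*version*/) {
        ar & data_;
    }
};

// Toy output archive: operator& writes "<length> <bytes>".
class toy_oarchive {
public:
    explicit toy_oarchive(std::ostream &os) : os_(os) {}
    toy_oarchive &operator&(const std::string &s) {
        os_ << s.size() << ' ' << s;
        return *this;
    }
private:
    std::ostream &os_;
};

// Toy input archive: operator& reads the same format back.
class toy_iarchive {
public:
    explicit toy_iarchive(std::istream &is) : is_(is) {}
    toy_iarchive &operator&(std::string &s) {
        std::size_t n = 0;
        is_ >> n;
        is_.get();  // skip the single space after the length
        s.resize(n);
        if (n > 0) is_.read(&s[0], static_cast<std::streamsize>(n));
        return *this;
    }
private:
    std::istream &is_;
};

// Sender side: object -> wire text (taken by value so serialize() can be called).
std::string marshal(evt_data msg) {
    std::ostringstream os;
    toy_oarchive oa(os);
    msg.serialize(oa, 0);
    return os.str();
}

// Receiver side: wire text -> default-constructed object filled in place.
evt_data demarshal(const std::string &wire) {
    std::istringstream is(wire);
    toy_iarchive ia(is);
    evt_data msg;
    msg.serialize(ia, 0);
    return msg;
}
```

This also shows why the default constructor is required: the receiving
side has to create an empty evt_data before serialize() can fill it.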
In file evt_chan1.cpp, we first define the event source (gui_window)
and handler class (window_handler), the same as in Sample1 and Sample2.
Next we introduce a major addition in this sample: the name binding
callback. Since we are going to connect channels in 2 processes and
send events to the remote process, we must not start sending events
until the remote process has connected and bound to the event ids.
Otherwise events will be lost. Channel allows registering callbacks on
names (named_in and named_out) so that the application is notified when
remote peers connect and bind. The following is the binding callback
used in this sample, in which we start sending events after the remote
peers connect and bind:
    void name_binding_callback(evt_chan::name *n,
                               evt_chan::name::binding_event e) {
        if (n->type_ == evt_chan::name::member_remote &&
            e == evt_chan::name::bind_ev) {
            if (n->id_ == down_id) rmt_down_bound = true;
            if (n->id_ == up_id) rmt_up_bound = true;
            if (n->id_ == close_id) rmt_close_bound = true;
            // fire sample events after remote peer connected and ids bound
            if (rmt_down_bound && rmt_up_bound && rmt_close_bound) {
                window->up("..Hi there [mouse-left-up] ..");
                window->down(".. a simple window test [mouse-right-down] ..");
                window->close(".. simple window wiered [window-closed] ..");
            }
        }
    }
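The gating logic in the callback can be isolated as a small helper. The
sketch below is an assumption-laden illustration, not part of Channel:
bind_gate is a hypothetical class and the ids are plain strings chosen
for the example. It adds one thing the callback above does not appear to
have, a "fired" flag, so that a later remote bind event cannot trigger
the sample events a second time.

```cpp
#include <cassert>
#include <functional>
#include <set>
#include <string>
#include <utility>

// Hypothetical helper: runs an action exactly once, after every
// required id has been bound by a remote peer.
class bind_gate {
public:
    bind_gate(std::set<std::string> required, std::function<void()> on_ready)
        : pending_(std::move(required)),
          on_ready_(std::move(on_ready)),
          fired_(false) {
        assert(on_ready_ != nullptr);
    }

    // Record that a remote peer bound `id`.
    // Returns true only for the call that actually fired the action.
    bool on_remote_bind(const std::string &id) {
        pending_.erase(id);  // unknown or repeated ids are simply ignored
        if (fired_ || !pending_.empty()) return false;
        fired_ = true;
        on_ready_();
        return true;
    }

private:
    std::set<std::string> pending_;   // ids still waiting for a remote bind
    std::function<void()> on_ready_;  // e.g. "send the first events"
    bool fired_;                      // guards against firing twice
};
```

A binding callback could forward each remote bind event to
on_remote_bind() and place the window->up/down/close calls in the
on_ready action.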
Now the main function in evt_chan1.cpp:
    int main(int, char **) {
        try {
First we create asio io_service and asio_executor for asynchronous
operations.
            boost::asio::io_service io_service;
            asio_executor asio_exec(io_service);
Next we create the local channel and configure it to run all its
asynchronous operations in asio_exec:
            evt_chan chan(&asio_exec);
As in Sample1 and Sample2, we create the event source (gui_window) and
handler and bind them to the channel. The name binding callback is also
registered when gui_window is created.
            gui_window window(chan);
            window_handler hdlr(chan);
Since event messages will be sent to a remote process, they are
marshaled into an on-wire format on the sender side and demarshaled on
the receiver side. Channel's marshaling mechanism is based on
Boost.Serialization. Here we register the event/message data class
(evt_data) with its corresponding event ids in a marshaler registry,
which automatically creates a marshaler object for this data class.
Later, when the socket connection is set up, we specify which
marshaler_registry to use:
            evt_chan::text_marshaler_registry mar_reg;
            std::vector<evt_chan::id_type> ids;
            ids.push_back(down_id);
            ids.push_back(up_id);
            ids.push_back(close_id);
            mar_reg.register_msg_marshaler<evt_data>(ids);
Now we create the asio connector and connect the local channel to the
remote channel. Since we use asio's asynchronous API, we pass in a
callback (asio_bind_sock_chan) that asks asio_connector to connect the
socket streams to the local channel and tells it which
marshaler_registry to use. Here in evt_chan1.cpp, the asio connector
plays the active (connecting) role.
            asio_connector connector(io_service);
            connector.async_connect("localhost", "6666", // remote channel address
                boost::bind(asio_bind_sock_chan<evt_chan,
                                evt_chan::text_marshaler_registry>(),
                            boost::ref(chan), boost::ref(mar_reg), _1, _2));
Finally we start asio's main event loop:
            io_service.run();
            ...
    }
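The registry step in main() above, where one data class is registered
for several event ids, can be pictured as a lookup table from id to
marshaling function. The following is only a sketch under that
assumption: toy_marshaler_registry, register_encoder, has and encode are
hypothetical names, and the "id|payload" output is an invented format,
not what text_marshaler_registry produces.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical registry: each event id maps to the function that
// turns a payload of that event into wire text.
class toy_marshaler_registry {
public:
    typedef std::function<std::string(const std::string &)> encoder;

    // Register one encoder for a whole list of ids, as main() does
    // for down_id, up_id and close_id.
    void register_encoder(const std::vector<std::string> &ids, encoder enc) {
        assert(enc != nullptr);
        for (std::size_t i = 0; i < ids.size(); ++i) {
            encoders_[ids[i]] = enc;
        }
    }

    bool has(const std::string &id) const {
        return encoders_.find(id) != encoders_.end();
    }

    // Look up the encoder for `id` and apply it; ids that were never
    // registered cannot be sent, so this throws for them.
    std::string encode(const std::string &id, const std::string &payload) const {
        std::map<std::string, encoder>::const_iterator it = encoders_.find(id);
        if (it == encoders_.end()) {
            throw std::out_of_range("no marshaler registered for id: " + id);
        }
        return id + "|" + it->second(payload);
    }

private:
    std::map<std::string, encoder> encoders_;
};
```

Both processes would need to register the same data class for the same
ids, since the receiving side has to look up how to decode what the
sending side encoded.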
In file evt_chan2.cpp, most of the code is similar to evt_chan1.cpp,
with one major difference: asio_connector plays the passive side of the
connection (accept):
        try {
            boost::asio::io_service io_service;
            asio_connector connector(io_service);
            connector.async_accept(6666, // channel published at port 6666
                boost::bind(asio_bind_sock_chan<evt_chan,
                                evt_chan::text_marshaler_registry>(),
                            boost::ref(chan),
                            boost::ref(mar_reg), _1, _2));
            io_service.run();
        }
For testing, first start evt_chan2 and then start evt_chan1. In the
trace output, we should see the events generated at evt_chan1 received
and processed in both the evt_chan1 and evt_chan2 processes.
Here are the complete source listings:
evt_chan_defs.hpp
evt_chan1.cpp
evt_chan2.cpp