Sample 8. Channel connection through shared memory
This sample shows how remote channels in two processes (chat1, chat2)
can be connected through shared memory message queues based on
Boost.Interprocess.
Boost.Interprocess provides rich facilities for inter-process
communication within a single node, including message queues that
reside in shared memory. Channel uses this shared memory (shmem)
message queue as one of its transports between processes on the same
node.
Here is the code walkthrough for the first peer process (chat1.cpp):
First we define a message handler function that simply prints the
received messages.
void msg_handler(id_type id, boost::shared_ptr<void> p, int sz, timeout_type * t)
{
  chat_msg *msg = (chat_msg *) p.get();
  cout << msg->source_ << " speak on [" << chat_chan::id_trait::id_to_string(id) << "]:\n";
  cout << msg->data_ << "\n";
}
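The handler receives its payload as a shared pointer to void and casts it back to the concrete message type. The following standalone sketch shows that pattern without Channel. The chat_msg layout here is a hypothetical stand-in for the type declared in chat_defs.hpp, std::shared_ptr replaces boost::shared_ptr, and format_chat is a made-up helper that returns the text instead of printing it.

```cpp
#include <memory>
#include <sstream>
#include <string>

// Hypothetical stand-in for the chat_msg type declared in chat_defs.hpp.
struct chat_msg {
    std::string source_;
    std::string data_;
};

// Builds the same text that msg_handler prints.
// The receiver must know the concrete type behind the void pointer:
// the cast is unchecked, so passing any other type is undefined behaviour.
std::string format_chat(const std::string &subject,
                        const std::shared_ptr<void> &payload) {
    const chat_msg *msg = static_cast<const chat_msg *>(payload.get());
    std::ostringstream out;
    out << msg->source_ << " speak on [" << subject << "]:\n"
        << msg->data_ << "\n";
    return out.str();
}
```

A shared pointer to chat_msg converts implicitly to a shared pointer to void, so a caller can pass the result of std::make_shared<chat_msg>(...) directly.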
In the main function, we create a channel, bind named_out objects to
ids for sending messages, and bind named_in objects to ids and a
message handler for handling incoming messages.
int main(int, char **) {
  char *my_name = "chatter 1";
  //create local channel and bind local event source/sink
  chat_chan chan;
  //create/bind-to subjects i am going to speak about
  chat_chan::out basketball_o(chan, basketball);
  chat_chan::out tennis_o(chan, tennis);
  chat_chan::out stock_o(chan, stock);
  //subscribe to subjects i am interested in listening
  chat_chan::in basketball_i(chan, basketball, msg_handler);
  //i want hear all about financials
  chat_chan::in all_financial_i(chan, all_financial, msg_handler);
Then we register the message type for marshaling/demarshaling:
  //register chat msg type for marshaling/demarshaling
  chat_chan::text_marshaler_registry mar_reg;
  mar_reg.register_default_msg_marshaler<chat_msg>();
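A pointer to a message is meaningless in another process, so a message that crosses the shared memory queue has to be flattened to bytes on one side and rebuilt on the other. The sketch below illustrates that round trip with a simple length-prefixed text encoding. It is an assumption made for illustration only and is not the encoding used by text_marshaler_registry. The chat_msg layout and the marshal/demarshal helpers are hypothetical.

```cpp
#include <cstddef>
#include <ios>
#include <sstream>
#include <string>

// Hypothetical stand-in for the chat_msg type declared in chat_defs.hpp.
struct chat_msg {
    std::string source_;
    std::string data_;
};

// Encodes each field as "<length> <bytes>", one field after the other.
std::string marshal(const chat_msg &m) {
    std::ostringstream out;
    out << m.source_.size() << ' ' << m.source_
        << m.data_.size() << ' ' << m.data_;
    return out.str();
}

// Reads one "<length> <bytes>" field from the stream.
static std::string read_field(std::istringstream &in) {
    std::size_t len = 0;
    in >> len;
    in.get();                          // skip the single space after the length
    std::string field(len, '\0');
    in.read(&field[0], static_cast<std::streamsize>(len));
    return field;
}

// Rebuilds a chat_msg from text produced by marshal().
chat_msg demarshal(const std::string &text) {
    std::istringstream in(text);
    chat_msg m;
    m.source_ = read_field(in);
    m.data_ = read_field(in);
    return m;
}
```

Because each field carries its own length, the payload may contain spaces, digits or newlines without confusing the reader.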
Next we create a "stream" based on a shared memory message queue to
connect the chat channel to remote channels in other processes. The
first argument, "false", means this process only opens the shared
memory message queue and does not create it.
  shmem_stream<
    id_type,
    chat_chan::text_marshaler_registry,
    timeout_type
  >
  shmstream(false, que_name, que_max_num, que_max_size, mar_reg); //let the other side create shmem
Then we connect the channel to this shmem "stream". This process is the
active side, which starts the channel connection handshake.
  connect(chan, &shmstream, true); //active connection side
Here comes the main loop for sending chat messages:
  char msg[1024];
  bool cont = true;
  while (cont) {
    //speak
    cout << "which subject : 1-basket_ball, 2-tennis, 3-stock, 4-exit:\n";
    int subject;
    cin.getline(msg, 1024);
    subject = atoi(msg);
    switch(subject) {
    case 1:
    case 2:
    case 3:
      cout << "enter your message: ";
      cin.getline(msg, 1024);
      switch (subject) {
      case 1:
        basketball_o.send(new chat_msg(my_name, msg));
        break;
      case 2:
        tennis_o.send(new chat_msg(my_name, msg));
        break;
      case 3:
        stock_o.send(new chat_msg(my_name, msg));
        break;
      default:
        break;
      }
      break;
    case 4:
      //exit
      cont = false;
      break;
    default:
      break;
    }
  }
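The loop converts the typed line with atoi, which returns 0 for non-numeric input, so such input matches none of the numbered subjects. A slightly stricter way to read the menu choice could look like the following sketch. The menu_choice enum and parse_choice helper are hypothetical names that do not appear in the sample.

```cpp
#include <cstdlib>
#include <string>

// Hypothetical names for the four menu entries printed by the loop.
enum menu_choice {
    choice_invalid = 0,
    choice_basketball = 1,
    choice_tennis = 2,
    choice_stock = 3,
    choice_exit = 4
};

// Converts one input line to a menu choice.
// Leading whitespace is skipped and trailing characters after the number
// are ignored, as with atoi, but input without digits or outside 1..4 is
// reported explicitly as choice_invalid.
menu_choice parse_choice(const std::string &line) {
    char *end = nullptr;
    long value = std::strtol(line.c_str(), &end, 10);
    if (end == line.c_str()) {
        return choice_invalid;         // no digits were consumed
    }
    if (value < 1 || value > 4) {
        return choice_invalid;         // number is not on the menu
    }
    return static_cast<menu_choice>(value);
}
```

The caller can then switch on the returned enum value instead of on a raw integer.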
Next we walk through the code for the second peer process (chat2.cpp).
Most of the code is similar to chat1.cpp, except that there are two
local channels here.
int main(int, char **) {
  char* my_name = "chatter 2";
  //create local channels and connect them
  chat_chan chan;
  chat_chan chan2;
  connect(chan, chan2);
  //create/bind-to subjects i am going to speak about
  chat_chan::publisher pub(chan);
  pub.publish(basketball);
  pub.publish(baseball);
  pub.publish(tax);
  pub.publish(investment);
  //subscribe to subjects i am interested in listening
  chat_chan::subscriber sub(chan, msg_handler);
  //i am a sports fan
  sub.subscribe(all_sports);
  sub.subscribe(stock);
  sub.subscribe(tax);
  //subscribe at chan2, using named_in/out api
  chat_chan::in basketball_i(chan2, basketball, msg_handler2);
  chat_chan::in all_financial_i(chan2, all_financial, msg_handler2);
  //register chat msg type for marshaling/demarshaling
  chat_chan::text_marshaler_registry mar_reg;
  mar_reg.register_default_msg_marshaler<chat_msg>();
Here we create a shared memory "stream" to connect to the remote
channel. Notice that the first argument is "true", so the shared memory
message queue will be created if it doesn't exist.
  //create shmem stream and connect to channel
  shmem_stream<
    id_type,
    chat_chan::text_marshaler_registry,
    timeout_type
  >
  shmstream(true, que_name, que_max_num, que_max_size, mar_reg); //i'll create shmem
  connect(chan, &shmstream, false); //i am passive side
Complete source code listing:
chat_defs.hpp
chat1.cpp
chat2.cpp