Sample 7. distributed chat thru a central server
This sample shows simple chat client and server design. Clients connect
to server to chat with each other in seperate chat groups identified by
subject. The chat subject (a string) is the ids in name space. Clients
can join/leave chat groups identified by subject ids and send messages
to chat groups. If the chat group (subject) doesn't exist yet, the
first member's "join" will make it created; so unlimited number of
subjects (thus chat groups) can be added to the name space.
Here are code for chat client (chat_cli.cpp):
We first define the handler function for chat messages, simply print
them out.
void msg_handler(id_type id, boost::shared_ptr<void> p)
{
chat_msg *msg = (chat_msg *) p.get();
cout << msg->source_ << " speak on [" << chat_chan::id_trait::id_to_string(id) << "]:\n";
cout << msg->data_ << "\n";
}
Chat clients start with 3 command line arguments: chatter's name,
chat_server host name and port number:
int main(int argc, char **argv) {
if (argc < 4) {
std::cout << "Usage: chat_cli chatter_name srv_name srv_port\n";
exit(0);
}
const char *my_name = argv[1];
const char *srv_name = argv[2];
const char *srv_port = argv[3];
Next, we create ASIO io_service object; create a asio_executor object
and create an asynchronous channel which will execute all its callbacks
in asio_executor (in ASIO's main thread).
try {
boost::asio::io_service io_service;
asio_executor asio_exec(io_service);
chat_chan chan(&asio_exec);
Since chat messages will move from one client to server and to other
subscribed clients, they need to be marshaled/demarshaled for
transmission. Use channel's text_marshaler_registry which is based on
Boost.Serialization. Since we only have one message data type, we can
register its type as default (If we have more than one message data
types, we can register the major one as default and register other with
its corresponding ids.
chat_chan::text_marshaler_registry mar_reg;
mar_reg.register_default_msg_marshaler<chat_msg>();
Connect chat channel to remote server as following and spawn a separate
thread run ASIO's main loop:
asio_connector connector(io_service);
connector.async_connect(srv_name, srv_port, //remote channel address
boost::bind(asio_bind_sock_chan<chat_chan, chat_chan::text_marshaler_registry>(),
boost::ref(chan), boost::ref(mar_reg), _1, _2));
boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service));
In main thread, we handle chat clients main activities: join/leave
chat_groups(subjects) and send/receive messages
chat_chan::publisher pub(chan);
chat_chan::subscriber sub(chan, msg_handler);
bool cont = true;
char buf[1024];
while (cont) {
Clients can do 3 major things: join chat_group (subject), leave
chat_group and send messages to chat_groups. If the chat group
(subject) doesn't exist yet, the
first member's "join" will make it created.
cout << "action : 1-join, 2-leave, 3-send, 4-exit:\n";
int action;
cin.getline(buf, 1024);
action = atoi(buf);
switch (action) {
case 1:
case 2:
case 3:
{
cout << "subject: ";
cin.getline(buf, 1024);
id_type id(buf);
switch (action) {
case 1:
we join the chat_group by both publishing and subscribing the subject
id:
pub.publish(id);
sub.subscribe(id);
break;
case 2:
we leave the chat_group by both unpublishing and unsubscribing the
subject id:
pub.unpublish(id);
sub.unsubscribe(id);
break;
case 3:
clients send messages:
cout << "message: ";
cin.getline(buf, 1024);
pub.send(id, new chat_msg(my_name,buf));
break;
default:
break;
}
Next let's step thru the server code (chat_srv.cpp).
In
normal chat server design based on basic socket, the chat server has 2
main responsibilities: maintaining membership info (which clients are
in which chat groups) and forwarding messages. Channel's facilities
greatly simplifies the design and code of chat server.
First, channel has 4 internal system messages notifying about name
space changes:
id_type subscription_info_msg;
id_type unsubscription_info_msg;
id_type publication_info_msg;
id_type unpublication_info_msg;
These internal system messages are defined inside id_trait class of
channel type. Application code can bind to (or subscribe to) these
system messages to get notified whenever name space changes (such as
some peers join in and starts publishing and subscribing messages).
During chatting, whenever a client join a group (subject), it will
publish/send messages on this subject and subscribe/receive messages on
this subject. By subscribing to publication_info_msg, chat servers can
know when a client joins a group and the subject of this group. In the
following msg_handler function, chat server does the following to
fullfill its 2 responsibilities:
- When server receives name space change message (publication_info_msg):
- server retrieves ids from publication_info_msg and subscribes
to these ids, so all of the messages
about this subject (or in this group) will be sent to server
- server publishes these subject ids, so all the clients which
are
in this group will be connected to server to receive messages in this
group
- When server receives chat messages, just forward them
void msg_handler(chat_chan::publisher &pub,
chat_chan::subscriber &sub,
id_type id, boost::shared_ptr<void> p, int sz, timeout_type * t)
{
typedef pubsub_info_msg_t<id_type> pubsub_info_msg_t;
pubsub_info_msg_t *pubsub_info = NULL;
if(id == chat_chan::publication_info_msg) {
//update name space based clients' publications and subscriptions
//and forward chat msgs
pubsub_info = (pubsub_info_msg_t *) p.get();
for(std::vector<id_type>::iterator iter = pubsub_info->msg_types.begin();
iter != pubsub_info->msg_types.end(); iter++) {
pub.publish((*iter), chat_chan::in::scope_remote);
sub.subscribe((*iter), chat_chan::in::scope_remote);
}
}
else { //chat subject ids
//forward chat msgs to subscribers
pub.send(id, p, sz, t);
}
}
Chat server's main function is simple, having port number as only
argument:
int main(int argc, char **argv) {
if (argc < 2) {
std::cout << "Usage: chat_srv srv_port\n";
exit(0);
}
const char * srv_port = argv[1];
Similar to chat client code, we create an asynchronous channel
with asio_executor:
try {
boost::asio::io_service io_service;
asio_executor asio_exec(io_service);
chat_chan chan(&asio_exec);
Next we create channel publisher and subscriber and bind message handler
chat_chan::publisher pub(chan);
chat_chan::subscriber sub(chan, boost::bind(msg_handler, boost::ref(pub),
boost::ref(sub), _1,_2));
We subscribe to name space change message here then,
sub.subscribe(chat_chan::publication_info_msg, chat_chan::in::scope_remote);
Then same as chat client, server create message marshaler,
asio_connector and start waiting for remote clients connections:
//register chat msg type for marshaling/demarshaling
chat_chan::text_marshaler_registry mar_reg;
mar_reg.register_default_msg_marshaler<chat_msg>();
//create asio connectors and connect to remote channel
asio_connector connector(io_service);
connector.async_accept(atoi(srv_port), // channel address
boost::bind(asio_bind_sock_chan<chat_chan, chat_chan::text_marshaler_registry>(),
boost::ref(chan), boost::ref(mar_reg), _1, _2));
//main loop
io_service.run();
A sample scenario could be as following:
- start chat server: ./chat_srv 4455
- start 2 chat clients and connect to server: ./chat_cli steve
localhost 4455 and ./chat_cli regean localhost 4455
- join chat groups in clients
- send messages to groups
Complete source code listing:
chat_defs.hpp
chat_cli.cpp
chat_srv.cpp