L:/channel/channel/examples/hier_id_text_recv/recv.cpp

Go to the documentation of this file.
00001 
00002 // Copyright (c) 2005, 2006 Yigong Liu
00003 // Permission to use, copy, modify, distribute and sell this software for any 
00004 //     purpose is hereby granted without fee, provided that the above copyright 
00005 //     notice appear in all copies and that both that copyright notice and this 
00006 //     permission notice appear in supporting documentation.
00007 // The author makes no representations about the 
00008 //     suitability of this software for any purpose. It is provided "as is" 
00009 //     without express or implied warranty.
00011 
00018 #include <iostream>
00019 #include <string>
00020 
00021 #include <Channel.h>
00022 
00023 using namespace std;
00024 using namespace channel;
00025 
00026 //---------------------------------------------------------------
00027 // Channel specialization
00028 //
00029 typedef StringPathId<'/'> IdType;
00030 
00031 typedef Channel<IdType, IdTrait<IdType>, ACE_MT_SYNCH, TrieRouter<IdType, IdTrait<IdType>, ACE_MT_SYNCH> > Chan;
00032 typedef Chan::Msg ChanMsg;
00033 
00034 //------------------------------------------------------------------
00035 // Message definitions :
00036 //
00037 //app msgs ids 
00038 IdType PING_MSG = "/APP/PING";
00039 IdType PONG_MSG = "/APP/PONG";
00040 IdType TEST_STRING_MSG = "/APP/TEST";
00041 IdType APP_WILDCARD_MSG = "/APP/*";
00042 
// Text payload handled by this example: a length field plus a fixed-size
// character buffer. The receiving code in this file casts a message's
// data() to this struct and prints `data` as a C string.
struct Test_String_Msg {
  enum { MAX_STR_LEN = 1024 };  // capacity of `data`, in chars
  int len;                      // receivers in this file write '\0' at data[len-1]
  char data[MAX_STR_LEN];       // text buffer

  // Start out as a well-defined empty string. The original constructor left
  // both members indeterminate, so reading `len` or printing `data` from a
  // freshly constructed message was undefined behavior.
  Test_String_Msg() : len(0) {
    data[0] = '\0';
  }
};
00050 
00051 //----------------------------------------------------------------
00052 // Implementation
00053 //
00054 
00055 class My_Callback: public Chan::Callback {
00056 public:
00057   My_Callback(Chan *c) : Chan::Callback(c) {}
00058   Status process(Msg *msg) {
00059     Test_String_Msg *sm = (Test_String_Msg *)msg->data();
00060     cout << "hier_recv/My_Callback recv the following: " << sm->data << endl;
00061     cout << "...... Please notice callback borrow Channel internal thread\n";
00062     delete msg;
00063     return SUCCESS;
00064   }
00065 };
00066 
00067 int main (int argc, char **argv) {
00068   if (argc < 2) {
00069     cout << "Usage: hrecv my_host:port [peer_host:port]\n";
00070     return -1;
00071   }
00072 
00073   //step1. create channel
00074   Chan * my_chan = new Chan(); 
00075   
00076   //step2. setup channels
00077   char *my_addr = argv[1];
00078   Chan::TcpSockConnector *tcp_conn = new Chan::TcpSockConnector(my_chan, true);
00079   if (tcp_conn->open (my_addr) == FAILURE) {
00080     cout << "failed to open at " << my_addr << endl;
00081     return -1;
00082   }
00083   if (argc >= 3) {
00084     char *peer_addr = argv[2];
00085     ConnInfo ci(peer_addr);
00086     if (tcp_conn->connect (ci) == FAILURE) { 
00087       cout << "failed to connect peer  at " << peer_addr << endl;
00088       return -1;
00089     }
00090   }
00091 
00092   //step3. create ports/callbacks attached to channels
00093   Chan::Port my_port(my_chan); //use my own queue
00094   My_Callback my_calbak(my_chan);
00095  
00096   //step4. sub msgs thru ports/callbacks
00097   my_port.subscribe_msg(APP_WILDCARD_MSG);
00098   my_calbak.subscribe_msg(TEST_STRING_MSG);
00099 
00100   cout << "hier_recv coming up ...\n";
00101 
00102   ChanMsg *msg;
00103   Test_String_Msg *sm;
00104  
00105   //step5. recv msgs from ports and process them
00106   for(;;) {
00107     if(my_port.recv_msg(msg) == SUCCESS) {
00108       //
00109       sm = (Test_String_Msg *)msg->data();
00110       sm->data[sm->len-1] = '\0';
00111       cout << "hier_recv thread receive the following: " << sm->data << endl;
00112       delete msg;
00113     } else {
00114       //handle_error();
00115       break;
00116     }
00117   }
00118 
00119   cout << "hier_recv  exits...\n";
00120 
00121   return 0;
00122 }

Generated on Mon Feb 27 19:59:21 2006 for channel by  doxygen 1.4.6-NO