00001 00002 // Copyright (c) 2005, 2006 Yigong Liu 00003 // Permission to use, copy, modify, distribute and sell this software for any 00004 // purpose is hereby granted without fee, provided that the above copyright 00005 // notice appear in all copies and that both that copyright notice and this 00006 // permission notice appear in supporting documentation. 00007 // The author makes no representations about the 00008 // suitability of this software for any purpose. It is provided "as is" 00009 // without express or implied warranty. 00011 00018 #include <iostream> 00019 #include <string> 00020 00021 #include <Channel.h> 00022 00023 using namespace std; 00024 using namespace channel; 00025 00026 //--------------------------------------------------------------- 00027 // Channel specialization 00028 // 00029 typedef StringPathId<'/'> IdType; 00030 00031 typedef Channel<IdType, IdTrait<IdType>, ACE_MT_SYNCH, TrieRouter<IdType, IdTrait<IdType>, ACE_MT_SYNCH> > Chan; 00032 typedef Chan::Msg ChanMsg; 00033 00034 //------------------------------------------------------------------ 00035 // Message definitions : 00036 // 00037 //app msgs ids 00038 IdType PING_MSG = "/APP/PING"; 00039 IdType PONG_MSG = "/APP/PONG"; 00040 IdType TEST_STRING_MSG = "/APP/TEST"; 00041 IdType APP_WILDCARD_MSG = "/APP/*"; 00042 00043 struct Test_String_Msg { 00044 enum { MAX_STR_LEN = 1024 }; 00045 int len; 00046 char data[MAX_STR_LEN]; 00047 Test_String_Msg() { 00048 } 00049 }; 00050 00051 //---------------------------------------------------------------- 00052 // Implementation 00053 // 00054 00055 class My_Callback: public Chan::Callback { 00056 public: 00057 My_Callback(Chan *c) : Chan::Callback(c) {} 00058 Status process(Msg *msg) { 00059 Test_String_Msg *sm = (Test_String_Msg *)msg->data(); 00060 cout << "hier_recv/My_Callback recv the following: " << sm->data << endl; 00061 cout << "...... Please notice callback borrow Channel internal thread\n"; 00062 delete msg; 00063 return SUCCESS; 00064 } 00065 }; 00066 00067 int main (int argc, char **argv) { 00068 if (argc < 2) { 00069 cout << "Usage: hrecv my_host:port [peer_host:port]\n"; 00070 return -1; 00071 } 00072 00073 //step1. create channel 00074 Chan * my_chan = new Chan(); 00075 00076 //step2. setup channels 00077 char *my_addr = argv[1]; 00078 Chan::TcpSockConnector *tcp_conn = new Chan::TcpSockConnector(my_chan, true); 00079 if (tcp_conn->open (my_addr) == FAILURE) { 00080 cout << "failed to open at " << my_addr << endl; 00081 return -1; 00082 } 00083 if (argc >= 3) { 00084 char *peer_addr = argv[2]; 00085 ConnInfo ci(peer_addr); 00086 if (tcp_conn->connect (ci) == FAILURE) { 00087 cout << "failed to connect peer at " << peer_addr << endl; 00088 return -1; 00089 } 00090 } 00091 00092 //step3. create ports/callbacks attached to channels 00093 Chan::Port my_port(my_chan); //use my own queue 00094 My_Callback my_calbak(my_chan); 00095 00096 //step4. sub msgs thru ports/callbacks 00097 my_port.subscribe_msg(APP_WILDCARD_MSG); 00098 my_calbak.subscribe_msg(TEST_STRING_MSG); 00099 00100 cout << "hier_recv coming up ...\n"; 00101 00102 ChanMsg *msg; 00103 Test_String_Msg *sm; 00104 00105 //step5. recv msgs from ports and process them 00106 for(;;) { 00107 if(my_port.recv_msg(msg) == SUCCESS) { 00108 // 00109 sm = (Test_String_Msg *)msg->data(); 00110 sm->data[sm->len-1] = '\0'; 00111 cout << "hier_recv thread receive the following: " << sm->data << endl; 00112 delete msg; 00113 } else { 00114 //handle_error(); 00115 break; 00116 } 00117 } 00118 00119 cout << "hier_recv exits...\n"; 00120 00121 return 0; 00122 }