00001
00002
00003
00004
00005
00006
00007
00008
00009
00011
00012 #include <PongTask.h>
00013 #include <PongTask_export.h>
00014 #include <Clock.h>
00015 #include <iostream>
00016
00017 using namespace std;
00018
00019
00020 class TimeOutCB : public ACE_Event_Handler
00021 {
00022 public:
00023 TimeOutCB(Pong_Task *pt) : pong_task_(pt) {}
00024 virtual int handle_timeout (const ACE_Time_Value &,
00025 const void *arg);
00026 private:
00027 Pong_Task *pong_task_;
00028 };
00029
00030 int TimeOutCB::handle_timeout (const ACE_Time_Value &, const void *arg)
00031 {
00032 ACE_UNUSED_ARG(arg);
00033 pong_task_->send_ping_pong();
00034 return 0;
00035 }
00036
00037
00038
00039
00040
00041
00042
00043 Status Pong_Task::prepare(void)
00044 {
00045 my_port()->publish_msg(PING_MSG, SCOPE_GLOBAL);
00046
00047 my_port()->subscribe_msg(PONG_MSG);
00048
00049 const ACE_Time_Value curr_tv = ACE_OS::gettimeofday ();
00050 TimeOutCB *cb_ = new TimeOutCB(this);
00051 ACE_Time_Value interval = ACE_Time_Value (1, 0);
00052 Clock::instance()->schedule (cb_, NULL, curr_tv + interval, interval);
00053
00054 return SUCCESS;
00055 }
00056
00057 Status Pong_Task::cleanup(void)
00058 {
00059 ACE_DEBUG ((LM_DEBUG,
00060 "(%t) Pong_Task::cleanup(void)...\n"));
00061 my_port()->unpublish_msg(PING_MSG);
00062
00063 my_port()->unsubscribe_msg(PONG_MSG);
00064 return SUCCESS;
00065 }
00066
00067 void Pong_Task::send_ping_pong(void) {
00068 Ping_Pong_Msg *rsp;
00069 rsp = new Ping_Pong_Msg();
00070 rsp->count = 0;
00071 strcpy(rsp->data,"Lets ping pong...");
00072 rsp->len = strlen("Lets ping pong...");
00073 Msg *m = new Msg(PING_MSG, rsp);
00074 my_port()->send_msg(m);
00075 cerr << "send one msg...\n";
00076 }
00077
00078
00079 int Pong_Task::work() {
00080 ACE_DEBUG ((LM_DEBUG,
00081 "(%t) %s pong_task coming up ...\n", my_name().c_str()));
00082
00083 Msg *msg;
00084 Ping_Pong_Msg *sm, *rsp;
00085
00086
00087 ACE_OS::sleep(ACE_Time_Value(1));
00088
00089
00090 rsp = new Ping_Pong_Msg();
00091 rsp->count = 0;
00092 strcpy(rsp->data,"Lets ping pong...");
00093 rsp->len = strlen("Lets ping pong...");
00094 Msg *m = new Msg(PING_MSG, rsp);
00095 my_port()->send_msg(m);
00096
00097 for(;;) {
00098 ACE_OS::sleep(ACE_Time_Value(100));
00099
00100 if(my_port()->recv_msg(msg) == SUCCESS) {
00101
00102 if(msg->type == PONG_MSG) {
00103 sm = (Ping_Pong_Msg *)msg;
00104 sm->data[sm->len-1] = '\0';
00105
00106 ACE_DEBUG ((LM_DEBUG, "(%t) (%s) receive the Pong_Pong_Msg[%d]: \n%s\n",
00107 my_name().c_str(),sm->count,sm->data));
00108
00109 rsp = new Ping_Pong_Msg();
00110 rsp->len = sm->len;
00111 rsp->count = sm->count+1;
00112 if(rsp->count > 4096) rsp->count=0;
00113 strcpy(rsp->data,sm->data);
00114
00115 delete sm;
00116
00117 ACE_OS::sleep(ACE_Time_Value(1));
00118
00119 m = new Msg(PING_MSG, rsp);
00120 my_port()->send_msg(m);
00121 }
00122 } else {
00123
00124 break;
00125 }
00126
00127 }
00128
00129 ACE_DEBUG ((LM_DEBUG,
00130 "(%t) %s pong_task exits...\n", my_name().c_str()));
00131
00132 return 0;
00133 }
00134
00135 ACE_FACTORY_DEFINE (PongTask, Pong_Task)