00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00011 
00012 #include <PongTask.h>
00013 #include <PongTask_export.h>
00014 #include <Clock.h>
00015 #include <iostream>
00016 
00017 using namespace std;
00018 
00019 
00020 class TimeOutCB : public ACE_Event_Handler
00021 {
00022 public:
00023   TimeOutCB(Pong_Task *pt) : pong_task_(pt) {}
00024   virtual int handle_timeout (const ACE_Time_Value &,
00025                               const void *arg);
00026 private:
00027   Pong_Task *pong_task_;
00028 };
00029 
00030 int TimeOutCB::handle_timeout (const ACE_Time_Value &, const void *arg)
00031 {
00032   ACE_UNUSED_ARG(arg);
00033   pong_task_->send_ping_pong();
00034   return 0;
00035 }
00036 
00037 
00038 
00039 
00040 
00041 
00042 
00043 Status Pong_Task::prepare(void)
00044 {  
00045   my_port()->publish_msg(PING_MSG, SCOPE_GLOBAL);
00046   
00047   my_port()->subscribe_msg(PONG_MSG);
00048 
00049   const ACE_Time_Value curr_tv = ACE_OS::gettimeofday ();
00050   TimeOutCB *cb_ = new TimeOutCB(this);
00051   ACE_Time_Value interval = ACE_Time_Value (1, 0);
00052   Clock::instance()->schedule (cb_, NULL, curr_tv + interval, interval);
00053 
00054   return SUCCESS;
00055 }
00056 
00057 Status Pong_Task::cleanup(void)
00058 {
00059   ACE_DEBUG ((LM_DEBUG,
00060               "(%t) Pong_Task::cleanup(void)...\n"));
00061   my_port()->unpublish_msg(PING_MSG);
00062   
00063   my_port()->unsubscribe_msg(PONG_MSG);
00064   return SUCCESS;
00065 }
00066 
00067 void Pong_Task::send_ping_pong(void) {
00068   Ping_Pong_Msg *rsp;
00069   rsp = new Ping_Pong_Msg();
00070   rsp->count = 0;
00071   strcpy(rsp->data,"Lets ping pong...");
00072   rsp->len = strlen("Lets ping pong...");
00073   Msg *m = new Msg(PING_MSG, rsp);
00074   my_port()->send_msg(m);
00075   cerr << "send one msg...\n";
00076 }
00077 
00078 
00079 int Pong_Task::work() {
00080   ACE_DEBUG ((LM_DEBUG,
00081               "(%t) %s pong_task coming up ...\n", my_name().c_str()));
00082 
00083   Msg *msg;
00084   Ping_Pong_Msg *sm, *rsp;
00085 
00086   
00087   ACE_OS::sleep(ACE_Time_Value(1));
00088 
00089   
00090   rsp = new Ping_Pong_Msg();
00091   rsp->count = 0;
00092   strcpy(rsp->data,"Lets ping pong...");
00093   rsp->len = strlen("Lets ping pong...");
00094   Msg *m = new Msg(PING_MSG, rsp);
00095   my_port()->send_msg(m);
00096 
00097   for(;;) {
00098   ACE_OS::sleep(ACE_Time_Value(100));
00099   
00100     if(my_port()->recv_msg(msg) == SUCCESS) {
00101       
00102       if(msg->type == PONG_MSG) {
00103         sm = (Ping_Pong_Msg *)msg;
00104         sm->data[sm->len-1] = '\0';
00105 
00106         ACE_DEBUG ((LM_DEBUG, "(%t) (%s) receive the Pong_Pong_Msg[%d]: \n%s\n",
00107                     my_name().c_str(),sm->count,sm->data));
00108  
00109         rsp = new Ping_Pong_Msg();
00110         rsp->len = sm->len;
00111         rsp->count = sm->count+1;
00112         if(rsp->count > 4096) rsp->count=0;
00113         strcpy(rsp->data,sm->data);
00114 
00115         delete sm;
00116 
00117         ACE_OS::sleep(ACE_Time_Value(1));
00118 
00119         m = new Msg(PING_MSG, rsp);
00120         my_port()->send_msg(m);
00121       }
00122     } else {
00123       
00124       break;
00125     }
00126  
00127   }
00128 
00129   ACE_DEBUG ((LM_DEBUG,
00130               "(%t) %s pong_task exits...\n", my_name().c_str()));
00131 
00132   return 0;
00133 }
00134 
00135 ACE_FACTORY_DEFINE (PongTask, Pong_Task)