00001
00002
00003
00004
00005
00006
00007
00008
00009
00011
00018 #ifndef _PORT_H_
00019 #define _PORT_H_
00020
00021 #include "ace/Message_Queue.h"
00022 #include "ace/Log_Msg.h"
00023
00024 #include <BaseDef.h>
00025
00026
00027 namespace channel {
00028
00029 template <class Channel, class MsgQueueType=ACE_Message_Queue<ACE_MT_SYNCH> >
00030 class Port : public Channel::Destination, public Channel::Source {
00031 typedef typename Channel::Msg Msg;
00032
00033 private:
00034 Channel *my_chan_;
00035 MsgQueueType *my_que_;
00036 ACE_Thread_Semaphore suspend_sema_;
00037 int num_blocked_thr;
00038 bool borrow_que_;
00039
00040 public:
00041
00042 Port(Channel *chan, MsgQueueType *que=NULL) :
00043 Channel::Destination(chan), Channel::Source(chan)
00044 {
00045 num_blocked_thr = 0;
00046 borrow_que_ = true;
00047 my_chan_ = chan;
00048 my_que_ = que;
00049 if (que == NULL) {
00050
00051 my_que_ = new MsgQueueType();
00052 borrow_que_ = false;
00053 }
00054 }
00055
00056 ~Port() {
00057 if (!borrow_que_)
00058 delete my_que_;
00059 }
00060
00063 Status put_msg(Msg *msg, ACE_Time_Value *timeout) {
00064
00065
00066
00067 ACE_Message_Block *mb = new ACE_Message_Block((const char*) msg, sizeof(Msg));
00068 mb->wr_ptr (sizeof(Msg));
00069 if(my_que_->enqueue_tail(mb, timeout)!=-1)
00070 return SUCCESS;
00071 return FAILURE;
00072 }
00073
00074 int num_pending_msg(void) {
00075 return my_que_->message_count();
00076 }
00077
00079 Status recv_msg(Msg *&msg)
00080 {
00081 ACE_Message_Block *mb = 0;
00082 int ret;
00083
00084 if(my_que_->state()==MsgQueueType::PULSED) {
00085 ACE_DEBUG ((LM_DEBUG,
00086 "(%t) task block on semaphore when que is pulsed\n"));
00087 num_blocked_thr++;
00088 ret = suspend_sema_.acquire();
00089 }
00090
00091 if(my_que_->dequeue_head (mb) == -1) {
00092 if(my_que_->state()!=MsgQueueType::PULSED)
00093 return FAILURE;
00094
00095 ACE_DEBUG ((LM_DEBUG,
00096 "(%t) task block on semaphore when que is pulsed\n"));
00097 num_blocked_thr++;
00098 ret = suspend_sema_.acquire();
00099 }
00100
00101 msg = (Msg *)mb->rd_ptr();
00102
00103
00104 mb->release();
00105 return SUCCESS;
00106 }
00107
00110 Status suspend(void)
00111 {
00112 my_que_->pulse();
00113 return SUCCESS;
00114 }
00116 Status resume(void)
00117 {
00118 my_que_->activate();
00119 for(int i=0; i<num_blocked_thr;i++)
00120 suspend_sema_.release();
00121 return SUCCESS;
00122 }
00123
00124 };
00125
00126 };
00127
00128
00129 #endif