L:/channel/channel/include/Port.h

Go to the documentation of this file.
00001 
00002 // Copyright (c) 2005, 2006 Yigong Liu
00003 // Permission to use, copy, modify, distribute and sell this software for any 
00004 //     purpose is hereby granted without fee, provided that the above copyright 
00005 //     notice appear in all copies and that both that copyright notice and this 
00006 //     permission notice appear in supporting documentation.
00007 // The author makes no representations about the 
00008 //     suitability of this software for any purpose. It is provided "as is" 
00009 //     without express or implied warranty.
00011 
00018 #ifndef _PORT_H_
00019 #define _PORT_H_
00020 
#include "ace/Guard_T.h"
#include "ace/Log_Msg.h"
#include "ace/Message_Queue.h"
#include "ace/Thread_Mutex.h"

#include <BaseDef.h>
00025 
00026 
00027 namespace channel {
00028 
00029   template <class Channel, class MsgQueueType=ACE_Message_Queue<ACE_MT_SYNCH> >
00030   class  Port : public Channel::Destination, public Channel::Source {
00031     typedef typename Channel::Msg Msg;
00032     
00033   private:
00034     Channel *my_chan_;
00035     MsgQueueType *my_que_;
00036     ACE_Thread_Semaphore suspend_sema_;
00037     int num_blocked_thr;
00038     bool borrow_que_;
00039 
00040   public:
00041 
00042     Port(Channel *chan, MsgQueueType *que=NULL) : 
00043       Channel::Destination(chan), Channel::Source(chan)
00044       {
00045         num_blocked_thr = 0;
00046         borrow_que_ = true;
00047         my_chan_ = chan;
00048         my_que_ = que;
00049         if (que == NULL) {
00050           //allocate a new private msg_que
00051           my_que_ = new MsgQueueType();
00052           borrow_que_ = false;
00053         }
00054       }
00055 
00056     ~Port() {
00057       if (!borrow_que_)
00058         delete my_que_;
00059     }
00060 
00063     Status put_msg(Msg *msg, ACE_Time_Value *timeout) {
00064       //Note: since here mb (msg block) doesn't create/own the mem space for msg,
00065       //mb and its data will be release() and delete separately and explicitly.
00066       //internally it is marked as DONT_DELETE
00067       ACE_Message_Block *mb = new ACE_Message_Block((const char*) msg, sizeof(Msg));
00068       mb->wr_ptr (sizeof(Msg));
00069       if(my_que_->enqueue_tail(mb, timeout)!=-1)
00070         return SUCCESS;
00071       return FAILURE;
00072     }
00073 
00074     int num_pending_msg(void) {
00075       return my_que_->message_count();
00076     }
00077 
00079     Status recv_msg(Msg *&msg)
00080       {
00081         ACE_Message_Block *mb = 0;
00082         int ret;
00083 
00084         if(my_que_->state()==MsgQueueType::PULSED) {
00085           ACE_DEBUG ((LM_DEBUG,
00086                       "(%t) task block on semaphore when que is pulsed\n"));
00087           num_blocked_thr++;
00088           ret = suspend_sema_.acquire();
00089         }
00090 
00091         if(my_que_->dequeue_head (mb) == -1) {  //wait indefinitely
00092           if(my_que_->state()!=MsgQueueType::PULSED)
00093             return FAILURE;     
00094         
00095           ACE_DEBUG ((LM_DEBUG,
00096                       "(%t) task block on semaphore when que is pulsed\n"));
00097           num_blocked_thr++;
00098           ret = suspend_sema_.acquire();
00099         }
00100         //forward message
00101         msg = (Msg *)mb->rd_ptr();
00102         //dont worry about msg (the internal data), it is already
00103         //marked DONT_DELETE when msg is wrapped into mblk
00104         mb->release();
00105         return SUCCESS;
00106       }
00107 
00110     Status suspend(void)
00111       {
00112         my_que_->pulse();
00113         return SUCCESS; //add later
00114       }
00116     Status resume(void)
00117       {
00118         my_que_->activate();
00119         for(int i=0; i<num_blocked_thr;i++)
00120           suspend_sema_.release();  //release my threads
00121         return SUCCESS; //add later
00122       }
00123 
00124   };
00125 
00126 };
00127 
00128 
00129 #endif

Generated on Mon Feb 27 19:59:21 2006 for channel by  doxygen 1.4.6-NO