L:/channel/channel/include/OutputMgr.h

Go to the documentation of this file.
00001 
00002 // Copyright (c) 2005, 2006 Yigong Liu
00003 // Permission to use, copy, modify, distribute and sell this software for any 
00004 //     purpose is hereby granted without fee, provided that the above copyright 
00005 //     notice appear in all copies and that both that copyright notice and this 
00006 //     permission notice appear in supporting documentation.
00007 // The author makes no representations about the 
00008 //     suitability of this software for any purpose. It is provided "as is" 
00009 //     without express or implied warranty.
00011 
00012 #ifndef _OUTPUT_MGR_
00013 #define _OUTPUT_MGR_
00014 
00015 #include "ace/Message_Block.h"
00016 #include "ace/Task.h"
00017 #include "ace/Thread_Manager.h"
00018 #include "ace/Thread_Mutex.h"
00019 #include "ace/Thread_Semaphore.h"
00020 #include "ace/Log_Msg.h"
00021 #include "ace/Signal.h"
00022 #include "ace/CDR_Stream.h"
00023 //for ACE_GUARD macros, need the following 2 headers
00024 #include "ace/Synch.h"
00025 #include "ace/OS.h"
00026 
00034 namespace channel {
00035 
/**
 * A queued outbound item: one message paired with the connection handler
 * that is expected to transmit it.
 *
 * Both members are plain pointers; this class neither allocates nor frees
 * what they point to.
 *
 * @tparam Channel      type that supplies the nested message type `Msg`
 * @tparam ConnHandler  connection handler type the message is bound to
 */
template <class Channel, class ConnHandler>
class Out_Bound_Msg
{
public:
  typedef typename Channel::Msg Msg;

  Msg *msg;           ///< message to be sent
  ConnHandler *conn;  ///< connection handler to send it through

  /// Store the given message / handler pair as-is.
  Out_Bound_Msg (Msg *message, ConnHandler *handler)
    : msg (message),
      conn (handler)
  {
  }
};
00047 
00048 
00050   template <class Channel, class ConnHandler>
00051   class OutputMgr
00052     : public ACE_Task<ACE_MT_SYNCH> {
00053     private:
00054     int num_thr_;
00055     ACE_Thread_Mutex num_thr_lock_; //for gracely shutdown
00056     ACE_Thread_Semaphore exit_sema_; //for thread group exit gracely
00057 
00058     public:
00059     typedef typename Channel::Msg Msg;
00060 
00061     enum { QUEUE_MAX = 16 * 1024 }; //16k
00062 
00063     OutputMgr(int nt=1) {
00064       num_thr_ = nt;
00065     }
00066 
00067     int num_thr(void) { return num_thr_; }
00068   
00069     void num_thr(int nt) { num_thr_ = nt; }
00070 
00071     virtual int open () {// Initialization hook method.
00072       if (msg_queue ()->activate ()
00073           == ACE_Message_Queue_Base::ACTIVATED) {
00074         msg_queue ()->high_water_mark (QUEUE_MAX);
00075         return activate (THR_NEW_LWP | THR_DETACHED, num_thr_);
00076       } else return 0;
00077     }
00078 
00079     void shut_down(void) 
00080       {
00081         if(msg_queue()->state() != ACE_Message_Queue_Base::DEACTIVATED) {
00082           msg_queue()->deactivate(); //hope all blocked sending threads will wake up
00083         }
00084         for(int i=0; i<num_thr_; i++)
00085           exit_sema_.acquire();
00086       }
00087 
00088     virtual int close() { //is invoked in each thread
00089       //NO need to delete, since this is statically allocated by Connector
00090       //ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, num_thr_lock_, -1);
00091       //num_thr_--;
00092       //if(num_thr_==0) //i am the last guy
00093       //  delete this;
00094       ACE_DEBUG((LM_DEBUG, "OutputMgr close down, thread exit...\n"));
00095       exit_sema_.release();
00096       return 0;
00097     }
00098 
00100     virtual int put (ACE_Message_Block *mb, ACE_Time_Value *timeout = 0)
00101       {
00102         return putq (mb, timeout);
00103       }
00104 
00105     protected:
00106     virtual int svc ()
00107       {
00108         ACE_Sig_Action no_sigpipe ((ACE_SignalHandler) SIG_IGN);
00109         ACE_Sig_Action original_action;
00110         no_sigpipe.register_action (SIGPIPE, &original_action);
00111  
00112         for (;;) {
00113           ACE_Message_Block *mblk = 0;
00114           if (getq (mblk) == -1) {
00115             if (errno == ESHUTDOWN) { //msgque deactivated
00116               break;
00117             } else if (errno != EWOULDBLOCK) break;
00118           } else {
00119             if (send (mblk) == -1) {
00120               ACE_DEBUG((LM_DEBUG, "(%t) failed to send one msg...\n"));
00121             }
00122             mblk->release();
00123           }
00124         }
00125 
00126         no_sigpipe.restore_action (SIGPIPE, original_action);
00127         return 0;
00128       }
00129     virtual int send (ACE_Message_Block *mblk)
00130       {
00131         Out_Bound_Msg<Channel, ConnHandler> *out_msg = (Out_Bound_Msg<Channel, ConnHandler> *)mblk->rd_ptr();
00132         Msg *msg = out_msg->msg;
00133         ConnHandler *connhndl = out_msg->conn;
00134         return connhndl->send_msg(msg);
00135       }
00136   };
00137 
00138 };
00139 
00140 #endif

Generated on Mon Feb 27 19:59:21 2006 for channel by  doxygen 1.4.6-NO