00001
00002
00003
00004
00005
00006
00007
00008
00009
00011
00012 #ifndef _OUTPUT_MGR_
00013 #define _OUTPUT_MGR_
00014
00015 #include "ace/Message_Block.h"
00016 #include "ace/Task.h"
00017 #include "ace/Thread_Manager.h"
00018 #include "ace/Thread_Mutex.h"
00019 #include "ace/Thread_Semaphore.h"
00020 #include "ace/Log_Msg.h"
00021 #include "ace/Signal.h"
00022 #include "ace/CDR_Stream.h"
00023
00024 #include "ace/Synch.h"
00025 #include "ace/OS.h"
00026
00034 namespace channel {
00035
00038 template <class Channel, class ConnHandler>
00039 class Out_Bound_Msg {
00040 public:
00041 typedef typename Channel::Msg Msg;
00042 Msg *msg;
00043 ConnHandler *conn;
00044 Out_Bound_Msg (Msg *m, ConnHandler *c) : msg(m), conn(c) {
00045 }
00046 };
00047
00048
00050 template <class Channel, class ConnHandler>
00051 class OutputMgr
00052 : public ACE_Task<ACE_MT_SYNCH> {
00053 private:
00054 int num_thr_;
00055 ACE_Thread_Mutex num_thr_lock_;
00056 ACE_Thread_Semaphore exit_sema_;
00057
00058 public:
00059 typedef typename Channel::Msg Msg;
00060
00061 enum { QUEUE_MAX = 16 * 1024 };
00062
00063 OutputMgr(int nt=1) {
00064 num_thr_ = nt;
00065 }
00066
00067 int num_thr(void) { return num_thr_; }
00068
00069 void num_thr(int nt) { num_thr_ = nt; }
00070
00071 virtual int open () {
00072 if (msg_queue ()->activate ()
00073 == ACE_Message_Queue_Base::ACTIVATED) {
00074 msg_queue ()->high_water_mark (QUEUE_MAX);
00075 return activate (THR_NEW_LWP | THR_DETACHED, num_thr_);
00076 } else return 0;
00077 }
00078
00079 void shut_down(void)
00080 {
00081 if(msg_queue()->state() != ACE_Message_Queue_Base::DEACTIVATED) {
00082 msg_queue()->deactivate();
00083 }
00084 for(int i=0; i<num_thr_; i++)
00085 exit_sema_.acquire();
00086 }
00087
00088 virtual int close() {
00089
00090
00091
00092
00093
00094 ACE_DEBUG((LM_DEBUG, "OutputMgr close down, thread exit...\n"));
00095 exit_sema_.release();
00096 return 0;
00097 }
00098
00100 virtual int put (ACE_Message_Block *mb, ACE_Time_Value *timeout = 0)
00101 {
00102 return putq (mb, timeout);
00103 }
00104
00105 protected:
00106 virtual int svc ()
00107 {
00108 ACE_Sig_Action no_sigpipe ((ACE_SignalHandler) SIG_IGN);
00109 ACE_Sig_Action original_action;
00110 no_sigpipe.register_action (SIGPIPE, &original_action);
00111
00112 for (;;) {
00113 ACE_Message_Block *mblk = 0;
00114 if (getq (mblk) == -1) {
00115 if (errno == ESHUTDOWN) {
00116 break;
00117 } else if (errno != EWOULDBLOCK) break;
00118 } else {
00119 if (send (mblk) == -1) {
00120 ACE_DEBUG((LM_DEBUG, "(%t) failed to send one msg...\n"));
00121 }
00122 mblk->release();
00123 }
00124 }
00125
00126 no_sigpipe.restore_action (SIGPIPE, original_action);
00127 return 0;
00128 }
00129 virtual int send (ACE_Message_Block *mblk)
00130 {
00131 Out_Bound_Msg<Channel, ConnHandler> *out_msg = (Out_Bound_Msg<Channel, ConnHandler> *)mblk->rd_ptr();
00132 Msg *msg = out_msg->msg;
00133 ConnHandler *connhndl = out_msg->conn;
00134 return connhndl->send_msg(msg);
00135 }
00136 };
00137
00138 };
00139
00140 #endif