00001
00002
00003
00004
00005
00006
00007
00008
00009
00011
00017 #ifndef _CONNECTOR_H_
00018 #define _CONNECTOR_H_
00019
00020
00021 #include "ace/Thread_Mutex.h"
00022 #include "ace/Thread_Semaphore.h"
00023 #include "ace/Synch.h"
00024 #include "ace/OS.h"
00025 #include "ace/os_include/os_netdb.h"
00026 #include "ace/Log_Msg.h"
00027 #include "ace/Synch_Options.h"
00028 #include "ace/Singleton.h"
00029 #include "ace/Recursive_Thread_Mutex.h"
00030 #include "ace/Thread_Manager.h"
00031 #include "ace/Reactor.h"
00032
00033
00034 #include <iostream>
00035 #include <string>
00036 #include <sstream>
00037 #include <vector>
00038 #include <map>
00039 #include <algorithm>
00040 #include <netinet/in.h>
00041
00042
00043 #include <BaseDef.h>
00044
00045 namespace channel {
00046
00047
00048 class Quit_Handler : public ACE_Event_Handler {
00049 friend class ace_dewarn_gplusplus;
00050 public:
00051 Quit_Handler (ACE_Reactor *r) : ACE_Event_Handler (r) {}
00052
00053 virtual int handle_exception (ACE_HANDLE) {
00054 ACE_DEBUG((LM_DEBUG, "Channel Reactor shutting down now...\n"));
00055 reactor ()->end_reactor_event_loop ();
00056 return -1;
00057 }
00058
00059 virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask)
00060 { delete this; return 0; }
00061
00062 private:
00063
00064
00065 virtual ~Quit_Handler () {}
00066 };
00067
// Forward declarations of the types Connector refers to.

// Plain classes.
class ConnInfo;
class Marshaler;

// Templates instantiated by Connector with a channel type plus a
// transport / connection-handler type.
template <class Channel, class Transport> class ConnHandler;
template <class Channel, class Handler> class OutputMgr;

// Templates instantiated by Connector with the channel's id type and id trait.
template <class IdType, class IdTrait> class MarshalerRegistry;
template <class IdType, class IdTrait> class BinderRegistry;
template <class IdType, class IdTrait> class Binder;
template <class IdType, class IdTrait> class Filter;
template <class IdType, class IdTrait> class Translator;
00077
00083 template <class Channel, class Transport>
00084 class Connector : public Transport,
00085 public MarshalerRegistry<typename Channel::IdType, typename Channel::IdTrait>,
00086 public BinderRegistry<typename Channel::IdType, typename Channel::IdTrait> {
00087 template <class, class> friend class ConnHandler;
00088 template <class, class> friend class RemoteInterface;
00089
00090 public:
00091
00092 typedef ConnHandler<Channel, Transport> ConnHandler;
00093 typedef Interface<Channel> Interface;
00094 typedef RemoteInterface<Channel, ConnHandler> RemoteInterface;
00095 typedef Binder<typename Channel::IdType, typename Channel::IdTrait> Binder;
00096 typedef Filter<typename Channel::IdType, typename Channel::IdTrait> Filter;
00097 typedef Translator<typename Channel::IdType, typename Channel::IdTrait> Translator;
00098
00099 bool exit_start_;
00100 ACE_Thread_Semaphore exit_sema_;
00101
00102 protected:
00103
00105 OutputMgr<Channel, ConnHandler> output_mgr_;
00106 int num_thr_;
00107 bool driver_thread_;
00110 std::map<ConnInfo, ConnHandler *> conn_map_;
00111 ACE_Thread_Mutex conn_map_lock_;
00112
00113 public:
00114
00115 Channel *ch_;
00116
00117 Connector(Channel *mchan, bool dt = false, int nt=1) : Transport(this) {
00118 conn_map_.clear();
00119 exit_start_ = false;
00120 ch_ = mchan;
00121 num_thr_ = nt;
00122 driver_thread_ = dt;
00123 }
00124 ~Connector() {}
00125
00127 static Interface_Type type(void) {
00128 return Transport::type(); }
00129
00130
00131 static void * run_event_loop(void *arg) {
00132 ACE_UNUSED_ARG(arg);
00133 ACE_DEBUG((LM_DEBUG, "A event_loop thread started\n"));
00134 ACE_Reactor *reactor = ACE_Reactor::instance();
00135 reactor->owner(ACE_OS::thr_self());
00136 reactor->run_reactor_event_loop();
00137 return 0;
00138 }
00139
00141 Status open(std::string addr)
00142 {
00143 output_mgr_.num_thr(num_thr_);
00144 output_mgr_.open();
00145
00146 ConnInfo ci(addr);
00147 Transport::open(ci);
00148
00149 if (driver_thread_) startup();
00150
00151 return SUCCESS;
00152 }
00153
00155 Status open(void)
00156 {
00157 output_mgr_.num_thr(num_thr_);
00158 output_mgr_.open();
00159
00160 if (driver_thread_) startup();
00161
00162 return SUCCESS;
00163 }
00164
00165 Status startup(void) {
00166 ACE_Thread_Manager::instance()->spawn(run_event_loop);
00167 return SUCCESS;
00168 }
00169
00170 Status shutdown(void) {
00171 ACE_Reactor *reactor = ACE_Reactor::instance();
00172 Quit_Handler *quit_handler = 0;
00173 ACE_NEW_RETURN (quit_handler, Quit_Handler (reactor), 0);
00174 reactor->notify (quit_handler);
00175 return SUCCESS;
00176 }
00177
00179 Status close(void)
00180 {
00181 ACE_DEBUG ((LM_DEBUG, "(%t) Connector::close() before lock conn_map_lock...\n"));
00182 ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, conn_map_lock_, FAILURE);
00183 ACE_DEBUG ((LM_DEBUG, "(%t) Connector::close() after lock conn_map_lock...\n"));
00184 ACE_DEBUG ((LM_DEBUG, "(%t) Connector first shutdown all writer threads...\n"));
00185 output_mgr_.shut_down();
00186 ACE_DEBUG ((LM_DEBUG, "(%t) Connector shutdown all reader threads...\n"));
00187 exit_start_=true;
00188 typename std::map<ConnInfo, ConnHandler *>::iterator iter;
00189 for(iter=conn_map_.begin(); iter!=conn_map_.end(); iter++)
00190 iter->second->shut_down();
00192 for(iter=conn_map_.begin(); iter!=conn_map_.end(); iter++)
00193 exit_sema_.acquire();
00194
00195 Transport::close();
00196
00197 return SUCCESS;
00198 }
00199
00203 Status connect(std::string addr_str, Binder *b = NULL)
00204 {
00205 ConnInfo addr(addr_str);
00206 return connect(addr, b);
00207 }
00208
00212 Status connect(ConnInfo addr, Binder *bind = NULL)
00213 {
00214 ConnInfo adr2(addr);
00215 Transport::get_map_index(adr2);
00216 if(is_connected(adr2))
00217 return SUCCESS;
00218 ConnHandler *ch = NULL;
00219 if(Transport::connect (addr, ch) != SUCCESS)
00220 return FAILURE;
00221 ch->rmt_addr_ = adr2;
00222
00223 if (bind != NULL)
00224 register_binder(adr2, bind);
00225 return SUCCESS;
00226 }
00227
00229 Status disconnect(std::string addr) {
00230 ACE_UNUSED_ARG(addr);
00231 return SUCCESS; }
00232
00236 Status add_conn(ConnInfo peer_addr, ConnHandler *ch)
00237 {
00238 ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, conn_map_lock_, FAILURE);
00239 if(conn_map_.find(peer_addr) != conn_map_.end()) {
00240 ACE_DEBUG ((LM_DEBUG, "duplicate connection...\n"));
00241 return FAILURE;
00242 }
00243 conn_map_[peer_addr] = ch;
00244
00246 return SUCCESS;
00247 }
00248 Status del_conn(ConnHandler *ch)
00249 {
00250 ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, conn_map_lock_, FAILURE);
00251 typename std::map<ConnInfo, ConnHandler*>::iterator iter;
00252 for(iter = conn_map_.begin(); iter != conn_map_.end(); iter++)
00253 if(iter->second == ch) {
00254 conn_map_.erase(iter->first);
00255 break;
00256 }
00257 return SUCCESS;
00258 }
00259 Status del_conn(ConnInfo addr)
00260 {
00261 ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, conn_map_lock_, FAILURE);
00262 conn_map_.erase(addr);
00263 return SUCCESS;
00264 }
00265 bool is_connected(ConnInfo addr)
00266 {
00267 ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, conn_map_lock_, FAILURE);
00268 if(conn_map_.find(addr) != conn_map_.end())
00269 return true;
00270 return false;
00271 }
00272
00273 };
00274
00275
00276 };
00277
00278 #endif