L:/channel/channel/include/Connector.h

Go to the documentation of this file.
00001 
00002 // Copyright (c) 2005, 2006 Yigong Liu
00003 // Permission to use, copy, modify, distribute and sell this software for any 
00004 //     purpose is hereby granted without fee, provided that the above copyright 
00005 //     notice appear in all copies and that both that copyright notice and this 
00006 //     permission notice appear in supporting documentation.
00007 // The author makes no representations about the 
00008 //     suitability of this software for any purpose. It is provided "as is" 
00009 //     without express or implied warranty.
00011 
00017 #ifndef _CONNECTOR_H_
00018 #define _CONNECTOR_H_
00019 
00020 //ACE headers 
00021 #include "ace/Thread_Mutex.h"
00022 #include "ace/Thread_Semaphore.h"
00023 #include "ace/Synch.h"
00024 #include "ace/OS.h"
00025 #include "ace/os_include/os_netdb.h"
00026 #include "ace/Log_Msg.h"
00027 #include "ace/Synch_Options.h"
00028 #include "ace/Singleton.h"
00029 #include "ace/Recursive_Thread_Mutex.h"
00030 #include "ace/Thread_Manager.h"
00031 #include "ace/Reactor.h"
00032 
00033 //std
00034 #include <iostream>
00035 #include <string>
00036 #include <sstream>
00037 #include <vector>
00038 #include <map>
00039 #include <algorithm>
00040 #include <netinet/in.h>
00041 
00042 //channel
00043 #include <BaseDef.h>
00044 
00045 namespace channel {
00046  
00047   //ask the main-loop of connector's reactor to shutdown nicely
00048   class Quit_Handler : public ACE_Event_Handler {
00049     friend class ace_dewarn_gplusplus;
00050   public:
00051     Quit_Handler (ACE_Reactor *r) : ACE_Event_Handler (r) {}
00052 
00053     virtual int handle_exception (ACE_HANDLE) {
00054       ACE_DEBUG((LM_DEBUG, "Channel Reactor shutting down now...\n"));
00055       reactor ()->end_reactor_event_loop ();
00056       return -1; // Trigger call to handle_close() method.
00057     }
00058 
00059     virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask)
00060       { delete this; return 0; }
00061 
00062   private:
00063 
00064     // Private destructor ensures dynamic allocation.
00065     virtual ~Quit_Handler () {}
00066   };
00067 
00068   template <class, class> class ConnHandler;
00069   template <class, class> class OutputMgr;
00070   class ConnInfo;
00071   template <class, class> class MarshalerRegistry;
00072   template <class, class> class BinderRegistry;
00073   template <class, class> class Binder;
00074   template <class, class> class Filter;
00075   template <class, class> class Translator;
00076   class Marshaler;
00077 
00083   template <class Channel, class Transport>
00084   class Connector : public Transport, 
00085     public MarshalerRegistry<typename Channel::IdType, typename Channel::IdTrait>,
00086     public BinderRegistry<typename Channel::IdType, typename Channel::IdTrait> {
00087     template <class, class> friend class ConnHandler;
00088     template <class, class> friend class RemoteInterface;
00089 
00090   public:
00091 
00092     typedef ConnHandler<Channel, Transport> ConnHandler;
00093     typedef Interface<Channel> Interface;
00094     typedef RemoteInterface<Channel, ConnHandler> RemoteInterface;
00095     typedef Binder<typename Channel::IdType, typename Channel::IdTrait> Binder;
00096     typedef Filter<typename Channel::IdType, typename Channel::IdTrait> Filter;
00097     typedef Translator<typename Channel::IdType, typename Channel::IdTrait> Translator;
00098 
00099     bool exit_start_; 
00100     ACE_Thread_Semaphore exit_sema_; 
00101 
00102   protected:
00103 
00105     OutputMgr<Channel, ConnHandler> output_mgr_;
00106     int num_thr_;
00107     bool driver_thread_;
00110     std::map<ConnInfo, ConnHandler *> conn_map_; 
00111     ACE_Thread_Mutex conn_map_lock_;
00112 
00113   public:
00114 
00115     Channel *ch_;
00116 
00117     Connector(Channel *mchan, bool dt = false, int nt=1) : Transport(this) {
00118       conn_map_.clear();
00119       exit_start_ = false;
00120       ch_ = mchan;
00121       num_thr_ = nt;
00122       driver_thread_ = dt;
00123     }
00124     ~Connector() {}
00125 
00127     static Interface_Type type(void) { 
00128       return Transport::type(); }
00129 
00130     //the main loop driving connector's reactor
00131     static void * run_event_loop(void *arg) {
00132       ACE_UNUSED_ARG(arg);
00133       ACE_DEBUG((LM_DEBUG, "A event_loop thread started\n"));
00134       ACE_Reactor *reactor = ACE_Reactor::instance();
00135       reactor->owner(ACE_OS::thr_self());
00136       reactor->run_reactor_event_loop();
00137       return 0;
00138     }
00139 
00141     Status open(std::string addr)
00142       {
00143         output_mgr_.num_thr(num_thr_);
00144         output_mgr_.open();
00145 
00146         ConnInfo ci(addr);
00147         Transport::open(ci);
00148 
00149         if (driver_thread_) startup();
00150 
00151         return SUCCESS;
00152       }
00153 
00155     Status open(void)
00156       {
00157         output_mgr_.num_thr(num_thr_);
00158         output_mgr_.open();
00159 
00160         if (driver_thread_) startup();
00161 
00162         return SUCCESS;
00163       }
00164 
00165     Status startup(void) {
00166       ACE_Thread_Manager::instance()->spawn(run_event_loop);
00167       return SUCCESS;
00168     }
00169 
00170     Status shutdown(void) {
00171       ACE_Reactor *reactor = ACE_Reactor::instance();
00172       Quit_Handler *quit_handler = 0;
00173       ACE_NEW_RETURN (quit_handler, Quit_Handler (reactor), 0);
00174       reactor->notify (quit_handler);
00175       return SUCCESS;
00176     }
00177   
00179     Status close(void)
00180       {
00181         ACE_DEBUG ((LM_DEBUG, "(%t) Connector::close() before lock conn_map_lock...\n"));
00182         ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, conn_map_lock_, FAILURE);
00183         ACE_DEBUG ((LM_DEBUG, "(%t) Connector::close() after lock conn_map_lock...\n"));
00184         ACE_DEBUG ((LM_DEBUG, "(%t) Connector first shutdown all writer threads...\n"));
00185         output_mgr_.shut_down(); 
00186         ACE_DEBUG ((LM_DEBUG, "(%t) Connector shutdown all reader threads...\n"));
00187         exit_start_=true;
00188         typename std::map<ConnInfo, ConnHandler *>::iterator iter;
00189         for(iter=conn_map_.begin(); iter!=conn_map_.end(); iter++)
00190           iter->second->shut_down();
00192         for(iter=conn_map_.begin(); iter!=conn_map_.end(); iter++)
00193           exit_sema_.acquire();
00194 
00195         Transport::close();
00196 
00197         return SUCCESS;
00198       }
00199 
00203     Status connect(std::string addr_str, Binder *b = NULL)
00204       {
00205         ConnInfo addr(addr_str);
00206         return connect(addr, b);
00207       }
00208 
00212     Status connect(ConnInfo addr, Binder *bind = NULL)
00213       {
00214         ConnInfo adr2(addr);
00215         Transport::get_map_index(adr2);
00216         if(is_connected(adr2))
00217           return SUCCESS;
00218         ConnHandler *ch = NULL;
00219         if(Transport::connect (addr, ch) != SUCCESS)
00220           return FAILURE;
00221         ch->rmt_addr_ = adr2;
00222         //register binders(filters+translators) here
00223         if (bind != NULL)
00224           register_binder(adr2, bind);
00225         return SUCCESS;
00226       }
00227 
00229     Status disconnect(std::string addr) { 
00230       ACE_UNUSED_ARG(addr);
00231       return SUCCESS; }
00232     
00236     Status add_conn(ConnInfo peer_addr, ConnHandler *ch)
00237       {
00238         ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, conn_map_lock_, FAILURE);
00239         if(conn_map_.find(peer_addr) != conn_map_.end()) {
00240           ACE_DEBUG ((LM_DEBUG, "duplicate connection...\n"));
00241           return FAILURE; 
00242         }
00243         conn_map_[peer_addr] = ch;
00244 
00246         return SUCCESS;
00247       }
00248     Status del_conn(ConnHandler *ch)
00249       {
00250         ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, conn_map_lock_, FAILURE);
00251         typename std::map<ConnInfo, ConnHandler*>::iterator iter;
00252         for(iter = conn_map_.begin(); iter != conn_map_.end(); iter++) 
00253           if(iter->second == ch) {
00254             conn_map_.erase(iter->first);
00255             break;
00256           }
00257         return SUCCESS;
00258       }
00259     Status del_conn(ConnInfo addr)
00260       {
00261         ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, conn_map_lock_, FAILURE);
00262         conn_map_.erase(addr);
00263         return SUCCESS;
00264       }
00265     bool is_connected(ConnInfo addr)
00266       {
00267         ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, conn_map_lock_, FAILURE);
00268         if(conn_map_.find(addr) != conn_map_.end())
00269           return true;
00270         return false;
00271       }
00272 
00273   };
00274 
00275 
00276 };
00277 
00278 #endif

Generated on Mon Feb 27 19:59:21 2006 for channel by  doxygen 1.4.6-NO