L:/channel/channel/include/UnixSockConnector.h

Go to the documentation of this file.
00001 
00002 // Copyright (c) 2005, 2006 Yigong Liu
00003 // Permission to use, copy, modify, distribute and sell this software for any 
00004 //     purpose is hereby granted without fee, provided that the above copyright 
00005 //     notice appear in all copies and that both that copyright notice and this 
00006 //     permission notice appear in supporting documentation.
00007 // The author makes no representations about the 
00008 //     suitability of this software for any purpose. It is provided "as is" 
00009 //     without express or implied warranty.
00011 
00012 #ifndef _UNIX_SOCK_CONNECTOR_
00013 #define _UNIX_SOCK_CONNECTOR_
00014 
00015 //for unix domain socket
00016 #include "ace/LSOCK_Connector.h"
00017 #include "ace/LSOCK_Acceptor.h"
00018 #include "ace/UNIX_Addr.h"
00019 #include "ace/SOCK_Stream.h"
00020 //acceptor-connector-svc_handler framework
00021 #include "ace/Acceptor.h"
00022 #include "ace/Connector.h"
00023 #include "ace/Svc_Handler.h"
00024 //for msg marshalling/demarshalling
00025 #include "ace/CDR_Stream.h"
00026 #include "ace/Message_Block.h"
00027 //other major headers
00028 #include "ace/Task.h"
00029 #include "ace/Thread_Manager.h"
00030 #include "ace/Thread_Mutex.h"
00031 #include "ace/Thread_Semaphore.h"
00032 #include "ace/Get_Opt.h"
00033 #include "ace/Reactor.h"
00034 #include "ace/Service_Object.h"
00035 #include "ace/Signal.h"
00036 #include "ace/Handle_Set.h"
00037 #include "ace/FILE_Addr.h"
00038 #include "ace/OS.h"
00039 #include "ace/os_include/os_netdb.h"
00040 #include "ace/Log_Msg.h"
00041 #include "ace/Synch_Options.h"
00042 
00043 
00044 //channel
00045 #include <BaseDef.h>
00046 
00047 //std
00048 #include <iostream>
00049 #include <string>
00050 #include <sstream>
00051 #include <vector>
00052 #include <algorithm>
00053 #include <netinet/in.h>
00054 
00059 namespace channel {
00060 
// Forward declarations of the framework types referenced by the classes below.
class ConnInfo;
template <class, class> class ConnHandler;
template <class, class> class Connector;
template <class> class UnixSockTransport;

// The constants below are declared 'static const char *const' because:
//  - string literals are const; binding them to a plain 'char *' is ill-formed
//    since C++11 and permits accidental writes into literal storage;
//  - this is a header: a non-const namespace-scope object has external linkage,
//    so every translation unit including it would emit its own definition and
//    the link would fail with multiple-definition errors.

// Prefix prepended to a unix address to form the socket's file-system path.
static const char *const unix_path_prefix = "/tmp/";
// Sentinel address meaning "no unix-domain address configured".
static const char *const UNDEFINED_UNIX_ADDR = "undefined";
// Default unix address name.
static const char *const DEFAULT_UNIX_ADDR = "chan_def_addr";
00069 
00072   template <class Channel>
00073   class Unix_ConnHandler
00074     : public ACE_Svc_Handler<ACE_LSOCK_STREAM, ACE_NULL_SYNCH>, 
00075       public ConnHandler<Channel, UnixSockTransport<Channel> > {
00076     public:
00077     typedef ACE_Svc_Handler<ACE_LSOCK_STREAM, ACE_NULL_SYNCH> PARENT;
00078     Unix_ConnHandler () {}
00079     virtual int open (void *) // Initialization hook method.
00080       {
00081         ACE_DEBUG ((LM_DEBUG, "a new unix conn comes up...\n"));
00082 
00083         return activate (THR_NEW_LWP | THR_DETACHED);
00084       }
00085     virtual int close(u_long flag=0)
00086       {
00087         ACE_UNUSED_ARG(flag);
00088         ACE_DEBUG ((LM_DEBUG, "(%t) Unix_ConnHandler<Channel>::close\n"));
00089         
00090         if(this->cm_ != NULL && this->cm_->exit_start_) {
00091           ACE_DEBUG ((LM_DEBUG, "(%t) Unix_ConnHandler<Channel>::close: release exit_sema_\n"));
00092           this->cm_->exit_sema_.release();
00093           //free resources
00094           PARENT::close();
00095         }
00096         return 0;
00097       }
00098     virtual void shut_down(void) {
00099       //close the stream, hope the reader thread will wake up
00100       //try to shutdown the thread and clean up data gracely
00101       //peer().close();
00102       ACE_DEBUG((LM_DEBUG, "one unix domain socket close...\n"));
00103       ACE_OS::close(get_handle());
00104     }
00105     virtual ACE_SOCK_Stream &peer_stream(void) {
00106       return (ACE_SOCK_Stream &) ACE_Svc_Handler<ACE_LSOCK_STREAM, ACE_NULL_SYNCH>::peer();
00107     }
00108     virtual int svc (void) { return ConnHandler<Channel, UnixSockTransport<Channel> >::service(); }
00109   };
00110 
00112   template <class Channel>
00113   class Unix_Acceptor
00114     : public ACE_Acceptor<Unix_ConnHandler<Channel>, ACE_LSOCK_Acceptor> {
00115     public:
00116     typedef ACE_Acceptor<Unix_ConnHandler<Channel>, ACE_LSOCK_Acceptor>
00117       PARENT;
00118 
00119     Connector<Channel, UnixSockTransport<Channel> > *cm_;
00120     virtual int activate_svc_handler (Unix_ConnHandler<Channel> *sh)
00121       {
00122         ACE_DEBUG((LM_DEBUG, "unix acceptor recv one conn...\n"));
00123         //hook up connection handler with framework
00124         sh->set_up(cm_, PASSIVE_ROLE);
00125         //activate threads etc
00126         if(PARENT::activate_svc_handler(sh)==-1) return -1;
00127         return 0;
00128       }
00129 
00130     //all use default behaviour
00131   };
00132 
00134   template <class Channel>
00135   class Unix_Connector
00136     : public ACE_Connector<Unix_ConnHandler<Channel>, ACE_LSOCK_Connector> {
00137     public:
00138     typedef ACE_Connector<Unix_ConnHandler<Channel>, ACE_LSOCK_Connector>
00139       PARENT;
00140 
00141     Connector<Channel, UnixSockTransport<Channel> > *cm_;
00142     virtual int activate_svc_handler (Unix_ConnHandler<Channel> *sh)
00143       {
00144         ACE_DEBUG((LM_DEBUG, "unix connector detects one conn...\n"));
00145         //hook up connection handler with framework
00146         sh->set_up(cm_, ACTIVE_ROLE);
00147         //activate threads etc
00148         if(PARENT::activate_svc_handler(sh)==-1) return -1;
00149         return 0;
00150       }
00151 
00152     //all use default behaviours 
00153   };
00154 
00155 
00157   template <class Channel>
00158   class  UnixSockTransport {
00159     typedef ConnHandler<Channel, UnixSockTransport<Channel> > ConnHandler;
00160     typedef Connector<Channel, UnixSockTransport<Channel> > Connector;
00161   private:
00162 
00163     // address I am listening for new connections
00164     std::string unix_addr_;
00165     // Factory that passively wait for connections
00166     Unix_Acceptor<Channel> unix_acceptor_;
00167     // Factory that actively connects to remotes
00168     Unix_Connector<Channel> unix_connector_;
00169 
00170   public:
00171     
00172     UnixSockTransport(Connector *c) {
00173       unix_acceptor_.cm_ = c;
00174       unix_connector_.cm_ = c;
00175       unix_addr_ = UNDEFINED_UNIX_ADDR;
00176     }
00177     static Interface_Type type(void) { return UNIX_SOCK; }
00178 
00179     //helpers
00180     std::string unix_addr(void) { return unix_addr_; }
00181     
00183     Status get_map_index(ConnInfo &addr)
00184       {
00185         //reuse current ConnInfo as conn_map_ index
00186         ACE_UNUSED_ARG(addr);
00187         return SUCCESS;
00188       }
00189 
00191     Status open(ConnInfo ci)
00192       {
00193         unix_addr_ = ci.unix_addr();
00194 
00195         ACE_DEBUG ((LM_DEBUG, "============= My Info ==============\n"));
00196 
00197         if(unix_addr_ != UNDEFINED_UNIX_ADDR) {
00198           std::string unix_path = unix_path_prefix+unix_addr_;
00199           ACE_OS::unlink (unix_path.c_str());
00200           ACE_UNIX_Addr unaddr(unix_path.c_str());
00201           if(unix_acceptor_.open(unaddr) == -1) {//start listening for incoming connections
00202             ACE_DEBUG ((LM_DEBUG, "fail to open unix sock [%s]...\n", unix_path.c_str()));
00203             return FAILURE;
00204           }
00205           ACE_DEBUG ((LM_DEBUG, "open unix sock [%s]...\n", unix_path.c_str()));
00206         }
00207         ACE_DEBUG ((LM_DEBUG, "=================================\n"));
00208         return SUCCESS;
00209       }
00210     Status close(void)
00211       {
00212         if(unix_addr_ != UNDEFINED_UNIX_ADDR) 
00213           unix_connector_.close(); //?
00214         return SUCCESS;
00215       }
00216 
00218     Status connect(ConnInfo addr, ConnHandler *&conn_handler)
00219       {
00220         ACE_DEBUG ((LM_DEBUG, "UnixSockConnector<Channel>::connect...\n"));
00221  
00222         addr.dump();
00223 
00224         Unix_ConnHandler<Channel> *handler=NULL;
00225         std::string unix_path = unix_path_prefix+addr.unix_addr();
00226         ACE_UNIX_Addr unaddr(unix_path.c_str());
00227       
00228         handler = new Unix_ConnHandler<Channel>();
00229         if(unix_connector_.connect(handler, unaddr, ACE_Synch_Options::synch) == -1) //ask connector create conn handler
00230           if (errno != EWOULDBLOCK) {
00231             ACE_DEBUG ((LM_DEBUG, " !!! server %s is DOWN !!!\n", unaddr.get_path_name()));
00232             delete handler;
00233             return FAILURE; //don't fail, go on for others
00234           }
00235         conn_handler = handler;
00236         ACE_DEBUG ((LM_DEBUG, "UnixSockConnector<Channel>::connect...finish...\n"));
00237         return SUCCESS;
00238       }
00239     Status disconnect(ConnInfo addr) { 
00240       ACE_UNUSED_ARG(addr);
00241       return SUCCESS; }
00242 
00243   };
00244 
00245 };
00246 
00247 #endif
00248 

Generated on Mon Feb 27 19:59:21 2006 for channel by  doxygen 1.4.6-NO