L:/channel/cvm/include/CvmBaseConnector.h

Go to the documentation of this file.
00001 
00002 // Copyright (c) 2005, 2006 Yigong Liu
00003 // Permission to use, copy, modify, distribute and sell this software for any 
00004 //     purpose is hereby granted without fee, provided that the above copyright 
00005 //     notice appear in all copies and that both that copyright notice and this 
00006 //     permission notice appear in supporting documentation.
00007 // The author makes no representations about the 
00008 //     suitability of this software for any purpose. It is provided "as is" 
00009 //     without express or implied warranty.
00011 
00017 #ifndef _CVM_BASE_CONNECTOR_H_
00018 #define _CVM_BASE_CONNECTOR_H_
00019 
00020 #include "ace/Dynamic_Service.h"
00021 
00022 #include <Channel.h>
00023 #include <CvmService.h>
00024 
00025 #include <iostream>
00026 #include <vector>
00027 #include <string>
00028 
00029 
00030 namespace cvm {
00031 
00034   template <class Channel, class Transport>
00035     class CvmBaseConnector : public CvmService {
00036     public:
00037     typedef channel::Connector<Channel, Transport> Connector;
00038 
00039     Connector *conn_;
00040     Connector *conn(void) { return conn_; }
00042     virtual int init (int argc, ACE_TCHAR *argv[])
00043       {
00044         //-------------- Channel init -----------------
00045         ACE_TCHAR tcp_port[MAXHOSTNAMELEN];
00046         ACE_TCHAR unix_addr[MAXHOSTNAMELEN];
00047         ACE_TCHAR rmt_addr[MAXHOSTNAMELEN];
00048         int num_thr=1;  //default one thread in out_mgr pool
00049         strcpy(unix_addr, channel::UNDEFINED_UNIX_ADDR);
00050         std::vector<std::string> rmt_addrs;
00051         ACE_TCHAR cn[128];
00052         cn[0]='\0';
00053 
00054         conn_ = NULL;
00055 
00056         bool my_port_defined = false;
00057         bool my_unix_defined = false;
00058         bool rmt_addr_defined = false;
00059 
00060         ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("p:u:r:t:c:"), 0);
00061         get_opt.long_option (ACE_TEXT ("listen_port"), 'p',
00062                              ACE_Get_Opt::ARG_REQUIRED);
00063         get_opt.long_option (ACE_TEXT ("unix_addr"), 'u',
00064                              ACE_Get_Opt::ARG_REQUIRED);
00065         get_opt.long_option (ACE_TEXT ("rmt_addr"), 'r',
00066                              ACE_Get_Opt::ARG_REQUIRED);
00067         get_opt.long_option (ACE_TEXT ("num_thr"), 't',
00068                              ACE_Get_Opt::ARG_REQUIRED);
00069         get_opt.long_option (ACE_TEXT ("chan"), 'c',
00070                              ACE_Get_Opt::ARG_REQUIRED);
00071 
00072         for (int c; (c = get_opt ()) != -1;)
00073           switch (c) {
00074           case 'u': // unix_addr this chan is listening at
00075             my_unix_defined = true;
00076             ACE_OS::strsncpy
00077               (unix_addr, get_opt.opt_arg (), MAXHOSTNAMELEN);
00078             break;
00079           case 'p': // tcp port this chan is listening at
00080             my_port_defined = true;
00081             ACE_OS::strsncpy
00082               (tcp_port, get_opt.opt_arg (), MAXHOSTNAMELEN);
00083             break;
00084           case 't': // how many threads should the sock sender pool be
00085             num_thr = ACE_static_cast
00086               (int, ACE_OS::atoi (get_opt.opt_arg ()));
00087             break;
00088           case 'r': // remote chan addr to connect with
00089             rmt_addr_defined = true;
00090             ACE_OS::strsncpy
00091               (rmt_addr, get_opt.opt_arg (), MAXHOSTNAMELEN);
00092             rmt_addrs.push_back(std::string(rmt_addr));
00093             break;
00094           case 'c': // channel name
00095             ACE_OS::strsncpy
00096               (cn, get_opt.opt_arg (), 128);
00097             break;
00098           }
00099   
00100         
00101         ACE_DEBUG((LM_DEBUG, "(%t) thr_num=%d\n", num_thr));
00102         
00103         if (!my_port_defined && Connector::type() == channel::INET_SOCK) {
00104           ACE_DEBUG((LM_DEBUG, "(%t) No info about tcp port , failed...\n"));
00105           return -1;
00106         } else
00107           ACE_DEBUG((LM_DEBUG, "(%t) port=%d\n",tcp_port));
00108 
00109         if (!my_unix_defined && Connector::type() == channel::UNIX_SOCK) {
00110           ACE_DEBUG((LM_DEBUG, "(%t) No info about unix domain socket, failed...\n"));
00111           return -1;
00112         } else
00113           ACE_DEBUG((LM_DEBUG, "(%t) unix_addr=%s\n", unix_addr));
00114 
00115         Channel *ch_ = CvmBaseChannel<Channel>::find_chan(cn);
00116         if (ch_ == NULL) {
00117           ACE_DEBUG ((LM_DEBUG,
00118                       "(%t) Connector not bound to any Channel ...\n"));
00119           return -1;
00120         }
00121   
00122         conn_ = new Connector(ch_, false/*no-internal-driver-thr*/, num_thr);
00123         if(Connector::type() == channel::INET_SOCK) {
00124           if (conn_->open (tcp_port) == channel::FAILURE) return -1;
00125         } else if(my_unix_defined) {
00126           if (conn_->open (unix_addr) == channel::FAILURE) return -1;
00127         }
00128 
00129         if (rmt_addr_defined) {
00130           std::vector<std::string>::iterator iter;
00131           for(iter=rmt_addrs.begin(); iter!=rmt_addrs.end(); iter++) {
00132             channel::ConnInfo ci;
00133             ci.parse((*iter));
00134             if(ci.valid()) {
00135               switch(ci.type()) {
00136               case channel::INET_SOCK:
00137                 if (conn_ != NULL && Connector::type() == channel::INET_SOCK) {
00138                   ACE_DEBUG((LM_DEBUG, "(%t) try to connect to ip=%s port=%d\n", ci.ip().c_str(), ci.port()));
00139                   if (conn_->connect (ci) == channel::FAILURE)
00140                     { 
00141                       continue;
00142                     }
00143                 } else {
00144                   ACE_DEBUG((LM_DEBUG, "(%t) No tcp_connector drop ip=%s port=%d\n", ci.ip().c_str(), ci.port()));
00145                 }
00146                 break;
00147               case channel::UNIX_SOCK:
00148                 if (conn_ != NULL && Connector::type() == channel::UNIX_SOCK) {
00149                   ACE_DEBUG((LM_DEBUG, "(%t) try to connect to unix_addr=%s\n" ,ci.unix_addr().c_str()));
00150                   if (conn_->connect (ci) == channel::FAILURE)
00151                     { 
00152                       continue;
00153                     }
00154                 } else {
00155                   ACE_DEBUG((LM_DEBUG, "(%t) No unix_connector drop unix_addr=%s\n" ,ci.unix_addr().c_str())); 
00156                 }
00157                 break;
00158               default:
00159                 break;
00160               }
00161             }
00162           }
00163         }
00164 
00165         ACE_DEBUG((LM_DEBUG, "### Connector is up and running ... ###\n"));
00166 
00167         return 0;
00168       }
00169     virtual int close (u_long)
00170       {
00171         //do nothing
00172         return 0;
00173       }
00174     virtual int fini ()
00175       {
00176         ACE_DEBUG((LM_DEBUG, "### Channel [%s] is shuting down ... ###\n", name().c_str()));
00177 
00178         conn_->close ();  
00179         delete conn_;
00180 
00181         ACE_DEBUG((LM_DEBUG, "Channel/Connector shutdown and ask all reader/writer threads exit...\n"));
00182         ACE_DEBUG((LM_DEBUG, "all reader/writeer threads exit...\n"));
00183         ACE_DEBUG((LM_DEBUG, "shutdown clock thread now...\n"));
00184         channel::Clock::instance()->stop();
00185         ACE_DEBUG((LM_DEBUG, "clock thread exit...\n"));
00186         //ACE_Reactor::end_event_loop ();
00187         ACE_OS::sleep(1);  //sleep 1 sec to give other threads another chance
00188         return 0;
00189       }
00190     virtual int info (ACE_TCHAR **strp, size_t length = 0) const
00191       {
00192         char buf[256];
00193 
00194         ACE_OS::sprintf (buf,
00195                          "CvmBaseConnector coming up ... \n");
00196 
00197         if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0)
00198           return -1;
00199         else
00200           ACE_OS::strncpy (*strp, buf, length);
00201         return ACE_OS::strlen (buf);
00202       }
00203     //virtual int suspend ();
00204     //virtual int resume ();
00205 
00206     static Connector * find_connector(const char* name) //find dynamic connectors
00207       {
00208         CvmBaseConnector<Channel, Transport> *d = ACE_Dynamic_Service<CvmBaseConnector<Channel, Transport> >::instance(ACE_TEXT(name));
00209         if (d==NULL)
00210           return NULL;
00211         else 
00212           return d->conn();
00213       }
00214   };
00215 
00216 };
00217 
00218 #endif

Generated on Mon Feb 27 19:59:22 2006 for channel by  doxygen 1.4.6-NO