00001
00002
00003
00004
00005
00006
00007
00008
00009
00011
00017 #ifndef _CVM_BASE_CONNECTOR_H_
00018 #define _CVM_BASE_CONNECTOR_H_
00019
00020 #include "ace/Dynamic_Service.h"
00021
00022 #include <Channel.h>
00023 #include <CvmService.h>
00024
00025 #include <iostream>
00026 #include <vector>
00027 #include <string>
00028
00029
00030 namespace cvm {
00031
00034 template <class Channel, class Transport>
00035 class CvmBaseConnector : public CvmService {
00036 public:
00037 typedef channel::Connector<Channel, Transport> Connector;
00038
00039 Connector *conn_;
00040 Connector *conn(void) { return conn_; }
00042 virtual int init (int argc, ACE_TCHAR *argv[])
00043 {
00044
00045 ACE_TCHAR tcp_port[MAXHOSTNAMELEN];
00046 ACE_TCHAR unix_addr[MAXHOSTNAMELEN];
00047 ACE_TCHAR rmt_addr[MAXHOSTNAMELEN];
00048 int num_thr=1;
00049 strcpy(unix_addr, channel::UNDEFINED_UNIX_ADDR);
00050 std::vector<std::string> rmt_addrs;
00051 ACE_TCHAR cn[128];
00052 cn[0]='\0';
00053
00054 conn_ = NULL;
00055
00056 bool my_port_defined = false;
00057 bool my_unix_defined = false;
00058 bool rmt_addr_defined = false;
00059
00060 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("p:u:r:t:c:"), 0);
00061 get_opt.long_option (ACE_TEXT ("listen_port"), 'p',
00062 ACE_Get_Opt::ARG_REQUIRED);
00063 get_opt.long_option (ACE_TEXT ("unix_addr"), 'u',
00064 ACE_Get_Opt::ARG_REQUIRED);
00065 get_opt.long_option (ACE_TEXT ("rmt_addr"), 'r',
00066 ACE_Get_Opt::ARG_REQUIRED);
00067 get_opt.long_option (ACE_TEXT ("num_thr"), 't',
00068 ACE_Get_Opt::ARG_REQUIRED);
00069 get_opt.long_option (ACE_TEXT ("chan"), 'c',
00070 ACE_Get_Opt::ARG_REQUIRED);
00071
00072 for (int c; (c = get_opt ()) != -1;)
00073 switch (c) {
00074 case 'u':
00075 my_unix_defined = true;
00076 ACE_OS::strsncpy
00077 (unix_addr, get_opt.opt_arg (), MAXHOSTNAMELEN);
00078 break;
00079 case 'p':
00080 my_port_defined = true;
00081 ACE_OS::strsncpy
00082 (tcp_port, get_opt.opt_arg (), MAXHOSTNAMELEN);
00083 break;
00084 case 't':
00085 num_thr = ACE_static_cast
00086 (int, ACE_OS::atoi (get_opt.opt_arg ()));
00087 break;
00088 case 'r':
00089 rmt_addr_defined = true;
00090 ACE_OS::strsncpy
00091 (rmt_addr, get_opt.opt_arg (), MAXHOSTNAMELEN);
00092 rmt_addrs.push_back(std::string(rmt_addr));
00093 break;
00094 case 'c':
00095 ACE_OS::strsncpy
00096 (cn, get_opt.opt_arg (), 128);
00097 break;
00098 }
00099
00100
00101 ACE_DEBUG((LM_DEBUG, "(%t) thr_num=%d\n", num_thr));
00102
00103 if (!my_port_defined && Connector::type() == channel::INET_SOCK) {
00104 ACE_DEBUG((LM_DEBUG, "(%t) No info about tcp port , failed...\n"));
00105 return -1;
00106 } else
00107 ACE_DEBUG((LM_DEBUG, "(%t) port=%d\n",tcp_port));
00108
00109 if (!my_unix_defined && Connector::type() == channel::UNIX_SOCK) {
00110 ACE_DEBUG((LM_DEBUG, "(%t) No info about unix domain socket, failed...\n"));
00111 return -1;
00112 } else
00113 ACE_DEBUG((LM_DEBUG, "(%t) unix_addr=%s\n", unix_addr));
00114
00115 Channel *ch_ = CvmBaseChannel<Channel>::find_chan(cn);
00116 if (ch_ == NULL) {
00117 ACE_DEBUG ((LM_DEBUG,
00118 "(%t) Connector not bound to any Channel ...\n"));
00119 return -1;
00120 }
00121
00122 conn_ = new Connector(ch_, false, num_thr);
00123 if(Connector::type() == channel::INET_SOCK) {
00124 if (conn_->open (tcp_port) == channel::FAILURE) return -1;
00125 } else if(my_unix_defined) {
00126 if (conn_->open (unix_addr) == channel::FAILURE) return -1;
00127 }
00128
00129 if (rmt_addr_defined) {
00130 std::vector<std::string>::iterator iter;
00131 for(iter=rmt_addrs.begin(); iter!=rmt_addrs.end(); iter++) {
00132 channel::ConnInfo ci;
00133 ci.parse((*iter));
00134 if(ci.valid()) {
00135 switch(ci.type()) {
00136 case channel::INET_SOCK:
00137 if (conn_ != NULL && Connector::type() == channel::INET_SOCK) {
00138 ACE_DEBUG((LM_DEBUG, "(%t) try to connect to ip=%s port=%d\n", ci.ip().c_str(), ci.port()));
00139 if (conn_->connect (ci) == channel::FAILURE)
00140 {
00141 continue;
00142 }
00143 } else {
00144 ACE_DEBUG((LM_DEBUG, "(%t) No tcp_connector drop ip=%s port=%d\n", ci.ip().c_str(), ci.port()));
00145 }
00146 break;
00147 case channel::UNIX_SOCK:
00148 if (conn_ != NULL && Connector::type() == channel::UNIX_SOCK) {
00149 ACE_DEBUG((LM_DEBUG, "(%t) try to connect to unix_addr=%s\n" ,ci.unix_addr().c_str()));
00150 if (conn_->connect (ci) == channel::FAILURE)
00151 {
00152 continue;
00153 }
00154 } else {
00155 ACE_DEBUG((LM_DEBUG, "(%t) No unix_connector drop unix_addr=%s\n" ,ci.unix_addr().c_str()));
00156 }
00157 break;
00158 default:
00159 break;
00160 }
00161 }
00162 }
00163 }
00164
00165 ACE_DEBUG((LM_DEBUG, "### Connector is up and running ... ###\n"));
00166
00167 return 0;
00168 }
00169 virtual int close (u_long)
00170 {
00171
00172 return 0;
00173 }
00174 virtual int fini ()
00175 {
00176 ACE_DEBUG((LM_DEBUG, "### Channel [%s] is shuting down ... ###\n", name().c_str()));
00177
00178 conn_->close ();
00179 delete conn_;
00180
00181 ACE_DEBUG((LM_DEBUG, "Channel/Connector shutdown and ask all reader/writer threads exit...\n"));
00182 ACE_DEBUG((LM_DEBUG, "all reader/writeer threads exit...\n"));
00183 ACE_DEBUG((LM_DEBUG, "shutdown clock thread now...\n"));
00184 channel::Clock::instance()->stop();
00185 ACE_DEBUG((LM_DEBUG, "clock thread exit...\n"));
00186
00187 ACE_OS::sleep(1);
00188 return 0;
00189 }
00190 virtual int info (ACE_TCHAR **strp, size_t length = 0) const
00191 {
00192 char buf[256];
00193
00194 ACE_OS::sprintf (buf,
00195 "CvmBaseConnector coming up ... \n");
00196
00197 if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0)
00198 return -1;
00199 else
00200 ACE_OS::strncpy (*strp, buf, length);
00201 return ACE_OS::strlen (buf);
00202 }
00203
00204
00205
00206 static Connector * find_connector(const char* name)
00207 {
00208 CvmBaseConnector<Channel, Transport> *d = ACE_Dynamic_Service<CvmBaseConnector<Channel, Transport> >::instance(ACE_TEXT(name));
00209 if (d==NULL)
00210 return NULL;
00211 else
00212 return d->conn();
00213 }
00214 };
00215
00216 };
00217
00218 #endif