00001
00002
00003
00004
00005
00006
00007
00008
00009
00011
00034 #ifndef _INTERFACE_H_
00035 #define _INTERFACE_H_
00036
00037 #include "ace/Message_Block.h"
00038 #include "ace/Thread_Mutex.h"
00039 #include "ace/Thread_Semaphore.h"
00040 #include "ace/Log_Msg.h"
00042 #include "ace/Synch.h"
00043 #include "ace/OS.h"
00044
00045 #include <vector>
00046
00047 #include <BaseDef.h>
00048
00049
00050 namespace channel {
00051
00052 class ConnInfo;
00053 template <class, class> class Out_Bound_Msg;
00054
00056 template <class Channel>
00057 class Interface : public Channel::Destination, public Channel::Source {
00058 public:
00059 Interface (Channel *chan) :
00060 Channel::Destination(chan), Channel::Source(chan)
00061 {
00062 ch_ = chan;
00063
00064
00065
00066 subscribe_msg(Channel::SUBSCRIPTION_INFO_MSG, SCOPE_LOCAL);
00067 subscribe_msg(Channel::UNSUBSCRIPTION_INFO_MSG, SCOPE_LOCAL);
00068 subscribe_msg(Channel::PUBLICATION_INFO_MSG, SCOPE_LOCAL);
00069 subscribe_msg(Channel::UNPUBLICATION_INFO_MSG, SCOPE_LOCAL);
00070
00071 publish_msg(Channel::CHANNEL_CONN_MSG, SCOPE_LOCAL);
00072 publish_msg(Channel::CHANNEL_DISCONN_MSG, SCOPE_LOCAL);
00073 publish_msg(Channel::INIT_SUBSCRIPTION_INFO_MSG, SCOPE_LOCAL);
00074 publish_msg(Channel::INIT_PUBLICATION_INFO_MSG, SCOPE_LOCAL);
00075 publish_msg(Channel::SUBSCRIPTION_INFO_MSG, SCOPE_LOCAL);
00076 publish_msg(Channel::UNSUBSCRIPTION_INFO_MSG, SCOPE_LOCAL);
00077 publish_msg(Channel::PUBLICATION_INFO_MSG, SCOPE_LOCAL);
00078 publish_msg(Channel::UNPUBLICATION_INFO_MSG, SCOPE_LOCAL);
00079 }
00080
00081 ~Interface ()
00082 {
00083
00084
00085 this->unsubscribe_all();
00086 this->unpublish_all();
00087 }
00088
00089 public:
00090
00091 Channel *ch_;
00092
00093
00094 };
00095
00096 template <class, class> class Binder;
00097 template <class, class> class Filter;
00098 template <class, class> class Translator;
00099
00103 template <class Channel>
00104 class LocalInterface : public Interface<Channel> {
00105 public:
00106 typedef typename Channel::IdType IdType;
00107 typedef typename Channel::IdTrait IdTrait;
00108 typedef typename Channel::Msg Msg;
00109 typedef typename Channel::Source Source;
00110 typedef typename Channel::Destination Destination;
00111 typedef typename Channel::Channel_Info_Msg Channel_Info_Msg;
00112 typedef typename Channel::PubSub_Info_Msg PubSub_Info_Msg;
00113 typedef Binder<typename Channel::IdType, typename Channel::IdTrait> Binder;
00114 typedef Filter<typename Channel::IdType, typename Channel::IdTrait> Filter;
00115 typedef Translator<typename Channel::IdType, typename Channel::IdTrait> Translator;
00116
00117 LocalInterface (Channel *chan, Binder *binder) : Interface<Channel>(chan) {
00118 if (binder == NULL) {
00119 filter_ = NULL;
00120 translator_ = NULL;
00121 } else {
00122 filter_ = binder->filter;
00123 translator_ = binder->translator;
00124 }
00125 };
00126
00129 Member_Type type() { return MEMBER_REMOTE;}
00130 Status put_msg(Msg *msg, ACE_Time_Value *timeout=NULL)
00131 {
00132 ACE_DEBUG ((LM_DEBUG, "LocalInterface::put_msg: recv MSG [%s]...\n", ID2STR(msg->type).c_str()));
00133 if (peer_interface_ != NULL) {
00134
00135 if (translator_ != NULL &&
00136 msg->type != Channel::CHANNEL_CONN_MSG &&
00137 msg->type != Channel::CHANNEL_DISCONN_MSG &&
00138 msg->type != Channel::INIT_SUBSCRIPTION_INFO_MSG &&
00139 msg->type != Channel::INIT_PUBLICATION_INFO_MSG &&
00140 msg->type != Channel::SUBSCRIPTION_INFO_MSG &&
00141 msg->type != Channel::UNSUBSCRIPTION_INFO_MSG &&
00142 msg->type != Channel::PUBLICATION_INFO_MSG &&
00143 msg->type != Channel::UNPUBLICATION_INFO_MSG
00144 ) {
00145 translator_->translate_outward(msg->type);
00146 }
00147
00148 return peer_interface_->send_msg(msg, timeout);
00149 }
00150 return SUCCESS;
00151 }
00152
00155 Status send_msg(Msg *msg, ACE_Time_Value *timeout=0)
00156 {
00157 ACE_DEBUG ((LM_DEBUG, "LocalInterface::send_msg: recv MSG [%s]...\n", ID2STR(msg->type).c_str()));
00158 PubSub_Info_Msg *subinfo;
00159
00160 if (msg->type == Channel::CHANNEL_CONN_MSG ||
00161 msg->type == Channel::CHANNEL_DISCONN_MSG) {
00163 }
00164 else if (msg->type == Channel::SUBSCRIPTION_INFO_MSG) {
00165 ACE_DEBUG ((LM_DEBUG, "LocalInterface::send_msg recv SUBSCRIPTION_INFO_MSG...\n"));
00166 subinfo = (PubSub_Info_Msg *)msg->data();
00167 Destination *dest = (Destination *) this;
00168 std::vector<IdType> global_msgs;
00170 this->ch_->published_global_msgs (global_msgs);
00171 for(int i=0; i<subinfo->num_msg_types; i++) {
00172 if (filter_ != NULL && filter_->block_outward((IdType)subinfo->msg_types[i]))
00173 continue;
00174 if (dest->scope(subinfo->msg_types[i]) == SCOPE_UNDEFINED) {
00175 if(std::find_if(global_msgs.begin(), global_msgs.end(),
00176 std::bind2nd(std::ptr_fun(IdTrait::match),(IdType)subinfo->msg_types[i])) !=
00177 global_msgs.end()) {
00178 ACE_DEBUG ((LM_DEBUG, "remote subsc to [%s]\n",ID2STR(subinfo->msg_types[i]).c_str()));
00179 subscribe_msg((IdType)subinfo->msg_types[i], SCOPE_LOCAL);
00180 peer_interface_->publish_msg ((IdType)subinfo->msg_types[i], SCOPE_LOCAL);
00181 }
00182 }
00183 }
00184 }
00185 else if (msg->type == Channel::UNSUBSCRIPTION_INFO_MSG) {
00186 ACE_DEBUG ((LM_DEBUG, "LocalInterface::send_msg recv UNSUBSCRIPTION_INFO_MSG...\n"));
00187 subinfo = (PubSub_Info_Msg *)msg->data();
00188 for(int i=0; i<subinfo->num_msg_types; i++)
00189 unsubscribe_msg((IdType)subinfo->msg_types[i]);
00190 }
00191 else if (msg->type == Channel::PUBLICATION_INFO_MSG) {
00192 ACE_DEBUG ((LM_DEBUG, "LocalInterface::send_msg recv PUBSCRIPTION_INFO_MSG...\n"));
00193 PubSub_Info_Msg *pubinfo = (PubSub_Info_Msg *)msg->data();
00194 Source *src = (Source *) this;
00195 std::vector<IdType> mtypes;
00197 this->ch_->subscribed_global_msgs(mtypes);
00198 for(int i=0; i<pubinfo->num_msg_types; i++) {
00199 if (filter_ != NULL && filter_->block_inward((IdType)subinfo->msg_types[i]))
00200 continue;
00201 if (src->scope (pubinfo->msg_types[i]) == SCOPE_UNDEFINED) {
00202 if(std::find_if(mtypes.begin(), mtypes.end(),
00203 std::bind2nd(std::ptr_fun(IdTrait::match), (IdType)pubinfo->msg_types[i])) !=
00204 mtypes.end()) {
00205 ACE_DEBUG ((LM_DEBUG, "remote pub msg [%s]\n",ID2STR(pubinfo->msg_types[i]).c_str()));
00207 publish_msg ((IdType)pubinfo->msg_types[i], SCOPE_LOCAL);
00208 peer_interface_->subscribe_msg((IdType)pubinfo->msg_types[i], SCOPE_LOCAL);
00209 }
00210 }
00211 }
00212 }
00213 else if (msg->type == Channel::UNPUBLICATION_INFO_MSG) {
00214 ACE_DEBUG ((LM_DEBUG, "LocalInterface::send_msg recv UNPUBSCRIPTION_INFO_MSG...\n"));
00215 }
00216 else {
00218
00219
00220 if (translator_ != NULL) {
00221 translator_->translate_inward(msg->type);
00222 }
00223 ACE_DEBUG ((LM_DEBUG, "LocalInterface::send_msg forward an application msg [%s] to a local chan\n", ID2STR(msg->type).c_str()));
00224 }
00226 Source::send_msg (msg, timeout);
00227 return SUCCESS;
00228 }
00229
00231 LocalInterface *peer_interface_;
00233 Filter *filter_;
00234 Translator *translator_;
00235 };
00236
00237 template <class, class> class Connector;
00238 template <class, class> class ConnHandler;
00239 template <class> class UnixSockTransport;
00240 template <class> class TcpSockTransport;
00241
00244 template <class Channel, class Transport>
00245 class RemoteInterface : public Interface<Channel> {
00246 public:
00247 typedef typename Channel::IdType IdType;
00248 typedef typename Channel::IdTrait IdTrait;
00249 typedef typename Channel::Msg Msg;
00250 typedef typename Channel::Source Source;
00251 typedef typename Channel::Destination Destination;
00252 typedef typename Channel::Channel_Info_Msg Channel_Info_Msg;
00253 typedef typename Channel::PubSub_Info_Msg PubSub_Info_Msg;
00254 typedef ConnHandler<Channel, Transport> ConnHandler;
00255 typedef Connector<Channel, UnixSockTransport<Channel> > UnixSockConnector;
00256 typedef Connector<Channel, TcpSockTransport<Channel> > TcpSockConnector;
00257 typedef Connector<Channel, Transport> Connector;
00258 typedef Binder<typename Channel::IdType, typename Channel::IdTrait> Binder;
00259 typedef Filter<typename Channel::IdType, typename Channel::IdTrait> Filter;
00260 typedef Translator<typename Channel::IdType, typename Channel::IdTrait> Translator;
00261
00262 private:
00264 ConnHandler *conn_handler_;
00266 std::vector<Msg *> pending_msgs_;
00268 Filter *filter_;
00269 Translator *translator_;
00270
00271 public:
00272 void dump(void);
00273
00274 RemoteInterface(ConnHandler *st, Interface_Role r, Channel *c)
00275 : Interface<Channel>(c) {
00276 state_ = CONN_INIT;
00277 conn_handler_ = st;
00278 st->intf_ = this;
00279 role_ = r;
00280
00281 Binder *binder = conn_handler_->cm_->get_binder(conn_handler_->rmt_addr_);
00282 if (binder == NULL) {
00283 filter_ = NULL;
00284 translator_ = NULL;
00285 } else {
00286 filter_ = binder->filter;
00287 translator_ = binder->translator;
00288 }
00289 }
00290
00291 ~RemoteInterface() {}
00292 ConnHandler *conn_handler(void) { return conn_handler_; }
00293 void conn_handler(ConnHandler *ch) { conn_handler_=ch; }
00294
00295 Interface_Role role(void) { return role_;}
00296 void role(Interface_Role r) { role_ = r;}
00297
00301 void add_pending_msg(Msg *m) { pending_msgs_.push_back(m);}
00302 void resend_pending_msgs(void)
00303 {
00304 if (conn_handler_ != NULL) {
00305 for(size_t i=0; i<pending_msgs_.size();i++) {
00306 put_msg(pending_msgs_[i]);
00307 ACE_DEBUG ((LM_DEBUG, "...resend 1 msg[%s]\n", ID2STR(pending_msgs_[i]->type).c_str()));
00308 }
00309 pending_msgs_.clear();
00310 }
00311 }
00312
00318 Status notify_connected (ConnInfo addr)
00319 {
00320 ACE_DEBUG ((LM_DEBUG, "notify local members remote connection complete...\n"));
00321 if (addr.valid()) {
00322 Channel_Info_Msg *msg = new Channel_Info_Msg();
00323 msg->conn_type = addr.type();
00324 switch (addr.type()) {
00325 case UNIX_SOCK:
00326 {
00327 strcpy(msg->unix_addr, addr.unix_addr().c_str());
00328 }
00329 break;
00330 case INET_SOCK:
00331 {
00332 strcpy(msg->host_addr, addr.ip().c_str());
00333 msg->port = addr.port();
00334 }
00335 break;
00336 }
00337 Msg *m = new Msg(Channel::CHANNEL_CONN_MSG, msg);
00338 send_msg(m);
00339 }
00340 return SUCCESS;
00341 }
00342
00347 Status handle_message(Msg *msg)
00348 {
00349 ConnInfo addr;
00350 PubSub_Info_Msg *subinfo;
00351 Channel_Info_Msg *chaninfo;
00352
00353 ACE_DEBUG ((LM_DEBUG, "recv MSG [%s] from sock...\n", ID2STR(msg->type).c_str()));
00354
00357 if(msg->type == Channel::CHANNEL_CONN_MSG) {
00358 ACE_DEBUG ((LM_DEBUG, "recv CHANNEL_CONN_MSG...\n"));
00359 chaninfo = (Channel_Info_Msg *)msg->data();
00360 switch(role()) {
00361 case ACTIVE_ROLE:
00362 ACE_DEBUG ((LM_DEBUG, "i am active end of connection...\n"));
00363 ACE_DEBUG ((LM_DEBUG, "should not recv Channel_Info_Msg, dropped ...\n"));
00364 break;
00365 case PASSIVE_ROLE:
00366 ACE_DEBUG ((LM_DEBUG, "I am passive...\n"));
00367 switch((Interface_Type) chaninfo->conn_type) {
00368 case INET_SOCK:
00369 addr.set(chaninfo->host_addr, chaninfo->port);
00370 break;
00371 case UNIX_SOCK:
00372 addr.set(chaninfo->unix_addr);
00373 break;
00374 default:
00375 ACE_DEBUG ((LM_DEBUG, "Invalid connection type...\n"));
00376 }
00377
00378 conn_handler_->rmt_addr_ = addr;
00379 if(conn_handler_->cm_->add_conn(addr,conn_handler_) == FAILURE)
00380 return FAILURE;
00381
00383 chaninfo->is_local = false;
00384 chaninfo->intf = this;
00385
00387 send2remote_init_subscribe_msg();
00388 break;
00389 default:
00390 break;
00391 }
00392 }
00393 else if(msg->type == Channel::CHANNEL_DISCONN_MSG) {
00394 }
00395 else if (msg->type == Channel::INIT_SUBSCRIPTION_INFO_MSG) {
00396 ACE_DEBUG ((LM_DEBUG, "recv INIT_SUBSCRIPTION_INFO_MSG...\n"));
00397 subinfo = (PubSub_Info_Msg *)msg->data();
00398 std::vector<IdType> pub_msgs;
00400 this->ch_->published_global_msgs (pub_msgs);
00401 for(int i=0; i<subinfo->num_msg_types; i++) {
00402 if (filter_ != NULL && filter_->block_outward((IdType)subinfo->msg_types[i]))
00403 continue;
00404 if(std::find_if(pub_msgs.begin(), pub_msgs.end(),
00405 std::bind2nd(std::ptr_fun(IdTrait::match), (IdType)subinfo->msg_types[i])) !=
00406 pub_msgs.end()) {
00407 ACE_DEBUG ((LM_DEBUG, "remote subsc to [%s]\n",ID2STR(subinfo->msg_types[i]).c_str()));
00408 subscribe_msg((IdType)subinfo->msg_types[i], SCOPE_LOCAL);
00409 send2remote_pubsub_msg (OPER_PUBLISH, (IdType)subinfo->msg_types[i]);
00410 }
00411
00412 }
00413 if(role() == ACTIVE_ROLE) {
00415 send2remote_init_subscribe_msg();
00416 }
00417 {
00418 ACE_DEBUG ((LM_DEBUG, "conn active now...\n"));
00419 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, FAILURE);
00420 state_ = CONN_ACTIVE;
00421 resend_pending_msgs();
00422 }
00423 }
00425 else if (msg->type == Channel::INIT_PUBLICATION_INFO_MSG) {
00426 }
00428 else if (msg->type == Channel::SUBSCRIPTION_INFO_MSG) {
00429 ACE_DEBUG ((LM_DEBUG, "recv SUBSCRIPTION_INFO_MSG...\n"));
00430 subinfo = (PubSub_Info_Msg *)msg->data();
00431 Destination *dest = (Destination *) this;
00432 std::vector<IdType> global_msgs;
00434 this->ch_->published_global_msgs (global_msgs);
00435 for(int i=0; i<subinfo->num_msg_types; i++) {
00436 if (filter_ != NULL && filter_->block_outward((IdType)subinfo->msg_types[i]))
00437 continue;
00438 if (dest->scope(subinfo->msg_types[i]) == SCOPE_UNDEFINED) {
00439 if(std::find_if(global_msgs.begin(), global_msgs.end(),
00440 std::bind2nd(std::ptr_fun(IdTrait::match), (IdType)subinfo->msg_types[i])) !=
00441 global_msgs.end()) {
00442 ACE_DEBUG ((LM_DEBUG, "remote subsc to [%s]\n",ID2STR(subinfo->msg_types[i]).c_str()));
00443 subscribe_msg((IdType)subinfo->msg_types[i], SCOPE_LOCAL);
00444 send2remote_pubsub_msg (OPER_PUBLISH, (IdType)subinfo->msg_types[i]);
00445 }
00446 }
00447 }
00448 }
00449 else if (msg->type == Channel::UNSUBSCRIPTION_INFO_MSG) {
00450 ACE_DEBUG ((LM_DEBUG, "recv UNSUBSCRIPTION_INFO_MSG...\n"));
00451 subinfo = (PubSub_Info_Msg *)msg->data();
00452 for(int i=0; i<subinfo->num_msg_types; i++)
00453 unsubscribe_msg((IdType)subinfo->msg_types[i]);
00454 }
00455 else if (msg->type == Channel::PUBLICATION_INFO_MSG) {
00456 ACE_DEBUG ((LM_DEBUG, "recv PUBLICAION_INFO_MSG...\n"));
00457 PubSub_Info_Msg *pubinfo = (PubSub_Info_Msg *)msg->data();
00458 Source *src = (Source *) this;
00459 std::vector<IdType> mtypes;
00461 this->ch_->subscribed_global_msgs(mtypes);
00462 for(int i=0; i<pubinfo->num_msg_types; i++) {
00463 if (filter_ != NULL && filter_->block_inward((IdType)pubinfo->msg_types[i]))
00464 continue;
00465 if (src->scope (pubinfo->msg_types[i]) == SCOPE_UNDEFINED) {
00466 if(std::find_if(mtypes.begin(), mtypes.end(),
00467 std::bind2nd(std::ptr_fun(IdTrait::match),(IdType)pubinfo->msg_types[i])) !=
00468 mtypes.end()) {
00469 ACE_DEBUG ((LM_DEBUG, "remote pub msg [%s]\n",ID2STR(pubinfo->msg_types[i]).c_str()));
00471 publish_msg ((IdType)pubinfo->msg_types[i], SCOPE_LOCAL);
00472 send2remote_pubsub_msg(OPER_SUBSCRIBE, (IdType)pubinfo->msg_types[i]);
00473 }
00474 }
00475 }
00476 }
00477 else if (msg->type == Channel::UNPUBLICATION_INFO_MSG) {
00478 }
00479 else {
00480
00481
00482 if (translator_ != NULL) {
00483 translator_->translate_inward(msg->type);
00484 }
00485 ACE_DEBUG ((LM_DEBUG, "recv an application msg [%s] from sock and sent to local\n", ID2STR(msg->type).c_str()));
00486 }
00487
00489 send_msg (msg);
00490 return SUCCESS;
00491 }
00492
00494 Status send2remote_chan_info(Interface_Type ct)
00495 {
00496 ACE_DEBUG ((LM_DEBUG, "send_chan_info_to_remote...\n"));
00497 Channel_Info_Msg *msg = new Channel_Info_Msg();
00498 msg->conn_type = ct;
00499 Connector *cm_ = conn_handler_->cm_;
00500 switch (ct) {
00501 case UNIX_SOCK:
00502 {
00503 UnixSockConnector *unix_cm_ = (UnixSockConnector *)cm_;
00504 strcpy(msg->unix_addr, unix_cm_->unix_addr().c_str());
00505 }
00506 break;
00507 case INET_SOCK:
00508 {
00509 TcpSockConnector *tcp_cm_ = (TcpSockConnector *)cm_;
00510 strcpy(msg->host_addr, tcp_cm_->host_addr().c_str());
00511 msg->port = tcp_cm_->port();
00512 }
00513 break;
00514 }
00515 Msg *m = new Msg(Channel::CHANNEL_CONN_MSG, msg);
00516 return put_msg(m);
00517 }
00518
00521 Status send2remote_init_subscribe_msg(void)
00522 {
00524 PubSub_Info_Msg *sub = new PubSub_Info_Msg();
00525 std::vector<IdType> mtypes;
00527 this->ch_->subscribed_global_msgs(mtypes);
00528 sub->num_msg_types = mtypes.size();
00529 for(int i=0; i<sub->num_msg_types; i++) {
00530 if (filter_ != NULL && filter_->block_inward(mtypes[i]))
00531 continue;
00532 sub->msg_types[i] = mtypes[i];
00533 }
00534 ACE_DEBUG ((LM_DEBUG, "send init_subscription info...\n"));
00535 Msg *m = new Msg(Channel::INIT_SUBSCRIPTION_INFO_MSG, sub);
00536 put_msg(m);
00537 return SUCCESS;
00538 }
00539
00542 Status send2remote_init_publish_msg(void)
00543 {
00545 PubSub_Info_Msg *pub = new PubSub_Info_Msg();
00546 std::vector<IdType> mtypes;
00548 this->ch_->published_global_msgs(mtypes);
00549 pub->num_msg_types = mtypes.size();
00550 for(int i=0; i<pub->num_msg_types; i++) {
00551 if (filter_ != NULL && filter_->block_outward(mtypes[i]))
00552 continue;
00553 pub->msg_types[i] = mtypes[i];
00554 }
00555 ACE_DEBUG ((LM_DEBUG, "send init_publication info...\n"));
00556 Msg *m = new Msg(Channel::INIT_PUBLICATION_INFO_MSG, pub);
00557 put_msg(m);
00558 return SUCCESS;
00559 }
00560
00562 Status send2remote_pubsub_msg(Oper_Type op, IdType t)
00563 {
00564 PubSub_Info_Msg *sub = new PubSub_Info_Msg();
00565 sub->num_msg_types = 1;
00566 sub->msg_types[0] = t;
00567 IdType mt;
00568 switch(op) {
00569 case OPER_PUBLISH:
00570 if (filter_ != NULL && filter_->block_outward(t)) {
00571 delete sub;
00572 return SUCCESS;
00573 }
00574 mt = Channel::PUBLICATION_INFO_MSG;
00575 ACE_DEBUG((LM_DEBUG, "RemoteInterface::send2remote_pubsub_msg publish "));
00576 break;
00577 case OPER_UNPUBLISH:
00578 mt = Channel::UNPUBLICATION_INFO_MSG;
00579 ACE_DEBUG((LM_DEBUG, "RemoteInterface::send2remote_pubsub_msg unpublish "));
00580 break;
00581 case OPER_SUBSCRIBE:
00582 if (filter_ != NULL && filter_->block_inward(t)) {
00583 delete sub;
00584 return SUCCESS;
00585 }
00586 mt = Channel::SUBSCRIPTION_INFO_MSG;
00587 ACE_DEBUG((LM_DEBUG, "RemoteInterface::send2remote_pubsub_msg subscribe "));
00588 break;
00589 case OPER_UNSUBSCRIBE:
00590 mt = Channel::UNSUBSCRIPTION_INFO_MSG;
00591 ACE_DEBUG((LM_DEBUG, "RemoteInterface::send2remote_pubsub_msg unsubscribe "));
00592 break;
00593 default:
00594 ACE_DEBUG((LM_DEBUG, "MapRouter:::propagate_change_to_neighbors invalid IdType "));
00595 delete sub;
00596 return SUCCESS;
00597 }
00598 Msg *m = new Msg(mt, sub);
00599 put_msg(m);
00600 return SUCCESS;
00601 }
00602
00604 Member_Type type() { return MEMBER_REMOTE;}
00607 Status put_msg(Msg *msg, ACE_Time_Value *timeout=NULL)
00608 {
00609 ACE_DEBUG ((LM_DEBUG, "RemoteInterface<Channel>::put_msg send 2 remote...\n"));
00610 if(conn_handler_!=NULL) {
00611 if(state_ != CONN_ACTIVE &&
00612 msg->type != Channel::CHANNEL_CONN_MSG &&
00613 msg->type != Channel::INIT_SUBSCRIPTION_INFO_MSG &&
00614 msg->type != Channel::INIT_PUBLICATION_INFO_MSG ) {
00615 ACE_GUARD_REACTION(ACE_Thread_Mutex, guard, lock_, ACE_DEBUG ((LM_DEBUG, "failed to lock out-sock\n")); delete msg; return FAILURE);
00616 add_pending_msg(msg);
00617 ACE_DEBUG ((LM_DEBUG, "... msgs[%s] buffered \n", ID2STR(msg->type).c_str()));
00618 } else {
00619 ACE_Message_Block *payload;
00620 Out_Bound_Msg<Channel, ConnHandler> *out_msg;
00621
00622
00623 if (translator_ != NULL &&
00624 msg->type != Channel::CHANNEL_CONN_MSG &&
00625 msg->type != Channel::CHANNEL_DISCONN_MSG &&
00626 msg->type != Channel::INIT_SUBSCRIPTION_INFO_MSG &&
00627 msg->type != Channel::INIT_PUBLICATION_INFO_MSG &&
00628 msg->type != Channel::SUBSCRIPTION_INFO_MSG &&
00629 msg->type != Channel::UNSUBSCRIPTION_INFO_MSG &&
00630 msg->type != Channel::PUBLICATION_INFO_MSG &&
00631 msg->type != Channel::UNPUBLICATION_INFO_MSG
00632 ) {
00633 translator_->translate_outward(msg->type);
00634 }
00635 out_msg = new Out_Bound_Msg<Channel, ConnHandler>(msg, conn_handler_);
00638 payload = new ACE_Message_Block((const char*)out_msg, sizeof(Out_Bound_Msg<Channel, ConnHandler>));
00639 payload->wr_ptr(sizeof(Out_Bound_Msg<Channel, ConnHandler>));
00641 ACE_DEBUG ((LM_DEBUG, "... msgs[%s] sent \n", ID2STR(msg->type).c_str()));
00642 conn_handler_->cm_->output_mgr_.put(payload,timeout);
00643 }
00644 } else {
00645 delete msg;
00646 return FAILURE;
00647 }
00648 return SUCCESS;
00649 }
00650
00651 public:
00652 Interface_Role role_;
00653 Interface_State state_;
00654 ACE_Thread_Mutex lock_;
00655 };
00656
00657
00658 };
00659
00660
00661 #endif