L:/channel/channel/include/Interface.h

Go to the documentation of this file.
00001 
00002 // Copyright (c) 2005, 2006 Yigong Liu
00003 // Permission to use, copy, modify, distribute and sell this software for any 
00004 //     purpose is hereby granted without fee, provided that the above copyright 
00005 //     notice appear in all copies and that both that copyright notice and this 
00006 //     permission notice appear in supporting documentation.
00007 // The author makes no representations about the 
00008 //     suitability of this software for any purpose. It is provided "as is" 
00009 //     without express or implied warranty.
00011 
00034 #ifndef _INTERFACE_H_
00035 #define _INTERFACE_H_
00036 
00037 #include "ace/Message_Block.h"
00038 #include "ace/Thread_Mutex.h"
00039 #include "ace/Thread_Semaphore.h"
00040 #include "ace/Log_Msg.h"
00042 #include "ace/Synch.h"
00043 #include "ace/OS.h"
00044 
00045 #include <vector>
00046 
00047 #include <BaseDef.h>
00048 
00049 
00050 namespace channel {
00051 
00052   class ConnInfo;
00053   template <class, class> class Out_Bound_Msg;
00054 
00056   template <class Channel>
00057   class  Interface : public Channel::Destination, public Channel::Source {
00058   public:
00059     Interface (Channel *chan)  : 
00060       Channel::Destination(chan), Channel::Source(chan)
00061       {
00062         ch_ = chan;
00063         //subscribe to system msgs from local channel on behalf of the remote side
00064         //since CHANNEL_(DIS)CONN_MSG and INIT_SUB/PUB_INFO_MSG
00065         //are generated inside Interface, no need to subscribe to them in channels
00066         subscribe_msg(Channel::SUBSCRIPTION_INFO_MSG, SCOPE_LOCAL);
00067         subscribe_msg(Channel::UNSUBSCRIPTION_INFO_MSG, SCOPE_LOCAL);
00068         subscribe_msg(Channel::PUBLICATION_INFO_MSG, SCOPE_LOCAL);
00069         subscribe_msg(Channel::UNPUBLICATION_INFO_MSG, SCOPE_LOCAL);
00070         //publish system msgs at local channel on behalf of the remote side
00071         publish_msg(Channel::CHANNEL_CONN_MSG, SCOPE_LOCAL);
00072         publish_msg(Channel::CHANNEL_DISCONN_MSG, SCOPE_LOCAL);
00073         publish_msg(Channel::INIT_SUBSCRIPTION_INFO_MSG, SCOPE_LOCAL);
00074         publish_msg(Channel::INIT_PUBLICATION_INFO_MSG, SCOPE_LOCAL);
00075         publish_msg(Channel::SUBSCRIPTION_INFO_MSG, SCOPE_LOCAL);
00076         publish_msg(Channel::UNSUBSCRIPTION_INFO_MSG, SCOPE_LOCAL);
00077         publish_msg(Channel::PUBLICATION_INFO_MSG, SCOPE_LOCAL);
00078         publish_msg(Channel::UNPUBLICATION_INFO_MSG, SCOPE_LOCAL);
00079       }
00080 
00081     ~Interface () //unpub/unsub msgs
00082     {
00083       //detach from channel
00084       //why we need this->
00085       this->unsubscribe_all();
00086       this->unpublish_all();
00087     }
00088 
00089   public:
00090 
00091     Channel *ch_;
00092     //translators
00093     //filters
00094   };
00095 
00096   template <class, class> class Binder;
00097   template <class, class> class Filter;
00098   template <class, class> class Translator;
00099 
00103   template <class Channel>
00104   class  LocalInterface : public Interface<Channel> {
00105   public:
00106     typedef typename Channel::IdType IdType;
00107     typedef typename Channel::IdTrait IdTrait;
00108     typedef typename Channel::Msg Msg;
00109     typedef typename Channel::Source Source;
00110     typedef typename Channel::Destination Destination;
00111     typedef typename Channel::Channel_Info_Msg Channel_Info_Msg;
00112     typedef typename Channel::PubSub_Info_Msg PubSub_Info_Msg;
00113     typedef Binder<typename Channel::IdType, typename Channel::IdTrait> Binder;
00114     typedef Filter<typename Channel::IdType, typename Channel::IdTrait> Filter;
00115     typedef Translator<typename Channel::IdType, typename Channel::IdTrait> Translator;
00116 
00117     LocalInterface (Channel *chan, Binder *binder) : Interface<Channel>(chan) {
00118       if (binder == NULL) {
00119         filter_ = NULL;
00120         translator_ = NULL;
00121       } else {
00122         filter_ = binder->filter;
00123         translator_ = binder->translator;
00124       }
00125     };
00126 
00129     Member_Type type() { return MEMBER_REMOTE;}
00130     Status put_msg(Msg *msg, ACE_Time_Value *timeout=NULL)
00131       {
00132         ACE_DEBUG ((LM_DEBUG, "LocalInterface::put_msg: recv MSG [%s]...\n", ID2STR(msg->type).c_str()));
00133         if (peer_interface_ != NULL) {
00134           //translate it if set
00135           if (translator_ != NULL &&
00136               msg->type != Channel::CHANNEL_CONN_MSG &&
00137               msg->type != Channel::CHANNEL_DISCONN_MSG &&
00138               msg->type != Channel::INIT_SUBSCRIPTION_INFO_MSG &&
00139               msg->type != Channel::INIT_PUBLICATION_INFO_MSG &&
00140               msg->type != Channel::SUBSCRIPTION_INFO_MSG &&
00141               msg->type != Channel::UNSUBSCRIPTION_INFO_MSG &&
00142               msg->type != Channel::PUBLICATION_INFO_MSG &&
00143               msg->type != Channel::UNPUBLICATION_INFO_MSG 
00144               ) {
00145             translator_->translate_outward(msg->type);
00146           }
00147 
00148           return peer_interface_->send_msg(msg, timeout);
00149         }
00150         return SUCCESS;
00151       }
00152 
00155     Status send_msg(Msg *msg, ACE_Time_Value *timeout=0)
00156       {
00157         ACE_DEBUG ((LM_DEBUG, "LocalInterface::send_msg: recv MSG [%s]...\n", ID2STR(msg->type).c_str()));
00158         PubSub_Info_Msg *subinfo;
00159 
00160         if (msg->type == Channel::CHANNEL_CONN_MSG || 
00161             msg->type == Channel::CHANNEL_DISCONN_MSG) {
00163         }
00164         else if (msg->type == Channel::SUBSCRIPTION_INFO_MSG) {
00165           ACE_DEBUG ((LM_DEBUG, "LocalInterface::send_msg recv SUBSCRIPTION_INFO_MSG...\n"));
00166           subinfo = (PubSub_Info_Msg *)msg->data();
00167           Destination *dest = (Destination *) this;
00168           std::vector<IdType> global_msgs;
00170           this->ch_->published_global_msgs (global_msgs);
00171           for(int i=0; i<subinfo->num_msg_types; i++) {
00172             if (filter_ != NULL && filter_->block_outward((IdType)subinfo->msg_types[i]))
00173               continue;
00174             if (dest->scope(subinfo->msg_types[i]) == SCOPE_UNDEFINED) {
00175               if(std::find_if(global_msgs.begin(), global_msgs.end(), 
00176                               std::bind2nd(std::ptr_fun(IdTrait::match),(IdType)subinfo->msg_types[i])) != 
00177                  global_msgs.end()) {
00178                 ACE_DEBUG ((LM_DEBUG, "remote subsc to [%s]\n",ID2STR(subinfo->msg_types[i]).c_str()));
00179                 subscribe_msg((IdType)subinfo->msg_types[i], SCOPE_LOCAL);
00180                 peer_interface_->publish_msg ((IdType)subinfo->msg_types[i], SCOPE_LOCAL);
00181               }
00182             }
00183           }
00184         } 
00185         else if (msg->type == Channel::UNSUBSCRIPTION_INFO_MSG) {
00186           ACE_DEBUG ((LM_DEBUG, "LocalInterface::send_msg recv UNSUBSCRIPTION_INFO_MSG...\n"));
00187           subinfo = (PubSub_Info_Msg *)msg->data();
00188           for(int i=0; i<subinfo->num_msg_types; i++)
00189             unsubscribe_msg((IdType)subinfo->msg_types[i]);
00190         } 
00191         else if (msg->type == Channel::PUBLICATION_INFO_MSG) {
00192           ACE_DEBUG ((LM_DEBUG, "LocalInterface::send_msg recv PUBSCRIPTION_INFO_MSG...\n"));
00193           PubSub_Info_Msg *pubinfo = (PubSub_Info_Msg *)msg->data();
00194           Source *src = (Source *) this;
00195           std::vector<IdType> mtypes;
00197           this->ch_->subscribed_global_msgs(mtypes);
00198           for(int i=0; i<pubinfo->num_msg_types; i++) {
00199             if (filter_ != NULL && filter_->block_inward((IdType)subinfo->msg_types[i]))
00200               continue;
00201             if (src->scope (pubinfo->msg_types[i]) == SCOPE_UNDEFINED) {
00202               if(std::find_if(mtypes.begin(), mtypes.end(), 
00203                               std::bind2nd(std::ptr_fun(IdTrait::match), (IdType)pubinfo->msg_types[i])) != 
00204                  mtypes.end()) {
00205                 ACE_DEBUG ((LM_DEBUG, "remote pub msg [%s]\n",ID2STR(pubinfo->msg_types[i]).c_str()));
00207                 publish_msg ((IdType)pubinfo->msg_types[i], SCOPE_LOCAL);
00208                 peer_interface_->subscribe_msg((IdType)pubinfo->msg_types[i], SCOPE_LOCAL);
00209               }
00210             }
00211           }
00212         }
00213         else if (msg->type == Channel::UNPUBLICATION_INFO_MSG) {
00214           ACE_DEBUG ((LM_DEBUG, "LocalInterface::send_msg recv UNPUBSCRIPTION_INFO_MSG...\n"));
00215         } 
00216         else {
00218           //application msgs go to Channel
00219           //translate it if set
00220           if (translator_ != NULL) {
00221             translator_->translate_inward(msg->type);
00222           }
00223           ACE_DEBUG ((LM_DEBUG, "LocalInterface::send_msg forward an application msg [%s] to a local chan\n", ID2STR(msg->type).c_str()));
00224         }
00226         Source::send_msg (msg, timeout);
00227         return SUCCESS;
00228       }
00229 
00231     LocalInterface *peer_interface_;
00233     Filter *filter_;
00234     Translator *translator_;
00235   };
00236 
00237   template <class, class> class Connector;
00238   template <class, class> class ConnHandler;
00239   template <class> class UnixSockTransport;
00240   template <class> class TcpSockTransport;
00241 
00244   template <class Channel, class Transport>
00245   class RemoteInterface : public Interface<Channel> {
00246   public:
00247     typedef typename Channel::IdType IdType;
00248     typedef typename Channel::IdTrait IdTrait;
00249     typedef typename Channel::Msg Msg;
00250     typedef typename Channel::Source Source;
00251     typedef typename Channel::Destination Destination;
00252     typedef typename Channel::Channel_Info_Msg Channel_Info_Msg;
00253     typedef typename Channel::PubSub_Info_Msg PubSub_Info_Msg;
00254     typedef ConnHandler<Channel, Transport> ConnHandler;
00255     typedef Connector<Channel, UnixSockTransport<Channel> > UnixSockConnector;
00256     typedef Connector<Channel, TcpSockTransport<Channel> > TcpSockConnector;
00257     typedef Connector<Channel, Transport> Connector;
00258     typedef Binder<typename Channel::IdType, typename Channel::IdTrait> Binder;
00259     typedef Filter<typename Channel::IdType, typename Channel::IdTrait> Filter;
00260     typedef Translator<typename Channel::IdType, typename Channel::IdTrait> Translator;
00261 
00262   private:
00264     ConnHandler *conn_handler_;
00266     std::vector<Msg *> pending_msgs_;
00268     Filter *filter_;
00269     Translator *translator_;
00270 
00271   public:
00272     void dump(void);
00273 
00274     RemoteInterface(ConnHandler *st, Interface_Role r, Channel *c)
00275       : Interface<Channel>(c) {
00276       state_ = CONN_INIT;
00277       conn_handler_ = st;
00278       st->intf_ = this;
00279       role_ = r;
00280       //check if any binder<filter,tranlator> specified for this remote peer
00281       Binder *binder = conn_handler_->cm_->get_binder(conn_handler_->rmt_addr_);
00282       if (binder == NULL) {
00283         filter_ = NULL;
00284         translator_ = NULL;
00285       } else {
00286         filter_ = binder->filter;
00287         translator_ = binder->translator;
00288       }
00289     }
00290 
00291     ~RemoteInterface() {}
00292     ConnHandler *conn_handler(void) { return conn_handler_; }
00293     void conn_handler(ConnHandler *ch) { conn_handler_=ch; }
00294 
00295     Interface_Role role(void) { return role_;}
00296     void role(Interface_Role r) { role_ = r;}
00297 
00301     void add_pending_msg(Msg *m) { pending_msgs_.push_back(m);}
00302     void resend_pending_msgs(void)
00303       {
00304         if (conn_handler_ != NULL) {
00305           for(size_t i=0; i<pending_msgs_.size();i++) {
00306             put_msg(pending_msgs_[i]);
00307             ACE_DEBUG ((LM_DEBUG, "...resend 1 msg[%s]\n", ID2STR(pending_msgs_[i]->type).c_str()));
00308           }
00309           pending_msgs_.clear();
00310         }
00311       }
00312 
00318     Status notify_connected (ConnInfo addr)
00319       {
00320         ACE_DEBUG ((LM_DEBUG, "notify local members remote connection complete...\n"));
00321         if (addr.valid()) {
00322           Channel_Info_Msg *msg = new Channel_Info_Msg();
00323           msg->conn_type = addr.type();
00324           switch (addr.type()) {
00325           case UNIX_SOCK:
00326             {
00327               strcpy(msg->unix_addr, addr.unix_addr().c_str());
00328             }
00329             break;
00330           case INET_SOCK:
00331             {
00332               strcpy(msg->host_addr, addr.ip().c_str());
00333               msg->port = addr.port();
00334             }
00335             break;
00336           }
00337           Msg *m = new Msg(Channel::CHANNEL_CONN_MSG, msg);
00338           send_msg(m);
00339         }
00340         return SUCCESS;
00341       }
00342     
00347     Status handle_message(Msg *msg)
00348       {
00349         ConnInfo addr;
00350         PubSub_Info_Msg *subinfo;
00351         Channel_Info_Msg *chaninfo;
00352 
00353         ACE_DEBUG ((LM_DEBUG, "recv MSG [%s] from sock...\n", ID2STR(msg->type).c_str()));
00354 
00357         if(msg->type == Channel::CHANNEL_CONN_MSG) {
00358           ACE_DEBUG ((LM_DEBUG, "recv CHANNEL_CONN_MSG...\n"));
00359           chaninfo = (Channel_Info_Msg *)msg->data();
00360           switch(role()) {
00361           case ACTIVE_ROLE:
00362             ACE_DEBUG ((LM_DEBUG, "i am active end of connection...\n"));
00363             ACE_DEBUG ((LM_DEBUG, "should not recv Channel_Info_Msg, dropped ...\n"));
00364             break;
00365           case PASSIVE_ROLE:
00366             ACE_DEBUG ((LM_DEBUG, "I am passive...\n"));
00367             switch((Interface_Type) chaninfo->conn_type) {
00368             case INET_SOCK:
00369               addr.set(chaninfo->host_addr, chaninfo->port);
00370               break;
00371             case UNIX_SOCK:
00372               addr.set(chaninfo->unix_addr);
00373               break;
00374             default:
00375               ACE_DEBUG ((LM_DEBUG, "Invalid connection type...\n"));
00376             }
00377       
00378             conn_handler_->rmt_addr_ = addr;
00379             if(conn_handler_->cm_->add_conn(addr,conn_handler_) == FAILURE)
00380               return FAILURE;
00381 
00383             chaninfo->is_local = false;
00384             chaninfo->intf = this;
00385 
00387             send2remote_init_subscribe_msg();
00388             break;
00389           default:
00390             break;
00391           }
00392         }
00393         else if(msg->type == Channel::CHANNEL_DISCONN_MSG) {
00394         }
00395         else if (msg->type == Channel::INIT_SUBSCRIPTION_INFO_MSG) {
00396           ACE_DEBUG ((LM_DEBUG, "recv INIT_SUBSCRIPTION_INFO_MSG...\n"));
00397           subinfo = (PubSub_Info_Msg *)msg->data();
00398           std::vector<IdType> pub_msgs;
00400           this->ch_->published_global_msgs (pub_msgs);
00401           for(int i=0; i<subinfo->num_msg_types; i++) {
00402             if (filter_ != NULL && filter_->block_outward((IdType)subinfo->msg_types[i]))
00403               continue;
00404             if(std::find_if(pub_msgs.begin(), pub_msgs.end(), 
00405                             std::bind2nd(std::ptr_fun(IdTrait::match), (IdType)subinfo->msg_types[i])) != 
00406                pub_msgs.end()) {
00407               ACE_DEBUG ((LM_DEBUG, "remote subsc to [%s]\n",ID2STR(subinfo->msg_types[i]).c_str()));
00408               subscribe_msg((IdType)subinfo->msg_types[i], SCOPE_LOCAL);
00409               send2remote_pubsub_msg (OPER_PUBLISH, (IdType)subinfo->msg_types[i]);
00410             }
00411         
00412           }
00413           if(role() == ACTIVE_ROLE) {     
00415             send2remote_init_subscribe_msg();   
00416           } 
00417           {
00418             ACE_DEBUG ((LM_DEBUG, "conn active now...\n"));
00419             ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, FAILURE);
00420             state_ = CONN_ACTIVE;
00421             resend_pending_msgs();
00422           }
00423         }
00425         else if (msg->type == Channel::INIT_PUBLICATION_INFO_MSG) {
00426         }
00428         else if (msg->type == Channel::SUBSCRIPTION_INFO_MSG) {
00429           ACE_DEBUG ((LM_DEBUG, "recv SUBSCRIPTION_INFO_MSG...\n"));
00430           subinfo = (PubSub_Info_Msg *)msg->data();
00431           Destination *dest = (Destination *) this;
00432           std::vector<IdType> global_msgs;
00434           this->ch_->published_global_msgs (global_msgs);
00435           for(int i=0; i<subinfo->num_msg_types; i++) {
00436             if (filter_ != NULL && filter_->block_outward((IdType)subinfo->msg_types[i]))
00437               continue;
00438             if (dest->scope(subinfo->msg_types[i]) == SCOPE_UNDEFINED) {
00439               if(std::find_if(global_msgs.begin(), global_msgs.end(), 
00440                               std::bind2nd(std::ptr_fun(IdTrait::match), (IdType)subinfo->msg_types[i])) != 
00441                  global_msgs.end()) {
00442                 ACE_DEBUG ((LM_DEBUG, "remote subsc to [%s]\n",ID2STR(subinfo->msg_types[i]).c_str()));
00443                 subscribe_msg((IdType)subinfo->msg_types[i], SCOPE_LOCAL);
00444                 send2remote_pubsub_msg (OPER_PUBLISH, (IdType)subinfo->msg_types[i]);
00445               }
00446             }
00447           }
00448         }
00449         else if (msg->type == Channel::UNSUBSCRIPTION_INFO_MSG) {
00450           ACE_DEBUG ((LM_DEBUG, "recv UNSUBSCRIPTION_INFO_MSG...\n"));
00451           subinfo = (PubSub_Info_Msg *)msg->data();
00452           for(int i=0; i<subinfo->num_msg_types; i++)
00453             unsubscribe_msg((IdType)subinfo->msg_types[i]);
00454         }
00455         else if (msg->type == Channel::PUBLICATION_INFO_MSG) {
00456           ACE_DEBUG ((LM_DEBUG, "recv PUBLICAION_INFO_MSG...\n"));
00457           PubSub_Info_Msg *pubinfo = (PubSub_Info_Msg *)msg->data();
00458           Source *src = (Source *) this;
00459           std::vector<IdType> mtypes;
00461           this->ch_->subscribed_global_msgs(mtypes);
00462           for(int i=0; i<pubinfo->num_msg_types; i++) {
00463             if (filter_ != NULL && filter_->block_inward((IdType)pubinfo->msg_types[i]))
00464               continue;
00465             if (src->scope (pubinfo->msg_types[i]) == SCOPE_UNDEFINED) {
00466               if(std::find_if(mtypes.begin(), mtypes.end(), 
00467                               std::bind2nd(std::ptr_fun(IdTrait::match),(IdType)pubinfo->msg_types[i])) != 
00468                  mtypes.end()) {
00469                 ACE_DEBUG ((LM_DEBUG, "remote pub msg [%s]\n",ID2STR(pubinfo->msg_types[i]).c_str()));
00471                 publish_msg ((IdType)pubinfo->msg_types[i], SCOPE_LOCAL);
00472                 send2remote_pubsub_msg(OPER_SUBSCRIBE, (IdType)pubinfo->msg_types[i]);
00473               }
00474             }
00475           }
00476         }
00477         else if (msg->type == Channel::UNPUBLICATION_INFO_MSG) {
00478         } 
00479         else {
00480           //application msgs go to Channel
00481           //translate it if set
00482           if (translator_ != NULL) {
00483             translator_->translate_inward(msg->type);
00484           }
00485           ACE_DEBUG ((LM_DEBUG, "recv an application msg [%s] from sock and sent to local\n", ID2STR(msg->type).c_str()));
00486         }
00487 
00489         send_msg (msg);
00490         return SUCCESS;
00491       }
00492 
00494     Status send2remote_chan_info(Interface_Type ct)
00495       {
00496         ACE_DEBUG ((LM_DEBUG, "send_chan_info_to_remote...\n"));
00497         Channel_Info_Msg *msg = new Channel_Info_Msg();
00498         msg->conn_type = ct;
00499         Connector *cm_ = conn_handler_->cm_;
00500         switch (ct) {
00501         case UNIX_SOCK:
00502           {
00503             UnixSockConnector *unix_cm_ = (UnixSockConnector *)cm_;
00504             strcpy(msg->unix_addr, unix_cm_->unix_addr().c_str());
00505           }
00506           break;
00507         case INET_SOCK:
00508           {
00509             TcpSockConnector *tcp_cm_ = (TcpSockConnector *)cm_;
00510             strcpy(msg->host_addr, tcp_cm_->host_addr().c_str());
00511             msg->port = tcp_cm_->port();
00512           }
00513           break;
00514         }
00515         Msg *m = new Msg(Channel::CHANNEL_CONN_MSG, msg);
00516         return put_msg(m);
00517       }
00518 
00521     Status send2remote_init_subscribe_msg(void)
00522       {
00524         PubSub_Info_Msg *sub = new PubSub_Info_Msg();
00525         std::vector<IdType> mtypes;
00527         this->ch_->subscribed_global_msgs(mtypes);
00528         sub->num_msg_types = mtypes.size();
00529         for(int i=0; i<sub->num_msg_types; i++) {
00530           if (filter_ != NULL && filter_->block_inward(mtypes[i]))
00531             continue;
00532           sub->msg_types[i] = mtypes[i];
00533         }
00534         ACE_DEBUG ((LM_DEBUG, "send init_subscription info...\n")); 
00535         Msg *m = new Msg(Channel::INIT_SUBSCRIPTION_INFO_MSG, sub);
00536         put_msg(m);
00537         return SUCCESS;
00538       }
00539 
00542     Status send2remote_init_publish_msg(void)
00543       {
00545         PubSub_Info_Msg *pub = new PubSub_Info_Msg();
00546         std::vector<IdType> mtypes;
00548         this->ch_->published_global_msgs(mtypes);
00549         pub->num_msg_types = mtypes.size();
00550         for(int i=0; i<pub->num_msg_types; i++) {
00551           if (filter_ != NULL && filter_->block_outward(mtypes[i]))
00552             continue;
00553           pub->msg_types[i] = mtypes[i];
00554         }
00555         ACE_DEBUG ((LM_DEBUG, "send init_publication info...\n")); 
00556         Msg *m = new Msg(Channel::INIT_PUBLICATION_INFO_MSG, pub);
00557         put_msg(m);
00558         return SUCCESS;
00559       }
00560 
00562     Status send2remote_pubsub_msg(Oper_Type op, IdType t)
00563       {
00564         PubSub_Info_Msg *sub = new PubSub_Info_Msg();
00565         sub->num_msg_types = 1;
00566         sub->msg_types[0] = t;
00567         IdType mt;
00568         switch(op) {
00569         case OPER_PUBLISH:
00570           if (filter_ != NULL && filter_->block_outward(t)) {
00571             delete sub;
00572             return SUCCESS;
00573           }
00574           mt = Channel::PUBLICATION_INFO_MSG;
00575           ACE_DEBUG((LM_DEBUG, "RemoteInterface::send2remote_pubsub_msg publish "));
00576           break;
00577         case OPER_UNPUBLISH:
00578           mt = Channel::UNPUBLICATION_INFO_MSG;
00579           ACE_DEBUG((LM_DEBUG, "RemoteInterface::send2remote_pubsub_msg unpublish "));
00580           break;
00581         case OPER_SUBSCRIBE:
00582           if (filter_ != NULL && filter_->block_inward(t)) {
00583             delete sub;
00584             return SUCCESS;
00585           }
00586           mt = Channel::SUBSCRIPTION_INFO_MSG;
00587           ACE_DEBUG((LM_DEBUG, "RemoteInterface::send2remote_pubsub_msg subscribe "));
00588           break;
00589         case OPER_UNSUBSCRIBE:
00590           mt = Channel::UNSUBSCRIPTION_INFO_MSG;
00591           ACE_DEBUG((LM_DEBUG, "RemoteInterface::send2remote_pubsub_msg unsubscribe "));
00592           break;
00593         default:
00594           ACE_DEBUG((LM_DEBUG, "MapRouter:::propagate_change_to_neighbors invalid IdType "));
00595           delete sub;
00596           return SUCCESS;
00597         }
00598         Msg *m = new Msg(mt, sub);
00599         put_msg(m);
00600         return SUCCESS;
00601       }
00602 
00604     Member_Type type() { return MEMBER_REMOTE;}
00607     Status put_msg(Msg *msg, ACE_Time_Value *timeout=NULL)
00608       {
00609         ACE_DEBUG ((LM_DEBUG, "RemoteInterface<Channel>::put_msg send 2 remote...\n"));
00610         if(conn_handler_!=NULL) {
00611           if(state_ != CONN_ACTIVE && 
00612              msg->type != Channel::CHANNEL_CONN_MSG && 
00613              msg->type != Channel::INIT_SUBSCRIPTION_INFO_MSG &&
00614              msg->type != Channel::INIT_PUBLICATION_INFO_MSG ) {
00615             ACE_GUARD_REACTION(ACE_Thread_Mutex, guard, lock_, ACE_DEBUG ((LM_DEBUG, "failed to lock out-sock\n")); delete msg; return FAILURE);
00616             add_pending_msg(msg);
00617             ACE_DEBUG ((LM_DEBUG, "... msgs[%s] buffered \n", ID2STR(msg->type).c_str()));
00618           } else {
00619             ACE_Message_Block *payload;
00620             Out_Bound_Msg<Channel, ConnHandler> *out_msg;
00621   
00622             //translate it if set
00623             if (translator_ != NULL &&
00624                 msg->type != Channel::CHANNEL_CONN_MSG &&
00625                 msg->type != Channel::CHANNEL_DISCONN_MSG &&
00626                 msg->type != Channel::INIT_SUBSCRIPTION_INFO_MSG &&
00627                 msg->type != Channel::INIT_PUBLICATION_INFO_MSG &&
00628                 msg->type != Channel::SUBSCRIPTION_INFO_MSG &&
00629                 msg->type != Channel::UNSUBSCRIPTION_INFO_MSG &&
00630                 msg->type != Channel::PUBLICATION_INFO_MSG &&
00631                 msg->type != Channel::UNPUBLICATION_INFO_MSG 
00632                 ) {
00633               translator_->translate_outward(msg->type);
00634             }
00635             out_msg = new Out_Bound_Msg<Channel, ConnHandler>(msg, conn_handler_);
00638             payload = new ACE_Message_Block((const char*)out_msg, sizeof(Out_Bound_Msg<Channel, ConnHandler>));
00639             payload->wr_ptr(sizeof(Out_Bound_Msg<Channel, ConnHandler>)); 
00641             ACE_DEBUG ((LM_DEBUG, "... msgs[%s] sent \n", ID2STR(msg->type).c_str()));
00642             conn_handler_->cm_->output_mgr_.put(payload,timeout);
00643           }
00644         } else {
00645           delete msg;
00646           return FAILURE;
00647         }
00648         return SUCCESS;
00649       }
00650 
00651   public:
00652     Interface_Role role_;  
00653     Interface_State state_;
00654     ACE_Thread_Mutex lock_;
00655   };
00656 
00657 
00658 };
00659 
00660 
00661 #endif

Generated on Mon Feb 27 19:59:21 2006 for channel by  doxygen 1.4.6-NO