L:/channel/channel/include/MapRouter.h

Go to the documentation of this file.
00001 
00002 // Copyright (c) 2005, 2006 Yigong Liu
00003 // Permission to use, copy, modify, distribute and sell this software for any 
00004 //     purpose is hereby granted without fee, provided that the above copyright 
00005 //     notice appear in all copies and that both that copyright notice and this 
00006 //     permission notice appear in supporting documentation.
00007 // The author makes no representations about the 
00008 //     suitability of this software for any purpose. It is provided "as is" 
00009 //     without express or implied warranty.
00011 
00012 #ifndef _MAP_ROUTER_H_
00013 #define _MAP_ROUTER_H_
00014 
00015 //ACE headers
00016 #include "ace/Log_Msg.h"
00017 //for ACE_GUARD macros, need the following 2 headers
00018 #include "ace/Synch.h"
00019 #include "ace/OS.h"
00020 
00021 //std
00022 #include <map>
00023 #include <list>
00024 #include <vector>
00025 #include <string>
00026 #include <algorithm>
00027 
00028 //Channel
00029 #include <BaseDef.h>
00030 #include <RouterBase.h>
00031 #include <Msg.h>
00032 
00033 namespace channel {
00034 
00035   template <class, class, class, class> class Channel;
00036 
00042   template <class IdType, class IdTrait, class SynchPolicy>
00043   class MapRouter : public RouterBase {
00044   public:
00045     enum { NamespaceType = NAMESPACE_LINEAR };
00046 
00047     typedef Channel<IdType, IdTrait, SynchPolicy, MapRouter<IdType, IdTrait, SynchPolicy> > Channel;
00048     typedef Destination<Channel, NamespaceType> Destination;
00049     typedef Source<Channel, NamespaceType> Source;
00050     //cannot use Channel::Msg here, since Channel is not defined yet
00051     //Channel is defined using Router
00052     typedef Msg<IdType, SynchPolicy> Msg;
00053     typedef PubSub_Info_Msg<IdType> PubSub_Info_Msg;
00054 
00055   private:
00056 
00057     struct PubSub_Registry {
00058       std::map<Destination *, PubSub_Scope> *subers_;
00059       typename SynchPolicy::RW_MUTEX subers_lock_;
00060       PubSub_Registry() {
00061         subers_ = NULL;
00062       }
00063       ~PubSub_Registry() {
00064         if (subers_ != NULL) {
00065           ACE_WRITE_GUARD(typename SynchPolicy::RW_MUTEX, guard, subers_lock_);
00066           delete subers_;
00067         }
00068       }
00069     };
00070 
00071     //subscription(routing) table
00072     std::map<IdType, PubSub_Registry*> msg_sub_tbl_;
00073     typename SynchPolicy::RW_MUTEX msg_sub_tbl_lock_;
00074 
00075     //publication(global) table
00076     std::map<IdType, std::list<Source *>*> msg_pub_tbl_;
00077     typename SynchPolicy::RW_MUTEX msg_pub_tbl_lock_;
00078 
00079   public:
00080 
00081     MapRouter() {
00082     }
00083 
00084     ~MapRouter() {
00085       {
00086         ACE_WRITE_GUARD(typename SynchPolicy::RW_MUTEX, guard, msg_sub_tbl_lock_);
00087         for(typename std::map<IdType, PubSub_Registry*>::iterator iter = msg_sub_tbl_.begin();
00088             iter != msg_sub_tbl_.end(); iter++) {
00089           if (iter->second != NULL)
00090             delete iter->second;
00091         }
00092       }
00093       {
00094         ACE_WRITE_GUARD(typename SynchPolicy::RW_MUTEX, guard, msg_pub_tbl_lock_);
00095         for(typename std::map<IdType, std::list<Source *>*>::iterator iter2 = msg_pub_tbl_.begin();
00096             iter2 != msg_pub_tbl_.end(); iter2++) {
00097           if (iter2->second != NULL)
00098             delete iter2->second;
00099         }
00100       }
00101     }
00102 
00107 
00108     Status publish_msg(IdType t, PubSub_Scope s, Source *src)
00109       {
00110         bool send2remote = false;
00111         if (s == SCOPE_REMOTE || s == SCOPE_GLOBAL) {
00112           ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, msg_pub_tbl_lock_, FAILURE);
00113           std::list<Source *> *srcs = NULL;
00114           typename std::map<IdType, std::list<Source *>*>::iterator iter = msg_pub_tbl_.find(t);
00115           if(iter != msg_pub_tbl_.end())
00116             srcs = iter->second;
00117           if(srcs == NULL) {
00118             srcs = new std::list<Source *>();
00119             msg_pub_tbl_[t] = srcs;
00120             send2remote = true;
00121           }
00122           if(std::find(srcs->begin(),srcs->end(),src)==srcs->end()) {
00123             srcs->push_back(src);
00124           }
00125         }
00126         if (send2remote)
00127           propagate_change_to_neighbors(OPER_PUBLISH, t);
00128         return SUCCESS;
00129       }
00130 
00132     Status unpublish_msg(IdType t, PubSub_Scope s, Source *src)
00133       {
00134         bool send2remote = false;
00135         if (s == SCOPE_REMOTE || s == SCOPE_GLOBAL) {
00136           ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, msg_pub_tbl_lock_, FAILURE);
00137           std::list<Source *> *srcs = NULL;
00138           typename std::map<IdType, std::list<Source *>*>::iterator iter = msg_pub_tbl_.find(t);
00139           if(iter != msg_pub_tbl_.end())
00140             srcs = iter->second;
00141           if(srcs == NULL) return FAILURE;
00142           typename std::list<Source *>::iterator iter2 = std::find(srcs->begin(),srcs->end(),src);
00143           if (iter2 == srcs->end()) return FAILURE;
00144           srcs->erase(iter2);
00145           if(srcs->empty()) {
00146             msg_pub_tbl_.erase(t);
00147             send2remote = true;
00148           }
00149         }
00150         if (send2remote)
00151           propagate_change_to_neighbors(OPER_UNPUBLISH, t);
00152         return SUCCESS;
00153       }
00154 
00156     Status subscribe_msg(IdType t, PubSub_Scope c, Destination *dest) 
00157       {
00158         ACE_DEBUG((LM_DEBUG, "MapRouter::subscribe_msg: [%s]\n",ID2STR(t).c_str()));
00159         bool send2remote = false;
00160         PubSub_Registry *reg = NULL;
00161         {
00162           ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, msg_sub_tbl_lock_, FAILURE);
00163           typename std::map<IdType, PubSub_Registry*>::iterator iter = msg_sub_tbl_.find(t);
00164           if (iter != msg_sub_tbl_.end())
00165             reg = iter->second;
00166         }
00167         if (reg == NULL) {
00168           ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, msg_sub_tbl_lock_, FAILURE);
00169           reg = new PubSub_Registry();
00170           msg_sub_tbl_[t] = reg;
00171           send2remote = true;
00172         }
00173         {
00174           ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, reg->subers_lock_, FAILURE);
00175           if (reg->subers_ == NULL) {
00176             reg->subers_ = new std::map<Destination *, PubSub_Scope>();
00177             send2remote = true;
00178           }
00179           if (!send2remote) {
00180             send2remote = true;
00181             for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->subers_->begin();
00182                 iter != reg->subers_->end(); iter++)
00183               if (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL)
00184                 send2remote = false;
00185           }
00186           (*reg->subers_)[dest] = c;
00187         }
00188         if (send2remote && dest->type()==MEMBER_LOCAL && (c == SCOPE_REMOTE || c == SCOPE_GLOBAL))
00189           propagate_change_to_neighbors(OPER_SUBSCRIBE, t);
00190         return SUCCESS;
00191       }
00192 
00194     Status unsubscribe_msg(IdType t, Destination *dest)
00195       {
00196         ACE_DEBUG((LM_DEBUG, "MapRouter::unsubscribe_msg[%s]...\n", ID2STR(t).c_str()));
00197         bool send2remote = false;
00198         PubSub_Registry *reg = NULL;
00199         {
00200           ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, msg_sub_tbl_lock_, FAILURE);
00201           typename std::map<IdType, PubSub_Registry*>::iterator iter = msg_sub_tbl_.find(t);
00202           if (iter != msg_sub_tbl_.end())
00203             reg = iter->second;
00204         }
00205         if (reg == NULL) 
00206           return SUCCESS;
00207         {
00208           ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, reg->subers_lock_, FAILURE);
00209           if (reg->subers_ == NULL || reg->subers_->empty())
00210             return SUCCESS;
00211           typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->subers_->find(dest);
00212           if (iter == reg->subers_->end())
00213             return SUCCESS;
00214           if (dest->type() == MEMBER_LOCAL && 
00215               (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL))
00216             send2remote = true;
00217           reg->subers_->erase(dest);
00218           if (send2remote && !reg->subers_->empty()) {
00219             for(iter = reg->subers_->begin(); iter != reg->subers_->end(); iter++) {
00220               if (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL)
00221                 send2remote = false;
00222             }
00223           }
00224         }
00225         if (send2remote)
00226           propagate_change_to_neighbors(OPER_UNSUBSCRIBE, t);
00227         return SUCCESS;
00228       }
00229     
00236     Status route_msg(Msg *msg, Member_Type src_type, PubSub_Scope pub_scope, ACE_Time_Value *timeout=0)
00237       {
00238         ACE_DEBUG ((LM_DEBUG, "MapRouter::route_msg: src_type[%d],scope[%d],msg[%s]\n", src_type, pub_scope, ID2STR(msg->type).c_str()));
00239         PubSub_Registry *reg = NULL;
00240         {
00241           ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, msg_sub_tbl_lock_, FAILURE);
00242           typename std::map<IdType, PubSub_Registry*>::iterator iter = msg_sub_tbl_.find(msg->type);
00243           if (iter != msg_sub_tbl_.end())
00244             reg = iter->second;
00245         }
00246         if (reg == NULL) {
00247           ACE_DEBUG ((LM_DEBUG, "msg [%s] dropped, no puber/suber\n", ID2STR(msg->type).c_str()));
00248           delete msg;
00249           return FAILURE;
00250         }
00251         //distribute to subscribers
00252         {
00253           ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, reg->subers_lock_, FAILURE);
00254           if (reg->subers_ == NULL || reg->subers_->empty()) {
00255             ACE_DEBUG ((LM_DEBUG, "msg [%s] dropped, no subscriber\n", ID2STR(msg->type).c_str()));
00256             delete msg;
00257             return FAILURE;
00258           }
00259           if(reg->subers_->size() == 1) {
00260             typename std::map<Destination *, PubSub_Scope>::iterator iter=reg->subers_->begin();
00261             short src_row = src_type * SCOPE_NUMBER + pub_scope;
00262             short dst_col = iter->first->type() * SCOPE_NUMBER + iter->second;
00263             if(scope_checking_tbl_[src_row][dst_col]) {
00264               ACE_DEBUG ((LM_DEBUG, "msg [%s] delivered\n", ID2STR(msg->type).c_str()));
00265               iter->first->put_msg(msg, timeout);
00266             } else {
00267               ACE_DEBUG ((LM_DEBUG, "msg [%s] dropped, scope dont match\n", ID2STR(msg->type).c_str()));
00268               delete msg;
00269             }
00270           } else {
00271             Msg *dup=NULL;
00272             for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->subers_->begin(); 
00273                 iter != reg->subers_->end(); iter++) {
00274               short src_row = src_type * SCOPE_NUMBER + pub_scope;
00275               short dst_col = iter->first->type() * SCOPE_NUMBER + iter->second;
00276               if(scope_checking_tbl_[src_row][dst_col]) {
00278                 dup=msg->clone();
00280                 ACE_DEBUG ((LM_DEBUG, "msg [%s] delivered once\n", ID2STR(msg->type).c_str()));
00281                 iter->first->put_msg(dup, timeout);
00282               }
00283             }
00284             delete msg;
00285           }
00286         }
00287 
00288         return SUCCESS;
00289       }
00290 
00294 
00295     Status subscribed_global_msgs(std::vector<IdType> &global_msgs)
00296       {
00297         ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, msg_sub_tbl_lock_, FAILURE);
00298         for(typename std::map<IdType, PubSub_Registry*>::iterator iter = msg_sub_tbl_.begin();
00299             iter != msg_sub_tbl_.end(); iter++) {
00300           PubSub_Registry *reg = iter->second;
00301           if (reg == NULL) 
00302             continue;
00303 
00304           ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, reg->subers_lock_, FAILURE);
00305           if (reg->subers_ == NULL || reg->subers_->empty()) 
00306             continue;
00307 
00308           bool found = false;
00309           for(typename std::map<Destination *, PubSub_Scope>::iterator iter2 = reg->subers_->begin(); 
00310               iter2 != reg->subers_->end() && !found; iter2++) {
00311             if (iter2->first->type() == MEMBER_LOCAL && 
00312                 (iter2->second == SCOPE_REMOTE || iter2->second == SCOPE_GLOBAL)) {
00313               found = true;
00314             }
00315           }
00316           if (found) 
00317             global_msgs.push_back(iter->first);
00318         }
00319 
00320         return SUCCESS;
00321       }
00322 
00323     Status published_global_msgs(std::vector<IdType> &global_msgs)
00324       {
00325         ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, msg_pub_tbl_lock_, FAILURE);
00326         for(typename std::map<IdType, std::list<Source *>* >::iterator iter = msg_pub_tbl_.begin();
00327             iter != msg_pub_tbl_.end(); iter++) {
00328           if (iter->second != NULL && !iter->second->empty())
00329             global_msgs.push_back(iter->first);
00330         }
00331 
00332         return SUCCESS;
00333       }
00334 
00335     void dump_routing_tables(void)
00336       {
00337         ACE_DEBUG((LM_DEBUG, "The following %d IdTypes subscribed: \n", msg_sub_tbl_.size()));
00338         {
00339           ACE_READ_GUARD(typename SynchPolicy::RW_MUTEX, guard, msg_sub_tbl_lock_);
00340           for(typename std::map<IdType, PubSub_Registry*>::iterator iter = msg_sub_tbl_.begin();
00341               iter != msg_sub_tbl_.end(); iter++) {
00342             PubSub_Registry *reg = iter->second;
00343             if (reg == NULL) 
00344               continue;
00345 
00346             ACE_READ_GUARD(typename SynchPolicy::RW_MUTEX, guard, reg->subers_lock_);
00347             if (reg->subers_ == NULL || reg->subers_->empty()) 
00348               continue;
00349 
00350             ACE_DEBUG((LM_DEBUG, "%s\n", ID2STR(iter->first).c_str()));
00351           }
00352         }
00353 
00354         ACE_DEBUG((LM_DEBUG, "The following %d IdTypes published: \n", msg_pub_tbl_.size()));
00355         {
00356           ACE_READ_GUARD(typename SynchPolicy::RW_MUTEX, guard, msg_pub_tbl_lock_);
00357           typename std::map<IdType, std::list<Source *>* >::iterator iter;
00358           for(iter = msg_pub_tbl_.begin(); iter!=msg_pub_tbl_.end(); iter++) {
00359             if(iter->second!=NULL && !iter->second->empty())
00360               ACE_DEBUG((LM_DEBUG, "%s\n", ID2STR(iter->first).c_str()));
00361           }
00362         }
00363       }
00364 
00365   protected:
00369     Status propagate_change_to_neighbors(Oper_Type op, IdType t)
00370       {
00371         ACE_DEBUG((LM_DEBUG, "MapRouter:::propagate_change_to_neighbors [%s]\n",ID2STR(t).c_str()));
00372         PubSub_Info_Msg *sub = new PubSub_Info_Msg();
00373         sub->num_msg_types = 1;
00374         sub->msg_types[0] = t;
00375         IdType mt;
00376         switch(op) {
00377         case OPER_PUBLISH:
00378           mt = Channel::PUBLICATION_INFO_MSG;
00379           ACE_DEBUG((LM_DEBUG, "MapRouter:::propagate_change_to_neighbors publish "));
00380           break;
00381         case OPER_UNPUBLISH:
00382           mt = Channel::UNPUBLICATION_INFO_MSG;
00383           ACE_DEBUG((LM_DEBUG, "MapRouter:::propagate_change_to_neighbors unpublish "));
00384           break;
00385         case OPER_SUBSCRIBE:
00386           mt = Channel::SUBSCRIPTION_INFO_MSG;
00387           ACE_DEBUG((LM_DEBUG, "MapRouter:::propagate_change_to_neighbors subscribe "));
00388           break;
00389         case OPER_UNSUBSCRIBE:
00390           mt = Channel::UNSUBSCRIPTION_INFO_MSG;
00391           ACE_DEBUG((LM_DEBUG, "MapRouter:::propagate_change_to_neighbors unsubscribe "));
00392           break;
00393         default:
00394           ACE_DEBUG((LM_DEBUG, "MapRouter:::propagate_change_to_neighbors invalid IdType "));
00395           delete sub;
00396           return SUCCESS;
00397         }
00398         Msg *m = new Msg(mt, sub);
00399         route_msg(m, MEMBER_LOCAL, SCOPE_REMOTE);
00400         return SUCCESS;
00401       }
00402   };
00403 
00404 };
00405 
00406 
00407 #endif

Generated on Mon Feb 27 19:59:21 2006 for channel by  doxygen 1.4.6-NO