L:/channel/channel/include/TrieRouter.h

Go to the documentation of this file.
00001 
00002 // Copyright (c) 2005, 2006 Yigong Liu
00003 // Permission to use, copy, modify, distribute and sell this software for any 
00004 //     purpose is hereby granted without fee, provided that the above copyright 
00005 //     notice appear in all copies and that both that copyright notice and this 
00006 //     permission notice appear in supporting documentation.
00007 // The author makes no representations about the 
00008 //     suitability of this software for any purpose. It is provided "as is" 
00009 //     without express or implied warranty.
00011 
00012 #ifndef _TRIE_ROUTER_H_
00013 #define _TRIE_ROUTER_H_
00014 
00015 //ACE headers
00016 #include "ace/Log_Msg.h"
00017 //for ACE_GUARD macros, need the following 2 headers
00018 #include "ace/Synch.h"
00019 #include "ace/OS.h"
00020 
00021 //std
00022 #include <map>
00023 #include <list>
00024 #include <vector>
00025 #include <string>
00026 #include <algorithm>
00027 
00028 //Channel
00029 #include <BaseDef.h>
00030 #include <RouterBase.h>
00031 #include <Msg.h>
00032 
00033 namespace channel {
00034 
00035   template <class, class, class, class> class Channel;
00036 
00042   template <class IdType, class IdTrait, class SynchPolicy>
00043   class TrieRouter : public RouterBase {
00044   public:
00045     enum { NamespaceType = NAMESPACE_HIERARCHICAL };
00046 
00047     typedef Channel<IdType, IdTrait, SynchPolicy, TrieRouter<IdType, IdTrait, SynchPolicy> > Channel;
00048     typedef Destination<Channel, NamespaceType> Destination;
00049     typedef Source<Channel, NamespaceType> Source;
00050     //cannot use Channel::Msg here, since Channel is not defined yet
00051     //Channel is defined using Router
00052     typedef Msg<IdType, SynchPolicy> Msg;
00053     typedef PubSub_Info_Msg<IdType> PubSub_Info_Msg;
00054     typedef typename IdTrait::NameType NameType;
00055 
00056   private:
00057 
00058     struct Pub_Registry {
00059       std::list<Source *> *pubers_;
00060       std::list<Source *> *w_pubers_;
00061       typename SynchPolicy::RW_MUTEX pubers_lock_;
00062       Pub_Registry() {
00063         pubers_ = NULL;
00064         w_pubers_ = NULL;
00065       }
00066       ~Pub_Registry() {
00067         ACE_WRITE_GUARD(typename SynchPolicy::RW_MUTEX, guard, pubers_lock_);
00068         if (pubers_ != NULL) {
00069           delete pubers_;
00070         }
00071         if (w_pubers_ != NULL) {
00072           delete w_pubers_;
00073         }
00074       }
00075     };
00076     struct Sub_Registry {
00077       std::map<Destination *, PubSub_Scope> *subers_;
00078       std::map<Destination *, PubSub_Scope> *w_subers_;
00079       typename SynchPolicy::RW_MUTEX subers_lock_;
00080       Sub_Registry() {
00081         subers_ = NULL;
00082         w_subers_ = NULL;
00083       }
00084       ~Sub_Registry() {
00085         ACE_WRITE_GUARD(typename SynchPolicy::RW_MUTEX, guard, subers_lock_);
00086         if (subers_ != NULL) {
00087           delete subers_;
00088         }
00089         if (w_subers_ != NULL) {
00090           delete w_subers_;
00091         }
00092       }
00093     };
00094 
00095     template <class Registry>
00096     struct TrieNode {
00097       NameType name;
00098       TrieNode *parent;
00099       std::map<NameType, TrieNode *> children;
00100       typename SynchPolicy::RW_MUTEX children_lock_;
00101       Registry registry;
00102       TrieNode(NameType n, TrieNode *p) : name(n), parent(p) {
00103       }
00104       ~TrieNode() {
00105         ACE_WRITE_GUARD(typename SynchPolicy::RW_MUTEX, guard, children_lock_);
00106         for(typename std::map<NameType, TrieNode *>::iterator iter = children.begin();
00107             iter != children.end(); iter++) {
00108           if (iter->second != NULL)
00109             delete iter->second;
00110         }
00111       }
00112     };
00113 
00114     //namespace - pub/sub info indexed by IdType
00115     TrieNode<Sub_Registry> *sub_trie_root_;
00116     TrieNode<Pub_Registry> *pub_trie_root_;
00117 
00118   public:
00119 
00120     TrieRouter() {
00121       sub_trie_root_ = new TrieNode<Sub_Registry>(IdTrait::RootName, NULL);
00122       pub_trie_root_ = new TrieNode<Pub_Registry>(IdTrait::RootName, NULL);
00123     }
00124 
00125     ~TrieRouter() {
00126       delete sub_trie_root_;
00127       delete pub_trie_root_;
00128     }
00129 
00130     void propagate_child_pub_registry (IdType t, TrieNode<Pub_Registry> *node, Oper_Type oper) {
00131       Pub_Registry *reg = &(node->registry);
00132       //if encounter wildcard, stop going deeper
00133       {
00134         ACE_READ_GUARD(typename SynchPolicy::RW_MUTEX, guard, reg->pubers_lock_);
00135         if (reg->w_pubers_ != NULL && !reg->w_pubers_->empty()) {
00136           t.push_back(IdTrait::WildcardName);
00137           propagate_change_to_neighbors(oper, t);
00138           t.pop_back();
00139           return;
00140         }
00141         if (reg->pubers_ != NULL && !reg->pubers_->empty()) {
00142           propagate_change_to_neighbors(oper, t);
00143         }
00144       }
00145       //recursively update children
00146       ACE_READ_GUARD(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_);
00147       for(typename std::map<NameType, TrieNode<Pub_Registry> *>::iterator iter = node->children.begin();
00148           iter != node->children.end(); iter++) {
00149         t.push_back(iter->first);
00150         propagate_child_pub_registry (t, iter->second, oper);
00151         t.pop_back();
00152       }
00153     };
00154 
00155 
00156     void propagate_child_sub_registry (IdType t, TrieNode<Sub_Registry> *node, Oper_Type oper) {
00157       bool send2remote = false;
00158       Sub_Registry *reg = &(node->registry);
00159       //if encounter wildcard, stop going deeper
00160       {
00161         ACE_READ_GUARD(typename SynchPolicy::RW_MUTEX, guard, reg->subers_lock_);
00162         if (reg->w_subers_ != NULL && !reg->w_subers_->empty()) {
00163           for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->w_subers_->begin();
00164               iter != reg->w_subers_->end() && !send2remote; iter++)
00165             if (iter->first->type() == MEMBER_LOCAL && 
00166                 (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL))
00167               send2remote = true;
00168           if (send2remote) {
00169             t.push_back(IdTrait::WildcardName);
00170             propagate_change_to_neighbors(oper, t);
00171             t.pop_back();
00172             return;
00173           }
00174         }
00175         if (reg->subers_ != NULL && !reg->subers_->empty()) {
00176           for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->subers_->begin();
00177               iter != reg->subers_->end() && !send2remote; iter++)
00178             if (iter->first->type() == MEMBER_LOCAL &&
00179                 (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL))
00180               send2remote = true;
00181           if(send2remote)
00182             propagate_change_to_neighbors(oper, t);
00183         }
00184       }
00185       //recursively update children
00186       ACE_READ_GUARD(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_);
00187       for(typename std::map<NameType, TrieNode<Sub_Registry> *>::iterator iter = node->children.begin();
00188           iter != node->children.end(); iter++) {
00189         t.push_back(iter->first);
00190         propagate_child_sub_registry (t, iter->second, oper);
00191         t.pop_back();
00192       }
00193     };
00194 
00195 
00200 
00201     Status publish_msg(IdType t, PubSub_Scope c, Source *src)
00202       {
00203         bool send2remote = false;
00204         Pub_Registry *reg = NULL;
00205         TrieNode<Pub_Registry> *node = pub_trie_root_;
00206         bool wildcard = false;
00207         bool hidden = false;
00208         int sz = t.size();
00209 
00210         if (!(c == SCOPE_REMOTE || c == SCOPE_GLOBAL)) 
00211           return SUCCESS;
00212 
00216         for (int i=0; i<sz; i++) {
00217           if (t[i] == IdTrait::WildcardName) {
00218             if (i == (sz-1)) {
00219               wildcard = true;
00220               break;
00221             } else {
00222               ACE_DEBUG((LM_DEBUG, "TrieRouter::unpublish_msg: Wildcard in middle [%s]\n",ID2STR(t).c_str()));
00223               return FAILURE;
00224             }
00225           }
00226           //check if it is hidden
00227           if (!hidden) {
00228             ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.pubers_lock_, FAILURE);
00229             reg = &(node->registry);
00230             if (reg->w_pubers_ != NULL && !reg->w_pubers_->empty()) 
00231               hidden = true;
00232           }
00233           ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00234           typename std::map<NameType, TrieNode<Pub_Registry> *>::iterator iter = node->children.find(t[i]);
00235           if (iter == node->children.end()) {
00236             TrieNode<Pub_Registry> *cnd = new TrieNode<Pub_Registry> (t[i], node);
00237             node->children[t[i]] = cnd;
00238             node = cnd;
00239           } 
00240           else
00241             node = iter->second;
00242         }
00243 
00244         //update pub registry
00245         if (!wildcard) {
00246           {
00247             ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.pubers_lock_, FAILURE);
00248             reg = &(node->registry);
00249             if (reg->pubers_ == NULL) {
00250               reg->pubers_ = new std::list<Source *>();
00251               send2remote = true;
00252             }
00253             reg->pubers_->push_back(src);
00254           }
00255           //broadcast t to remote
00256           if (send2remote && !hidden && src->type() == MEMBER_LOCAL)
00257             propagate_change_to_neighbors(OPER_PUBLISH, t);
00258         }
00263         else {
00264           {
00265             ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.pubers_lock_, FAILURE);
00266             reg = &(node->registry);
00267             if (reg->w_pubers_ == NULL) {
00268               reg->w_pubers_ = new std::list<Source *>();
00269               send2remote = true;
00270             }
00271             reg->w_pubers_->push_back(src);
00272           }
00273           if (send2remote && !hidden && src->type() == MEMBER_LOCAL) {
00274             //unpublish child registry
00275             IdType id = t;
00276             ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00277             for(typename std::map<NameType, TrieNode<Pub_Registry> *>::iterator iter = node->children.begin();
00278                 iter != node->children.end(); iter++) {
00279               id[sz-1] = iter->first;
00280               propagate_child_pub_registry (id, iter->second, OPER_UNPUBLISH);
00281             }
00282             //broadcast t to remote
00283             propagate_change_to_neighbors(OPER_PUBLISH, t);
00284           }
00285         }
00286         return SUCCESS;
00287       }
00288 
00290     Status unpublish_msg(IdType t, PubSub_Scope s, Source *src)
00291       {
00292         bool send2remote = false;
00293         Pub_Registry *reg = NULL;
00294         TrieNode<Pub_Registry> *node = pub_trie_root_;
00295         bool wildcard = false;
00296         bool hidden = false;
00297         int sz = t.size();
00298 
00299         if (!(s == SCOPE_REMOTE || s == SCOPE_GLOBAL)) 
00300           return SUCCESS;
00301 
00305         for (int i=0; i<sz; i++) {
00306           if (t[i] == IdTrait::WildcardName) {
00307             if (i == (sz-1)) {
00308               wildcard = true;
00309               break;
00310             } else {
00311               ACE_DEBUG((LM_DEBUG, "TrieRouter::unpublish_msg: Wildcard in middle [%s]\n",ID2STR(t).c_str()));
00312               return FAILURE;
00313             }
00314           }
00315           //check if it is hidden
00316           if (!hidden) {
00317             ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.pubers_lock_, FAILURE);
00318             reg = &(node->registry);
00319             if (reg->w_pubers_ != NULL && !reg->w_pubers_->empty()) 
00320               hidden = true;
00321           }
00322           ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00323           typename std::map<NameType, TrieNode<Pub_Registry> *>::iterator iter = node->children.find(t[i]);
00324           if (iter == node->children.end()) {
00325             ACE_DEBUG((LM_DEBUG, "TrieRouter::unpublish_msg: not found [%s]\n",ID2STR(t).c_str()));
00326             return FAILURE;
00327           }
00328           node = iter->second;
00329         }
00330         if (!wildcard) {
00331           {
00332             ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.pubers_lock_, FAILURE);
00333             reg = &(node->registry);
00334             if (reg->pubers_ == NULL) {
00335               ACE_DEBUG((LM_DEBUG, "TrieRouter::unpublish_msg: not found [%s]\n",ID2STR(t).c_str()));
00336               return FAILURE;
00337             }
00338             typename std::list<Source *>::iterator iter = std::find(reg->pubers_->begin(),
00339                  reg->pubers_->end(), src);
00340             if (iter == reg->pubers_->end()) {
00341               ACE_DEBUG((LM_DEBUG, "TrieRouter::unpublish_msg: not found [%s]\n",ID2STR(t).c_str()));
00342               return FAILURE;
00343             }
00344             reg->pubers_->erase(iter);
00345             if (reg->pubers_->empty()) {
00346               send2remote = true;
00347             }
00348           }
00349           //broadcast t to remote
00350           if (send2remote && !hidden && src->type() == MEMBER_LOCAL)
00351             propagate_change_to_neighbors(OPER_UNPUBLISH, t);
00352         }
00356         else {
00357           {
00358             ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.pubers_lock_, FAILURE);
00359             reg = &(node->registry);
00360             if (reg->w_pubers_ == NULL) {
00361               ACE_DEBUG((LM_DEBUG, "TrieRouter::unpublish_msg: not found [%s]\n",ID2STR(t).c_str()));
00362               return FAILURE;
00363             }
00364             typename std::list<Source *>::iterator iter = std::find(reg->w_pubers_->begin(),
00365                  reg->w_pubers_->end(), src);
00366             if (iter == reg->w_pubers_->end()) {
00367               ACE_DEBUG((LM_DEBUG, "TrieRouter::unpublish_msg: not found [%s]\n",ID2STR(t).c_str()));
00368               return FAILURE;
00369             }
00370             reg->w_pubers_->erase(iter);
00371             if (reg->w_pubers_->empty()) {
00372               send2remote = true;
00373             }
00374           }
00375           //
00376           if (send2remote && !hidden && src->type() == MEMBER_LOCAL) {
00377             //broadcast t to remote
00378             propagate_change_to_neighbors(OPER_UNPUBLISH, t);
00379             //publish child registry
00380             IdType id = t;
00381             ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00382             for(typename std::map<NameType, TrieNode<Pub_Registry> *>::iterator iter = node->children.begin();
00383                 iter != node->children.end(); iter++) {
00384               id[sz-1] = iter->first;
00385               propagate_child_pub_registry (id, iter->second, OPER_PUBLISH);
00386             }
00387           }
00388         }
00389         return SUCCESS;
00390       }
00391 
00393     Status subscribe_msg(IdType t, PubSub_Scope c, Destination *dest)
00394       {
00395         bool send2remote = false;
00396         Sub_Registry *reg = NULL;
00397         TrieNode<Sub_Registry> *node = sub_trie_root_;
00398         bool wildcard = false;
00399         bool hidden = false;
00400         int sz = t.size();
00401 
00405         for (int i=0; i<sz; i++) {
00406           if (t[i] == IdTrait::WildcardName) {
00407             if (i == (sz-1)) {
00408               wildcard = true;
00409               break;
00410             } else {
00411               ACE_DEBUG((LM_DEBUG, "TrieRouter::subscribe_msg: Wildcard in middle [%s]\n",ID2STR(t).c_str()));
00412               return FAILURE;
00413             }
00414           }
00415           if (!hidden) {
00416             ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.subers_lock_, FAILURE);
00417             reg = &(node->registry);
00418             if (reg->w_subers_ != NULL && !reg->w_subers_->empty()) {
00419               for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->w_subers_->begin();
00420                   iter != reg->w_subers_->end() && !hidden; iter++) 
00421                 if (iter->first->type() == MEMBER_LOCAL && 
00422                     (iter->second == SCOPE_GLOBAL || iter->second == SCOPE_REMOTE))
00423                   hidden = true;
00424             }
00425           }
00426           ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00427           typename std::map<NameType, TrieNode<Sub_Registry> *>::iterator iter = node->children.find(t[i]);
00428           if (iter == node->children.end()) {
00429             TrieNode<Sub_Registry> *cnd = new TrieNode<Sub_Registry> (t[i], node);
00430             node->children[t[i]] = cnd;
00431             node = cnd;
00432           } 
00433           else
00434             node = iter->second;
00435         }
00436         //update subscriptions
00437         //if !wildcard, add src & st -> node.registry->subers, return
00438         if (!wildcard) {
00439           {
00440             ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.subers_lock_, FAILURE);
00441             reg = &(node->registry);
00442             if (reg->subers_ == NULL) {
00443               reg->subers_ = new std::map<Destination *, PubSub_Scope>();
00444               send2remote = true;
00445             }
00446             if (!send2remote && !hidden) {
00447               send2remote = true;
00448               for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->subers_->begin();
00449                   iter != reg->subers_->end(); iter++)
00450                 if (iter->first->type() == MEMBER_LOCAL &&
00451                     (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL))
00452                   send2remote = false;
00453             }
00454             (*reg->subers_)[dest] = c;
00455           }
00456           //broadcast t to remote
00457           if (send2remote && !hidden && dest->type() == MEMBER_LOCAL && 
00458               (c == SCOPE_REMOTE || c == SCOPE_GLOBAL)) {
00459             propagate_change_to_neighbors(OPER_SUBSCRIBE, t);
00460           }
00461         }
00466         else {
00467           {
00468             ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.subers_lock_, FAILURE);
00469             reg = &(node->registry);
00470             if (reg->w_subers_ == NULL) {
00471               reg->w_subers_ = new std::map<Destination *, PubSub_Scope>();
00472               send2remote = true;
00473             }
00474             if (!send2remote && !hidden) {
00475               send2remote = true;
00476               for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->w_subers_->begin();
00477                   iter != reg->w_subers_->end(); iter++)
00478                 if (iter->first->type() == MEMBER_LOCAL && 
00479                     (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL))
00480                   send2remote = false;
00481             }
00482             (*reg->w_subers_)[dest] = c;
00483           }
00484           //broadcast t to remote
00485           if (send2remote && !hidden && dest->type() == MEMBER_LOCAL && 
00486               (c == SCOPE_REMOTE || c == SCOPE_GLOBAL)) {
00487             //unsubscribe child registry
00488             IdType id = t;
00489             ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00490             for(typename std::map<NameType, TrieNode<Sub_Registry> *>::iterator iter = node->children.begin();
00491                 iter != node->children.end(); iter++) {
00492               id[sz-1] = iter->first;
00493               propagate_child_sub_registry (id, iter->second, OPER_UNSUBSCRIBE);
00494             }
00495             propagate_change_to_neighbors(OPER_SUBSCRIBE, t);
00496           }
00497         }
00498         return SUCCESS;
00499       }
00500 
00502     Status unsubscribe_msg(IdType t, Destination *dest)
00503       {
00504         bool send2remote = false;
00505         Sub_Registry *reg = NULL;
00506         TrieNode<Sub_Registry> *node = sub_trie_root_;
00507         bool wildcard = false;
00508         bool hidden = false;
00509         int sz = t.size();
00510 
00514         for (int i=0; i<sz; i++) {
00515           if (t[i] == IdTrait::WildcardName) {
00516             if (i == (sz-1)) {
00517               wildcard = true;
00518               break;
00519             } else {
00520               ACE_DEBUG((LM_DEBUG, "TrieRouter::unsubscribe_msg: Wildcard in middle [%s]\n",ID2STR(t).c_str()));
00521               return FAILURE;
00522             }
00523           }
00524           if (!hidden) {
00525             ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.subers_lock_, FAILURE);
00526             reg = &(node->registry);
00527             if (reg->w_subers_ != NULL && !reg->w_subers_->empty()) {
00528               for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->w_subers_->begin();
00529                   iter != reg->w_subers_->end() && !hidden; iter++) 
00530                 if (iter->first->type() == MEMBER_LOCAL && 
00531                     (iter->second == SCOPE_GLOBAL || iter->second == SCOPE_REMOTE))
00532                   hidden = true;
00533             }
00534           }
00535           ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00536           typename std::map<NameType, TrieNode<Sub_Registry> *>::iterator iter = node->children.find(t[i]);
00537           if (iter == node->children.end()) {
00538             ACE_DEBUG((LM_DEBUG, "TrieRouter::unsubscribe_msg: not found [%s]\n",ID2STR(t).c_str()));
00539             return FAILURE;
00540           }
00541           node = iter->second;
00542         }
00543         //update subscriptions
00544         if (!wildcard) {
00545           {
00546             ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.subers_lock_, FAILURE);
00547             reg = &(node->registry);
00548             if (reg->subers_ == NULL) {
00549               ACE_DEBUG((LM_DEBUG, "TrieRouter::unsubscribe_msg: not found [%s]\n",ID2STR(t).c_str()));
00550               return FAILURE;
00551             }
00552             typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->subers_->find(dest);
00553             if (iter == reg->subers_->end()) {
00554               ACE_DEBUG((LM_DEBUG, "TrieRouter::unsubscribe_msg: not found [%s]\n",ID2STR(t).c_str()));
00555               return FAILURE;
00556             }
00557             if (iter->first->type() == MEMBER_LOCAL && 
00558                 (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL))
00559               send2remote = true;
00560             reg->subers_->erase(dest);
00561             if (send2remote && !hidden && !reg->subers_->empty()) {
00562               for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->subers_->begin();
00563                   iter != reg->subers_->end(); iter++)
00564                 if (iter->first->type() == MEMBER_LOCAL && 
00565                     (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL))
00566                   send2remote = false;
00567             }
00568           }
00569           //broadcast t to remote
00570           if (send2remote && !hidden)
00571             propagate_change_to_neighbors(OPER_UNSUBSCRIBE, t);
00572         }
00577         else {
00578           {
00579             ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.subers_lock_, FAILURE);
00580             reg = &(node->registry);
00581             if (reg->w_subers_ == NULL) {
00582               ACE_DEBUG((LM_DEBUG, "TrieRouter::unsubscribe_msg: not found [%s]\n",ID2STR(t).c_str()));
00583               return FAILURE;
00584             }
00585             typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->w_subers_->find(dest);
00586             if (iter == reg->w_subers_->end()) {
00587               ACE_DEBUG((LM_DEBUG, "TrieRouter::unsubscribe_msg: not found [%s]\n",ID2STR(t).c_str()));
00588               return FAILURE;
00589             }
00590             if (iter->first->type() == MEMBER_LOCAL && 
00591                 (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL))
00592               send2remote = true;
00593             reg->w_subers_->erase(dest);
00594             if (send2remote && !hidden && !reg->w_subers_->empty()) {
00595               for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->w_subers_->begin();
00596                   iter != reg->w_subers_->end(); iter++)
00597                 if (iter->first->type() == MEMBER_LOCAL && 
00598                     (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL))
00599                   send2remote = false;
00600             }
00601           }
00602           //broadcast t to remote
00603           if (send2remote && !hidden) {
00604             propagate_change_to_neighbors(OPER_UNSUBSCRIBE, t);
00605             //subscribe child registry
00606             IdType id = t;
00607             ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00608             for(typename std::map<NameType, TrieNode<Sub_Registry> *>::iterator iter = node->children.begin();
00609                 iter != node->children.end(); iter++) {
00610               id[sz-1] = iter->first;
00611               propagate_child_sub_registry (id, iter->second, OPER_SUBSCRIBE);
00612             }
00613           }
00614         }
00615         return SUCCESS;
00616       }
00617 
00624     Status route_msg(Msg *msg, Member_Type src_type, PubSub_Scope pub_scope, ACE_Time_Value *timeout=0)
00625       {
00626         Sub_Registry *reg = NULL;
00627         TrieNode<Sub_Registry> *node = sub_trie_root_;
00628         Msg *dup=NULL;
00629         int sz = msg->type.size();
00630 
00634         for (int i=0; i<sz; i++) {
00635           {
00636             //check if any wildcard subscription
00637             ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.subers_lock_, FAILURE);
00638             reg = &(node->registry);
00639             if (reg->w_subers_ != NULL && !reg->w_subers_->empty()) {
00640               for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->w_subers_->begin();
00641                   iter != reg->w_subers_->end(); iter++) {
00642                 //dispatch to wildcard subscriber
00643                 short src_row = src_type * SCOPE_NUMBER + pub_scope;
00644                 short dst_col = iter->first->type() * SCOPE_NUMBER + iter->second;
00645                 if(scope_checking_tbl_[src_row][dst_col]) {
00647                   dup=msg->clone();
00649                   ACE_DEBUG ((LM_DEBUG, "msg [%s] delivered once\n", ID2STR(msg->type).c_str()));
00650                   iter->first->put_msg(dup, timeout);
00651                 }
00652               }
00653             }
00654           }
00655           ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00656           typename std::map<NameType, TrieNode<Sub_Registry> *>::iterator iter = node->children.find(msg->type[i]);
00657           if (iter == node->children.end()) {
00658             //ACE_DEBUG((LM_DEBUG, "TrieRouter::route_msg: not found [%s]\n",ID2STR(msg->type).c_str()));
00659             return FAILURE;
00660           }
00661           node = iter->second;
00662         }
00663 
00664         ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.subers_lock_, FAILURE);
00665 
00666         //dispatch wildcard subscribers
00667         reg = &(node->registry);
00668         if (reg->w_subers_ != NULL && !reg->w_subers_->empty()) {
00669           for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->w_subers_->begin();
00670               iter != reg->w_subers_->end(); iter++) {
00671             short src_row = src_type * SCOPE_NUMBER + pub_scope;
00672             short dst_col = iter->first->type() * SCOPE_NUMBER + iter->second;
00673             if(scope_checking_tbl_[src_row][dst_col]) {
00675               dup=msg->clone();
00677               ACE_DEBUG ((LM_DEBUG, "msg [%s] delivered once\n", ID2STR(msg->type).c_str()));
00678               iter->first->put_msg(dup, timeout);
00679             }
00680           }
00681         }
00682 
00683         //dispatch normal subscribers
00684         reg = &(node->registry);
00685         if (reg->subers_ != NULL && !reg->subers_->empty()) {
00686           for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->subers_->begin(); 
00687               iter != reg->subers_->end(); iter++) {
00688             short src_row = src_type * SCOPE_NUMBER + pub_scope;
00689             short dst_col = iter->first->type() * SCOPE_NUMBER + iter->second;
00690             if(scope_checking_tbl_[src_row][dst_col]) {
00692               dup=msg->clone();
00694               ACE_DEBUG ((LM_DEBUG, "msg [%s] delivered once\n", ID2STR(msg->type).c_str()));
00695               iter->first->put_msg(dup, timeout);
00696             }
00697           }
00698         }
00699 
00700         delete msg;
00701 
00702         return SUCCESS;
00703       }
00704 
00709 
00710     Status subscribed_global_msgs(std::vector<IdType> &global_msgs)
00711       {
00712         IdType id;
00713         return subscribed_global_msgs_recursive(global_msgs, sub_trie_root_, id);
00714       }
00715     Status subscribed_global_msgs_recursive(std::vector<IdType> &global_msgs, TrieNode<Sub_Registry> *node, IdType &id)
00716       {
00717         Sub_Registry *reg = &(node->registry);
00718 
00719         {
00720           ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, reg->subers_lock_, FAILURE);
00721 
00722           //check wildcard subscription
00723           if (reg->w_subers_ != NULL && !reg->w_subers_->empty()) {
00724             for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->w_subers_->begin();
00725                 iter != reg->w_subers_->end(); iter++) 
00726               if (iter->first->type() == MEMBER_LOCAL && 
00727                   (iter->second == SCOPE_GLOBAL || iter->second == SCOPE_REMOTE)) {
00728                 id.push_back(IdTrait::WildcardName);
00729                 global_msgs.push_back(id);
00730                 id.pop_back();
00731                 return SUCCESS; //wildcard stop going deeper
00732               }
00733           }
00734 
00735           //check normal subscription
00736           if (reg->subers_ != NULL && !reg->subers_->empty()) {
00737             for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->subers_->begin();
00738                 iter != reg->subers_->end() ; iter++) 
00739               if (iter->first->type() == MEMBER_LOCAL && 
00740                   (iter->second == SCOPE_GLOBAL || iter->second == SCOPE_REMOTE)) {
00741                 global_msgs.push_back(id);
00742                 break;
00743               }
00744           }
00745         }
00746 
00747         ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00748         for(typename std::map<NameType, TrieNode<Sub_Registry> *>::iterator iter = node->children.begin();
00749             iter != node->children.end(); iter++)
00750           if (iter->second != NULL) {
00751             id.push_back(iter->first);
00752             subscribed_global_msgs_recursive(global_msgs, iter->second, id);
00753             id.pop_back();
00754           }
00755 
00756         return SUCCESS;
00757       }
00758 
00759     Status published_global_msgs(std::vector<IdType> &global_msgs)
00760       {
00761         IdType id;
00762         return published_global_msgs_recursive(global_msgs, pub_trie_root_, id);
00763       }
00764     Status published_global_msgs_recursive(std::vector<IdType> &global_msgs, TrieNode<Pub_Registry> *node, IdType &id)
00765       {
00766         Pub_Registry *reg = &(node->registry);
00767 
00768         {
00769           ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, reg->pubers_lock_, FAILURE);
00770           //check wildcard 
00771           if (reg->w_pubers_ != NULL && !reg->w_pubers_->empty()) {
00772             id.push_back(IdTrait::WildcardName);
00773             global_msgs.push_back(id);
00774             id.pop_back();
00775             return SUCCESS; //wildcard stop going deeper
00776           }
00777           //check normal subscription
00778           if (reg->pubers_ != NULL && !reg->pubers_->empty()) {
00779             global_msgs.push_back(id);
00780           }
00781         }
00782 
00783         ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00784         for(typename std::map<NameType, TrieNode<Pub_Registry> *>::iterator iter = node->children.begin();
00785             iter != node->children.end(); iter++)
00786           if (iter->second != NULL) {
00787             id.push_back(iter->first);
00788             published_global_msgs_recursive(global_msgs, iter->second, id);
00789             id.pop_back();
00790           }
00791 
00792         return SUCCESS;
00793       }
00794 
00795     void dump_pub_ids_recursive (TrieNode<Pub_Registry> *node, IdType &id) {
00796       Pub_Registry *reg = &(node->registry);
00797 
00798       {
00799         ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, reg->pubers_lock_, FAILURE);
00800         //check wildcard 
00801         if (reg->w_pubers_ != NULL && !reg->w_pubers_->empty()) {
00802           id.push_back(IdTrait::WildcardName);
00803           ACE_DEBUG((LM_DEBUG, "%s\n", ID2STR(id).c_str()));
00804           id.pop_back();
00805         }
00806         //check normal subscription
00807         if (reg->pubers_ != NULL && !reg->pubers_->empty()) {
00808           ACE_DEBUG((LM_DEBUG, "%s\n", ID2STR(id).c_str()));
00809         }
00810       }
00811 
00812       ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00813       for(typename std::map<NameType, TrieNode<Pub_Registry> *>::iterator iter = node->children.begin();
00814           iter != node->children.end(); iter++)
00815         if (iter->second != NULL) {
00816           id.push_back(iter->first);
00817           dump_pub_ids_recursive(iter->second, id);
00818           id.pop_back();
00819         }
00820     }
00821 
00822     void dump_sub_ids_recursive (TrieNode<Sub_Registry> *node, IdType &id) {
00823       Sub_Registry *reg = &(node->registry);
00824 
00825       {
00826         ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, reg->subers_lock_, FAILURE);
00827         //check wildcard 
00828         if (reg->w_subers_ != NULL && !reg->w_subers_->empty()) {
00829           id.push_back(IdTrait::WildcardName);
00830           ACE_DEBUG((LM_DEBUG, "%s\n", ID2STR(id).c_str()));
00831           id.pop_back();
00832         }
00833         //check normal subscription
00834         if (reg->subers_ != NULL && !reg->subers_->empty()) {
00835           ACE_DEBUG((LM_DEBUG, "%s\n", ID2STR(id).c_str()));
00836         }
00837       }
00838 
00839       ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00840       for(typename std::map<NameType, TrieNode<Sub_Registry> *>::iterator iter = node->children.begin();
00841           iter != node->children.end(); iter++)
00842         if (iter->second != NULL) {
00843           id.push_back(iter->first);
00844           dump_sub_ids_recursive(iter->second, id);
00845           id.pop_back();
00846         }
00847     }
00848 
00849     void dump_routing_tables(void)
00850       {
00851         ACE_DEBUG((LM_DEBUG, "The following IdTypes subscribed: \n"));
00852         {
00853           IdType id;
00854           dump_sub_ids_recursive(sub_trie_root_,id);
00855         }
00856 
00857         ACE_DEBUG((LM_DEBUG, "The following IdTypes published: \n"));
00858         {
00859           IdType id;
00860           dump_pub_ids_recursive(pub_trie_root_,id);
00861         }
00862       }
00863 
00864   protected:
00868     Status propagate_change_to_neighbors(Oper_Type op, IdType t)
00869       {
00870         ACE_DEBUG((LM_DEBUG, "TrieRouter:::propagate_change_to_neighbors [%s]\n",ID2STR(t).c_str()));
00871         PubSub_Info_Msg *sub = new PubSub_Info_Msg();
00872         sub->num_msg_types = 1;
00873         sub->msg_types[0] = t;
00874         IdType mt;
00875         switch(op) {
00876         case OPER_PUBLISH:
00877           mt = Channel::PUBLICATION_INFO_MSG;
00878           ACE_DEBUG((LM_DEBUG, "TrieRouter:::propagate_change_to_neighbors publish "));
00879           break;
00880         case OPER_UNPUBLISH:
00881           mt = Channel::UNPUBLICATION_INFO_MSG;
00882           ACE_DEBUG((LM_DEBUG, "TrieRouter:::propagate_change_to_neighbors unpublish "));
00883           break;
00884         case OPER_SUBSCRIBE:
00885           mt = Channel::SUBSCRIPTION_INFO_MSG;
00886           ACE_DEBUG((LM_DEBUG, "TrieRouter:::propagate_change_to_neighbors subscribe "));
00887           break;
00888         case OPER_UNSUBSCRIBE:
00889           mt = Channel::UNSUBSCRIPTION_INFO_MSG;
00890           ACE_DEBUG((LM_DEBUG, "TrieRouter:::propagate_change_to_neighbors unsubscribe "));
00891           break;
00892         default:
00893           ACE_DEBUG((LM_DEBUG, "TrieRouter:::propagate_change_to_neighbors invalid IdType "));
00894           delete sub;
00895           return SUCCESS;
00896         }
00897         Msg *m = new Msg(mt, sub);
00898         route_msg(m, MEMBER_LOCAL, SCOPE_REMOTE);
00899         return SUCCESS;
00900       }
00901   };
00902 
00903 };
00904 
00905 
00906 
00907 #endif
00908 

Generated on Mon Feb 27 19:59:21 2006 for channel by  doxygen 1.4.6-NO