00001
00002
00003
00004
00005
00006
00007
00008
00009
00011
00012 #ifndef _TRIE_ROUTER_H_
00013 #define _TRIE_ROUTER_H_
00014
00015
00016 #include "ace/Log_Msg.h"
00017
00018 #include "ace/Synch.h"
00019 #include "ace/OS.h"
00020
00021
00022 #include <map>
00023 #include <list>
00024 #include <vector>
00025 #include <string>
00026 #include <algorithm>
00027
00028
00029 #include <BaseDef.h>
00030 #include <RouterBase.h>
00031 #include <Msg.h>
00032
00033 namespace channel {
00034
00035 template <class, class, class, class> class Channel;
00036
00042 template <class IdType, class IdTrait, class SynchPolicy>
00043 class TrieRouter : public RouterBase {
00044 public:
00045 enum { NamespaceType = NAMESPACE_HIERARCHICAL };
00046
00047 typedef Channel<IdType, IdTrait, SynchPolicy, TrieRouter<IdType, IdTrait, SynchPolicy> > Channel;
00048 typedef Destination<Channel, NamespaceType> Destination;
00049 typedef Source<Channel, NamespaceType> Source;
00050
00051
00052 typedef Msg<IdType, SynchPolicy> Msg;
00053 typedef PubSub_Info_Msg<IdType> PubSub_Info_Msg;
00054 typedef typename IdTrait::NameType NameType;
00055
00056 private:
00057
00058 struct Pub_Registry {
00059 std::list<Source *> *pubers_;
00060 std::list<Source *> *w_pubers_;
00061 typename SynchPolicy::RW_MUTEX pubers_lock_;
00062 Pub_Registry() {
00063 pubers_ = NULL;
00064 w_pubers_ = NULL;
00065 }
00066 ~Pub_Registry() {
00067 ACE_WRITE_GUARD(typename SynchPolicy::RW_MUTEX, guard, pubers_lock_);
00068 if (pubers_ != NULL) {
00069 delete pubers_;
00070 }
00071 if (w_pubers_ != NULL) {
00072 delete w_pubers_;
00073 }
00074 }
00075 };
00076 struct Sub_Registry {
00077 std::map<Destination *, PubSub_Scope> *subers_;
00078 std::map<Destination *, PubSub_Scope> *w_subers_;
00079 typename SynchPolicy::RW_MUTEX subers_lock_;
00080 Sub_Registry() {
00081 subers_ = NULL;
00082 w_subers_ = NULL;
00083 }
00084 ~Sub_Registry() {
00085 ACE_WRITE_GUARD(typename SynchPolicy::RW_MUTEX, guard, subers_lock_);
00086 if (subers_ != NULL) {
00087 delete subers_;
00088 }
00089 if (w_subers_ != NULL) {
00090 delete w_subers_;
00091 }
00092 }
00093 };
00094
00095 template <class Registry>
00096 struct TrieNode {
00097 NameType name;
00098 TrieNode *parent;
00099 std::map<NameType, TrieNode *> children;
00100 typename SynchPolicy::RW_MUTEX children_lock_;
00101 Registry registry;
00102 TrieNode(NameType n, TrieNode *p) : name(n), parent(p) {
00103 }
00104 ~TrieNode() {
00105 ACE_WRITE_GUARD(typename SynchPolicy::RW_MUTEX, guard, children_lock_);
00106 for(typename std::map<NameType, TrieNode *>::iterator iter = children.begin();
00107 iter != children.end(); iter++) {
00108 if (iter->second != NULL)
00109 delete iter->second;
00110 }
00111 }
00112 };
00113
00114
00115 TrieNode<Sub_Registry> *sub_trie_root_;
00116 TrieNode<Pub_Registry> *pub_trie_root_;
00117
00118 public:
00119
00120 TrieRouter() {
00121 sub_trie_root_ = new TrieNode<Sub_Registry>(IdTrait::RootName, NULL);
00122 pub_trie_root_ = new TrieNode<Pub_Registry>(IdTrait::RootName, NULL);
00123 }
00124
00125 ~TrieRouter() {
00126 delete sub_trie_root_;
00127 delete pub_trie_root_;
00128 }
00129
00130 void propagate_child_pub_registry (IdType t, TrieNode<Pub_Registry> *node, Oper_Type oper) {
00131 Pub_Registry *reg = &(node->registry);
00132
00133 {
00134 ACE_READ_GUARD(typename SynchPolicy::RW_MUTEX, guard, reg->pubers_lock_);
00135 if (reg->w_pubers_ != NULL && !reg->w_pubers_->empty()) {
00136 t.push_back(IdTrait::WildcardName);
00137 propagate_change_to_neighbors(oper, t);
00138 t.pop_back();
00139 return;
00140 }
00141 if (reg->pubers_ != NULL && !reg->pubers_->empty()) {
00142 propagate_change_to_neighbors(oper, t);
00143 }
00144 }
00145
00146 ACE_READ_GUARD(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_);
00147 for(typename std::map<NameType, TrieNode<Pub_Registry> *>::iterator iter = node->children.begin();
00148 iter != node->children.end(); iter++) {
00149 t.push_back(iter->first);
00150 propagate_child_pub_registry (t, iter->second, oper);
00151 t.pop_back();
00152 }
00153 };
00154
00155
00156 void propagate_child_sub_registry (IdType t, TrieNode<Sub_Registry> *node, Oper_Type oper) {
00157 bool send2remote = false;
00158 Sub_Registry *reg = &(node->registry);
00159
00160 {
00161 ACE_READ_GUARD(typename SynchPolicy::RW_MUTEX, guard, reg->subers_lock_);
00162 if (reg->w_subers_ != NULL && !reg->w_subers_->empty()) {
00163 for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->w_subers_->begin();
00164 iter != reg->w_subers_->end() && !send2remote; iter++)
00165 if (iter->first->type() == MEMBER_LOCAL &&
00166 (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL))
00167 send2remote = true;
00168 if (send2remote) {
00169 t.push_back(IdTrait::WildcardName);
00170 propagate_change_to_neighbors(oper, t);
00171 t.pop_back();
00172 return;
00173 }
00174 }
00175 if (reg->subers_ != NULL && !reg->subers_->empty()) {
00176 for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->subers_->begin();
00177 iter != reg->subers_->end() && !send2remote; iter++)
00178 if (iter->first->type() == MEMBER_LOCAL &&
00179 (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL))
00180 send2remote = true;
00181 if(send2remote)
00182 propagate_change_to_neighbors(oper, t);
00183 }
00184 }
00185
00186 ACE_READ_GUARD(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_);
00187 for(typename std::map<NameType, TrieNode<Sub_Registry> *>::iterator iter = node->children.begin();
00188 iter != node->children.end(); iter++) {
00189 t.push_back(iter->first);
00190 propagate_child_sub_registry (t, iter->second, oper);
00191 t.pop_back();
00192 }
00193 };
00194
00195
00200
00201 Status publish_msg(IdType t, PubSub_Scope c, Source *src)
00202 {
00203 bool send2remote = false;
00204 Pub_Registry *reg = NULL;
00205 TrieNode<Pub_Registry> *node = pub_trie_root_;
00206 bool wildcard = false;
00207 bool hidden = false;
00208 int sz = t.size();
00209
00210 if (!(c == SCOPE_REMOTE || c == SCOPE_GLOBAL))
00211 return SUCCESS;
00212
00216 for (int i=0; i<sz; i++) {
00217 if (t[i] == IdTrait::WildcardName) {
00218 if (i == (sz-1)) {
00219 wildcard = true;
00220 break;
00221 } else {
00222 ACE_DEBUG((LM_DEBUG, "TrieRouter::unpublish_msg: Wildcard in middle [%s]\n",ID2STR(t).c_str()));
00223 return FAILURE;
00224 }
00225 }
00226
00227 if (!hidden) {
00228 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.pubers_lock_, FAILURE);
00229 reg = &(node->registry);
00230 if (reg->w_pubers_ != NULL && !reg->w_pubers_->empty())
00231 hidden = true;
00232 }
00233 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00234 typename std::map<NameType, TrieNode<Pub_Registry> *>::iterator iter = node->children.find(t[i]);
00235 if (iter == node->children.end()) {
00236 TrieNode<Pub_Registry> *cnd = new TrieNode<Pub_Registry> (t[i], node);
00237 node->children[t[i]] = cnd;
00238 node = cnd;
00239 }
00240 else
00241 node = iter->second;
00242 }
00243
00244
00245 if (!wildcard) {
00246 {
00247 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.pubers_lock_, FAILURE);
00248 reg = &(node->registry);
00249 if (reg->pubers_ == NULL) {
00250 reg->pubers_ = new std::list<Source *>();
00251 send2remote = true;
00252 }
00253 reg->pubers_->push_back(src);
00254 }
00255
00256 if (send2remote && !hidden && src->type() == MEMBER_LOCAL)
00257 propagate_change_to_neighbors(OPER_PUBLISH, t);
00258 }
00263 else {
00264 {
00265 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.pubers_lock_, FAILURE);
00266 reg = &(node->registry);
00267 if (reg->w_pubers_ == NULL) {
00268 reg->w_pubers_ = new std::list<Source *>();
00269 send2remote = true;
00270 }
00271 reg->w_pubers_->push_back(src);
00272 }
00273 if (send2remote && !hidden && src->type() == MEMBER_LOCAL) {
00274
00275 IdType id = t;
00276 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00277 for(typename std::map<NameType, TrieNode<Pub_Registry> *>::iterator iter = node->children.begin();
00278 iter != node->children.end(); iter++) {
00279 id[sz-1] = iter->first;
00280 propagate_child_pub_registry (id, iter->second, OPER_UNPUBLISH);
00281 }
00282
00283 propagate_change_to_neighbors(OPER_PUBLISH, t);
00284 }
00285 }
00286 return SUCCESS;
00287 }
00288
00290 Status unpublish_msg(IdType t, PubSub_Scope s, Source *src)
00291 {
00292 bool send2remote = false;
00293 Pub_Registry *reg = NULL;
00294 TrieNode<Pub_Registry> *node = pub_trie_root_;
00295 bool wildcard = false;
00296 bool hidden = false;
00297 int sz = t.size();
00298
00299 if (!(s == SCOPE_REMOTE || s == SCOPE_GLOBAL))
00300 return SUCCESS;
00301
00305 for (int i=0; i<sz; i++) {
00306 if (t[i] == IdTrait::WildcardName) {
00307 if (i == (sz-1)) {
00308 wildcard = true;
00309 break;
00310 } else {
00311 ACE_DEBUG((LM_DEBUG, "TrieRouter::unpublish_msg: Wildcard in middle [%s]\n",ID2STR(t).c_str()));
00312 return FAILURE;
00313 }
00314 }
00315
00316 if (!hidden) {
00317 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.pubers_lock_, FAILURE);
00318 reg = &(node->registry);
00319 if (reg->w_pubers_ != NULL && !reg->w_pubers_->empty())
00320 hidden = true;
00321 }
00322 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00323 typename std::map<NameType, TrieNode<Pub_Registry> *>::iterator iter = node->children.find(t[i]);
00324 if (iter == node->children.end()) {
00325 ACE_DEBUG((LM_DEBUG, "TrieRouter::unpublish_msg: not found [%s]\n",ID2STR(t).c_str()));
00326 return FAILURE;
00327 }
00328 node = iter->second;
00329 }
00330 if (!wildcard) {
00331 {
00332 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.pubers_lock_, FAILURE);
00333 reg = &(node->registry);
00334 if (reg->pubers_ == NULL) {
00335 ACE_DEBUG((LM_DEBUG, "TrieRouter::unpublish_msg: not found [%s]\n",ID2STR(t).c_str()));
00336 return FAILURE;
00337 }
00338 typename std::list<Source *>::iterator iter = std::find(reg->pubers_->begin(),
00339 reg->pubers_->end(), src);
00340 if (iter == reg->pubers_->end()) {
00341 ACE_DEBUG((LM_DEBUG, "TrieRouter::unpublish_msg: not found [%s]\n",ID2STR(t).c_str()));
00342 return FAILURE;
00343 }
00344 reg->pubers_->erase(iter);
00345 if (reg->pubers_->empty()) {
00346 send2remote = true;
00347 }
00348 }
00349
00350 if (send2remote && !hidden && src->type() == MEMBER_LOCAL)
00351 propagate_change_to_neighbors(OPER_UNPUBLISH, t);
00352 }
00356 else {
00357 {
00358 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.pubers_lock_, FAILURE);
00359 reg = &(node->registry);
00360 if (reg->w_pubers_ == NULL) {
00361 ACE_DEBUG((LM_DEBUG, "TrieRouter::unpublish_msg: not found [%s]\n",ID2STR(t).c_str()));
00362 return FAILURE;
00363 }
00364 typename std::list<Source *>::iterator iter = std::find(reg->w_pubers_->begin(),
00365 reg->w_pubers_->end(), src);
00366 if (iter == reg->w_pubers_->end()) {
00367 ACE_DEBUG((LM_DEBUG, "TrieRouter::unpublish_msg: not found [%s]\n",ID2STR(t).c_str()));
00368 return FAILURE;
00369 }
00370 reg->w_pubers_->erase(iter);
00371 if (reg->w_pubers_->empty()) {
00372 send2remote = true;
00373 }
00374 }
00375
00376 if (send2remote && !hidden && src->type() == MEMBER_LOCAL) {
00377
00378 propagate_change_to_neighbors(OPER_UNPUBLISH, t);
00379
00380 IdType id = t;
00381 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00382 for(typename std::map<NameType, TrieNode<Pub_Registry> *>::iterator iter = node->children.begin();
00383 iter != node->children.end(); iter++) {
00384 id[sz-1] = iter->first;
00385 propagate_child_pub_registry (id, iter->second, OPER_PUBLISH);
00386 }
00387 }
00388 }
00389 return SUCCESS;
00390 }
00391
00393 Status subscribe_msg(IdType t, PubSub_Scope c, Destination *dest)
00394 {
00395 bool send2remote = false;
00396 Sub_Registry *reg = NULL;
00397 TrieNode<Sub_Registry> *node = sub_trie_root_;
00398 bool wildcard = false;
00399 bool hidden = false;
00400 int sz = t.size();
00401
00405 for (int i=0; i<sz; i++) {
00406 if (t[i] == IdTrait::WildcardName) {
00407 if (i == (sz-1)) {
00408 wildcard = true;
00409 break;
00410 } else {
00411 ACE_DEBUG((LM_DEBUG, "TrieRouter::subscribe_msg: Wildcard in middle [%s]\n",ID2STR(t).c_str()));
00412 return FAILURE;
00413 }
00414 }
00415 if (!hidden) {
00416 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.subers_lock_, FAILURE);
00417 reg = &(node->registry);
00418 if (reg->w_subers_ != NULL && !reg->w_subers_->empty()) {
00419 for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->w_subers_->begin();
00420 iter != reg->w_subers_->end() && !hidden; iter++)
00421 if (iter->first->type() == MEMBER_LOCAL &&
00422 (iter->second == SCOPE_GLOBAL || iter->second == SCOPE_REMOTE))
00423 hidden = true;
00424 }
00425 }
00426 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00427 typename std::map<NameType, TrieNode<Sub_Registry> *>::iterator iter = node->children.find(t[i]);
00428 if (iter == node->children.end()) {
00429 TrieNode<Sub_Registry> *cnd = new TrieNode<Sub_Registry> (t[i], node);
00430 node->children[t[i]] = cnd;
00431 node = cnd;
00432 }
00433 else
00434 node = iter->second;
00435 }
00436
00437
00438 if (!wildcard) {
00439 {
00440 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.subers_lock_, FAILURE);
00441 reg = &(node->registry);
00442 if (reg->subers_ == NULL) {
00443 reg->subers_ = new std::map<Destination *, PubSub_Scope>();
00444 send2remote = true;
00445 }
00446 if (!send2remote && !hidden) {
00447 send2remote = true;
00448 for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->subers_->begin();
00449 iter != reg->subers_->end(); iter++)
00450 if (iter->first->type() == MEMBER_LOCAL &&
00451 (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL))
00452 send2remote = false;
00453 }
00454 (*reg->subers_)[dest] = c;
00455 }
00456
00457 if (send2remote && !hidden && dest->type() == MEMBER_LOCAL &&
00458 (c == SCOPE_REMOTE || c == SCOPE_GLOBAL)) {
00459 propagate_change_to_neighbors(OPER_SUBSCRIBE, t);
00460 }
00461 }
00466 else {
00467 {
00468 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.subers_lock_, FAILURE);
00469 reg = &(node->registry);
00470 if (reg->w_subers_ == NULL) {
00471 reg->w_subers_ = new std::map<Destination *, PubSub_Scope>();
00472 send2remote = true;
00473 }
00474 if (!send2remote && !hidden) {
00475 send2remote = true;
00476 for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->w_subers_->begin();
00477 iter != reg->w_subers_->end(); iter++)
00478 if (iter->first->type() == MEMBER_LOCAL &&
00479 (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL))
00480 send2remote = false;
00481 }
00482 (*reg->w_subers_)[dest] = c;
00483 }
00484
00485 if (send2remote && !hidden && dest->type() == MEMBER_LOCAL &&
00486 (c == SCOPE_REMOTE || c == SCOPE_GLOBAL)) {
00487
00488 IdType id = t;
00489 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00490 for(typename std::map<NameType, TrieNode<Sub_Registry> *>::iterator iter = node->children.begin();
00491 iter != node->children.end(); iter++) {
00492 id[sz-1] = iter->first;
00493 propagate_child_sub_registry (id, iter->second, OPER_UNSUBSCRIBE);
00494 }
00495 propagate_change_to_neighbors(OPER_SUBSCRIBE, t);
00496 }
00497 }
00498 return SUCCESS;
00499 }
00500
00502 Status unsubscribe_msg(IdType t, Destination *dest)
00503 {
00504 bool send2remote = false;
00505 Sub_Registry *reg = NULL;
00506 TrieNode<Sub_Registry> *node = sub_trie_root_;
00507 bool wildcard = false;
00508 bool hidden = false;
00509 int sz = t.size();
00510
00514 for (int i=0; i<sz; i++) {
00515 if (t[i] == IdTrait::WildcardName) {
00516 if (i == (sz-1)) {
00517 wildcard = true;
00518 break;
00519 } else {
00520 ACE_DEBUG((LM_DEBUG, "TrieRouter::unsubscribe_msg: Wildcard in middle [%s]\n",ID2STR(t).c_str()));
00521 return FAILURE;
00522 }
00523 }
00524 if (!hidden) {
00525 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.subers_lock_, FAILURE);
00526 reg = &(node->registry);
00527 if (reg->w_subers_ != NULL && !reg->w_subers_->empty()) {
00528 for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->w_subers_->begin();
00529 iter != reg->w_subers_->end() && !hidden; iter++)
00530 if (iter->first->type() == MEMBER_LOCAL &&
00531 (iter->second == SCOPE_GLOBAL || iter->second == SCOPE_REMOTE))
00532 hidden = true;
00533 }
00534 }
00535 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00536 typename std::map<NameType, TrieNode<Sub_Registry> *>::iterator iter = node->children.find(t[i]);
00537 if (iter == node->children.end()) {
00538 ACE_DEBUG((LM_DEBUG, "TrieRouter::unsubscribe_msg: not found [%s]\n",ID2STR(t).c_str()));
00539 return FAILURE;
00540 }
00541 node = iter->second;
00542 }
00543
00544 if (!wildcard) {
00545 {
00546 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.subers_lock_, FAILURE);
00547 reg = &(node->registry);
00548 if (reg->subers_ == NULL) {
00549 ACE_DEBUG((LM_DEBUG, "TrieRouter::unsubscribe_msg: not found [%s]\n",ID2STR(t).c_str()));
00550 return FAILURE;
00551 }
00552 typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->subers_->find(dest);
00553 if (iter == reg->subers_->end()) {
00554 ACE_DEBUG((LM_DEBUG, "TrieRouter::unsubscribe_msg: not found [%s]\n",ID2STR(t).c_str()));
00555 return FAILURE;
00556 }
00557 if (iter->first->type() == MEMBER_LOCAL &&
00558 (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL))
00559 send2remote = true;
00560 reg->subers_->erase(dest);
00561 if (send2remote && !hidden && !reg->subers_->empty()) {
00562 for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->subers_->begin();
00563 iter != reg->subers_->end(); iter++)
00564 if (iter->first->type() == MEMBER_LOCAL &&
00565 (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL))
00566 send2remote = false;
00567 }
00568 }
00569
00570 if (send2remote && !hidden)
00571 propagate_change_to_neighbors(OPER_UNSUBSCRIBE, t);
00572 }
00577 else {
00578 {
00579 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.subers_lock_, FAILURE);
00580 reg = &(node->registry);
00581 if (reg->w_subers_ == NULL) {
00582 ACE_DEBUG((LM_DEBUG, "TrieRouter::unsubscribe_msg: not found [%s]\n",ID2STR(t).c_str()));
00583 return FAILURE;
00584 }
00585 typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->w_subers_->find(dest);
00586 if (iter == reg->w_subers_->end()) {
00587 ACE_DEBUG((LM_DEBUG, "TrieRouter::unsubscribe_msg: not found [%s]\n",ID2STR(t).c_str()));
00588 return FAILURE;
00589 }
00590 if (iter->first->type() == MEMBER_LOCAL &&
00591 (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL))
00592 send2remote = true;
00593 reg->w_subers_->erase(dest);
00594 if (send2remote && !hidden && !reg->w_subers_->empty()) {
00595 for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->w_subers_->begin();
00596 iter != reg->w_subers_->end(); iter++)
00597 if (iter->first->type() == MEMBER_LOCAL &&
00598 (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL))
00599 send2remote = false;
00600 }
00601 }
00602
00603 if (send2remote && !hidden) {
00604 propagate_change_to_neighbors(OPER_UNSUBSCRIBE, t);
00605
00606 IdType id = t;
00607 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00608 for(typename std::map<NameType, TrieNode<Sub_Registry> *>::iterator iter = node->children.begin();
00609 iter != node->children.end(); iter++) {
00610 id[sz-1] = iter->first;
00611 propagate_child_sub_registry (id, iter->second, OPER_SUBSCRIBE);
00612 }
00613 }
00614 }
00615 return SUCCESS;
00616 }
00617
00624 Status route_msg(Msg *msg, Member_Type src_type, PubSub_Scope pub_scope, ACE_Time_Value *timeout=0)
00625 {
00626 Sub_Registry *reg = NULL;
00627 TrieNode<Sub_Registry> *node = sub_trie_root_;
00628 Msg *dup=NULL;
00629 int sz = msg->type.size();
00630
00634 for (int i=0; i<sz; i++) {
00635 {
00636
00637 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.subers_lock_, FAILURE);
00638 reg = &(node->registry);
00639 if (reg->w_subers_ != NULL && !reg->w_subers_->empty()) {
00640 for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->w_subers_->begin();
00641 iter != reg->w_subers_->end(); iter++) {
00642
00643 short src_row = src_type * SCOPE_NUMBER + pub_scope;
00644 short dst_col = iter->first->type() * SCOPE_NUMBER + iter->second;
00645 if(scope_checking_tbl_[src_row][dst_col]) {
00647 dup=msg->clone();
00649 ACE_DEBUG ((LM_DEBUG, "msg [%s] delivered once\n", ID2STR(msg->type).c_str()));
00650 iter->first->put_msg(dup, timeout);
00651 }
00652 }
00653 }
00654 }
00655 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00656 typename std::map<NameType, TrieNode<Sub_Registry> *>::iterator iter = node->children.find(msg->type[i]);
00657 if (iter == node->children.end()) {
00658
00659 return FAILURE;
00660 }
00661 node = iter->second;
00662 }
00663
00664 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->registry.subers_lock_, FAILURE);
00665
00666
00667 reg = &(node->registry);
00668 if (reg->w_subers_ != NULL && !reg->w_subers_->empty()) {
00669 for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->w_subers_->begin();
00670 iter != reg->w_subers_->end(); iter++) {
00671 short src_row = src_type * SCOPE_NUMBER + pub_scope;
00672 short dst_col = iter->first->type() * SCOPE_NUMBER + iter->second;
00673 if(scope_checking_tbl_[src_row][dst_col]) {
00675 dup=msg->clone();
00677 ACE_DEBUG ((LM_DEBUG, "msg [%s] delivered once\n", ID2STR(msg->type).c_str()));
00678 iter->first->put_msg(dup, timeout);
00679 }
00680 }
00681 }
00682
00683
00684 reg = &(node->registry);
00685 if (reg->subers_ != NULL && !reg->subers_->empty()) {
00686 for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->subers_->begin();
00687 iter != reg->subers_->end(); iter++) {
00688 short src_row = src_type * SCOPE_NUMBER + pub_scope;
00689 short dst_col = iter->first->type() * SCOPE_NUMBER + iter->second;
00690 if(scope_checking_tbl_[src_row][dst_col]) {
00692 dup=msg->clone();
00694 ACE_DEBUG ((LM_DEBUG, "msg [%s] delivered once\n", ID2STR(msg->type).c_str()));
00695 iter->first->put_msg(dup, timeout);
00696 }
00697 }
00698 }
00699
00700 delete msg;
00701
00702 return SUCCESS;
00703 }
00704
00709
00710 Status subscribed_global_msgs(std::vector<IdType> &global_msgs)
00711 {
00712 IdType id;
00713 return subscribed_global_msgs_recursive(global_msgs, sub_trie_root_, id);
00714 }
00715 Status subscribed_global_msgs_recursive(std::vector<IdType> &global_msgs, TrieNode<Sub_Registry> *node, IdType &id)
00716 {
00717 Sub_Registry *reg = &(node->registry);
00718
00719 {
00720 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, reg->subers_lock_, FAILURE);
00721
00722
00723 if (reg->w_subers_ != NULL && !reg->w_subers_->empty()) {
00724 for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->w_subers_->begin();
00725 iter != reg->w_subers_->end(); iter++)
00726 if (iter->first->type() == MEMBER_LOCAL &&
00727 (iter->second == SCOPE_GLOBAL || iter->second == SCOPE_REMOTE)) {
00728 id.push_back(IdTrait::WildcardName);
00729 global_msgs.push_back(id);
00730 id.pop_back();
00731 return SUCCESS;
00732 }
00733 }
00734
00735
00736 if (reg->subers_ != NULL && !reg->subers_->empty()) {
00737 for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->subers_->begin();
00738 iter != reg->subers_->end() ; iter++)
00739 if (iter->first->type() == MEMBER_LOCAL &&
00740 (iter->second == SCOPE_GLOBAL || iter->second == SCOPE_REMOTE)) {
00741 global_msgs.push_back(id);
00742 break;
00743 }
00744 }
00745 }
00746
00747 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00748 for(typename std::map<NameType, TrieNode<Sub_Registry> *>::iterator iter = node->children.begin();
00749 iter != node->children.end(); iter++)
00750 if (iter->second != NULL) {
00751 id.push_back(iter->first);
00752 subscribed_global_msgs_recursive(global_msgs, iter->second, id);
00753 id.pop_back();
00754 }
00755
00756 return SUCCESS;
00757 }
00758
00759 Status published_global_msgs(std::vector<IdType> &global_msgs)
00760 {
00761 IdType id;
00762 return published_global_msgs_recursive(global_msgs, pub_trie_root_, id);
00763 }
00764 Status published_global_msgs_recursive(std::vector<IdType> &global_msgs, TrieNode<Pub_Registry> *node, IdType &id)
00765 {
00766 Pub_Registry *reg = &(node->registry);
00767
00768 {
00769 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, reg->pubers_lock_, FAILURE);
00770
00771 if (reg->w_pubers_ != NULL && !reg->w_pubers_->empty()) {
00772 id.push_back(IdTrait::WildcardName);
00773 global_msgs.push_back(id);
00774 id.pop_back();
00775 return SUCCESS;
00776 }
00777
00778 if (reg->pubers_ != NULL && !reg->pubers_->empty()) {
00779 global_msgs.push_back(id);
00780 }
00781 }
00782
00783 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00784 for(typename std::map<NameType, TrieNode<Pub_Registry> *>::iterator iter = node->children.begin();
00785 iter != node->children.end(); iter++)
00786 if (iter->second != NULL) {
00787 id.push_back(iter->first);
00788 published_global_msgs_recursive(global_msgs, iter->second, id);
00789 id.pop_back();
00790 }
00791
00792 return SUCCESS;
00793 }
00794
00795 void dump_pub_ids_recursive (TrieNode<Pub_Registry> *node, IdType &id) {
00796 Pub_Registry *reg = &(node->registry);
00797
00798 {
00799 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, reg->pubers_lock_, FAILURE);
00800
00801 if (reg->w_pubers_ != NULL && !reg->w_pubers_->empty()) {
00802 id.push_back(IdTrait::WildcardName);
00803 ACE_DEBUG((LM_DEBUG, "%s\n", ID2STR(id).c_str()));
00804 id.pop_back();
00805 }
00806
00807 if (reg->pubers_ != NULL && !reg->pubers_->empty()) {
00808 ACE_DEBUG((LM_DEBUG, "%s\n", ID2STR(id).c_str()));
00809 }
00810 }
00811
00812 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00813 for(typename std::map<NameType, TrieNode<Pub_Registry> *>::iterator iter = node->children.begin();
00814 iter != node->children.end(); iter++)
00815 if (iter->second != NULL) {
00816 id.push_back(iter->first);
00817 dump_pub_ids_recursive(iter->second, id);
00818 id.pop_back();
00819 }
00820 }
00821
00822 void dump_sub_ids_recursive (TrieNode<Sub_Registry> *node, IdType &id) {
00823 Sub_Registry *reg = &(node->registry);
00824
00825 {
00826 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, reg->subers_lock_, FAILURE);
00827
00828 if (reg->w_subers_ != NULL && !reg->w_subers_->empty()) {
00829 id.push_back(IdTrait::WildcardName);
00830 ACE_DEBUG((LM_DEBUG, "%s\n", ID2STR(id).c_str()));
00831 id.pop_back();
00832 }
00833
00834 if (reg->subers_ != NULL && !reg->subers_->empty()) {
00835 ACE_DEBUG((LM_DEBUG, "%s\n", ID2STR(id).c_str()));
00836 }
00837 }
00838
00839 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, node->children_lock_, FAILURE);
00840 for(typename std::map<NameType, TrieNode<Sub_Registry> *>::iterator iter = node->children.begin();
00841 iter != node->children.end(); iter++)
00842 if (iter->second != NULL) {
00843 id.push_back(iter->first);
00844 dump_sub_ids_recursive(iter->second, id);
00845 id.pop_back();
00846 }
00847 }
00848
00849 void dump_routing_tables(void)
00850 {
00851 ACE_DEBUG((LM_DEBUG, "The following IdTypes subscribed: \n"));
00852 {
00853 IdType id;
00854 dump_sub_ids_recursive(sub_trie_root_,id);
00855 }
00856
00857 ACE_DEBUG((LM_DEBUG, "The following IdTypes published: \n"));
00858 {
00859 IdType id;
00860 dump_pub_ids_recursive(pub_trie_root_,id);
00861 }
00862 }
00863
00864 protected:
00868 Status propagate_change_to_neighbors(Oper_Type op, IdType t)
00869 {
00870 ACE_DEBUG((LM_DEBUG, "TrieRouter:::propagate_change_to_neighbors [%s]\n",ID2STR(t).c_str()));
00871 PubSub_Info_Msg *sub = new PubSub_Info_Msg();
00872 sub->num_msg_types = 1;
00873 sub->msg_types[0] = t;
00874 IdType mt;
00875 switch(op) {
00876 case OPER_PUBLISH:
00877 mt = Channel::PUBLICATION_INFO_MSG;
00878 ACE_DEBUG((LM_DEBUG, "TrieRouter:::propagate_change_to_neighbors publish "));
00879 break;
00880 case OPER_UNPUBLISH:
00881 mt = Channel::UNPUBLICATION_INFO_MSG;
00882 ACE_DEBUG((LM_DEBUG, "TrieRouter:::propagate_change_to_neighbors unpublish "));
00883 break;
00884 case OPER_SUBSCRIBE:
00885 mt = Channel::SUBSCRIPTION_INFO_MSG;
00886 ACE_DEBUG((LM_DEBUG, "TrieRouter:::propagate_change_to_neighbors subscribe "));
00887 break;
00888 case OPER_UNSUBSCRIBE:
00889 mt = Channel::UNSUBSCRIPTION_INFO_MSG;
00890 ACE_DEBUG((LM_DEBUG, "TrieRouter:::propagate_change_to_neighbors unsubscribe "));
00891 break;
00892 default:
00893 ACE_DEBUG((LM_DEBUG, "TrieRouter:::propagate_change_to_neighbors invalid IdType "));
00894 delete sub;
00895 return SUCCESS;
00896 }
00897 Msg *m = new Msg(mt, sub);
00898 route_msg(m, MEMBER_LOCAL, SCOPE_REMOTE);
00899 return SUCCESS;
00900 }
00901 };
00902
00903 };
00904
00905
00906
00907 #endif
00908