00001
00002
00003
00004
00005
00006
00007
00008
00009
00011
00012 #ifndef _MAP_ROUTER_H_
00013 #define _MAP_ROUTER_H_
00014
00015
00016 #include "ace/Log_Msg.h"
00017
00018 #include "ace/Synch.h"
00019 #include "ace/OS.h"
00020
00021
00022 #include <map>
00023 #include <list>
00024 #include <vector>
00025 #include <string>
00026 #include <algorithm>
00027
00028
00029 #include <BaseDef.h>
00030 #include <RouterBase.h>
00031 #include <Msg.h>
00032
00033 namespace channel {
00034
00035 template <class, class, class, class> class Channel;
00036
00042 template <class IdType, class IdTrait, class SynchPolicy>
00043 class MapRouter : public RouterBase {
00044 public:
00045 enum { NamespaceType = NAMESPACE_LINEAR };
00046
00047 typedef Channel<IdType, IdTrait, SynchPolicy, MapRouter<IdType, IdTrait, SynchPolicy> > Channel;
00048 typedef Destination<Channel, NamespaceType> Destination;
00049 typedef Source<Channel, NamespaceType> Source;
00050
00051
00052 typedef Msg<IdType, SynchPolicy> Msg;
00053 typedef PubSub_Info_Msg<IdType> PubSub_Info_Msg;
00054
00055 private:
00056
00057 struct PubSub_Registry {
00058 std::map<Destination *, PubSub_Scope> *subers_;
00059 typename SynchPolicy::RW_MUTEX subers_lock_;
00060 PubSub_Registry() {
00061 subers_ = NULL;
00062 }
00063 ~PubSub_Registry() {
00064 if (subers_ != NULL) {
00065 ACE_WRITE_GUARD(typename SynchPolicy::RW_MUTEX, guard, subers_lock_);
00066 delete subers_;
00067 }
00068 }
00069 };
00070
00071
00072 std::map<IdType, PubSub_Registry*> msg_sub_tbl_;
00073 typename SynchPolicy::RW_MUTEX msg_sub_tbl_lock_;
00074
00075
00076 std::map<IdType, std::list<Source *>*> msg_pub_tbl_;
00077 typename SynchPolicy::RW_MUTEX msg_pub_tbl_lock_;
00078
00079 public:
00080
00081 MapRouter() {
00082 }
00083
00084 ~MapRouter() {
00085 {
00086 ACE_WRITE_GUARD(typename SynchPolicy::RW_MUTEX, guard, msg_sub_tbl_lock_);
00087 for(typename std::map<IdType, PubSub_Registry*>::iterator iter = msg_sub_tbl_.begin();
00088 iter != msg_sub_tbl_.end(); iter++) {
00089 if (iter->second != NULL)
00090 delete iter->second;
00091 }
00092 }
00093 {
00094 ACE_WRITE_GUARD(typename SynchPolicy::RW_MUTEX, guard, msg_pub_tbl_lock_);
00095 for(typename std::map<IdType, std::list<Source *>*>::iterator iter2 = msg_pub_tbl_.begin();
00096 iter2 != msg_pub_tbl_.end(); iter2++) {
00097 if (iter2->second != NULL)
00098 delete iter2->second;
00099 }
00100 }
00101 }
00102
00107
00108 Status publish_msg(IdType t, PubSub_Scope s, Source *src)
00109 {
00110 bool send2remote = false;
00111 if (s == SCOPE_REMOTE || s == SCOPE_GLOBAL) {
00112 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, msg_pub_tbl_lock_, FAILURE);
00113 std::list<Source *> *srcs = NULL;
00114 typename std::map<IdType, std::list<Source *>*>::iterator iter = msg_pub_tbl_.find(t);
00115 if(iter != msg_pub_tbl_.end())
00116 srcs = iter->second;
00117 if(srcs == NULL) {
00118 srcs = new std::list<Source *>();
00119 msg_pub_tbl_[t] = srcs;
00120 send2remote = true;
00121 }
00122 if(std::find(srcs->begin(),srcs->end(),src)==srcs->end()) {
00123 srcs->push_back(src);
00124 }
00125 }
00126 if (send2remote)
00127 propagate_change_to_neighbors(OPER_PUBLISH, t);
00128 return SUCCESS;
00129 }
00130
00132 Status unpublish_msg(IdType t, PubSub_Scope s, Source *src)
00133 {
00134 bool send2remote = false;
00135 if (s == SCOPE_REMOTE || s == SCOPE_GLOBAL) {
00136 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, msg_pub_tbl_lock_, FAILURE);
00137 std::list<Source *> *srcs = NULL;
00138 typename std::map<IdType, std::list<Source *>*>::iterator iter = msg_pub_tbl_.find(t);
00139 if(iter != msg_pub_tbl_.end())
00140 srcs = iter->second;
00141 if(srcs == NULL) return FAILURE;
00142 typename std::list<Source *>::iterator iter2 = std::find(srcs->begin(),srcs->end(),src);
00143 if (iter2 == srcs->end()) return FAILURE;
00144 srcs->erase(iter2);
00145 if(srcs->empty()) {
00146 msg_pub_tbl_.erase(t);
00147 send2remote = true;
00148 }
00149 }
00150 if (send2remote)
00151 propagate_change_to_neighbors(OPER_UNPUBLISH, t);
00152 return SUCCESS;
00153 }
00154
00156 Status subscribe_msg(IdType t, PubSub_Scope c, Destination *dest)
00157 {
00158 ACE_DEBUG((LM_DEBUG, "MapRouter::subscribe_msg: [%s]\n",ID2STR(t).c_str()));
00159 bool send2remote = false;
00160 PubSub_Registry *reg = NULL;
00161 {
00162 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, msg_sub_tbl_lock_, FAILURE);
00163 typename std::map<IdType, PubSub_Registry*>::iterator iter = msg_sub_tbl_.find(t);
00164 if (iter != msg_sub_tbl_.end())
00165 reg = iter->second;
00166 }
00167 if (reg == NULL) {
00168 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, msg_sub_tbl_lock_, FAILURE);
00169 reg = new PubSub_Registry();
00170 msg_sub_tbl_[t] = reg;
00171 send2remote = true;
00172 }
00173 {
00174 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, reg->subers_lock_, FAILURE);
00175 if (reg->subers_ == NULL) {
00176 reg->subers_ = new std::map<Destination *, PubSub_Scope>();
00177 send2remote = true;
00178 }
00179 if (!send2remote) {
00180 send2remote = true;
00181 for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->subers_->begin();
00182 iter != reg->subers_->end(); iter++)
00183 if (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL)
00184 send2remote = false;
00185 }
00186 (*reg->subers_)[dest] = c;
00187 }
00188 if (send2remote && dest->type()==MEMBER_LOCAL && (c == SCOPE_REMOTE || c == SCOPE_GLOBAL))
00189 propagate_change_to_neighbors(OPER_SUBSCRIBE, t);
00190 return SUCCESS;
00191 }
00192
00194 Status unsubscribe_msg(IdType t, Destination *dest)
00195 {
00196 ACE_DEBUG((LM_DEBUG, "MapRouter::unsubscribe_msg[%s]...\n", ID2STR(t).c_str()));
00197 bool send2remote = false;
00198 PubSub_Registry *reg = NULL;
00199 {
00200 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, msg_sub_tbl_lock_, FAILURE);
00201 typename std::map<IdType, PubSub_Registry*>::iterator iter = msg_sub_tbl_.find(t);
00202 if (iter != msg_sub_tbl_.end())
00203 reg = iter->second;
00204 }
00205 if (reg == NULL)
00206 return SUCCESS;
00207 {
00208 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, reg->subers_lock_, FAILURE);
00209 if (reg->subers_ == NULL || reg->subers_->empty())
00210 return SUCCESS;
00211 typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->subers_->find(dest);
00212 if (iter == reg->subers_->end())
00213 return SUCCESS;
00214 if (dest->type() == MEMBER_LOCAL &&
00215 (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL))
00216 send2remote = true;
00217 reg->subers_->erase(dest);
00218 if (send2remote && !reg->subers_->empty()) {
00219 for(iter = reg->subers_->begin(); iter != reg->subers_->end(); iter++) {
00220 if (iter->second == SCOPE_REMOTE || iter->second == SCOPE_GLOBAL)
00221 send2remote = false;
00222 }
00223 }
00224 }
00225 if (send2remote)
00226 propagate_change_to_neighbors(OPER_UNSUBSCRIBE, t);
00227 return SUCCESS;
00228 }
00229
00236 Status route_msg(Msg *msg, Member_Type src_type, PubSub_Scope pub_scope, ACE_Time_Value *timeout=0)
00237 {
00238 ACE_DEBUG ((LM_DEBUG, "MapRouter::route_msg: src_type[%d],scope[%d],msg[%s]\n", src_type, pub_scope, ID2STR(msg->type).c_str()));
00239 PubSub_Registry *reg = NULL;
00240 {
00241 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, msg_sub_tbl_lock_, FAILURE);
00242 typename std::map<IdType, PubSub_Registry*>::iterator iter = msg_sub_tbl_.find(msg->type);
00243 if (iter != msg_sub_tbl_.end())
00244 reg = iter->second;
00245 }
00246 if (reg == NULL) {
00247 ACE_DEBUG ((LM_DEBUG, "msg [%s] dropped, no puber/suber\n", ID2STR(msg->type).c_str()));
00248 delete msg;
00249 return FAILURE;
00250 }
00251
00252 {
00253 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, reg->subers_lock_, FAILURE);
00254 if (reg->subers_ == NULL || reg->subers_->empty()) {
00255 ACE_DEBUG ((LM_DEBUG, "msg [%s] dropped, no subscriber\n", ID2STR(msg->type).c_str()));
00256 delete msg;
00257 return FAILURE;
00258 }
00259 if(reg->subers_->size() == 1) {
00260 typename std::map<Destination *, PubSub_Scope>::iterator iter=reg->subers_->begin();
00261 short src_row = src_type * SCOPE_NUMBER + pub_scope;
00262 short dst_col = iter->first->type() * SCOPE_NUMBER + iter->second;
00263 if(scope_checking_tbl_[src_row][dst_col]) {
00264 ACE_DEBUG ((LM_DEBUG, "msg [%s] delivered\n", ID2STR(msg->type).c_str()));
00265 iter->first->put_msg(msg, timeout);
00266 } else {
00267 ACE_DEBUG ((LM_DEBUG, "msg [%s] dropped, scope dont match\n", ID2STR(msg->type).c_str()));
00268 delete msg;
00269 }
00270 } else {
00271 Msg *dup=NULL;
00272 for(typename std::map<Destination *, PubSub_Scope>::iterator iter = reg->subers_->begin();
00273 iter != reg->subers_->end(); iter++) {
00274 short src_row = src_type * SCOPE_NUMBER + pub_scope;
00275 short dst_col = iter->first->type() * SCOPE_NUMBER + iter->second;
00276 if(scope_checking_tbl_[src_row][dst_col]) {
00278 dup=msg->clone();
00280 ACE_DEBUG ((LM_DEBUG, "msg [%s] delivered once\n", ID2STR(msg->type).c_str()));
00281 iter->first->put_msg(dup, timeout);
00282 }
00283 }
00284 delete msg;
00285 }
00286 }
00287
00288 return SUCCESS;
00289 }
00290
00294
00295 Status subscribed_global_msgs(std::vector<IdType> &global_msgs)
00296 {
00297 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, msg_sub_tbl_lock_, FAILURE);
00298 for(typename std::map<IdType, PubSub_Registry*>::iterator iter = msg_sub_tbl_.begin();
00299 iter != msg_sub_tbl_.end(); iter++) {
00300 PubSub_Registry *reg = iter->second;
00301 if (reg == NULL)
00302 continue;
00303
00304 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, reg->subers_lock_, FAILURE);
00305 if (reg->subers_ == NULL || reg->subers_->empty())
00306 continue;
00307
00308 bool found = false;
00309 for(typename std::map<Destination *, PubSub_Scope>::iterator iter2 = reg->subers_->begin();
00310 iter2 != reg->subers_->end() && !found; iter2++) {
00311 if (iter2->first->type() == MEMBER_LOCAL &&
00312 (iter2->second == SCOPE_REMOTE || iter2->second == SCOPE_GLOBAL)) {
00313 found = true;
00314 }
00315 }
00316 if (found)
00317 global_msgs.push_back(iter->first);
00318 }
00319
00320 return SUCCESS;
00321 }
00322
00323 Status published_global_msgs(std::vector<IdType> &global_msgs)
00324 {
00325 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, msg_pub_tbl_lock_, FAILURE);
00326 for(typename std::map<IdType, std::list<Source *>* >::iterator iter = msg_pub_tbl_.begin();
00327 iter != msg_pub_tbl_.end(); iter++) {
00328 if (iter->second != NULL && !iter->second->empty())
00329 global_msgs.push_back(iter->first);
00330 }
00331
00332 return SUCCESS;
00333 }
00334
00335 void dump_routing_tables(void)
00336 {
00337 ACE_DEBUG((LM_DEBUG, "The following %d IdTypes subscribed: \n", msg_sub_tbl_.size()));
00338 {
00339 ACE_READ_GUARD(typename SynchPolicy::RW_MUTEX, guard, msg_sub_tbl_lock_);
00340 for(typename std::map<IdType, PubSub_Registry*>::iterator iter = msg_sub_tbl_.begin();
00341 iter != msg_sub_tbl_.end(); iter++) {
00342 PubSub_Registry *reg = iter->second;
00343 if (reg == NULL)
00344 continue;
00345
00346 ACE_READ_GUARD(typename SynchPolicy::RW_MUTEX, guard, reg->subers_lock_);
00347 if (reg->subers_ == NULL || reg->subers_->empty())
00348 continue;
00349
00350 ACE_DEBUG((LM_DEBUG, "%s\n", ID2STR(iter->first).c_str()));
00351 }
00352 }
00353
00354 ACE_DEBUG((LM_DEBUG, "The following %d IdTypes published: \n", msg_pub_tbl_.size()));
00355 {
00356 ACE_READ_GUARD(typename SynchPolicy::RW_MUTEX, guard, msg_pub_tbl_lock_);
00357 typename std::map<IdType, std::list<Source *>* >::iterator iter;
00358 for(iter = msg_pub_tbl_.begin(); iter!=msg_pub_tbl_.end(); iter++) {
00359 if(iter->second!=NULL && !iter->second->empty())
00360 ACE_DEBUG((LM_DEBUG, "%s\n", ID2STR(iter->first).c_str()));
00361 }
00362 }
00363 }
00364
00365 protected:
00369 Status propagate_change_to_neighbors(Oper_Type op, IdType t)
00370 {
00371 ACE_DEBUG((LM_DEBUG, "MapRouter:::propagate_change_to_neighbors [%s]\n",ID2STR(t).c_str()));
00372 PubSub_Info_Msg *sub = new PubSub_Info_Msg();
00373 sub->num_msg_types = 1;
00374 sub->msg_types[0] = t;
00375 IdType mt;
00376 switch(op) {
00377 case OPER_PUBLISH:
00378 mt = Channel::PUBLICATION_INFO_MSG;
00379 ACE_DEBUG((LM_DEBUG, "MapRouter:::propagate_change_to_neighbors publish "));
00380 break;
00381 case OPER_UNPUBLISH:
00382 mt = Channel::UNPUBLICATION_INFO_MSG;
00383 ACE_DEBUG((LM_DEBUG, "MapRouter:::propagate_change_to_neighbors unpublish "));
00384 break;
00385 case OPER_SUBSCRIBE:
00386 mt = Channel::SUBSCRIPTION_INFO_MSG;
00387 ACE_DEBUG((LM_DEBUG, "MapRouter:::propagate_change_to_neighbors subscribe "));
00388 break;
00389 case OPER_UNSUBSCRIBE:
00390 mt = Channel::UNSUBSCRIPTION_INFO_MSG;
00391 ACE_DEBUG((LM_DEBUG, "MapRouter:::propagate_change_to_neighbors unsubscribe "));
00392 break;
00393 default:
00394 ACE_DEBUG((LM_DEBUG, "MapRouter:::propagate_change_to_neighbors invalid IdType "));
00395 delete sub;
00396 return SUCCESS;
00397 }
00398 Msg *m = new Msg(mt, sub);
00399 route_msg(m, MEMBER_LOCAL, SCOPE_REMOTE);
00400 return SUCCESS;
00401 }
00402 };
00403
00404 };
00405
00406
00407 #endif