00001
00002
00003
00004
00005
00006
00007
00008
00009
00011
00012 #ifndef _MEMBER_H_
00013 #define _MEMBER_H_
00014
00015 #include "ace/Log_Msg.h"
00016
00017 #include "ace/Synch.h"
00018 #include "ace/OS.h"
00019
00020
00021 #include <map>
00022
00023
00024 #include <BaseDef.h>
00025
00026 namespace channel {
00027
00029 template <class Channel>
00030 class Member {
00031 public:
00032
00033
00034
00035 Member() : ch_(NULL) {}
00036 virtual ~Member() {};
00037 virtual Member_Type type() { return MEMBER_LOCAL; }
00038 protected:
00039 Channel *ch_;
00040 };
00041
// Primary templates, declared but never defined: only the partial
// specializations on the namespace kind (NAMESPACE_LINEAR,
// NAMESPACE_HIERARCHICAL) further down in this header provide a body.
template <class ChannelT, int NamespaceKind> class Source;
template <class ChannelT, int NamespaceKind> class Destination;
00047
00053
00054 template <class Channel>
00055 class Source<Channel, NAMESPACE_LINEAR> : virtual public Member<Channel> {
00056 public:
00057 typedef typename Channel::IdType IdType;
00058 typedef typename Channel::Msg Msg;
00059 typedef typename Channel::SynchPolicy SynchPolicy;
00060
00061 Source (Channel *c) { Member<Channel>::ch_ = c; }
00062 virtual ~Source() {};
00064 Status publish_msg(IdType t, PubSub_Scope s=SCOPE_GLOBAL)
00065 {
00066 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00067 if(msg_scope_.find(t) != msg_scope_.end()) {
00068 ACE_DEBUG((LM_DEBUG, "Msg Id [%s] already published, must unpublish it before republish\n", ID2STR(t).c_str()));
00069 return FAILURE;
00070 }
00071 msg_scope_[t] = s;
00072 return this->ch_->publish_msg(t,s,this);
00073 }
00075 Status unpublish_msg(IdType t)
00076 {
00077 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00078 if(msg_scope_.find(t) != msg_scope_.end()) {
00079 ACE_DEBUG((LM_DEBUG, "Msg Id [%s] not published, cannot be unpublished\n", ID2STR(t).c_str()));
00080 return FAILURE;
00081 }
00082 PubSub_Scope s = msg_scope_[t];
00083 msg_scope_.erase(t);
00084 return this->ch_->unpublish_msg(t,s,this);
00085 }
00087 Status unpublish_all(void)
00088 {
00089 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00090 for(typename std::map<IdType, PubSub_Scope>::iterator iter = msg_scope_.begin();
00091 iter != msg_scope_.end(); iter++) {
00092 this->ch_->unpublish_msg(iter->first, iter->second, this);
00093 }
00094 msg_scope_.clear();
00095 return SUCCESS;
00096 }
00105 template <class MsgType>
00106 Status send_msg(IdType t, MsgType *msg, int sz = sizeof(MsgType),
00107 MsgFreeCallback g = DefaultMsgFreeCallback<MsgType>,
00108 ACE_Time_Value *timeout=0) {
00109 Msg *m = new Msg(t, msg, sz, g);
00110 return send_msg (m, timeout);
00111 }
00113 virtual Status send_msg(Msg *msg, ACE_Time_Value *timeout=0)
00114 {
00115 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00116 if (msg_scope_.find(msg->type) != msg_scope_.end()) {
00117 ACE_DEBUG((LM_DEBUG, "src sends msg [%s]\n", ID2STR(msg->type).c_str()));
00118 return this->ch_->route_msg(msg, this->type(), msg_scope_[msg->type], timeout);
00119 } else {
00120 ACE_DEBUG((LM_DEBUG, "src failed to sends unpublished msg [%s], dropped\n", ID2STR(msg->type).c_str()));
00121 delete msg;
00122 }
00123 return FAILURE;
00124 }
00126 PubSub_Scope scope(IdType t)
00127 {
00128 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, SCOPE_UNDEFINED);
00129 if(msg_scope_.find(t) != msg_scope_.end())
00130 return msg_scope_[t];
00131 else
00132 return SCOPE_UNDEFINED;
00133 }
00134 protected:
00135
00136 std::map<IdType, PubSub_Scope> msg_scope_;
00137 typename SynchPolicy::RW_MUTEX scope_lock_;
00138 };
00139
00149 template <class Channel>
00150 class Destination<Channel, NAMESPACE_LINEAR> : virtual public Member<Channel> {
00151 public:
00152 typedef typename Channel::IdType IdType;
00153 typedef typename Channel::Msg Msg;
00154 typedef typename Channel::SynchPolicy SynchPolicy;
00155
00156 Destination (Channel *c) { Member<Channel>::ch_ = c; }
00157 virtual ~Destination() {}
00159 Status subscribe_msg(IdType t, PubSub_Scope s=SCOPE_GLOBAL)
00160 {
00161 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00162 if(msg_scope_.find(t) != msg_scope_.end()) {
00163 ACE_DEBUG((LM_DEBUG, "Msg Id [%s] already subscribed, must unsubscribe it before resubscribe\n", ID2STR(t).c_str()));
00164 return FAILURE;
00165 }
00166 msg_scope_[t] = s;
00167 return this->ch_->subscribe_msg(t, s, this);
00168 }
00170 Status unsubscribe_msg(IdType t)
00171 {
00172 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00173 if(msg_scope_.find(t) != msg_scope_.end()) {
00174 ACE_DEBUG((LM_DEBUG, "Msg Id [%s] not subscribed, cannot be unsubscribed\n", ID2STR(t).c_str()));
00175 return FAILURE;
00176 }
00177 msg_scope_.erase(t);
00178 return this->ch_->unsubscribe_msg(t, this);
00179 }
00181 Status unsubscribe_all(void)
00182 {
00183 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00184 for(typename std::map<IdType, PubSub_Scope>::iterator iter = msg_scope_.begin();
00185 iter != msg_scope_.end(); iter++) {
00186 this->ch_->unsubscribe_msg(iter->first, this);
00187 }
00188 msg_scope_.clear();
00189 return SUCCESS;
00190 }
00192 virtual Status put_msg(Msg *msg, ACE_Time_Value *timeout=0) = 0;
00193
00194 bool operator==(Destination *s) {
00195 return equals(s);
00196 }
00197 bool equals(Destination *s) {
00198 if (this->type() != s->type()) return false;
00199 if (((Destination *)this) != s) return false;
00200 return true;
00201 }
00203 PubSub_Scope scope(IdType t)
00204 {
00205 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, SCOPE_UNDEFINED);
00206 if(msg_scope_.find(t) != msg_scope_.end())
00207 return msg_scope_[t];
00208 else
00209 return SCOPE_UNDEFINED;
00210 }
00211 protected:
00212
00213 std::map<IdType, PubSub_Scope> msg_scope_;
00214 typename SynchPolicy::RW_MUTEX scope_lock_;
00215 };
00216
00222
00223 template <class Channel>
00224 class Source<Channel, NAMESPACE_HIERARCHICAL> : virtual public Member<Channel> {
00225 public:
00226 typedef typename Channel::IdType IdType;
00227 typedef typename Channel::IdTrait IdTrait;
00228 typedef typename Channel::Msg Msg;
00229 typedef typename Channel::SynchPolicy SynchPolicy;
00230
00231 Source (Channel *c) { Member<Channel>::ch_ = c; }
00232 virtual ~Source() {};
00234 Status publish_msg(IdType t, PubSub_Scope s=SCOPE_GLOBAL)
00235 {
00236 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00237 for(typename std::map<IdType, PubSub_Scope>::iterator iter = msg_scope_.begin();
00238 iter != msg_scope_.end(); iter++)
00239 if (IdTrait::match(iter->first, t)) {
00240 ACE_DEBUG((LM_DEBUG, "Msg Id [%s] conflict, cannot be published\n", ID2STR(t).c_str()));
00241 return FAILURE;
00242 }
00243 msg_scope_[t] = s;
00244 return this->ch_->publish_msg(t,s,this);
00245 }
00247 Status unpublish_msg(IdType t)
00248 {
00249 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00250 if(msg_scope_.find(t) != msg_scope_.end()) {
00251 ACE_DEBUG((LM_DEBUG, "Msg Id [%s] not published, cannot be unpublished\n", ID2STR(t).c_str()));
00252 return FAILURE;
00253 }
00254 PubSub_Scope s = msg_scope_[t];
00255 msg_scope_.erase(t);
00256 return this->ch_->unpublish_msg(t,s,this);
00257 }
00259 Status unpublish_all(void)
00260 {
00261 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00262 for(typename std::map<IdType, PubSub_Scope>::iterator iter = msg_scope_.begin();
00263 iter != msg_scope_.end(); iter++) {
00264 this->ch_->unpublish_msg(iter->first, iter->second, this);
00265 }
00266 msg_scope_.clear();
00267 return SUCCESS;
00268 }
00277 template <class MsgType>
00278 Status send_msg(IdType t, MsgType *msg, int sz = sizeof(MsgType),
00279 MsgFreeCallback g = DefaultMsgFreeCallback<MsgType>,
00280 ACE_Time_Value *timeout=0) {
00281 Msg *m = new Msg(t, msg, sz, g);
00282 return send_msg (m, timeout);
00283 }
00285 virtual Status send_msg(Msg *msg, ACE_Time_Value *timeout=0)
00286 {
00287
00288 if (IdTrait::endWithWildcard(msg->type)) {
00289 ACE_DEBUG((LM_DEBUG, "TrieRouter::route_msg: Wildcard in Id [%s]\n",ID2STR(msg->type).c_str()));
00290 delete msg;
00291 return FAILURE;
00292 }
00293
00294 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00295 for(typename std::map<IdType, PubSub_Scope>::iterator iter = msg_scope_.begin();
00296 iter != msg_scope_.end(); iter++)
00297 if (IdTrait::id1contains2(iter->first, msg->type)) {
00298 return this->ch_->route_msg(msg, this->type(), iter->second, timeout);
00299 }
00300 ACE_DEBUG((LM_DEBUG, "src failed to sends unpublished msg [%s], dropped\n", ID2STR(msg->type).c_str()));
00301 delete msg;
00302 return FAILURE;
00303 }
00305 PubSub_Scope scope(IdType t)
00306 {
00307 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, SCOPE_UNDEFINED);
00308 for(typename std::map<IdType, PubSub_Scope>::iterator iter = msg_scope_.begin();
00309 iter != msg_scope_.end(); iter++)
00310 if (IdTrait::id1contains2(iter->first, t)) {
00311 return iter->second;
00312 }
00313 else
00314 return SCOPE_UNDEFINED;
00315 }
00316 protected:
00317
00318 std::map<IdType, PubSub_Scope> msg_scope_;
00319 typename SynchPolicy::RW_MUTEX scope_lock_;
00320 };
00321
00331 template <class Channel>
00332 class Destination<Channel, NAMESPACE_HIERARCHICAL> : virtual public Member<Channel> {
00333 public:
00334 typedef typename Channel::IdType IdType;
00335 typedef typename Channel::IdTrait IdTrait;
00336 typedef typename Channel::Msg Msg;
00337 typedef typename Channel::SynchPolicy SynchPolicy;
00338
00339 Destination (Channel *c) { Member<Channel>::ch_ = c; }
00340 virtual ~Destination() {}
00342 Status subscribe_msg(IdType t, PubSub_Scope s=SCOPE_GLOBAL)
00343 {
00344 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00345 for(typename std::map<IdType, PubSub_Scope>::iterator iter = msg_scope_.begin();
00346 iter != msg_scope_.end(); iter++)
00347 if (IdTrait::match(iter->first, t)) {
00348 ACE_DEBUG((LM_DEBUG, "Msg Id [%s] conflict, cannot be subscribed\n", ID2STR(t).c_str()));
00349 return FAILURE;
00350 }
00351 msg_scope_[t] = s;
00352 return this->ch_->subscribe_msg(t, s, this);
00353 }
00355 Status unsubscribe_msg(IdType t)
00356 {
00357 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00358 if(msg_scope_.find(t) != msg_scope_.end()) {
00359 ACE_DEBUG((LM_DEBUG, "Msg Id [%s] not subscribed, cannot be unsubscribed\n", ID2STR(t).c_str()));
00360 return FAILURE;
00361 }
00362 msg_scope_.erase(t);
00363 return this->ch_->unsubscribe_msg(t, this);
00364 }
00366 Status unsubscribe_all(void)
00367 {
00368 ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00369 for(typename std::map<IdType, PubSub_Scope>::iterator iter = msg_scope_.begin();
00370 iter != msg_scope_.end(); iter++) {
00371 this->ch_->unsubscribe_msg(iter->first, this);
00372 }
00373 msg_scope_.clear();
00374 return SUCCESS;
00375 }
00377 virtual Status put_msg(Msg *msg, ACE_Time_Value *timeout=0) = 0;
00378
00379 bool operator==(Destination *s) {
00380 return equals(s);
00381 }
00382 bool equals(Destination *s) {
00383 if (this->type() != s->type()) return false;
00384 if (((Destination *)this) != s) return false;
00385 return true;
00386 }
00388 PubSub_Scope scope(IdType t)
00389 {
00390 ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, SCOPE_UNDEFINED);
00391 for(typename std::map<IdType, PubSub_Scope>::iterator iter = msg_scope_.begin();
00392 iter != msg_scope_.end(); iter++)
00393 if (IdTrait::id1contains2(iter->first, t)) {
00394 return iter->second;
00395 }
00396 else
00397 return SCOPE_UNDEFINED;
00398 }
00399 protected:
00400
00401 std::map<IdType, PubSub_Scope> msg_scope_;
00402 typename SynchPolicy::RW_MUTEX scope_lock_;
00403 };
00404
00412 template <class Channel>
00413 class Callback : public Channel::Destination {
00414 public:
00415 typedef typename Channel::Msg Msg;
00416
00417 Callback (Channel *c) : Channel::Destination(c) {}
00418 Status put_msg(Msg *msg, ACE_Time_Value *timeout) {
00419 ACE_UNUSED_ARG(timeout);
00421 return process (msg);
00422 }
00424 virtual Status process(Msg *msg) = 0;
00425 };
00426
00427
00428 };
00429
00430 #endif
00431