L:/channel/channel/include/Member.h

Go to the documentation of this file.
00001 
00002 // Copyright (c) 2005, 2006 Yigong Liu
00003 // Permission to use, copy, modify, distribute and sell this software for any 
00004 //     purpose is hereby granted without fee, provided that the above copyright 
00005 //     notice appear in all copies and that both that copyright notice and this 
00006 //     permission notice appear in supporting documentation.
00007 // The author makes no representations about the 
00008 //     suitability of this software for any purpose. It is provided "as is" 
00009 //     without express or implied warranty.
00011 
00012 #ifndef _MEMBER_H_
00013 #define _MEMBER_H_
00014 
00015 #include "ace/Log_Msg.h"
00016 //for ACE_GUARD macros, need the following 2 headers
00017 #include "ace/Synch.h"
00018 #include "ace/OS.h"
00019 
00020 //std headers
00021 #include <map>
00022 
00023 //export 
00024 #include <BaseDef.h>
00025 
00026 namespace channel {
00027 
00029   template <class Channel>
00030     class  Member {
00031     public:
00032     //cannot use the following constructor because it requires the
00033     //most derived class to construct virtual base class explicitly
00034     //Member(Channel *c) : ch_(c) {}
00035     Member() : ch_(NULL) {}
00036     virtual ~Member() {};
00037     virtual Member_Type type() { return MEMBER_LOCAL; } //default local members
00038     protected:
00039     Channel *ch_;
00040   };    
00041 
/// Primary endpoint templates; they are only declared here. The integer
/// argument selects one of the partial specializations defined in this
/// file (NAMESPACE_LINEAR or NAMESPACE_HIERARCHICAL).
template <class Channel, int NamespaceKind> class Source;
template <class Channel, int NamespaceKind> class Destination;
00047 
00053 
00054   template <class Channel>
00055     class Source<Channel, NAMESPACE_LINEAR> : virtual public Member<Channel> {
00056     public:
00057     typedef typename Channel::IdType IdType;
00058     typedef typename Channel::Msg Msg;
00059     typedef typename Channel::SynchPolicy SynchPolicy;
00060 
00061     Source (Channel *c) { Member<Channel>::ch_ = c; }
00062     virtual ~Source() {};
00064     Status publish_msg(IdType t, PubSub_Scope s=SCOPE_GLOBAL)
00065       {
00066         ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00067         if(msg_scope_.find(t) != msg_scope_.end()) {
00068           ACE_DEBUG((LM_DEBUG, "Msg Id [%s] already published, must unpublish it before republish\n", ID2STR(t).c_str()));
00069           return FAILURE;
00070         }
00071         msg_scope_[t] = s;
00072         return this->ch_->publish_msg(t,s,this);
00073       }
00075     Status unpublish_msg(IdType t)
00076       {
00077         ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00078         if(msg_scope_.find(t) != msg_scope_.end()) {
00079           ACE_DEBUG((LM_DEBUG, "Msg Id [%s] not published, cannot be unpublished\n", ID2STR(t).c_str()));
00080           return FAILURE;
00081         }
00082         PubSub_Scope s = msg_scope_[t];
00083         msg_scope_.erase(t);
00084         return this->ch_->unpublish_msg(t,s,this);
00085       }
00087     Status unpublish_all(void)
00088       {
00089         ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00090         for(typename std::map<IdType, PubSub_Scope>::iterator iter =  msg_scope_.begin();
00091             iter !=  msg_scope_.end(); iter++) {
00092           this->ch_->unpublish_msg(iter->first, iter->second, this);
00093         }
00094         msg_scope_.clear();
00095         return SUCCESS;
00096       }
00105     template <class MsgType>
00106     Status send_msg(IdType t, MsgType *msg, int sz = sizeof(MsgType), 
00107                             MsgFreeCallback g = DefaultMsgFreeCallback<MsgType>,
00108                             ACE_Time_Value *timeout=0) {
00109       Msg *m = new Msg(t, msg, sz, g);
00110       return send_msg (m, timeout);
00111     }
00113     virtual Status send_msg(Msg *msg, ACE_Time_Value *timeout=0)
00114       {
00115         ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00116         if (msg_scope_.find(msg->type) != msg_scope_.end()) {
00117           ACE_DEBUG((LM_DEBUG, "src sends msg [%s]\n", ID2STR(msg->type).c_str()));
00118           return this->ch_->route_msg(msg, this->type(), msg_scope_[msg->type], timeout);
00119         } else {
00120           ACE_DEBUG((LM_DEBUG, "src failed to sends unpublished msg [%s], dropped\n", ID2STR(msg->type).c_str()));
00121           delete msg;
00122         }
00123         return FAILURE;
00124       }
00126     PubSub_Scope scope(IdType t)
00127       {
00128         ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, SCOPE_UNDEFINED);
00129         if(msg_scope_.find(t) != msg_scope_.end())
00130           return msg_scope_[t];
00131         else
00132           return SCOPE_UNDEFINED;
00133       }
00134     protected:
00135     //published msgs scopes
00136     std::map<IdType, PubSub_Scope> msg_scope_;
00137     typename SynchPolicy::RW_MUTEX scope_lock_;
00138   };
00139 
00149   template <class Channel>
00150     class Destination<Channel, NAMESPACE_LINEAR> : virtual public Member<Channel> {
00151     public:
00152     typedef typename Channel::IdType IdType;
00153     typedef typename Channel::Msg Msg;
00154     typedef typename Channel::SynchPolicy SynchPolicy;
00155 
00156     Destination (Channel *c) { Member<Channel>::ch_ = c; }
00157     virtual ~Destination() {}
00159     Status subscribe_msg(IdType t, PubSub_Scope s=SCOPE_GLOBAL)
00160       {
00161         ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00162         if(msg_scope_.find(t) != msg_scope_.end()) {
00163           ACE_DEBUG((LM_DEBUG, "Msg Id [%s] already subscribed, must unsubscribe it before resubscribe\n", ID2STR(t).c_str()));
00164           return FAILURE;
00165         }
00166         msg_scope_[t] = s;
00167         return this->ch_->subscribe_msg(t, s, this);
00168       }
00170     Status unsubscribe_msg(IdType t)
00171       {
00172         ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00173         if(msg_scope_.find(t) != msg_scope_.end()) {
00174           ACE_DEBUG((LM_DEBUG, "Msg Id [%s] not subscribed, cannot be unsubscribed\n", ID2STR(t).c_str()));
00175           return FAILURE;
00176         }
00177         msg_scope_.erase(t);
00178         return this->ch_->unsubscribe_msg(t, this);
00179       }
00181     Status unsubscribe_all(void)
00182       {
00183         ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00184         for(typename std::map<IdType, PubSub_Scope>::iterator iter =  msg_scope_.begin();
00185             iter !=  msg_scope_.end(); iter++) {
00186           this->ch_->unsubscribe_msg(iter->first, this);
00187         }
00188         msg_scope_.clear();
00189         return SUCCESS;
00190       }
00192     virtual Status put_msg(Msg *msg, ACE_Time_Value *timeout=0) = 0;
00193     //utils
00194     bool operator==(Destination *s) { 
00195       return equals(s);
00196     }
00197     bool equals(Destination *s) { 
00198       if (this->type() != s->type()) return false;
00199       if (((Destination *)this) != s) return false;
00200       return true;
00201     }
00203     PubSub_Scope scope(IdType t)
00204       {
00205         ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, SCOPE_UNDEFINED);
00206         if(msg_scope_.find(t) != msg_scope_.end())
00207           return msg_scope_[t];
00208         else
00209           return SCOPE_UNDEFINED;
00210       }
00211     protected:
00212     //subscribed msgs scopes
00213     std::map<IdType, PubSub_Scope> msg_scope_;
00214     typename SynchPolicy::RW_MUTEX scope_lock_;
00215   }; 
00216 
00222 
00223   template <class Channel>
00224     class Source<Channel, NAMESPACE_HIERARCHICAL> : virtual public Member<Channel> {
00225     public:
00226     typedef typename Channel::IdType IdType;
00227     typedef typename Channel::IdTrait IdTrait;
00228     typedef typename Channel::Msg Msg;
00229     typedef typename Channel::SynchPolicy SynchPolicy;
00230 
00231     Source (Channel *c) { Member<Channel>::ch_ = c; }
00232     virtual ~Source() {};
00234     Status publish_msg(IdType t, PubSub_Scope s=SCOPE_GLOBAL)
00235       {
00236         ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00237         for(typename std::map<IdType, PubSub_Scope>::iterator iter = msg_scope_.begin();
00238             iter != msg_scope_.end(); iter++)
00239           if (IdTrait::match(iter->first, t)) {
00240             ACE_DEBUG((LM_DEBUG, "Msg Id [%s] conflict, cannot be published\n", ID2STR(t).c_str()));
00241             return FAILURE;
00242           }
00243         msg_scope_[t] = s;
00244         return this->ch_->publish_msg(t,s,this);
00245       }
00247     Status unpublish_msg(IdType t)
00248       {
00249         ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00250         if(msg_scope_.find(t) != msg_scope_.end()) {
00251           ACE_DEBUG((LM_DEBUG, "Msg Id [%s] not published, cannot be unpublished\n", ID2STR(t).c_str()));
00252           return FAILURE;
00253         }
00254         PubSub_Scope s = msg_scope_[t];
00255         msg_scope_.erase(t);
00256         return this->ch_->unpublish_msg(t,s,this);
00257       }
00259     Status unpublish_all(void)
00260       {
00261         ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00262         for(typename std::map<IdType, PubSub_Scope>::iterator iter =  msg_scope_.begin();
00263             iter !=  msg_scope_.end(); iter++) {
00264           this->ch_->unpublish_msg(iter->first, iter->second, this);
00265         }
00266         msg_scope_.clear();
00267         return SUCCESS;
00268       }
00277     template <class MsgType>
00278     Status send_msg(IdType t, MsgType *msg, int sz = sizeof(MsgType), 
00279                             MsgFreeCallback g = DefaultMsgFreeCallback<MsgType>,
00280                             ACE_Time_Value *timeout=0) {
00281       Msg *m = new Msg(t, msg, sz, g);
00282       return send_msg (m, timeout);
00283     }
00285     virtual Status send_msg(Msg *msg, ACE_Time_Value *timeout=0)
00286       {
00287         //cannot use wildcard when sending
00288         if (IdTrait::endWithWildcard(msg->type)) {
00289           ACE_DEBUG((LM_DEBUG, "TrieRouter::route_msg: Wildcard in Id [%s]\n",ID2STR(msg->type).c_str()));
00290           delete msg;
00291           return FAILURE;
00292         }
00293 
00294         ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00295         for(typename std::map<IdType, PubSub_Scope>::iterator iter = msg_scope_.begin();
00296             iter != msg_scope_.end(); iter++)
00297           if (IdTrait::id1contains2(iter->first, msg->type)) {
00298             return this->ch_->route_msg(msg, this->type(), iter->second, timeout);
00299           }
00300         ACE_DEBUG((LM_DEBUG, "src failed to sends unpublished msg [%s], dropped\n", ID2STR(msg->type).c_str()));
00301         delete msg;
00302         return FAILURE;
00303       }
00305     PubSub_Scope scope(IdType t)
00306       {
00307         ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, SCOPE_UNDEFINED);
00308         for(typename std::map<IdType, PubSub_Scope>::iterator iter = msg_scope_.begin();
00309             iter != msg_scope_.end(); iter++)
00310           if (IdTrait::id1contains2(iter->first, t)) {
00311             return iter->second;
00312           }
00313         else
00314           return SCOPE_UNDEFINED;
00315       }
00316     protected:
00317     //published msgs scopes
00318     std::map<IdType, PubSub_Scope> msg_scope_;
00319     typename SynchPolicy::RW_MUTEX scope_lock_;
00320   };
00321 
00331   template <class Channel>
00332     class Destination<Channel, NAMESPACE_HIERARCHICAL> : virtual public Member<Channel> {
00333     public:
00334     typedef typename Channel::IdType IdType;
00335     typedef typename Channel::IdTrait IdTrait;
00336     typedef typename Channel::Msg Msg;
00337     typedef typename Channel::SynchPolicy SynchPolicy;
00338 
00339     Destination (Channel *c) { Member<Channel>::ch_ = c; }
00340     virtual ~Destination() {}
00342     Status subscribe_msg(IdType t, PubSub_Scope s=SCOPE_GLOBAL)
00343       {
00344         ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00345         for(typename std::map<IdType, PubSub_Scope>::iterator iter = msg_scope_.begin();
00346             iter != msg_scope_.end(); iter++)
00347           if (IdTrait::match(iter->first, t)) {
00348             ACE_DEBUG((LM_DEBUG, "Msg Id [%s] conflict, cannot be subscribed\n", ID2STR(t).c_str()));
00349             return FAILURE;
00350           }
00351         msg_scope_[t] = s;
00352         return this->ch_->subscribe_msg(t, s, this);
00353       }
00355     Status unsubscribe_msg(IdType t)
00356       {
00357         ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00358         if(msg_scope_.find(t) != msg_scope_.end()) {
00359           ACE_DEBUG((LM_DEBUG, "Msg Id [%s] not subscribed, cannot be unsubscribed\n", ID2STR(t).c_str()));
00360           return FAILURE;
00361         }
00362         msg_scope_.erase(t);
00363         return this->ch_->unsubscribe_msg(t, this);
00364       }
00366     Status unsubscribe_all(void)
00367       {
00368         ACE_WRITE_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, FAILURE);
00369         for(typename std::map<IdType, PubSub_Scope>::iterator iter =  msg_scope_.begin();
00370             iter !=  msg_scope_.end(); iter++) {
00371           this->ch_->unsubscribe_msg(iter->first, this);
00372         }
00373         msg_scope_.clear();
00374         return SUCCESS;
00375       }
00377     virtual Status put_msg(Msg *msg, ACE_Time_Value *timeout=0) = 0;
00378     //utils
00379     bool operator==(Destination *s) { 
00380       return equals(s);
00381     }
00382     bool equals(Destination *s) { 
00383       if (this->type() != s->type()) return false;
00384       if (((Destination *)this) != s) return false;
00385       return true;
00386     }
00388     PubSub_Scope scope(IdType t)
00389       {
00390         ACE_READ_GUARD_RETURN(typename SynchPolicy::RW_MUTEX, guard, scope_lock_, SCOPE_UNDEFINED);
00391         for(typename std::map<IdType, PubSub_Scope>::iterator iter = msg_scope_.begin();
00392             iter != msg_scope_.end(); iter++)
00393           if (IdTrait::id1contains2(iter->first, t)) {
00394             return iter->second;
00395           }
00396         else
00397           return SCOPE_UNDEFINED;
00398       }
00399     protected:
00400     //subscribed msgs scopes
00401     std::map<IdType, PubSub_Scope> msg_scope_;
00402     typename SynchPolicy::RW_MUTEX scope_lock_;
00403   }; 
00404 
00412   template <class Channel>
00413     class  Callback : public Channel::Destination {
00414     public:
00415     typedef typename Channel::Msg Msg;
00416 
00417     Callback (Channel *c) : Channel::Destination(c) {}
00418     Status put_msg(Msg *msg, ACE_Time_Value *timeout) {
00419       ACE_UNUSED_ARG(timeout);
00421       return process (msg);
00422     }
00424     virtual Status process(Msg *msg) = 0;
00425   };
00426 
00427 
00428 };
00429 
00430 #endif
00431 

Generated on Mon Feb 27 19:59:21 2006 for channel by  doxygen 1.4.6-NO