// Associated types of the sending class. The first three are taken
// directly from the name_space template parameter.
// Identifier type; parameterizes both name_type and msg_type below.
typedef typename
name_space::id_type id_type;
// Supplies the scoped_lock type used when walking the binding set.
typedef typename name_space::synch_policy synch_policy;
// Executor type; a pointer to it is accepted by the constructor.
typedef typename name_space::executor executor;
// The name object type this sender is attached to.
typedef name<id_type,executor,synch_policy> name_type;
// Message type handed to notify() and stored in the queue.
typedef message<id_type> msg_type;
// Container type of name_type's bindings.
typedef typename name_type::binding_set_type binding_set_type;
// Queue of shared message pointers, instantiated from the msg_queue_type
// template with this sender's synch_policy and platform.
typedef msg_queue_type<boost::shared_ptr<msg_type>,
synch_policy,
platform> que_type;
// The matching receiving class, instantiated with the same parameters.
typedef pull_recver_sample<name_space, platform,msg_queue_type>
recver_type;
// The named_in type built on recver_type; binding-set entries are cast
// to this type in notify().
typedef named_in<name_space, recver_type> named_in_type;
Next, the pointer to the associated name object, the constructor, and the destructor are defined.
// Name object this sender is attached to. The destructor below does not
// delete it.
name_type * name_;

// Builds a sender with a default-constructed queue, attached to name n.
// The executor pointer is accepted but not used by this constructor.
pull_sender_sample(name_type *n, executor *)
    : que_type(),
      name_(n)
{
}

// Nothing to release explicitly.
~pull_sender_sample()
{
}
Next, the most important method in the sending class is defined. Since this
is a pull dispatcher, the message is not pushed directly to receivers.
Instead, the message is first stored in the queue; then the bound receivers
are notified that a message is available, and receiving threads are
unblocked. Notice that the binding set (the set of bound receivers) is
retrieved from the associated name object.
// Stores msg in this sender's queue, then notifies bound receivers that a
// message is available.
//
// The binding set is read from the associated name object while holding
// its bind_lock_. Only bindings whose type_ is member_local are notified,
// and the walk stops as soon as a receiver's notify() returns false.
//
// msg: the message to enqueue; its id_ is passed to each notified receiver.
void notify(boost::shared_ptr<msg_type> msg) {
    // First store msg in the queue. The call is qualified with this->, as
    // this->empty() is in pull(), so that it is looked up as a member when
    // the template is instantiated.
    // NOTE(review): assumes put() is provided by the que_type base - confirm.
    this->put(msg);

    // Then notify receivers, holding the binding lock for the whole walk.
    typename synch_policy::scoped_lock lock(this->name_->bind_lock_);
    binding_set_type &bindings = this->name_->bindings_;
    bool cont = true;
    for (typename binding_set_type::iterator iter = bindings.begin();
         iter != bindings.end() && cont; ++iter) {
        named_in_type *named_in = (named_in_type *)(*iter);
        recver_type *recver = (recver_type *)named_in;
        // For pull dispatchers, only send msgs to local receivers.
        if (named_in->type_ == named_in_type::member_local)
            cont = recver->notify(msg->id_);
    }
}
After receiving the notification, the first notified receiver will
pull (retrieve) the message from the sender's queue.
// Tries to take one message out of this sender's queue.
//
// msg: receives the dequeued message when one is available; left
//      untouched otherwise.
// Returns 1 if a message was retrieved, 0 if the queue was empty.
int
pull(boost::shared_ptr<msg_type> & msg) {
    if (this->empty())
        return 0;
    // Qualified with this->, like empty() above, so that it is looked up
    // as a member when the template is instantiated.
    // NOTE(review): assumes get() is provided by the que_type base, and
    // that the separate empty()/get() calls are safe against concurrent
    // pullers - confirm against the queue implementation.
    this->get(msg);
    return 1;
}
The following methods are the various "send()" overloads, which form the API
of this class. One basic rule is that message data are passed in as pointers;
the data will then be owned by the channel and released by the channel once
nobody refers to it anymore. To change this default behaviour (for example,
to pass data on the stack as a message), the invoker of "send()" can pass in
a proper deleter together with the data pointer, or pass in a shared_ptr
initialized with a proper deleter.
//after sending, channel becomes owner
template <typename user_msg_type>
void send(user_msg_type *msg) {
boost::shared_ptr<void> m0(msg);
boost::shared_ptr<msg_type> m(new msg_type(name_->id_,
m0));
notify (m);
}
//after sending: 1> channel becomes owner, if deleter does real
deletion
// 2> sender still owns msg, if deleter does nothing
template <typename user_msg_type, typename deleter>
void send(user_msg_type *msg, deleter deler) {
boost::shared_ptr<void> m0(msg, deler);
boost::shared_ptr<msg_type> m(new msg_type(name_->id_,
m0));
notify (m);
}
// Sends a message that is already held by a smart pointer; the channel
// shares ownership through a boost::shared_ptr<void> copy of it.
// The message is tagged with the id of this sender's name object.
template <typename user_msg_type>
void send(boost::shared_ptr<user_msg_type> msg) {
    boost::shared_ptr<void> payload(msg);
    boost::shared_ptr<msg_type> envelope(new msg_type(name_->id_, payload));
    notify(envelope);
}
// For channel-internal use on wildcard named_out: same as the smart
// pointer overload, except that the message is tagged with the explicitly
// supplied id instead of the id of this sender's name object.
template <typename user_msg_type>
void send(id_type id, boost::shared_ptr<user_msg_type> msg) {
    boost::shared_ptr<void> payload(msg);
    boost::shared_ptr<msg_type> envelope(new msg_type(id, payload));
    notify(envelope);
}
The following is the implementation of the receiving class.
It has the same template parameters as the sending class.
template <typename name_space, typename platform, template
<class,class,class> class msg_que_type>
struct pull_recver_sample {
Similarly,
the associated types are defined.
// Associated types of the receiving class. The first three are taken
// directly from the name_space template parameter.
// Identifier type; parameterizes both name_type and msg_type below.
typedef typename name_space::id_type id_type;
// Supplies the mutex, condition and scoped_lock types used by this class.
typedef typename name_space::synch_policy synch_policy;
typedef typename name_space::executor executor;
// The name object type this receiver is attached to.
typedef name<id_type,executor,synch_policy> name_type;
// Message type pulled from senders.
typedef message<id_type> msg_type;
// The matching sending class, instantiated with the same parameters.
typedef pull_sender_sample<name_space, platform, msg_que_type>
sender_type;
// The named_out type built on sender_type; binding-set entries are cast
// to this type in pull().
typedef named_out<name_space, sender_type> named_out_type;
// Container type of name_type's bindings.
typedef typename name_type::binding_set_type binding_set_type;
Next come some synchronization variables and the constructor.
// Name object this receiver is attached to. The destructor below does not
// delete it.
name_type * name_;
// Guards num_waiting_ and is the lock passed to cond_.wait().
typename synch_policy::mutex mutex_;
// Signalled by notify() to wake a thread blocked in recv().
typename synch_policy::condition cond_;
// Number of threads currently blocked in recv().
int num_waiting_;

// Builds a receiver attached to name n, with no waiting threads.
pull_recver_sample(name_type *n)
    : name_(n),
      num_waiting_(0)
{
}

// Nothing to release explicitly.
~pull_recver_sample()
{
}
Here comes the "main" function of receiving threads. A receiving thread
will block here and wait for notifications from senders. Messages will
then be pulled (retrieved) from the senders' queues and processed.
// Blocks the calling thread until a message can be pulled from one of the
// bound senders, then hands it back through the out-parameters.
//
// id:  receives the id_ of the pulled message.
// msg: receives the data_ of the pulled message.
//
// mutex_ is held for the whole call, except while blocked in cond_.wait().
void recv(id_type & id, boost::shared_ptr<void> & msg) {
    boost::shared_ptr<msg_type> pulled;
    typename synch_policy::scoped_lock guard(mutex_);
    for (;;) {
        if (pull(pulled) != 0) {
            id = pulled->id_;
            msg = pulled->data_;
            return;
        }
        // Nothing available: register as a waiter so that notify() will
        // signal the condition, then sleep until woken and try again.
        ++num_waiting_;
        cond_.wait(guard);
        --num_waiting_;
    }
}
A sending thread will call this method to wake up a blocked receiving
thread.
// Called by a sender to signal that a message is available.
//
// id: id of the available message; not used by this implementation.
// Returns false if a waiting thread was signalled, or true if no thread
// was waiting.
bool notify(id_type id) {
    typename synch_policy::scoped_lock guard(mutex_);
    const bool nobody_waiting = !(num_waiting_ > 0);
    if (!nobody_waiting)
        cond_.notify_one();
    return nobody_waiting;
}
Since a receiver can be bound to multiple senders, the following pull
method tries to retrieve data from any of the bound senders.
// Walks the binding set of the associated name object and asks each bound
// sender in turn for a message, stopping at the first one that delivers.
// The name's bind_lock_ is held for the whole walk.
//
// msg: receives the pulled message, if any.
// Returns the status of the last sender asked (non-zero once a sender
// delivered a message), or 0 if there are no bindings.
int pull(boost::shared_ptr<msg_type> & msg) {
    typename synch_policy::scoped_lock guard(name_->bind_lock_);
    binding_set_type &bound = name_->bindings_;
    typename binding_set_type::iterator pos = bound.begin();
    int got = 0;
    while (got == 0 && pos != bound.end()) {
        named_out_type *out = (named_out_type *)(*pos);
        sender_type *src = (sender_type *)(out);
        got = src->pull(msg);
        ++pos;
    }
    return got;
}
Finally, a dispatcher template class wraps the sending class and the
receiving class together.
// Dispatcher that pairs the pull sender with the pull receiver, both
// instantiated with the same name_space, platform and queue template.
// The queue template defaults to unbounded_que.
template <
    typename name_space,
    typename platform,
    template <class, class, class> class queue_type = unbounded_que
>
struct pull_dispatcher_sample {
    // Sending side of the dispatcher.
    typedef detail::pull_sender_sample<name_space, platform, queue_type> sender;
    // Receiving side of the dispatcher.
    typedef detail::pull_recver_sample<name_space, platform, queue_type> recver;
};
Here is a
simple program that demonstrates the usage of this dispatcher.