Implementation of Sample Pull Dispatcher

This section describes the implementation of a simple pull dispatcher.  Its full source is here...

Each dispatcher has two parts: a sending algorithm and a receiving algorithm, defined as two separate nested template classes. With this dispatcher, the sending thread buffers messages in a queue on the sending side, notifies the receivers, and returns right away. Receiving threads block on the receiving side and are unblocked when messages become available.

Let's look at the sending class first. Because this is a pull dispatcher, messages are buffered inside the channel on the sending side, so a queue is part of the sending class. The queue type is a template parameter of the sender class to allow customization. The sender class inherits from the queue class so that the queue's methods are exposed, which lets client code configure queue settings such as size or blocking timeout.

    template <typename name_space, typename platform, template <class,class,class> class msg_queue_type>
    struct pull_sender_sample :
        public msg_queue_type<boost::shared_ptr<message<typename name_space::id_type> >,
                typename name_space::synch_policy,
                platform> {
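
The inheritance idea can be tried in isolation. The following is a minimal sketch, not the library's code: bounded_que_sketch, null_synch, any_platform and sender_sketch are hypothetical names, std::shared_ptr stands in for boost::shared_ptr, and the queue's set_max_size() setting is an assumption made up for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>

// Hypothetical stand-ins for the synchronization policy and platform tags.
struct null_synch {};
struct any_platform {};

// Hypothetical queue with the three-parameter shape the sender expects.
template <class elem_type, class synch_policy, class platform>
class bounded_que_sketch {
public:
  bounded_que_sketch() : max_size_(16) {}
  // A setting that client code may want to adjust.
  void set_max_size(std::size_t n) { max_size_ = n; }
  std::size_t max_size() const { return max_size_; }
  bool empty() const { return items_.empty(); }
  // Returns false when the queue is already full.
  bool put(const elem_type& e) {
    if (items_.size() >= max_size_) return false;
    items_.push_back(e);
    return true;
  }
  // Removes the oldest element; the queue must not be empty.
  void get(elem_type& e) {
    assert(!items_.empty());
    e = items_.front();
    items_.pop_front();
  }
private:
  std::size_t max_size_;
  std::deque<elem_type> items_;
};

// The sender derives publicly from the queue, so the queue's methods
// become part of the sender's own interface.
template <class synch_policy, class platform,
          template <class, class, class> class msg_queue_type>
struct sender_sketch
    : public msg_queue_type<std::shared_ptr<int>, synch_policy, platform> {};

typedef sender_sketch<null_synch, any_platform, bounded_que_sketch> sender_type;
```

With this layout, client code can call set_max_size() directly on a sender_type object, and swapping in another queue template changes which settings are available without touching the sender.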

Next, we define a few data types associated with the sending class:
    typedef typename name_space::id_type id_type;
    typedef typename name_space::synch_policy synch_policy;
    typedef typename name_space::executor executor;
    typedef name<id_type,executor,synch_policy> name_type;
    typedef message<id_type> msg_type;
    typedef typename name_type::binding_set_type binding_set_type;
    typedef msg_queue_type<boost::shared_ptr<msg_type>, synch_policy,
                   platform> que_type;
    typedef pull_recver_sample<name_space, platform,msg_queue_type> recver_type;
    typedef named_in<name_space, recver_type> named_in_type;

Then the name pointer and the constructor are defined.
    name_type * name_;
    pull_sender_sample(name_type *n, executor *) : que_type(), name_(n) { }
    ~pull_sender_sample() {}

Next, the most important method of the sending class is defined. Since this is a pull dispatcher, the message is not pushed directly to receivers. Instead, the message is first stored in the queue; then the bound receivers are notified that a message is available, and a waiting receiving thread is unblocked. Notice that the binding set (the set of bound receivers) is retrieved from the associated name object.
    void notify(boost::shared_ptr<msg_type> msg) {
      //first store msg in que ("this->" is required: put() lives in a dependent base class)
      this->put(msg);
      //notify receivers
      typename synch_policy::scoped_lock lock(this->name_->bind_lock_);
      binding_set_type &bindings = this->name_->bindings_;
      if (!bindings.empty()) {
        bool cont = true;
        for(typename binding_set_type::iterator iter = bindings.begin();
            iter != bindings.end() && cont; iter++) {
          named_in_type *named_in = (named_in_type *)(*iter);
          recver_type *recver = (recver_type *)named_in;
          //for pull dispatchers, only send msgs to local receivers
          if (named_in->type_ == named_in_type::member_local)
            cont = recver->notify(msg->id_);
        }
      }
    }
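
The store-then-notify flow can be followed in a single-threaded simulation. This is only a sketch under assumptions: fake_recver and fake_sender are hypothetical stand-ins, a wake-up is modelled by a counter instead of a condition variable, std::shared_ptr replaces boost::shared_ptr, and the bind lock is left out.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <vector>

// Hypothetical receiver stand-in: counts wake-ups instead of signalling.
struct fake_recver {
  bool is_local;
  int num_waiting;
  int wakeups;
  fake_recver(bool local, int waiting)
      : is_local(local), num_waiting(waiting), wakeups(0) {}
  // Same contract as the receiver's notify():
  // false = a waiter was woken, stop; true = nobody waiting, keep going.
  bool notify() {
    if (num_waiting > 0) {
      ++wakeups;
      return false;
    }
    return true;
  }
};

// Hypothetical sender stand-in that owns the queue and the binding list.
struct fake_sender {
  std::deque<std::shared_ptr<int> > que;
  std::vector<fake_recver*> bindings;

  void notify(std::shared_ptr<int> msg) {
    assert(msg);
    que.push_back(msg);  // step 1: buffer the message on the sending side
    bool cont = true;    // step 2: walk the bindings until one waiter is woken
    for (std::vector<fake_recver*>::iterator it = bindings.begin();
         it != bindings.end() && cont; ++it) {
      if ((*it)->is_local)  // remote receivers are skipped
        cont = (*it)->notify();
    }
  }
};
```

In this model one message wakes at most one receiver: the loop ends at the first local receiver that reports a waiting thread, and later receivers are left undisturbed.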

After being notified, the first woken receiver pulls (retrieves) the message from the sender's queue.
    int pull(boost::shared_ptr<msg_type> & msg) {
      if (!this->empty()) {
        this->get(msg);
        return 1;
      }
      return 0;
    }

The following methods are the various "send()" overloads that form the API of this class. The basic rule is that message data is passed in as a pointer; the channel then owns the data and releases it once nothing refers to it anymore. To change this default behaviour (for example, to send data that lives on the stack), the caller of "send()" can pass a suitable deleter together with the data pointer, or pass a shared_ptr initialized with a suitable deleter.
    //after sending, channel becomes owner
    template <typename user_msg_type>
    void send(user_msg_type *msg) {
      boost::shared_ptr<void> m0(msg);
      boost::shared_ptr<msg_type> m(new msg_type(name_->id_, m0));
      notify (m);
    }

    //after sending: 1> channel becomes owner, if deleter does real deletion
    // 2> sender still owns msg, if deleter does nothing
    template <typename user_msg_type, typename deleter>
    void send(user_msg_type *msg, deleter deler) {
      boost::shared_ptr<void> m0(msg, deler);
      boost::shared_ptr<msg_type> m(new msg_type(name_->id_, m0));
      notify (m);
    }

    //user_msg is already a smart pointer, channel becomes a co-owner
    template <typename user_msg_type>
    void send(boost::shared_ptr<user_msg_type> msg) {
      boost::shared_ptr<void> m0(msg);
      boost::shared_ptr<msg_type> m(new msg_type(name_->id_, m0));
      notify (m);
    }

    //for channel internal use on wildcard named_out
    template <typename user_msg_type>
    void send(id_type id, boost::shared_ptr<user_msg_type> msg) {
      boost::shared_ptr<void> m0(msg);
      boost::shared_ptr<msg_type> m(new msg_type(id, m0));
      notify (m);
    }
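
The effect of the deleter on ownership can be observed with a small experiment. This is a sketch under assumptions: std::shared_ptr is used in place of boost::shared_ptr, and payload, null_deleter, wrap_owned and wrap_with are hypothetical names that only mirror the first step of the send() overloads above.

```cpp
#include <cassert>
#include <memory>

// Counts destructor calls so that ownership transfer is observable.
struct payload {
  static int destroyed;
  int value;
  explicit payload(int v) : value(v) {}
  ~payload() { ++destroyed; }
};
int payload::destroyed = 0;

// Deleter that does nothing: the caller keeps ownership of the object.
struct null_deleter {
  void operator()(void const*) const {}
};

// Mirrors send(user_msg_type*): the type-erased pointer will delete the payload.
template <typename user_msg_type>
std::shared_ptr<void> wrap_owned(user_msg_type* msg) {
  assert(msg != 0);
  return std::shared_ptr<void>(msg);
}

// Mirrors send(user_msg_type*, deleter): the deleter decides what release means.
template <typename user_msg_type, typename deleter>
std::shared_ptr<void> wrap_with(user_msg_type* msg, deleter d) {
  assert(msg != 0);
  return std::shared_ptr<void>(msg, d);
}
```

Although the result is a shared_ptr<void>, the pointer remembers the original type, so wrap_owned(new payload(1)) still runs ~payload() when the last reference goes away. With wrap_with(&obj, null_deleter()) the object is left alone, which is what makes stack-allocated messages safe as long as they outlive every receiver that holds the message.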

The following is about the implementation of receiving class.

It has the same template parameters as sending class.
    template <typename name_space, typename platform, template <class,class,class> class msg_que_type>
    struct pull_recver_sample {

Similarly, the associated types are defined.
    typedef typename name_space::id_type id_type;
    typedef typename name_space::synch_policy synch_policy;
    typedef typename name_space::executor executor;
    typedef name<id_type,executor,synch_policy> name_type;
    typedef message<id_type> msg_type;
    typedef pull_sender_sample<name_space, platform, msg_que_type> sender_type;
    typedef named_out<name_space, sender_type> named_out_type;
    typedef typename name_type::binding_set_type binding_set_type;

Next come the synchronization variables and the constructor.
    name_type * name_;
    typename synch_policy::mutex mutex_;
    typename synch_policy::condition cond_;
    int num_waiting_;

    pull_recver_sample(name_type *n) :
      name_(n), num_waiting_(0) {}
    ~pull_recver_sample() {}

Here comes the "main" function of receiving threads. A receiving thread blocks here and waits for notifications from senders. Once a message is available, it is pulled from a sender's queue and returned to the caller for processing.
    void recv(id_type & id, boost::shared_ptr<void> & msg) {
      int st;
      boost::shared_ptr<msg_type> mesg;
      typename synch_policy::scoped_lock lock(mutex_);
      while(1) {
        st = pull(mesg);
        if (st != 0) {
          id = mesg->id_;
          msg = mesg->data_;
          return;
        }
        num_waiting_++;
        cond_.wait(lock);
        num_waiting_--;
      }         
    }
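
The wait loop follows the usual condition-variable pattern: check for data, and if there is none, wait and check again after every wake-up. Below is a minimal sketch of that pattern using the C++11 standard library rather than the channel's synch_policy. The class mailbox_sketch and the function run_once are hypothetical, and the queue is kept inside the receiver here only to keep the example short (in the sample dispatcher it lives in the sender).

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical single-queue mailbox combining both sides for brevity.
class mailbox_sketch {
public:
  mailbox_sketch() : num_waiting_(0) {}

  // Blocks until a message can be taken, re-checking after every wake-up.
  int recv() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (true) {
      if (!que_.empty()) {
        int m = que_.front();
        que_.pop_front();
        return m;
      }
      ++num_waiting_;
      cond_.wait(lock);  // releases mutex_ while blocked, reacquires on wake-up
      --num_waiting_;
    }
  }

  // Stores the message, then wakes one waiter if there is any.
  void send(int m) {
    std::lock_guard<std::mutex> lock(mutex_);
    que_.push_back(m);
    if (num_waiting_ > 0) cond_.notify_one();
  }

private:
  std::mutex mutex_;
  std::condition_variable cond_;
  std::deque<int> que_;
  int num_waiting_;
};

// Runs one receiver thread and one send; returns what the receiver got.
inline int run_once(int value) {
  mailbox_sketch box;
  int got = -1;
  std::thread t([&] { got = box.recv(); });
  box.send(value);
  t.join();
  assert(got == value);
  return got;
}
```

Re-checking the queue inside the loop matters for two reasons: condition variables may wake spuriously, and another receiver may already have taken the message by the time this thread runs again.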

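A sending thread calls this method to wake up a blocked receiving thread. The return value tells the sender's notification loop whether to continue: false means a waiting thread was woken and the loop can stop, true means this receiver had no waiting thread and the next bound receiver should be tried.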
    bool notify(id_type id) {
      typename synch_policy::scoped_lock lock(mutex_);
      if (num_waiting_ > 0) {
        cond_.notify_one();
        return false;
      }
      else
        return true;
    }

Since a receiver can be bound to multiple senders, the following pull method tries each bound sender in turn and stops as soon as one of them returns a message.
    int pull(boost::shared_ptr<msg_type> & msg) {
      int st = 0;
      //go-thru binding_set to pull from senders
      typename synch_policy::scoped_lock lock(name_->bind_lock_);
      binding_set_type &bindings = name_->bindings_;
      if (!bindings.empty()) {
        for(typename binding_set_type::iterator iter = bindings.begin();
            iter != bindings.end() && st == 0; iter++) {
          named_out_type *named_out = (named_out_type *)(*iter);
          sender_type *sender = (sender_type *)(named_out);
          st = sender->pull(msg);
        }
      }
      return st;
    }
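
The "first sender with data wins" behaviour can be checked with a simplified model. This is a sketch only: queue_owner and pull_any are hypothetical names, messages are plain ints, and the locking of the binding set is omitted.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Hypothetical sender stand-in that only owns a queue.
struct queue_owner {
  std::deque<int> que;
  // Same contract as the sender's pull(): 1 if a message was taken, 0 if empty.
  int pull(int& msg) {
    if (que.empty()) return 0;
    msg = que.front();
    que.pop_front();
    return 1;
  }
};

// Visits the bound senders in order and stops at the first one that yields
// a message. Returns 0 and leaves msg untouched when all queues are empty.
inline int pull_any(std::vector<queue_owner*>& bindings, int& msg) {
  int st = 0;
  for (std::vector<queue_owner*>::iterator it = bindings.begin();
       it != bindings.end() && st == 0; ++it) {
    assert(*it != 0);
    st = (*it)->pull(msg);
  }
  return st;
}
```

Because the scan always starts at the beginning of the binding set, senders that appear earlier are drained first in this model; a different policy (such as round-robin) would need to remember where the previous scan stopped.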

Finally, a dispatcher template class wraps the sending class and the receiving class together.
    template <typename name_space, typename platform, template <class,class,class> class queue_type = unbounded_que>
    struct pull_dispatcher_sample {
      typedef detail::pull_sender_sample<name_space,platform,queue_type> sender;
      typedef detail::pull_recver_sample<name_space,platform,queue_type> recver;
    };
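
The wrapper is a pure type-level bundle: it carries no data and only forwards its template arguments to both halves. The sketch below illustrates how the queue choice propagates. All names ending in _stub, as well as my_name_space and my_platform, are hypothetical placeholders and not part of the library.

```cpp
#include <cassert>
#include <type_traits>

// Hypothetical queue templates with the three-parameter shape used above.
template <class, class, class> struct unbounded_que_stub {};
template <class, class, class> struct bounded_que_stub {};

// Hypothetical empty sender/receiver templates with the same parameter list.
template <class name_space, class platform,
          template <class, class, class> class queue_type>
struct sender_stub {};

template <class name_space, class platform,
          template <class, class, class> class queue_type>
struct recver_stub {};

// Bundles both halves; the queue defaults to the unbounded one.
template <class name_space, class platform,
          template <class, class, class> class queue_type = unbounded_que_stub>
struct dispatcher_stub {
  typedef sender_stub<name_space, platform, queue_type> sender;
  typedef recver_stub<name_space, platform, queue_type> recver;
};

struct my_name_space {};
struct my_platform {};

typedef dispatcher_stub<my_name_space, my_platform> default_dispatcher;
typedef dispatcher_stub<my_name_space, my_platform, bounded_que_stub>
    bounded_dispatcher;

// Returns true when the queue choice reaches both nested types as expected.
inline bool queue_choice_propagates() {
  bool sender_ok = std::is_same<
      default_dispatcher::sender,
      sender_stub<my_name_space, my_platform, unbounded_que_stub> >::value;
  bool recver_ok = std::is_same<
      bounded_dispatcher::recver,
      recver_stub<my_name_space, my_platform, bounded_que_stub> >::value;
  bool differ = !std::is_same<default_dispatcher::sender,
                              bounded_dispatcher::sender>::value;
  assert(sender_ok && recver_ok && differ);
  return sender_ok && recver_ok && differ;
}
```

Bundling the two classes this way guarantees that sender and receiver are always instantiated with the same name space, platform and queue type, so they cannot be mismatched by accident.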

Here is a simple program that demonstrates the usage of this dispatcher.