Sample 12. port and signal: unnamed points of tightly coupled local interactions

Channel's name space and binding through name matching provide a loosely coupled model for asynchronous distributed message passing. However, setting up a name space and designing a name assignment scheme can be too much trouble for applications based on tightly coupled message passing inside a single address space. This tutorial walks through three samples showing how Channel's port and signal classes can help build these kinds of applications.

The first sample uses a pipeline of asynchronous tasks to generate all primes less than a thousand. For the original design and considerations, please refer to Russ Cox's essay "Bell Labs and CSP Threads". The original code is based on tasks (coroutines) and synchronous communication channels. In this sample, the code is changed to use ports and asynchronous tasks executed by a thread pool executor. Here is the complete source code: prime_sieve.cpp. Let's walk through the code.

First we define the type of the thread pool executor that runs the asynchronous tasks. We also instantiate the port class template to define the port type and its related choice arbiter type.
typedef port<exec_type> port_type;
typedef port_type::choice_async choice_type;
A port can be used as a message queue, allowing applications to put data into it and get data from it. Mostly, however, ports are used with choice/join arbiters.
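The message-queue usage mentioned above can be pictured with a minimal sketch in standard C++. This is not Channel's port class: simple_port and its send(), recv() and size() members are hypothetical names chosen for illustration, and the real port template may differ in both interface and behavior.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical stand-in for a port used as a plain message queue.
// It only illustrates the idea; it is NOT Channel's port API.
class simple_port {
public:
  // Put one message into the queue and wake a waiting receiver.
  void send(std::shared_ptr<void> msg) {
    assert(msg && "this sketch does not accept null messages");
    {
      std::lock_guard<std::mutex> lock(mtx_);
      queue_.push_back(std::move(msg));
    }
    ready_.notify_one();
  }

  // Take the oldest message, blocking while the queue is empty.
  std::shared_ptr<void> recv() {
    std::unique_lock<std::mutex> lock(mtx_);
    ready_.wait(lock, [this] { return !queue_.empty(); });
    std::shared_ptr<void> msg = std::move(queue_.front());
    queue_.pop_front();
    return msg;
  }

  // Number of messages currently queued.
  std::size_t size() const {
    std::lock_guard<std::mutex> lock(mtx_);
    return queue_.size();
  }

private:
  mutable std::mutex mtx_;
  std::condition_variable ready_;
  std::deque<std::shared_ptr<void> > queue_;
};
```

Messages come out in the order they were sent. An arbiter replaces the explicit recv() call by invoking a bound handler whenever a message arrives.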

Next we define a class for the asynchronous tasks which do the real job of finding primes. A task consists of 3 main parts: <1> a port to receive messages; <2> a run() method to define the task's functionality; <3> an executor to execute the run() method. Note that in the constructor the choice arbiter binds the port to the run() method, so that each incoming message creates an asynchronous task which executes run()'s code on a thread from the thread pool.
  void run(boost::shared_ptr<void> p)
  {
    int val = *(int *)p.get();
    if (my_prime_ == -1) { //first time
      my_prime_ = val;
      cout << "------ prime_task [" << my_no_ << "] found prime = " << val << endl;
      next_ = new prime_task(exe_); //create the next task
    }
    else {
      if (val % my_prime_) { //not my multiples
        next_->port_.send(p);
      } else { //my multiples
        cout << "prime_task [" << my_no_ << "] drop " << val << endl;
      }
    }
  }
The run() method takes a single argument: a pointer to the message data received from the port. If this is the first message the task receives, the received integer is a prime; it is saved, and the next asynchronous task is created. Otherwise, the integer is compared with the prime saved by this task. It is dropped if it is a multiple of the saved prime; if not, the message is sent to the port of the next task.
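The filtering logic described above can be tried out in isolation with a simplified, single-threaded sketch. It is an illustration under stated assumptions, not the code from prime_sieve.cpp: sieve_stage, receive() and primes_below() are hypothetical names, and messages are passed by direct function calls instead of through ports and a thread pool executor.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical stand-in for prime_task: one pipeline stage per prime.
// Messages are delivered by direct calls on the caller's thread,
// not through a port and an executor as in the tutorial's sample.
class sieve_stage {
public:
  explicit sieve_stage(std::vector<int>& found) : found_(found) {}

  // Plays the role of run(): handle one incoming integer.
  void receive(int val) {
    assert(val >= 2);
    if (my_prime_ == -1) {              // first message: it is a prime
      my_prime_ = val;
      found_.push_back(val);
      next_ = std::make_unique<sieve_stage>(found_);  // create the next stage
    } else if (val % my_prime_ != 0) {  // not my multiple: pass it on
      next_->receive(val);
    }                                   // my multiples are dropped
  }

private:
  int my_prime_ = -1;
  std::unique_ptr<sieve_stage> next_;
  std::vector<int>& found_;             // shared list of primes found so far
};

// Feed the integers 2..limit-1 into the first stage, as main() does.
std::vector<int> primes_below(int limit) {
  std::vector<int> found;
  sieve_stage first(found);
  for (int i = 2; i < limit; ++i) first.receive(i);
  return found;
}
```

Calling primes_below(1000) builds one stage per prime found. In the asynchronous version each stage runs on pool threads, so the order of the printed output can vary between runs.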

The main() function is simple. First we create the thread pool executor and the first task. Then we feed the integers (2-1000) to the port of the first task. Before exiting, we wait for all thread pool threads to exit gracefully.

The second sample demonstrates the signal class and its associated slot type. It is a modification of tutorial 1. Instead of using channel, named_out and named_in, this sample uses signal and slot to set up an event dispatching system without the hassle of choosing ids and assigning names. Here is the complete source: gui_evt3.cpp.
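The idea of dispatching events without names can be pictured with a minimal signal/slot sketch in standard C++. It does not reproduce Channel's signal and slot classes or the code in gui_evt3.cpp: simple_signal, connect(), disconnect() and send() are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical minimal signal: senders and receivers share the signal
// object itself, so no ids or names have to be chosen or matched.
template <typename Msg>
class simple_signal {
public:
  typedef std::function<void(const Msg&)> handler;
  typedef std::size_t slot_id;

  // Register a handler; the returned id identifies the connection.
  slot_id connect(handler h) {
    assert(h && "an empty handler cannot be connected");
    slots_.emplace(next_id_, std::move(h));
    return next_id_++;
  }

  // Remove a previously registered handler.
  void disconnect(slot_id id) { slots_.erase(id); }

  // Deliver one message to every connected handler, in connection order.
  void send(const Msg& m) const {
    for (typename std::map<slot_id, handler>::const_iterator it = slots_.begin();
         it != slots_.end(); ++it) {
      it->second(m);
    }
  }

private:
  std::map<slot_id, handler> slots_;
  slot_id next_id_ = 0;
};
```

A GUI-style event source would own one such signal per event type, for example a simple_signal<std::string> for mouse events, and windows would connect their handlers to it directly.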

The third sample demonstrates how an "unnamed" port can be used together with names / ids in the same choice arbiter. Here is the complete source code: chat_join_timer.cpp. This sample is based on sample 6, with the addition of a timer sending events to a port. Let's walk through the code.

First, the port type is defined based on the channel type.
    typedef chat_chan::port port_t;

Next, the my_timer class is defined. It uses asio's deadline_timer and sets the timeout period to 5 seconds. When a timeout happens, a null_event message is sent to a port.
class my_timer
{
public:
  my_timer(boost::asio::io_service& io, port_t &p)
    : timer_(io, boost::posix_time::seconds(5)),
      port_(p)
  {
    timer_.async_wait(boost::bind(&my_timer::timeout, this));
  }

  void timeout()
  {
    port_.send(new null_event); //send timeout notifications
    timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(5));
    timer_.async_wait(boost::bind(&my_timer::timeout, this));
  }

private:
  boost::asio::deadline_timer timer_;
  port_t &port_;
};
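Note that timeout() re-arms the timer with expires_at() + 5 seconds, that is, relative to the previous deadline and not to the current time. The sketch below uses plain std::chrono durations to show why this keeps the period from drifting. The two function names and the fixed handler_delay parameter are assumptions made for illustration; they are not part of asio or of the sample.

```cpp
#include <cassert>
#include <chrono>
#include <vector>

typedef std::chrono::milliseconds ms;

// Deadlines when each new deadline is the previous deadline plus the
// period (the expires_at() + period pattern used in timeout()).
// How late the handler runs does not affect the schedule.
std::vector<ms> deadlines_from_previous(ms period, int n) {
  assert(n >= 0);
  std::vector<ms> out;
  ms deadline = period;
  for (int i = 0; i < n; ++i) {
    out.push_back(deadline);
    deadline += period;
  }
  return out;
}

// Deadlines when the timer is re-armed relative to the time the handler
// actually runs, assumed here to be handler_delay after each deadline.
// The lateness accumulates from one period to the next.
std::vector<ms> deadlines_from_now(ms period, ms handler_delay, int n) {
  assert(n >= 0);
  std::vector<ms> out;
  ms deadline = period;
  for (int i = 0; i < n; ++i) {
    out.push_back(deadline);
    ms now = deadline + handler_delay;  // moment the handler runs
    deadline = now + period;
  }
  return out;
}
```

With a 5 second period and a handler that runs 20 ms late, the first scheme yields deadlines at 5 s, 10 s and 15 s, while the second yields 5 s, 10.02 s and 15.04 s.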

In main(), most of the code is similar to sample 6, except for the following code for the timer and port.
First, the timer and port are created and a new thread is spawned to run asio.
  boost::asio::io_service io;
  port_t timeout_port;
  my_timer timer(io, timeout_port); //named object, so the timer outlives this statement
  //spawn a thread to run timer
  boost::thread t(boost::bind(&boost::asio::io_service::run, &io)); 

Next, add timeout_port to the choice arbiter and bind it to a timeout handler which simply prints a message.
  choice.bind(timeout_port, timeout_hdlr);
Notice that the same choice arbiter handles both "names/ids" and the timeout_port object.

During testing, we should see the timeout message every 5 seconds.