Support for Message Passing Concurrency
Join's async methods and chords provide strong support for
asynchronous message based concurrency designs.
Compare to Erlang
Erlang's support for concurrency is based on a few primitives:
- spawn - create very light weight process
- Pid ! Message - send message to a process
- receive ... end
A typical Erlang server contains a message processing main loop such as
the following:
loop()
->
receive
{rectangle, Width, Ht} ->
io:format("Area of rectangle is ~p~n", [Width * Ht]),
loop();
{circle, R} ->
io:format("Area of circle is ~p~n", [3.14159 * R * R]),
loop();
Other
->
io:format("I dont know what the area of a ~p is ~n",
[Other])
loop()
end.
The same (synchronous) server code can be expressed in Join as
following:
class
Server : public joint {
private:
async<void>
activate;
synch<void>
process;
public:
//the server will
serve 2 messages:
async<tuple<float, float> > rectangle;
async<float> circle;
Server(spawn_type
e): joint(e) {
chord(activate, &Server::main_loop);
chord(process, rectangle, &Server::calc_rect_area);
chord(process, circle, &Server::calc_circle_area);
//start server thread
activate();
}
private:
//server main loop
void
main_loop(void_t act) {
//all the next 2 message processing
methods will run in this thread
for(;;)
process();
}
void
calc_rect_area(void_t p, tuple<float, float>
rect) {
cout
<< "Area of rectangle is " << tuples::get<0>(rect) *
tuples::get<1>(rect)
<< endl;
}
void
calc_circle_area(void_t p, float
circ) {
cout
<< "Area of circle is " << 3.14159 * circ * circ
<< endl;
}
};
We can do a comparison of the above 2 code snippets:
- Both above codes implement synchronous servers; server will own
its own thread/process and block waiting for incoming requests. Erlang'
server is running in Erlang's light weight process while Join's server
is running in a thread from executor's thread pool. Erlang's process is
much cheaper than normal OS thread, so we can have thousands of Erlang
servers running without bringing the machine down. To achieve similar
benefit with Join and OS thread, we have to implement an asynchronous
server as following code.
- Erlang check messages types by pattern matching during runtime,
if Erlang server
receives a unexpected message, "Other -> ..." section is invoked. In
the above Join based server, the message types are defined at interface
and
checked by compiler. Because sending a message to Join based server is
to invoke its async methods, so compiler can prevent invoking
unsupported methods.
A purely asynchronous Join based server can be expressed as following:
class
Server : public joint {
public:
//the server will
serve 2 messages:
async<tuple<float, float> > rectangle;
async<float> circle;
Server(spawn_type
e): joint(e) {
chord(rectangle, &Server::calc_rect_area);
chord(circle, &Server::calc_circle_area);
}
private:
void calc_rect_area(tuple<float, float> rect) {
cout
<< "Area of rectangle is " << tuples::get<0>(rect) *
tuples::get<1>(rect)
<< endl;
}
void
calc_circle_area(float circ) {
cout
<< "Area of circle is " << 3.14159 * circ * circ
<< endl;
}
};
All these asynchronous servers do not have their own threads, instead
they share the threads from executor's thread pool, ie. their message
processing methods will be dispatched to thread pool to be executed. So
if the executor's thread pool contain enough threads, we should be able
to use thousands of these async active objects.
Of course we can develop traditional style message passing system in
Join as following.
Normally we'll have a generic
message container:
class message {
public:
int type;
char *data;
}
class server :
public joint {
synch<shared_ptr<message>, void> receive;
public:
async<shared_ptr<message> > send;
server() {
chord(receive,
send, &server::forward);
new
thread(bind(&server::main_loop, this);
}
private:
void main_loop() {
for(;;)
{
shared_ptr<message>
msg = receive();
switch(msg->type) {
case type1:
... handle msg type 1; break;
case type2:
... handle msg type 2; break;
.......
default:
cout <<
"unknown msgs";
break;
}
}
}
shared_ptr<message> forward(void_t r, shared_ptr<message>
s) {
return s.arg1;
}
};
Here
the message type resolution happens during runtime. Since C/C++ do not
have pattern matching so we have to switch based on msg->type. The
"default" case is similar to Erlang's "Other..." case and the 1st
messages
of message queue is always returned no matter what type it is, which is
similar to
Erlang's AnyMessage.
Reactive Finite State Machines
One common asynchronous message based design is based on reactive
finite state machine (FSM). Each entitiy in this design is an active
object, communicating with each other thru asynchronous messages, and
the behaviour of these entities are controlled by their state machines,
normally encoded as a state transition table. One common encoding is a
two dimensional table, some cells of it are valid and containing the
following triplet:
<state,
message(event), action / next state>
- state: the current state of entity
- message (event): the message arrived or event happened when
entity is in the above state
- next state: the new state the entity will transition to when the
event happened
- action: the processing the entity will carry out during transition
With Join's async methods and chords, we can directly define the above
state transition table in a declarative manner. The following rules are
applied:
- using async methods to represent states
- using async methods to represent messages / events
- using chords to define actions and valid transitions
For example, the state machine of a light can be encoded as the
following Join based class:
class light : public
joint {
public:
//using async methods to represent states
async<void> on;
async<void> off;
//using async methods to represent messages / events
async<void> switch;
light(spawn_type e) : joint(e) {
//in constructor, using chords to
define valid actions and transitions
chord(on, switch, &light::dark);
chord(off, switch, &light::bright);
//initialize the state of light to
"off" by
calling async method
off();
}
void dark(void_t on, void_t switch) {
//do some transition actions here
//then transition to the next state
"off" by
calling async method
off();
}
void bright(void_t off, void_t switch) {
//do some transition actions here
//then transition to the next state
"on" by
calling async method
on();
}
};
Join really shines when more complicated state machines are handled.
For example, the state transition is triggered by combination of events:
<state,
event1 & event2 & event3, action / next state>
Apart from the advantage of encoding the state machine declaratively,
the above state machine is multi-thread safe and the transition actions
executes asynchronously (and could concurrently if the logic allows) in
executor's thread
pool. If the executor is an adaptor to other framework's execution
service (such as Boost.Asio's event completion queue), the transition
actions will be executed there. Many entities and their FSMs can share
the same thread pool.
By defining a synchronous process() method,
and adding it to the above chords, we can easily define an "active"
state machine with its own thread (or so called reactive iterative
server), similar to the above synchronous Join server.