Chords and Joints
The Role of Chords, and Joints
In sequential languages, a common construct of control flow is "switch
/ case" statements:
switch(val) {
case val1: statements1; break;
case val2: statements2; break;
...
default: ...
}
In CSP based concurrent languages, such as Occam, Limbo, etc., there is
a "ATL" construct to branch control flow based on the communication
readiness of channels:
alt {
i = <- inchan =>
statements1;
outchan <- = "sent it!" =>
statements2;
}
Different blocks of statements will run based on the communication
readiness of channels (inchan, outchan). However in "alt", if the
qualifier before "=>" contains several channel communication
operations (<-, ->), only the first one will be tested. The
relationship between different "alting" segments are OR relationship.
In JoCaml, an "count-down" idiom is defined as :
def count(n) &tick() = count(n-1)
or count(0) & wait() = reply to wait
There are 2 join-patterns (chords) here: count(n) & tick(),
count(0) & wait(), each of which contains 2 ports. Similar to CSP,
join-patterns act as qualifier "guarding" the actions statements
following it. If all two ports in a join-pattern are called, the action
statements of this chord is ready to run. The relationship inside a
join-pattern or chord is AND, while the relationship between different
chords are OR.
Similarly in Join, joints act as a "switch / case" construct for
message passing, run different codes depending on the communication
readiness of ports. So it is a construct for message coordination and
orchestration, such as following code, different operations are done
based on readiness of ports:
async<T1> port1;
async<T2> port2;
async<T3> port3;
joins(executor)
.chord(port1, port2, operation1)
.chord(port2, port3, operation2);
Chord Definition
Chords are created when joint's chord() methods are invoked. A chord
binds a set of ports to the processing logic which consume and
process messages from those ports. The processing logic is a
function object which can be in the form of normal function, lambda,
class method, or object method. Its signature can be deduced as
following:
for a chord of a set of ports with
types:
PortT1, PortT2, PortT3, ...
the chord binds to a function object "call" with signature:
PortT1::result_type
call(PortT1::argument_type,
PortT2::argument_type, PortT3::argument_type, ...);
For async<MsgT> port, its result_type is "void", and its
argument_type is the message type it carries - MsgT.
For synch<ResT,MsgT>, obviously its result_type is ResT and its
argument_type is MsgT.
For ports sending "void" messages (async<void>, synch<ResT,
void>), their argument type is "void_t" defined as struct void_t {}; so C/C++ compilers
can be satisfied.
In the set of ports of a chord, there can be at most one
synch<R,T> port; and if there is one, it must be the first
argument of chord and so its result_type is the return type of chord's
"call" function object.
So a thread safe message queue with async send port and synchronous
"blocking" recv port can be defined as following:
template <MsgT>
class msg_que : public joint {
public:
async<MsgT> send;
synch<MsgT,void> recv;
msg_que() {
chord(recv, send,
&msg_que::proc);
}
private:
MsgT proc(void_t r, MsgT s) { return s;
}
}
Please note that synchronous port "recv" is used as the first
argument of chord; and the signature of msg_que::proc matches the
argument types of chord's ports.
Using boost::lambda, the msg_que class can be redefined as following:
template <MsgT>
class msg_que {
public:
async<MsgT> send;
synch<MsgT,void> recv;
msg_que() {
joins().chord(recv, send,
_2);
}
}
Joint Definition
A joint "joins" a set of chords which may share ports, thus
competing for messages. Internally joint synchronizes the consumption
of messages and schedules the firing of chords.
A joint can be defined and used in 2 ways:
- The joint initially defines a set of chords which later may need
to be changed (by chord_remove, chord_override, reset) to change
message processing logic. In this case we need a name or reference to
the "joint" object to invoke those methods. For this, the "joint" class
can be used as parent of application classes, or a "joint" object can
be instantiated; for example:
async<T1> chan1;
async<T2> chan2;
async<T3> chan3;
joint joins1(executor);
joins1
.chord(chan1, chan2, proc1)
.chord(chan2, chan3, proc2);
...
joins1.override_chord(chan1, chan2, new_proc);
joins1.remove_chord(...);
- In many cases, joint is used one-shot to create a "fixed" set of
synchronization chords which never change during its lifetime. And
async<> / synch<> ports are the primary programming
interfaces. We can
use "factory" funtions "joins() / joins_t()" to create unnamed joints
for this purpose:
async<T1> chan1;
async<T2> chan2;
async<T3> chan3;
joins(executor)
.chord(chan1, chan2, proc1)
.chord(chan2, chan3, proc2);
The variance of joint can be configured from the following aspects:
simple schedulers:
sched_first_match, sched_longest_match,
sched_round_robin;
priority based schedulers:
sched_pri_first_match, sched_pri_longest_match,
sched_pri_round_robin;
- max number of ports: 32 or larger
The templated joint type can be defined
similar to: joint_t<sched_round_robin, 32>. The default
"joint" type is "joint_t<sched_first_match,32>".
or unnamed joints can be created with template factory function:
joins_t<sched_round_robin, 32>(executor,...). The default factory
method "joins()" is "joins_t<sched_first_match,32>()".
- instantiation parameters:
- executor: if a joint contains chords with all async ports
(such as the above 2 joint definitions), chord's function have to run
in a separate thread; executors with type
boost::function<void(callable)> are used to spawn new tasks.
- heartbeat: an experiment feature to allow auto-destruction of
joint and its chords. If a positive heartbeat is specified, after
"heartbeat" number of firings of chords, this joint will be
auto-destroyed and ports detached.
- name: if defined, debuging messages will be printed for this
joint.
Runtime Semantics
Quoted and modified from Cω
Concurrency Extensions Tutorials [2]:
The body (message processing logic) of a
chord can only execute once
all the ports in
its header have been called. When a async / synch port
is called there may be zero, one, or more chords which are enabled
(ready to fire):
- If no chord is enabled then the port invocation is queued up.
If the port is asynchronous, then this simply involves adding the
message to a queue. If the method is
synchronous, then the calling thread is blocked.
- If there is a single enabled chord, then the messages of the
calls involved in the match are de-queued, any blocked thread involved
in the match is awakened, and the body runs.
- If there are several chords which are enabled then the joint
scheduling policy decides which is chosen to run.
- chord body execution:
- When a chord which involves only asynchronous ports runs,
then it does so in a thread of executors' thread pool.
- If a chord involves a (single) synch<> method, the chord
body will execute in the thread which calls the synch<> method
- Exception handling: if an
exception is thrown inside a chord
"body" function object,
- if this chord only involves asynchronous methods, its body will
run in a thread from executor's thread pool, and the exception thrown
inside chord body will be caught and then simply dropped (in fact if
executor's log is
turned on, we'll see it in log), since the semantics of async calls are
to return immediately with no result.
- if this chord has a synchronous method, the chord body will
execute in the calling thread of this synchronous method and this
caller will also get the exception.