Synopsis of Join Classes
- classes
namespace
boost {
namespace join {
template
<typename MsgT, typename QueT> class async;
template <typename ResT, typename MsgT> class synch;
template <typename scheduler, size_t max_size>
class joint;
joint joins(joint::spawn_type s =
0, int hb = 0, const char *name = 0);
class executor;
}
}
- Asynchronous ports
template
<typename MsgT, typename QueT>
class async {
public:
typedef
MsgT argument_type;
typedef void result_type;
......
async(size_t sz=0) : ... { ... }
void
operator()(MsgT msg)
}
Asynchronous port objects provide one-way non-blocking message passing
interfaces.
typename QueT:
The queue type async ports used to buffer messages internally
when no chords are ready to fire. The default QueT is std::deque. QueT
must satisfy the following interface:
class QueT {
void push_back(MsgT);
MsgT front();
void pop_front();
void clear();
}
async(size_t sz=0):
Effect: initialize a asynchronous port with underlying
message queue of
QueT type.
The queue size
sz is specified
for flow control; the default value 0 means
unlimited queue size. Internally there are 2 implementations for this
class. If the signature is async<void>, there is no
message to be passed, and template specialization will choose
a proper implementation without a queue (and its overhead) while
maintaining the
same semantics.
Throw:
void operator()(MsgT msg):
Effect: the port api to
send asynchronous messages; calling threads will return immediately.
Since async calls will return immediately, and
msg argument may be
buffered in queues, the
msg
argument cannot be references or pointers to
variables on calling stacks which will probably be unwound when async
calls
return.
Throws: not_in_chord_exception,
queue_overflow_exception, no_executor_exception
- Synchronous ports
template
<typename ResT, typename MsgT>
class synch {
public:
typedef MsgT argument_type;
typedef ResT result_type;
......
synch(size_t sz=0) : ... { ... }
ResT operator()(MsgT msg);
ResT operator()(MsgT msg, boost::xtime &timeout);
};
Synchronous ports provides blocking message ports.
synch(size_t sz=0):
Effect: initialize a synchronous port with underlying thread
waiting queue.
The queue size sz is specified
for flow control; the default value 0 means
unlimited queue size. Internally there are four implementations and
template specialization help choose the most efficient one.
Throw:
ResT operator()(MsgT msg):
Effect: the port
interface to send a synchronous message;
calling thread will block here till a reply is available. Since the
call will block till a result is returned, the msg argument can be
anything just as normal function calls, including pointers or
references to variables on stack.
Throws: not_in_chord_exception,
queue_overflow_exception, or
application_specific_exceptions_raised_inside_chord_body
ResT operator()(MsgT msg, boost::xtime
&timeout):
Effect: this is a variant of
the above interface with a timeout. The call will return either the
result is available or return with a "synch_time_out_exception" when
the timeout is reached.
Throws: not_in_chord_exception,
queue_overflow_exception, synch_time_out_exception,
application_specific_exceptions_raised_inside_chord_body
- Joint
template
<template <size_t> scheduler =
sched_first_match,
size_t
max_size = 32>
class joint {
typedef function1<void, typename
joint_base::callable> spawn_type;
joint(spawn_type spawn =
0, int heartbeat = 0, const char *name = 0):
template
<typename PortT1, typename PortT2,
......, typename CallT>
void chord(PortT1
&p1, PortT2 &p2, ......, CallT c, int priority=0)
template
<typename PortT1, typename PortT2, ......, typename CallT>
void
chord_override(PortT1 &p1, PortT2 &p2, ......, CallT c, int priority=0)
template
<typename PortT1, typename PortT2,
......>
void
chord_remove(PortT1
&p1, PortT2 &p2, ......)
......
};
Joint can be used as the parent of classes which use
async<> / synch<> methods and chords to define concurrent
activities; or it can be used to directly join message ports
together. Joint maintains the status of message arrival and
synchronization.
typename
spawn_type:
the type of execution service used to drive Join
based asynchronous applications. It could be the async "execute" method
of the following thread pool based executor, or it could be a adaptor
to existing applications execution service, as long as it provides a
functional / functor interface to spawn a task:
void
operator()(callable t);
max_size:
defines the capacity
of Joint, ie. how many async<> / synch<> ports can be
defined in this Joint. When max_size <= 32, an integer bitmask is
used
to maintain status; while for "larger" Joint, std::bitset<> is
used for status.
scheduling_policy:
Joint
supports three basic kinds of scheduling policies that decide which
chord
will fire when multiple chords become ready at the same time:
- sched_first_match: fire
the first chord which is ready to
fire
- sched_longest_match: when
multiple chords are ready to fire,
fire the one defined with most async<> / synch<> ports.
- sched_round_robin:
internally the last chord fired is
remembered and chords are fired in round-robin style.
Additionally Joint supports priority based variants
of the above three scheduling algorithms.
joint(spawn_type spawn = 0, int
heartbeat = 0, const char *name = 0):
an Joint is constructed with the following
two settings:
- spawn_type spawn: the
asynchronous port to spawn a new task.
It could be an adaptor to spawn tasks into existing applications main
threads (and it must be non-blocking and returns immediately), or the
execute port of the following executor (thread pool),
ie.
the task waiting queues. In Join based applications, concurrency are
generated by
chords with only async<> ports whose body will run as a task
in
executor thread pool.
- int heartbeat: used to
control the life time of message coordination definitions (chords)
inside the joint. If a heartbeat ( > 0 ) is provided, the joint's
internal definitions (chords) will be automatically reset / destroyed
after its chords have fired "heartbeat" times totally. So after
heartbeat expires, all of joint's chords will be destroyed and ports
detached.
- const char *name: the name of Joint; it is used as a flag to turn
on/off debug messages, when it is set to a non-NULL value, debugging
message will be printed out.
template
<typename PortT1, typename PortT2,
......, typename CallT>
void chord(PortT1 &p1, PortT2
&p2, ......, CallT c, int priority=0)
Effect:
overloaded chord()
functions to create chords with different number of
async<> / synch<> ports. By default Joint has just eight
overloaded chord() functions to create chords for one to eight
async<> / synch<> ports, because any of these
async<> ports can be a std::vector<async<>>, so in
fact
chords can be created for unlimited number of
async<> ports, while each chord can have at most one
synchronous port:
- PortT &p, &p1, ... &pn: async<> / synch<>
ports which are in the header of chord body; only one of them can be
synchronous and if we do have one, it must the first port.
- CallT c: a function, a lambda, class-method, or object-method
which will consume and process the messages from chord's specified
ports. Normally the signature of CallT is deduced from chord's port /
port types:
PortT1::result_type
call(PortT1::argument_type, PortT2::argument_type, ...);
Please note that if a port's message type is "void" (such as
async<void>), we substitue its argument_type as "void_t" which is
an "empty" struct type defined internally.
- int priority: the scheduling priority of this chord; the
default value 0 has the highest priority, the greater the number, the
lower the priority. During scheduling, the chords with higher priority
will be scanned and scheduled first.
Throws: hidden_chord_exception,
too_many_ports_exception
template <typename PortT1, typename
PortT2, ......, typename CallT>
void chord_override(PortT1 &p1,
PortT2 &p2, ......, CallT c, int priority=0)
Effect: allow overriding
an existing chord (the chord body is replaced with the new one).
It is mostly used in child class to override a chord defined in parent
class. The overridden chord is identified by the set of async<> /
synch<> ports in its header. It uses the same arguments as
normal chord() functions.
Throws: chord_override_exception
(chord not found)
template
<typename PortT1, typename PortT2,
......>
void chord_remove(PortT1
&p1, PortT2 &p2, ......)
Effect: remove the chord
identified by the set of async / synch ports.
Throws: chord_remove_exception
(chord not found)
- Factory functions to create "unnamed" joints
joint
joins(joint::spawn_type s = 0, int hb = 0, const char *name =
0);
or a more general templated function:
template <
template <size_t> class scheduler,
size_t
max_size
>
joint_t<scheduler, max_size> joins_t(joint::spawn_type s = 0, int
hb = 0, const char *name = 0);
Often joint is used directly to join message
ports together, and we don't need direct reference to the "joint"
object. In these cases, the factory functions can be used to create
unnamed joints, such as:
joins(exec)
.chord(async1, async2, handler1)
.chord(async2, async3, handler2);
- executor
template
<template <size_t> class scheduler=sched_first_match, size_t
sz=32>
class executor :
public Joint {
executor(int
num_threads, const char *name = NULL);
async<task>
execute;
synch<void,void> shutdown;
};
Chords with only
async<> ports will run its body as an asynchronous task,
either in a newly spawned thread, or in the thread
pool of executor, or in exisiting applications' execution services
(such as Boost.Asio's
main threads). So executor is the "engine" of Join based
applications. Different execution strategies can be used, such as a
thread per request or thread pool, as long as it provides a
functional / functor interface to spawn a task:
void
operator()(task t).
Here the default executor class is thread
pool based and defined using
async<> / synch<> ports and chords, and is a good
sample
of
Join.
executor(int num_threads, const char
*name = NULL):
- int num_threads: number of worker threads in executor's thread
pool
- const char *name: name of executor, a flag to turn on/off
debugging messages
async<task> execute: an
asynchronous port for applications to submit tasks; never block; it
is the default
task queue of
executor.
synch<void()> shutdown:
a synchronous port for application code to wait for executor to
shutdown (and its threads to exit). All chords defined with
shutdown have a low priority (1)
than normal (0), so shutdown will return when all submited tasks have
been finished and all worker threads exit.
- support of group/bundle of async<> ports:
boost::std::vector<async<MsgT>
>
In Cω the chords
are defined with a few async/synch ports with distinct names.
What if
we want to
define chords with a group / std::vector of asynchronous message ports
of the same message type? Inspired by
similar abstractions in CCR[4] and C#.Joins[3], Join allows the
definition of an std::vector of async ports and their participation
in chord as a whole. Since there are at most one synch port per
chord, so there is no support for std::vector of synch ports. The
argument type for vector of async ports will be vector<MsgT>.
Please
refer to the Group of Asynchronous port
tutotail (join_many.cpp) for their usage.
- exceptions
In Cω , many of the following errors related to the definitions of
async/synch ports and chords will be found by the compiler. Since
Join is implemented as a library, these errors can only be
reported as the following exceptions during runtime:
class
join_exception : public
std::exception
the
base class of all Join related exceptions
class not_in_chord_exception :
public
join_exception
When a async / synch port is
declared
and no chord include it, this port has no body defined similar
to pure virtual port in C++ abstract classes (pure virtual ports
have only signature / declaration, no body). When this class
is
instantiated and client code calls this port, not_in_chord_exception
will be thrown.
class double_association_exception :
public
join_exception
Every async / synch port should only
be associated with one Joint. When attempting to use the port in the
chords of more than one Joints, this exception is thrown.
class
queue_overflow_exception :
public join_exception
For basic flow control, async / synch
port can be constructed with a limited queue size. When the buffered
(unprocessed) messages exceed the queue size, this exception is thrown.
class no_executor_exception :
public
join_exception
When an Joint contains chords with only
async ports but without an executor associated, this exception is
thrown.
class hidden_chord_exception :
public
join_exception
When Joint's scheduling policy is
fire_as_soon_as_possible (default), if the definition of a new chord
contains all the ports of another chord, or vice versa, this exception
will be thrown.
class too_many_ports_exception :
public join_exception
When defining a new chord, the total
number of async / synch ports (accumulated thru all the defined
chords) exceeds the max_size of Joint, this exception is thrown.
class chord_override_exception :
public join_exception
When chord_override() is called and no
existing chord found with the same set of async / synch ports, this
exception is thrown.
class chord_remove_exception :
public join_exception
When chord_remove() is called and no
existing chord found with the same set of async / synch ports, this
exception is thrown.
class
synch_not_1st_exception : public join_exception
When chords are defined with synchronous
port, however the synchronous port is not used as the first port
of chord, this exception is thrown.
class single_synch_exception : public
join_exception
When chords are defined with more than
one synchronous ports, this exception is thrown.
class synch_time_out_exception :
public
join_exception
The call of a synchronous port is timed
out.