Samples
Thread safe buffer and message queue
Handle multiple message flows
Using ports as the interface between components
Joint's life cycle
Applications
Thread safe buffer and message queue
a thread safe buffer can be defined in various manners.
the most explicit version:
template
<typename V>
class buffer: public joint {
public:
async<V> put;
synch<V,void> get;
buffer() {
chord(get, put,
&buffer::chord_body);
}
V chord_body(void_t g, V p) {
return p;
}
};
buffer.cpp
simplified version using boost::lambda or boost::phoenix and "joins()"
factory function:
template
<typename V>
class buffer {
public:
async<V> put;
synch<V,void> get;
buffer() {
joins().chord(get,
put, lambda::_2);
}
};
buffer_lambda.cpp
buffer_phoenix.cpp
Using boost::tuples, boost::phoenix, and "joins()" factory function, we
can create message queue (essentially a thread safe buffer) using
factory function in JoCaml
style:
template
<typename V>
boost::tuple<async<V>,synch<V,void>
> create_msg_que() {
async<V> send;
synch<V,void> recv;
joins().chord(recv, send, arg2);
return boost::make_tuple(send,
recv);
}
...
async<int> send;
synch<int, void> recv;
boost::tie(send, recv) = create_msg_que<int>();
producer_consumer.cpp
Handle multiple message flows
A simple demo of joint handling 3 message flows and performing
different processing on messages when they are available in different
ports:
flows_bundle
make_data_ports(joint::spawn_type e) {
async<int> flow1;
async<int> flow2;
async<int> flow3;
joins(e)
.chord(flow1,
flow2, std::cout << (arg1 + arg2) << "\n")
.chord(flow2,
flow3, std::cout << (arg1 - arg2) << "\n")
.chord(flow1,
flow3, std::cout << (arg1 * arg2) << "\n");
return boost::make_tuple(flow1,
flow2, flow3);
}
flows.cpp
a sample of joining a vector of message flows into one output flow:
template
<typename T>
class join_many : public joint {
public:
vector<async<T> >
inputs;
synch<vector<T>,
void> output;
join_many(int num) :
inputs(num) {
chord(output,
inputs, &join_many::chord_body);
}
vector<T>
chord_body(void_t out, vector<T> in) {
return in;
}
};
join_many.cpp
Using ports as the interface between
components
A sample of a chain of nodes, each node exposes its input and output
interfaces as ports.
Each node reads from its input, does some transformation of data, wait
a second and send it to output interface, all these are performed in
executor's thread pool.
The way to connect one node (A) to the next (B) is simple:
A.output = B.input.
template
<typename T>
class node {
int my_no;
boost::function<T(T)> func_;
public:
async<T> input;
async<T> output;
node(joint::spawn_type e, int n, boost::function<T(T)> f)
:
my_no(n), func_(f), stop(false) {
joins(e).chord(input, bind(&node::proc, this,
_1));
}
private:
void proc(T in) {
T o = func_(in);
thread_sleep(1);
output(o);
}
};
chain.cpp
Another sample of using async ports as service interface. The server
exposes the following interface taking requests in the form of a string
name and a response port:
class
IService {
public:
async<tuple<string,
async<string> > > Service;
};
When server receives such a request, it does some work, and send then
result in response port.
In other side, clients first create some response ports (Result2.first
and Result2.second) and pack them inside service requests when sending
requests to servers; then block waiting on these response ports for
the results from servers.
async_call_ret.cpp
Joint's
life cycle
After the initial definition, a joint's set of chords can modified thru
methods: chord_override(), chord_remove() or reset(). And if a joint's
heartbeat is set during its creation, its set of chords will be
auto-destroyed after "heartbeat" number of firings of chords.
Here is a sample demonstrating these joint's lifecycle related methods.
joint_lifetime.cpp
Applications:
Thread
Safe Events Tutorial
Join allow us to experiment with
some interesting designs of thread safe
events dispatching with the following design considerations:
- Allow subscribing / unsubscribing to events concurrently from
different threads; events data structures must be thread safe.
- Allow callback code to invoke the event
interface again, change / delete event subscription or post a new
event during the process of event dispatching / handling. This is
difficult to
handle with the "synchronous" semantics of
normal method calls.
Join's async ports provide an alternative to solve these issues.
- When applications run on multi-core machines, we may want the
callbacks to execute concurrently for better performance.
Data
Parallel Programming Tutorial
How to harness the computing power of
multi-core computers and make applications
scalable for more CPUs is a challenging task. Data Parallel paradigm is
particularly suitable for
parallelizing computationally intensive work, allowing multiple tasks
concurrently working on different parts of data. Data parallel
programming scales well to larger number of processors by dividing the
data collection into smaller pieces. Program performance increases as
you add more processors.
Join's toolset (async / synch ports and chords) provides a good
foundation for data parallel programming. In this tutoral we discuss
how to implement parallel loops and parallel map-reduce algorithm in
Join.