Join Data Parallel Programming Tutorial
Computers have more and more cores (CPUs). Harnessing this
computing power and making applications scale across more CPUs is a
challenging task. Many vendors provide libraries to support multi-core
programming, such as Intel's Threading Building Blocks. The data parallel
paradigm is particularly suitable for parallelizing computationally
intensive work, allowing multiple tasks to work concurrently on
different parts of the data. Data parallel programming scales well to
larger numbers of processors by dividing the data collection into
smaller pieces: program performance increases as you add more
processors.
Join's toolset (async / synch ports and chords) provides a good
foundation for data parallel programming. In this tutorial we discuss
how to implement parallel loops and the parallel map-reduce algorithm in
Join. Here is the
tutorial's full source code.
Since the basic model is to initiate multiple tasks working
concurrently on different parts of the data set, we define the following
interface for the main (initiating) task to wait for and synchronize
with worker tasks.
template <typename V>
class future {
public:
Here we define synchronous methods (get(), wait()) to allow the initiating task to
wait for parallel worker tasks to finish, and to return the final result
if available. Please note that the synch port get() has not been defined with any
chord, similar to a "pure virtual" method. So future behaves as an "abstract"
class defining the interface to wait for concurrent computations.
Please note that we need a virtual destructor, so that derived classes
can be destroyed properly.
synch<V()> get;
V wait() { return get(); }
virtual ~future() {}
};
The simplest form of parallelism
is a loop of iterations that can each run simultaneously without
interfering with each other. The interface follows STL's for_each algorithm, defining a
sequence of data items and a unary function (the loop
body) to be applied to each data item. Since the unary function will run
concurrently in different tasks, the class async_loop inherits
class future so that user code can wait for all concurrent computations to
finish. Also, because no final result is returned, we use
future<void> here. To return a result from parallel computations,
please use the parallel map_reduce algorithm described next.
template <typename InputIterator, typename UnaryFunction>
class async_loop: public joint, public future<void> {
Here we save the data defining the
parallel loop: the start and end of the sequence (first_, last_) and the
unary function to be applied to each item of the sequence.
InputIterator first_;
InputIterator last_;
UnaryFunction f_;
Next we use a few async ports to define the "states" of the async loop:
done() - all concurrent computations have finished; total()
and one_finish() - used to
record how many concurrent computations are still unfinished; run_priv() - an async port which
forks off all the concurrent computations.
async<void> done;
async<int> total;
async<void> one_finish;
async<tuple<InputIterator, int> > run_priv;
public:
Users of async_loop call run() to start the parallel loop. If the
data sequence is empty, no work is needed and done() is called directly.
Otherwise the first computation task is spawned.
void run() {
if (first_ == last_)
done();
else
run_priv(make_tuple(first_, 0));
}
In the constructor, we define three chords for forking concurrent
computations and waiting for all computation tasks to finish. Please
note that two of the chords have only async ports. Their chord bodies will
run asynchronously as concurrent tasks in the executor's thread pool.
async_loop(executor *e, InputIterator first, InputIterator last,
UnaryFunction f) :
joint(e), first_(first), last_(last),
f_(f) {
chord(run_priv, &async_loop::run_cb);
chord(get, done, &async_loop::get_cb);
chord(one_finish, total, &async_loop::finish_cb);
}
private:
Here is the chord body for forking concurrent computations. The passed-in
argument contains the data item (in the sequence) to be worked on and
the position in the chain of concurrent tasks.
void run_cb(tuple<InputIterator, int> r) {
InputIterator iter = tuples::get<0>(r);
InputIterator next = iter; next++;
int n = tuples::get<1>(r);
If this is the last data item in the sequence, set the total number of
concurrent computations and start the process of counting how many
computations have finished.
if (next == last_) {//all loop bodies started
total(n+1);
}
else {
Otherwise start the next concurrent loop body.
run_priv(make_tuple(next, n+1)); //fire the next iteration
}
Then we do the concurrent computation: calling the unary function with the data
item. Finally one_finish()
is called to notify that one concurrent computation is done.
f_(*iter);
one_finish();
}
Here is the chord body to count how many concurrent computations are still
left to finish; when all of them are done, done() is called.
void finish_cb(void_t one, int tot) {
if (tot > 1)
total(tot-1);
else
done();
}
Here is a simple chord to unblock the calling thread of get() when all concurrent computations are
finished.
void get_cb(void_t get, void_t done) {}
};
Class map_reduce_async captures the
common idiom of forking multiple concurrent computations and then
collecting and merging the results from all of them. It follows the
functional programming style of map/reduce. The basic model of parallel
map_reduce is to apply a function (the map function) concurrently to a
sequence of data items marked by the start/end iterators, and then
collect and merge the results by applying the reduce function to the
results from the map tasks. Since we need to return a result to the initiator,
class map_reduce_async inherits future<ResultType>.
template <typename InputIterator,
typename ResultType,
typename MapFunction,
typename ReduceFunction>
class map_reduce_async: public joint, public future<ResultType> {
Here
we save the data defining the concurrent computations.
InputIterator first_;
InputIterator last_;
ResultType result_;
MapFunction map_fun_;
ReduceFunction reduce_fun_;
We use the following async ports to define the "states" of concurrent
map_reduce: done() - all the
map and reduce tasks have finished and the result is ready to retrieve;
total() - how many map tasks
are not yet finished; map_m()
- the async port to fork all map tasks; reduce_m()
- the async port to reduce / merge the result from one completed map task.
async<void> done;
async<int> total;
async<tuple<InputIterator, int> > map_m;
async<ResultType> reduce_m;
public:
In the constructor, we save the data defining the parallel map_reduce
computation: the sequence of data items (first, last), the initial value
of the result to be merged, the map function and the reduce function. We also create
three chords for forking concurrent map tasks and reducing / merging
results from them. Please note that some chords have only async ports;
their chord bodies will run asynchronously and concurrently as tasks in the
executor's thread pool.
map_reduce_async(executor *e,
InputIterator first,
InputIterator last,
ResultType init_val,
MapFunction mf,
ReduceFunction rf) :
joint(e), first_(first),
last_(last), result_(init_val),
map_fun_(mf), reduce_fun_(rf) {
chord(map_m, &map_reduce_async::map_cb);
chord(reduce_m, total,
&map_reduce_async::reduce_cb);
chord(future<ResultType>::get, done,
&map_reduce_async::get_cb);
}
Users of map_reduce_async call run() to start the concurrent computations.
If the data sequence is empty, no work is needed and done() is called directly,
so the initial value will be returned when get()/wait() is called.
Otherwise the first map task is spawned.
void run() {
if (first_ == last_)
done();
else
map_m(make_tuple(first_, 0));
}
private:
Here we define the chord body to fork map tasks. Map tasks are spawned as a
chain. The passed-in argument contains the data item (in the sequence) to
be worked on and the position in the chain of concurrent tasks.
void map_cb(tuple<InputIterator, int> r) {
InputIterator iter = tuples::get<0>(r);
int n = tuples::get<1>(r);
InputIterator next = iter; next++;
If this is the last data item in the sequence, set the total number of
concurrent computations and start the reducing process to merge the
results from all map tasks.
if (next == last_) {//all map tasks started
total(n+1);
}
else {
Otherwise the next map task in the chain is spawned.
map_m(make_tuple(next, n+1)); //fire the next iteration
}
Here we do the real work by calling the map function with the current data item
in the sequence and passing the result to the async port reduce_m().
reduce_m(map_fun_(*iter));
}
Here we define the chord body for reducing / merging the results from all
map tasks. First we call the reduce function to merge the result from
one map task into the accumulated result. Then we check whether any map tasks are still
pending; if not, we are finished and done()
is called.
void reduce_cb(ResultType reduce, int tot) {
result_ = reduce_fun_(result_, reduce);
if (tot > 1)
total(tot-1);
else
done();
}
Here we define the chord body that returns the result to the caller of
get() / wait() when all map /
reduce tasks are finished.
ResultType get_cb(void_t get, void_t done) {
return result_;
}
};
Class parallel is a wrapper over the above parallel loop and map_reduce
algorithms, providing a simple STL-style functional interface to invoke
them. Class parallel also specifies which executor's thread
pool runs all the async tasks spawned from the algorithms.
class parallel {
joint::spawn_type exec_;
public:
In the constructor, we pass in and save the executor.
parallel(joint::spawn_type e) : exec_(e) {}
Here is the method to invoke the parallel loop algorithm, following STL's for_each algorithm. An async_loop
object is created with the arguments passed in and returned to the caller as a
future<void>. The caller can synchronize with and wait for all
parallel loop bodies using this future object.
template <typename InputIterator, typename UnaryFunction>
future<void> *for_each(InputIterator first, InputIterator last, UnaryFunction f) {
async_loop<InputIterator, UnaryFunction> *aloop =
new async_loop<InputIterator, UnaryFunction>(exec_, first, last, f);
aloop->run();
return aloop;
}
Here is the method to invoke the parallel map_reduce algorithm. A
map_reduce_async object is created with the arguments passed in and
returned to the caller as a future<ResultType>. The caller can wait for all parallel map /
reduce tasks and retrieve the final result using this future object.
template <typename InputIterator,
typename ResultType,
typename MapFunction,
typename ReduceFunction>
future<ResultType> *map_reduce(InputIterator first, InputIterator last,
ResultType init, MapFunction mf, ReduceFunction rf) {
map_reduce_async<InputIterator, ResultType, MapFunction, ReduceFunction> *mapred =
new map_reduce_async<InputIterator, ResultType, MapFunction, ReduceFunction>(exec_,
first, last, init, mf, rf);
mapred->run();
return mapred;
}
};
In the following, we define a simple data sequence of integers to
demonstrate the parallel algorithms.
int data[] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
int size = sizeof(data) / sizeof(int);
Here we define a "square" function to be used both as the loop body of the
parallel loop and as the map function of parallel map / reduce.
int sqr(int val) {
//do some work
for(int i=0; i<3; i++) {
log1.stream() << "one map task calculating: "
<< val << " * " << val << " ...[" << i
<< "]" << logger::endl;
thread_sleep(2);
}
return val * val;
}
Here
is a "plus" function we'll use as the reduce / merge function.
int plus(int total, int one_more) {
log1.stream() << "reduce task accumulates: " <<
one_more << logger::endl;
return total + one_more;
}
In
main() function, we set up a test of the above parallel algorithms.
int main(int argc, char **argv) {
First we create a thread pool with six threads to run the async tasks
spawned from the parallel algorithms.
executor exec(6);
Then
we create a parallel object with the thread pool.
parallel para(exec.execute);
Next we invoke the parallel loop algorithm, asking for the function sqr() to be
applied concurrently to each item of the data sequence.
future<void> *f1 = para.for_each(data, data+size, sqr);
log1.msg("main thread waiting for parallel.for_each() ...");
Here
we wait for all tasks of parallel loop to finish.
f1->wait();
log1.msg("parallel.for_each() is done, start parallel.map_reduce() ...");
Here we invoke the parallel map_reduce algorithm on the data sequence, using sqr() as the map function and plus() as the reduce function.
future<int> *f2 = para.map_reduce(data, data+size, 0, sqr, plus);
log1.msg("main thread waiting for parallel.map_reduce() ...");
Here
we wait for all tasks of parallel map_reduce to finish and retrieve the
final result.
log1.stream()
<< "parallel.map_reduce() returns: "
<< f2->get() << logger::endl;
Finally we clean up by shutting down the executor (waiting for all its
threads to exit) and deleting the two future objects.
exec.shutdown();
delete f1;
delete f2;
return 0;
}