Join Data Parallel Programming Tutorial

Computers have more and more cores (CPUs). Harnessing this computing power and making applications scale to more CPUs is a challenging task. Many vendors provide libraries to support multi-core programming, such as Intel's Threading Building Blocks. The data parallel paradigm is particularly suitable for parallelizing computationally intensive work, allowing multiple tasks to work concurrently on different parts of the data. Data parallel programming scales well to larger numbers of processors by dividing the data collection into smaller pieces; program performance increases as you add more processors.

Join's toolset (async / synch ports and chords) provides a good foundation for data parallel programming. In this tutorial we discuss how to implement parallel loops and a parallel map-reduce algorithm in Join. Here is the tutorial's full source code.

Since the basic model is to initiate multiple tasks working concurrently on different parts of the data set, we define the following interface for the main (initiating) task to wait for and synchronize with worker tasks.
template <typename V>
class future {
public:
Here we define synchronous methods (get(), wait()) to allow the initiating task to wait for parallel worker tasks to finish and to return the final result if available. Please note that the synch port get() has not been bound to any chord, similar to a "pure virtual" method. So future behaves as an "abstract" class defining the interface to wait for concurrent computations. Also note that we need a virtual destructor, so that derived classes can be destroyed properly.
  synch<V()> get;
  V wait() { return get(); }
  virtual ~future() {}
};

The simplest form of parallelism is a loop whose iterations can each run simultaneously without interfering with each other. The interface follows STL's for_each algorithm: a sequence of data items and a unary function to be applied (as the loop body) to each data item. Since the unary function will run concurrently in different tasks, the class async_loop inherits from future so that user code can wait for all concurrent computations to finish. Because no final result is returned, we use future<void> here. To return a result from parallel computations, use the parallel map_reduce algorithm described later.

template <typename InputIterator, typename UnaryFunction>
class async_loop: public joint, public future<void> {
Here we save the data defining the parallel loop: the start and end of the sequence (first_, last_) and the unary function to be applied to each element of the sequence.
  InputIterator first_;
  InputIterator last_;
  UnaryFunction f_;
Next we use a few async ports to define the "states" of the async loop: done() - all concurrent computations have finished; total() and one_finish() - used to track how many concurrent computations are still unfinished; run_priv() - an async port which forks off all concurrent computations.
  async<void> done;
  async<int> total;
  async<void> one_finish;
  async<tuple<InputIterator, int> > run_priv;
public:
Users of async_loop call run() to start the parallel loop. If the data sequence is empty, no work is needed and done() is called directly. Otherwise the first computation task is spawned.
  void run() {
    if (first_ == last_)
      done();
    else
      run_priv(make_tuple(first_, 0));
  }
In the constructor, we define three chords for forking concurrent computations and for waiting for all computation tasks to finish. Please note that two of the chords have only async ports; their chord bodies will run asynchronously as concurrent tasks in the executor's thread pool.

  async_loop(executor *e, InputIterator first, InputIterator last, UnaryFunction f) :
    joint(e), first_(first), last_(last), f_(f) {
    chord(run_priv, &async_loop::run_cb);
    chord(get, done, &async_loop::get_cb);
    chord(one_finish, total, &async_loop::finish_cb);
  }
private:
Here is the chord body for forking concurrent computations. The passed-in argument contains the data item (in the sequence) to be worked on and its position in the chain of concurrent tasks.
  void run_cb(tuple<InputIterator, int> r) {
    InputIterator iter = tuples::get<0>(r);
    InputIterator next = iter; next++;
    int n = tuples::get<1>(r);
If this is the last data item in the sequence, set the total number of concurrent computations and start the process of counting how many computations have finished.
    if (next == last_) {//all loop bodies started
      total(n+1);
    }
    else {
Otherwise start the next concurrent loop-body.
      run_priv(make_tuple(next, n+1)); //fire the next iteration
    }
Then we do the concurrent computation: calling the unary function with the data item. Finally, one_finish() is called to signal that one concurrent computation is done.
    f_(*iter);
    one_finish();
  }
Here is the chord body to count how many concurrent computations are left to finish; when all of them are done, done() is called.
  void finish_cb(void_t one, int tot) {
    if (tot > 1)
      total(tot-1);
    else
      done();
  }
Here is a simple chord to unblock the thread calling get() when all concurrent computations are finished.
  void get_cb(void_t get, void_t done) {}
};

Class map_reduce_async captures the common idiom of forking multiple concurrent computations and then collecting and merging the results from all of them. It follows the functional programming style of map/reduce. The basic model of parallel map_reduce is to apply a function (the map function) concurrently to a sequence of data items marked by start/end iterators, and then collect and merge the results by applying the reduce function to the results from the map tasks. Since we need to return a result to the initiator, class map_reduce_async inherits from future<ResultType>.
template <typename InputIterator,
      typename ResultType,
      typename MapFunction,
      typename ReduceFunction>
class map_reduce_async: public joint, public future<ResultType> {
Here we save the data defining the concurrent computations.
  InputIterator first_;
  InputIterator last_;
  ResultType result_;
  MapFunction map_fun_;
  ReduceFunction reduce_fun_;
We use the following async ports to define the "states" of the concurrent map_reduce: done() - all the map and reduce tasks have finished and the result is ready to retrieve; total() - how many map tasks have not finished yet; map_m() - async port to fork all map tasks; reduce_m() - async port to reduce / merge the result from one completed map task.
  async<void> done;
  async<int> total;
  async<tuple<InputIterator, int> > map_m;
  async<ResultType> reduce_m;
public:
In the constructor, we save the data defining the parallel map_reduce computation: the sequence of data items (first, last), the initial value of the result to be merged, the map function and the reduce function. We also create three chords for forking concurrent map tasks and reducing / merging their results. Please note that some chords have only async ports; their chord bodies will run asynchronously and concurrently as tasks in the executor's thread pool.
  map_reduce_async(executor *e,
                                InputIterator first,
                                InputIterator last,
                                ResultType init_val,
                                MapFunction mf,
                                ReduceFunction rf) :
    joint(e), first_(first), last_(last), result_(init_val),
    map_fun_(mf), reduce_fun_(rf) {
    chord(map_m, &map_reduce_async::map_cb);
    chord(reduce_m, total, &map_reduce_async::reduce_cb);
    chord(future<ResultType>::get, done, &map_reduce_async::get_cb);
  }
Users of map_reduce_async call run() to start the concurrent computations. If the data sequence is empty, no work is needed and done() is called directly, so the initial value will be returned when get()/wait() is called. Otherwise the first map task is spawned.
  void run() {
    if (first_ == last_)
      done();
    else
      map_m(make_tuple(first_, 0));
  }
private:
Here we define the chord body to fork map tasks. Map tasks are spawned as a chain. The passed-in argument contains the data item (in the sequence) to be worked on and its position in the chain of concurrent tasks.
  void map_cb(tuple<InputIterator, int> r) {
    InputIterator iter = tuples::get<0>(r);
    int n = tuples::get<1>(r);
    InputIterator next = iter; next++;
If this is the last data item in the sequence, set the total number of concurrent computations and start the reducing process to merge the results from all map tasks.
    if (next == last_) {//all map tasks started
      total(n+1);
    }
    else {
Otherwise the next map task in chain is spawned.
      map_m(make_tuple(next, n+1)); //fire the next iteration
    }
Here we do the real work: call the map function with the current data item in the sequence and pass the result to the async port reduce_m().
    reduce_m(map_fun_(*iter));
  }
Here we define the chord body for reducing / merging the results from all map tasks. First we call the reduce function to merge the result from one map task into result_. Then we check whether any map tasks are still pending; if not, we are finished and done() is called.
  void reduce_cb(ResultType reduce, int tot) {
    result_ = reduce_fun_(result_, reduce);
    if (tot > 1)
      total(tot-1);
    else
      done();
  }
Here we define the chord body for returning the result to the caller of get() / wait() when all map / reduce tasks are finished.
  ResultType get_cb(void_t get, void_t done) {
    return result_;
  }
};

Class parallel is a wrapper over the above parallel loop and map_reduce algorithms, providing a simple STL-style functional interface to invoke them. Class parallel also specifies which executor's thread pool will run all the async tasks spawned by the algorithms.
class parallel {
  joint::spawn_type exec_;
public:
In the constructor, we pass in and save the executor.
  parallel(joint::spawn_type e) : exec_(e) {}
Here is the method to invoke the parallel loop algorithm, following STL's for_each. An async_loop object is created with the arguments passed in and returned to the caller as a future<void>. The caller can synchronize with and wait for all parallel loop bodies using this future object.
  template <typename InputIterator, typename UnaryFunction>
  future<void> *for_each(InputIterator first, InputIterator last, UnaryFunction f) {
    async_loop<InputIterator, UnaryFunction> *aloop =
      new async_loop<InputIterator, UnaryFunction>(exec_, first, last, f);
    aloop->run();
    return aloop;
  }
Here is the method to invoke the parallel map_reduce algorithm. A map_reduce_async object is created with the arguments passed in and returned to the caller as a future<ResultType>. The caller can wait for all parallel map / reduce tasks and retrieve the final result using this future object.
  template <typename InputIterator,
        typename ResultType,
        typename MapFunction,
        typename ReduceFunction>
  future<ResultType> *map_reduce(InputIterator first, InputIterator last, ResultType init, MapFunction mf, ReduceFunction rf) {
    map_reduce_async<InputIterator,ResultType,MapFunction,ReduceFunction> *mapred =
      new map_reduce_async<InputIterator,ResultType,MapFunction,ReduceFunction>(exec_, first, last, init, mf, rf);
    mapred->run();
    return mapred;
  }
};

In the following, we define a simple data sequence of integers to demonstrate the parallel algorithms.
int data[] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
int size = sizeof(data) / sizeof(int);
Here we define a "square" function to be used as either the loop body of the parallel loop or the map function of parallel map_reduce.
int sqr(int val) {
  //do some work
  for(int i=0; i<3; i++) {
    log1.stream() << "one map task calculating: " << val << " * " << val << " ...[" << i << "]" << logger::endl;
    thread_sleep(2);
  }
  return val * val;
}
Here is a "plus" function we'll use as the reduce / merge function.
int plus(int total, int one_more) {
  log1.stream() << "reduce task accumulates: " << one_more << logger::endl;
  return total + one_more;
}

In main() function, we set up a test of the above parallel algorithms.
int main(int argc, char **argv) {
First we create a thread pool with six threads to run the async tasks spawned by the parallel algorithms.
  executor exec(6); 
Then we create a parallel object with the thread pool.
  parallel para(exec.execute);
Next we invoke the parallel loop algorithm, asking for function sqr() to be applied concurrently to each item of the data sequence.
  future<void> *f1 = para.for_each(data, data+size, sqr);
  log1.msg("main thread waiting for parallel.for_each() ...");
Here we wait for all tasks of parallel loop to finish.
  f1->wait();
  log1.msg("parallel.for_each() is done, start parallel.map_reduce() ...");
Here we invoke the parallel map_reduce algorithm on the data sequence using sqr() as map function and plus() as reduce function.
  future<int> *f2 = para.map_reduce(data, data+size, 0, sqr, plus);
  log1.msg("main thread waiting for parallel.map_reduce() ...");
Here we wait for all tasks of parallel map_reduce to finish and retrieve the final result.
  log1.stream() << "parallel.map_reduce() returns: " << f2->get() << logger::endl;
Finally we clean up: shut down the executor (waiting for all its threads to exit) and delete the two future objects.
  exec.shutdown();
  delete f1;
  delete f2;
  return 0;
}