// // parallel.cpp // // Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include "boost/tuple/tuple.hpp" #include #include using namespace boost; using namespace boost::join; logger log1("log"); //a simple wrapper over a synch<> port to wait for an async value template class future { public: synch get; V wait() { return get(); } virtual ~future() {} }; template class async_loop: public joint, public future { InputIterator first_; InputIterator last_; UnaryFunction f_; async done; async total; //total of bodies running async one_finish; async > run_priv; public: void run() { if (first_ == last_) done(); else run_priv(make_tuple(first_, 0)); } async_loop(joint::spawn_type e, InputIterator first, InputIterator last, UnaryFunction f) : joint(e), first_(first), last_(last), f_(f) { chord(run_priv, &async_loop::run_cb); chord(get, done, &async_loop::get_cb); chord(one_finish, total, &async_loop::finish_cb); } private: void run_cb(tuple r) { InputIterator iter = tuples::get<0>(r); InputIterator next = iter; next++; int n = tuples::get<1>(r); if (next == last_) {//all loop bodies started total(n+1); } else { run_priv(make_tuple(next, n+1)); //fire the next iteration } f_(*iter); one_finish(); } void finish_cb(void_t one, int tot) { if (tot > 1) total(tot-1); else done(); } void get_cb(void_t get, void_t done) {} }; template class map_reduce_async: public joint, public future { InputIterator first_; InputIterator last_; ResultType result_; MapFunction map_fun_; ReduceFunction reduce_fun_; async done; async total; //total of bodies running async > map_m; async reduce_m; public: map_reduce_async(joint::spawn_type e, InputIterator first, InputIterator last, ResultType init_val, MapFunction mf, ReduceFunction rf) : joint(e), first_(first), last_(last), result_(init_val), map_fun_(mf), reduce_fun_(rf) { chord(map_m, &map_reduce_async::map_cb); chord(reduce_m, total, &map_reduce_async::reduce_cb); chord(future::get, done, &map_reduce_async::get_cb); } void run() { if (first_ == last_) done(); else map_m(make_tuple(first_, 0)); } private: void map_cb(tuple r) { InputIterator iter = tuples::get<0>(r); int n = tuples::get<1>(r); InputIterator next = iter; next++; if (next == last_) {//all loop bodies started total(n+1); } else { map_m(make_tuple(next, n+1)); //fire the next iteration } reduce_m(map_fun_(*iter)); } void reduce_cb(ResultType reduce, int tot) { result_ = reduce_fun_(result_, reduce); if (tot > 1) total(tot-1); else done(); } ResultType get_cb(void_t get, void_t done) { return result_; } }; class parallel { joint::spawn_type exec_; public: parallel(joint::spawn_type e) : exec_(e) {} template future* for_each(InputIterator first, InputIterator last, UnaryFunction f) { async_loop *aloop = new async_loop(exec_, first, last, f); aloop->run(); return aloop; } template future* map_reduce(InputIterator first, InputIterator last, ResultType init, MapFunction mf, ReduceFunction rf) { map_reduce_async *mapred = new map_reduce_async(exec_, first, last, init, mf, rf); mapred->run(); return mapred; } }; void thread_sleep(int sec) { boost::xtime xt; boost::xtime_get(&xt, boost::TIME_UTC); xt.sec += sec; boost::thread::sleep(xt); } int data[] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; int length = sizeof(data) / sizeof(int); int sqr(int val) { //do some work for(int i=0; i<3; i++) { log1.stream() << "one map task calculating: " << val << " * " << val << " ...[" << i << "]" << logger::endl; thread_sleep(2); } return val * val; } int plus(int total, int one_more) { log1.stream() << "reduce task accumulates: " << one_more << logger::endl; return total + one_more; } int main(int argc, char **argv) { executor exec(6); parallel para(exec.execute); future *f1 = para.for_each(data, data+length, sqr); log1.msg("main thread waiting for parallel.for_each() ..."); f1->wait(); log1.msg("parallel.for_each() is done, start parallel.map_reduce() ..."); future *f2 = para.map_reduce(data, data+length, 0, sqr, plus); log1.msg("main thread waiting for parallel.map_reduce() ..."); log1.stream() << "parallel.map_reduce() returns: " << f2->get() << logger::endl; delete f1; delete f2; exec.shutdown(); return 0; }