// // event2.cpp // // Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include #include #include #include #include #include #include #include #include "boost/tuple/tuple.hpp" using namespace boost; using namespace boost::join; logger log1("log"); template class event: public joint { public: typedef boost::function subscriber; struct subscription_data { subscriber callback_; subscription_data(subscriber cb) : callback_(cb) {} }; typedef boost::shared_ptr subscription; private: std::vector subs_; async ready; async subscribe_priv; async > post_priv; public: subscription subscribe(subscriber cb) { subscription sub(new subscription_data(cb)); subscribe_priv(sub); return sub; } async unsubscribe; void operator()(V v) { post(v); } void post(V v) { post_priv(make_tuple(v, 0)); } event(joint::spawn_type e) : joint(e) { chord(subscribe_priv, ready, &event::sub_cb); //set unsubscribe to lower priority so subscribe call will be processed first chord(unsubscribe, ready, &event::unsub_cb); //post chord has the lowest priority so that sub/unsub msgs will be processed first chord(post_priv, ready, &event::post_cb); ready(); } private: void post_cb(tuple post, void_t r) { V val = tuples::get<0>(post); size_t num = tuples::get<1>(post); subscriber cb; if (num < subs_.size()) { cb = subs_[num]->callback_; if((num+1) < subs_.size()) { //start next callback in another task post_priv(make_tuple(val, (num+1))); } } ready(); if(cb) cb(val); } void sub_cb(subscription sub, void_t r) { subs_.push_back(sub); ready(); } void unsub_cb(subscription unsub, void_t r) { typename std::vector::iterator iter; if((iter = std::find(subs_.begin(), subs_.end(), unsub)) != subs_.end()) { subs_.erase(iter); } ready(); } }; void thread_sleep(int sec) { boost::xtime xt; boost::xtime_get(&xt, boost::TIME_UTC); xt.sec += sec; boost::thread::sleep(xt); } void consumer_cb(std::string evt_data, int num) { log1.stream() << "evt_consumer[" << num << "] recv: " << evt_data << logger::endl; } void evt_producer(event &evt) { std::ostringstream ostr; for(int i=0; i<10; i++) { ostr << i; log1.stream() << "producer asynchronously sends evt [" << i << "]" << logger::endl; evt(ostr.str()); ostr.str(""); thread_sleep(1); } } void evt_consumer(event &evt, int c) { event::subscription sub; int times = (c == 1)?10:5; for(int i=0; i evt(exec.execute); //spawn prod/consum tasks exec.spawn(boost::bind(evt_consumer, evt, 1)); exec.spawn(boost::bind(evt_consumer, evt, 2)); exec.spawn(boost::bind(evt_producer, evt)); //main thread have to wait about 10 secs here, other executor's threads will exit //and Demo will fail thread_sleep(10); exec.shutdown(); return 0; }