// // event3.cpp // // Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include #include #include #include #include #include #include #include using namespace boost::join; logger log1("log"); template class event: public joint { public: typedef boost::function subscriber; struct subscription_data { subscriber callback_; subscription_data(subscriber cb) : callback_(cb) {} }; typedef boost::shared_ptr subscription; private: std::vector subs_; async ready; async subscription_empty; synch process_subscription; async subscribe_priv; public: synch post; subscription subscribe(subscriber cb) { subscription sub(new subscription_data(cb)); subscribe_priv(sub); return sub; } async unsubscribe; void operator()(V v) { post(v); } event() { chord(post, ready, &event::post_cb); chord(process_subscription, subscribe_priv, &event::sub_cb); //set unsubscribe to lower priority so subscribe call will be processed first chord(process_subscription, unsubscribe, &event::unsub_cb); //create a lowest priority chord for stopping process_subscription recursion when all subs are processed chord(process_subscription, subscription_empty, &event::sub_empty_cb); ready(); subscription_empty(); //init state } private: void post_cb(V post, void_t r) { process_subscription(); for(size_t i=0; icallback_(post); ready(); } void sub_cb(void_t proc, subscription sub) { subs_.push_back(sub); process_subscription(); } void unsub_cb(void_t proc, subscription unsub) { typename std::vector::iterator iter; if((iter = std::find(subs_.begin(), subs_.end(), unsub)) != subs_.end()) { subs_.erase(iter); } process_subscription(); } void sub_empty_cb(void_t proc, void_t empty) { subscription_empty(); } }; void thread_sleep(int sec) { boost::xtime xt; boost::xtime_get(&xt, boost::TIME_UTC); xt.sec += sec; boost::thread::sleep(xt); } void consumer_cb(std::string evt_data, int num) { log1.stream() << "evt_consumer[" << num << "] recv: " << evt_data << logger::endl; } void evt_producer(event &evt) { std::ostringstream ostr; for(int i=0; i<10; i++) { ostr << i; log1.stream() << "producer synchronously sends evt [" << i << "]" << logger::endl; evt(ostr.str()); ostr.str(""); thread_sleep(1); } } void evt_consumer(event &evt, int c) { event::subscription sub; int times = (c == 1)?10:5; for(int i=0; i evt; //spawn prod/consum tasks exec.spawn(boost::bind(evt_consumer, evt, 1)); exec.spawn(boost::bind(evt_consumer, evt, 2)); exec.spawn(boost::bind(evt_producer, evt)); //main thread have to wait about 10 secs here, other executor's threads will exit //and Demo will fail thread_sleep(10); exec.shutdown(); return 0; }