00001
00002
00003
00004
00005
00006
00007
00008
00009
00011
00016 #ifndef _CVM_TASK_H_
00017 #define _CVM_TASK_H_
00018
00019 #include "ace/Message_Block.h"
00020 #include "ace/Task.h"
00021 #include "ace/Thread_Mutex.h"
00022 #include "ace/Thread_Semaphore.h"
00023 #include "ace/Get_Opt.h"
00024 #include "ace/Reactor.h"
00025 #include "ace/Service_Object.h"
00026 #include "ace/OS.h"
00027 #include "ace/Signal.h"
00028 #include "ace/Log_Msg.h"
00029 #include "ace/Service_Repository.h"
00030 #include "ace/Service_Types.h"
00031 #include "ace/Dynamic_Service.h"
00032
00033 #include <Channel.h>
00034 #include <CvmBaseChannel.h>
00035
00036 #include <string>
00037 #include <iostream>
00038
00039
00040 namespace cvm {
00041
00042 typedef enum {
00043 TASK_NULL=0,
00044 TASK_INIT,
00045 TASK_ACTIVE,
00046 TASK_SUSPEND,
00047 TASK_EXIT
00048 } TASK_STATE;
00049
00052 template <class Channel>
00053 class CvmBaseTask : public ACE_Task<ACE_MT_SYNCH> {
00054 public:
00055 typedef typename Channel::Port Port;
00056
00057 protected:
00059 std::string chan_name_;
00060 int num_thr;
00061
00062 Channel *my_chan_;
00063 Port *my_port_;
00064
00066 std::string my_name_;
00067 TASK_STATE my_state_;
00068 ACE_Thread_Semaphore exit_sema_;
00069 ACE_Thread_Mutex exit_lock_;
00070
00071 public:
00072 enum { QUEUE_MAX = 16 * 1024 };
00073
00074 CvmBaseTask() : my_state_(TASK_NULL), exit_sema_(0) {
00075 num_thr = 1;
00076 my_chan_ = NULL;
00077 my_port_ = NULL;
00078 }
00079
00080 TASK_STATE state(void) { return my_state_;}
00081 std::string my_name(void) { return my_name_; }
00082 std::string chan_name(void) { return chan_name_;}
00083 Channel * my_chan(void) { return my_chan_; }
00084 Port * my_port(void) { return my_port_;}
00085
00087 virtual channel::Status prepare(void)
00088 {
00089
00090 return channel::SUCCESS;
00091 }
00092 virtual channel::Status cleanup(void)
00093 {
00094
00095 return channel::SUCCESS;
00096 }
00097 virtual int work (void)
00098 {
00099 ACE_DEBUG ((LM_DEBUG,
00100 "(%t) %s dmb_task coming up ...\n", my_name().c_str()));
00101
00102 return 0;
00103 }
00104
00107 virtual int init (int argc, char *argv[])
00108 {
00109 ACE_TCHAR cn[128];
00110 ACE_TCHAR bn[128];
00111 my_state_ = TASK_INIT;
00112 cn[0]='\0';
00113 bn[0]='\0';
00114
00115 ACE_Get_Opt get_opt (argc, argv, "t:c:", 0);
00116
00117 for (int c; (c = get_opt ()) != -1; )
00118 switch (c)
00119 {
00120 case 't':
00121 num_thr = ACE_static_cast
00122 (int, ACE_OS::atoi (get_opt.opt_arg ()));
00123 break;
00124 case 'c':
00125 ACE_OS::strsncpy
00126 (cn, get_opt.opt_arg (), 128);
00127 break;
00128 default:
00129 break;
00130 }
00131
00132 chan_name_ = cn;
00133 if (chan_name_.length() == 0) {
00134 ACE_DEBUG ((LM_DEBUG,
00135 "(%t) Task not bound to any Channel...\n"));
00136 return -1;
00137 }
00138
00139 if (chan_name_.length() > 0) {
00140 my_chan_ = CvmBaseChannel<Channel>::find_chan(cn);
00141 if (my_chan_ == NULL) {
00142 ACE_DEBUG ((LM_DEBUG,
00143 "(%t) fail to find channel[%s]...\n", cn));
00144 return -1;
00145 }
00146
00147 my_port_ = new Port(my_chan_,msg_queue());
00148 }
00149 return open(NULL);
00150 }
00151 virtual int fini ()
00152 {
00153 ACE_DEBUG((LM_DEBUG, "..fini 1\n"));
00154
00155 my_state_ = TASK_EXIT;
00156 ACE_DEBUG((LM_DEBUG, "(%t) Task [%s] fini...\n", my_name().c_str()));
00157 if(msg_queue()->state() == ACE_Message_Queue_Base::PULSED) {
00158 ACE_DEBUG ((LM_DEBUG, "task %s has been suspendded, please resume it before remove it\n", my_name().c_str() ));
00159 return -1;
00160 }
00161
00162 ACE_DEBUG((LM_DEBUG, "..fini 2\n"));
00163
00164 if(msg_queue()->state() != ACE_Message_Queue_Base::DEACTIVATED) {
00165 cleanup();
00166 msg_queue()->close();
00167 }
00168 ACE_DEBUG((LM_DEBUG, "..fini 3\n"));
00169
00170
00171
00172 for(int i=0; i<num_thr; i++)
00173 exit_sema_.acquire();
00174
00175
00176 ACE_OS::sleep(1);
00177
00178
00179 delete my_port_;
00180
00181
00182
00183 return 0;
00184 }
00185 virtual int open (void *)
00186 {
00187 if(prepare() == channel::FAILURE) {
00188 ACE_DEBUG ((LM_DEBUG,
00189 "(%t) fail inside prepare()...\n"));
00190 return -1;
00191 }
00192
00193 ACE_DEBUG ((LM_DEBUG,
00194 "(%t) finish prepare()...\n"));
00195 if (msg_queue ()->activate ()
00196 == ACE_Message_Queue_Base::ACTIVATED) {
00197 ACE_DEBUG ((LM_DEBUG,
00198 "(%t) finish activate_que()...\n"));
00199 msg_queue ()->high_water_mark (QUEUE_MAX);
00200
00201 ACE_DEBUG ((LM_DEBUG, "%d app threads start...\n", num_thr));
00202
00203 my_state_ = TASK_ACTIVE;
00204 return activate (THR_NEW_LWP | THR_DETACHED, num_thr);
00205 } else {
00206 ACE_DEBUG ((LM_DEBUG,
00207 "(%t) fail to activate msg_que...\n"));
00208 return -1;
00209 }
00210 }
00211 virtual int close (u_long)
00212 {
00213 my_state_ = TASK_EXIT;
00214
00215
00216 ACE_DEBUG ((LM_DEBUG, "Task [%s] close...\n", my_name().c_str()));
00217 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, exit_lock_, 0);
00218 if(msg_queue()->state() != ACE_Message_Queue_Base::DEACTIVATED) {
00219 cleanup();
00220 msg_queue()->close();
00221 }
00222 exit_sema_.release();
00223
00224 return 0;
00225 }
00226 virtual int put (ACE_Message_Block *mb, ACE_Time_Value *t)
00227 {
00228 return msg_queue ()->enqueue_tail
00229 (mb, (ACE_Time_Value *) &t);
00230 }
00231 virtual int info (char **strp, size_t length) const
00232 {
00233 char buf[256];
00234
00235 ACE_OS::sprintf (buf,
00236 "%s dmb_task coming up ... \n",
00237 my_name_.c_str());
00238
00239 if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0)
00240 return -1;
00241 else
00242 ACE_OS::strncpy (*strp, buf, length);
00243 return ACE_OS::strlen (buf);
00244 }
00245 virtual int suspend()
00246 {
00247 my_state_ = TASK_SUSPEND;
00248 my_port_->suspend();
00249 return 0;
00250 }
00251 virtual int resume()
00252 {
00253 if(my_state_ == TASK_SUSPEND) {
00254 my_state_ = TASK_ACTIVE;
00255 my_port_->resume();
00256 }
00257 return 0;
00258 }
00259 virtual int svc (void)
00260 {
00261 my_name_ = rtrv_task_name();
00262
00263 ACE_DEBUG ((LM_DEBUG,
00264 "(%t) %s starts running ...\n", my_name().c_str()));
00265
00266 return work();
00267 }
00268
00269 protected:
00270
00271 const ACE_TCHAR *rtrv_task_name (void)
00272 {
00273 ACE_Service_Repository_Iterator iter
00274 (*ACE_Service_Repository::instance (), 0);
00275
00276 for (const ACE_Service_Type *st;
00277 iter.next (st) != 0; iter.advance()) {
00278 const ACE_Service_Type_Impl *type = st->type();
00279 if (type == 0) continue;
00280 if (((ACE_Service_Object *)this) == type->object ())
00281 return st->name ();
00282 }
00283
00284 return "Unamed Task";
00285 }
00286
00287 };
00288
00289 };
00290
00291
00292 #endif