L:/channel/cvm/include/CvmBaseTask.h

Go to the documentation of this file.
00001 
00002 // Copyright (c) 2005, 2006 Yigong Liu
00003 // Permission to use, copy, modify, distribute and sell this software for any 
00004 //     purpose is hereby granted without fee, provided that the above copyright 
00005 //     notice appear in all copies and that both that copyright notice and this 
00006 //     permission notice appear in supporting documentation.
00007 // The author makes no representations about the 
00008 //     suitability of this software for any purpose. It is provided "as is" 
00009 //     without express or implied warranty.
00011 
00016 #ifndef _CVM_TASK_H_
00017 #define _CVM_TASK_H_
00018 
00019 #include "ace/Message_Block.h"
00020 #include "ace/Task.h"
00021 #include "ace/Thread_Mutex.h"
00022 #include "ace/Thread_Semaphore.h"
00023 #include "ace/Get_Opt.h"
00024 #include "ace/Reactor.h"
00025 #include "ace/Service_Object.h"
00026 #include "ace/OS.h"
00027 #include "ace/Signal.h"
00028 #include "ace/Log_Msg.h"
00029 #include "ace/Service_Repository.h"
00030 #include "ace/Service_Types.h"
00031 #include "ace/Dynamic_Service.h"
00032 
00033 #include <Channel.h>
00034 #include <CvmBaseChannel.h>
00035 
00036 #include <string>
00037 #include <iostream>
00038 
00039 
00040 namespace cvm {
00041 
00042   typedef enum {
00043     TASK_NULL=0,
00044     TASK_INIT,
00045     TASK_ACTIVE,
00046     TASK_SUSPEND,
00047     TASK_EXIT
00048   } TASK_STATE;
00049 
00052   template <class Channel>
00053   class CvmBaseTask : public ACE_Task<ACE_MT_SYNCH> {
00054   public:
00055     typedef typename Channel::Port Port;
00056     
00057   protected:
00059     std::string chan_name_;
00060     int num_thr;
00061 
00062     Channel *my_chan_;
00063     Port *my_port_;
00064 
00066     std::string my_name_;
00067     TASK_STATE my_state_;
00068     ACE_Thread_Semaphore exit_sema_; //for thread group exit gracely
00069     ACE_Thread_Mutex exit_lock_; //for thread group exit gracely
00070 
00071   public:
00072     enum { QUEUE_MAX = 16 * 1024 };
00073 
00074     CvmBaseTask() : my_state_(TASK_NULL), exit_sema_(0) {
00075       num_thr = 1;
00076       my_chan_ = NULL;
00077       my_port_ = NULL;
00078     }
00079 
00080     TASK_STATE state(void) { return my_state_;}
00081     std::string my_name(void) { return my_name_; }
00082     std::string chan_name(void) { return chan_name_;}
00083     Channel * my_chan(void) { return my_chan_; }
00084     Port * my_port(void) { return my_port_;}
00085  
00087     virtual channel::Status prepare(void) //initialization before thread starts
00088       {
00089         //subscribe msg
00090         return channel::SUCCESS;
00091       }
00092     virtual channel::Status cleanup(void) //cleanup before threads exit
00093       {
00094         //unsubscribe msg
00095         return channel::SUCCESS;
00096       }
00097     virtual int work (void)      //main processing loop
00098       {
00099         ACE_DEBUG ((LM_DEBUG,
00100                     "(%t) %s dmb_task coming up ...\n", my_name().c_str()));
00101 
00102         return 0;
00103       }
00104 
00107     virtual int init (int argc, char *argv[])
00108       {
00109         ACE_TCHAR cn[128];
00110         ACE_TCHAR bn[128];
00111         my_state_ = TASK_INIT;
00112         cn[0]='\0';
00113         bn[0]='\0';
00114 
00115         ACE_Get_Opt     get_opt (argc, argv, "t:c:", 0);
00116 
00117         for (int c; (c = get_opt ()) != -1; )
00118           switch (c)
00119             {
00120             case 't':
00121               num_thr = ACE_static_cast
00122                 (int, ACE_OS::atoi (get_opt.opt_arg ()));        
00123               break;
00124             case 'c': // channel name
00125               ACE_OS::strsncpy
00126                 (cn, get_opt.opt_arg (), 128);
00127               break;
00128             default:
00129               break;
00130             }
00131 
00132         chan_name_ = cn;
00133         if (chan_name_.length() == 0) {
00134           ACE_DEBUG ((LM_DEBUG,
00135                       "(%t) Task not bound to any Channel...\n"));
00136           return -1;
00137         }
00138   
00139         if (chan_name_.length() > 0) {
00140           my_chan_ = CvmBaseChannel<Channel>::find_chan(cn);
00141           if (my_chan_ == NULL) {
00142             ACE_DEBUG ((LM_DEBUG,
00143                         "(%t) fail to find channel[%s]...\n", cn));
00144             return -1;
00145           }
00146 
00147           my_port_ = new Port(my_chan_,msg_queue());
00148         }
00149         return open(NULL);
00150       }
00151     virtual int fini ()
00152       {
00153         ACE_DEBUG((LM_DEBUG, "..fini 1\n"));
00154 
00155         my_state_ = TASK_EXIT;
00156         ACE_DEBUG((LM_DEBUG, "(%t) Task [%s] fini...\n", my_name().c_str()));
00157         if(msg_queue()->state() == ACE_Message_Queue_Base::PULSED) {
00158           ACE_DEBUG ((LM_DEBUG, "task %s has been suspendded, please resume it before remove it\n", my_name().c_str() ));
00159           return -1;
00160         }
00161 
00162         ACE_DEBUG((LM_DEBUG, "..fini 2\n"));
00163 
00164         if(msg_queue()->state() != ACE_Message_Queue_Base::DEACTIVATED) {
00165           cleanup();
00166           msg_queue()->close();
00167         }
00168         ACE_DEBUG((LM_DEBUG, "..fini 3\n"));
00169 
00170         //since AppTask threads are detached, we cannot wait for them
00171         //use semaphore to synchronise the process
00172         for(int i=0; i<num_thr; i++)
00173           exit_sema_.acquire();
00174 
00175         //we still need wait some time for OS cleanup Task threads
00176         ACE_OS::sleep(1);
00177 
00178         //clean up port
00179         delete my_port_;
00180 
00181         //ACE_DEBUG((LM_DEBUG, "..fini 4\n"));
00182 
00183         return 0;
00184       }
00185     virtual int open (void *)
00186       {
00187         if(prepare() == channel::FAILURE) {
00188           ACE_DEBUG ((LM_DEBUG,
00189                       "(%t) fail inside prepare()...\n"));
00190           return -1;
00191         }
00192 
00193         ACE_DEBUG ((LM_DEBUG,
00194                     "(%t) finish prepare()...\n"));
00195         if (msg_queue ()->activate ()
00196             == ACE_Message_Queue_Base::ACTIVATED) {
00197           ACE_DEBUG ((LM_DEBUG,
00198                       "(%t) finish activate_que()...\n"));
00199           msg_queue ()->high_water_mark (QUEUE_MAX);
00200 
00201           ACE_DEBUG ((LM_DEBUG, "%d app threads start...\n", num_thr));
00202 
00203           my_state_ = TASK_ACTIVE;
00204           return activate (THR_NEW_LWP | THR_DETACHED, num_thr);
00205         } else {
00206           ACE_DEBUG ((LM_DEBUG,
00207                       "(%t) fail to activate msg_que...\n"));
00208           return -1;
00209         }
00210       }
00211     virtual int close (u_long)
00212       {
00213         my_state_ = TASK_EXIT;
00214         //should implement reference counting, so we only
00215         //cleanup when the last thread exits
00216         ACE_DEBUG ((LM_DEBUG,  "Task [%s] close...\n", my_name().c_str()));
00217         ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, exit_lock_, 0);
00218         if(msg_queue()->state() != ACE_Message_Queue_Base::DEACTIVATED) {
00219           cleanup();
00220           msg_queue()->close();
00221         }
00222         exit_sema_.release();
00223 
00224         return 0;
00225       }
00226     virtual int put (ACE_Message_Block *mb, ACE_Time_Value *t)
00227       {
00228         return msg_queue ()->enqueue_tail
00229           (mb, (ACE_Time_Value *) &t);
00230       }
00231     virtual int info (char **strp, size_t length) const
00232       {
00233         char buf[256];
00234 
00235         ACE_OS::sprintf (buf,
00236                          "%s dmb_task coming up ... \n",
00237                          my_name_.c_str());
00238 
00239         if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0)
00240           return -1;
00241         else
00242           ACE_OS::strncpy (*strp, buf, length);
00243         return ACE_OS::strlen (buf);
00244       }
00245     virtual int suspend()
00246       {
00247         my_state_ = TASK_SUSPEND;
00248         my_port_->suspend();
00249         return 0; //add later
00250       }
00251     virtual int resume()
00252       {
00253         if(my_state_ == TASK_SUSPEND) {
00254           my_state_ = TASK_ACTIVE;
00255           my_port_->resume();
00256         }
00257         return 0; //add later
00258       }
00259     virtual int svc (void)   
00260       {
00261         my_name_ = rtrv_task_name();
00262 
00263         ACE_DEBUG ((LM_DEBUG,
00264                     "(%t) %s starts running ...\n", my_name().c_str()));
00265 
00266         return work();
00267       }     
00268 
00269   protected:
00270 
00271     const ACE_TCHAR *rtrv_task_name (void)
00272       {
00273         ACE_Service_Repository_Iterator iter
00274           (*ACE_Service_Repository::instance (), 0);
00275 
00276         for (const ACE_Service_Type *st;
00277              iter.next (st) != 0; iter.advance()) {
00278           const ACE_Service_Type_Impl *type = st->type();
00279           if (type == 0) continue;
00280           if (((ACE_Service_Object *)this) == type->object ())
00281             return st->name ();
00282         }
00283   
00284         return "Unamed Task";
00285       }
00286 
00287   };
00288 
00289 };
00290 
00291 
00292 #endif

Generated on Mon Feb 27 19:59:22 2006 for channel by  doxygen 1.4.6-NO