L:/channel/channel/include/Msg.h

Go to the documentation of this file.
00001 
00002 // Copyright (c) 2005, 2006 Yigong Liu
00003 // Permission to use, copy, modify, distribute and sell this software for any 
00004 //     purpose is hereby granted without fee, provided that the above copyright 
00005 //     notice appear in all copies and that both that copyright notice and this 
00006 //     permission notice appear in supporting documentation.
00007 // The author makes no representations about the 
00008 //     suitability of this software for any purpose. It is provided "as is" 
00009 //     without express or implied warranty.
00011 
00018 #ifndef _MSG_H_
00019 #define _MSG_H_
00020 
00021 #include "ace/Log_Msg.h"
00022 //for ACE_GUARD macros, need the following 2 headers
00023 #include "ace/Synch.h"
00024 #include "ace/OS.h"
00025 #include "ace/CDR_Stream.h"
00026 
00027 
00028 #include <BaseDef.h>
00029 #include <LinearIdTrait.h>
00030 #include <HierarchicalIdTrait.h>
00031 #include <Marshaler.h>
00032 
00033 namespace channel {
00034 
00050 
00052   typedef void (*MsgFreeCallback) (char * data);
00053 
00055   template <class T>
00056     void DefaultMsgFreeCallback(char *c)
00057     {
00058       T *t = (T*) c;
00059       delete t;
00060     }
00061 
00063   template <class T>
00064     void DefaultMsgArrayFreeCallback(char *a)
00065     {
00066       T *t = (T*) a;
00067       delete[] t;
00068     }
00069 
00073   void DefaultNoOpMsgFreeCallback(char *)
00074     {
00075     }
00076 
00078   template <class SynchPolicy>
00079   class MsgDataHolder {
00080   public:
00081     char *data_;
00082     int ref_count_; 
00083     typename SynchPolicy::MUTEX ref_count_lock_;
00084     MsgDataHolder (char *data) : data_(data), ref_count_(1) {}
00085     void set_data(char *d, MsgFreeCallback gobbler) {
00086       ACE_GUARD (typename SynchPolicy::MUTEX, guard, ref_count_lock_);
00087       if (gobbler != NULL)
00088         gobbler (data_);
00089       else 
00090         delete data_; 
00091       data_ = d;
00092     }
00093     void ref_incr(void) {
00094       ACE_GUARD (typename SynchPolicy::MUTEX, guard, ref_count_lock_);
00095       ref_count_++;
00096     }
00097     void ref_decr(void) {
00098       ACE_GUARD (typename SynchPolicy::MUTEX, guard, ref_count_lock_);
00099       ref_count_--;
00100       if (ref_count_ < 0) ref_count_ = 0;
00101     }
00102     void release (MsgFreeCallback gobbler) {
00103       ACE_GUARD (typename SynchPolicy::MUTEX, guard, ref_count_lock_);
00104       if (ref_count_ > 1) { 
00105         ref_count_--;
00106       } else {
00107         if (gobbler != NULL)
00108           gobbler (data_);
00109         else 
00110           delete data_; 
00111         delete this;
00112       }
00113     }
00114   private:
00115     ~MsgDataHolder() {};
00116   };
00117 
00119   class DefaultMarshaler {
00120   public:
00121     static int marshal(ACE_OutputCDR &cdr, const char *data, const int size)
00122       {
00123       cdr << ACE_CDR::ULong (size);
00124       cdr.write_char_array (data, size);
00125       return cdr.good_bit();
00126       }
00127     static int demarshal(ACE_InputCDR &cdr, char * &data, int &size, MsgFreeCallback &callback)
00128       {
00129       ACE_CDR::ULong sz;
00130       cdr >> sz;
00131       size = (int)sz;
00132       data = new char[size];
00133       cdr.read_char_array(data, size);
00134       callback = DefaultMsgArrayFreeCallback<char>;
00135       return cdr.good_bit();
00136       }
00137   };    
00138 
00140   template <class IdType, class SynchPolicy>
00141   struct  Msg {    
00142     IdType type;
00143     int size_;
00144     MsgDataHolder<SynchPolicy> *data_holder_;
00145     MsgFreeCallback gobbler_;
00146     
00147     Msg () {}
00148 
00156     template <class MsgType>
00157     Msg (IdType t, MsgType *d, int s = sizeof(MsgType), 
00158          MsgFreeCallback g = DefaultMsgFreeCallback<MsgType>) : 
00159       type(t),size_(s),gobbler_(g) {
00160       data_holder_ = new MsgDataHolder<SynchPolicy>((char *)d);
00161     }
00162 
00163     Msg *clone (void) {
00164       Msg *m = new Msg();
00165       m->type = type;
00166       m->size_ = size_;
00167       m->data_holder_ = data_holder_;
00168       m->gobbler_ = gobbler_;
00169       m->data_holder_->ref_incr();
00170       return m;
00171     }
00172     ~Msg () {
00173       data_holder_->release(gobbler_);
00174     };
00175 
00176     char * data(void) {
00177       return data_holder_->data_;
00178     }
00179     template <class MsgType>
00180     void set_data(MsgType *d, int s = sizeof(MsgType), MsgFreeCallback g = DefaultMsgFreeCallback<MsgType>) {
00181       size_ = s;
00182       gobbler_ = g;
00183       data_holder_->set_data((char *)d, gobbler_);
00184     }
00185 
00186     int marshal(ACE_OutputCDR &cdr, Marshaler *mar) {
00188       if (IdTrait<IdType>::marshal(cdr, type) == 0) {
00189         return 0;
00190       }
00192       if (mar != NULL)
00193         mar->marshal(cdr, data_holder_->data_, size_);
00194       else
00195         DefaultMarshaler::marshal(cdr, data_holder_->data_, size_);
00196       return cdr.good_bit ();
00197     }
00198 
00199     int demarshal(ACE_InputCDR &cdr, Marshaler *mar) {
00200       //demarshal_IdType is already done at outside
00201       //IdTrait<IdType>::demarshal(cdr, type) == 0) {
00202       //demarshal data
00203       char *data;
00204       if (mar != NULL)
00205         mar->demarshal(cdr, data, size_, gobbler_);
00206       else
00207         DefaultMarshaler::demarshal(cdr, data, size_, gobbler_);
00208       data_holder_ = new MsgDataHolder<SynchPolicy>(data);
00209       return cdr.good_bit();
00210     }
00211       
00212   };
00213 
00217 
00218 
00219   struct Channel_Info_Msg {
00220     enum { MAX_ADDR_NAME_LEN = 128 };
00221     char host_addr[MAX_ADDR_NAME_LEN];
00222     u_short port;
00223     char unix_addr[MAX_ADDR_NAME_LEN];
00224     int conn_type;
00225     void *intf;  
00226     bool is_local;
00227     Channel_Info_Msg() {
00228       port = 0;
00229       host_addr[0] = '\0';
00230       unix_addr[0] = '\0';
00231       conn_type = 0;
00232       intf = NULL;
00233       is_local = false;
00234     }
00235   };
00236 
00239   template <class IdType>
00240   struct PubSub_Info_Msg {
00241     enum { MAX_REGISTRY_ENTRIES = 128 };
00242     int num_msg_types;
00243     IdType msg_types[MAX_REGISTRY_ENTRIES];
00244     PubSub_Info_Msg() {
00245       num_msg_types = 0;
00246     }
00247   };
00248 
00250   class ChannelMsg_Marshaler : public Marshaler {
00251   public:
00252     virtual int marshal(ACE_OutputCDR &cdr, const char *data, const int size) 
00253       {
00254         ACE_UNUSED_ARG(size);
00255         Channel_Info_Msg *msg = (Channel_Info_Msg *) data;
00256         int len = strlen(msg->host_addr);
00257         cdr << ACE_CDR::ULong (len);
00258         if (len > 0) 
00259           cdr.write_char_array (msg->host_addr, len);
00260         cdr << ACE_CDR::ULong (msg->port);
00261         len = strlen(msg->unix_addr);
00262         cdr << ACE_CDR::ULong (len);
00263         if (len > 0) 
00264           cdr.write_char_array (msg->unix_addr, len);
00265         cdr << ACE_CDR::ULong (msg->conn_type);
00266         return cdr.good_bit();
00267       }
00268     virtual int demarshal(ACE_InputCDR &cdr, char * &data, int &size, MsgFreeCallback &callback)
00269       {
00270         Channel_Info_Msg *msg = new Channel_Info_Msg();
00271         ACE_CDR::ULong num;
00272         cdr >> num;
00273         int len = (int)num;
00274         if (len > 0) {
00275           cdr.read_char_array(msg->host_addr, len);
00276           msg->host_addr[len] = '\0';
00277         }
00278         cdr >> num;
00279         msg->port = (u_short) num;
00280         cdr >> num;
00281         len = (int)num;
00282         if (len > 0) {
00283           cdr.read_char_array(msg->unix_addr, len);
00284           msg->unix_addr[len] = '\0';
00285         }
00286         cdr >> num;
00287         msg->conn_type = (int) num;
00288         data = (char*)msg;
00289         size = sizeof(Channel_Info_Msg);
00290         callback = DefaultMsgFreeCallback<Channel_Info_Msg>;
00291         return cdr.good_bit();
00292       }
00293   };
00294 
00296   template <class IdType>
00297   class PubSubMsg_Marshaler : public Marshaler {
00298   public:
00299     virtual int marshal(ACE_OutputCDR &cdr, const char *data, const int size) 
00300       {
00301         ACE_UNUSED_ARG(size);
00302         PubSub_Info_Msg<IdType> *msg = (PubSub_Info_Msg<IdType> *) data;
00303         cdr << ACE_CDR::Long (msg->num_msg_types);
00304         for (int i=0;i<msg->num_msg_types;i++) {
00305           IdType id = msg->msg_types[i];
00306           IdTrait<IdType>::marshal(cdr, id);
00307         }
00308         return cdr.good_bit();
00309       }
00310     virtual int demarshal(ACE_InputCDR &cdr, char * &data, int &size, MsgFreeCallback &callback)
00311       {
00312         PubSub_Info_Msg<IdType> *msg = new PubSub_Info_Msg<IdType>();
00313         ACE_CDR::Long num;
00314         cdr >> num;
00315         msg->num_msg_types = (int)num;
00316         for (int i=0;i<msg->num_msg_types;i++) {
00317           IdType id;
00318           IdTrait<IdType>::demarshal(cdr, id);
00319           msg->msg_types[i] = id;
00320         }
00321         data = (char*)msg;
00322         size = sizeof(PubSub_Info_Msg<IdType>);
00323         callback = DefaultMsgFreeCallback<PubSub_Info_Msg<IdType> >;
00324         return cdr.good_bit();
00325       }
00326   };
00327 
00328 };
00329 
00330 #endif

Generated on Mon Feb 27 19:59:21 2006 for channel by  doxygen 1.4.6-NO