00001
00002
00003
00004
00005
00006
00007
00008
00009
00011
00018 #ifndef _MSG_H_
00019 #define _MSG_H_
00020
00021 #include "ace/Log_Msg.h"
00022
00023 #include "ace/Synch.h"
00024 #include "ace/OS.h"
00025 #include "ace/CDR_Stream.h"
00026
00027
00028 #include <BaseDef.h>
00029 #include <LinearIdTrait.h>
00030 #include <HierarchicalIdTrait.h>
00031 #include <Marshaler.h>
00032
00033 namespace channel {
00034
00050
/// Pointer to a routine that disposes of a message payload.
/// The routine receives the payload as a raw char pointer and is expected to
/// release it in whatever way matches how the payload was obtained.
typedef void (*MsgFreeCallback)(char *payload);
00053
/// Release callback for a payload that is a single object of type T:
/// the raw pointer is viewed as a T* and destroyed with scalar delete.
///
/// @tparam T  type the payload is deleted as.
/// @param  c  raw payload pointer handed over by the message machinery.
template <class T>
void DefaultMsgFreeCallback(char *c)
{
  delete reinterpret_cast<T *>(c);
}
00061
/// Release callback for a payload that is an array of T:
/// the raw pointer is viewed as a T* and destroyed with array delete.
///
/// @tparam T  element type the payload is deleted as.
/// @param  a  raw payload pointer handed over by the message machinery.
template <class T>
void DefaultMsgArrayFreeCallback(char *a)
{
  delete[] reinterpret_cast<T *>(a);
}
00069
/// Release callback that deliberately does nothing; the payload pointer is
/// ignored and left untouched.
///
/// Declared inline because this is an ordinary (non-template) function defined
/// in a header: without it every translation unit including this header would
/// emit its own definition and the program would fail to link.
inline void DefaultNoOpMsgFreeCallback(char *)
{
}
00076
00078 template <class SynchPolicy>
00079 class MsgDataHolder {
00080 public:
00081 char *data_;
00082 int ref_count_;
00083 typename SynchPolicy::MUTEX ref_count_lock_;
00084 MsgDataHolder (char *data) : data_(data), ref_count_(1) {}
00085 void set_data(char *d, MsgFreeCallback gobbler) {
00086 ACE_GUARD (typename SynchPolicy::MUTEX, guard, ref_count_lock_);
00087 if (gobbler != NULL)
00088 gobbler (data_);
00089 else
00090 delete data_;
00091 data_ = d;
00092 }
00093 void ref_incr(void) {
00094 ACE_GUARD (typename SynchPolicy::MUTEX, guard, ref_count_lock_);
00095 ref_count_++;
00096 }
00097 void ref_decr(void) {
00098 ACE_GUARD (typename SynchPolicy::MUTEX, guard, ref_count_lock_);
00099 ref_count_--;
00100 if (ref_count_ < 0) ref_count_ = 0;
00101 }
00102 void release (MsgFreeCallback gobbler) {
00103 ACE_GUARD (typename SynchPolicy::MUTEX, guard, ref_count_lock_);
00104 if (ref_count_ > 1) {
00105 ref_count_--;
00106 } else {
00107 if (gobbler != NULL)
00108 gobbler (data_);
00109 else
00110 delete data_;
00111 delete this;
00112 }
00113 }
00114 private:
00115 ~MsgDataHolder() {};
00116 };
00117
00119 class DefaultMarshaler {
00120 public:
00121 static int marshal(ACE_OutputCDR &cdr, const char *data, const int size)
00122 {
00123 cdr << ACE_CDR::ULong (size);
00124 cdr.write_char_array (data, size);
00125 return cdr.good_bit();
00126 }
00127 static int demarshal(ACE_InputCDR &cdr, char * &data, int &size, MsgFreeCallback &callback)
00128 {
00129 ACE_CDR::ULong sz;
00130 cdr >> sz;
00131 size = (int)sz;
00132 data = new char[size];
00133 cdr.read_char_array(data, size);
00134 callback = DefaultMsgArrayFreeCallback<char>;
00135 return cdr.good_bit();
00136 }
00137 };
00138
00140 template <class IdType, class SynchPolicy>
00141 struct Msg {
00142 IdType type;
00143 int size_;
00144 MsgDataHolder<SynchPolicy> *data_holder_;
00145 MsgFreeCallback gobbler_;
00146
00147 Msg () {}
00148
00156 template <class MsgType>
00157 Msg (IdType t, MsgType *d, int s = sizeof(MsgType),
00158 MsgFreeCallback g = DefaultMsgFreeCallback<MsgType>) :
00159 type(t),size_(s),gobbler_(g) {
00160 data_holder_ = new MsgDataHolder<SynchPolicy>((char *)d);
00161 }
00162
00163 Msg *clone (void) {
00164 Msg *m = new Msg();
00165 m->type = type;
00166 m->size_ = size_;
00167 m->data_holder_ = data_holder_;
00168 m->gobbler_ = gobbler_;
00169 m->data_holder_->ref_incr();
00170 return m;
00171 }
00172 ~Msg () {
00173 data_holder_->release(gobbler_);
00174 };
00175
00176 char * data(void) {
00177 return data_holder_->data_;
00178 }
00179 template <class MsgType>
00180 void set_data(MsgType *d, int s = sizeof(MsgType), MsgFreeCallback g = DefaultMsgFreeCallback<MsgType>) {
00181 size_ = s;
00182 gobbler_ = g;
00183 data_holder_->set_data((char *)d, gobbler_);
00184 }
00185
00186 int marshal(ACE_OutputCDR &cdr, Marshaler *mar) {
00188 if (IdTrait<IdType>::marshal(cdr, type) == 0) {
00189 return 0;
00190 }
00192 if (mar != NULL)
00193 mar->marshal(cdr, data_holder_->data_, size_);
00194 else
00195 DefaultMarshaler::marshal(cdr, data_holder_->data_, size_);
00196 return cdr.good_bit ();
00197 }
00198
00199 int demarshal(ACE_InputCDR &cdr, Marshaler *mar) {
00200
00201
00202
00203 char *data;
00204 if (mar != NULL)
00205 mar->demarshal(cdr, data, size_, gobbler_);
00206 else
00207 DefaultMarshaler::demarshal(cdr, data, size_, gobbler_);
00208 data_holder_ = new MsgDataHolder<SynchPolicy>(data);
00209 return cdr.good_bit();
00210 }
00211
00212 };
00213
00217
00218
/// Connection description exchanged between channels.
///
/// ChannelMsg_Marshaler in this file transfers host_addr, port, unix_addr and
/// conn_type; intf and is_local are not part of what it writes or reads.
struct Channel_Info_Msg {
  enum { MAX_ADDR_NAME_LEN = 128 };   ///< capacity of each address buffer

  char host_addr[MAX_ADDR_NAME_LEN];  ///< NUL-terminated host address text
  u_short port;                       ///< port number
  char unix_addr[MAX_ADDR_NAME_LEN];  ///< NUL-terminated unix address text
  int conn_type;                      ///< connection type code
  void *intf;                         ///< opaque interface pointer
  bool is_local;                      ///< locality flag

  /// Start with empty address strings and every other field zero/NULL/false.
  Channel_Info_Msg()
    : port(0), conn_type(0), intf(NULL), is_local(false)
  {
    host_addr[0] = '\0';
    unix_addr[0] = '\0';
  }
};
00236
/// List of message ids, held in a fixed-capacity array.
///
/// @tparam IdType  type of a message id.
template <class IdType>
struct PubSub_Info_Msg {
  enum { MAX_REGISTRY_ENTRIES = 128 };     ///< capacity of msg_types

  int num_msg_types;                       ///< number of entries in use
  IdType msg_types[MAX_REGISTRY_ENTRIES];  ///< the ids; first num_msg_types are meaningful

  /// Start with an empty list.
  PubSub_Info_Msg() : num_msg_types(0) {}
};
00248
00250 class ChannelMsg_Marshaler : public Marshaler {
00251 public:
00252 virtual int marshal(ACE_OutputCDR &cdr, const char *data, const int size)
00253 {
00254 ACE_UNUSED_ARG(size);
00255 Channel_Info_Msg *msg = (Channel_Info_Msg *) data;
00256 int len = strlen(msg->host_addr);
00257 cdr << ACE_CDR::ULong (len);
00258 if (len > 0)
00259 cdr.write_char_array (msg->host_addr, len);
00260 cdr << ACE_CDR::ULong (msg->port);
00261 len = strlen(msg->unix_addr);
00262 cdr << ACE_CDR::ULong (len);
00263 if (len > 0)
00264 cdr.write_char_array (msg->unix_addr, len);
00265 cdr << ACE_CDR::ULong (msg->conn_type);
00266 return cdr.good_bit();
00267 }
00268 virtual int demarshal(ACE_InputCDR &cdr, char * &data, int &size, MsgFreeCallback &callback)
00269 {
00270 Channel_Info_Msg *msg = new Channel_Info_Msg();
00271 ACE_CDR::ULong num;
00272 cdr >> num;
00273 int len = (int)num;
00274 if (len > 0) {
00275 cdr.read_char_array(msg->host_addr, len);
00276 msg->host_addr[len] = '\0';
00277 }
00278 cdr >> num;
00279 msg->port = (u_short) num;
00280 cdr >> num;
00281 len = (int)num;
00282 if (len > 0) {
00283 cdr.read_char_array(msg->unix_addr, len);
00284 msg->unix_addr[len] = '\0';
00285 }
00286 cdr >> num;
00287 msg->conn_type = (int) num;
00288 data = (char*)msg;
00289 size = sizeof(Channel_Info_Msg);
00290 callback = DefaultMsgFreeCallback<Channel_Info_Msg>;
00291 return cdr.good_bit();
00292 }
00293 };
00294
00296 template <class IdType>
00297 class PubSubMsg_Marshaler : public Marshaler {
00298 public:
00299 virtual int marshal(ACE_OutputCDR &cdr, const char *data, const int size)
00300 {
00301 ACE_UNUSED_ARG(size);
00302 PubSub_Info_Msg<IdType> *msg = (PubSub_Info_Msg<IdType> *) data;
00303 cdr << ACE_CDR::Long (msg->num_msg_types);
00304 for (int i=0;i<msg->num_msg_types;i++) {
00305 IdType id = msg->msg_types[i];
00306 IdTrait<IdType>::marshal(cdr, id);
00307 }
00308 return cdr.good_bit();
00309 }
00310 virtual int demarshal(ACE_InputCDR &cdr, char * &data, int &size, MsgFreeCallback &callback)
00311 {
00312 PubSub_Info_Msg<IdType> *msg = new PubSub_Info_Msg<IdType>();
00313 ACE_CDR::Long num;
00314 cdr >> num;
00315 msg->num_msg_types = (int)num;
00316 for (int i=0;i<msg->num_msg_types;i++) {
00317 IdType id;
00318 IdTrait<IdType>::demarshal(cdr, id);
00319 msg->msg_types[i] = id;
00320 }
00321 data = (char*)msg;
00322 size = sizeof(PubSub_Info_Msg<IdType>);
00323 callback = DefaultMsgFreeCallback<PubSub_Info_Msg<IdType> >;
00324 return cdr.good_bit();
00325 }
00326 };
00327
00328 };
00329
00330 #endif