00001
00002
00003
00004
00005
00006
00007
00008
00009
00011
00012 #ifndef _CONN_HANDLER_
00013 #define _CONN_HANDLER_
00014
00015 #include "ace/SOCK_Stream.h"
00016 #include "ace/Message_Block.h"
00017 #include "ace/CDR_Stream.h"
00018 #include "ace/Log_Msg.h"
00019
00020 #include <BaseDef.h>
00021
00022 namespace channel {
00023
00024 template <class, class> class Connector;
00025 template <class, class> class RemoteInterface;
00026 class ConnInfo;
00027
00036 template <class Channel, class Transport>
00037 class ConnHandler {
00038 public:
00039 typedef typename Channel::IdType IdType;
00040 typedef typename Channel::IdTrait IdTrait;
00041 typedef typename Channel::Msg Msg;
00042 typedef Connector<Channel, Transport> Connector;
00043 typedef ConnHandler<Channel, Transport> ConnHandlerType;
00044 typedef RemoteInterface<Channel, Transport> RemoteInterface;
00045
00046 Connector *cm_;
00047 ConnInfo rmt_addr_;
00048 RemoteInterface *intf_;
00049
00050 ConnHandler ()
00051 : cm_(NULL), intf_(NULL) {}
00052 virtual ~ConnHandler () {}
00055 virtual ACE_SOCK_Stream &peer_stream(void) = 0;
00056 virtual void shut_down(void) = 0;
00057
00059 int set_up(Connector *c, Interface_Role r)
00060 {
00061 cm_ = c;
00062 intf_ = new RemoteInterface(this, r, c->ch_);
00063 if (r == ACTIVE_ROLE) {
00064
00065
00066 if (c->add_conn (rmt_addr_, this) == FAILURE)
00067 return -1;
00068
00069
00070 intf_->notify_connected (rmt_addr_);
00071
00072
00073 if(intf_->send2remote_chan_info(Transport::type())==FAILURE) return -1;
00074 }
00075 return 0;
00076 }
00077
00079 int service (void)
00080 {
00081 ConnInfo addr;
00082 ACE_DEBUG ((LM_DEBUG, "ConnHandler thread start...\n"));
00083 Msg *msg = 0;
00084 for (;;)
00085 switch (recv_message (msg)) {
00086 case -1:
00087 case 0:
00088 ACE_DEBUG ((LM_DEBUG, "(%t) error with reading socket or remote close connection...\n"));
00089 ACE_DEBUG ((LM_DEBUG, "(%t) one reader thread exits...\n"));
00090
00091 if(!cm_->exit_start_) {
00092 addr = rmt_addr_;
00093 delete intf_;
00094 cm_->del_conn(this);
00095 } else {
00096 ACE_DEBUG ((LM_DEBUG, "before closeup...\n"));
00097 }
00098 return 0;
00099 break;
00100 default:
00101
00102 if (handle_message(msg) == FAILURE)
00103 return -1;
00104 }
00105
00106 return 0;
00107 }
00108
00109 Status handle_message(Msg *msg)
00110 {
00111 return intf_->handle_message(msg);
00112 }
00113
00115 int recv_message(Msg *&msg)
00116 {
00117
00118
00119
00120 ACE_Message_Block *hdr =
00121 new ACE_Message_Block (ACE_DEFAULT_CDR_BUFSIZE);
00122
00123 ACE_CDR::mb_align(hdr);
00124
00125 if (peer_stream().recv_n (hdr->wr_ptr (), 8) == 8) {
00126
00127
00128 hdr->wr_ptr (8);
00129
00130
00131 ACE_InputCDR hdr_cdr (hdr);
00132
00133
00134
00135 ACE_CDR::Boolean byte_order;
00136 hdr_cdr >> ACE_InputCDR::to_boolean (byte_order);
00137
00138
00139 hdr_cdr.reset_byte_order (byte_order);
00140
00141
00142 ACE_CDR::Long msg_len;
00143 hdr_cdr >> msg_len;
00144
00145 hdr->release ();
00146 hdr = 0;
00147
00148
00149
00150
00151 ACE_Message_Block *mb = new ACE_Message_Block (msg_len);
00152
00153 ACE_CDR::mb_align(mb);
00154
00155 if (peer_stream().recv_n (mb->wr_ptr (), msg_len) == msg_len) {
00156 mb->wr_ptr (msg_len);
00157
00158 ACE_InputCDR cdr (mb);
00159 cdr.reset_byte_order (byte_order);
00160
00161 IdType type;
00162 IdTrait::demarshal(cdr, type);
00163 Marshaler *mar = cm_->get_marshaler(type);
00164 msg = new Msg();
00165 msg->type = type;
00166 msg->demarshal(cdr, mar);
00167
00168 mb->release();
00169
00170 ACE_DEBUG((LM_DEBUG, "read something from sock...\n"));
00171
00172 return msg_len;
00173 } else {
00174 mb->release();
00175 mb = 0;
00176 }
00177 }
00178
00179 hdr->release ();
00180 hdr = 0;
00181
00182 ACE_DEBUG ((LM_DEBUG,
00183 "(%t) something wrong when reading socket...\n"));
00184
00185 return -1;
00186 }
00187
00189 int send_msg (Msg *msg)
00190 {
00191 Marshaler *mar = cm_->get_marshaler(msg->type);
00192
00193 ACE_DEBUG((LM_DEBUG, "ConnHandler::send_msg [%s]\n",ID2STR(msg->type).c_str()));
00194
00195 const size_t max_payload_size =
00196 4
00197
00198 + IdTrait::size(msg->type)
00199 + 4
00200 + msg->size_
00201 + ACE_CDR::MAX_ALIGNMENT;
00202
00203 ACE_OutputCDR payload (max_payload_size);
00204 ACE_DEBUG((LM_DEBUG, "before msg marshal\n"));
00205 msg->marshal(payload, mar);
00206 ACE_DEBUG((LM_DEBUG, "after msg marshal\n"));
00207 ACE_CDR::ULong length = payload.total_length ();
00208
00209 iovec iov[2];
00210 int iov_size = 0;
00211
00212 ACE_OutputCDR header(ACE_CDR::MAX_ALIGNMENT+8);
00213 header << ACE_OutputCDR::from_boolean(ACE_CDR_BYTE_ORDER);
00214 header << ACE_CDR::ULong(length);
00215
00216 iov[0].iov_base = header.begin()->rd_ptr();
00217 iov[0].iov_len = 8;
00218 iov[1].iov_base = payload.begin ()->rd_ptr ();
00219 iov[1].iov_len = length;
00220 iov_size = 2;
00221
00222
00223
00224 ACE_GUARD_REACTION(ACE_Thread_Mutex, guard, intf_->lock_, ACE_DEBUG ((LM_DEBUG, "failed to lock out-sock\n")); delete msg; return -1);
00225
00226
00227 peer_stream().sendv_n (iov, iov_size);
00228
00229
00230 delete msg;
00231
00232 ACE_DEBUG((LM_DEBUG, "ConnHandler::send_msg OUT 1 msg\n"));
00233
00234 return 0;
00235 }
00236 };
00237
00238 };
00239
00240 #endif