L:/channel/channel/include/ConnHandler.h

Go to the documentation of this file.
00001 
00002 // Copyright (c) 2005, 2006 Yigong Liu
00003 // Permission to use, copy, modify, distribute and sell this software for any 
00004 //     purpose is hereby granted without fee, provided that the above copyright 
00005 //     notice appear in all copies and that both that copyright notice and this 
00006 //     permission notice appear in supporting documentation.
00007 // The author makes no representations about the 
00008 //     suitability of this software for any purpose. It is provided "as is" 
00009 //     without express or implied warranty.
00011 
00012 #ifndef _CONN_HANDLER_
00013 #define _CONN_HANDLER_
00014 
00015 #include "ace/SOCK_Stream.h"
00016 #include "ace/Message_Block.h"
00017 #include "ace/CDR_Stream.h"
00018 #include "ace/Log_Msg.h"
00019 
00020 #include <BaseDef.h>
00021 
00022 namespace channel {
00023 
00024   template <class, class> class Connector;        // defined elsewhere; ConnHandler below keeps a pointer to it (cm_) and calls add_conn/del_conn/get_marshaler on it
00025   template <class, class> class RemoteInterface;  // defined elsewhere; ConnHandler::set_up() creates one and received messages are forwarded to it
00026   class ConnInfo;                                 // held BY VALUE as ConnHandler::rmt_addr_, so the complete definition must be visible when ConnHandler is instantiated (presumably via BaseDef.h -- TODO confirm)
00027 
00036   template <class Channel, class Transport>
00037     class ConnHandler {
00038     public:
00039     typedef typename Channel::IdType IdType;
00040     typedef typename Channel::IdTrait IdTrait;
00041     typedef typename Channel::Msg Msg;
00042     typedef Connector<Channel, Transport> Connector;
00043     typedef ConnHandler<Channel, Transport> ConnHandlerType;
00044     typedef RemoteInterface<Channel, Transport> RemoteInterface;
00045 
00046     Connector *cm_; //my Connector
00047     ConnInfo rmt_addr_; //my peer "published" addr
00048     RemoteInterface *intf_; //my local interface to channel
00049 
00050     ConnHandler ()
00051       : cm_(NULL), intf_(NULL) {}
00052     virtual ~ConnHandler () {}
00055     virtual ACE_SOCK_Stream &peer_stream(void) = 0; //return my peer, either unix_domain or tcp sockets
00056     virtual void shut_down(void) = 0;
00057 
00059     int set_up(Connector *c, Interface_Role r)
00060       {
00061         cm_ = c;
00062         intf_ = new RemoteInterface(this, r, c->ch_);
00063         if (r == ACTIVE_ROLE) {
00064           //active side will add itself to conn_map here
00065           //passive side have to wait for the 1st msg
00066           if (c->add_conn (rmt_addr_, this) == FAILURE)
00067             return -1;
00068 
00069           //notify local channel members that peer connected
00070           intf_->notify_connected (rmt_addr_);
00071 
00072           // active side will initiate the start-up handshaking process
00073           if(intf_->send2remote_chan_info(Transport::type())==FAILURE) return -1;
00074         }
00075         return 0;
00076       }
00077 
00079     int service (void)
00080       {
00081         ConnInfo addr;
00082         ACE_DEBUG ((LM_DEBUG, "ConnHandler thread start...\n"));
00083         Msg *msg = 0;
00084         for (;;)
00085           switch (recv_message (msg)) {
00086           case -1: 
00087           case 0: 
00088             ACE_DEBUG ((LM_DEBUG, "(%t) error with reading socket or remote close connection...\n"));
00089             ACE_DEBUG ((LM_DEBUG, "(%t) one reader thread exits...\n"));
00090             //cleanup old data
00091             if(!cm_->exit_start_) {
00092               addr = rmt_addr_;
00093               delete intf_;
00094               cm_->del_conn(this);
00095             } else {
00096               ACE_DEBUG ((LM_DEBUG, "before closeup...\n"));
00097             }
00098             return 0;   // Client closed connection.
00099             break;
00100           default: 
00101             //ACE_DEBUG ((LM_DEBUG, "recv a msg...\n"));
00102             if (handle_message(msg) == FAILURE)
00103               return -1;
00104           }
00105         /* NOTREACHED */
00106         return 0;
00107       }
00108 
00109     Status handle_message(Msg *msg)
00110       {
00111         return intf_->handle_message(msg);
00112       }
00113 
00115     int recv_message(Msg *&msg)
00116       {
00117         //ACE_DEBUG ((LM_DEBUG,
00118         //            "(%t) start recving a msg...\n"));
00119 
00120         ACE_Message_Block *hdr =
00121           new ACE_Message_Block (ACE_DEFAULT_CDR_BUFSIZE);
00122         // Align the Message Block for a CDR stream
00123         ACE_CDR::mb_align(hdr);
00124 
00125         if (peer_stream().recv_n (hdr->wr_ptr (), 8) == 8) {
00126           //ACE_DEBUG ((LM_DEBUG,
00127           //          "(%t) recving a msg...1.5\n"));
00128           hdr->wr_ptr (8);               // Reflect addition of 8 bytes
00129 
00130           // Create a CDR stream to parse the 8-byte header.
00131           ACE_InputCDR hdr_cdr (hdr);
00132 
00133           // Extract the byte-order and use helper methods to
00134           // disambiguate octet, booleans, and chars.
00135           ACE_CDR::Boolean byte_order;
00136           hdr_cdr >> ACE_InputCDR::to_boolean (byte_order);
00137 
00138           // Set the byte-order on the stream...
00139           hdr_cdr.reset_byte_order (byte_order);
00140 
00141           // Extract the length
00142           ACE_CDR::Long msg_len;
00143           hdr_cdr >> msg_len;
00144 
00145           hdr->release ();
00146           hdr = 0;
00147 
00148           //the mem space for msg is allocated here inside mblk,
00149           //it will be deleted by default when release(). If it
00150           //need to be forwarded to other threads, need mark DONT_DELETE
00151           ACE_Message_Block *mb = new ACE_Message_Block (msg_len);
00152           // Align the Message Block for a CDR stream
00153           ACE_CDR::mb_align(mb);
00154   
00155           if (peer_stream().recv_n (mb->wr_ptr (), msg_len) == msg_len) {
00156             mb->wr_ptr (msg_len);
00157 
00158             ACE_InputCDR cdr (mb);
00159             cdr.reset_byte_order (byte_order);
00160             
00161             IdType type;
00162             IdTrait::demarshal(cdr, type);
00163             Marshaler *mar = cm_->get_marshaler(type);
00164             msg = new Msg();
00165             msg->type = type;
00166             msg->demarshal(cdr, mar);
00167 
00168             mb->release();
00169 
00170             ACE_DEBUG((LM_DEBUG, "read something from sock...\n"));
00171           
00172             return msg_len;
00173           } else {
00174             mb->release();
00175             mb = 0;
00176           }
00177         }
00178 
00179         hdr->release ();
00180         hdr = 0;
00181   
00182         ACE_DEBUG ((LM_DEBUG,
00183                       "(%t) something wrong when reading socket...\n"));
00184 
00185         return -1;
00186       }
00187 
00189     int send_msg (Msg *msg)
00190       {
00191         Marshaler *mar = cm_->get_marshaler(msg->type);
00192 
00193         ACE_DEBUG((LM_DEBUG, "ConnHandler::send_msg [%s]\n",ID2STR(msg->type).c_str()));
00194 
00195         const size_t max_payload_size = 
00196           4 //type_len
00197           //+ IdTrait<typename Channel::IdType>::size(msg->type)
00198           + IdTrait::size(msg->type)
00199           + 4 //data_len
00200           + msg->size_
00201           + ACE_CDR::MAX_ALIGNMENT; // padding;
00202         
00203         ACE_OutputCDR payload (max_payload_size);
00204         ACE_DEBUG((LM_DEBUG, "before msg marshal\n"));
00205         msg->marshal(payload, mar);
00206         ACE_DEBUG((LM_DEBUG, "after msg marshal\n"));
00207         ACE_CDR::ULong length = payload.total_length ();
00208 
00209         iovec iov[2];
00210         int iov_size = 0;
00211 
00212         ACE_OutputCDR header(ACE_CDR::MAX_ALIGNMENT+8);
00213         header << ACE_OutputCDR::from_boolean(ACE_CDR_BYTE_ORDER);
00214         header << ACE_CDR::ULong(length);
00215 
00216         iov[0].iov_base = header.begin()->rd_ptr();
00217         iov[0].iov_len = 8;
00218         iov[1].iov_base = payload.begin ()->rd_ptr ();
00219         iov[1].iov_len  = length;
00220         iov_size = 2;
00221     
00222         //acquire lock first
00223         //ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, conn->lock_, -1);
00224         ACE_GUARD_REACTION(ACE_Thread_Mutex, guard, intf_->lock_, ACE_DEBUG ((LM_DEBUG, "failed to lock out-sock\n")); delete msg; return -1);
00225   
00226         // Send header and payload efficiently using "gather-write".
00227         peer_stream().sendv_n (iov, iov_size);
00228         //note: the msg_block don't own this out_msg (since it is not created
00229         //by this mblk; we have to delete it explicitly 
00230         delete msg;
00231 
00232         ACE_DEBUG((LM_DEBUG, "ConnHandler::send_msg OUT 1 msg\n"));
00233 
00234         return 0;
00235       }
00236   };
00237 
00238 };
00239 
00240 #endif

Generated on Mon Feb 27 19:59:21 2006 for channel by  doxygen 1.4.6-NO