使用ACE_Proactor模式时,通过ACE_Asynch_Write_Stream::write发送数据遇到的问题
1. 使用ACE_Proactor模式,客户端实现代码如下:
void TClientHandler::open(ACE_HANDLE handle, ACE_Message_Block &msgBlock){ this->handle(handle); int size = 0; int buf_len = sizeof(int); ACE_OS::getsockopt(handle, SOL_SOCKET, SO_RCVBUF, (char*)(&size), &buf_len); iLog.info("Receive size is %d", size); size = size * 10; ACE_OS::setsockopt(handle, SOL_SOCKET, SO_RCVBUF, (const char*)(&size), sizeof(int)); size = 0; ACE_OS::getsockopt(handle, SOL_SOCKET, SO_SNDBUF, (char*)(&size), &buf_len); iLog.info("Send size is %d", size); size = size * 10; ACE_OS::setsockopt(handle, SOL_SOCKET, SO_SNDBUF, (const char*)(&size), sizeof(int)); if (this->m_reader.open (*this) != 0) { delete this; return; } if (this->m_writer.open (*this) != 0) { delete this; return; } ACE_Message_Block *mb = new ACE_Message_Block(m_msgBuffer, MSG_BUFFER_LEN); if (this->m_reader.read (*mb, mb->space ()) != 0) { delete this; return; } // 向tmc发送注册命令 TMessage *tmsg = new (sizeof(TMessage))TMessage; tmsg->msgHeader.version = TVERSION_BASE; tmsg->msgHeader.cmdCode = CMD_LOGIN; tmsg->msgHeader.sendProcType = getLocalProcType(); tmsg->msgHeader.sendProcHandle = getLocalProcHandle(); this->write_message(tmsg, sizeof(TMessage)); return;}//异步读完成后会调用此函数 void TClientHandler::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result){ ACE_Message_Block &mbb = result.message_block(); if (!result.success () || result.bytes_transferred () == 0) { // 知道remoteaddress是哪个? 
mbb.release(); delete this; return; } ACE_Message_Block *mb = NULL; do { // 用这种方法尽量保证可以clone到数据 mb = mbb.clone(); } while (NULL == mb); mbb.release(); // 根据消息类型,将消息放入到不同的队列里面 TMessage *tmsg = NULL; tmsg = (TMessage *)mb->base(); // 每个接收的消息都记录一下日志 //writeLog(tmsg); if (MT_RESPONSE == tmsg->msgHeader.msgType) { // 如果是响应消息就放入到响应队列里 TResponseQueue::instance()->enqueue(mb); //iLog.info("Put message to response queue, retCode=%d", retCode); } else { // 走到这里都是请求消息或者通知消息,还有异步请求与异步响应 if (CMD_LOGIN == tmsg->msgHeader.cmdCode) { // 如果是命令码为CMD_LOGIN的通知消息,先处理一下,再放入消息队列 iLog.info("Login notice, procIds=%s length=%d", tmsg->msgContent, tmsg->msgHeader.length); TClientHandlerMgr::instance()->addAll(this, tmsg->msgContent); // 这个消息不需要放入队列,在这里release掉 mb->release(); // 发异步消息取相应TMC上所有连接的进程 TMessage *mymsg = new (sizeof(TMessage))TMessage; mymsg->msgHeader.version = TVERSION_BASE; mymsg->msgHeader.msgType = MT_ASYN_REQUEST; // 发送异步请求消息 mymsg->msgHeader.cmdCode = CMD_GETPROCINTMC; mymsg->msgHeader.sendProcType = getLocalProcType(); mymsg->msgHeader.sendProcHandle = getLocalProcHandle(); this->write_message(mymsg, sizeof(TMessage)); } else if (CMD_GETPROCINTMC == tmsg->msgHeader.cmdCode) { iLog.info("Process in tmc, procIds=%s length=%d", tmsg->msgContent, tmsg->msgHeader.length); TClientHandlerMgr::instance()->addAll(this, tmsg->msgContent); // 这个消息不需要放入队列,在这里release掉 mb->release(); } else { // 如果是同步请求消息,异步请求,通知消息和,异步请求,异步响应,就放入TikyTaskMgr队列 TikyTaskMgr *taskMgr = TikyAppMgr::instance()->getTaskMgr(DEFAULT_TASKMGRID); if (NULL != taskMgr) { taskMgr->putq(mb); //iLog.info("Put messaget to taskmgr, retCode=%d", retCode); } else { iLog.error("TaskMgr(%d) is not found...", DEFAULT_TASKMGRID); // 回送一个响应消息,没有找到TaskMgr TMessage *rspMsg = new (sizeof(TMessage))TMessage; rspMsg->msgHeader.msgType = MT_RESPONSE; rspMsg->msgHeader.sendProcType = tmsg->msgHeader.recvProcType; rspMsg->msgHeader.sendProcHandle = tmsg->msgHeader.recvProcHandle; rspMsg->msgHeader.recvProcType = 
tmsg->msgHeader.sendProcType; rspMsg->msgHeader.recvProcHandle = tmsg->msgHeader.sendProcHandle; rspMsg->msgHeader.sn = tmsg->msgHeader.sn; // 没有TaskMgr,就回送一个错误信息 rspMsg->msgHeader.retCode = ERR_TCOM_NOTFOUNDTASKMGR; this->write_message(rspMsg, sizeof(TMessage)); mb->release(); } } } ACE_Message_Block *nmb = new ACE_Message_Block(m_msgBuffer, MSG_BUFFER_LEN); if (this->m_reader.read(*nmb, nmb->space()) != 0) { nmb->release(); delete this; return; }}//异步写完成后会调用此函数void TClientHandler::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result){ iLog.info("Handle result is %d", result.success()); ACE_Message_Block &mb = result.message_block(); iLog.info("Write result=%ld, trans=%d, write=%d, len=%d", result.error(), result.bytes_transferred(), result.bytes_to_write(), mb.length()); TMessage *tmsg = (TMessage *)mb.base(); // 记录一下发送消息日志 writeLog(tmsg); delete tmsg; mb.release(); return;}int TClientHandler::write_message(TMessage *tmsg, unsigned short msgLength){ if (NULL == tmsg) { iLog.error("[TClientHandler::write_message] tmsg is NULL."); return -1; } ACE_Message_Block *mbb = new ACE_Message_Block((const char *)tmsg, msgLength); // 要设置一下wr_ptr的指针,要不mbb->length()返回的会是0 mbb->wr_ptr(msgLength); int retCode = this->m_writer.write(*mbb, mbb->length()); if (retCode != 0) { iLog.error("Send msg failure, lasterror=%d, length=%d, retCode=%d", ACE_OS::last_error(), mbb->length(), retCode); delete tmsg; mbb->release(); delete this; } return retCode;}