首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

使用ACE_Proactor模式,在使用ACE_Asynch_Write_Stream.write发送数据的有关问题

2012-07-28 
使用ACE_Proactor模式,在使用ACE_Asynch_Write_Stream.write发送数据的问题1. 使用ACE_Proactor模式,客户

使用ACE_Proactor模式,在使用ACE_Asynch_Write_Stream.write发送数据的问题
1. 使用ACE_Proactor模式,客户端实现代码如下:

void TClientHandler::open(ACE_HANDLE handle, ACE_Message_Block &msgBlock){      this->handle(handle);    int size = 0;      int buf_len = sizeof(int);      ACE_OS::getsockopt(handle, SOL_SOCKET, SO_RCVBUF, (char*)(&size), &buf_len);    iLog.info("Receive size is %d", size);    size = size * 10;      ACE_OS::setsockopt(handle, SOL_SOCKET, SO_RCVBUF, (const char*)(&size), sizeof(int));    size = 0;    ACE_OS::getsockopt(handle, SOL_SOCKET, SO_SNDBUF, (char*)(&size), &buf_len);    iLog.info("Send size is %d", size);    size = size * 10;    ACE_OS::setsockopt(handle, SOL_SOCKET, SO_SNDBUF, (const char*)(&size), sizeof(int));    if (this->m_reader.open (*this) != 0)    {        delete this;        return;    }      if (this->m_writer.open (*this) != 0)    {        delete this;        return;    }    ACE_Message_Block *mb = new ACE_Message_Block(m_msgBuffer, MSG_BUFFER_LEN);    if (this->m_reader.read (*mb, mb->space ()) != 0)    {        delete this;        return;    }    //  向tmc发送注册命令    TMessage *tmsg = new (sizeof(TMessage))TMessage;    tmsg->msgHeader.version = TVERSION_BASE;    tmsg->msgHeader.cmdCode = CMD_LOGIN;    tmsg->msgHeader.sendProcType = getLocalProcType();    tmsg->msgHeader.sendProcHandle =  getLocalProcHandle();    this->write_message(tmsg, sizeof(TMessage));    return;}//异步读完成后会调用此函数  void TClientHandler::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result){      ACE_Message_Block &mbb = result.message_block();      if (!result.success () || result.bytes_transferred () == 0)    {        // 知道remoteaddress是哪个?        mbb.release();        delete this;        return;    }        ACE_Message_Block *mb = NULL;    do     {        // 用这种方法尽量保证可以clone到数据        mb = mbb.clone();    } while (NULL == mb);    mbb.release();    // 根据消息类型,将消息放入到不同的队列里面    TMessage *tmsg = NULL;    tmsg = (TMessage *)mb->base();    // 每个接收的消息都记录一下日志    //writeLog(tmsg);    if (MT_RESPONSE == tmsg->msgHeader.msgType)    {        // 如果是响应消息就放入到响应队列里        TResponseQueue::instance()->enqueue(mb);        //iLog.info("Put message to response queue, retCode=%d", retCode);    }    else    {        // 走到这里都是请求消息或者通知消息,还有异步请求与异步响应        if (CMD_LOGIN == tmsg->msgHeader.cmdCode)        {            // 如果是命令码为CMD_LOGIN的通知消息,先处理一下,再放入消息队列            iLog.info("Login notice, procIds=%s length=%d", tmsg->msgContent, tmsg->msgHeader.length);            TClientHandlerMgr::instance()->addAll(this, tmsg->msgContent);            // 这个消息不需要放入队列,在这里release掉            mb->release();            // 发异步消息取相应TMC上所有连接的进程            TMessage *mymsg = new (sizeof(TMessage))TMessage;            mymsg->msgHeader.version = TVERSION_BASE;            mymsg->msgHeader.msgType = MT_ASYN_REQUEST;  // 发送异步请求消息            mymsg->msgHeader.cmdCode = CMD_GETPROCINTMC;            mymsg->msgHeader.sendProcType = getLocalProcType();            mymsg->msgHeader.sendProcHandle =  getLocalProcHandle();            this->write_message(mymsg, sizeof(TMessage));        }        else if (CMD_GETPROCINTMC == tmsg->msgHeader.cmdCode)        {            iLog.info("Process in tmc, procIds=%s length=%d", tmsg->msgContent, tmsg->msgHeader.length);            TClientHandlerMgr::instance()->addAll(this, tmsg->msgContent);            // 这个消息不需要放入队列,在这里release掉            mb->release();        }        else        {            // 如果是同步请求消息,异步请求,通知消息和,异步请求,异步响应,就放入TikyTaskMgr队列            TikyTaskMgr *taskMgr = TikyAppMgr::instance()->getTaskMgr(DEFAULT_TASKMGRID);            if (NULL != taskMgr)            {                taskMgr->putq(mb);                //iLog.info("Put messaget to taskmgr, retCode=%d", retCode);            }            else            {                iLog.error("TaskMgr(%d) is not found...", DEFAULT_TASKMGRID);                                // 回送一个响应消息,没有找到TaskMgr                TMessage *rspMsg = new (sizeof(TMessage))TMessage;                rspMsg->msgHeader.msgType = MT_RESPONSE;                rspMsg->msgHeader.sendProcType = tmsg->msgHeader.recvProcType;                rspMsg->msgHeader.sendProcHandle = tmsg->msgHeader.recvProcHandle;                rspMsg->msgHeader.recvProcType = tmsg->msgHeader.sendProcType;                rspMsg->msgHeader.recvProcHandle = tmsg->msgHeader.sendProcHandle;                rspMsg->msgHeader.sn = tmsg->msgHeader.sn;                // 没有TaskMgr,就回送一个错误信息                rspMsg->msgHeader.retCode = ERR_TCOM_NOTFOUNDTASKMGR;                this->write_message(rspMsg, sizeof(TMessage));                mb->release();            }        }    }    ACE_Message_Block *nmb = new ACE_Message_Block(m_msgBuffer, MSG_BUFFER_LEN);    if (this->m_reader.read(*nmb, nmb->space()) != 0)    {        nmb->release();        delete this;        return;    }}//异步写完成后会调用此函数void TClientHandler::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result){    iLog.info("Handle result is %d", result.success());    ACE_Message_Block &mb = result.message_block();    iLog.info("Write result=%ld, trans=%d, write=%d, len=%d", result.error(), result.bytes_transferred(),        result.bytes_to_write(), mb.length());    TMessage *tmsg = (TMessage *)mb.base();    // 记录一下发送消息日志    writeLog(tmsg);    delete tmsg;    mb.release();    return;}int TClientHandler::write_message(TMessage *tmsg, unsigned short msgLength){    if (NULL == tmsg)    {        iLog.error("[TClientHandler::write_message] tmsg is NULL.");        return -1;    }    ACE_Message_Block *mbb = new ACE_Message_Block((const char *)tmsg, msgLength);    // 要设置一下wr_ptr的指针,要不mbb->length()返回的会是0    mbb->wr_ptr(msgLength);    int retCode = this->m_writer.write(*mbb, mbb->length());    if (retCode != 0)    {        iLog.error("Send msg failure, lasterror=%d, length=%d, retCode=%d", ACE_OS::last_error(), mbb->length(), retCode);        delete tmsg;        mbb->release();        delete this;    }    return retCode;}


写一个循环不间断调用write_message发送数据,会看到发送的数据,会用一个TCP报文发出去,不知道有没有人遇到相同的问题?

热点排行