关于采用消息队列处理网络信息的问题,高分求解`````
为了提高服务端处理效率,我将收到的消息暂时保存由一个后台线程处理,现在碰到问题是 我使用了一个while循环读取共享的map类型数据,但程序启动后cpu 一直100%,如何实现一边接收 一边处理的这种方式呢?
[解决办法]
有数据时才读,其它时间等信号。
[解决办法]
to hehaheha(不笑)
知道重叠i/o和完成端口吧?投递i/o操作都需要一个结构OVERLAPPED,OVERLAPPED里边都有一个成员叫 HANDLE hEvent ,这个成员就是一个event,每次在操作系统完成i/o读写后,由操作系统setevent,用户线程通过GetOverlappedResult或者GetQueuedCompletionStatus(内部可能调用了)
等待这个事件
所以说每次setevent并不慢
[解决办法]
在循环时sleep吧
我以前好像也遇过这样的问题,需要sleep的
否则while始终占着cpu
[解决办法]
“程序启动后cpu 一直100%,如何实现一边接收 一边处理”
从楼主的描述,个人推断问题关键不在怎么等待网络事件通知,因为仅仅是网络处理不会繁忙到CPU一直100%的,我猜测堵塞是因为楼主的网络处理线程直接进行了比较耗时的业务处理操作吧?
网络处理线程的责任应该仅限向一个缓冲区的末尾增加新接收的数据,以及每次业务处理线程请求数据时从缓冲区头部开始分析包是否已接收完全,将完整包传递给业务处理线程。就这两个功能再别多做啦,否则使用线程就没意义。线程就是为了将不同类工作分开,使其有机会并行,比如网络收发、数据计算、IO操作、界面显示这些工作都是可以并行的,如果某线程调用的函数中连续执行这些不同类的操作,那么这个线程肯定磕磕绊绊跑不动,不是sleep不sleep的问题
[解决办法]
俺给你个俺写的消息队列参考下吧,其中Mutex和Event的具体实现就不提供了,自己封装个吧
//.h
class FoundationAPI TSDataPriorityQueue
{
private:
struct WaitInfo
{
void*pt;
Eventm_event;
WaitInfo() : m_event(false)
{
}
};
class PriorityInfo
{
public:
size_tm_priority;
void*m_ptr;
public:
PriorityInfo();
PriorityInfo(const PriorityInfo &other);
PriorityInfo& operator=(const PriorityInfo &other);
bool operator <(const PriorityInfo &other)const;
bool operator == (const PriorityInfo &other)const;
bool operator > (const PriorityInfo &other)const;
};
typedef std::priority_queue <PriorityInfo> DataQueue;
typedef std::deque <WaitInfo*> WaitQueue;
typedef FastMutex::ScopeLockFMLock;
private:
DataQueuem_queue;
WaitQueuem_wait_queue;
mutable ThreadSpace::FastMutexm_mutex;
private:
void* dequeue();
public:
void Enqueue(const void *ptr, size_t level);
void* WaitDequeue(t_ulong milliseconds);
void* WaitDequeue();//注意,也可能传送出0,因为WakeUpAll();
void* PeekQueue()const;
void WakeUpAll();
bool HasIdleThread()const;
bool IsEmpty()const;
size_t Size()const;
void Clear();
TSDataPriorityQueue();
~TSDataPriorityQueue();
};
//.cpp
TSDataPriorityQueue::PriorityInfo::PriorityInfo() : m_priority(LOW), m_ptr(0)
{
}
TSDataPriorityQueue::PriorityInfo::PriorityInfo(const PriorityInfo &other)
{
m_priority = other.m_priority;
m_ptr = other.m_ptr;
}
TSDataPriorityQueue::PriorityInfo& TSDataPriorityQueue::PriorityInfo::operator=(const PriorityInfo &other)
{
if(this != &other)
{
m_priority = other.m_priority;
m_ptr = other.m_ptr;
}
return *this;
}
bool TSDataPriorityQueue::PriorityInfo::operator <(const PriorityInfo &other)const
{
return (m_priority < other.m_priority);
}
bool TSDataPriorityQueue::PriorityInfo::operator == (const PriorityInfo &other)const
{
return (m_priority == other.m_priority);
}
bool TSDataPriorityQueue::PriorityInfo::operator > (const PriorityInfo &other)const
{
return (m_priority > other.m_priority);
}
void TSDataPriorityQueue::WakeUpAll()
{
FMLock lock(m_mutex);
for(WaitQueue::iterator it = m_wait_queue.begin(); it != m_wait_queue.end(); ++it)
{
(*it)-> pt = 0;
(*it)-> m_event.Set();
}
m_wait_queue.clear();
}
void* TSDataPriorityQueue::dequeue()
{
FMLock lock(m_mutex);
void *ptr = 0;
if(!m_queue.empty())
{
PriorityInfo info = m_queue.top();
ptr = info.m_ptr;
m_queue.pop();
}
return ptr;
}
void TSDataPriorityQueue::Enqueue(const void *ptr, size_t level)
{
FMLock lock(m_mutex);
if(m_wait_queue.empty())
{
PriorityInfo info;
info.m_priority = level;
info.m_ptr = const_cast <void*> (ptr);
m_queue.push(info);
}else
{
WaitInfo *pwi = m_wait_queue.front();
m_wait_queue.pop_front();
pwi-> pt = const_cast <void*> (ptr);
pwi-> m_event.Set();
}
}
void* TSDataPriorityQueue::PeekQueue()const
{
FMLock lock(m_mutex);
void *ptr = 0;
if(!m_queue.empty())
{
PriorityInfo info = m_queue.top();
ptr = info.m_ptr;
}
return ptr;
}
void* TSDataPriorityQueue::WaitDequeue(t_ulong milliseconds)
{
void* ptr = 0;
WaitInfo *pwi = 0;
{
FMLock lock(m_mutex);
ptr = dequeue();
if(ptr != NULL)
{
return ptr;
}
pwi = new WaitInfo;
pwi-> pt = 0;
m_wait_queue.push_back(pwi);
}
if(pwi-> m_event.Wait(milliseconds))
{
ptr = pwi-> pt;
}
{
FMLock lock(m_mutex);
for(WaitQueue::iterator it = m_wait_queue.begin(); it != m_wait_queue.end(); ++it)
{
if(pwi == (*it))
m_wait_queue.erase(it);
break;
}
}
delete pwi;
return ptr;
}
void* TSDataPriorityQueue::WaitDequeue()
{
return WaitDequeue(INFINITE);
}
bool TSDataPriorityQueue::HasIdleThread()const
{
FMLock lock(m_mutex);
return (m_wait_queue.size() != 0);
}
bool TSDataPriorityQueue::IsEmpty()const
{
FMLock lock(m_mutex);
return m_queue.empty();
}
size_t TSDataPriorityQueue::Size()const
{
FMLock lock(m_mutex);
return m_queue.size();
}
void TSDataPriorityQueue::Clear()
{
FMLock lock(m_mutex);
while(!m_queue.empty())
{
m_queue.pop();
}
}
TSDataPriorityQueue::TSDataPriorityQueue() { }
TSDataPriorityQueue::~TSDataPriorityQueue() { }