首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > C语言 >

终极挑战:一读一写情况下,无锁队列的实现,该怎么解决

2012-03-22 
终极挑战:一读一写情况下,无锁队列的实现如题,需要考虑多核。[解决办法]问题大了去了。dequeue函数中:tmp

终极挑战:一读一写情况下,无锁队列的实现
如题,需要考虑多核。

[解决办法]
问题大了去了。

dequeue函数中:
tmp = queue->head ;
if(queue->head->next == NULL)
{
//在这里发生线程切换,此时假设队列中有一个节点。那么head和tail均不为空,且指向同一个节点,
//并且head->next为空。这时候另一个线程调用enqueue函数,插入了一个节点,所以此时队列中有2个节点。
//接着线程切换回来,因为已经判断过是否head->next为NULL,所以下面的动作导致丢了节点。
queue->head = queue->tail = NULL ;
return tmp ;
}

[解决办法]
[code=C/C++][/code]
typedef struct _qnode_st qnode_t ;
struct _qnode_st{
qnode_t *next ;
} ;

typedef struct _queue_st queue_t ;
struct _queue_st{
qnode_t *head ;
qnode_t *tail ;
} ;

int enqueue(volatile queue_t *queue , qnode_t *node)
{
qnode_t *tmp ;

node->next = NULL ;
//1、保存尾部指针
tmp = queue->tail ;

//2、对尾部赋值
queue->tail = node ;

//3、如果头部为空,既队列为空,那么对头部赋值,否则挂接队列尾部
if(queue->head == NULL)
queue->head = node ;
else
tmp->next = node ;

return 0 ;
}

qnode_t *dequeue(volatile queue_t *queue)
{
qnode_t *tmp ;

//1、如果头部为空,那么队列为空
if(queue->head == NULL)
return NULL ;
//2、保存头部节点
tmp = queue->head ;

//3、如果只有一个节点,那么清除队列
if(queue->head->next == NULL)
{
queue->head = queue->tail = NULL ; ===========>>>>此条语句执行前,先执行了 enqueue , 那么就会把 enqueue执行的数据给丢弃了........ 加锁无后果之忧
return tmp ;
}

//4、否则将下个节点放置在头部
queue->head = queue->head->next ;
return tmp ;
}

[解决办法]
http://erdani.org/publications/cuj-2004-10.pdf
看了这个再来。。
or, this one.
http://www.audiomulch.com/~rossb/code/lockfree/liblfds/index.htm
[解决办法]
看看这样行不行,
1 入队线程按常规进行。
2 出队的线程去检查头结点标记:
2.1 如果已经标记过的节点,并且不是最后一个,就直接扔掉,并清除标记,goto 2
2.2 如果已经标记过,但是是最后一个节点,就返回空
2.3 如果没有标记过,并且是最后一个节点,不修改队列,在独享一块内存中标记一下, 然后返回一个这个节点的一个副本
2.4 如果没有标记过,并且不是最后一个节点,按常规进行

[解决办法]
单生产/单消费模式的共享队列是不需要加锁同步的。
下面是以前写的“线程安全的共享缓冲队列”实现部分。

C/C++ code
#include <stdio.h>#include <pthread.h>#include <unistd.h>typedef int                V_INT32;typedef unsigned int        V_UINT32;struct VQueueIdx{    VQueueIdx(): idx_new( 0 ), idx_old( 0 )    {}    void  resetInfo()    {        idx_new = 0;        idx_old = 0;    }        bool isEmpty() const    {        return ( idx_new == idx_old );    }    V_INT32 idx_new;    V_INT32 idx_old;};template <class Type, const int QLen>class VQueueTemp{public:        //从队列中取出    bool pickFromQueue( Type & save_item );    //存入队列    bool sendToQueue( const Type & new_item );    void resetQueue()        {        queue_idx.resetInfo();        }        //查询队列    const Type * lookupQueue( int idx ) const;        bool isQueueEmpty() const    {        return queue_idx.isEmpty();    }        bool isQueueFull() const    {        return ( queue_idx.idx_new + 1 ) % QLen == queue_idx.idx_old;    }    private:    void decQueue();    void incQueue();        bool readQueueItem( Type & save_item )  const;    bool writeQueueItem( const Type & new_item );    int queueItems() const    {        return  (queue_idx.idx_new - queue_idx.idx_old + QLen)%QLen;    }        Type   msg_queue[QLen];        VQueueIdx queue_idx;    };template <class Type, const int QLen> inline void VQueueTemp<Type, QLen>::decQueue(){    int idx = queue_idx.idx_old;    if ( ++idx >= QLen )    {        idx = 0;    }    queue_idx.idx_old = idx;}template <class Type, const int QLen> inline void VQueueTemp<Type, QLen>::incQueue(){    int idx = queue_idx.idx_new;    if ( ++idx >= QLen )    {        idx = 0;    }    queue_idx.idx_new = idx;}template <class Type, const int QLen> inline bool VQueueTemp<Type, QLen>::sendToQueue( const Type & new_item ){        if ( writeQueueItem(new_item) )    {            incQueue();            return true;    }    return false;}template <class Type, const int QLen> inline bool VQueueTemp<Type, QLen>::pickFromQueue( Type & save_item ){    if ( readQueueItem( save_item ) )    {        decQueue();            return true;    }        return false;    }template <class Type, const int QLen> inline bool VQueueTemp<Type, QLen>::readQueueItem( Type & save_item ) const{    if( !isQueueEmpty() )        {            save_item = msg_queue[queue_idx.idx_old];        return true;    }        return false;}template <class Type, const int QLen> inline bool VQueueTemp<Type, QLen>::writeQueueItem( const Type & new_item ){    if ( !isQueueFull() )    {            msg_queue[queue_idx.idx_new] = new_item;        return true;    }        return false;}template <class Type, const int QLen> inline const Type * VQueueTemp<Type, QLen>::lookupQueue( int idx ) const{    if ( queueItems() > idx )    {                return &msg_queue[(queue_idx.idx_old+idx)%QLen];    }    return NULL;}    }        Type   msg_queue[QLen];        VQueueIdx queue_idx;    };template <class Type, const int QLen> inline void VQueueTemp<Type, QLen>::decQueue(){    int idx = queue_idx.idx_old;    if ( ++idx >= QLen )    {        idx = 0;    }    queue_idx.idx_old = idx;}template <class Type, const int QLen> inline void VQueueTemp<Type, QLen>::incQueue(){    int idx = queue_idx.idx_new;    if ( ++idx >= QLen )    {        idx = 0;    }    queue_idx.idx_new = idx;}template <class Type, const int QLen> inline bool VQueueTemp<Type, QLen>::sendToQueue( const Type & new_item ){        if ( writeQueueItem(new_item) )    {            incQueue();            return true;    }    return false;}template <class Type, const int QLen> inline bool VQueueTemp<Type, QLen>::pickFromQueue( Type & save_item ){    if ( readQueueItem( save_item ) )    {        decQueue();            return true;    }        return false;    }template <class Type, const int QLen> inline bool VQueueTemp<Type, QLen>::readQueueItem( Type & save_item ) const{    if( !isQueueEmpty() )        {            save_item = msg_queue[queue_idx.idx_old];        return true;    }        return false;}template <class Type, const int QLen> inline bool VQueueTemp<Type, QLen>::writeQueueItem( const Type & new_item ){    if ( !isQueueFull() )    {            msg_queue[queue_idx.idx_new] = new_item;        return true;    }        return false;}template <class Type, const int QLen> inline const Type * VQueueTemp<Type, QLen>::lookupQueue( int idx ) const{    if ( queueItems() > idx )    {                return &msg_queue[(queue_idx.idx_old+idx)%QLen];    }    return NULL;} 


[解决办法]
很简单的方案
一个无符号整数in_elems保存进队列元素的总数
一个无符号整数out_elems保存出队列元素的总素

判断队列中是否有新数据
(has_elems = in_elems - out_elems) > 0

读线程每次只负责读取has_elems个元素,并将out_elems += has_elems

写队列每次在队列下增加n个元素,并将in_elems += n

原因:has_elems的操作事实上被序列化了,不存在竞争关系

缺点:不断的判断队列中是否有新数据可能会降低性能,如果读线程比写线程慢很多,in_elems可能被溢出过2次导致计算不正确,最好用long来保存这两个值
附加解决方案:最好能用个condition在进队列时激活一下,在判断队列为空时在condition上等待

热点排行