首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

Disruptor 源码分析(四) Disruptor的消费者

2013-10-11 
Disruptor 源码分析(4) Disruptor的消费者ConsumerRepository是消费者的入口ConsumerInfo中保存了一个消费

Disruptor 源码分析(4) Disruptor的消费者

ConsumerRepository是消费者的入口

ConsumerInfo中保存了一个消费者的所有的信息.
ConsumerRepository的consumerInfos字段是个ConsumerInfo数组,保存了全部消费者的信息.

ConsumerInfo中有个SequenceBarrier字段,这个是用来获取生产者的位置信息的.

ConsumerInfo中的EventProcessor是真正的消费者.
有人会说我自己写的是EventHandler作为消费者啊,其实EventProcessor是做了封装
EventProcessor中有个字段就是eventHandler.

如果对Disruptor只传入EventHandler的话:

Disruptor类中:

public EventHandlerGroup<T> handleEventsWith(final EventHandler<T>... handlers)    {        return createEventProcessors(new Sequence[0], handlers);    } Disruptor类中: EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,                                               final EventHandler<T>[] eventHandlers)    {                  final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);


可以看到默认的EventProcessor是BatchEventProcessor.

BatchEventProcessor类:

  private final DataProvider<T> dataProvider;    private final SequenceBarrier sequenceBarrier;    private final EventHandler<T> eventHandler;


dataProvider其实就是ringBuffer,用来获取ringBuffer那个位置的Event
sequenceBarrier用来获取生产者的位置信息
eventHandler是具体处理Event的策略,这个是在Disruptor的handleEventsWith方法中传进来的.

消费者的实现.
BatchEventProcessor类是实现了Runnable接口的.所以每一个消费者都是可以作为一个线程来跑的.
这也是 Disruptor的构造类传入一个线程池的原因,线程池就是用来跑消费者的.


@Override    public void run()    {        if (!running.compareAndSet(false, true))        {            throw new IllegalStateException("Thread is already running");        }        sequenceBarrier.clearAlert();        notifyStart();        T event = null;        long nextSequence = sequence.get() + 1L;        try        {            while (true)            {                try                {                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);                    if (nextSequence > availableSequence)                    {                        Thread.yield();                    }                    while (nextSequence <= availableSequence)                    {                        event = dataProvider.get(nextSequence);                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);                        nextSequence++;                    }                    sequence.set(availableSequence);                }                catch (final TimeoutException e)                {                    notifyTimeout(sequence.get());                }                catch (final AlertException ex)                {                    if (!running.get())                    {                        break;                    }                }                catch (final Throwable ex)                {                    exceptionHandler.handleEventException(ex, nextSequence, event);                    sequence.set(nextSequence);                    nextSequence++;                }            }        }        finally        {            notifyShutdown();            running.set(false);        }    }



run方法看起来很长,其实主要就是这个:
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
这个是获取生产者的位置的

if (nextSequence > availableSequence) 表示消费者超过生产者,这个是不允许的.
这里的操作是Thread.yield();不同的EventProcessor这里采取的策略也会不一样.
while (true) 可以看到在不成功的状态下会不断去检查生产者的信息.

waitFor的实现是waitStrategy.waitFor来实现的,waitStrategy有很多中不同的策略,默认是BlockingWaitStrategy.
BlockingWaitStrategy的实现中用到了lock.lock();意思就是同时只允许一个个消费者排队去抢,下一个消费者要等待上一个消费者处理完一个之后才能抢.

在SequenceBarrie的waitFor的最后一步,会通过getHighestPublishedSequence来检查生产者的位置信息的标志是否正常.这个是和生产者的publish方法联系起来的.

                 


热点排行