Disruptor Source Code Analysis (4): Disruptor's Consumers
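ConsumerRepository is the entry point for consumers.
A ConsumerInfo holds all the information about a single consumer.
The consumerInfos field of ConsumerRepository holds the ConsumerInfo entries for every registered consumer.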
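To make the relationship between the repository and its per-consumer records concrete, here is a minimal sketch. All of the names (SketchHandler, SketchConsumerInfo, SketchConsumerRepository, barrierLabel) are hypothetical stand-ins, not the real Disruptor classes, and the barrier is reduced to a plain label.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the user-written callback (not the real EventHandler).
interface SketchHandler<T> {
    void onEvent(T event, long sequence, boolean endOfBatch);
}

// Everything the registry knows about one consumer.
final class SketchConsumerInfo<T> {
    final SketchHandler<T> handler;
    final String barrierLabel; // placeholder for the consumer's SequenceBarrier

    SketchConsumerInfo(SketchHandler<T> handler, String barrierLabel) {
        this.handler = handler;
        this.barrierLabel = barrierLabel;
    }
}

// Registry of all consumers, with a lookup from a handler to its record.
final class SketchConsumerRepository<T> {
    private final List<SketchConsumerInfo<T>> consumerInfos = new ArrayList<>();
    private final Map<SketchHandler<T>, SketchConsumerInfo<T>> infoByHandler =
            new IdentityHashMap<>();

    void add(SketchHandler<T> handler, String barrierLabel) {
        SketchConsumerInfo<T> info = new SketchConsumerInfo<>(handler, barrierLabel);
        consumerInfos.add(info);
        infoByHandler.put(handler, info);
    }

    SketchConsumerInfo<T> infoFor(SketchHandler<T> handler) {
        return infoByHandler.get(handler);
    }

    int size() {
        return consumerInfos.size();
    }
}
```

The identity-based map is a design choice of this sketch: two handlers that happen to be equal are still treated as separate consumers.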
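ConsumerInfo has a SequenceBarrier field, which the consumer uses to learn the producer's position.
The EventProcessor in ConsumerInfo is the real consumer.
You might object that the consumer you wrote yourself is an EventHandler. In fact, the EventProcessor wraps it:
one of the EventProcessor's fields is the eventHandler.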
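The wrapping idea can be sketched in a few lines. This is an illustration under simplifying assumptions, not the library's code: DemoHandler and DemoProcessor are hypothetical names, and a plain List stands in for the ring buffer. The processor owns the loop and the consumed-sequence counter, while the handler only sees one event at a time.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the user-written callback.
interface DemoHandler<T> {
    void onEvent(T event, long sequence, boolean endOfBatch);
}

// Hypothetical processor: a Runnable that wraps a handler and tracks progress.
final class DemoProcessor<T> implements Runnable {
    private final List<T> events;          // stands in for the ring buffer
    private final DemoHandler<T> handler;  // the wrapped user callback
    private final AtomicLong sequence = new AtomicLong(-1L); // last consumed index

    DemoProcessor(List<T> events, DemoHandler<T> handler) {
        this.events = events;
        this.handler = handler;
    }

    @Override
    public void run() {
        long last = events.size() - 1L;
        // Deliver every event not yet consumed, flagging the final one of the batch.
        for (long next = sequence.get() + 1L; next <= last; next++) {
            handler.onEvent(events.get((int) next), next, next == last);
        }
        sequence.set(last);
    }

    long getSequence() {
        return sequence.get();
    }
}
```

Because the processor is a Runnable, it can be handed to a thread or executor, which is why the handler itself never needs to know about threading.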
If you pass only EventHandlers to Disruptor:
In the Disruptor class:
public EventHandlerGroup<T> handleEventsWith(final EventHandler<T>... handlers) {
    return createEventProcessors(new Sequence[0], handlers);
}
Also in the Disruptor class (excerpt):
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences, final EventHandler<T>[] eventHandlers) {
    // ...
    final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);
    // ...
The BatchEventProcessor built there keeps these fields:
private final DataProvider<T> dataProvider;
private final SequenceBarrier sequenceBarrier;
private final EventHandler<T> eventHandler;
@Override
public void run() {
    if (!running.compareAndSet(false, true)) {
        throw new IllegalStateException("Thread is already running");
    }
    sequenceBarrier.clearAlert();
    notifyStart();
    T event = null;
    long nextSequence = sequence.get() + 1L;
    try {
        while (true) {
            try {
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                if (nextSequence > availableSequence) {
                    Thread.yield();
                }
                while (nextSequence <= availableSequence) {
                    event = dataProvider.get(nextSequence);
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }
                sequence.set(availableSequence);
            } catch (final TimeoutException e) {
                notifyTimeout(sequence.get());
            } catch (final AlertException ex) {
                if (!running.get()) {
                    break;
                }
            } catch (final Throwable ex) {
                exceptionHandler.handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    } finally {
        notifyShutdown();
        running.set(false);
    }
}
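The core of run() is the inner batch loop plus the rule that a failing event is reported and then skipped. The sketch below isolates just that behaviour in a single thread. It is a simplification under stated assumptions: BatchHandler and BatchLoopSketch are hypothetical names, a plain array stands in for the ring buffer, the available sequence is passed in instead of coming from a barrier, and errors are collected into a list rather than sent to an exception handler.

```java
import java.util.List;

// Hypothetical stand-in for the user-written callback; it may throw.
interface BatchHandler<T> {
    void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}

final class BatchLoopSketch {
    // Consumes sequences (consumed, available] and returns the new consumed sequence.
    static <T> long drain(T[] ring, long consumed, long available,
                          BatchHandler<T> handler, List<String> errors) {
        long next = consumed + 1L;
        while (next <= available) {
            try {
                while (next <= available) {
                    // Map the ever-growing sequence onto a slot of the fixed-size array.
                    T event = ring[(int) (next % ring.length)];
                    handler.onEvent(event, next, next == available);
                    next++;
                }
            } catch (Exception ex) {
                // Report the failure, then move past the bad event.
                errors.add("seq " + next + ": " + ex.getMessage());
                next++;
            }
        }
        return next - 1L;
    }
}
```

As in the excerpt above, an exception does not stop the consumer: the failing sequence counts as consumed and the loop continues with the next one.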