首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

Disruptor3.2官方例证测试

2013-12-11 
Disruptor3.2官方例子测试事件对象:/** * POJO * @author lenovoe * */public class ValueEvent {private

Disruptor3.2官方例子测试
事件对象:
/**
* POJO
* @author lenovoe
*
*/
public class ValueEvent {
private long value;

    public long getValue()
    {
        return value;
    }
    public void setValue(final long value)
    {
        this.value = value;
    }
    public final static EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>()
    {
        public ValueEvent newInstance()
        {
            return new ValueEvent();
        }
    };
}

事件生产者:
/**
* 生产者
* @author lenovoe
*
*/
public class Producer implements Runnable{
private RingBuffer<ValueEvent> ringBuffer = null;
public Producer(RingBuffer<ValueEvent> rb) {
ringBuffer = rb;
}
public void run() {
// Publishers claim events in sequence
long sequence = ringBuffer.next();
ValueEvent event = ringBuffer.get(sequence);
event.setValue(1234); // this could be more complex with multiple fields
// make the event available to EventProcessors
ringBuffer.publish(sequence); 
}
}

事件消费者:
/**
* 处理RingBuffer中的事件对象
* @author lenovoe
*
*/
public class ConsumeEventHandler implements EventHandler<ValueEvent>,LifecycleAware{
public void onEvent(ValueEvent event, long sequence, boolean endOfBatch) throws Exception {
// TODO Auto-generated method stub
System.out.println("处理事件对象:"+event.getValue());
//Thread.sleep(2000);
}

public void onStart() {
// TODO Auto-generated method stub
System.out.println("开始处理事件");
}

public void onShutdown() {
// TODO Auto-generated method stub
System.out.println("结束处理事件");
}
}

测试类:
/**
* 测试类
* @author lenovoe
*
*/
public class TestShow {
//number of elements to create within the ring buffer
private static final int BUFFER_SIZE = 16;
//JDK 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
private final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();
//单生产者,策略使用YieldingWaitStrategy
private final RingBuffer<ValueEvent> ringBuffer = RingBuffer.create(ProducerType.SINGLE, ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy());
//游标
private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
//消费者
private final ConsumeEventHandler handler = new ConsumeEventHandler();
private final BatchEventProcessor<ValueEvent> batchEventProcessor = new BatchEventProcessor<ValueEvent>(
ringBuffer, sequenceBarrier, handler);

public TestShow() {
ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
//2X版本:ringBuffer.setGatingSequences(batchEventProcessor.getSequence());
}

public void consume() {
EXECUTOR.submit(batchEventProcessor);
}
public void produce() {
new Thread(new Producer(ringBuffer)).start();
}
public void shutdown() {
EXECUTOR.shutdown();
}

public static void main(String[] args) throws InterruptedException {
TestShow test = new TestShow();
test.produce();
test.produce();
test.produce();
test.consume();
test.shutdown();
Thread.sleep(5000);
System.exit(0);
}

}

热点排行