前段时间闲得蛋疼就尝试翻译了一下有关Disruptor的一些文章,第一次做这事,烂得自己也不忍回头去看了。。。
今天写日志看到那几篇文章,于是想写一个简单的例子,好让一些对Disruptor有兴趣但不是很明白的人更快的了解Disruptor的基本用法。
Disruptor使用起来非常简单,初始化->启动消费者线程,然后每当生产者产生资源就往disruptor里放。
下面是一个接收短信状态报告的例子。
首先自定义一个Event类:
1 public class DeliveryReportEvent { 2 private DeliveryReport deliveryReport; 3 4 public DeliveryReport getDeliveryReport() { 5 return deliveryReport; 6 } 7 8 public void setDeliveryReport(DeliveryReport deliveryReport) { 9 this.deliveryReport = deliveryReport;10 }11 12 13 public final static EventFactoryEVENT_FACTORY = new EventFactory () {14 public DeliveryReportEvent newInstance() {15 return new DeliveryReportEvent();16 }17 };18 }
代码很简单,就是用来存放短信状态报告DeliveryReport类的,外加一个EventFactory,用来生成Event,也很简单。
然后写一个EventHandler类,用来处理Event,
1 public class DeliveryReportEventHandler implements EventHandler{2 // private static DeliveryReportRepository repository = new DeliveryReportRepository();3 public void onEvent(DeliveryReportEvent event, long sequence,4 boolean endOfBatch) throws Exception {5 System.out.println(event.getDeliveryReport().getMessageId());6 // repository.updateDeliveryReport(event.getDeliveryReport());7 }8 }
三个参数,event就是生产者向Disruptor发布资源生产的事件,sequence是这个事件在ringbuffer中的序列号,endOfBatch指明该事件是不是ringbuffer中的最后一个事件。
在onEvent方法里处理消费者要做的事。本例直接打印状态报告ID或是更新数据库中的状态报告。
在这里我写了一个辅助类:
1 public class DisruptorHelper { 2 /** 3 * ringbuffer容量,最好是2的N次方 4 */ 5 private static final int BUFFER_SIZE = 1024 * 8; 6 private RingBufferringBuffer; 7 private SequenceBarrier sequenceBarrier; 8 private DeliveryReportEventHandler handler; 9 private BatchEventProcessor batchEventProcessor;10 private static DisruptorHelper instance;11 private static boolean inited = false;12 private DisruptorHelper(){13 ringBuffer = new RingBuffer (14 DeliveryReportEvent.EVENT_FACTORY, new SingleThreadedClaimStrategy(15 BUFFER_SIZE), new YieldingWaitStrategy());16 sequenceBarrier = ringBuffer.newBarrier();17 handler = new DeliveryReportEventHandler();18 batchEventProcessor = new BatchEventProcessor (19 ringBuffer, sequenceBarrier, handler);20 ringBuffer.setGatingSequences(batchEventProcessor21 .getSequence());22 }23 24 public static void initAndStart(){25 instance = new DisruptorHelper();26 new Thread(instance.batchEventProcessor).start();27 inited = true;28 }29 30 public static void shutdown(){31 if(!inited){32 throw new RuntimeException("Disruptor还没有初始化!");33 }34 instance.shutdown0();35 }36 37 private void shutdown0(){38 batchEventProcessor.halt();39 }40 private void produce0(DeliveryReport deliveryReport) {41 //获取下一个序列号42 long sequence = ringBuffer.next();43 //将状态报告存入ringBuffer的该序列号中44 ringBuffer.get(sequence).setDeliveryReport(deliveryReport);45 //通知消费者该资源可以消费46 ringBuffer.publish(sequence);47 }48 49 /**50 * 将状态报告放入资源队列,等待处理51 * @param deliveryReport52 */53 public static void produce(DeliveryReport deliveryReport) {54 if(!inited){55 throw new RuntimeException("Disruptor还没有初始化!");56 }57 instance.produce0(deliveryReport);58 }59 }
哈哈,因为本例的initAndStart/shutdown方法是在一个loadonstartup的servlet中的init/destory方法里调用,所以就不加synchronized了,执行前也不判断inited是否为true了。
最后是生产者调用的方法,在接收短信状态报告的webservice里:
1 public void NotifySmsDeliveryReport(DeliveryReport deliveryReport) {2 ...3 ...4 //向disruptor发布资源5 DisruptorHelper.produce(deliveryReport);6 }
关于disruptor的性能,可以去官网查看测试数据。也可以从检出代码,自己运行一下src/perf里的性能测试方法,
本例用到的包只有一个disruptor-2.7.1.jar,
<dependency>
<groupId>com.googlecode.disruptor</groupId> <artifactId>disruptor</artifactId><version>2.7.1</version>
</dependency>