Disruptor 高性能并发框架
by 陈健森 at 2016.6
简述
Disruptor 是一个异步并发处理框架。是由 LMAX 公司开发的一款高效的无锁内存队列。它使用无锁的方式实现了一个环形队列,非常适合于实现生产者和消费者模式,比如事件和消息的发布。
Disruptor 最大特点是高性能,其 LMAX 架构可以获得每秒 6 百万订单,用 1 微秒的延迟获得吞吐量为 100K+。
RingBuffer
RingBuffer 是其核心,生产者向 RingBuffer 中写入元素,消费者从 RingBuffer 中消费元素。

随着你不停地填充这个 buffer(可能也会有相应的读取),这个序号会一直增长,直到绕过这个环。
槽的个数是 2 的 N 次方更有利于基于二进制的计算机进行计算。(注:2 的 N 次方换成二进制就是1000,100,10,1这样的数字, sequence & (array length-1) = array index,比如一共有 8 槽,3&(8-1)=3,HashMap 就是用这个方式来定位数组元素的,这种方式比取模的速度更快。)
会预先分配内存,可以做到完全的内存复用。在系统的运行过程中,不会有新的空间需要分配或者老的空间需要回收。因此,可以大大减少系统分配空间以及回收空间的额外开销。
简单例子
maven引入jar包
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.2.1</version>
</dependency>
创建数据类
//代表数据的类
public class PCData {
private int value;
public int getValue() {
return value;
}
public void setValue(int value) {
this.value = value;
}
}
工厂类,生产 PCData
//产生PCData的工厂类,它会在Disruptor系统初始化时,构造所有的缓冲区中的对象实例(预先分配空间)
public class PCDataFactory implements EventFactory<PCData> {
@Override
public PCData newInstance() {
return new PCData();
}
}
消费者
//消费者实现为WorkHandler接口,是Disruptor框架中的类
public class Consumer implements WorkHandler<PCData>{
//onEvent()方法是框架的回调用法
@Override
public void onEvent(PCData event) throws Exception {
System.out.println(Thread.currentThread().getId() + ":Event:--"
+ event.getValue()*event.getValue() + "--");
}
}
生产者
public class Producer {
//环形缓冲区,装载生产好的数据;
private final RingBuffer<PCData> ringBuffer;
public Producer(RingBuffer<PCData> ringBuffer) {
this.ringBuffer = ringBuffer;
}
//将数据推入到缓冲区的方法:将数据装载到ringBuffer
public void pushData(int value) {
long sequence = ringBuffer.next(); //获取下一个可用的序列号
try {
PCData event = ringBuffer.get(sequence);//通过序列号获取空闲可用的PCData
event.setValue(value); //设置数值
} finally {
ringBuffer.publish(sequence); //数据发布,只有发布后的数据才会真正被消费者看见
}
}
}
测试类
public class DisruptorTest {
@SuppressWarnings("unchecked")
public static void main(String[] args) throws InterruptedException {
Executor executor = Executors.newCachedThreadPool(); //创建线程池
PCDataFactory factory = new PCDataFactory();
int bufferSize = 32; //设置缓冲区的大小为32
//ProducerType.MULTI:创建一个环形缓冲区支持多事件发布到一个环形缓冲区
Disruptor<PCData> disruptor = new Disruptor<PCData>(factory, bufferSize, executor,
ProducerType.MULTI,new BlockingWaitStrategy());
//设置了三个消费者,系统会将每一个消费者实例映射到一个线程中,也就是这里提供了4个消费者线程
disruptor.handleEventsWithWorkerPool(new Consumer(),new Consumer(),new Consumer());
disruptor.start(); //启动并初始化disruptor
RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer(); //获取已经初始化好的ringBuffer
Producer producer = new Producer(ringBuffer);
for (int i = 0; i < 20 ; i++ ) {
producer.pushData(i); //存入数据
Thread.sleep(100);
System.out.println("add data " + i);
}
}
}
输出结果
9:Event:--0--
add data 0
10:Event:--1--
add data 1
11:Event:--4--
add data 2
9:Event:--9--
add data 3
...
根据 Disruptor 的官方报告,性能要比 BlockingQueue 至少高一个数量级以上。
提高消费者的相应时间:选择合适的策略
BlockingWaitStrategy:这是默认的策略。使用锁和条件进行数据的监控和线程的唤醒。因为涉及到线程的切换,是最节省 CPU,但在高并发下性能表现最糟糕的一种等待策略。
SleepingWaitStrategy:会自旋等待数据,如果不成功,才让出 cpu,最终进行线程休眠,以确保不占用太多的 CPU 数据,因此可能产生比较高的平均延时。比较适合对延时要求不高的场合,好处是对生产者线程的影响最小。典型的应用场景是异步日志。
YieldingWaitStrategy:用于低延时的场合。消费者线程不断循环监控缓冲区变化,在循环内部,会使用
Thread.yield()
让出 cpu 给别的线程执行时间。BusySpinWaitStrategy:开启的是一个死循环监控,消费者线程会尽最大努力监控缓冲区变化,因此, CPU 负担比较大
Last updated