单机无锁线程安全队列-Disruptor
Disruptor
1、基本介绍
说到队列,除了常见的mq中间件,java中也自带线程安全的BlockingQueue,但是BlockingQueue通过在入队和出队时加锁的方式避免并发操作,性能上会大打折扣。
而Disruptor是一个线程安全、低延迟、吞吐量高的队列,并且解决BlockingQueue加锁带来的性能下降问题,十分适合单机使用。
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。基于Disruptor开发的系统单线程能支撑每秒600万订单。
2、与BlockingQueue对比
- 使用CAS代替锁
- 多播模式,同一事件可以交给多个消费者处理
- 基于环形数组RingBuffer,创建时就固定长度,不出现空间新分配情况,减少垃圾回收
这是官网与BlockingQueue对比的延迟直方图,可以看出,BlockingQueue出现延迟的机率比Disruptor高得多。
3、生产者消费者模式
在Disruptor中,生产者与消费者支持一对一、一对多或者多对多的关系。下面举例如何实现:
引入最新包
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>4.0.0</version>
</dependency>
定义一个商品
@Data
public class Goods {
private String name;
}
定义生产者
public class Producer {
private final RingBuffer<Goods> ringBuffer;
public Producer(RingBuffer<Goods> ringBuffer) {
this.ringBuffer = ringBuffer;
}
/**
* 生产货品
* @param goodsName
*/
public void onData(String goodsName) {
long sequence = ringBuffer.next();
try {
Goods goods = ringBuffer.get(sequence);
goods.setName(goodsName);
} finally {
ringBuffer.publish(sequence);
}
}
}
定义消费者
@Data
public class Consumer implements EventHandler<Goods>{
private String name;
public Consumer(String name){
this.name = name;
}
@Override
public void onEvent(Goods goods, long l, boolean b) {
//消费者接收到货品
System.out.println(name+"消费了"+goods.getName());
}
@Override
public void onBatchStart(long batchSize, long queueDepth) {
EventHandler.super.onBatchStart(batchSize, queueDepth);
}
@Override
public void onStart() {
EventHandler.super.onStart();
}
@Override
public void onShutdown() {
EventHandler.super.onShutdown();
}
@Override
public void onTimeout(long sequence) throws Exception {
EventHandler.super.onTimeout(sequence);
}
@Override
public void setSequenceCallback(Sequence sequenceCallback) {
EventHandler.super.setSequenceCallback(sequenceCallback);
}
}
一个生产者对一个消费者
public class DisruptorDemo {
public static void main(String[] args) throws InterruptedException {
Disruptor<Goods> disruptor = new Disruptor<>(
Goods::new,
16, // RingBuffer 大小,必须是 2 的 N 次方
Executors.defaultThreadFactory(), //线程池
ProducerType.SINGLE, //指定单生产者还是多生产者
new YieldingWaitStrategy() //等待策略
);
RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();
//单生产者,单消费者
disruptor.handleEventsWith(new Consumer("Consumer1"));
disruptor.start();
Producer producer = new Producer(ringBuffer);
while (true){
producer.onData("goods"+UUID.randomUUID());
Thread.sleep(1000);
}
}
}
一个生产者对多个消费者
消费者按顺序消费:
public class DisruptorDemo {
public static void main(String[] args) throws InterruptedException {
Disruptor<Goods> disruptor = new Disruptor<>(
Goods::new,
16, // RingBuffer 大小,必须是 2 的 N 次方
Executors.defaultThreadFactory(), //线程池
ProducerType.MULTI, //指定单生产者还是多生产者
new YieldingWaitStrategy() //等待策略
);
RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();
//多个消费者按顺序消费
disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));
disruptor.start();
Producer producer = new Producer(ringBuffer);
while (true){
producer.onData("goods"+UUID.randomUUID());
Thread.sleep(1000);
}
}
}
多播模式,同一事件可以交给多个消费者处理
只需要将上述代码修改一下即可
//Consumer1、Consumer2、Consumer3先消费,Consumer4后消费
disruptor.handleEventsWith(new Consumer("Consumer1"),new Consumer("Consumer2"),new Consumer("Consumer3"))
.then(new Consumer("Consumer4"));
多个生产者对多个消费者
public class DisruptorDemo {
public static void main(String[] args) throws InterruptedException {
Disruptor<Goods> disruptor = new Disruptor<>(
Goods::new,
16, // RingBuffer 大小,必须是 2 的 N 次方
Executors.defaultThreadFactory(), //线程池
ProducerType.MULTI, //指定单生产者还是多生产者
new YieldingWaitStrategy() //等待策略
);
RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();
disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));
disruptor.start();
Producer producer1 = new Producer(ringBuffer);
Producer producer2 = new Producer(ringBuffer);
Producer producer3 = new Producer(ringBuffer);
while (true){
producer1.onData("goods"+UUID.randomUUID());
producer2.onData("goods"+UUID.randomUUID());
producer3.onData("goods"+UUID.randomUUID());
Thread.sleep(1000);
}
}
}
除了上述多播模式中多个消费者各自处理事件(一个event事件会同时被多个消费者处理),其实还有Disruptor另一种模式:多个消费者合作处理一批事件(一个event事件会被其中一个消费者处理),由Disruptor 的 WorkPool 支持,不过在4.0中已经被去除了
看了github的issue,作者大概意思说难以维护,并且在LMAX公司也不会用到WorkPool,所以就去除了。
4、RingBuffer原理
Disruptor内部由环形数组Ring Buffer(数组必须为2的n次方)。
1、Ring Buffer使用环形数组,有效避免线性数组index越界问题,而且数组内元素的内存地址是连续的,对CPU缓存友好,在硬件级别,数组中的元素是会被预加载的,所以RingBuffer中,CPU无需时不时去主内存加载数组中的下一个元素。通过对cursor指针的移动,可以实现数据在数组中的环形存取。
2、在多生产者场景下,多个生产者会进行竞争,防止读到还未写的元素。引入了一个与Ring Buffer大小相同的buffer:available Buffer,用来判断Ring Buffer某个元素是否已经就绪。
3、为什么available Buffer也做成圈呢?这样做是防止把上一轮的数据当成这一轮的数据,错误判断Ring Buffer元素可用。
4、为什么Ring Buffer要2的n次方,因为会涉及到二进制&运算,来算出元素位置,在源码中可以找到。
5、具体RingBuffer写数据和读数据流程,可以参考美团技术博客:https://tech.meituan.com/2016/11/18/disruptor.html
5、等待策略
生产者和消费者都可能出现速度过快的情况,比如队列满了,生产者需要等待消费者消费后才能生产,或者消费者消费过快导致队列为空,进而需要等待生产者生产。
Disruptor目前一共内置了8种等待策略。
- BlockingWaitStrategy:用了ReentrantLock的等待唤醒机制实现等待逻辑,是默认策略,对CPU的消耗最小
- BusySpinWaitStrategy: 持续自旋,会消耗大量CPU资源
- LiteBlockingWaitStrategy: 基于BlockingWaitStrategy,非重入锁的阻塞等待策略,在没有锁竞争的时候会省去唤醒操作
- TimeoutBlockingWaitStrategy: 超时等待策略,它会使消费者线程进入阻塞状态,在指定的时间内等待新的事件,如果等待超时则退出
- LiteTimeoutBlockingWaitStrategy: 基于TimeoutBlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作
- SleepingWaitStrategy: 三段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU,第三阶段睡眠执行时间,反复的睡眠
- YieldingWaitStrategy: 二段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU
- PhasedBackoffWaitStrategy: 四段式,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行Thread.yield交出CPU,第四阶段调用成员变量的waitFor方法,这个成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy这三个中的一个
6、结束
Disruptor简单的介绍已经结束了,点个赞再走啦!~