当前位置: 首页 > article >正文

单机无锁线程安全队列-Disruptor

Disruptor

1、基本介绍

说到队列,除了常见的mq中间件,java中也自带线程安全的BlockingQueue,但是BlockingQueue通过在入队和出队时加锁的方式避免并发操作,性能上会大打折扣。
而Disruptor是一个线程安全、低延迟、吞吐量高的队列,并且解决BlockingQueue加锁带来的性能下降问题,十分适合单机使用。
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。基于Disruptor开发的系统单线程能支撑每秒600万订单。

2、与BlockingQueue对比

  1. 使用CAS代替锁
  2. 多播模式,同一事件可以交给多个消费者处理
  3. 基于环形数组RingBuffer,创建时就固定长度,不出现空间新分配情况,减少垃圾回收

这是官网与BlockingQueue对比的延迟直方图,可以看出,BlockingQueue出现延迟的机率比Disruptor高得多。

img.png

3、生产者消费者模式

在Disruptor中,生产者与消费者支持一对一、一对多或者多对多的关系。下面举例如何实现:

引入最新包

        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>4.0.0</version>
        </dependency>

定义一个商品

@Data
public class Goods {

    private String name;

}

定义生产者

public class Producer {
    private final RingBuffer<Goods> ringBuffer;

    public Producer(RingBuffer<Goods> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    /**
     * 生产货品
     * @param goodsName
     */
    public void onData(String goodsName) {
        long sequence = ringBuffer.next();
        try {
            Goods goods = ringBuffer.get(sequence);
            goods.setName(goodsName);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}

定义消费者

@Data
public class Consumer implements EventHandler<Goods>{

    private String name;

    public Consumer(String name){
        this.name = name;
    }

    @Override
    public void onEvent(Goods goods, long l, boolean b)  {
        //消费者接收到货品
        System.out.println(name+"消费了"+goods.getName());
    }

    @Override
    public void onBatchStart(long batchSize, long queueDepth) {
        EventHandler.super.onBatchStart(batchSize, queueDepth);
    }

    @Override
    public void onStart() {
        EventHandler.super.onStart();
    }

    @Override
    public void onShutdown() {
        EventHandler.super.onShutdown();
    }

    @Override
    public void onTimeout(long sequence) throws Exception {
        EventHandler.super.onTimeout(sequence);
    }

    @Override
    public void setSequenceCallback(Sequence sequenceCallback) {
        EventHandler.super.setSequenceCallback(sequenceCallback);
    }
}

一个生产者对一个消费者

img_1.png

public class DisruptorDemo {
    
    public static void main(String[] args) throws InterruptedException {
        Disruptor<Goods> disruptor = new Disruptor<>(
                Goods::new,
                16,  // RingBuffer 大小,必须是 2 的 N 次方
                Executors.defaultThreadFactory(), //线程池
                ProducerType.SINGLE,   //指定单生产者还是多生产者
                new YieldingWaitStrategy() //等待策略
        );
        RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();
        //单生产者,单消费者
        disruptor.handleEventsWith(new Consumer("Consumer1"));
        disruptor.start();
        Producer producer = new Producer(ringBuffer);
        while (true){
            producer.onData("goods"+UUID.randomUUID());
            Thread.sleep(1000);
        }
    }
}

一个生产者对多个消费者

消费者按顺序消费:

img_2.png

public class DisruptorDemo {
    public static void main(String[] args) throws InterruptedException {
        Disruptor<Goods> disruptor = new Disruptor<>(
                Goods::new,
                16,  // RingBuffer 大小,必须是 2 的 N 次方
                Executors.defaultThreadFactory(), //线程池
                ProducerType.MULTI,   //指定单生产者还是多生产者
                new YieldingWaitStrategy() //等待策略
        );
        RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();
        //多个消费者按顺序消费
        disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));
        disruptor.start();
        Producer producer = new Producer(ringBuffer);
        while (true){
            producer.onData("goods"+UUID.randomUUID());
            Thread.sleep(1000);
        }
    }
}

多播模式,同一事件可以交给多个消费者处理

img_4.png
只需要将上述代码修改一下即可

   //Consumer1、Consumer2、Consumer3先消费,Consumer4后消费
   disruptor.handleEventsWith(new Consumer("Consumer1"),new Consumer("Consumer2"),new Consumer("Consumer3"))
   .then(new Consumer("Consumer4"));

多个生产者对多个消费者

img_5.png

public class DisruptorDemo {

    public static void main(String[] args) throws InterruptedException {
        Disruptor<Goods> disruptor = new Disruptor<>(
                Goods::new,
                16,  // RingBuffer 大小,必须是 2 的 N 次方
                Executors.defaultThreadFactory(), //线程池
                ProducerType.MULTI,   //指定单生产者还是多生产者
                new YieldingWaitStrategy() //等待策略
        );
        RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();
        disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));
        disruptor.start();
        Producer producer1 = new Producer(ringBuffer);
        Producer producer2 = new Producer(ringBuffer);
        Producer producer3 = new Producer(ringBuffer);
        while (true){
            producer1.onData("goods"+UUID.randomUUID());
            producer2.onData("goods"+UUID.randomUUID());
            producer3.onData("goods"+UUID.randomUUID());
            Thread.sleep(1000);
        }
    }
}

除了上述多播模式中多个消费者各自处理事件(一个event事件会同时被多个消费者处理),其实还有Disruptor另一种模式:多个消费者合作处理一批事件(一个event事件会被其中一个消费者处理),由Disruptor 的 WorkPool 支持,不过在4.0中已经被去除了

img_8.png
看了github的issue,作者大概意思说难以维护,并且在LMAX公司也不会用到WorkPool,所以就去除了。

img_9.png

img_10.png

4、RingBuffer原理

Disruptor内部由环形数组Ring Buffer(数组必须为2的n次方)。

image.png
1、Ring Buffer使用环形数组,有效避免线性数组index越界问题,而且数组内元素的内存地址是连续的,对CPU缓存友好,在硬件级别,数组中的元素是会被预加载的,所以RingBuffer中,CPU无需时不时去主内存加载数组中的下一个元素。通过对cursor指针的移动,可以实现数据在数组中的环形存取。
2、在多生产者场景下,多个生产者会进行竞争,防止读到还未写的元素。引入了一个与Ring Buffer大小相同的buffer:available Buffer,用来判断Ring Buffer某个元素是否已经就绪。
3、为什么available Buffer也做成圈呢?这样做是防止把上一轮的数据当成这一轮的数据,错误判断Ring Buffer元素可用。
4、为什么Ring Buffer要2的n次方,因为会涉及到二进制&运算,来算出元素位置,在源码中可以找到。

img_11.png
5、具体RingBuffer写数据和读数据流程,可以参考美团技术博客:https://tech.meituan.com/2016/11/18/disruptor.html

5、等待策略

生产者和消费者都可能出现速度过快的情况,比如队列满了,生产者需要等待消费者消费后才能生产,或者消费者消费过快导致队列为空,进而需要等待生产者生产。
Disruptor目前一共内置了8种等待策略。

img_7.png

  1. BlockingWaitStrategy:用了ReentrantLock的等待唤醒机制实现等待逻辑,是默认策略,对CPU的消耗最小
  2. BusySpinWaitStrategy: 持续自旋,会消耗大量CPU资源
  3. LiteBlockingWaitStrategy: 基于BlockingWaitStrategy,非重入锁的阻塞等待策略,在没有锁竞争的时候会省去唤醒操作
  4. TimeoutBlockingWaitStrategy: 超时等待策略,它会使消费者线程进入阻塞状态,在指定的时间内等待新的事件,如果等待超时则退出
  5. LiteTimeoutBlockingWaitStrategy: 基于TimeoutBlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作
  6. SleepingWaitStrategy: 三段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU,第三阶段睡眠执行时间,反复的睡眠
  7. YieldingWaitStrategy: 二段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU
  8. PhasedBackoffWaitStrategy: 四段式,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行Thread.yield交出CPU,第四阶段调用成员变量的waitFor方法,这个成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy这三个中的一个

6、结束

Disruptor简单的介绍已经结束了,点个赞再走啦!~


http://www.kler.cn/a/159008.html

相关文章:

  • Vue通过file控件上传文件到Node服务器
  • Win10/11 安装使用 Neo4j Community Edition
  • 问:说说SpringDAO及ORM的用法?
  • 录的视频怎么消除杂音?从录制到后期的杂音消除攻略
  • vue3点击按钮el-dialog对话框不显示问题
  • 【Electron】Electron Forge如何支持Element plus?
  • Django回顾6
  • Perl | Multi-line Strings | Here Document
  • 十种接口安全方案!!!
  • 解密IIS服务器API跨域问题的终极解决方案
  • CENTOS 7 添加黑名单禁止IP访问服务器
  • 云计算与低代码:加速应用开发与创新的双核引擎
  • CAD画图-模型和布局区别,视图命令MV使用(用于局部放大显示)
  • 【ArcGIS Pro】探索性插值无法覆盖所需shp范围
  • python基于轻量级卷积神经网络模型ShuffleNetv2开发构建辣椒病虫害图像识别系统
  • Landsat 5 C02数据集2007-2011年
  • 通俗讲解分布式锁:场景和使用方法
  • Python---魔术方法
  • 在AWS EC2中部署和使用Apache Superset的方案
  • 【开源】基于JAVA的校园疫情防控管理系统
  • JeecgBoot 框架升级 Spring Boot 3.1.5
  • ifstream读取txt中的中文数据转成QString出现乱码
  • ArcGIS点集之间两两连线
  • CMake中的include函数
  • vue3项目实现文档 JSON 格式和 Excel 表格的在线预览,(智能搜索,未验证)
  • 【前端】html不渲染换行\n\t\r等的问题