单机最快的队列Disruptor解析和使用

前言

介绍高性能队列Disruptor原理以及使用例子。

Disruptor是什么?

Disruptor是外汇和加密货币交易所运营商 LMAX group 建立高性能的金融交易所的结果。用于解决生产者、消费者及其数据存储的设计问题的高性能队列实现。可以对标JDK中的ArrayBlockingQueue。是目前单机且基于内存存储的最高性能的队列实现。见 与ArrayBlockingQueue性能对比。

Disruptor高性能秘诀

使用CAS代替锁

锁非常昂贵,因为它们在竞争时需要仲裁。这种仲裁是通过到操作系统内核的上下文切换来实现的,该内核将挂起等待锁的线程,直到它被释放。系统提供的原子操作CAS(Compare And Swap/Set)是很好的锁替代方案,Disruptor中同步就是使用的这种。

比如多生产者模式中com.lmax.disruptor.MultiProducerSequencer就是用了Java里sun.misc.Unsafe类基于CAS实现的API。

等待策略com.lmax.disruptor.BlockingWaitStrategy使用了基于CAS实现的ReentrantLock。

独占缓存行

为了提高效率CPU硬件不会以字节或字为单位移动内存,而是以缓存行,通常大小为 32-256 字节的缓存行,最常见的缓存行是 64 字节。这意味着,如果两个变量在同一个缓存行中,并且由不同的线程写入,那么它们会出现与单个变量相同的写入争用问题。为了获得高性能,如果要最小化争用,那么确保独立但同时写入的变量不共享相同的缓存行是很重要的。

比如com.lmax.disruptor.RingBuffer中属性前后都用未赋值的long来独占。com.lmax.disruptor.SingleProducerSequencerPad也有相同处理方式。

环形队列

  • 使用有界队列,减少线程争用

队列相比链表在访问速度上占据优势,而有界队列相比可动态扩容的无界队列则避免扩容产生的同步问题效率更高。Disruptor和JDK中的ArrayBlockingQueue一样使用有界队列。队列长度要设为2的n次幂,有利于二进制计算。

  • 使用环形数组,避免生产和消费速度差异导致队列头和尾争用

Disruptor在逻辑上将数组的的头尾看成是相连的,即一个环形数组(RingBuffer)。

  • Sequence

生产和消费都需要维护自增序列值(Sequence),从0开始。

生产方只维护一个代表生产的最后一个元素的序号。代表生产的最后一个元素的序号。每次向Disruptor发布一个元素都调用Sequenced.next()来获取下个位置的写入权。

在单生产者模式(SINGLE)由于不存在并发写入,则不需要解决同步问题。在多生产者模式(MULTI)就需要借助JDK中基于CAS(Compare And Swap/Set)实现的API来保证线程安全。

多个消费者各自维护自己的消费序列值(Sequence)保存数组中。

而环形通过与运算(sequence & indexMask)实现的,indexMask就是环形队列的长度-1。以环形队列长度8为例,第9个元素Sequence为8,8 & 7 = 0,刚好又回到了数组第1个位置。

见com.lmax.disruptor.RingBuffer.elementAt(long sequence)

预分配内存

环形队列存放的是Event对象,而且是在Disruptor创建的时候调用EventFactory创建并一次将队列填满。Event保存生产者生产的数据,消费也是通过Event获取,后续生产则只需要替换掉Event中的属性值。这种方式避免了重复创建对象,降低JVM的GC产频率。

见com.lmax.disruptor.RingBuffer.fill(EventFactory eventFactory)

消费者8种等待策略

当消费速度大于生产速度情况下,消费者执行的等待策略。

消费者序列

所有消费者的消费序列(Sequence)都放在一个数组中,见com.lmax.disruptor.AbstractSequencer,通过SEQUENCE_UPDATER来更新对应的序列值。

调用更新的地方在com.lmax.disruptor.RingBuffer.addGatingSequences(Sequence... gatingSequences)。

消费太慢队列满了怎么办?

生产者线程被阻塞。生产者调用Sequenced.next()争夺写入权的时候需要判断最小的消费序列值进行比较。如果写入的位置还未消费则会进入循环不断获取最小消费序列值进行比较。

见包com.lmax.disruptor下SingleProducerSequencer或MultiProducerSequencer中next(int n)方法。

Disruptor开发步骤

  • 创建Event、EventFactory、EventHandler和ExceptionHandler类

Event是环形队列(RingBuffer)中的元素,是生产者数据的载体;EventFactory是定义Event创建方式的工厂类;EventHandler则是Event的处理器,定义如何消费Event中的数据。

另外有必要定义一个消费异常处理器ExceptionHandler,它是和EventHandler绑定的。当EventHandler.onEvent()执行抛出异常时会执行对应的异常回调方法。

  • 实例化Disruptor

创建Disruptor需要指定5个参数eventFactory、ringBufferSize、threadFactory、producerType、waitStrategy。

EventFactory是上面定义的Event工厂类;

ringBufferSize是环形队列的长度,这个值要是2的N次方;

threadFactory是定义消费者线程创建方式的工厂类;

producerType是指明生产者是一个(SINGLE)还是多个(MULTI)。默认是MULTI,会使用CAS(Compare And Swap/Set)保证线程安全。如果指定为SINGLE,则不使用没必要的CAS,使单线程处理更高效。

waitStrategy指明消费者等待生产时的策略。

  • 设置消费者

指明EventHandler并绑定ExceptionHandler。指定多个EventHandler时,会为每个EventHandler分配一个线程,一个Event会被多个并行EventHandler处理。

也可以指明多个WorkHandler,每个WorkHandler分配一个线程并行消费队列中的Event,一个Event只会被一个WorkHandler处理。

  • 创建/实例化EventTranslator

EventTranslator定义生产者数据转换为Event的方式,不同数量参数有不同的接口用来实现。

  • 最后用Disruptor.publishEvent() 来发布元素指明EventTranslator和参数

例子程序

  • 先引入Maven依赖
<dependency>
  <groupId>com.lmax</groupId>
  <artifactId>disruptor</artifactId>
  <version>3.4.4</version>
</dependency>
  • Event
/**
 * 事件
 *
 * @param <T>发布的数据类型
 */
public class MyEvent<T> {

    private T data;

    public T getData() {
        return data;
    }

    public MyEvent<T> setData(T data) {
        this.data = data;
        return this;
    }
}
  • EventFactory
import com.lmax.disruptor.EventFactory;

/**
 * 创建事件的工厂
 *
 * @param <T>发布的数据类型
 */
public class MyEventFactory<T> implements EventFactory<MyEvent<T>> {

    @Override
    public MyEvent<T> newInstance() {
        return new MyEvent<>();
    }
}
  • EventHandler
import com.lmax.disruptor.EventHandler;

/**
 * 事件消费方法
 *
 * @param <T>发布的数据类型
 */
public class MyEventHandler<T> implements EventHandler<MyEvent<T>> {

    @Override
    public void onEvent(MyEvent<T> tMyEvent, long l, boolean b) throws Exception {
        System.out.println(Thread.currentThread().getName() + "MyEventHandler消费:" + tMyEvent.getData());
    }
}
  • ExceptionHandler
import com.lmax.disruptor.ExceptionHandler;

/**
 * 消费者异常处理器
 *
 * @param <T>发布的数据类型
 */
public class MyExceptionHandler<T> implements ExceptionHandler<MyEvent<T>> {

    @Override
    public void handleEventException(Throwable ex, long sequence, MyEvent<T> event) {
        System.out.println("handleEventException");
    }

    @Override
    public void handleOnStartException(Throwable ex) {
        System.out.println("handleOnStartException");
    }

    @Override
    public void handleOnShutdownException(Throwable ex) {
        System.out.println("handleOnShutdownException");
    }
}

单消费者

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;

/**
 * 单消费者
 */
public class SingleConsumerSample {

    public static void main(String[] args) {
        // 环形数组长度,必须是2的n次幂
        int ringBufferSize = 1024;
        // 创建事件(Event)对象的工厂
        MyEventFactory<String> eventFactory = new MyEventFactory<>();
        // 创建消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<MyEvent<String>> disruptor =
                new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);

        // 指定一个处理器
        MyEventHandler<String> eventHandler = new MyEventHandler<>();
        disruptor.handleEventsWith(eventHandler);
        // 处理器异常处理器
        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
        disruptor.setDefaultExceptionHandler(exceptionHandler);

        disruptor.start();

        // 通过事件转换器(EventTranslator)来指明如何将发布的数据转换到事件对象(Event)中
        // 这里是一个参数的转换器,另外还有两个(EventTranslatorTwoArg)、三个(EventTranslatorThreeArg)
        // 和多个(EventTranslatorVararg)参数的转换器可以使用,参数类型可以不一样
        EventTranslatorOneArg<MyEvent<String>, String> eventTranslatorOneArg =
                new EventTranslatorOneArg<MyEvent<String>, String>() {
                    @Override
                    public void translateTo(MyEvent<String> event, long sequence, String arg0) {
                        event.setData(arg0);
                    }
                };

        // 发布
        for (int i = 0; i < 10; i++) {
            disruptor.publishEvent(eventTranslatorOneArg, "One arg " + i);
        }

        disruptor.shutdown();
    }
}

单消费者Lambda写法

这种只是迎合Java8 Lambda语法特性,代码更简洁。

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;

public class LambdaSample {

    public static void main(String[] args) {
        // 环形数组长度,必须是2的n次幂
        int ringBufferSize = 1024;
        // 创建消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<MyEvent<String>> disruptor =
                new Disruptor<>(MyEvent::new, ringBufferSize, threadFactory, SINGLE, waitStrategy);

        // 指定一个处理器
        EventHandler<MyEvent<String>> eventHandler = (event, sequence, endOfBatch) ->
                System.out.println(Thread.currentThread().getName() + "MyEventHandler消费:" + event.getData());
        disruptor.handleEventsWith(eventHandler);
        // 处理器异常处理器
        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
        disruptor.setDefaultExceptionHandler(exceptionHandler);

        disruptor.start();

        // 通过事件转换器(EventTranslator)来指明如何将发布的数据转换到事件对象(Event)中
        // 一个参数的转换器
        disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg ");
        // 两个参数的转换器
        disruptor.publishEvent((event, sequence, pA, pB) -> event.setData(pA + pB), "Two arg ", 1);
        // 三个参数的转换器
        disruptor.publishEvent((event, sequence, pA, pB, pC) -> event.setData(pA + pB + pC)
                , "Three arg ", 1, false);
        // 多个参数的转换器
        disruptor.getRingBuffer().publishEvent((event, sequence, params) -> {
            List<String> paramList = Arrays.stream(params).map(Object::toString).collect(Collectors.toList());
            event.setData("Var arg " + String.join(",", paramList));
        }, "param1", "param2", "param3");

        disruptor.shutdown();
    }
}

多消费者重复消费元素

关键只在于指定多个EventHandler,并且EventHandler还可以分别绑定不同的ExceptionHandler。

每个EventHandler分配一个线程,一个Event会被每个EventHandler处理,适合两个不同的业务都需要处理同一个元素的情况,类似广播模式。

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;

/**
 * 一个元素多个消费者重复消费
 */
public class RepetitionConsumerSample {

    public static void main(String[] args) {
        // 环形数组长度,必须是2的n次幂
        int ringBufferSize = 1024;
        // 创建事件(Event)对象的工厂
        MyEventFactory<String> eventFactory = new MyEventFactory<>();
        // 创建消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<MyEvent<String>> disruptor =
                new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);

        // 这里指定了2个消费者,那就会产生2个消费线程,一个事件会被消费2次
        EventHandler<MyEvent<String>> eventHandler = (event, sequence, endOfBatch) ->
                System.out.println(Thread.currentThread().getName() + "MyEventHandler消费:" + event.getData());
        EventHandler<MyEvent<String>> eventHandler2 = (event, sequence, endOfBatch) ->
                System.out.println(Thread.currentThread().getName() + "MyEventHandler——2消费:" + event.getData());
        disruptor.handleEventsWith(eventHandler, eventHandler2);
        // 分别指定异常处理器
        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
        disruptor.handleExceptionsFor(eventHandler).with(exceptionHandler);
        disruptor.handleExceptionsFor(eventHandler2).with(exceptionHandler);

        disruptor.start();

        for (int i = 0; i < 10; i++) {
            disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg " + i);
        }

        disruptor.shutdown();
    }
}

多消费者

关键只在于定义WorkHandler,然后实例化多个来消费。

每个WorkHandler分配一个线程,一个元素只会被一个WorkHandler处理。

import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;

public class MultiConsumerSample {

    public static void main(String[] args) {
        // 环形数组长度,必须是2的n次幂
        int ringBufferSize = 1024;
        // 创建事件(Event)对象的工厂
        MyEventFactory<String> eventFactory = new MyEventFactory<>();
        // 创建消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<MyEvent<String>> disruptor =
                new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);

        // 处理器异常处理器
        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
        disruptor.setDefaultExceptionHandler(exceptionHandler);

        // 设置2个消费者,2个线程,一个Event只被一个消费者消费
        WorkHandler<MyEvent<String>> workHandler = tMyEvent ->
                System.out.println(Thread.currentThread().getName() + "WorkHandler消费:" + tMyEvent.getData());
        disruptor.handleEventsWithWorkerPool(workHandler, workHandler2);

        disruptor.start();

        for (int i = 0; i < 10; i++) {
            disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg " + i);
        }

        disruptor.shutdown();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/9315.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【JavaWeb】1—JavaWeb概述

⭐⭐⭐⭐⭐⭐ Github主页&#x1f449;https://github.com/A-BigTree 笔记链接&#x1f449;https://github.com/A-BigTree/Code_Learning ⭐⭐⭐⭐⭐⭐ 如果可以&#xff0c;麻烦各位看官顺手点个star~&#x1f60a; 如果文章对你有所帮助&#xff0c;可以点赞&#x1f44d;…

人工智能中的移动端编程

移动端编程是现在新兴的主要编程领域之一&#xff0c;该领域聚集了非常多的开发人员。这主要得益于手机和平板电脑的快速普及&#xff0c;人们以前需要在台式机上完成的事情&#xff0c;现在都可以非常方便地在手机或平板电脑上完成。由于手机和平板电脑携带更加方便&#xff0…

阿里云版GPT官宣,我们问了它10个问题

4月7日&#xff0c;阿里云宣布自研大模型“通义千问”&#xff0c;目前已开始邀请用户测试体验。 阿里达摩院在NLP自然语言处理等前沿科研领域早已布局多年&#xff0c;并于2019年启动大模型研发&#xff0c;通义千问便是其最新成果&#xff0c;相当于阿里云版的“ChatGPT”。 …

网络编程之输入ip地址解析不出来域名

网络编程之输入ip地址解析不出来域名 1.解决方案 设置本机的域名解析服务器 1. 查看域名的ip ping 域名 找到如下图路径下的hosts文件 赋予权限 添加域名和ip地址的对应关系。域名和ip之间采用空格隔开。 代码测试 代码详见&#xff1a;网络编程---实验2 查找Internet地址和用…

腾讯云轻量应用服务器16核32G28M处理器带宽流量性能测评

腾讯云轻量应用服务器16核32G28M带宽&#xff0c;28M带宽下载速度峰值可达3584KB/s&#xff0c;折合3.5M/秒&#xff0c;16核32G28M带宽3468元15个月&#xff0c;折合每月231元&#xff0c;系统盘为380GB SSD盘&#xff0c;免费6000GB月流量&#xff0c;折合每天200GB流量&…

系统集成项目管理工程师案例分析考点汇总(成本、质量、人力)

项目成本管理常见考点1. 成本估算、成本预算的步骤2. 成本估算、成本预算的区别与联系3. 成本估算困难或不准的原因4. 成本失控的原因5. 成本超支、进度落后采取的措施6. 成本超支、进度超前采取的措施项目质量管理常见考点1. 质量管理计划的内容2. 质量保证与质量控制的联系3.…

「解析」Matplotlib 绘制折线图

相比于【优雅】matplotlib 常见图、【优雅】matplotlib 3D图 而言&#xff0c;折线图使用的频率会更高一些&#xff0c;在此整理下最近使用 Matplotlib 绘制折线图常用的一些配置&#xff0c;小伙伴们只需要修改对应的 aug_list、list 即可直接使用 # !/usr/bin/env python …

在线Plist文件格式转Json文件格式

Plist文件是一种用于存储应用程序配置信息的文件格式&#xff0c;其中包含应用程序的各种设置和数据。在过去&#xff0c;Plist文件通常是以 .plist 格式存储的。然而&#xff0c;随着时间的推移&#xff0c;人们开始使用 JSON 格式来存储更复杂的数据结构和数据。如果您需要将…

77-Linux_网络编程

网络编程一.主机字节序列和网络字节序列二.套接字地址结构1.通用socket地址结构2.专用的socket地址结构3.IP地址转换函数一.主机字节序列和网络字节序列 主机字节序列分为大端字节序和小端字节序&#xff0c;不同的主机采用的字节序列可能不同。 大端字节序是指一个整数的高位…

二 、Locust自定义用户(场景)

二 、自定义用户&#xff08;场景&#xff09; 一个用户类代表了你系统中的一种用户/场景。当你做一个测试运行时&#xff0c;你指定你想模拟的并发用户的数量&#xff0c;Locust将为每个用户创建一个实例。你可以给这些类/实例添加任何你喜欢的属性&#xff0c;但有一些属性对…

shell 脚本编写

文章目录练习题目&#xff1a;1.编写函数&#xff0c;实现打印绿色0K和红色FAILED,判断是否有参数&#xff0c;存在为0k&#xff0c;不存在为FAILED2.编写函数&#xff0c;实现判断是否无位置参数&#xff0c;如无参数&#xff0c;提示错误3.编写函数实现两个数字做为参数&…

uniapp - 实现车牌号键盘与格子间隔显示组件,汽车牌照录入支持自定义样式、新能源等(附带组件完整源码,开箱即用,稍微改改就能用)

效果图 uniapp 全平台兼容,车牌号键盘输入、分格显示功能示例源码,注释很多! 可以直接复制一下,然后自己改改样式或功能就能使了。 示例源码 复制,运行。 &

ReRes 谷歌浏览器插件使用

安装&#xff1a; 本插件是chrome插件&#xff0c;有条件者可以直接在chrome商店下载安装&#xff1b; 条件有限者&#xff1a; CSDN 下载 ReRes找到插件的github路径&#xff0c;本文插件点击 ReRes下载到本地并解压在chrome浏览器地址栏输入chrome://extensions/进入扩展页…

MySQL存储引擎

存储引擎MySQL体系结构存储引擎简介存储引擎特点存储引擎选择总结MySQL体系结构 连接层 最上层是一些客户端和链接服务,主要完成一些类似于连接处理、授权认证、及相关的安全方案。服务器也会为安全接入的每个客户端验证它所具有的操作权限。服务层 第二层架构主要完成大多数的…

集成时间序列模型提高预测精度

使用Catboost从RNN、ARIMA和Prophet模型中提取信号进行预测 集成各种弱学习器可以提高预测精度&#xff0c;但是如果我们的模型已经很强大了&#xff0c;集成学习往往也能够起到锦上添花的作用。流行的机器学习库scikit-learn提供了一个StackingRegressor&#xff0c;可以用于…

【Linux】进程控制(2)(阻塞vs非阻塞 进程程序替换 替换函数 单进程: 别的方式 进程程序替换原理 通过myexec执行自己写的程序)

文章目录进程等待阻塞vs非阻塞进程程序替换替换函数单进程&#xff1a;别的方式进程程序替换原理通过myexec执行自己写的程序进程等待 阻塞vs非阻塞 waitpid调用成功 && 子进程没退出子进程没有退出&#xff0c;我的waitpid没有等待失败&#xff0c;仅仅是没有检测到…

卫龙携手鸿翼打造研发知识管理平台,“辣条一哥”再次为食品安全和健康发力

合作简介 ​▼ ​卫龙已经成为中国最大的辣味休闲食品企业&#xff0c;年营收超48亿元&#xff0c;总的市场份额达5.7%&#xff0c;按零售额计是排在第二名企业的3.8倍。另外&#xff0c;其调味面制品和辣味休闲蔬菜制品的市场份额均排名第一。面对流量成本上升和竞争红海&am…

jmeter跨平台分布式部署

目录 1、jmeter的master节点设置在windows上的环境配置&#xff0c;及启动&#xff1a; 2、slave执行机在linux环境下的配置&#xff0c;及启动&#xff1a; 3、跨平台csv文件处理&#xff1a; 1、jmeter的master节点设置在windows上的环境配置&#xff0c;及启动&#xff1a; …

vue悬浮导航实现内容滚动时,导航跟随选中,点击导航滚动到相应位置,

<template><div><div class"goods_navBg" ref"navHeader">模拟悬浮导航</div><div class"goods_box">box</div><!-- 吸顶导航 start --><GoodsInfoNavTab class"goods_sticky" :navHead…
最新文章