当前位置: 首页 > article >正文

SynchronousQueue 与 LinkedBlockingQueue区别及应用场景

文章目录

  • 前言
  • 认识SynchronousQueue
  • 基本对比及比较
    • 1. **基本特性**
    • 2. **内部实现**
    • 3. **性能特点**
    • 4. **使用场景**
    • 5. **总结对比**
  • SynchronousQueue案例
    • JDK应用案例
    • 案例1:SynchronousQueue的简单用例
    • 案例2:SynchronousQueue公平锁、非公平锁案例
    • 案例3:搭配线程池使用
      • 线程池多线程+SynchronousQueue(公平无效)
      • 线程池多线程+SynchronousQueue(公平有效)
  • 资料获取

稿定智能设计202502032102

前言

博主介绍:✌目前全网粉丝3W+,csdn博客专家、Java领域优质创作者,博客之星、阿里云平台优质作者、专注于Java后端技术领域。

涵盖技术内容:Java后端、大数据、算法、分布式微服务、中间件、前端、运维等。

博主所有博客文件目录索引:博客目录索引(持续更新)

视频平台:b站-Coder长路

本章节配套源码:

  • gitee:https://gitee.com/changluJava/demo-exer/tree/master/JUC/src/main/java/demo11

认识SynchronousQueue

SynchronousQueue 是一个特殊的队列,它的核心特点是 不存储元素,每个插入操作必须等待一个对应的移除操作,反之亦然。这种特性使得它非常适合一些特定的应用场景。

这种机制非常适合处理大量短期异步任务的场景,例如 Web 服务器处理请求、短时计算任务等。

SynchronousQueueLinkedBlockingQueue 是 Java 并发包中两种不同的队列实现,它们的设计目标和使用场景有显著区别。以下是它们的主要区别。

关于SynchronousQueue的公平与非公平体现在多线程场景情况

  • 公平:多个线程进行put阻塞的话,按照谁先进行put会优先去执行任务。(底层是队列)
  • 非公平:多个线程进行put阻塞的话,并非是排队的,而是会以栈的结构先进后出。(底层是栈)

关于公平非公平源码解析:Java阻塞队列中的异类,SynchronousQueue底层实现原理剖析

BlockingQueue接口说明

BlockingQueue设计了很多的放数据和取数据的方法

操作抛出异常返回特定值阻塞阻塞一段时间
放数据addofferputoffer(e, time, unit)
取数据removepolltakepoll(time, unit)
查看数据(不删除)element()peek()不支持不支持

这几组方法的不同之处就是:

  1. 当队列满了,再往队列中放数据,add方法抛异常,offer方法返回false,put方法会一直阻塞(直到有其他线程从队列中取走数据),offer(e, time, unit)方法阻塞指定时间然后返回false。
  2. 当队列是空,再从队列中取数据,remove方法抛异常,poll方法返回null,take方法会一直阻塞(直到有其他线程往队列中放数据),poll(time, unit)方法阻塞指定时间然后返回null。
  3. 当队列是空,再去队列中查看数据(并不删除数据),element方法抛异常,peek方法返回null。

工作中使用最多的就是offer、poll阻塞指定时间的方法。


基本对比及比较

1. 基本特性

特性SynchronousQueueLinkedBlockingQueue
容量容量为 0,不存储元素。容量可配置(默认 Integer.MAX_VALUE),存储元素。
阻塞行为插入操作必须等待对应的移除操作,反之亦然。队列满时插入操作阻塞,队列空时移除操作阻塞。
适用场景直接传递任务,适合线程池任务调度。缓冲任务,适合生产者-消费者模式。

2. 内部实现

实现SynchronousQueueLinkedBlockingQueue
数据结构无存储结构,直接传递元素。基于链表实现,存储元素。
锁机制使用 CAS 或锁实现线程间的直接匹配。使用两把锁(putLocktakeLock)分离插入和移除操作。
公平性支持公平模式(FIFO)和非公平模式(默认)。默认非公平,但可以通过锁实现公平性。

3. 性能特点

性能SynchronousQueueLinkedBlockingQueue
吞吐量高吞吐量,适合直接传递任务的场景。吞吐量较低,因为需要维护队列结构。
延迟低延迟,任务直接传递给消费者。延迟较高,任务需要先入队再出队。
内存占用内存占用低,不存储元素。内存占用高,需要存储队列中的元素。

4. 使用场景

场景SynchronousQueueLinkedBlockingQueue
任务调度适合线程池任务调度(如 Executors.newCachedThreadPool)。适合需要缓冲任务的场景。
生产者-消费者适合生产者直接传递任务给消费者。适合生产者-消费者模式,任务需要缓冲。
流量控制无缓冲能力,严格限制生产者和消费者的同步。提供缓冲能力,适合流量控制。

5. 总结对比

对比项SynchronousQueueLinkedBlockingQueue
容量无容量,直接传递任务。有容量,可缓冲任务。
阻塞行为插入和移除操作必须成对出现。队列满时插入阻塞,队列空时移除阻塞。
性能高吞吐量,低延迟。吞吐量较低,延迟较高。
适用场景直接传递任务,适合线程池任务调度。缓冲任务,适合生产者-消费者模式。

选择使用哪种队列取决于具体的应用场景:

  • 如果需要直接传递任务且不需要缓冲,选择 SynchronousQueue
  • 如果需要缓冲任务并控制流量,选择 LinkedBlockingQueue

SynchronousQueue案例

如果你希望你的任务需要被快速处理,就可以使用这种队列。

JDK应用案例

Java线程池中的newCachedThreadPool(带缓存的线程池)底层就是使用SynchronousQueue实现的。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
}

弊端:可能会出现oom问题。如果你提交了太多的任务,导致创建了大量的线程,这些线程都在竞争CPU时间片,等待CPU调度,处理任务速度也会变慢,所以在使用过程中也要综合考虑。


案例1:SynchronousQueue的简单用例

image-20250202153336290

package demo10.cpu;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

/**
 * @apiNote SynchronousQueue示例
 **/
public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // 1. 创建SynchronousQueue队列
        BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>();

        // 2. 启动一个线程,往队列中放3个元素
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 入队列 1");
                synchronousQueue.put(1);
                Thread.sleep(1);
                System.out.println(Thread.currentThread().getName() + " 入队列 2");
                synchronousQueue.put(2);
                Thread.sleep(1);
                System.out.println(Thread.currentThread().getName() + " 入队列 3");
                synchronousQueue.put(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // 3. 等待1000毫秒
        Thread.sleep(1000L);

        // 4. 再启动一个线程,从队列中取出3个元素
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
                Thread.sleep(1);
                System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
                Thread.sleep(1);
                System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

}

image-20250202141622478

效果如下:第一个线程Thread-0往队列放入一个元素1后,就被阻塞了。直到第二个线程Thread-1从队列中取走元素1后,Thread-0才能继续放入第二个元素2。


案例2:SynchronousQueue公平锁、非公平锁案例

image-20250202153350415

为什么要多线程场景去测试?

  • 单线程你没法直接连续put多个,因为put和take操作是相对的,put了一个之后,只有take了另一个才能再进行put操作。
package demo11;

import java.util.Arrays;
import java.util.concurrent.SynchronousQueue;

/**
 * 公平 & 非公平 案例测试
 * 设置初始化的SynchronousQueue参数即可
 */
public class SynchronousQueueFairAndNotFairDemo {
    public static void main(String[] args) throws InterruptedException {
        // 1. 创建SynchronousQueue队列,可设置是否公平
        SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>(false);

        // 放置三个元素 放置过程中各自等待500ms
        for (int i = 0; i < 10; i++) {
            // 2. 启动一个线程,往队列中放1个元素
            int finalI = i;
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + "开始入队列" + finalI);
                    synchronousQueue.put(finalI);
//                    System.out.println(Thread.currentThread().getName() + " 入队列" + finalI + "成功");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
            Thread.sleep(500);
        }
        // 3. 等待1000毫秒
//        Thread.sleep(1000L);

      	// 取元素的时候各自间隔500ms
        int[] arr = new int[10];
        for (int i = 0; i < 10; i++) {
            // 4. 启动一个线程,往队列中放1个元素
            int finalI = i;
            new Thread(() -> {
                try {
                    arr[finalI] = synchronousQueue.take();
                    System.out.println(Thread.currentThread().getName() + " 出队列 " + arr[finalI]);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();

            Thread.sleep(500);
        }
        System.out.println(Arrays.toString(arr));
        // 7. 等待1000毫秒
        Thread.sleep(1000L);
    }
}

测试1:当设置非公平情况

SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>(false);

image-20250202153656442

测试2:当设置公平情况

SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>(true);

image-20250202153735555


案例3:搭配线程池使用

线程池多线程+SynchronousQueue(公平无效)

image-20250202173128343

配合线程池多线程场景,SynchronousQueue设置为公平无效,测试源码如下:

package demo11;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 线程池多线程场景(核心数、最大线程数非1场景)
 * 无公平可言原因:线程池底层走的是offer操作,并非是put操作(可见案例2中的demo场景,可实现公平是走的put操作实现)
 * 适合场景:cpu密集型,在多线程场景通过线程池是无法实现按照submit的提交顺序去处理逻辑的(原因如上)。
 */
public class SynchronousQueueFairAndNotFairPoolDemo1 {
    public static void main(String[] args) throws InterruptedException {
        int cpuCores = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(cpuCores + 1, cpuCores + 1,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(true),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        try {
                            // 阻塞等待
                            executor.getQueue().put(r);
                        } catch (InterruptedException var4) {
                            throw new RejectedExecutionException("Unexpected InterruptedException", var4);
                        }
                    }
                });
        // 倒计数
        int jobNum = 100;
        CountDownLatch countDownLatch = new CountDownLatch(jobNum);
        final AtomicInteger count = new AtomicInteger(0);
        // 记录任务开始时间
        long startTime = System.currentTimeMillis();
        //设计提交任务
        new Thread(()->{
            // 可改为queue队列
            int i = 0;
            while (i < jobNum) {
                int finalI = i;
                // submitjob的线程(非主线程)会进入到阻塞当中(保证按照顺序来执行)
                // 线程池底层使用的是队列的offer、poll
                executor.submit(()->{
                    System.out.println("CPU执行任务:" + finalI + ", 计数 =>" + count.incrementAndGet());
                    countDownLatch.countDown();
                });

                i++;
            }
        }).start();

        System.out.println("main主线程开始干活");
        countDownLatch.await();
        executor.shutdown();
        System.out.println("任务全部完成");

        // 记录任务结束时间
        long endTime = System.currentTimeMillis();
        // 计算任务执行时间
        long duration = endTime - startTime;
        System.out.println("任务执行总耗时: " + duration + " 毫秒");
    }
}

image-20250202173220376


线程池多线程+SynchronousQueue(公平有效)

image-20250202173234582

package demo11;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 线程池单线程场景(核心数、最大线程数1场景)
 * 可实现公平效果(无论是否设置公平参数)
 * 适合场景:任务按照submitjob去依次提交任务 & 去除线程池同样也可实现,可见下面处理方式
 */
public class SynchronousQueueFairAndNotFairPoolDemo2 {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), // 填写false、true在核心、最大线程数为1 1情况下效果一致
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        try {
                            executor.getQueue().put(r);
                        } catch (InterruptedException var4) {
                            throw new RejectedExecutionException("Unexpected InterruptedException", var4);
                        }
                    }
                });

        // 倒计数
        int jobNum = 100;
        CountDownLatch countDownLatch = new CountDownLatch(jobNum);
        final AtomicInteger count = new AtomicInteger(0);
        //设计提交任务
        new Thread(()->{
            // 可改为queue队列
            int i = 0;
            while (i < jobNum) {
                int finalI = i;
                // 处理方式一:submitjob的线程(非主线程)会进入到阻塞当中(保证按照顺序来执行)
                executor.submit(()->{
                    System.out.println("CPU执行任务:" + finalI + ", 计数 =>" + count.incrementAndGet());
                    countDownLatch.countDown();
                });
                // or 处理方式二:思考:是否如果原本就在新建线程中,是否无需使用线程池去submitjob提交?因为原本当前就是阻塞进行的
//                System.out.println("执行任务:" + finalI);

                i++;
            }
        }).start();

        // 7. 等待1000毫秒
        System.out.println("main主线程开始干活");
        countDownLatch.await();
        executor.shutdown();
        System.out.println("任务全部完成");
    }
}

效果:

image-20250202174350317

资料获取

大家点赞、收藏、关注、评论啦~

精彩专栏推荐订阅:在下方专栏👇🏻

  • 长路-文章目录汇总(算法、后端Java、前端、运维技术导航):博主所有博客导航索引汇总
  • 开源项目Studio-Vue—校园工作室管理系统(含前后台,SpringBoot+Vue):博主个人独立项目,包含详细部署上线视频,已开源
  • 学习与生活-专栏:可以了解博主的学习历程
  • 算法专栏:算法收录

更多博客与资料可查看👇🏻获取联系方式👇🏻,🍅文末获取开发资源及更多资源博客获取🍅



http://www.kler.cn/a/531127.html

相关文章:

  • Windows编译FreeRDP步骤
  • Python 网络爬虫实战:从基础到高级爬取技术
  • MP4基础
  • 消息队列应用示例MessageQueues-STM32CubeMX-FreeRTOS《嵌入式系统设计》P343-P347
  • 想品客老师的第天:类
  • 深入理解Java引用传递
  • DeepSeek-R1 低成本训练的根本原因是?
  • CTF-web: php-session临时文件特性
  • Spring MVC学习——发送请求(@RequestMapping注解及请求参数绑定)
  • Android学习19 -- 手搓App
  • The Simulation技术浅析(三):数值方法
  • Hive修复分区
  • 亚博microros小车-原生ubuntu支持系列:20 ROS Robot APP建图
  • 【IocDI】_存储Bean的五大类注解及getBean的使用
  • 独立开发者的技术栈
  • 使用Pygame制作“走迷宫”游戏
  • 54【ip+端口+根目录通信】
  • 【数据分析】案例04:豆瓣电影Top250的数据分析与Web网页可视化(numpy+pandas+matplotlib+flask)
  • 六百六十六,盐豆不带盐了
  • 解决SetWindowCompositionAttribute使控件文本透明的问题
  • git中文件的状态状态切换
  • 全栈开发:使用.NET Core WebAPI构建前后端分离的核心技巧(一)
  • 代码随想录算法训练营Day35
  • Docker 部署教程jenkins
  • 1-刷力扣问题记录
  • 学习并熟练使用MyBatis