当前位置: 首页 > article >正文

多线程案例---阻塞队列

1. 阻塞队列

阻塞队列是一种特殊的队列,也遵守 " 先进先出 " 的原则。

阻塞队列是一种线程安全的数据结构,并且具有以下特性:

1. 当队列为满时,继续进行入队列操作就会阻塞,直到有其他线程从队列中取走元素

2. 当队列为空时,继续进行出队列操作就会阻塞,直到有其他线程往队列中插入元素

阻塞队列的一个典型应用场景就是”生产者消费者模型“ 。

1.1 生产者消费者模型 

在多线程编程中,生产者消费者模型是一种典型的编码技巧,生产者消费者模型通过阻塞队列来解决生产者和消费者强耦合的问题。

在生产者消费者模型中,生产者不与消费者直接进行通讯,而是通过阻塞队列来间接的进行通讯。当生产者产出一个数据后,它不会等待消费者来处理,而是将产出的数据扔到阻塞队列中,等到消费者去处理数据时,消费者也不会直接跟生产者去要数据,而是从阻塞队列中获取需要处理的数据。

如下图

在生产者消费者模型中引入阻塞队列有什么影响呢?

1 .解耦合

      我们假设有两个服务器,分别为A服务器和B服务器,当A服务器和B服务器直接进行交流时,编写A服务器的代码时,多多少少也会涉及到一点B服务器的逻辑,编写B服务器的代码时多多少少也会涉及到一些A服务器的逻辑,这样就导致了两个服务器的代码就有了强耦合性。而当我们引入阻塞队列之后,就会变成A服务器与阻塞队列进行交涉,B服务器和阻塞队列进行交涉,从而间接完成A服务器和B服务器的间接交涉,这样A服务器和B服务器就不会直接进行交涉了。此时,A服务器中的代码中就看不到B服务器了,B服务器中的代码中就看不到A服务器了。这样就降低了代码大的耦合性。

2.削峰填谷

    服务器接收的数据量可以理解为一个波形图,当A服务器短时间内接收的数据量达到一个峰值时,很容易将A服务器搞挂,此时如果将这些大量的数据直接交给服务器来处理,B服务器也很可能挂掉。但是,如果有了一个阻塞队列,当数据的发送达到一个峰值时,A服务器可以将数据放到阻塞队列中,防止A服务器挂掉。当数据大的发送量达到一个波谷时,B服务器就可以利用数据的传送处于波谷的时间去处理阻塞队列中的数据,从而防止B服务器因为一股脑处理大量数据而挂掉。

1.2 标准库中的阻塞队列 

 在Java标准库中内置了一个阻塞队列,如果我们需要在一些程序中使用阻塞队列,使用标准库提供的即可

1. Java标准库提供的标准库是一个名为BlockingQueue的接口,真正实现的类是LinkedBlockingQueue 

2. put方法用于阻塞式的入队列,take方法用于阻塞式的出队列

3. BlockingQueue也有offer,poll,peek等方法,但是这些方法没有阻塞性

1.3 用标准库的阻塞队列实现一个生产者消费者模型

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Demo26 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> blockingQueue=new LinkedBlockingQueue<>(100);

        Thread producer=new Thread(()->{
            try{
                int id=0;
                for(int i=0;i<100;i++){
                    Thread.sleep(1000);
                    blockingQueue.put(id);
                    System.out.println("producer生产数据:"+id);
                    id++;
                }
            }catch (InterruptedException e){
                e.printStackTrace();
            }

        },"producer");

        Thread consumer=new Thread(()->{
            try{
                for (int i = 0; i < 100; i++) {
                    Thread.sleep(1000);
                    int take=blockingQueue.take();
                    System.out.println("consumer消费数据:"+take);
                }
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        },"consumer");
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}

 

2.模拟实现一个阻塞队列

实现思路:

1.由于阻塞队列要不断有数据入队列和出队列,我们呢可以使用循环队列来实现

2.组要模拟实现put方法和take方法

3.在put过程中,发现队列满了或者在take过程中,发现队列为空,我们就要wait

4.注意wait要搭配一个循环来使用,因为wait也有可能被Interrupt这样的方法唤醒

5.使用syncronized进行加锁控制

代码实现:

class MySubBLockingQueue{
    private volatile int[] data=null;
    private volatile int head=0;
    private volatile int tail=0;
    private volatile int size=0;

    public MySubBLockingQueue(int capacity){
        data=new int[capacity];
    }

    public void put(int number) throws InterruptedException {
        synchronized (this){
            //判断队列是否满了
            while (size==data.length){
                this.wait();
            }
            data[tail]=number;
            tail++;
            size++;
            if(tail>=data.length){
                tail=0;
            }
            this.notify();
            //tail=(tail+1)%data.length;
        }
    }

    public int take() throws InterruptedException {
        int back=0;
        synchronized (this){
            //判断队列是否为空
            while (size==0){
                this.wait();
            }
            back=data[head];
            head++;
            size--;
            if(head>= data.length){
                head=0;
            }
            //head=(head+1)%data.length;
            this.notify();
        }
        return back;
    }
}

 

3.用模拟实现的阻塞队列实现生产者消费者模型

class MySubBLockingQueue{
    private volatile int[] data=null;
    private volatile int head=0;
    private volatile int tail=0;
    private volatile int size=0;

    public MySubBLockingQueue(int capacity){
        data=new int[capacity];
    }

    public void put(int number) throws InterruptedException {
        synchronized (this){
            //判断队列是否满了
            while (size==data.length){
                this.wait();
            }
            data[tail]=number;
            tail++;
            size++;
            if(tail>=data.length){
                tail=0;
            }
            this.notify();
            //tail=(tail+1)%data.length;
        }
    }

    public int take() throws InterruptedException {
        int back=0;
        synchronized (this){
            //判断队列是否为空
            while (size==0){
                this.wait();
            }
            back=data[head];
            head++;
            size--;
            if(head>= data.length){
                head=0;
            }
            //head=(head+1)%data.length;
            this.notify();
        }
        return back;
    }
}
public class Demo27 {
    public static void main(String[] args) throws InterruptedException {
        MySubBLockingQueue bLockingQueue=new MySubBLockingQueue(100);
        Thread producer=new Thread(()->{
            try{
                int id=0;
                for (int i = 0; i < 100; i++) {
                    Thread.sleep(1000);
                    bLockingQueue.put(id);
                    System.out.println("producer生产数据:"+id);
                    id++;
                }
            }catch (InterruptedException e){
                e.printStackTrace();
            }

        },"producer");
        Thread consumer=new Thread(()->{
            try{
                for (int i = 0; i < 100; i++) {
                    Thread.sleep(1000);
                    int take=bLockingQueue.take();
                    System.out.println("consumer消费数据:"+take);
                }
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        },"consumer");
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}

http://www.kler.cn/a/385257.html

相关文章:

  • 了解 Open RAN 架构中的 DU 和 CU
  • 企业邮箱后缀设置指南,轻松融入公司品牌
  • 解决 “Error: listen EACCES: permission denied 0.0.0.0:80“ 错误
  • Latex中给公式加边框
  • 人保财险(外包)面试分享
  • wireshark工具使用
  • RapidrepairController
  • linux 下 signal() 函数的用法,信号类型在哪里定义的?
  • 【go从零单排】go语言中的指针
  • NVR小程序接入平台/设备EasyNVR多品牌NVR管理工具/设备汇聚公共资源场景方案全析
  • 如何设置 TORCH_CUDA_ARCH_LIST 环境变量以优化 PyTorch 性能
  • AutoOps 使每个 Elasticsearch 部署都更易于管理
  • C#核心(7)索引器
  • 从0开始linux(21)——文件(2)文件重定向
  • Hive 查询各类型专利 Top 10 申请人及对应的专利申请数
  • 记录offcanvas不能显示和关闭的修复方法
  • QT监控文件夹变化(文件增加、删除、改名)
  • B2C分销管理系统(源码+文档+部署+讲解)
  • C++20 STL CookBook 4:使用range在容器中创建view
  • c# 动态lambda实现二级过滤(多种参数类型)
  • 『VUE』21. 组件注册(详细图文注释)
  • Kubernetes时代的APM部署革新:基于Webhook的Agent动态注入
  • docker镜像文件导出导入
  • GPU服务器厂家:AI 赋能科学研究的创新突破
  • 1.每日SQL----2024/11/7
  • 计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-10-30