当前位置: 首页 > article >正文

【Java】并发编程实战:单例模式 + 阻塞队列的终极实现指南

各位看官,大家早安午安晚安呀~~~

如果您觉得这篇文章对您有帮助的话

欢迎您一键三连,小编尽全力做到更好
欢迎您分享给更多人哦

今天我们来学习【Java】并发编程实战:单例模式 + 阻塞队列的终极实现指南

目录

1.单例模式

1.1饿汉模式

1.2.懒汉模式

1.3.饿汉模式和懒汉模式有一个是线程不安全的

1.3.1.那我们把new这个操作变成原子的可以了吗?

1.3.2.所以说这个锁的加法很有说法的!

1.3.3.那我们能不能既可以让线程安全,又不会对效率有太大的影响呢(只加第一次的锁)?

1.3.4.指令重排序问题

2.阻塞队列

2.1解耦合

2.2:削峰填谷

2.3.阻塞队列实现

2.4:模拟实现(用循环数组实现)

2.5:解决问题

2.6.验证我的阻塞队列,实现一个简单的生产者消费者模型

1.单例模式

先回答问题:

提问:

单例模式是什么呢?我们在某一个特定的场景下只能new一个对象。

譬如代码里面用来管理对象就应该是单例的

像:MySQL中的JDBC编程中的DataSource(描述了mysql服务器的位置,这个服务器的描述信息就应该是单例的)

那我就要问了,那我new一次对象不就好了,还整个这个模式干嘛呢?

这个不靠谱呀,一不小心我们new多个对象了不就完蛋了?所以这个时候我们就需要一个模式让编译器帮我们做检查

就像final关键字(我们不能修改这个变量,不然编译器就会报错)同理:interface,@Override,throws也是相同的道理

但是:java在语法层面上没有对单例模式作出支持,所以我们只能通过一些编程技巧来实现类似的效果

1.1饿汉模式

第一种方式(饿汉模式):先类里面创建出有一个实例,然后把构造方法给隐藏起来(你就new不了了,哈哈)

class Singleton {
    private static Singleton instance = new Singleton();
  //静态成员变量,类加载的时候就被创建好了
    public static Singleton getInstance(){  // 一定要是静态方法,不然一开始别人都拿不到这个对象,又没办法调用这个方法,岂不是贻笑大方
        return instance;
    }
    private Singleton(){} // 啥也不用干,也干了呀,把构造方法给藏起来了

}
public class SingletonDemo{
    public static void main(String[] args) {
       // Singleton singleton = new Singleton();
  // 直接报错,编译器帮我们检查
        Singleton singleton1 = Singleton.getInstance();  // 都通过getInstance创建实例
        Singleton singleton2 = Singleton.getInstance();
        System.out.println(singleton1 == singleton2); // true,俩对象引用都一致
    }
}

这种方式也叫做饿汉模式,(这么急切)类一加载对象就被创建好了(创建的时机比较早)

那我们能不能不先创建呢?我想用的时候再去创建,当然可以,这个就叫做懒汉模式

1.2.懒汉模式

class SingletonLazy{
    private static SingletonLazy instance = null; // 先置为null
    public static SingletonLazy getInstance(){
        if(instance == null){  // 如果不为空直接返回instance
            instance = new SingletonLazy(); //  为null 再new
        }
        return instance;
    }
    private SingletonLazy(){} // 私有构造方法
}

1.3.饿汉模式和懒汉模式有一个是线程不安全的

我们第一次说的线程不安全的时候是在count++的问题上面,这个count++这个操作再cpu上面是分三步进行的(两个线程同时修改一个变量就很容易出现线程安全问题)(非原子当时加锁解决的,当然还有其他解决办法)。

解释:

1.饿汉模式:(我们在类加载的时候就把对象创建好了,大家调用getInstance()就只是涉及到读操作)

2.懒汉模式:我们需要的时候才去创建这个对象,这个时候就会涉及到两个线程同时修改一个变量。

并且new对象的操作也不是原子的

1.申请内存空间

2.在内存上面构造对象

3.把内存的地址赋值给instance引用(一共这三步)

1.3.1.那我们把new这个操作变成原子的可以了吗?
class SingletonLazy{
    private static SingletonLazy instance = null; // 先置为null
    public static SingletonLazy getInstance(){
        if(instance == null){  // 如果不为空直接返回instance
            synchronized(SingletonLazy.class){
                instance = new SingletonLazy(); // 把这一步变成原子的
            }
        }
        return instance;
    }
    private SingletonLazy(){} // 私有构造方法
}

现在确实new对象的时候不会被穿插了,但是这个锁的位置不对

我们判断这个引用是否为null的时候也被穿插的了呢。另一个线程这个时候也来判断这个引用是否为null。好了这两个线程判断的都是null,又创建了两个对象(图解如下)

总结:​​​​​​加锁的关键是:

不是锁一个操作在cpu上是多个指令就只把这个操作锁起来,而是要综合代码仔细分析,看看哪些操作是要一起被锁起来的

特别是这种判断语句:
这个地方最容易出问题     (小编也是写博客的时候才意识到)
(好开心哈哈)

1.3.2.所以说这个锁的加法很有说法的!

我们这个时候就可以把这个判断语句一起加上锁了

 public static SingletonLazy getInstance(){
        synchronized (SingletonLazy.class){
            if(instance == null){
                instance = new SingletonLazy();
            }
        }
         return instance;
    }

现在是解决了刚才的问题但是,又引发了新的问题

一旦我们这样写:后续每次调用getInstance()都会进行加锁,但是懒汉模式就只有在最开始的时候(后面都是读的操作了,线程就安全了)

两个很大的问题:

1.我们一直加锁,加锁就会涉及到锁冲突,然后在阻塞等待(鬼知道你下次上cpu是啥时候,时间相对是很大被浪费了)

2.一个代码一旦涉及到加锁(那么这个代码基本就和高性能无缘了,而且我们还是每一次都加锁)

1.3.3.那我们能不能既可以让线程安全,又不会对效率有太大的影响呢(只加第一次的锁)?

当然可以!!!:我们现在无非就是担心后面每次getInstance都要加锁,那我们再加一层if(instance == null )不就行了?后面就不需要进入到加锁的那一步了

 public static SingletonLazy getInstance(){
        if(instance == null){
            synchronized (SingletonLazy.class){
                if(instance == null){
                    instance = new SingletonLazy();
                }
            }
        }
        return instance;
    }

我们的代码看似已经很完美了

但是但是但是(还是有问题)!!!

1.3.4.指令重排序问题

指令重排序也会对上述的代码产生影响

(也是编译器为了执行效率,帮助我们调整代码的执行顺序,调整的前提是保证代码的逻辑不变)

上一次编译器是帮助我们把一些要读内存的操作优化成读寄存器,我们用volatile就能解决这个问题,我们这一次也是用volatile解决这个问题。我给大家分析一下。

针对这个情况:我们给instance加上volatile就可以了(告诉编译器你不用帮我优化(指令重排序)),完结撒花!!!

正确的完整的代码如下

class SingletonLazy{
    private static volatile SingletonLazy instance = null; // 先置为null
    public static SingletonLazy getInstance(){
        if(instance == null){
            synchronized (SingletonLazy.class){
                if(instance == null){
                    instance = new SingletonLazy();
                }
            }
        }
        return instance;
    }
    private SingletonLazy(){} // 私有构造方法
}

三个注意点:

1.加锁的位置

2.第二层if(我们只需要第一次new的时候加锁)

3.指令重新排序问题

但是但是但是但是!!!哈哈哈,还是有一点小问题:

这个单例模式可以被反射打破,或者序列化/反序列化打破(不过这个小编也不太懂,后续小编懂了再讲解吧~)

2.阻塞队列

我们接下来来讲解一下阻塞队列吧~~那什么是阻塞队列呢?

阻塞队列是一种特殊的队列(线程安全的队列)

1.如果队列为,继续出队列,就会发生阻塞,阻塞到其他线程往队列里面添加元素为止

2.如果队列为,继续入队列,就会发生阻塞,阻塞到其他线程从队列里面取出元素为止

这个队列最大的用处:用来实现“生产者消费者模型”(一种常见的多线程代码编写方式

那生产者消费者模型有什么意义呢?

1.解耦合

2.削峰填谷:阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力

在 "秒杀" 场景下,消费者服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求, 服务器可能扛不住,这个时候就可以把这些请求都放到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求

2.1解耦合

两张图

2.2:削峰填谷

2.3.阻塞队列实现

在java标准库里面已经提供了现成的阻塞队列,我们先学习一下怎么使用

解释:

我们可以看到这个阻塞队列继承了我们的队列接口所以说Queue这里提供的各种方法对于BlockingQueue也是可以使用的,但是我们尽量还是不要去使用Queue里面的方法(因为是线程不安全的)

BlockingQueue的两个主要方法:

1.put方法:阻塞入队列(如果队列满了,只有等其他线程take走了一个元素,才能够放put元素)

2.take方法:阻塞出队列(如果队列为空,只有等其他线程put了一个元素,才能继续take元素)

众所周知:队列有用数组实现的(一般是环形队列),也有用链表实现的

自然我们的阻塞队列也是这个样子。

接下来我们先学习一下怎么使用的吧,然后我们在进行模拟实现

public static void main1(String[] args) throws InterruptedException {
       // BlockingDeque<String> queue1 = new ArrayBlockingQueue<>(); // 数组实现的阻塞队列的
        BlockingQeque<String> queue = new LinkedBlockingQeque<>();
        queue.put("111"); // 要抛出异常,一般带阻塞的都会抛出InterruptException
        queue.put("222");
        queue.put("333");
        queue.put("444");// 放进去四个元素
        
        String s = queue.take();   // 取出元素
        System.out.println(s);
        
        s = queue.take();
        System.out.println(s);
        s = queue.take();
        System.out.println(s);
        s = queue.take();
        System.out.println(s);
        s = queue.take();
        System.out.println(s);//我们发现这个时候已经阻塞了,队列里面没有元素了

    }

2.4:模拟实现(用循环数组实现)

这里还是我们以前的问题,如果head和tail重合了,那么这个队列到底是满了还是空的?

两个解决办法:

1.浪费一个数组格子,(tail +1) % data.length = head(这个时候就是满了)。head = tail这个时候就是空的

2.专门搞一个size(数组有效元素个数),如果size = data.length(这个时候就是满的)。如果 size = 0(这个时候就是空的)

我用第二种方式给大家先写一个正常循环队列

   class MyBlockingQueue{
    String[] data = new String[1000];
    private int head; // 头
    private int tail;  // 尾巴进头出
    private int size;// 有效元素个数

 // 两个主要的方法;
    public void put(String elem){
        if(size == data.length){  // 有效元素个数 = 数组长度
            return;
        }
        data[tail] = elem;  // 别搞elem[tail] = elem ,都混乱了

        tail++;
        // 数组比实际元素个数 -1; 下标为1000时说明刚好转了一圈回来了,同时下标置0
        if(tail == data.length){
            tail = 0;
        }
        size++; // 有效元素++
    }
    public String take(){
        if(size == 0){
            return null;
        }
        // 队列不为空,就把对首元素删除,先保存一下
        String ret = data[head];
        head++;
        if(head == data.length){
            head = 0;
        }
        size--;
        return ret;
    }

}

这个时候我们在这个循环队列的上进行线程安全的改进:

第一个改进:

我们可以看到,我们这里的take和put操作几乎每一步都涉及到变量的修改或者条件判断(上一篇博客我们刚讲过条件判断这里最容易出现内存可见性问题(从读内存变成读寄存器),我们索性直接给两个方法都给加上锁。(并且给head,tail,size都加上volatile(因为他们每一个都涉及到了条件判断))

第二个改进:

put方法:阻塞入队列(如果队列满了,只有等其他线程take走了一个元素才能够放put元素)(take走元素的时候我们就可以通知另一个线程可以put元素了)

take方法:阻塞出队列(如果队列为空,只有等其他线程put了一个元素才能继续take元素)(put进去元素的时候我们就可以通知另一个线程可以take元素了)

代码如下:(小编接下来有两个问题,这个代码还是不很完善,需要把问题解决)

class MyBlockingQueue{
    String[] data = new String[1000];
    volatile private int head; // 头
    volatile private int tail;  // 尾巴进头出
    volatile private int size;// 有效元素个数
    //take 和 put方法几乎每一步都涉及到修改值,而且还有判断(这个最容易)被
    synchronized public void put(String elem) throws InterruptedException {  // 扔出异常还是???
        if(size == data.length){  // 有效元素个数 = 数组长度
            this.wait();  // 队列满了,等待别的线程取走元素,然后再put元素
        }
        data[tail] = elem;  // 别搞elem[tail] = elem ,都混乱了

        tail++;
        // 数组比实际元素个数 -1; 下标为1000时说明刚好转了一圈回来了,同时下标置0
        if(tail == data.length){
            tail = 0;
        }
        this.notify(); // 提醒另一个线程我们已经放进去元素了

        size++; // 有效元素++
    }

       synchronized  public String take() throws InterruptedException {
        if(size == 0){
            this.wait();
        }
        // 队列不为空,就把对首元素删除,先保存一下
        String ret = data[head];
        head++;
        if(head == data.length){
            head = 0;
        }
        size--;
        this.notify(); // 提醒另一个线程我们已经拿走一个元素了
        return ret;
    }
   
}

这里小编有两个问题,写这个代码的时候不是很明白?

问题一:为什么put和take要用同一把锁?

问题二:我wait的时候抛出的异常是应该try-catch还是直接抛出去呢?

2.5:解决问题

解决问题一:

我不会

解决问题二:

这个有一个很大问题,我们wait的时候不仅仅可以被notify唤醒还可以被interrupt唤醒

我们队列满的时候还要put时需要wait,interrupt唤醒时会抛出异常(这个时候我们这个线程就以为是notify唤醒的)这个时候就出事了。

如图:

所以我们在被唤醒的时候要多加一步看看是否是因为interrupt导致的唤醒(如果是那就继续wait)。那又被唤醒又要检查,再wait(套娃了哈哈哈哈)

所以我们直接循环一下就好了


        while(size == data.length){  // 被唤醒检查一下还是满的话就不是notify唤醒的
            this.wait();  // 队列满了,等待别的线程取走元素,然后再put元素
        }
        

所以说如果是我们抛出异常的话程序就终止了,其实没问题,但是try-catch我们处理不当的话就容易出问题(保险起见我们还是while检查)(这个操作是java标准文档建议的哈哈哈)

OK!!!最后的终极代码来了!!!

class MyBlockingQueue{
    String[] data = new String[1000];
    volatile private int head; // 头  volatile
    volatile private int tail;  // 尾巴进头出
    volatile private int size;// 有效元素个数
    //take 和 put方法几乎每一步都涉及到修改值,而且还有判断(这个最容易)被
    synchronized public void put(String elem) throws InterruptedException {  // 扔出异常还是???
        while(size == data.length){  // 有效元素个数 = 数组长度
            this.wait();  // 队列满了,等待别的线程取走元素,然后再put元素
        }
        data[tail] = elem;

        tail++;
        // 数组比实际元素个数 -1; 下标为1000时说明刚好转了一圈回来了,同时下标置0
        if(tail == data.length){
            tail = 0;
        }
        this.notify(); //唤醒(小编解释了在上文中)

        size++; // 有效元素++
    }

       synchronized  public String take() throws InterruptedException {
        while (size == 0){
            this.wait();
        }
        // 队列不为空,就把对首元素删除,先保存一下
        String ret = data[head];
         head++;
        if(head == data.length){
            head = 0;
        }
        size--;
        this.notify(); 
        return ret;
    }

}

实话说,问了ai我第一个问题还是没搞懂,唉。有点累,这个问题明明问问其他同学或者老师吧。真的有点累。

没事还有一个使用我这个阻塞队列。

2.6.验证我的阻塞队列,实现一个简单的生产者消费者模型

class MyBlockingQueue{
    String[] data = new String[1000];
    volatile private int head; // 头  volatile
    volatile private int tail;  // 尾巴进头出
    volatile private int size;// 有效元素个数
    //take 和 put方法几乎每一步都涉及到修改值,而且还有判断(这个最容易)被
    synchronized public void put(String elem) throws InterruptedException {  // 扔出异常还是???
        while(size == data.length){  // 有效元素个数 = 数组长度
            this.wait();  // 队列满了,等待别的线程取走元素,然后再put元素
        }
        data[tail] = elem;

        tail++;
        // 数组比实际元素个数 -1; 下标为1000时说明刚好转了一圈回来了,同时下标置0
        if(tail == data.length){
            tail = 0;
        }
        this.notify(); //唤醒(小编解释了在上文中)

        size++; // 有效元素++
    }

       synchronized  public String take() throws InterruptedException {
        while (size == 0){
            this.wait();
        }
        // 队列不为空,就把对首元素删除,先保存一下
        String ret = data[head];
        head++;
        if(head == data.length){
            head = 0;
        }
        size--;
        this.notify();
        return ret;
    }

}

public class Demo1 {
    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue();
        Thread t1 = new Thread(() -> {
            int count = 0;
            while (true) {
                try {
                    Thread.sleep(500);
                    queue.put(" " + count);  // 慢点生产
                    System.out.println(" put " + count);
                    count++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        Thread t2 = new Thread(() -> {
            while (true) {
                try {
                    String s = queue.take();
                    System.out.println(" take" + s);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        t1.start();
        t2.start();

    }

结果:生产多少消费多少。

如果代码这样改写(生产的快,消费的慢)

    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue();
        Thread t1 = new Thread(() -> {
            int count = 0;
            while (true) {
                try {
                    queue.put(" " + count);
                    System.out.println(" put " + count);
                    count++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        Thread t2 = new Thread(() -> {
            while (true) {
                try {
                    String s = queue.take();
                    System.out.println(" take" + s);
                    Thread.sleep(500);  //  消费慢一点
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        t1.start();
        t2.start();
    }

结果:

小编这里还有一个问题

2.4的问题一:为什么put和take要用同一把锁?

小编一直没懂,大家一起来讨论呀

上述就是【Java】并发编程精要:单例模式 + 阻塞队列的终极实现指南

的全部内容了。单例模式与阻塞队列是构建高并发系统的两大核心设计利器,对于我们解决高并发问题提供了很好的思路~~~预知后事如何,请听下回分解~~~

能看到这里相信您一定对小编的文章有了一定的认可。

有什么问题欢迎各位大佬指出
欢迎各位大佬评论区留言修正~~

您的支持就是我最大的动力​​​!!!

最后记得和小编一起解决问题呀


http://www.kler.cn/a/583157.html

相关文章:

  • 成为git砖家(9): rebase进阶: 拆分commit为多个
  • 29.Harmonyos Next仿uv-ui 组件NumberBox 步进器组件自定义图标
  • 如何绕过 reCAPTCHA V2/V3:Python、Selenium 与其他工具的实战指南
  • 智慧城市时空基础设施建设与应用实践
  • MySQL——基础知识
  • Tensorflow 2.0 GPU的使用与限制使用率及虚拟多GPU
  • 网络爬虫-2:正则化
  • 大语言模型-01-语言模型发展历程-03-预训练语言模型到大语言模型
  • 两会期间的科技强音:DeepSeek技术引领人工智能新篇章
  • Node.js:快速启动你的第一个Web服务器
  • 无人机快速发展,无人机反制如何应对?
  • 第44天:WEB攻防-PHP应用SQL盲注布尔回显延时判断报错处理增删改查方式
  • 【写作模板】JosieBook的写作模板
  • 使用CPR库编写的爬虫程序
  • 大语言模型微调和大语言模型应用区别
  • 机器学习常见激活函数
  • vscode出现:No module named ‘requests‘ 问题的解决方法
  • ubuntu 在VirtualBox 打不开终端
  • Oracle RAC环境下自动清理归档日志实战指南
  • Java静态变量与PHP静态变量的对比