当前位置: 首页 > article >正文

02_线程通信与线程池

线程通信

什么是线程通信

当多个线程共同操作共享的资源时,线程间通过某种方式互相告知自己的状态,以相互协调,并避免无效的资源争夺

线程通信的常见模型(生产者与消费者模型)
  • 生产者线程,负责生产数据
  • 消费者线程,负责消费生产者所生产的数据
  • 生产者生产完数据,应该等待自己,通知消费者消费;
  • 消费者消费完数据,也应该等待自己,通知生产者生产!
模型的实现
import java.util.ArrayList;
import java.util.List;

// 生产者消费者模型
// 需求:
// 3个生产者线程,负责生产包子,每个线程每次只能生产1个包子放在桌子上
// 2个消费者线程,负责吃掉包子,每个线程每次只能从桌子上拿1个包子吃
public class Test {
    public static void main(String[] args) {
        // 1. 创建一个桌子(注意,是只有一个!)
        Desk desk = new Desk();

        // 2. 创建3个生产者线程(3个厨师)
        new Thread(() -> {
            while (true) {
                desk.put();     // 这是厨师1抢到桌子后,需要做的事情
            }
        }, "厨师1").start();

        new Thread(() -> {
            while (true) {
                desk.put();     // 这是厨师2抢到桌子后,需要做的事情
            }
        }, "厨师2").start();

        new Thread(() -> {
            while (true) {
                desk.put();     // 这是厨师2抢到桌子后,需要做的事情
            }
        }, "厨师3").start();

        // 3. 创建2个消费者线程(2个客人)
        new Thread(() -> {
            while (true) {
                desk.get();     // 这是客人1抢到桌子后,需要做的事情
            }
        }, "客人1").start();

        new Thread(() -> {
            while (true) {
                desk.get();     // 这是客人2抢到桌子后,需要做的事情
            }
        }, "客人2").start();
    }
}

class Desk {
    private List<String> list = new ArrayList<>();
    //(本程序中的厨师1,厨师2,厨师3,客人1,客人2,这五个线程相互抢锁与陷入等待
    // 原因很简单,因为 this 就是它们的锁对象,而这个锁对象,正好是我们创建的一个桌子对象(我们就创建了一个桌子而已)
    // 所以不论哪个线程调用了桌子对象的什么方法,总之,先碰到桌子的人,就能先干自己的活

    // 放1个包子的方法
    public synchronized void put() {
        // 判断是否有包子
        try {
            String name = Thread.currentThread().getName();
            if (list.size() == 0) {
                Thread.sleep(2000);  // 模拟做包子花了2秒
                list.add(name + "做的那个包子");
                System.out.println(name + "做了一个包子");

                // 做完包子后,唤醒别人,自己沉睡
                this.notifyAll();
                this.wait();
            } else {
                // 检测到有包子,就不用做了
                // 直接唤醒别人,自己沉睡
                this.notifyAll();
                this.wait();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // 拿1个包子的方法
    public synchronized void get() {
        try {
            String name = Thread.currentThread().getName();
            if (list.size() == 1) {
                // 有包子,直接开吃!
                Thread.sleep(1000);  // 模拟吃包子花了1秒
                System.out.println(name + "吃了" + list.get(0));
                list.clear();
                this.notifyAll();
                this.wait();
            } else {
                // 检测到没有包子,直接唤醒别人,自己沉睡
                this.notifyAll();
                this.wait();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

线程池

认识线程池
  • 线程池就是一个可以复用线程的技术

  • 不使用线程池的问题——用户每发起一个请求,后台就需要创建一个新线程来处理,下次新任务来了肯定又要创建新线程处理的,而创建新线程的开销是很大的,并且请求过多肯定会产生大量的线程出来,这样会严重影响系统的性能。

如何创建线程池

临时线程什么时候创建

  • 新任务提交时发现核心线程都在忙,任务队列也满了,并且还可以创建临时线程,此时才会创建临时线程

什么时候会开始拒绝新任务

  • 核心线程和临时线程都在忙,任务队列也满了,新的任务过来的时候才会开始拒绝任务
import java.util.concurrent.*;

public class Test {
    public static void main(String[] args) {
        // 1. 通过 ThreadPoolExecutor 创建一个线程池对象
        /* public ThreadPoolExecutor(   int corePoolSize,  // 核心线程数量(核心员工)
                                        int maximumPoolSize,  // 最大线程数量(核心员工+临时员工)
                                        long keepAliveTime,  // 临时线程存活时间(临时员工辞退时间)
                                        TimeUnit unit,  // 设置存活时间的单位(配合上面一行的功能使用)
                                        BlockingQueue<Runnable> workQueue,  // 任务队列(客人座位的数量)
                                        ThreadFactory threadFactory,  // 线程工厂(招聘员工的hr)
                                        RejectedExecutionHandler handler)  // 任务拒绝策略(大家都在忙的时候的处理方法)
         */
        ExecutorService pool = new ThreadPoolExecutor(3, 5, 8, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(4), Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    }
}
线程池处理 Runnable 任务
import java.util.concurrent.*;

public class Test {
    public static void main(String[] args) {
        // 1. 通过 ThreadPoolExecutor 创建一个线程池对象
        ExecutorService pool = new ThreadPoolExecutor(3, 5, 8, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(4), Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());  // new ThreadPoolExecutor.AbortPolicy() 拒绝策略之一: 抛异常

        // 2. 创建任务对象
        Runnable target = new MyRunnable();

        // 3. 将任务提交到线程池中处理
        pool.execute(target);  // 线程池会自动创建一个新线程,自动处理这个任务,自动执行!
        pool.execute(target);
        pool.execute(target);  // 此行及其以上的两行,都是立刻被3个核心员工服务

        pool.execute(target);  // 剩下的4个任务,因为刚好设有4个座位,所以不会招募临时员工
        pool.execute(target);  // 因此,这4个任务只能默默地等待那3个正在忙碌的核心员工服务它们
        pool.execute(target);
        pool.execute(target);


//        pool.shutdown();  // 等着线程池的任务全部执行完毕后,再关闭线程池
//        pool.shutdownNow();  // 不管任务是否执行完毕,立即关闭线程池!
        // 补充:正常开发中,创建线程池的目的就是为了不断的服务别人
        // 所以我们不会停掉它的,所以上面两个命令用得比较少

    }
}

class MyRunnable implements Runnable {
    @Override
    public void run() {
        // 线程要执行的任务内容
        try {
            System.out.println(Thread.currentThread().getName() + " ===> 输出完毕!");
            Thread.sleep(999000);  // 线程暂停1000毫秒
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
线程池处理 Callable 任务
import java.util.concurrent.*;

public class Test {
    public static void main(String[] args) throws Exception {
        // 1. 通过 ThreadPoolExecutor 创建一个线程池对象
        ExecutorService pool = new ThreadPoolExecutor(3, 5, 8, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(4), Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());  // new ThreadPoolExecutor.AbortPolicy() 拒绝策略之一: 抛异常

        // 2. 使用线程处理 Callable 任务
        Future<String> f1 = pool.submit(new MyCallable(100));
        Future<String> f2 = pool.submit(new MyCallable(200));
        Future<String> f3 = pool.submit(new MyCallable(300));
        Future<String> f4 = pool.submit(new MyCallable(400));

        // 3. 获取线程处理后的任务返回结果
        System.out.println(f1.get());
        System.out.println(f2.get());
        System.out.println(f3.get());
        System.out.println(f4.get());

    }
}

class MyCallable implements Callable<String> {
    private int n;

    public MyCallable(int n) {
        this.n = n;
    }

    @Override
    public String call() {
        // 线程要执行的任务内容
        int sum = 0;
        for (int i = 0; i < n; i++) {
            sum += i;
        }
        return Thread.currentThread().getName() + "计算出了1-" + n + "的和是 " + sum;

    }
}
Executors 工具类实现线程池

Executors 是一个线程池的工具类,提供了很多静态方法用于返回不同特点的线程池对象

import java.util.concurrent.*;

public class Test {
    public static void main(String[] args) throws Exception {
        // 1. 通过 Executors 创建一个线程对象
        ExecutorService pool = Executors.newFixedThreadPool(3);  // 3 : 三个核心线程可复用
        // 核心线程数量到达配置多少合适呢?
        // 计算密集型的任务:核心线程数量 = CPU的核数 + 1
        // IO密集型的任务:核心线程数量 = CPU的核数 * 2

        // 2. 使用线程处理 Callable 任务
        Future<String> f1 = pool.submit(new MyCallable(100));
        Future<String> f2 = pool.submit(new MyCallable(200));
        Future<String> f3 = pool.submit(new MyCallable(300));
        Future<String> f4 = pool.submit(new MyCallable(400));

        // 3. 获取线程处理后的任务返回结果
        System.out.println(f1.get());
        System.out.println(f2.get());
        System.out.println(f3.get());
        System.out.println(f4.get());

    }
}

class MyCallable implements Callable<String> {
    private int n;

    public MyCallable(int n) {
        this.n = n;
    }

    @Override
    public String call() {
        // 线程要执行的任务内容
        int sum = 0;
        for (int i = 0; i < n; i++) {
            sum += i;
        }
        return Thread.currentThread().getName() + "计算出了1-" + n + "的和是 " + sum;

    }
}

http://www.kler.cn/a/154937.html

相关文章:

  • Qt主线程把数据发给子线程,主线程会阻塞吗
  • 跟着尚硅谷学vue2—基础篇4.0
  • 从swagger直接转 vue的api
  • 【PowerHarmony】电鸿蒙学习记录-编写helloworld!
  • 【QT常用技术讲解】优化网络链接不上导致qt、qml界面卡顿的问题
  • 基于微信小程序的乡村研学游平台设计与实现,LW+源码+讲解
  • H265、VP9、AV1视频编码器性能对比
  • 22.Oracle中的临时表空间
  • 短线买入卖出有哪些交易技巧?
  • Sailfish OS 移动操作系统
  • C/C++内存管理(含C++中new和delete的使用)
  • 优化机器学习:解析数据归一化的重要性与应用
  • Git 合并冲突解决步骤
  • Simple_SSTI_1-WEB-bugku-解题步骤
  • 实时流式计算 kafkaStream
  • 第8关:定义一个名为PROC_AVGWEIGHT的有参数存储过程
  • ubuntu22.04离线手动安装openstack yoga和ceph quincy
  • 西南科技大学C++程序设计实验三(类与对象二)
  • hbase Master is initializing
  • 分布式事务有哪些解决方案?
  • 力扣labuladong一刷day24天
  • MySQL 教程 1.5
  • 同旺科技 USB TO SPI / I2C --- 调试W5500_Ping测试
  • 工业机器视觉megauging(向光有光)使用说明书(一,轻量级的visionpro)
  • java中@Async注解在CompletableFuture.runAsync里面使用没有生效的原因?
  • Java项目调用C/C++ SDK的方案汇总