当前位置: 首页 > article >正文

(五)Reactor核心-前置知识4

         本章是Reactor核心-前置知识(第四期),主要讲解线程池。本文章只适合有基础或从业人员进行学习。如果觉得文章有用,就点赞加藏关注支持一下吧。

        filter、map、flatMap等等中间操作:流的每一个元素都完整走完一个流水线,才会得到最终结果。当前中间操作执行完,才会执行下一个中间操作。基于事件机制回调

 复习上一章讲解了StreamAPI:

        流的三大部分:1、创建流        2、0-N个中间操作        3、一个终止操作

        流的特性:流式Lazy的,没用最终操作,中间操作的方法不会执行。

一、线程池定义

        线程池是一种管理线程的机制,它预先创建一定数量的线程,并将这些线程存储在一个 “池” 中。当有任务提交时,线程池会从池中取出一个空闲线程来执行该任务;任务执行完毕后,线程不会被销毁,而是返回到线程池中,等待下一个任务,实现线程的复用。

二、工作原理

  1. 任务提交:当有新的任务到达时,会将任务提交给线程池。

  2. 线程分配:线程池会根据自身的状态和配置来决定如何处理这个任务。

    •  如果线程池中的线程数量小于核心线程数,会创建一个新的线程来执行该任务。

    •  如果线程数量已经达到核心线程数,任务会被放入任务队列中等待执行。

    •  如果任务队列已满,且线程数量小于最大线程数,会创建新的线程来执行任务。

    •  如果线程数量已经达到最大线程数,任务队列也已满,此时会根据线程池的拒绝策略来处理新任务。

  3. 任务执行:线程从任务队列中取出任务并执行。

  4. 线程回收:任务执行完毕后,线程不会立即销毁,而是返回线程池等待下一个任务。

三、线程池的优势

  1. 降低资源消耗:通过复用线程,避免了频繁创建和销毁线程带来的开销,减少了系统资源的消耗。

  2. 提高响应速度:由于线程已经预先创建,当有任务提交时,无需等待线程的创建,能够立即开始执行任务,提高了系统的响应速度。

  3. 便于线程管理:线程池可以对线程进行统一的管理和监控,例如设置线程的最大数量、控制任务队列的大小等,有助于提高系统的稳定性和可维护性。

四、创建方式

        在 Java 中,可以使用 java.util.concurrent 包下的 Executors 类来创建不同类型的线程池,也可以直接使用 ThreadPoolExecutor 类进行自定义创建。

        1.固定大小线程池Executors.newFixedThreadPool(int nThreads) 创建一个固定大小的线程池,线程数量始终保持不变。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个固定大小为 3 的线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskId + " is completed.");
            });
        }
        executor.shutdown();
    }
}

        2.单线程线程池Executors.newSingleThreadExecutor() 创建一个只有一个线程的线程池,任务会按照提交的顺序依次执行。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorExample {
    public static void main(String[] args) {
        // 创建一个单线程线程池
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 3; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskId + " is completed.");
            });
        }
        executor.shutdown();
    }
}

        3.缓存线程池Executors.newCachedThreadPool() 创建一个可缓存的线程池,线程数量会根据任务的多少自动调整。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个缓存线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskId + " is completed.");
            });
        }
        executor.shutdown();
    }
}

        4.定时任务线程池Executors.newScheduledThreadPool(int corePoolSize) 创建一个可以执行定时任务和周期性任务的线程池。

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个定时任务线程池,核心线程数为 2
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        // 延迟 2 秒后执行任务
        executor.schedule(() -> {
            System.out.println("Task is running on thread " + Thread.currentThread().getName());
        }, 2, TimeUnit.SECONDS);
        // 延迟 1 秒后开始执行任务,之后每隔 3 秒执行一次
        executor.scheduleAtFixedRate(() -> {
            System.out.println("Periodic task is running on thread " + Thread.currentThread().getName());
        }, 1, 3, TimeUnit.SECONDS);
    }
}

5.使用 ThreadPoolExecutor 自定义创建

import java.util.concurrent.*;

public class CustomThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个自定义的线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, // 核心线程数
                5, // 最大线程数
                60, // 线程空闲时间
                TimeUnit.SECONDS, // 时间单位
                new LinkedBlockingQueue<>(10) // 任务队列
        );
        for (int i = 0; i < 15; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskId + " is completed.");
            });
        }
        executor.shutdown();
    }
}

注意事项:

  1. 避免使用 Executors 创建线程池Executors 提供的一些创建线程池的方法可能会导致内存溢出等问题,例如 newFixedThreadPool 和 newSingleThreadExecutor 使用的是无界队列,可能会导致任务堆积;newCachedThreadPool 允许创建的线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,耗尽系统资源。建议使用 ThreadPoolExecutor 进行自定义创建。

  2. 合理配置线程池参数:根据系统的实际情况,合理设置核心线程数、最大线程数、任务队列大小等参数,以充分发挥线程池的性能。

  3. 正确关闭线程池:使用完线程池后,需要调用 shutdown() 或 shutdownNow() 方法来关闭线程池,避免资源泄漏。

四、流与线程

1.流是并发还是不并发?和for循环有什么区别?

        答:流默认不并发,和for循环一样挨个处理数据。也可以并发,使用paraller创建并发流,不能保证执行顺序,但执行结果一致。并发以后自行解决多线程安全问题,加锁等等。有状态数据会产生并发安全问题,千万不要这么写。流的所有操作都是无状态的,数据状态仅在此函数内有效,不溢出至函数外。流的数据受函数外变量影响,则会引起线程安全问题,流的所有操作都应该是独立的。

2.将集合转换为流进行处理,会改变原集合吗?

        答:拿到集合流,其实就是拿到集合深拷贝的值,流的所有操作都是流元素的引用,不影响原有集合的值。

3.线程是越多越好,还是越少越好?

        答:和CPU核心一样多最好,线程多了就会去抢时间片、资源,线程少的话,处理任务可能就不够。100个线程:一个CPU核心排了很多线程,线程就要切换,切换保留线程(浪费内存,浪费时间)。线程越多,线程之间的竞争越激烈。

思路:让少量的线程一直忙,而不是大量的线程一直切换等待。在工作中不会一个员工分配一件事,100件事分配给100个员工,开销是很大的。

什么问题都可以评论区留言,看见都会回复的

如果你觉得本篇文章对你有所帮助的,把“文章有帮助的”打在评论区

多多支持吧!!!

点赞加藏评论,是对小编莫大的肯定。抱拳了!


http://www.kler.cn/a/592150.html

相关文章:

  • (六)Reactive-Stream 响应式流
  • 霍尔传感器与电流互感器的区别
  • 男女搭配(数学思维)
  • 如何实现一个bind函数?
  • electron桌面应用多种快速创建方法
  • PyTorch入门指南:环境配置与张量初探
  • 3.19学习总结 题+java面向对象
  • 程序化广告行业(28/89):基于用户旅程的广告策略解析
  • 第三:go 操作mysql
  • 前端iView面试题及参考答案
  • PMP项目管理—相关方管理篇—补充内容
  • 【系统架构设计师】操作系统 - 特殊操作系统 ③ ( 微内核操作系统 | 单体内核 操作系统 | 内核态 | 用户态 | 单体内核 与 微内核 对比 )
  • k8s学习记录(三):Pod基础-Node选择
  • python系列之元组(Tuple)
  • MySQL配置文件my.cnf详解
  • Java 代码优化技巧:场景与实践
  • 【HarmonyOS Next】鸿蒙中App、HAP、HAR、HSP概念详解
  • 2025年智能系统、自动化与控制国际学术会议(ISAC 2025)
  • 云原生边缘计算:分布式智能的时代黎明
  • 抖音碰一碰发视频系统源码搭建全攻略-碰一碰拓客系统oem搭建