当前位置: 首页 > article >正文

Java并发编程——线程池(基础,使用,拒绝策略,命名,提交方式,状态)

文章目录

    • 线程池🏊
      • 线程池的好处👍
      • 线程池的创建🏗️
      • 线程池(ThreadPoolExecutor)常见参数🔢
      • 处理任务流程🔃
      • 拒绝策略⭐
        • 使用数据库任务表来自定义拒绝策略
      • 线程池中两种提交方式
      • 线程池命名♂️♀️
      • 线程池状态

线程池🏊

线程池是多线程应用中的一种资源管理技术,它旨在减少创建和销毁线程所带来的开销,并且通过复用已存在的线程来执行任务,提高响应速度。线程池提供了一种限制和管理资源(包括执行一个任务的线程)的方法。

线程池的好处👍

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池的创建🏗️

  1. 使用ThreadPoolExecutor构造函数来创建自定义线程池(建议这种方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险)
  2. 通过使用 java.util.concurrent.Executors 工厂类创建

常用的线程池:

Executors 返回线程池对象特点
FixedThreadPool创建一个固定大小的线程池。如果所有线程都处于活动状态,新任务将在队列中等待,直到有线程可用。
CachedThreadPool创建一个可根据需要创建新线程的线程池,但在前一次构造的线程可用时将重用它们。适用于执行大量短期异步任务的应用程序。
SingleThreadExecutor创建一个单线程化的 Executor,它会确保所有任务都在同一个线程中按顺序执行。
ScheduledThreadPool创建一个支持定时及周期性的任务执行的线程池,类似于 Timer
WorkStealingPool创建一个具有多个任务队列的工作窃取线程池,适用于处理大量可并行的任务。

线程池对象的构造方法:

// LinkedBlockingQueue有界队列
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

// LinkedBlockingQueue 无界队列
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

// SynchronousQueue同步队列
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

// DelayedWorkQueue(延迟阻塞队列)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    ...
}

线程池(ThreadPoolExecutor)常见参数🔢

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

参数解释

参数解释
corePoolSize线程池的核心线程数量:任务队列未达到队列容量时,最大可以同时运行的线程数量。
maximumPoolSize线程池的最大线程数:任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
keepAliveTime当线程数大于核心线程数时,多余的空闲线程存活的最长时间
unit时间单位,使用java.util.concurrent.TimeUnit枚举值来指定时间单位
workQueue任务队列,用来储存等待执行任务的队列
threadFactory线程工厂,用来创建线程,一般默认即可
handler拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务(常见的拒绝策略包括抛出异常、丢弃任务、执行者自身运行任务等。)

工作队列(workQueue):用于存放待执行的任务的阻塞队列。可以使用BlockingQueue接口的任何实现,常见的有:

  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue:一个基于链表结构的有界阻塞队列。如果构造时未指定容量,则默认容量为Integer.MAX_VALUE,即视为无界队列
  • SynchronousQueue:不存储元素的阻塞队列,每个插入操作必须等待另一个线程的对应移除操作。
  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
  • **LinkedBlockingDeque:**一个由链表结构组成的双端阻塞队列。

img

处理任务流程🔃

ThreadPoolExecutor中最关键的execute源码:

public void execute(Runnable command) {
    // 先检查传入的command是否为空,若空则报错
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get(); // 原子操作,返回线程池状态和线程计数的控制字段值
    // 1. 如果线程数少于corePoolSize,调用addWorker创建核心线程数
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2. 如果线程数大于corePoolSize,将任务添加进workQueue队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 2.1 如果检查isRunning的状态为false,则remove这个任务,然后执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 2.2 线程池处于running状态,但是没有线程,则创建线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3.如果放入workQueue失败,则创建非核心线程执行任务,如果创建失败(当前线程总数不小于maximumPoolSize),就会拒绝
    else if (!addWorker(command, false))
        reject(command);
}

img

拒绝策略⭐

在Java的ThreadPoolExecutor中,当线程池无法处理新提交的任务时(例如,线程池已关闭或者队列已满),可以配置不同的拒绝策略来决定如何处理这些任务。

拒绝策略详细
ThreadPoolExecutor.AbortPolicy**默认的拒绝策略。**抛出 RejectedExecutionException来拒绝新任务的处理。
ThreadPoolExecutor.CallerRunsPolicy如果使用这种策略,那么当任务被拒绝时,该任务将在调用者的线程中执行,即直接在调用execute方法的线程中运行这个任务。这可能会降低任务提交线程的速度,从而减缓任务提交的速度,给线程池一些时间来处理队列中的任务。
ThreadPoolExecutor.DiscardPolicy该策略会悄悄地丢弃无法处理的任务,既不会抛出异常也不会通知调用者。因此,使用此策略时需要注意可能丢失任务的情况。
ThreadPoolExecutor.DiscardOldestPolicy此策略会丢弃队列中最旧的一个未处理任务,并尝试重新提交当前任务。这意味着最老的任务将从队列中移除,然后尝试将当前任务添加到队列中进行处理。
自定义拒绝策略通过实现接口可以自定义任务拒绝策略。

在日常开发中,我们不希望任务被丢弃,我们就会使用CallerRunsPolicy拒绝策略。

// 被拒绝任务的处理程序,直接在execute方法的调用线程中运行被拒绝的任务,除非执行器已关闭,在这种情况下,任务将被丢弃。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    /**
     *在调用者的线程中执行任务r,除非执行器已关闭,在这种情况下,任务将被丢弃
     * @param r 请求执行的可运行任务
     * @param e 试图执行此任务的执行器
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

又源代码可知:只要当前程序不关闭就会使用执行execute方法的线程执行该任务。

但是会有一定风险

如果走到CallerRunsPolicy的任务是个非常耗时的任务,且处理提交任务的线程是主线程,可能会导致主线程阻塞,影响程序的正常运行。也可能会造成死锁(如果调用者线程正在等待线程池中的某个任务完成,而这个任务又依赖于调用者线程能够继续运行,那么就会形成循环依赖,进而导致死锁。)

解决方案:

  1. 继续采用CallerRunsPolicy拒绝策略

  2. 在内存允许的情况下,我们可以增加阻塞队列BlockingQueue的大小并调整堆内存以容纳更多的任务,确保任务能够被准确执行。

    1. 1为了充分利用 CPU,我们还可以调整线程池的maximumPoolSize (最大线程数)参数,这样可以提高任务处理速度,避免累计在 BlockingQueue的任务过多导致内存用完。
  3. 持久化思路(自定义)

    1. 设计一个数据库表来存储到数据库中

    2. 使用Redis缓存

    3. 提交到消息队列

使用数据库任务表来自定义拒绝策略
  1. 实现RejectedExecutionHandler接口自定义拒绝策略,自定义拒绝策略负责将线程池暂时无法处理(此时阻塞队列已满)的任务入库(保存到 MySQL 中)。注意:线程池暂时无法处理的任务会先被放在阻塞队列中,阻塞队列满了才会触发拒绝策略。
public class DatabaseRejectedExecutionHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 保存任务到数据库的方法
        saveTaskToDatabase(r);
    }

    private void saveTaskToDatabase(Runnable task) {
        // 实现保存任务到数据库的逻辑
        // 可以使用JDBC或者其他ORM框架如MyBatis、Hibernate等
    }
}
  1. 继承BlockingQueue实现一个混合式阻塞队列,该队列包含 JDK 自带的ArrayBlockingQueue。另外,该混合式阻塞队列需要修改取任务处理的逻辑,也就是重写take()方法,取任务时优先从数据库中读取最早的任务,数据库中无任务时再从 ArrayBlockingQueue中去取任务。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class HybridBlockingQueue<E> implements BlockingQueue<E> {

    private final ArrayBlockingQueue<E> queue;

    public HybridBlockingQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // 其他BlockingQueue接口方法的实现...

    @Override
    public E take() throws InterruptedException {
        // 尝试从数据库中取任务
        E taskFromDb = takeFromDatabase();
        if (taskFromDb != null) {
            return taskFromDb;
        } else {
            // 如果数据库中没有任务,则从queue中取
            return queue.take();
        }
    }

    private E takeFromDatabase() {
        // 实现从数据库中取任务的逻辑
        // 注意这里需要根据你的E类型来确定如何反序列化或转换成任务对象
        return null; // 返回null表示数据库中没有任务
    }

    @Override
    public boolean offer(E e) {
        return queue.offer(e);
    }

    @Override
    public int remainingCapacity() {
        return queue.remainingCapacity();
    }

    // ...其他必须实现的方法,例如put(), poll(), etc.
}

线程池中两种提交方式

  1. execute`方法

void execute(Runnable command): Executor接口中的方

  1. submit方法

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

两个方法区别:

  1. 异常处理

  2. 使用execute()时,未捕获异常导致线程终止,线程池创建新线程替代

  3. 使用submit()时,异常被封装在Future中,线程继续复用。(更加灵活的错误处理机制,允许调用者决定如何处理异常)

  4. 接受参数

  5. 返回值

线程池命名♂️♀️

给线程池命名可以帮助你在调试和监控多线程应用程序时更容易识别不同的线程池。默认情况下创建的线程名字类似 pool-1-thread-n 这样的,没有业务含义,不利于我们定位问题。

  1. 使用ThreadFactory来创建一个带有自定义名称的线程池:
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 线程工厂,它设置线程名称,有利于我们定位问题。
 */
public final class NamingThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNum = new AtomicInteger();
    private final String name;

    /**
     * 创建一个带名字的线程池生产工厂
     */
    public NamingThreadFactory(String name) {
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
        return t;
    }
}

线程池状态

线程池有5种状态:

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
线程池状态详细介绍
RUNNING表示线程池处于正常运行状态,可以接受新的任务并处理已提交的任务。
SHUTDOWN表示线程池不再接受新任务,但会继续执行已经提交的任务直到所有任务完成。通过调用 shutdown() 方法进入此状态。
STOP表示线程池不再接受新任务,并尝试停止正在执行的所有任务。通过调用 shutdownNow() 方法进入此状态。
TIDYING表示所有任务都已完成,工作线程数量为零,即将进入 TERMINATED 状态。这是一个短暂的过渡状态,在这个状态下,terminated() 钩子方法会被调用。
TERMINATED表示线程池完全终止,所有的任务都已完成并且所有的工作线程都已被销毁。

shutdownNow为STOP,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务,当然,它会返回那些未执行的任务。它试图终止线程的方法是通过调用 Thread.interrupt() 方法来实现的,但是这种方法的作用有限,如果线程中没有sleep、wait、Condition、定时锁等应用,interrupt()方法是无法中断当前的线程的。所以,shutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。


http://www.kler.cn/a/505759.html

相关文章:

  • 内联变量(inline variables):在多个文件中共享全局常量
  • springCloudGateway+nacos自定义负载均衡-通过IP隔离开发环境
  • Python 实现 NLP 的完整流程
  • Sprint Boot教程之五十八:动态启动/停止 Kafka 监听器
  • 【算法学习笔记】31:试除法分解质因数及求解欧拉函数
  • 实现nacos配置修改后无需重启服务--使用@RefreshScope注解
  • Mybatis-底层是如何解决sql注入增删改查操作--删除操作
  • VUE请求返回二进制文件流下载文件例子
  • doc、pdf转markdown
  • STM32H7通过CUBEMX初始化移植LWIP,DHCP建立RAW TCP服务器,不停发成功
  • Spring MVC复杂数据绑定-绑定集合
  • VUE3 + Ant Design Vue4 开发笔记
  • MySQL表的增删改查(进阶)-下篇
  • 【Qt】QThread总结
  • flutter R库对图片资源进行自动管理
  • c#删除文件和目录到回收站
  • 【Linux系统编程】—— 自动化构建工具Makefile指南
  • rtthread学习笔记系列(3) -- FINSH模块
  • 寄存器 reg
  • 【学习笔记】GitLab 使用技巧和说明和配置和使用方法
  • [操作系统] 深入理解约翰·冯·诺伊曼体系
  • DNS介绍(1):基本概念
  • 如何确保API调用安全
  • Flink (三):核心概念(并行度、算子链、任务槽)
  • 算法面试准备 - 手撕系列第一期 - Softmax
  • WPF-01理解XAML