当前位置: 首页 > article >正文

java 并发编程之线程池与自定义线程池介绍

Java并发编程实战:深入线程协作与线程池应用

引言:并发编程的双刃剑

在多核处理器普及的今天,Java并发编程已成为提升系统性能的关键技术。然而,线程协作与资源管理如同双刃剑:使用得当可显著提升吞吐量,处理不当则会导致死锁、资源耗尽等严重问题。本文将深入解析线程协作的核心机制,并通过完整案例演示如何构建高可靠的线程池系统。


第一部分:线程协作的艺术

1.1 经典协作机制的三要素

条件满足
条件不满足
线程获取对象锁
条件检查
执行业务逻辑
调用wait释放锁
进入等待队列
其他线程notify/notifyAll
唤醒等待线程
重新竞争锁

关键要点:

  1. 所有协作操作必须在同步代码块中执行
  2. while循环检查条件避免虚假唤醒
  3. notifyAll()优先于notify()使用

1.2 增强版生产者-消费者模型

class AdvancedBuffer {
    private final LinkedList<Integer> items = new LinkedList<>();
    private final int capacity;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public AdvancedBuffer(int capacity) {
        this.capacity = capacity;
    }

    public void produce(int value) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await();
            }
            items.addLast(value);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public int consume() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            int value = items.removeFirst();
            notFull.signal();
            return value;
        } finally {
            lock.unlock();
        }
    }
}

改进点分析:

  • 使用显式锁(ReentrantLock)提升灵活性
  • 分离生产/消费等待条件(notFull/notEmpty)
  • 精准唤醒对应类型线程

第二部分:线程池深度解析

2.1 线程池工作原理全景图

任务处理流程
未满
已满
未满
已满
未满
已满
核心线程是否满?
提交任务
创建新线程执行
队列是否已满?
任务入队列
最大线程是否满?
创建临时线程
执行拒绝策略

2.2 线程池类型对比实践

在 Java 中,Executors 类提供了四种创建线程池的工厂方法,分别是 newCachedThreadPoolnewFixedThreadPoolnewScheduledThreadPoolnewSingleThreadExecutor。下面我们将对文档中的线程池类型对比实践部分进行完善,详细介绍这四种线程池的特点、使用示例以及潜在问题。

2.2 线程池类型对比实践

2.2.1 CachedThreadPool 隐患演示
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService dangerousPool = Executors.newCachedThreadPool();
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            dangerousPool.submit(() -> {
                try {
                    Thread.sleep(1000000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // 将快速耗尽系统资源,最终导致 OOM
    }
}

特点

  • 线程数量不固定,可根据需要创建新线程。
  • 空闲线程会被保留 60 秒。
  • 适合执行大量短期异步任务。

潜在问题
如果任务数量过多且执行时间较长,会不断创建新线程,导致系统资源耗尽,最终引发 OutOfMemoryError

2.2.2 FixedThreadPool 实践
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolExample {
    public static void main(String[] args) {
        int threadCount = 5;
        ExecutorService fixedPool = Executors.newFixedThreadPool(threadCount);
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            fixedPool.submit(() -> {
                System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        fixedPool.shutdown();
    }
}

特点

  • 线程数量固定,由构造函数指定。
  • 当有新任务提交时,如果线程池中有空闲线程,则立即执行;否则,任务会被放入队列等待。
  • 适合执行长期稳定的任务。

潜在问题
如果任务执行时间过长,而线程数量固定,可能会导致队列积压,影响系统性能。

2.2.3 ScheduledThreadPool 最佳实践
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExample {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

        Runnable task = () -> System.out.println("Task is running at " + System.currentTimeMillis());

        // 固定频率执行(可能产生任务堆积)
        scheduler.scheduleAtFixedRate(task, 1, 5, TimeUnit.SECONDS);

        // 固定延迟执行(保证执行间隔)
        scheduler.scheduleWithFixedDelay(task, 1, 5, TimeUnit.SECONDS);
    }
}

特点

  • 支持定时和周期性任务执行。
  • 线程数量可根据需要指定。

潜在问题

  • scheduleAtFixedRate 方法可能会导致任务堆积,如果任务执行时间超过了指定的周期。
  • scheduleWithFixedDelay 方法保证任务执行间隔,但可能会导致任务执行时间不稳定。
2.2.4 SingleThreadExecutor 实践
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorExample {
    public static void main(String[] args) {
        ExecutorService singlePool = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            singlePool.submit(() -> {
                System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        singlePool.shutdown();
    }
}

特点

  • 只有一个线程执行任务。
  • 任务会按照提交顺序依次执行。
  • 适合需要顺序执行任务的场景。

潜在问题
如果任务执行时间过长,会导致后续任务等待时间过长,影响系统响应速度。


第三部分:自定义线程池实战

3.1 线程池参数黄金公式

N t h r e a d s = N c p u ∗ U c p u ∗ ( 1 + W / C ) N_{threads} = N_{cpu} * U_{cpu} * (1 + W/C) Nthreads=NcpuUcpu(1+W/C)

  • N c p u N_{cpu} Ncpu: CPU核心数(Runtime.getRuntime().availableProcessors())
  • U c p u U_{cpu} Ucpu: 目标CPU利用率(0 < U ≤ 1)
  • W/C: 等待时间与计算时间的比率
    在Java中,ThreadPoolExecutor 是用于创建自定义线程池的核心类。下面详细解释自定义线程池时所涉及的各个参数的含义。

1. ThreadPoolExecutor 构造函数参数

ThreadPoolExecutor 有多个构造函数重载,最常用的构造函数如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
1.1 corePoolSize
  • 含义:核心线程数,即线程池长期保持运行的线程数量。当提交的任务数量小于 corePoolSize 时,线程池会创建新的线程来执行这些任务,即使有空闲的线程。
  • 示例:如果 corePoolSize 设置为 5,那么当有 3 个任务提交时,线程池会创建 3 个线程来执行这些任务。
1.2 maximumPoolSize
  • 含义:线程池允许的最大线程数量。当提交的任务数量超过 corePoolSize 且任务队列已满时,线程池会创建新的线程,直到线程数量达到 maximumPoolSize
  • 示例:如果 corePoolSize 为 5,maximumPoolSize 为 10,任务队列已满且又有新任务提交时,线程池会继续创建线程,直到线程数量达到 10。
1.3 keepAliveTime
  • 含义:当线程池中的线程数量超过 corePoolSize 时,多余的空闲线程在被终止前等待新任务的最长时间。
  • 示例:如果 keepAliveTime 设置为 60,unit 设置为 TimeUnit.SECONDS,那么当线程池中的线程数量超过 corePoolSize 且某个线程空闲超过 60 秒时,该线程会被终止。
1.4 unit
  • 含义keepAliveTime 参数的时间单位,它是 java.util.concurrent.TimeUnit 枚举类型的实例,常见的时间单位有 SECONDS(秒)、MILLISECONDS(毫秒)等。
  • 示例
TimeUnit.SECONDS; // 表示秒
TimeUnit.MILLISECONDS; // 表示毫秒
1.5 workQueue
  • 含义:用于存储等待执行的任务的阻塞队列。当提交的任务数量超过 corePoolSize 时,新的任务会被放入该队列中等待执行。
  • 常见类型
    • ArrayBlockingQueue:有界队列,需要指定队列的容量。
    • LinkedBlockingQueue:无界队列(如果不指定容量)或有界队列(指定容量)。
    • SynchronousQueue:不存储元素的队列,每个插入操作必须等待另一个线程的移除操作,反之亦然。
    • PriorityBlockingQueue:具有优先级的无界队列。
  • 示例
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100); // 创建一个容量为 100 的有界队列
1.6 threadFactory
  • 含义:用于创建新线程的线程工厂。通过自定义线程工厂,可以为线程设置名称、优先级等属性。
  • 示例
ThreadFactory threadFactory = Executors.defaultThreadFactory(); // 使用默认的线程工厂
1.7 handler
  • 含义:当线程池中的线程数量达到 maximumPoolSize 且任务队列已满时,用于处理新提交任务的拒绝策略。
  • 常见类型
    • AbortPolicy:直接抛出 RejectedExecutionException 异常,默认策略。
    • CallerRunsPolicy:由提交任务的线程来执行该任务。
    • DiscardPolicy:直接丢弃该任务,不做任何处理。
    • DiscardOldestPolicy:丢弃队列中最旧的任务,然后尝试重新提交新任务。
  • 示例
RejectedExecutionHandler handler = new AbortPolicy(); // 使用 AbortPolicy 拒绝策略

2. 完善自定义线程池示例代码

import java.util.concurrent.*;

class MonitorThreadPool extends ThreadPoolExecutor {
    private long startTime;

    public MonitorThreadPool(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory,
                             RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startTime = System.currentTimeMillis();
        System.out.printf("Task %s start at %s%n", r.hashCode(), java.time.LocalDateTime.now());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        System.out.printf("Task %s completed in %s ms%n", r.hashCode(), System.currentTimeMillis() - startTime);
    }

    public void showStats() {
        System.out.println("Active threads: " + getActiveCount());
        System.out.println("Completed tasks: " + getCompletedTaskCount());
    }
}

3. 使用示例

public class CustomThreadPoolExample {
    public static void main(String[] args) {
        int corePoolSize = 5;
        int maximumPoolSize = 10;
        long keepAliveTime = 60;
        TimeUnit unit = TimeUnit.SECONDS;
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        RejectedExecutionHandler handler = new AbortPolicy();

        MonitorThreadPool pool = new MonitorThreadPool(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                workQueue,
                threadFactory,
                handler
        );

        // 提交任务
        for (int i = 0; i < 20; i++) {
            final int taskId = i;
            pool.submit(() -> {
                try {
                    System.out.println("Task " + taskId + " is running.");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // 显示线程池状态
        pool.showStats();

        // 关闭线程池
        pool.shutdown();
    }
}

第四部分:工业级最佳实践

4.1 线程池配置检查清单

  1. 核心线程数 = 平均负载 × 安全系数(1.2~1.5)
  2. 最大线程数 = 核心线程数 × 突发系数(2~3)
  3. 队列容量 = 核心线程数 × 处理时间 × 峰值TPS
  4. 拒绝策略选择:
    • AbortPolicy:关键业务系统
    • CallerRunsPolicy:非关键批处理
    • 自定义策略:记录日志/降级处理

4.2 优雅关闭模式

public class GracefulShutdownExample {
    public static void main(String[] args) {
        ExecutorService pool = new MonitorThreadPool(...);
        
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            pool.shutdown();
            try {
                if (!pool.awaitTermination(60, SECONDS)) {
                    pool.shutdownNow();
                }
            } catch (InterruptedException e) {
                pool.shutdownNow();
            }
        }));
    }
}

第五部分:常见陷阱与解决方案

5.1 死锁检测流程图

发现线程阻塞
获取线程堆栈
分析锁持有情况
绘制资源等待图
存在循环等待?
确认死锁
排查其他原因

5.2 线程池使用七大禁忌

  1. 使用无界队列导致内存泄漏
  2. 忽略任务执行异常
  3. 混淆submit()与execute()
  4. 错误处理Future结果
  5. 不合理设置线程存活时间
  6. 未监控线程池运行状态
  7. 盲目复用全局线程池

结语:构建稳健的并发系统

掌握线程协作机制如同获得手术刀,而合理使用线程池则是搭建系统骨架。建议开发者在实际项目中:

  1. 使用ThreadPoolExecutor而非Executors工厂方法
  2. 为不同业务类型创建独立线程池
  3. 实施全面的监控预警机制
  4. 定期进行并发压力测试
  5. 建立线程使用规范文档

提示:使用jstack、VisualVM等工具分析线程状态,结合Arthas进行运行时诊断可大幅提升排障效率


http://www.kler.cn/a/595493.html

相关文章:

  • 如何 使用 Docker 部署 ollama 和 MaxKB
  • 如何在 Linux 系统中部署 FTP 服务器:从基础配置到安全优化
  • Linux系统-TCPDump流量
  • C++Primer学习(13.1 拷贝、赋值与销毁)
  • 通过Knife4j在gateway中对swagger文档进行聚合
  • 流程优化的可配置文档结构化系统设计
  • 计算机网络精讲day1——计算机网络的性能指标(上)
  • 【人工智能】如何理解transformer中的token?
  • 【AI知识】pytorch手写Attention之Self-Attention,Multi-Head-Attention
  • vue3源码分析 -- computed
  • 深度解析学术论文成果评估(Artifact Evaluation):从历史到现状
  • 【问题解决】Postman 测试报错 406
  • 深入理解Java虚拟机(学习笔记)
  • java基础--序列化与反序列化的概念是什么?
  • 关于FastAPI框架的面试题及答案解析
  • 查看visual studio的MSVC版本的方法
  • 23 种设计模式中的访问者模式
  • 零基础上手Python数据分析 (7):Python 面向对象编程初步
  • 蓝桥杯 之 暴力回溯
  • 3.16[A]FPGA