当前位置: 首页 > article >正文

并发编程---多线程不安全示例以及解决,多线程创建方式

文章目录

  • 并发
  • 并行
  • 多线程
  • 为什么需要多线程
  • 线程不安全示例
  • 并发出现问题的根源: 并发三要素
    • 可见性: CPU 缓存引起
    • 原子性:分时复用引起
    • 有序性: 重排序引起
  • 线程不安全示例的解决方法
    • 使用AtomicLong类
    • 使用synchronized 关键字
  • 改进代码避免不必要的延迟
    • join()方法
      • 为什么使用 `join()`?
    • CountDownLatch类
      • 基本原理:
      • 常用方法:
  • 通过实现Runnable接口完成线程
  • 通过继承Thread类完成线程
  • 通过实现Callable接口完成接口
    • 手动创建线程,**使用 `FutureTask` 来处理 `Callable` 任务**
    • 使用线程池


并发

并发指的是系统在同一时间段内处理多个任务的能力,但并不要求这些任务同时执行。它侧重于任务的 逻辑上的同时进行,即在某个时间段内,系统切换多个任务的执行,这样看起来就像是多个任务“同时”运行,但实际上是在有限的资源上交替执行。

并发的特点

  • 逻辑上的同时:多个任务在时间上交替进行,给人一种“同时”执行的错觉,但它们通常是在一个处理器上通过快速切换执行的。
  • 时间分片:系统通过操作系统的调度器(例如时间片轮转)让任务看起来是并行的。
  • 可能不是物理同时执行:在单核 CPU 上,多个任务并发执行,但它们并不会真正同时执行。它们通过时间片的轮流执行来实现“并发”。

并发的例子

  • 单核 CPU 上运行多个任务。任务 A 和任务 B 不会同时执行,但它们会交替执行,任务 A 一会儿执行,接着任务 B 执行。
  • 例如,一个网站需要处理多个用户的请求,尽管每次只有一个请求在处理,但多个请求是并发的,因为它们会在时间上交替被处理。

并行

并行是指多个任务 同时 在多个处理器(或多个核心)上执行。它是并发的一种特殊情况,但必须具备 多核处理器多处理器系统 的支持,允许任务真正地同时执行。

并行的特点

  • 物理上的同时执行:多个任务在多个处理器或处理器核心上同时执行,任务间不需要交替执行。
  • 需要多核或多 CPU:并行计算通常依赖于硬件支持,例如多核 CPU 或分布式计算系统。
  • 速度更快:由于任务真正同时执行,通常比并发要高效,尤其是对于计算密集型任务。

并行的例子

  • 多核 CPU 上的并行任务。任务 A 和任务 B 可以在不同的核心上同时执行,不需要交替执行。
  • 例如,图像处理程序可以将图像划分为多个部分,然后在不同的处理器核心上同时处理每个部分,从而大大加快处理速度。

多线程

多线程是并发编程的一种方式,指的是一个程序内部可以有多个线程,这些线程共享程序的资源(如内存、文件描述符等),并且可以并行或并发地执行。每个线程都是操作系统调度的基本单位。

多线程的特点

  • 多线程是实现并发的一种方式,具体指的是在同一个进程中,通过线程来执行多个任务。
  • 在支持多核处理器的机器上,线程可以在多个核心上同时执行,这就实现了真正的并行。
  • 线程之间共享内存,因此可以快速地交换信息,但也会带来线程安全的问题。

为什么需要多线程

众所周知,CPU、内存、I/O 设备的速度是有极大差异的,为了合理利用 CPU 的高性能,平衡这三者的速度差异,计算机体系结构、操作系统、编译程序都做出了贡献,主要体现为:

  • CPU 增加了缓存,以均衡与内存的速度差异;// 导致 可见性 问题
  • 操作系统增加了进程、线程,以分时复用 CPU,进而均衡 CPU 与 I/O 设备的速度差异;// 导致 原子性 问题
  • 编译程序优化指令执行次序,使得缓存能够得到更加合理地利用。// 导致 有序性 问题

线程不安全示例

如果多个线程对同一个共享数据进行访问而不采取同步操作的话,那么操作的结果是不一致的。

这段代码演示了 10000 个线程并发执行计算任务,每个线程计算从 0100 的累加和并将结果加到共享变量 ThreadLearn.result 中。操作结束之后它的值有可能小于 50500000。

package thread;

public class ThreadLearn {

    public static long result; // 多个线程共享的,用于存储最终的计算结果
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10000; i++) { // 通过for循环启动10000个线程,每个线程通过MyThread类来执行任务
            MyThread myThread = new MyThread(100); // 每个线程都会计算从0到100的和
            // 启动线程,让线程并发执行
            Thread thread = new Thread(myThread);
            thread.start(); // start() 会异步调用run()方法
        }
        Thread.sleep(10*1000);
        System.out.println(result);
    }
}

class MyThread implements Runnable {
    private int count;

    public MyThread(int count) {
        this.count = count;
    }

    @Override
    public void run() {
        int sum = 0;
        for (int i = 0; i <= count; i++) {
            sum += i;
        }
        ThreadLearn.result += sum; // 将每个线程的sum累加到result变量中。
    }
}
49833400 // 结果总是小于50500000

并发出现问题的根源: 并发三要素

上述代码输出为什么不是 50500000? 并发出现问题的根源是什么?

可见性: CPU 缓存引起

可见性:一个线程对共享变量的修改,另外一个线程能够立刻看到。

举个简单的例子,看下面这段代码:

//线程1执行的代码
int i = 0;
i = 10;
 
//线程2执行的代码
j = i;

假若执行线程 1 的是 CPU1,执行线程 2 的是 CPU2。由上面的分析可知,当线程 1 执行 i = 10 这句时,会先把 i 的初始值加载到 CPU1 的高速缓存中,然后赋值为 10,那么在 CPU1 的高速缓存当中 i 的值变为 10 了,却没有立即写入到主存当中。

此时线程 2 执行 j = i,它会先去主存读取 i 的值并 i 加载到 CPU2 的缓存当中,注意此时内存当中 i 的值还是 0,i 那么就会使得 j 的值为 0,而不是 10.

这就是可见性问题,线程 1 对变量 i 修改了之后,线程 2 没有立即看到线程 1 修改的值。

原子性:分时复用引起

原子性:即一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。

举个简单的例子,看下面这段代码:

int i = 1;

// 线程1执行
i += 1;

// 线程2执行
i += 1;

这里需要注意的是:i += 1 需要三条 CPU 指令

  1. 将变量 i 从内存读取到 CPU 寄存器;
  2. 在 CPU 寄存器中执行 i + 1 操作;
  3. 将最后的结果 i 写入内存(缓存机制导致可能写入的是 CPU 缓存而不是内存)。

由于 CPU 分时复用(线程切换)的存在,线程 1 执行了第一条指令后,就切换到线程 2 执行,假如线程 2 执行了这三条指令后,再切换会线程 1 执行后续两条指令,将造成最后写到内存中的 i 值是 2 而不是 3。

有序性: 重排序引起

有序性:即程序执行的顺序按照代码的先后顺序执行。举个简单的例子,看下面这段代码:

int a = 1; // 指令1
int b = 2; // 指令2

编译器可能会将其重排序为:

int b = 2; // 指令2
int a = 1; // 指令1

如果这些变量是共享变量,且多个线程依赖它们的顺序,就会导致问题。

线程不安全示例的解决方法

使用AtomicLong类

package thread;

public class ThreadLearn {

	// AtomicLong 是 Java 提供的一个类,用于处理长整型(long)数据类型的原子操作
	public static AtomicLong result = new AtomicLong(0);
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10000; i++) { // 通过for循环启动10000个线程,每个线程通过MyThread类来执行任务
            MyThread myThread = new MyThread(100); // 每个线程都会计算从0到100的和
            // 启动线程,让线程并发执行
            Thread thread = new Thread(myThread);
            thread.start(); // start() 会异步调用run()方法
        }
        Thread.sleep(10*1000);
        System.out.println(result);
    }
}

class MyThread implements Runnable {
    private int count;

    public MyThread(int count) {
        this.count = count;
    }

    @Override
    public void run() {
        int sum = 0;
        for (int i = 0; i <= count; i++) {
            sum += i;
        }
        ThreadLearn.result.addAndGet(sum);
        // AtomicLong 提供了原子操作 addAndGet(),这个方法将 sum 加到 result 上,并返回更新后的值。
    }
}
50500000

AtomicLong 的作用: 使用 AtomicLongaddAndGet(sum) 方法,能够保证在多线程环境下,对 result 的累加操作是 原子 的,即使多个线程同时执行相同操作,也能保证每次操作都是安全的,不会产生竞争条件。

使用synchronized 关键字

package thread;

public class ThreadLearn {

    public static long result; // 多个线程共享的,用于存储最终的计算结果
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10000; i++) { // 通过for循环启动10000个线程,每个线程通过MyThread类来执行任务
            MyThread myThread = new MyThread(100); // 每个线程都会计算从0到100的和
            // 启动线程,让线程并发执行
            Thread thread = new Thread(myThread);
            thread.start(); // start() 会异步调用run()方法
        }
        Thread.sleep(10*1000);
        System.out.println(result);
    }
}

class MyThread implements Runnable {
    private int count;

    public MyThread(int count) {
        this.count = count;
    }

    @Override
    public void run() {
        int sum = 0;
        for (int i = 0; i <= count; i++) {
            sum += i;
        }
        synchronized (ThreadLearn.class) {
            // synchronized关键字来保证同一时刻只有一个线程可以更新result的值
            // 通过加锁ThreadLearn.class(即类锁),使得对 result 的修改是线程安全的。
            ThreadLearn.result += sum;
        }
    }
}

synchronized 关键字确保同一时刻只有一个线程能进入被 synchronized 修饰的代码块。当一个线程正在执行被加锁的代码时,其他线程会被阻塞,直到该线程执行完毕并释放锁。这就避免了多个线程在同一时刻访问共享资源的情况,确保了对共享资源的修改是有序的,避免了数据竞争。

在这个代码中的使用

synchronized (ThreadLearn.class) {
    ThreadLearn.result += sum;
}

加锁对象synchronized 关键字后面跟的 ThreadLearn.class 表示加锁的是 ThreadLearn 类的对象锁。也就是说,所有线程在执行这个同步代码块时,必须先获取到 ThreadLearn 类的锁,才能执行 result += sum 操作。

保证顺序执行:通过加锁,当多个线程同时执行 run() 方法时,只有一个线程能进入 synchronized 块并修改 result。其他线程必须等待该线程执行完毕并释放锁后,才能继续修改 result。这样就保证了 result 的更新操作不会被多个线程同时执行,避免了并发问题。

改进代码避免不必要的延迟

join()方法

使用 join() 方法来确保所有线程执行完毕后再继续执行主线程,可以避免通过 Thread.sleep() 等待线程执行的潜在问题。以下是改进后的代码,使用了 join() 来确保主线程等待所有子线程执行完毕后再输出最终结果。

package thread;

import java.util.concurrent.atomic.AtomicLong;

public class ThreadLearn {

    // 使用 AtomicLong 提供原子操作
    public static AtomicLong result = new AtomicLong(0);

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis(); // 计时器
        int threadCount = 10000;
        Thread[] threads = new Thread[threadCount]; // 创建线程数组

        // 创建并启动 10000 个线程
        for (int i = 0; i < threadCount; i++) {
            MyThread myThread = new MyThread(100);
            threads[i] = new Thread(myThread);
            threads[i].start(); // 启动线程
        }

        // 使用 join() 等待所有线程执行完成
        for (Thread thread : threads) {
            thread.join(); // 确保主线程等待所有子线程执行完毕
        }

        // 打印最终结果和执行时间
        System.out.println(result);
        System.out.println("耗时:" + (System.currentTimeMillis() - start));
    }
}

class MyThread implements Runnable {
    private int count;

    public MyThread(int count) {
        this.count = count;
    }

    @Override
    public void run() {
        int sum = 0;
        for (int i = 0; i <= count; i++) {
            sum += i; // 计算 0 到 count 的和
        }
        ThreadLearn.result.addAndGet(sum);
    }
}

thread.join()join() 方法用于让主线程等待每个子线程执行完毕。主线程会依次调用 join(),直到所有的子线程执行完成。这样可以确保 result 输出的值是正确的。

为什么使用 join()

  • 等待所有线程完成join() 会让主线程等待当前线程完成。在这种情况下,主线程会等所有子线程执行完毕,避免提前输出结果。
  • 防止线程还没执行完毕时输出结果:如果没有使用 join(),主线程可能会在子线程执行之前就打印出结果,从而导致 result 的值不准确。

CountDownLatch类

package thread;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

public class ThreadLearn {

    // 使用 AtomicLong 提供原子操作
    public static AtomicLong result = new AtomicLong(0);
    static int count = 10000;  // 线程数
    public static CountDownLatch countDownLatch = new CountDownLatch(count);  // 初始化计数器

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis(); // 计时器

        Thread[] threads = new Thread[count]; // 创建线程数组,大小为 count

        // 创建并启动 10000 个线程
        for (int i = 0; i < count; i++) {
            MyThread myThread = new MyThread(100);  // 每个线程都会计算 0 到 100 的和
            threads[i] = new Thread(myThread);
            threads[i].start();  // 启动线程
        }

        // 等待所有线程完成
        countDownLatch.await(); // 主线程会在这里等待,直到计数器为 0

        // 打印最终结果和执行时间
        System.out.println(result);
        System.out.println("耗时:" + (System.currentTimeMillis() - start));
    }
}

class MyThread implements Runnable {
    private int count;

    public MyThread(int count) {
        this.count = count;
    }

    @Override
    public void run() {
        int sum = 0;
        // 计算 0 到 count 的和
        for (int i = 0; i <= count; i++) {
            sum += i;
        }

        // 使用 AtomicLong 提供的原子操作进行更新
        ThreadLearn.result.addAndGet(sum);  // 线程安全的累加

        // 完成任务,计数器减 1
        ThreadLearn.countDownLatch.countDown();
    }
}

基本原理:

CountDownLatch 通过一个计数器来控制线程的等待,计数器的初始值是一个正整数。每当一个线程完成了某项工作,它会调用 countDown() 方法将计数器减 1。当计数器值减到 0 时,所有等待的线程(调用了 await() 方法的线程)都会被唤醒。

常用方法:

  • countDown():将计数器的值减 1。当计数器值为 0 时,所有等待的线程会被唤醒。
  • await():使当前线程等待,直到计数器的值为 0。该方法会阻塞当前线程,直到其他线程调用 countDown() 并使计数器减为 0。

通过实现Runnable接口完成线程

package thread;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

public class ThreadLearn {

    // 使用 AtomicLong 提供原子操作
    public static AtomicLong result = new AtomicLong(0);
    static int count = 10000;  // 线程数
    public static CountDownLatch countDownLatch = new CountDownLatch(count);  // 初始化计数器

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis(); // 计时器

        Thread[] threads = new Thread[count]; // 创建线程数组,大小为 count

        // 创建并启动 10000 个线程
        for (int i = 0; i < count; i++) {
            MyThread myThread = new MyThread(100);  // 每个线程都会计算 0 到 100 的和
            threads[i] = new Thread(myThread);
            threads[i].start();  // 启动线程
        }

        // 等待所有线程完成
        countDownLatch.await(); // 主线程会在这里等待,直到计数器为 0

        // 打印最终结果和执行时间
        System.out.println(result);
        System.out.println("耗时:" + (System.currentTimeMillis() - start));
    }
}

class MyThread implements Runnable {
    private int count;

    public MyThread(int count) {
        this.count = count;
    }

    @Override
    public void run() {
        int sum = 0;
        // 计算 0 到 count 的和
        for (int i = 0; i <= count; i++) {
            sum += i;
        }

        // 使用 AtomicLong 提供的原子操作进行更新
        ThreadLearn.result.addAndGet(sum);  // 线程安全的累加

        // 完成任务,计数器减 1
        ThreadLearn.countDownLatch.countDown();
    }
}

通过继承Thread类完成线程

public class ThreadLearn {

    // 使用 AtomicLong 提供原子操作
    public static AtomicLong result = new AtomicLong(0);
    static int count = 10000;  // 线程数
    public static CountDownLatch countDownLatch = new CountDownLatch(count);  // 初始化计数器

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis(); // 计时器

        Thread[] threads = new Thread[count]; // 创建线程数组,大小为 count

        // 创建并启动 10000 个线程
        for (int i = 0; i < count; i++) {
            MyThread1 myThread = new MyThread1(100);  // 每个线程都会计算 0 到 100 的和
            threads[i] = new Thread(myThread);
            threads[i].start();  // 启动线程
        }

        // 等待所有线程完成
        countDownLatch.await(); // 主线程会在这里等待,直到计数器为 0

        // 打印最终结果和执行时间
        System.out.println(result);
        System.out.println("耗时:" + (System.currentTimeMillis() - start));
    }
}

class MyThread1 extends Thread {
    private long count;

    public MyThread1(int count) {
        this.count = count;
    }
    @Override
    public void run() {
        super.run();
        int sum = 0;
        // 计算 0 到 count 的和
        for (int i = 0; i <= count; i++) {
            sum += i;
        }

        // 使用 AtomicLong 提供的原子操作进行更新
        ThreadLearn.result.addAndGet(sum);  // 线程安全的累加

        // 完成任务,计数器减 1
        ThreadLearn.countDownLatch.countDown();
    }
}

通过实现Callable接口完成接口

手动创建线程,使用 FutureTask 来处理 Callable 任务

public class ThreadLearn {

    // 使用 AtomicLong 提供原子操作
    public static AtomicLong result = new AtomicLong(0);
    static int count = 10000;  // 线程数
    public static CountDownLatch countDownLatch = new CountDownLatch(count);  // 初始化计数器

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        FutureTask<Integer> future = new FutureTask<Integer>(new MyThread2(100));
        Thread thread = new Thread(future);
        thread.start();
        Integer o = future.get();
        System.out.println(o);
    }
}

class MyThread2 implements Callable<Integer> {
    private int count;

    public MyThread2(int count) {
        this.count = count;
    }
    @Override
    public Integer call() throws Exception {
        int sum = 0;
        // 计算 0 到 count 的和
        for (int i = 0; i <= count; i++) {
            sum += i;
        }

        // 使用 AtomicLong 提供的原子操作进行更新
        ThreadLearn.result.addAndGet(sum);  // 线程安全的累加

        // 完成任务,计数器减 1
        ThreadLearn.countDownLatch.countDown();
        return sum;
    }
}

使用线程池

public class ThreadLearn {

    // 使用 AtomicLong 提供原子操作
    public static AtomicLong result = new AtomicLong(0);
    public static CountDownLatch countDownLatch = new CountDownLatch(count);  // 初始化计数器
    public static ExecutorService executor = Executors.newFixedThreadPool(10);
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Future<Integer> submit = executor.submit(new MyThread2(100));
        Integer i = submit.get();
        System.out.println(i);
    }
}

class MyThread2 implements Callable<Integer> {
    private int count;

    public MyThread2(int count) {
        this.count = count;
    }
    @Override
    public Integer call() throws Exception {
        int sum = 0;
        // 计算 0 到 count 的和
        for (int i = 0; i <= count; i++) {
            sum += i;
        }

        // 使用 AtomicLong 提供的原子操作进行更新
        ThreadLearn.result.addAndGet(sum);  // 线程安全的累加

        // 完成任务,计数器减 1
        ThreadLearn.countDownLatch.countDown();
        return sum;
    }
}

http://www.kler.cn/a/543405.html

相关文章:

  • SpringCloud - Gateway 网关
  • 物联网(IoT)如何与人工智能(AI)的结合
  • 【Vue中BUG解决】npm error path git
  • 解决VsCode的 Vetur 插件has no default export Vetur问题
  • 初识 Express
  • 2025 年 2 月 TIOBE 指数
  • AI大语言模型
  • JAVA面试之容器
  • Linux进程创建与终止
  • Amazon Keyspaces (for Apache Cassandra)
  • 未来趋势系列 篇一(加更四):DeepSeek题材解析和股票梳理
  • CF Round 997 记录 题解 (div. 2 A - E)
  • SpringBoot中的Javaconfig
  • KRR(知识表示与推理,Knowledge Representation and Reasoning)
  • 冒泡排序
  • 多租户数据源隔离
  • kindle.cn 无法接收邮件
  • pnpm的使用
  • 【工业安全】-CVE-2022-35555- Tenda W6路由器 命令注入漏洞
  • Java入门进阶
  • 日语学习-日语知识点小记-构建基础-JLPT-N4&N5阶段(4):~てもいいです & ~てはいきません征求许可
  • 在Mac M1上面安装Miniconda
  • 名词解释:npm,cnpm,yarn,vite,vue,electron
  • PySpark查找Dataframe中的非ASCII字符并导出Excel文件
  • 07贪心 + 动态规划(D1_基础学习)
  • 蓝桥杯试题:归并排序