当前位置: 首页 > article >正文

JavaEE:多线程编程中的同步与并发控制

JavaEE:多线程进阶2

  • 一、Callable 接口
    • 1. 基本定义和接口签名
    • 2. Callable 接口的特点
      • 2.1 返回值
      • 2.2 异常处理
      • 2.3 灵活性
    • 3. Callable 接口的劣势
    • 4. Callable 接口的使用场景
      • 4.1 需要返回结果的任务
      • 4.2 可能抛出异常的任务
      • 4.3 需要组合多个任务的结果
    • 5. 总结
  • 二、ReentrantLock
    • 1. ReentrantLock 的特点
      • 1.1 可重入性
      • 1.2 公平性和非公平性
      • 1.3 条件变量(Condition)
      • 1.4 锁的中断
    • 2. ReentrantLock 的劣势
      • 2.1 需要显式管理
      • 2.2 性能开销
      • 2.3 编程复杂度
    • 3. ReentrantLock 使用场景
      • 3.1 需要精确控制锁的行为
      • 3.2 需要响应中断
      • 3.3 需要多个条件变量
      • 3.4 高并发环境
    • 4. 图文并茂理解 ReentrantLock
    • 5. 总结
  • 三、Semaphore 信号量
    • 1. 协调多个进程之间的资源分配
    • 2. 协调多个线程之间的资源分配
    • 3. Semaphore 与锁的关系
    • 4. 总结
  • 四、CountDownLatch
    • 1. 工作原理
    • 2. 应用场景
    • 3. 示例代码
  • 五、多线程中使用 ArrayList
    • 1. 自行加锁(推荐)
    • 2. 使用 Collections.synchronizedList(new ArrayList())
    • 3. 使用 CopyOnWriteArrayList
    • 4. 总结
  • 六、多线程中使用哈希表
    • 1. HashMap 线程不安全
    • 2. Hashtable 线程安全
    • 3. ConcurrentHashMap 效率更高
      • 3.1 基础概念
      • 3.2 性能测试
      • 3.3 原因分析(全局锁 vs 分段锁/CAS)
      • 3.4 结论
    • 4. 代码案例分析
      • 4.1 类结构
      • 4.2 插入操作 (put 方法)
      • 4.3 查找操作 (get 方法)
      • 4.4 节点类 (Node<K, V>)
      • 4.5 总结
    • 5. ConcurrentHashMap 的一个核心设计理念
      • 5.1 细粒度的锁
      • 5.2 非阻塞算法与 CAS 操作
      • 5.3 细粒度的锁与 CAS 操作的结合
      • 5.4 提高并发性能
      • 5.5 线性探测与冲突处理
      • 5.6 总结

一、Callable 接口

1. 基本定义和接口签名

  • Runnable 接口
public interface Runnable {
    void run();
}
  1. Runnable 接口只有一个无参的,无返回值的 run() 方法。
  2. 适用于不需要返回值的任务场景。
  • Callable 接口
public interface Callable<V> {
    V call() throws Excepption;
}
  1. Callable 接口有一个参数 V ,表示任务的返回类型。
  2. call() 方法可以抛出受检异常(checked Exception),而 Runnable 接口的 run() 方法则不能抛出任何的受检异常。

2. Callable 接口的特点

2.1 返回值

  • Callable :支持返回值。通过 Futrue 对象获取任务执行的结果。
FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        return 42; // 返回一个整数值
    }
});
  • Runnable :不支持返回值。如果需要任务的执行结果,通常需要使用共享变量或其他机制。

2.2 异常处理

  • Callablecall() 方法可以抛出受检异常,允许任务在执行过程中处理异常情况。
@Override
public Integer call() throws IOException {
    // 可能抛出 IOException
    throw new IOException("Error occurred");
}
  • Runnablerun() 方法不能抛出受检异常,只能通过捕获并处理异常或并将其封装成运行时异常(unchecked exception)。

2.3 灵活性

提供了更高的灵活性,因为它不仅可以返回结果,还可以处理异常,适用于更加复杂的场景。

3. Callable 接口的劣势

  • 复杂性:相比于 RunnableCallable 接口增加了返回值和处理异常的功能。因此在简单的场景下显得过于复杂。
  • 使用成本:需要配合 FutureFutureTask 来获取任务结果,增加了代码的复杂性和使用成本。

4. Callable 接口的使用场景

4.1 需要返回结果的任务

当任务需要返回任务的结果时,Callable 时正确的选择。例如:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class CallableExample {

    public static void main(String[] args) {
        Callable<Integer> callable = () -> {
            int result = 0;
            for (int i = 0; i < 10; i++) {
                result += i;
            }
            return result;
        };

        FutureTask<Integer> futureTask = new FutureTask<>(callable);
        Thread thread = new Thread(futureTask);
        thread.start();

        try {
            Integer result = futureTask.get(); // 获取任务结果
            System.out.println("Result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在这个例子中,我们创建了 Callable 实例计算从 0 到 9 的累加,并通过 futureTask 来获取任务的执行结果。

4.2 可能抛出异常的任务

当任务抛出受检异常时,Callable 提供了更好的异常处理机制。例如:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class CallableWithExceptionExample {

    public static void main(String[] args) {
        Callable<String> callable = () -> {
            if (Math.random() > 0.5) {
                throw new Exception("Random error");
            }
            return "Success";
        };

        FutureTask<String> futureTask = new FutureTask<>(callable);
        Thread thread = new Thread(futureTask);
        thread.start();

        try {
            String result = futureTask.get(); // 获取任务结果
            System.out.println("Result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            System.err.println("Exception caught: " + e.getCause());
        }
    }
}

在这个例子中,我们模拟了一个可能抛出的异常,并展示如何捕获和处理异常。

4.3 需要组合多个任务的结果

当需要组合多个任务的结果时,Callable 提供了方便的支持。例如:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class CombineResultsExample {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(3);

        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            final int taskId = i;
            tasks.add(() -> {
                int result = taskId * 10;
                System.out.println("Task " + taskId + " completed with result: " + result);
                return result;
            });
        }

        List<Future<Integer>> futures = executor.invokeAll(tasks);

        int total = 0;
        for (Future<Integer> future : futures) {
            total += future.get();
        }

        System.out.println("Total result: " + total);
        executor.shutdown();
    }
}

在这个例子中:

  1. 我们创建了一个固定大小为 3 的线程池。
  2. 创建了三个 Callable 任务,每个任务计算一个简单的整数结果。
  3. 使用 executor.invokeAll(tasks) 提交所有任务,并等待它们全部完成。
  4. 遍历 futures 列表,通过 future.get() 获取每个任务的结果,并计算总和。

5. 总结

  • 特点Callable 接口提供返回值和处理异常的能力。
  • 劣势:相对于 RunnableCallable 接口增加了复杂性和使用成本。
  • 使用场景:需要可能返回结果,可能抛出异常或需要组合多个任务的结果。

二、ReentrantLock

1. ReentrantLock 的特点

1.1 可重入性

ReentrantLock 是一把可重入锁,意味着同一个线程可以多次获取同一把锁而不会导致死锁问题。这和 synchronized 相似。

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockExample {
    private final ReentrantLock lock = new ReentrantLock();

    public void nestedLock() {
        lock.lock();
        try {
            System.out.println("First lock acquired.");
            lock.lock(); // 同一个线程再次获取锁
            try {
                System.out.println("Second lock acquired.");
            } finally {
                lock.unlock(); // 释放第二次获取的锁
            }
        } finally {
            lock.unlock(); // 释放第一次获取的锁
        }
    }

    public static void main(String[] args) {
        ReentrantLockExample example = new ReentrantLockExample();
        example.nestedLock();
    }
}

1.2 公平性和非公平性

ReentrantLock 提供了公平锁和非公平锁的选择。公平锁会按照请求顺序来获取锁,而非公平锁则允许插队(先尝试获取锁,失败则阻塞等待)。

// 创建公平锁
ReentrantLock fairLock = new ReentrantLock(true);

// 创建非公平锁(默认)
ReentrantLock nonFairLock = new ReentrantLock(false);

1.3 条件变量(Condition)

ReentrantLock 支持多个条件变量(Condition),类似于 wait()notify() 方法。每个条件变量都可以独立的等待和唤醒。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionExample {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private boolean ready = false;

    public void waitForSignal() throws InterruptedException {
        lock.lock();
        try {
            while (!ready) {
                condition.await(); // 等待信号
            }
            System.out.println("Received signal, proceeding...");
        } finally {
            lock.unlock();
        }
    }

    public void signal() {
        lock.lock();
        try {
            ready = true;
            condition.signal(); // 发送信号
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionExample example = new ConditionExample();

        Thread waiter = new Thread(() -> {
            try {
                example.waitForSignal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        waiter.start();
        Thread.sleep(1000); // 模拟延迟
        example.signal();
    }
}

1.4 锁的中断

ReentrantLock 支持响应中断。当一个线程在等待锁时被打断,它可以抛出 InterruptedException 异常,并退出等待状态。而 synchronized 不支持这种机制。

import java.util.concurrent.locks.ReentrantLock;

public class InterruptibleLockExample {
    private final ReentrantLock lock = new ReentrantLock();

    public void tryLockWithInterrupt() throws InterruptedException {
        if (lock.tryLock(5, java.util.concurrent.TimeUnit.SECONDS)) {
            try {
                System.out.println(Thread.currentThread().getName() + " acquired the lock.");
                Thread.sleep(2000); // 模拟长时间持有锁
            } finally {
                lock.unlock();
            }
        } else {
            System.out.println(Thread.currentThread().getName() + " failed to acquire the lock.");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        InterruptibleLockExample example = new InterruptibleLockExample();

        Thread thread1 = new Thread(() -> {
            try {
                example.tryLockWithInterrupt();
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + " was interrupted.");
            }
        });

        Thread thread2 = new Thread(() -> {
            try {
                example.tryLockWithInterrupt();
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + " was interrupted.");
            }
        });

        thread1.start();
        Thread.sleep(100); // 确保 thread1 先开始
        thread2.start();
        thread2.interrupt(); // 中断 thread2
    }
}

在这个例子中:

  1. 创建了 ReentrantLock 的实例。
  2. 定义了一个 tryLockWithInterrupt() 方法,该方法尝试在 5 秒后获取锁,并在获取锁后模拟长时间持有锁的操作。
  3. main 方法中创建 thread1thread2 两个线程,都尝试获取锁。
  4. thread1 线程先启动,尝试获取锁。
  5. thread2 线程启动后立即被打断,导致它在等待锁的过程中抛出 InterruptedException ,并打断中断信息。

2. ReentrantLock 的劣势

2.1 需要显式管理

相比于 synchronizedReentrantLock 需要显性的加锁和解锁,容易导致编程错误(例如忘记解锁),为了防止这种情况出现,通常使用 try - finally 块来确保锁的释放。

lock.lock();
try {
    // 执行临界区代码
} finally {
    lock.unlock(); // 确保锁被释放
}

2.2 性能开销

在低竞争的情况下,ReentrantLock 的性能可能不如 synchronized 。因为 synchronized 是经过 JVM 高度优化,具有自适应锁升级的策略,而 ReentrantLock 则需要手动管理锁的状态。

2.3 编程复杂度

由于 ReentrantLock 提供了更多的功能和灵活性,编写和维护代码的复杂度也相应增加。开发者需要理解锁的各种行为和潜在问题(如死锁、条件变量的正确使用等)。

3. ReentrantLock 使用场景

3.1 需要精确控制锁的行为

当需要对锁进行精确控制时(例如公平锁、非公平锁、条件变量等),ReentrantLock 是更好的选择,而 synchronized 无法提供这些功能。

3.2 需要响应中断

当线程在等待锁时需要响应中断,ReentrantLock 提供了 tryLock(long timeout, TimeUnit unit) ,允许在指定时间内,尝试获取获取锁,并且可以响应中断。

if (lock.tryLock(5, TimeUnit.SECONDS)) {
    try {
        // 执行临界区代码
    } finally {
        lock.unlock();
    }
} else {
    // 超时或被中断
}

3.3 需要多个条件变量

当需要多个条件变量来控制线程的等待和唤醒,ReentrantLock 提供了 newCondition() 方法,可以创建多个独立的条件变量。

Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();

3.4 高并发环境

在高并发的场景下,ReentrantLock 的性能优势更加明显。特别是在频繁的获取和释放锁的情况下,ReentrantLock 的灵活性和可控性使其更适合这种场景。

4. 图文并茂理解 ReentrantLock

  • ReentrantLock 的工作流程
+-------------------+       +-------------------+       +-------------------+
|                   |       |                   |       |                   |
|   ReentrantLock   +-----> |   Lock           +-----> |   Critical Section|
|                   |       |                   |       |                   |
+-------------------+       +-------------------+       +-------------------+
        |                           |                          |
        v                           v                          v
+-------------------+       +-------------------+       +-------------------+
|                   |       |                   |       |                   |
|   Try Lock        +------>+   Enter Lock      +------>+   Execute Code    |
|                   |       |                   |       |                   |
+-------------------+       +-------------------+       +-------------------+
        |                           |                          |
        v                           v                          v
+-------------------+       +-------------------+       +-------------------+
|                   |       |                   |       |                   |
|   Return Success  <-------+   Exit Lock       <-------+   Finish Task     |
|                   |       |                   |       |                   |
+-------------------+       +-------------------+       +-------------------+
  • 条件变量的工作流程
+-------------------+       +-------------------+       +-------------------+
|                   |       |                   |       |                   |
|   Condition       +-----> |   Await          +-----> |   Signal          |
|                   |       |                   |       |                   |
+-------------------+       +-------------------+       +-------------------+
        |                           |                          |
        v                           v                          v
+-------------------+       +-------------------+       +-------------------+
|                   |       |                   |       |                   |
|   Wait for Signal +------>+   Check Signal    +------>+   Notify Threads  |
|                   |       |                   |       |                   |
+-------------------+       +-------------------+       +-------------------+
        |                           |                          |
        v                           v                          v
+-------------------+       +-------------------+       +-------------------+
|                   |       |                   |       |                   |
|   Return Success  <-------+   Wake Up         <-------+   Finish Task     |
|                   |       |                   |       |                   |
+-------------------+       +-------------------+       +-------------------+

5. 总结

  • ReentrantLock :提供了丰富的功能和灵活性。适用于精确控制锁行为,响应中断和多个条件变量等复杂场景。
  • 劣势:需要显式管理锁,可能导致编程复杂性增加;在低竞争场景下性能可能不如 synchronized
  • 应用场景:高并发环境、需要精确控制锁行为、响应中断、多个条件变量等场景。

三、Semaphore 信号量

Semaphore 是一种计数信号量,用于控制对共享资源的访问。它可以协调多个进程或多个线程间的资源分配,并且在某种情况下可以作为锁的一种推广形式。

1. 协调多个进程之间的资源分配

Semaphore 可以协调多个进程之间的资源分配。通过使用操作系统提供的同步原语(如 POSIX 信号量),Semaphore 可以在不同的进程之间进行同步和资源共享。

示例代码(跨进程)

在 Java 中,由于 JVM 的限制,直接实现跨进程的 Semaphore 比较复杂,通常需要借助于操作系统的 IPC(Inter-Process Communication) 机制。下面是一个使用 POSIX 信号量的 C 语言示例:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <fcntl.h>           /* For O_* constants */
#include <unistd.h>          /* For ftruncate() */

#define SEM_NAME "/my_semaphore"
#define NUM_RESOURCES 2

int main() {
    sem_t *sem = sem_open(SEM_NAME, O_CREAT, 0644, NUM_RESOURCES);
    if (sem == SEM_FAILED) {
        perror("sem_open");
        exit(EXIT_FAILURE);
    }

    // Simulate resource usage by multiple processes
    for (int i = 0; i < 3; i++) {
        pid_t pid = fork();
        if (pid == -1) {
            perror("fork");
            exit(EXIT_FAILURE);
        } else if (pid == 0) {  // Child process
            printf("Child process %d trying to acquire semaphore\n", getpid());
            sem_wait(sem);  // Acquire the semaphore
            printf("Child process %d acquired semaphore\n", getpid());

            // Simulate resource usage
            sleep(2);

            sem_post(sem);  // Release the semaphore
            printf("Child process %d released semaphore\n", getpid());
            exit(EXIT_SUCCESS);
        }
    }

    // Wait for all child processes to finish
    while (wait(NULL) > 0);

    // Clean up
    sem_close(sem);
    sem_unlink(SEM_NAME);

    return 0;
}

在这个例子中,我们创建了一个共享的信号量 sem,并初始化为允许两个并发访问。然后,我们创建了三个子进程,每个子进程尝试获取信号量并模拟资源使用。当一个进程完成资源使用后,它会释放信号量,允许其他进程获取。

2. 协调多个线程之间的资源分配

Semaphore 也可以协调多个线程之间的资源分配。Java 提供了 java.util.concurrent.Semaphore 类来实现这一功能。

示例代码(多线程)

import java.util.concurrent.Semaphore;

public class SemaphoreExample {

    private static final Semaphore semaphore = new Semaphore(2); // 允许最多两个线程同时访问

    public static void main(String[] args) {
        Runnable task = () -> {
            try {
                semaphore.acquire(); // 获取许可
                System.out.println(Thread.currentThread().getName() + " acquired the permit.");
                Thread.sleep(2000); // 模拟长时间持有资源
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release(); // 释放许可
                System.out.println(Thread.currentThread().getName() + " released the permit.");
            }
        };

        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(task, "Thread-" + i);
            thread.start();
        }
    }
}

在这个例子中,我们创建了一个 Semaphore 对象,初始化为允许两个并发执行。然后,创建五个线程,都尝试获取信号量并模拟资源使用。当一个线程完成资源使用,就会释放信号,允许其他线程获取。

3. Semaphore 与锁的关系

Semaphore 可以看作是普通锁的一种更广泛的推广形式。具体来说:

  • 普通锁:普通锁(ReentrantLocksynchronized)是一种二元信号量,即只允许一个线程访问共享资源。
  • 信号量:信号量允许指定的线程访问共享资源。因此,信号量可以看成锁的一种推广形式。

信号量作为锁的例子

如果我们把信号量的许可数量设置为 1,那么信号量就等价于一个普通的锁:

import java.util.concurrent.Semaphore;

public class SemaphoreAsLockExample {

    private static final Semaphore lock = new Semaphore(1); // 等价于一个普通的锁

    public static void main(String[] args) {
        Runnable task = () -> {
            try {
                lock.acquire(); // 获取许可
                System.out.println(Thread.currentThread().getName() + " acquired the lock.");
                Thread.sleep(2000); // 模拟长时间持有资源
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.release(); // 释放许可
                System.out.println(Thread.currentThread().getName() + " released the lock.");
            }
        };

        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(task, "Thread-" + i);
            thread.start();
        }
    }
}

关键点总结

  1. 信号量作为锁 :通过将信号量的许可数量设置为 1,我们可以使信号量的行为等价于一个普通的锁。这种方式提供了一种灵活的方式来控制对共享资源的访问。
  2. 获取和释放许可acquire()release() 方法分别用于获取和释放信号量的许可。在多线程环境中,这些方法保证了对共享资源的互斥访问。
  3. 异常处理 :在 try-finally 块中调用 release() 方法,确保即使在发生异常的情况下,许可也会被正确释放,避免死锁等问题。
  4. 并发控制 :通过信号量机制,我们可以精确控制同时访问共享资源的线程数量,从而提高程序的并发性和性能。

4. 总结

  • 协调多个进程之间的资源分配Semaphore 可以协调多个进程间的资源分配,通常需要借助于操作系统的 IPC 机制。
  • 协调多个线程之间的资源分配Semaphore 也可以协调多个线程之间的资源分配,Java 提供了 java.util.concurrent.Semaphore 类来实现这一功能。
  • 信号量和锁的关系:信号量可以看成是普通锁的一种更加广泛的推广。如果将信号量的许可数量设为 1 ,那么信号量相当于一个普通的锁。

四、CountDownLatch

CountDownLatch 是 Java 并发包(java.util.concurrent)中的一个同步工具类。用于协调多个线程之间的操作。它允许一个或多个线程等待,直到其他线程完成一组操作。CountDownLatch 通过一个计数器来完成这种协调机制,初始值设为一个计数值,当某个线程完成一个任务时,就会调用 CountDown() 方法将计数器减 1 。其他线程通过调用 await() 方法等待计数器完成置零,从而保证所有子任务都完成。

1. 工作原理

  • 初始化:创建一个 CountDownLatch 对象时,需要指定一个计数器初始值,这个值通常是需要等待的子任务数量。

  • 等待:主线程或其他要等待的线程调用 await() 方法,进入等待状态,直到计数器归零。

  • 计数:每个子任务被完成后调用 countDown() 方法,计数器减一。

  • 继续执行:当计数器归零后,所有在 await() 等待的线程都会被唤醒,继续执行后续操作。

2. 应用场景

  • 并行计算:当将一个大任务拆分成多个小任务,并执行这些小任务。使用 CountDownLatch 等待所有子任务完成。
  • 资源初始化:在应用程序启动前,可能需要初始化多个资源,使用 CountDownLatch 确保所有资源初始化完毕再启动主程序逻辑。

3. 示例代码

以下是一个简单的示例,展示如何使用 CountDownLatch 来协调多个线程:

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {

    public static void main(String[] args) {
        int numberOfTasks = 5;
        CountDownLatch latch = new CountDownLatch(numberOfTasks);

        for (int i = 0; i < numberOfTasks; i++) {
            new Thread(new Worker(latch, i)).start();
        }

        try {
            // 主线程等待所有子任务完成
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("All tasks completed.");
    }
}

class Worker implements Runnable {
    private final CountDownLatch latch;
    private final int taskId;

    public Worker(CountDownLatch latch, int taskId) {
        this.latch = latch;
        this.taskId = taskId;
    }

    @Override
    public void run() {
        try {
            // 模拟任务执行
            System.out.println("Task " + taskId + " is running.");
            Thread.sleep(1000); // 模拟任务耗时
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 任务完成后,减少计数器
            latch.countDown();
            System.out.println("Task " + taskId + " completed.");
        }
    }
}

关键点

  • 初始化CountDownLatch latch = new CountDownLatch(numberOfTasks); 初始化计数器。
  • 等待latch.await(); 主线程等待所有子任务完成。
  • 计数:每个子任务完成后调用 latch.countDown(); 来减少计数器。

五、多线程中使用 ArrayList

在多线程环境下使用 ArrayList ,ArrayList 本身是线程不安全的,我们需要采取一些措施来确保其线程安全。

1. 自行加锁(推荐)

自行加锁是最灵活的一种方式之一,允许开发者自行根据具体需求进行细粒度的控制。通过使用 显式锁(如 ReentrantLock)或同步代码块(synchronized),可以确保多线程对共享资源的访问是线程安全的。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadSafeArrayList<T> {
    private final List<T> list = new ArrayList<>();
    private final Lock lock = new ReentrantLock();

    public void add(T item) {
        lock.lock();
        try {
            list.add(item);
        } finally {
            lock.unlock();
        }
    }

    public T get(int index) {
        lock.lock();
        try {
            return list.get(index);
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        lock.lock();
        try {
            return list.size();
        } finally {
            lock.unlock();
        }
    }

    // 其他方法类似处理...
}

优点

  • 灵活性:可以根据需求选择不同的锁机制和锁定策略。
  • 性能:可以实现细粒度的锁定,减少锁竞争,提高高并发性能。

缺点

  • 复杂性:需要手动管理锁,容易出现死锁、锁遗漏等问题。
  • 维护成本:代码量增加,维护成本增大。

2. 使用 Collections.synchronizedList(new ArrayList())

Collections.synchronizedList 方法返回一个线程安全的 List 对象。其中所有关键方法都用 synchronized 关键字修饰。这种方式类似于 VectorHashtable 的实现方式,但是性能较低,因为每次调用都需要获取全局锁。

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SynchronizedArrayListExample {
    private static final List<String> list = Collections.synchronizedList(new ArrayList<>());

    public static void main(String[] args) {
        // 添加元素
        for (int i = 0; i < 10; i++) {
            list.add("Item " + i);
        }

        // 遍历元素
        synchronized (list) {
            for (String item : list) {
                System.out.println(item);
            }
        }
    }
}

优点

  • 简单易用:只需调用一次 Collections.synchronizedList() 即可获取到线程安全的 List
  • 内置支持:Java 标准库提供的线程解决方案,无需添加额外的依赖。

缺点

  • 性能较差:每次调用该方法都需要获取全局锁,导致并发性能低下。
  • 潜在问题:遍历时仍需手动添加同步锁,否则可能会引发 ConcurrentModificationException

3. 使用 CopyOnWriteArrayList

CopyOnWriteArrayList 是 Java 并发包中提供的一种线程安全 List 实现,基于写时复制思想来实现。在写操作时,会创建一个新的数组副本,并在新数组中进行修改,修改完成后将引用指向新数组。读操作则直接访问的是当前数组,所以读操作不需要加锁。

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteArrayListExample {
    private static final List<String> list = new CopyOnWriteArrayList<>();

    public static void main(String[] args) {
        // 添加元素
        for (int i = 0; i < 10; i++) {
            list.add("Item " + i);
        }

        // 遍历元素
        for (String item : list) {
            System.out.println(item);
        }
    }
}

优点

  • 线程安全:读写操作都是线程安全的,无需手动同步。
  • 写时复制:写操作不会阻塞读操作,不会出现读到一半就不读的情况。适用于读多写少的场景。

缺点

  • 内存开销大:每次写操作都会创建一个新的副本数组,可能导致较高的内存消耗。
  • 性能问题:频繁的写操作可能会导致性能下降,因为每次写操作都需要复制整个数组。

4. 总结

  • 自行加锁:提供了更大的灵活性和优化空间,但需要开发者具有较高的并发编程能力。
  • Collections.synchronizedList :适用于简单的线程安全需求,尤其是当读写比例较为均衡时。但性能较差且存在潜在问题。
  • CopyOnWriteArrayList :适用于读多写少的场景,如配置加载,缓存更新。在写操作频繁下性能下降。

六、多线程中使用哈希表

1. HashMap 线程不安全

HashMap 是 Java 中最常用的哈希表实现方式之一,但它是非线程安全的。这意味着在多线程中使用 HashMap 可能会导致数据竞争和不一致的状态。

import java.util.HashMap;
import java.util.Map;

public class HashMapExample {
    private static final Map<String, String> map = new HashMap<>();

    public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> {
            for (int i = 0; i < 1000; i++) {
                map.put(Thread.currentThread().getName() + "-" + i, "value" + i);
            }
        };

        Thread t1 = new Thread(task, "Thread-1");
        Thread t2 = new Thread(task, "Thread-2");

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("Final size: " + map.size());
    }
}

优点

  • 高性能:没有同步开销,读写操作速度快。
  • 灵活性:提供丰富的 API ,易于使用。

缺点

  • 线程不安全:在多线程环境下直接使用可能会导致数据丢失或异常(ConcurrentModificationException)。

2. Hashtable 线程安全

Hashtable 是 Java 早期提供线程安全的哈希表实现。他通过给所有公共方法都加上 synchronized 关键字来实现线程安全。然而,这种全局锁机制导致并发编程性能低下。

import java.util.Hashtable;
import java.util.Map;

public class HashtableExample {
    private static final Map<String, String> map = new Hashtable<>();

    public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> {
            for (int i = 0; i < 1000; i++) {
                map.put(Thread.currentThread().getName() + "-" + i, "value" + i);
            }
        };

        Thread t1 = new Thread(task, "Thread-1");
        Thread t2 = new Thread(task, "Thread-2");

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("Final size: " + map.size());
    }
}

优点

  • 线程安全:所有公共方法都加上了 synchronized ,确保线程安全。
  • 易于使用:无需额外处理同步问题。

缺点

  • 性能较差:每次访问都需获取全局锁,导致并发性能低下。
  • 过时Hashtable 已经被标志过时,建议使用 Collections.synchronizedMapConcurrentHashMap

3. ConcurrentHashMap 效率更高

ConcurrentHashMap 是 Java 并发包中高性能线程安全的哈希表实现方式。它通过分段锁(Segment Locking)或更现代的 CAS 操作(Compare-Anderson-Swap)实现高效的并发性能。与 Hashtable 全局锁不同,ConcurrentHashMap 在多个桶分别加锁,减少锁冲突的概率。

3.1 基础概念

  1. Hashtable
  • 线程安全:所有公共方法都是用 synchronized 关键字来进行同步操作。
  • 全局锁:每次访问都需要获取整个表的锁,导致并发性能低下。
  1. ConcurrentHashMap
  • 分段锁:在 Java 7 及更早版本中,ConcurrentHashMap 使用分段锁(Segment Locking),将哈希表分成多个段(Segment),在每个段独立加锁。
  • CAS 操作:在 Java 8 及更高版本中,ConcurrentHashMap 使用 CAS 操作(Compare-And-Swap)来减少锁的竞争,进一步提高并发性能。

3.2 性能测试

我们将通过一个简单的性能测试来比较 HashtableConcurrentHashMap 的性能。假设我们要在一个多线程环境中对哈希表进行大量插入操作。

import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class HashTableVsConcurrentHashMap {

    public static void main(String[] args) throws InterruptedException {
        int threadCount = 10;
        int operationsPerThread = 100000;

        // Hashtable Test
        testHashTable(threadCount, operationsPerThread);

        // ConcurrentHashMap Test
        testConcurrentHashMap(threadCount, operationsPerThread);
    }

    private static void testHashTable(int threadCount, int operationsPerThread) throws InterruptedException {
        Map<String, String> map = new Hashtable<>();

        Runnable task = () -> {
            for (int i = 0; i < operationsPerThread; i++) {
                map.put(Thread.currentThread().getName() + "-" + i, "value" + i);
            }
        };

        runTest(map, task, threadCount, "Hashtable");
    }

    private static void testConcurrentHashMap(int threadCount, int operationsPerThread) throws InterruptedException {
        Map<String, String> map = new ConcurrentHashMap<>();

        Runnable task = () -> {
            for (int i = 0; i < operationsPerThread; i++) {
                map.put(Thread.currentThread().getName() + "-" + i, "value" + i);
            }
        };

        runTest(map, task, threadCount, "ConcurrentHashMap");
    }

    private static void runTest(Map<String, String> map, Runnable task, int threadCount, String mapName) throws InterruptedException {
        Thread[] threads = new Thread[threadCount];
        long startTime = System.currentTimeMillis();

        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(task, "Thread-" + i);
            threads[i].start();
        }

        for (int i = 0; i < threadCount; i++) {
            threads[i].join();
        }

        long endTime = System.currentTimeMillis();
        System.out.println(mapName + " Time: " + (endTime - startTime) + " ms, Size: " + map.size());
    }
}

运行上述代码,通常会发现 ConcurrentHashMap 的执行时间显著低于 Hashtable。这表明 ConcurrentHashMap 在多线程环境下的性能更好。

Hashtable Time: 455 ms, Size: 1000000
ConcurrentHashMap Time: 268 ms, Size: 1000000

3.3 原因分析(全局锁 vs 分段锁/CAS)

  • Hashtable :每次访问都需要获取整个表的锁,导致其他线程等待当前线程释放锁后才能继续操作,即使这些操作发生在不同的部分。

    public synchronized V put(K key, V value) {
       // Synchronized method
    }
    
  • ConcurrentHashMap

    • 在 Java 7 中,ConcurrentHashMap 使用了 分段锁(Segment Locking),将哈希表分为多个段,每个段独立进行加锁。这些不同段的操作可以并发进行,减少锁冲突的概率。
      final Segment<K,V> segmentFor(int hash) {
          return segments[(hash >>> segmentShift) & segmentMask];
      }
      
    • 在 Java 8 及更高版本中,放弃了分段锁机制。转而使用 CAS 操作和更细粒度的锁(如 锁住单个桶)。这种设计进一步提高并发性能。
      final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                     boolean evict) {
          Node<K,V>[] tab; Node<K,V> p; int n, i;
          if ((tab = table) == null || (n = tab.length) == 0)
              n = (tab = resize()).length;
          if ((p = tab[i = (n - 1) & hash]) == null)
              tab[i] = newNode(hash, key, value, null);
          else {
              Node<K,V> e; K k;
              if (p.hash == hash &&
                  ((k = p.key) == key || (key != null && key.equals(k))))
                  e = p;
              else if (p instanceof TreeNode)
                  e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
              else {
                  for (int binCount = 0; ; ++binCount) {
                      if ((e = p.next) == null) {
                          p.next = newNode(hash, key, value, null);
                          if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                              treeifyBin(tab, hash);
                          break;
                      }
                      if (e.hash == hash &&
                          ((k = e.key) == key || (key != null && key.equals(k))))
                          break;
                      p = e;
                  }
              }
              if (e != null) { // existing mapping for key
                  V oldValue = e.value;
                  if (!onlyIfAbsent || oldValue == null)
                      e.value = value;
                  afterNodeAccess(e);
                  return oldValue;
              }
          }
          ++modCount;
          if (++size > threshold)
              resize();
          afterNodeInsertion(evict);
          return null;
      }
      

3.4 结论

根据以上分析和实验,可以得出以下结论:

  • Hashtable :使用全局锁机制,导致高并发环境性能下降。
  • ConcurrentHashMap :使用分段锁或 CAS 操作,降低锁冲突的概念,提高了并发性能。

具体来讲,ConcurrentHashMap 的实现使其能够在多线程环境中更高效地处理并发读写操作。以下是其主要特性:

  • 分段锁 / CAS 操作:通过减少锁的粒度,提高并发性能。
  • 弱一致性:允许一定的弱一致性,读操作不会阻塞写操作。
  • 丰富的 API :提供诸如 computeIfAbsentmerge 等高级 API ,方便开发。

4. 代码案例分析

import java.util.concurrent.atomic.AtomicReferenceArray;

public class SimpleConcurrentHashMap<K, V> {
    private static final int DEFAULT_CAPACITY = 16;
    private AtomicReferenceArray<Node<K, V>> table;

    public SimpleConcurrentHashMap() {
        table = new AtomicReferenceArray<>(DEFAULT_CAPACITY);
    }

    public void put(K key, V value) {
        int index = Math.abs(key.hashCode()) % table.length();
        Node<K, V> newNode = new Node<>(key, value);
        while (true) {
            Node<K, V> current = table.get(index);
            if (current == null) {
                if (table.compareAndSet(index, null, newNode)) {
                    break;
                }
            } else if (current.getKey().equals(key)) {
                if (table.compareAndSet(index, current, newNode)) {
                    break;
                }
            } else {
                index = (index + 1) % table.length(); // Linear probing
            }
        }
    }

    public V get(K key) {
        int index = Math.abs(key.hashCode()) % table.length();
        Node<K, V> node = table.get(index);
        while (node != null) {
            if (node.getKey().equals(key)) {
                return node.getValue();
            }
            index = (index + 1) % table.length();
            node = table.get(index);
        }
        return null;
    }

    static class Node<K, V> {
        private final K key;
        private volatile V value;

        public Node(K key, V value) {
            this.key = key;
            this.value = value;
        }

        public K getKey() {
            return key;
        }

        public V getValue() {
            return value;
        }

        public void setValue(V value) {
            this.value = value;
        }
    }
}

这段示例代码是一个简化的 ConcurrentHashMap 实现,展示了如何使用 CAS(Compare-And-Swap)操作来实现无锁更新。以下是其主要实现流程和各个部分的功能介绍:

4.1 类结构

SimpleConcurrentHashMap<K, V>
这是一个主类,表示一个简单的线程安全的哈希表。它使用了 AtomicReferenceArray 来存储键值对,并通过 CAS 操作来实现并发控制。

import java.util.concurrent.atomic.AtomicReferenceArray;

public class SimpleConcurrentHashMap<K, V> {
    private static final int DEFAULT_CAPACITY = 16; // 图书馆中有多少书架
    private AtomicReferenceArray<Node<K, V>> table; // 图书馆

    public SimpleConcurrentHashMap() {
        table = new AtomicReferenceArray<>(DEFAULT_CAPACITY);
    }

    // put 和 get 方法的实现
}
  • DEFAULT_CAPACITY :默认容量为 16 。
  • table :使用 AtomicReferenceArray 来存储键值对。

4.2 插入操作 (put 方法)

put(K key, V value)
该方法用于将键值对插入哈希表中,如果键已经存在,则更新其对应的值。

public void put(K key, V value) {
    int index = Math.abs(key.hashCode()) % table.length(); // 哪个书架 的编号
    Node<K, V> newNode = new Node<>(key, value);
    while (true) {
        Node<K, V> current = table.get(index); // 当前书架上的 当前书的编号
        if (current == null) {
            // 当前书架上的没有书,直接插入,使用 CAS 操作
            if (table.compareAndSet(index, null, newNode)) {
                break;
            }
        } else if (current.getKey().equals(key)) {
            // 当前书架上存在相同的书(即编号相同),也可插入(书中的数据)
            if (table.compareAndSet(index, current, newNode)) {
                break;
            }
        } else {
            index = (index + 1) % table.length(); // 找下一个书架
        }
    }
}
  • 计算索引:根据键的哈希码计算出在数组中的索引位置。
  • CAS 操作:尝试将新节点插入指定位置。
    • 如果当前位置为空(null),则直接插入新节点。
    • 如果当前位置已有节点且键相同,则更新节点的值。
    • 如果当前位置已有节点且键不同,则进行线性探测(寻找下一个位置)。

4.3 查找操作 (get 方法)

get(K key)
该方法用于从哈希表中查找给定的键找对应的值。

public V get(K key) {
    int index = Math.abs(key.hashCode()) % table.length(); // 当前书架的编号
    Node<K, V> node = table.get(index); // 书架上的一本书(的编号)
    while (node != null) {
        if (node.getKey().equals(key)) {
            return node.getValue();
        }
        // 找不到,就继续往下一个书架走
        index = (index + 1) % table.length();
        node = table.get(index);
    }
    return null;
}
  • 计算索引:与 put 方法类似,首先根据键的哈希码计算出初始索引位置。
  • 查找节点:遍历链表,查找要匹配的键并返回对应的值。
    • 如果找到匹配的键,则返回对应的值。
    • 如果没找到匹配的键,则继续线性探测,直到找到空位或遍历完整个链表。

4.4 节点类 (Node<K, V>)

Node<K, V>
这是一个内部静态类,表示哈希表中的每个节点。每个节点包含一个键和一个值。

static class Node<K, V> {
    private final K key; // 书的编号
    private volatile V value; // 书的内容

    public Node(K key, V value) {
        this.key = key;
        this.value = value;
    }

    public K getKey() {
        return key;
    }

    public V getValue() {
        return value;
    }

    public void setValue(V value) {
        this.value = value;
    }
}
  • key :节点的键,不可变。
  • value :节点的值,可变(使用 volatile 关键字确保其可见性)。

4.5 总结

主要流程

  • 初始化:创建 SimpleConcurrentHashMap 对象时,初始化一个固定大小的 AtomicReferenceArray 数组。
  • 插入操作
    • 计算键的哈希码并确定其在数组中的位置。
    • 使用 CAS 操作尝试将新节点插入到指定位置。
    • 如果发生冲突(即当前位置已有节点),则进行线性探测,查找下一个可用位置。
  • 查找操作
    • 计算键的哈希码并确定其在数组中的位置。
    • 遍历链表,查找匹配的键并返回其对应的值。
    • 如果未找到匹配的键,则继续线性探测,直到找到空位或遍历完整个链表。

特点

  • CAS 操作:使用 compareAndSet 方法实现无锁更新,避免了传统锁机制带来的性能瓶颈。
  • 线性探测:当发生哈希冲突时,采用线性探测策略寻找下一个可用位置。
  • 简单高效:虽然简化了许多高级特性,但基本实现了线程安全的哈希表功能。

5. ConcurrentHashMap 的一个核心设计理念

为了更好的理解 SimpleConcurrentHashMap 如何通过细粒度的锁和非阻塞算法提高并发性能的设计思想,可以从以下几个方面进行详细介绍:

5.1 细粒度的锁

在传统的线程安全的哈希表实现(Hashtable)中,通常会对整个哈希表加全局锁,这意味着每次进行读写操作都要获取同一把锁,导致其他线程必须等待当前线程释放锁后才能继续操作。这种全局锁机制严重限制了并发性能。

全局锁示例 (Hashtable)

public synchronized V put(K key, V value) {
    // Synchronized method
}

在这个例子中,put() 方法被 synchronized 关键字修饰,表示每次调用该方法都需要获取整个对象的锁。这会导致不同线程操作哈希表的不同部位,它们仍需等待锁的释放。

5.2 非阻塞算法与 CAS 操作

相比之下,SimpleConcurrentHashMap 使用 CAS 操作(Compare-And-Swap)来实现无锁更新。CAS 操作一种原子操作,可以在不使用显式锁的情况下完成对变量的更新。具体来讲,CAS 操作会尝试将某个变量从旧值更新为新值,但只有当变量的当前值为旧值时才能更新成功。如果当前值已经发生变化,则操作失败,并且可以重试。

CAS 操作示例

if (table.compareAndSet(index, null, newNode)) {
    break;
}

在这个例子中,compareAndSet 方法尝试将数组中指定位置的值从 null 更新为 newNode。如果当前位置确实是 null,则更新成功;否则,说明有其他线程已经修改了该位置的值,当前线程需要重新计算索引并重试。

5.3 细粒度的锁与 CAS 操作的结合

SimpleConcurrentHashMap 通过以下方式实现细粒度和非阻塞算法:

  • 每个桶独立加锁:虽然 SimpleConcurrentHashMap 没有显式地划分段或桶,但是使用了 AtomicReferenceArray 来存储节点。并通过 CAS 操作对每个桶进行独立更新。这意味着不同的线程可以操作哈希表中不同的部分,且不会相互影响。
    private AtomicReferenceArray<Node<K, V>> table;
    
  • 无锁操作:通过 CAS 操作,SimpleConcurrentHashMap 实现无锁更新。每个线程在插入或更新节点时,无需等待其他线程释放锁,而是直接尝试更新。如果更新失败(即 CAS 操作返回 false),则线程重新计算索引并重试。
    while (true) {
        Node<K, V> current = table.get(index);
        if (current == null) {
            if (table.compareAndSet(index, null, newNode)) {
                break;
            }
        } else if (current.getKey().equals(key)) {
            if (table.compareAndSet(index, current, newNode)) {
                break;
            }
        } else {
            index = (index + 1) % table.length(); // Linear probing
        }
    }
    

5.4 提高并发性能

通过上述设计,SimpleConcurrentHashMap 在多线程环境下体现出更高的并发性能。主要原因如下:

  • 减少锁竞争:由于每个桶独立加锁(实际上是无锁的 CAS 操作),不同线程可以同时操作哈希表中不同部分,有效减少锁竞争的概率。
    • 例如,假设哈希表有 16 个桶,那么最多可以有 16 个线程同时进行插入或更新操作,而不会相互干扰。
  • 非阻塞算法:CAS 操作是一种非阻塞算法,即使某个线程更新失败,也不会影响其他线程操作哈希表的其他部分。相反,失败的线程会立即重试,直到成功为止。
    • 这种非阻塞特征,使得 SimpleConcurrentHashMap 在高并发环境依然保持得较高的吞吐量。

5.5 线性探测与冲突处理

SimpleConcurrentHashMap 中,发生哈希冲突时(即两个键的哈希码映射到同一个桶),采用了线性探测寻找到下一个可用位置。

else {
    index = (index + 1) % table.length(); // Linear probing
}

这种策略虽然简单,但在高并发环境下可能会导致一定的性能下降。为了进一步的优化,SimpleConcurrentHashMap 在 Java 8 及以上版本,引入了红黑树(TreeMap)代替了链表结构,从而提高了冲突处理的效率。

5.6 总结

  • 细粒度的锁:通过 AtomicReferenceArray 和 CAS 操作,实现了对每个桶的独立更新,避免了全局锁带来的性能瓶颈。
  • 非阻塞算法:CAS 操作是一种非阻塞算法,允许多个线程同时进行读写操作,而不会相互阻塞。
  • 高效冲突处理:虽然采用了简单的线性探测策略,但在高并发环境下仍能有效处理冲突,保证了系统的稳定性。

http://www.kler.cn/a/522873.html

相关文章:

  • 基于dlib/face recognition人脸识别推拉流实现
  • SpringBoot-Vue整合百度地图
  • 2024年除夕
  • vue中的el是指什么
  • Verilog中if语句和case语句综合出的电路区别
  • Kafka 入门与应用实战:吞吐量优化与与 RabbitMQ、RocketMQ 的对比
  • 逻辑学起码常识凸显级数论有重大错误:将两相同级数误为相异级数
  • WGCLOUD运维工具从入门到精通 - 如何设置主题背景
  • Rust语言进阶之迭代器:iter用法实例(九十)
  • 在docker上部署nacos
  • FPGA 23 ,使用 Vivado 实现花式跑马灯( 使用 Vivado 实现花式流水灯,采用模块化编程,从按键消抖到LED控制 )
  • Hive:基本查询语法
  • R语言机器学习算法实战系列(十九)特征选择之Monte Carlo算法(Monte Carlo Feature Selection)
  • 内存泄漏的通用排查方法
  • 《Vision Transformer》论文精读:在大量足够数据量的条件下纯Transformer也能在图像分类任务中比肩SOTA
  • 蓝桥杯例题四
  • 基于微信小程序的社团活动助手php+论文源码调试讲解
  • 电力晶体管(GTR)全控性器件
  • 【Qt】文件操作
  • 打造专业PPT幻灯片布局的实用策略
  • 深入浅出Linux操作系统大数据定制Shell编程(六)
  • 2000-2020年各省第二产业增加值占GDP比重数据
  • 在线免费快速无痕去除照片海报中的文字logo
  • 【面试】【前端】【微信小程序】微信小程序面试内容总结
  • 动态规划DP 数字三角型模型 方格取数(题目详解+C++代码实现)
  • Vue.js Vuex 模块化管理