Java开发经验——并发工具类库线程安全问题
摘要
本文探讨了Java并发工具类库中的线程安全问题,特别是ThreadLocal导致的用户信息错乱异常场景。文章通过一个Spring Boot Web应用程序示例,展示了在Tomcat线程池环境下,ThreadLocal如何因线程重用而导致异常,并讨论了其他并发工具类的线程安全问题,包括ConcurrentHashMap、computeIfAbsent方法、CopyOnWrite性能问题以及List线程安全方案。
1. ThreadLocal导致的异常场景
之前有业务同学和我反馈,在生产上遇到一个诡异的问题,有时获取到的用户信息是别人的。查看代码后,我发现他使用了 ThreadLocal 来缓存获取到的用户信息。
我们知道,ThreadLocal 适用于变量在线程间隔离,而在方法或类间共享的场景。如果用户信息的获取比较昂贵(比如从数据库查询用户信息),那么在 ThreadLocal 中缓存数据是比较合适的做法。但,这么做为什么会出现用户信息错乱的 Bug 呢?
使用 Spring Boot 创建一个 Web 应用程序,使用 ThreadLocal 存放一个 Integer 的值,来暂且代表需要在线程中保存的用户信息,这个值初始是 null。在业务逻辑中,我先从 ThreadLocal 获取一次值,然后把外部传入的参数设置到 ThreadLocal 中,来模拟从当前上下文获取到用户信息的逻辑,随后再获取一次值,最后输出两次获得的值和线程名称。
private static final ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null);
@GetMapping("wrong")
public Map wrong(@RequestParam("userId") Integer userId) {
//设置用户信息之前先查询一次ThreadLocal中的用户信息
String before = Thread.currentThread().getName() + ":" + currentUser.get();
//设置用户信息到ThreadLocal
currentUser.set(userId);
//设置用户信息之后再查询一次ThreadLocal中的用户信息
String after = Thread.currentThread().getName() + ":" + currentUser.get();
//汇总输出两次查询结果
Map result = new HashMap();
result.put("before", before);
result.put("after", after);
return result;
}
按理说,在设置用户信息之前第一次获取的值始终应该是 null,但我们要意识到,程序运行在 Tomcat 中,执行程序的线程是 Tomcat 的工作线程,而 Tomcat 的工作线程是基于线程池的。
顾名思义,线程池会重用固定的几个线程,一旦线程重用,那么很可能首次从 ThreadLocal 获取的值是之前其他用户的请求遗留的值。这时,ThreadLocal 中的用户信息就是其他用户的信息。
为了更快地重现这个问题,我在配置文件中设置一下 Tomcat 的参数,把工作线程池最大线程数设置为 1,这样始终是同一个线程在处理请求:
server.tomcat.max-threads=1
运行程序后先让用户 1 来请求接口,可以看到第一和第二次获取到用户 ID 分别是 null 和 1,符合预期:
随后用户 2 来请求接口,这次就出现了 Bug,第一和第二次获取到用户 ID 分别是 1 和 2,显然第一次获取到了用户 1 的信息,原因就是 Tomcat 的线程池重用了线程。从图中可以看到,两次请求的线程都是同一个线程:http-nio-8080-exec-1。
这个例子告诉我们,在写业务代码时,首先要理解代码会跑在什么线程上:
- 我们可能会抱怨学多线程没用,因为代码里没有开启使用多线程。但其实,可能只是我们没有意识到,在 Tomcat 这种 Web 服务器下跑的业务代码,本来就运行在一个多线程环境(否则接口也不可能支持这么高的并发),并不能认为没有显式开启多线程就不会有线程安全问题。
- 因为线程的创建比较昂贵,所以 Web 服务器往往会使用线程池来处理请求,这就意味着线程会被重用。这时,使用类似 ThreadLocal 工具来存放一些数据时,需要特别注意在代码运行完后,显式地去清空设置的数据。如果在代码中使用了自定义的线程池,也同样会遇到这个问题。需要使用线程的remove()方法。
理解了这个知识点后,我们修正这段代码的方案是,在代码的 finally 代码块中,显式清除 ThreadLocal 中的数据。这样一来,新的请求过来即使使用了之前的线程也不会获取到错误的用户信息了。修正后的代码如下:
@GetMapping("right")
public Map right(@RequestParam("userId") Integer userId) {
String before = Thread.currentThread().getName() + ":" + currentUser.get();
currentUser.set(userId);
try {
String after = Thread.currentThread().getName() + ":" + currentUser.get();
Map result = new HashMap();
result.put("before", before);
result.put("after", after);
return result;
} finally {
//在finally代码块中删除ThreadLocal中的数据,确保数据不串
currentUser.remove();
}
}
重新运行程序可以验证,再也不会出现第一次查询用户信息查询到之前用户请求的 Bug:
ThreadLocal 是利用独占资源的方式,来解决线程安全问题,那如果我们确实需要有资源在线程之间共享,应该怎么办呢?这时,我们可能就需要用到线程安全的容器了。
2. ConcurrentHashMap的线程不安全场景
JDK 1.5 后推出的 ConcurrentHashMap,是一个高性能的线程安全的哈希表容器。“线程安全”这四个字特别容易让人误解,因为ConcurrentHashMap 只能保证提供的原子性读写操作是线程安全的。
比如下面这个场景。有一个含 900 个元素的 Map,现在再补充 100 个元素进去,这个补充操作由 10 个线程并发进行。开发人员误以为使用了 ConcurrentHashMap 就不会有线程安全问题,于是不加思索地写出了下面的代码:在每一个线程的代码逻辑中先通过 size 方法拿到当前元素数量,计算 ConcurrentHashMap 目前还需要补充多少元素,并在日志中输出了这个值,然后通过 putAll 方法把缺少的元素添加进去。
/线程个数
private static int THREAD_COUNT = 10;
//总元素数量
private static int ITEM_COUNT = 1000;
//帮助方法,用来获得一个指定元素数量模拟数据的ConcurrentHashMap
private ConcurrentHashMap<String, Long> getData(int count) {
return LongStream.rangeClosed(1, count)
.boxed()
.collect(Collectors.toConcurrentMap(i -> UUID.randomUUID().toString(), Function.identity(),
(o1, o2) -> o1, ConcurrentHashMap::new));
}
@GetMapping("wrong")
public String wrong() throws InterruptedException {
ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
//初始900个元素
log.info("init size:{}", concurrentHashMap.size());
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
//使用线程池并发处理逻辑
forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
//查询还需要补充多少个元素
int gap = ITEM_COUNT - concurrentHashMap.size();
log.info("gap size:{}", gap);
//补充元素
concurrentHashMap.putAll(getData(gap));
}));
//等待所有任务完成
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
//最后元素个数会是1000吗?
log.info("finish size:{}", concurrentHashMap.size());
return "OK";
}
访问接口后程序输出的日志内容如下:
从日志中可以看到:
- 初始大小 900 符合预期,还需要填充 100 个元素。
- worker1 线程查询到当前需要填充的元素为 36,竟然还不是 100 的倍数。
- worker13 线程查询到需要填充的元素数是负的,显然已经过度填充了。
- 最后 HashMap 的总项目数是 1536,显然不符合填充满 1000 的预期。
针对这个场景,我们可以举一个形象的例子。ConcurrentHashMap 就像是一个大篮子,现在这个篮子里有 900 个桔子,我们期望把这个篮子装满 1000 个桔子,也就是再装 100 个桔子。有 10 个工人来干这件事儿,大家先后到岗后会计算还需要补多少个桔子进去,最后把桔子装入篮子。
ConcurrentHashMap 这个篮子本身,可以确保多个工人在装东西进去时,不会相互影响干扰,但无法确保工人 A 看到还需要装 100 个桔子但是还未装的时候,工人 B 就看不到篮子中的桔子数量。更值得注意的是,你往这个篮子装 100 个桔子的操作不是原子性的,在别人看来可能会有一个瞬间篮子里有 964 个桔子,还需要补 36 个桔子。
回到 ConcurrentHashMap,我们需要注意 ConcurrentHashMap 对外提供的方法或能力的限制:
- 使用了 ConcurrentHashMap,不代表对它的多个操作之间的状态是一致的,是没有其他线程在操作它的,如果需要确保需要手动加锁。
- 诸如 size、isEmpty 和 containsValue 等聚合方法,在并发情况下可能会反映 ConcurrentHashMap 的中间状态。因此在并发情况下,这些方法的返回值只能用作参考,而不能用于流程控制。显然,利用 size 方法计算差异值,是一个流程控制。
- 诸如 putAll 这样的聚合方法也不能确保原子性,在 putAll 的过程中去获取数据可能会获取到部分数据。
代码的修改方案很简单,整段逻辑加锁即可:
@GetMapping("right")
public String right() throws InterruptedException {
ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
log.info("init size:{}", concurrentHashMap.size());
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
//下面的这段复合逻辑需要锁一下这个ConcurrentHashMap
synchronized (concurrentHashMap) {
int gap = ITEM_COUNT - concurrentHashMap.size();
log.info("gap size:{}", gap);
concurrentHashMap.putAll(getData(gap));
}
}));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
log.info("finish size:{}", concurrentHashMap.size());
return "OK";
}
重新调用接口,程序的日志输出结果符合预期:
可以看到,只有一个线程查询到了需要补 100 个元素,其他 9 个线程查询到不需要补元素,最后 Map 大小为 1000。到了这里,你可能又要问了,使用 ConcurrentHashMap 全程加锁,还不如使用普通的 HashMap 呢。
其实不完全是这样。ConcurrentHashMap 提供了一些原子性的简单复合逻辑方法,用好这些方法就可以发挥其威力。这就引申出代码中常见的另一个问题:在使用一些类库提供的高级工具类时,开发人员可能还是按照旧的方式去使用这些新类,因为没有使用其特性,所以无法发挥其威力。
3. computeIfAbsent方法
我们来看一个使用 Map 来统计 Key 出现次数的场景吧,这个逻辑在业务代码中非常常见。
- 使用 ConcurrentHashMap 来统计,Key 的范围是 10。
- 使用最多 10 个并发,循环操作 1000 万次,每次操作累加随机的 Key。
- 如果 Key 不存在的话,首次设置值为 1。
//循环次数
private static int LOOP_COUNT = 10000000;
//线程数量
private static int THREAD_COUNT = 10;
//元素数量
private static int ITEM_COUNT = 10;
private Map<String, Long> normaluse() throws InterruptedException {
ConcurrentHashMap<String, Long> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
//获得一个随机的Key
String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
synchronized (freqs) {
if (freqs.containsKey(key)) {
//Key存在则+1
freqs.put(key, freqs.get(key) + 1);
} else {
//Key不存在则初始化为1
freqs.put(key, 1L);
}
}
}
));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
return freqs;
}
我们吸取之前的教训,直接通过锁的方式锁住 Map,然后做判断、读取现在的累计值、加 1、保存累加后值的逻辑。这段代码在功能上没有问题,但无法充分发挥 ConcurrentHashMap 的威力,改进后的代码如下:
private Map<String, Long> gooduse() throws InterruptedException {
ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
//利用computeIfAbsent()方法来实例化LongAdder,然后利用LongAdder来进行线程安全计数
freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
}
));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
//因为我们的Value是LongAdder而不是Long,所以需要做一次转换才能返回
return freqs.entrySet().stream()
.collect(Collectors.toMap(
e -> e.getKey(),
e -> e.getValue().longValue())
);
}
在这段改进后的代码中,我们巧妙利用了下面两点:
- 使用 ConcurrentHashMap 的原子性方法 computeIfAbsent 来做复合逻辑操作,判断 Key 是否存在 Value,如果不存在则把 Lambda 表达式运行后的结果放入 Map 作为 Value,也就是新创建一个 LongAdder 对象,最后返回 Value。
- 由于 computeIfAbsent 方法返回的 Value 是 LongAdder,是一个线程安全的累加器,因此可以直接调用其 increment 方法进行累加。
这样在确保线程安全的情况下达到极致性能,把之前 7 行代码替换为了 1 行。
我们通过一个简单的测试比较一下修改前后两段代码的性能:
@GetMapping("good")
public String good() throws InterruptedException {
StopWatch stopWatch = new StopWatch();
stopWatch.start("normaluse");
Map<String, Long> normaluse = normaluse();
stopWatch.stop();
//校验元素数量
Assert.isTrue(normaluse.size() == ITEM_COUNT, "normaluse size error");
//校验累计总数
Assert.isTrue(normaluse.entrySet().stream()
.mapToLong(item -> item.getValue()).reduce(0, Long::sum) == LOOP_COUNT
, "normaluse count error");
stopWatch.start("gooduse");
Map<String, Long> gooduse = gooduse();
stopWatch.stop();
Assert.isTrue(gooduse.size() == ITEM_COUNT, "gooduse size error");
Assert.isTrue(gooduse.entrySet().stream()
.mapToLong(item -> item.getValue())
.reduce(0, Long::sum) == LOOP_COUNT
, "gooduse count error");
log.info(stopWatch.prettyPrint());
return "OK";
}
这段测试代码并无特殊之处,使用 StopWatch 来测试两段代码的性能,最后跟了一个断言判断 Map 中元素的个数以及所有 Value 的和,是否符合预期来校验代码的正确性。测试结果如下:
可以看到,优化后的代码,相比使用锁来操作 ConcurrentHashMap 的方式,性能提升了 10 倍。你可能会问,computeIfAbsent 为什么如此高效呢?
答案就在源码最核心的部分,也就是 Java 自带的 Unsafe 实现的 CAS。它在虚拟机层面确保了写入数据的原子性,比加锁的效率高得多:
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
像 ConcurrentHashMap 这样的高级并发工具的确提供了一些高级 API,只有充分了解其特性才能最大化其威力,而不能因为其足够高级、酷炫盲目使用。
4. CopyOnWrite导致的性能问题
除了 ConcurrentHashMap 这样通用的并发工具类之外,我们的工具包中还有些针对特殊场景实现的生面孔。一般来说,针对通用场景的通用解决方案,在所有场景下性能都还可以,属于“万金油”;而针对特殊场景的特殊实现,会有比通用解决方案更高的性能,但一定要在它针对的场景下使用,否则可能会产生性能问题甚至是 Bug。
之前在排查一个生产性能问题时,我们发现一段简单的非数据库操作的业务逻辑,消耗了超出预期的时间,在修改数据时操作本地缓存比回写数据库慢许多。查看代码发现,开发同学使用了 CopyOnWriteArrayList 来缓存大量的数据,而数据变化又比较频繁。CopyOnWrite 是一个时髦的技术,不管是 Linux 还是 Redis 都会用到。在 Java 中,CopyOnWriteArrayList 虽然是一个线程安全的 ArrayList,但因为其实现方式是,每次修改数据时都会复制一份数据出来,所以有明显的适用场景,即读多写少或者说希望无锁读的场景。
如果我们要使用 CopyOnWriteArrayList,那一定是因为场景需要而不是因为足够酷炫。如果读写比例均衡或者有大量写操作的话,使用 CopyOnWriteArrayList 的性能会非常糟糕。
我们写一段测试代码,来比较下使用 CopyOnWriteArrayList 和普通加锁方式 ArrayList 的读写性能吧。在这段代码中我们针对并发读和并发写分别写了一个测试方法,测试两者一定次数的写或读操作的耗时。
//测试并发写的性能
@GetMapping("write")
public Map testWrite() {
List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
StopWatch stopWatch = new StopWatch();
int loopCount = 100000;
stopWatch.start("Write:copyOnWriteArrayList");
//循环100000次并发往CopyOnWriteArrayList写入随机元素
IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
stopWatch.start("Write:synchronizedList");
//循环100000次并发往加锁的ArrayList写入随机元素
IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
log.info(stopWatch.prettyPrint());
Map result = new HashMap();
result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
result.put("synchronizedList", synchronizedList.size());
return result;
}
//帮助方法用来填充List
private void addAll(List<Integer> list) {
list.addAll(IntStream.rangeClosed(1, 1000000).boxed().collect(Collectors.toList()));
}
//测试并发读的性能
@GetMapping("read")
public Map testRead() {
//创建两个测试对象
List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
//填充数据
addAll(copyOnWriteArrayList);
addAll(synchronizedList);
StopWatch stopWatch = new StopWatch();
int loopCount = 1000000;
int count = copyOnWriteArrayList.size();
stopWatch.start("Read:copyOnWriteArrayList");
//循环1000000次并发从CopyOnWriteArrayList随机查询元素
IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(count)));
stopWatch.stop();
stopWatch.start("Read:synchronizedList");
//循环1000000次并发从加锁的ArrayList随机查询元素
IntStream.range(0, loopCount).parallel().forEach(__ -> synchronizedList.get(ThreadLocalRandom.current().nextInt(count)));
stopWatch.stop();
log.info(stopWatch.prettyPrint());
Map result = new HashMap();
result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
result.put("synchronizedList", synchronizedList.size());
return result;
}
运行程序可以看到,大量写的场景(10 万次 add 操作),CopyOnWriteArray 几乎比同步的 ArrayList 慢一百倍:
而在大量读的场景下(100 万次 get 操作),CopyOnWriteArray 又比同步的 ArrayList 快五倍以上:
你可能会问,为何在大量写的场景下,CopyOnWriteArrayList 会这么慢呢?答案就在源码中。以 add 方法为例,每次 add 时,都会用 Arrays.copyOf 创建一个新数组,频繁 add 时内存的申请释放消耗会很大:
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
synchronized (lock) {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
}
}
5. List线程安全方案
5.1. 使用同步包装类(Collections.synchronizedList
)
List<String> list = Collections.synchronizedList(new ArrayList<>());
// 线程安全的操作列表
list.add("item1");
list.add("item2");
// 遍历时需要手动同步
synchronized (list) {
for (String item : list) {
System.out.println(item);
}
}
虽然 synchronizedList
确保了增删改的线程安全,但在遍历时仍需手动同步。
5.2. 使用并发容器(CopyOnWriteArrayList
)
CopyOnWriteArrayList
是 java.util.concurrent
提供的线程安全 List
实现。
工作原理:
- 在写操作(如
add
、remove
)时,会创建集合的副本进行操作,因此读操作不会被锁定。 - 适合读多写少的场景。
List<String> list = new CopyOnWriteArrayList<>();
// 写操作
list.add("item1");
list.add("item2");
// 读操作,无需同步
for (String item : list) {
System.out.println(item);
}
优点:
- 读操作无需同步,性能优越。
- 写操作创建副本,避免了锁竞争。
缺点:
- 写操作开销较大,适合读多写少的场景。
5.3. 手动同步(synchronized
)
如果对现有的非线程安全 List
进行操作,可以使用 synchronized
块来保证线程安全。
List<String> list = new ArrayList<>();
// 添加元素时同步
synchronized (list) {
list.add("item1");
}
// 遍历时同步
synchronized (list) {
for (String item : list) {
System.out.println(item);
}
}
5.4. 使用 BlockingQueue
替代 List
如果需要在线程之间传递数据,可以考虑使用 BlockingQueue
(如 LinkedBlockingQueue
或 ArrayBlockingQueue
),它本身是线程安全的,且为生产者-消费者模式提供了更好的支持。
BlockingQueue
是线程安全的,因为它在内部通过多种同步机制确保了线程之间的安全操作,包括 锁、条件变量 和 CAS(Compare-And-Swap)操作 等。以下是它实现线程安全的核心原因和机制:
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 生产者线程
new Thread(() -> {
try {
queue.put("item1");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// 消费者线程
new Thread(() -> {
try {
String item = queue.take();
System.out.println(item);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
优点:
- 支持阻塞操作(
put
和take
)。 - 适合多线程的生产者-消费者模型。
5.5. 使用 ConcurrentLinkedQueue
如果不需要阻塞操作,但需要高效的线程安全队列,可以使用 ConcurrentLinkedQueue
。
Queue<String> queue = new ConcurrentLinkedQueue<>();
queue.add("item1");
queue.add("item2");
String item = queue.poll();
System.out.println(item);
5.6. list现场安全选择方案
- 读多写少:使用
CopyOnWriteArrayList
。 - 高并发读写:使用
ConcurrentLinkedQueue
或BlockingQueue
。 - 简单同步:使用
Collections.synchronizedList
。 - 低性能需求:手动加锁(
synchronized
)。
根据场景选择适合的解决方案即可!
6. 代码加锁导致线程安全问题
在一个类里有两个 int 类型的字段 a 和 b,有一个 add 方法循环 1 万次对 a 和 b 进行 ++ 操作,有另一个 compare 方法,同样循环 1 万次判断 a 是否小于 b,条件成立就打印 a 和 b 的值,并判断 a>b 是否成立。
@Slf4j
public class Interesting {
volatile int a = 1;
volatile int b = 1;
public void add() {
log.info("add start");
for (int i = 0; i < 10000; i++) {
a++;
b++;
}
log.info("add done");
}
public void compare() {
log.info("compare start");
for (int i = 0; i < 10000; i++) {
//a始终等于b吗?
if (a < b) {
log.info("a:{},b:{},{}", a, b, a > b);
//最后的a>b应该始终是false吗?
}
}
log.info("compare done");
}
}
他起了两个线程来分别执行 add 和 compare 方法:
Interesting interesting = new Interesting();
new Thread(() -> interesting.add()).start();
new Thread(() -> interesting.compare()).start();
按道理,a 和 b 同样进行累加操作,应该始终相等,compare 中的第一次判断应该始终不会成立,不会输出任何日志。但,执行代码后发现不但输出了日志,而且更诡异的是,compare 方法在判断 ab 也成立:
群里一位同学看到这个问题笑了,说:“这哪是 JVM 的 Bug,分明是线程安全问题嘛。很明显,你这是在操作两个字段 a 和 b,有线程安全问题,应该为 add 方法加上锁,确保 a 和 b 的 ++ 是原子性的,就不会错乱了。”随后,他为 add 方法加上了锁:
public synchronized void add()
但,加锁后问题并没有解决。我们来仔细想一下,为什么锁可以解决线程安全问题呢。因为只有一个线程可以拿到锁,所以加锁后的代码中的资源操作是线程安全的。但是,这个案例中的 add 方法始终只有一个线程在操作,显然只为 add 方法加锁是没用的。
之所以出现这种错乱,是因为两个线程是交错执行 add 和 compare 方法中的业务逻辑,而且这些业务逻辑不是原子性的:a++ 和 b++ 操作中可以穿插在 compare 方法的比较代码中;更需要注意的是,a 这种比较操作在字节码层面是加载 a、加载 b 和比较三步,代码虽然是一行但也不是原子性的。
所以,正确的做法应该是,为 add 和 compare 都加上方法锁,确保 add 方法执行时,compare 无法读取 a 和 b:
public synchronized void add()
public synchronized void compare()
所以,使用锁解决问题之前一定要理清楚,我们要保护的是什么逻辑,多线程执行的情况又是怎样的。
6.1. 加锁前要清楚锁和被保护的对象是不是一个层面的
除了没有分析清线程、业务逻辑和锁三者之间的关系随意添加无效的方法锁外,还有一种比较常见的错误是,没有理清楚锁和要保护的对象是否是一个层面的。
我们知道静态字段属于类,类级别的锁才能保护;而非静态字段属于类实例,实例级别的锁就可以保护。
先看看这段代码有什么问题:在类 Data 中定义了一个静态的 int 字段 counter 和一个非静态的 wrong 方法,实现 counter 字段的累加操作。
class Data {
@Getter
private static int counter = 0;
public static int reset() {
counter = 0;
return counter;
}
public synchronized void wrong() {
counter++;
}
}
写一段代码测试下:
@GetMapping("wrong")
public int wrong(@RequestParam(value = "count", defaultValue = "1000000") int count) {
Data.reset();
//多线程循环一定次数调用Data类不同实例的wrong方法
IntStream.rangeClosed(1, count).parallel().forEach(i -> new Data().wrong());
return Data.getCounter();
}
因为默认运行 100 万次,所以执行后应该输出 100 万,但页面输出的是 639242:
在非静态的 wrong 方法上加锁,只能确保多个线程无法执行同一个实例的 wrong 方法,却不能保证不会执行不同实例的 wrong 方法。而静态的 counter 在多个实例中共享,所以必然会出现线程安全问题。
理清思路后,修正方法就很清晰了:同样在类中定义一个 Object 类型的静态字段,在操作 counter 之前对这个字段加锁。
class Data {
@Getter
private static int counter = 0;
private static Object locker = new Object();
public void right() {
synchronized (locker) {
counter++;
}
}
}
你可能要问了,把 wrong 方法定义为静态不就可以了,这个时候锁是类级别的。可以是可以,但我们不可能为了解决线程安全问题改变代码结构,把实例方法改为静态方法。
6.2. 加锁要考虑锁的粒度和场景问题
在方法上加 synchronized 关键字实现加锁确实简单,也因此我曾看到一些业务代码中几乎所有方法都加了 synchronized,但这种滥用 synchronized 的做法:
- 一是,没必要。通常情况下 60% 的业务代码是三层架构,数据经过无状态的 Controller、Service、Repository 流转到数据库,没必要使用 synchronized 来保护什么数据。
- 二是,可能会极大地降低性能。使用 Spring 框架时,默认情况下 Controller、Service、Repository 是单例的,加上 synchronized 会导致整个程序几乎就只能支持单线程,造成极大的性能问题。
即使我们确实有一些共享资源需要保护,也要尽可能降低锁的粒度,仅对必要的代码块甚至是需要保护的资源本身加锁。比如,在业务代码中,有一个 ArrayList 因为会被多个线程操作而需要保护,又有一段比较耗时的操作(代码中的 slow 方法)不涉及线程安全问题,应该如何加锁呢?
错误的做法是,给整段业务逻辑加锁,把 slow 方法和操作 ArrayList 的代码同时纳入 synchronized 代码块;更合适的做法是,把加锁的粒度降到最低,只在操作 ArrayList 的时候给这个 ArrayList 加锁。
private List<Integer> data = new ArrayList<>();
//不涉及共享资源的慢方法
private void slow() {
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
}
}
//错误的加锁方法
@GetMapping("wrong")
public int wrong() {
long begin = System.currentTimeMillis();
IntStream.rangeClosed(1, 1000).parallel().forEach(i -> {
//加锁粒度太粗了
synchronized (this) {
slow();
data.add(i);
}
});
log.info("took:{}", System.currentTimeMillis() - begin);
return data.size();
}
//正确的加锁方法
@GetMapping("right")
public int right() {
long begin = System.currentTimeMillis();
IntStream.rangeClosed(1, 1000).parallel().forEach(i -> {
slow();
//只对List加锁
synchronized (data) {
data.add(i);
}
});
log.info("took:{}", System.currentTimeMillis() - begin);
return data.size();
}
执行这段代码,同样是 1000 次业务操作,正确加锁的版本耗时 1.4 秒,而对整个业务逻辑加锁的话耗时 11 秒。
如果精细化考虑了锁应用范围后,性能还无法满足需求的话,我们就要考虑另一个维度的粒度问题了,即:区分读写场景以及资源的访问冲突,考虑使用悲观方式的锁还是乐观方式的锁。
一般业务代码中,很少需要进一步考虑这两种更细粒度的锁,所以我只和你分享几个大概的结论,你可以根据自己的需求来考虑是否有必要进一步优化:
- 对于读写比例差异明显的场景,考虑使用 ReentrantReadWriteLock 细化区分读写锁,来提高性能。
- 如果你的 JDK 版本高于 1.8、共享资源的冲突概率也没那么大的话,考虑使用 StampedLock 的乐观读的特性,进一步提高性能。
- JDK 里 ReentrantLock 和 ReentrantReadWriteLock 都提供了公平锁的版本,在没有明确需求的情况下不要轻易开启公平锁特性,在任务很轻的情况下开启公平锁可能会让性能下降上百倍。
6.3. 多把锁要小心死锁问题
下单操作需要锁定订单中多个商品的库存,拿到所有商品的锁之后进行下单扣减库存操作,全部操作完成之后释放所有的锁。代码上线后发现,下单失败概率很高,失败后需要用户重新下单,极大影响了用户体验,还影响到了销量。
经排查发现是死锁引起的问题,背后原因是扣减库存的顺序不同,导致并发的情况下多个线程可能相互持有部分商品的锁,又等待其他线程释放另一部分商品的锁,于是出现了死锁问题。
首先,定义一个商品类型,包含商品名、库存剩余和商品的库存锁三个属性,每一种商品默认库存 1000 个;然后,初始化 10 个这样的商品对象来模拟商品清单:
@Data
@RequiredArgsConstructor
static class Item {
final String name; //商品名
int remaining = 1000; //库存剩余
@ToString.Exclude //ToString不包含这个字段
ReentrantLock lock = new ReentrantLock();
}
随后,写一个方法模拟在购物车进行商品选购,每次从商品清单(items 字段)中随机选购三个商品(为了逻辑简单,我们不考虑每次选购多个同类商品的逻辑,购物车中不体现商品数量):
private List<Item> createCart() {
return IntStream.rangeClosed(1, 3)
.mapToObj(i -> "item" + ThreadLocalRandom.current().nextInt(items.size()))
.map(name -> items.get(name)).collect(Collectors.toList());
}
下单代码如下:先声明一个 List 来保存所有获得的锁,然后遍历购物车中的商品依次尝试获得商品的锁,最长等待 10 秒,获得全部锁之后再扣减库存;如果有无法获得锁的情况则解锁之前获得的所有锁,返回 false 下单失败。
private boolean createOrder(List<Item> order) {
//存放所有获得的锁
List<ReentrantLock> locks = new ArrayList<>();
for (Item item : order) {
try {
//获得锁10秒超时
if (item.lock.tryLock(10, TimeUnit.SECONDS)) {
locks.add(item.lock);
} else {
locks.forEach(ReentrantLock::unlock);
return false;
}
} catch (InterruptedException e) {
}
}
//锁全部拿到之后执行扣减库存业务逻辑
try {
order.forEach(item -> item.remaining--);
} finally {
locks.forEach(ReentrantLock::unlock);
}
return true;
}
我们写一段代码测试这个下单操作。模拟在多线程情况下进行 100 次创建购物车和下单操作,最后通过日志输出成功的下单次数、总剩余的商品个数、100 次下单耗时,以及下单完成后的商品库存明细:
@GetMapping("wrong")
public long wrong() {
long begin = System.currentTimeMillis();
//并发进行100次下单操作,统计成功次数
long success = IntStream.rangeClosed(1, 100).parallel()
.mapToObj(i -> {
List<Item> cart = createCart();
return createOrder(cart);
})
.filter(result -> result)
.count();
log.info("success:{} totalRemaining:{} took:{}ms items:{}",
success,
items.entrySet().stream().map(item -> item.getValue().remaining).reduce(0, Integer::sum),
System.currentTimeMillis() - begin, items);
return success;
}
运行程序,输出如下日志:
可以看到,100 次下单操作成功了 65 次,10 种商品总计 10000 件,库存总计为 9805,消耗了 195 件符合预期(65 次下单成功,每次下单包含三件商品),总耗时 50 秒。
使用 JDK 自带的 VisualVM 工具来跟踪一下,重新执行方法后不久就可以看到,线程 Tab 中提示了死锁问题,根据提示点击右侧线程 Dump 按钮进行线程抓取操作:
查看抓取出的线程栈,在页面中部可以看到如下日志:
显然,是出现了死锁,线程 4 在等待的一个锁被线程 3 持有,线程 3 在等待的另一把锁被线程 4 持有。
那为什么会有死锁问题呢?
我们仔细回忆一下购物车添加商品的逻辑,随机添加了三种商品,假设一个购物车中的商品是 item1 和 item2,另一个购物车中的商品是 item2 和 item1,一个线程先获取到了 item1 的锁,同时另一个线程获取到了 item2 的锁,然后两个线程接下来要分别获取 item2 和 item1 的锁,这个时候锁已经被对方获取了,只能相互等待一直到 10 秒超时。
其实,避免死锁的方案很简单,为购物车中的商品排一下序,让所有的线程一定是先获取 item1 的锁然后获取 item2 的锁,就不会有问题了。所以,我只需要修改一行代码,对 createCart 获得的购物车按照商品名进行排序即可:
@GetMapping("right")
public long right() {
....
long success = IntStream.rangeClosed(1, 100).parallel()
.mapToObj(i -> {
List<Item> cart = createCart().stream()
.sorted(Comparator.comparing(Item::getName))
.collect(Collectors.toList());
return createOrder(cart);
})
.filter(result -> result)
.count();
...
return success;
}
测试一下 right 方法,不管执行多少次都是 100 次成功下单,而且性能相当高,达到了 3000 以上的 TPS:
这个案例中,虽然产生了死锁问题,但因为尝试获取锁的操作并不是无限阻塞的,所以没有造成永久死锁,之后的改进就是避免循环等待,通过对购物车的商品进行排序来实现有顺序的加锁,避免循环等待。
博文参考
《Java 开发坑点解析:从根因分析到最佳实践》