J.U.C Review - Stream并行计算原理源码分析
文章目录
- Java 8 Stream简介
- Stream单线程串行计算
- Stream多线程并行计算
- 源码分析Stream并行计算原理
- Stream并行计算的性能提升
Java 8 Stream简介
自Java 8推出以来,开发者可以使用Stream
接口和lambda表达式实现流式计算。这种编程风格不仅简化了对集合操作的代码,还提高了代码的可读性和性能。
Stream
接口提供了多种集合操作方法,包括empty
(判空)、filter
(过滤)、max
(求最大值)、findFirst
和findAny
(查找操作)等,使得对集合的操作更加灵活和直观。
Stream单线程串行计算
在默认情况下,Stream
接口是以串行的方式运行的,这意味着所有的操作都在一个线程内执行。我们可以通过以下示例代码展示这一点:
public class StreamDemo {
public static void main(String[] args) {
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
.reduce((a, b) -> {
System.out.println(String.format("%s: %d + %d = %d",
Thread.currentThread().getName(), a, b, a + b));
return a + b;
})
.ifPresent(System.out::println);
}
}
在这个例子中,我们通过Stream.of()
方法创建了一个包含数字1到9的流。随后,调用reduce
方法对这些数字进行累加操作。reduce
方法的作用是从前两个元素开始,执行指定操作(在此示例中为加法),然后将结果与下一个元素进行相同的操作,直到处理完所有元素。
程序的输出如下:
main: 1 + 2 = 3
main: 3 + 3 = 6
main: 6 + 4 = 10
main: 10 + 5 = 15
main: 15 + 6 = 21
main: 21 + 7 = 28
main: 28 + 8 = 36
main: 36 + 9 = 45
45
从输出可以看出,所有计算均由main
线程执行,并且操作是严格按照元素顺序串行完成的。
Stream多线程并行计算
然而,单线程串行执行并不是唯一的选择。在现代多核处理器的时代,我们可以通过并行计算来更高效地利用计算资源。例如,当计算1+2=3的同时,我们可以在另一个线程中计算3+4=7,最后将这些部分结果进行合并。这种思想与Fork/Join框架的设计理念非常类似。
通过以下代码,我们可以让Stream在多线程中并行执行:
public class StreamParallelDemo {
public static void main(String[] args) {
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
.parallel()
.reduce((a, b) -> {
System.out.println(String.format("%s: %d + %d = %d",
Thread.currentThread().getName(), a, b, a + b));
return a + b;
})
.ifPresent(System.out::println);
}
}
运行这段代码,输出如下:
ForkJoinPool.commonPool-worker-1: 3 + 4 = 7
ForkJoinPool.commonPool-worker-4: 8 + 9 = 17
ForkJoinPool.commonPool-worker-2: 5 + 6 = 11
ForkJoinPool.commonPool-worker-3: 1 + 2 = 3
ForkJoinPool.commonPool-worker-4: 7 + 17 = 24
ForkJoinPool.commonPool-worker-4: 11 + 24 = 35
ForkJoinPool.commonPool-worker-3: 3 + 7 = 10
ForkJoinPool.commonPool-worker-3: 10 + 35 = 45
45
从输出结果可以看出,这些计算是并行完成的,使用了ForkJoinPool
中的commonPool
线程池。尽管各个部分的计算是并行执行的,最终的结果仍然是正确的,因为Fork/Join框架负责协调这些并行任务。
源码分析Stream并行计算原理
通过以上的实践,我们知道Stream的并行计算底层是基于Fork/Join框架的。但具体是如何实现的?我们可以通过源码分析来探究。
首先,Stream.of()
方法只是生成一个简单的流。接下来,我们查看parallel()
方法的实现。由于这里的数据类型是int
,因此调用的是BaseStream
接口的parallel()
方法。BaseStream
接口的唯一实现类是AbstractPipeline
类。以下是AbstractPipeline
类的parallel()
方法:
public final S parallel() {
sourceStage.parallel = true;
return (S) this;
}
这个方法的作用非常简单,仅仅是将sourceStage.parallel
标志位设置为true
,表示该流将以并行方式执行。
接下来,查看reduce
方法的实现。Stream.reduce()
方法的具体实现是通过ReferencePipeline
这个抽象类,该类继承了AbstractPipeline
类:
@Override
public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
return evaluate(ReduceOps.makeRef(accumulator));
}
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
@Override
public final boolean isParallel() {
return sourceStage.parallel;
}
从源码可以看出,reduce
方法调用了evaluate
方法,而evaluate
方法根据parallel
标志位来决定是并行执行还是串行执行。如果parallel
为true
,则调用evaluateParallel
方法,否则调用evaluateSequential
方法。
我们再来看evaluateParallel
方法在ReduceOps.ReduceOp
类中的具体实现:
@Override
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return new ReduceTask<>(this, helper, spliterator).invoke().get();
}
evaluateParallel
方法创建了一个ReduceTask
实例,并调用其invoke()
方法来执行计算。ReduceTask
类继承自AbstractTask
,AbstractTask
又继承自CountedCompleter
,最终继承自ForkJoinTask
。这就解释了为什么Stream的并行计算底层使用了Fork/Join框架。
Stream并行计算的性能提升
最后,我们通过一个简单的性能测试来验证Stream并行计算的优势。下面的代码演示了如何计算一千万个随机数的和,并比较串行计算和并行计算的时间开销:
public class StreamParallelDemo {
public static void main(String[] args) {
System.out.println(String.format("本计算机的核数:%d", Runtime.getRuntime().availableProcessors()));
Random random = new Random();
List<Integer> list = new ArrayList<>(1000_0000);
for (int i = 0; i < 1000_0000; i++) {
list.add(random.nextInt(100));
}
long prevTime = getCurrentTime();
list.stream().reduce((a, b) -> a + b).ifPresent(System.out::println);
System.out.println(String.format("单线程计算耗时:%d", getCurrentTime() - prevTime));
prevTime = getCurrentTime();
list.stream().parallel().reduce((a, b) -> a + b).ifPresent(System.out::println);
System.out.println(String.format("多线程计算耗时:%d", getCurrentTime() - prevTime));
}
private static long getCurrentTime() {
return System.currentTimeMillis();
}
}
在一台8核计算机上的输出结果如下:
本计算机的核数:8
495156156
单线程计算耗时:223
495156156
多线程计算耗时:95
结果表明,在多核环境下,Stream的并行计算相比串行计算确实能够显著提升性能。然而,性能提升的幅度并非线性增长,因为线程管理和上下文切换本身也会带来一定的开销。如果在单核环境中,串行计算反而可能会比并行计算更快。
总结而言,Java 8的Stream并行计算通过简化代码的方式,利用了底层的多核资源,大幅提升了复杂集合操作的性能。然而在实际应用中,开发者需要根据具体的硬件环境和任务特性来决定是否使用并行计算。