写这篇文章是因为前段时间有个同事用parallelStream处理集合的时候发现数据丢了。因为是线上问题所以和他一起看了一下。

出问题的代码大致如下:

1
2
3
4
5
List<Foobar> result = new ArrayList<>();
fooList.parallelStream().forEach(bar -> {
Foobar foobar = new Foobar(bar);
result.add(foobar);
});

最后results少了数据,得到了一个非预期的结果。

当然能写出这样的代码很不应该,很显然这是一个因为线程安全引发的问题,因为我们知道ArrayList不是一个线程安全的容器,这与parallelStream是不是线程安全无关,它本身就是一个用多线程去帮助我们处理集合的工具,底层依赖Fork/Join框架。

上面的写法就和自己用多个线程往一个ArrayList插数据一样愚蠢,没有本质上的区别。

那么,当我们在使用parallelStream的时候,我们当然可以使用一个线程安全的容器来收集结果,但Java官方文档中更建议我们去使用collect方法,即:

1
List<Foobar> result = fooList.parallelStream().map(bar -> new Foobar(bar)).collect(Collectors.toList());

我们来看collect这个方法,

1
<R, A> R collect(Collector<? super T, A, R> collector);

需要传入Collector接口的实现。Collectors类中有大量的静态方法返回这样的实现类的对象,这里我们看toList方法,

1
2
3
4
5
6
public static <T>
Collector<T, ?, List<T>> toList() {
return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
(left, right) -> { left.addAll(right); return left; },
CH_ID);
}

Fork/Join的思想是分治,先拆分任务,再合并结果,每个任务都用单独的线程去处理。所以虽然它同样使用ArrayList,但是我们看到他会为每个线程都创建一个ArrayList对象,最后用addAll方法把它们合并起来,每个线程操作的是自己的集合对象,自然不会有线程安全问题。