当前位置: 首页 > article >正文

parallelStream并行流使用踩坑,集合安全

parallelStream并行流使用踩坑

parallelStream介绍

parallelStream实现的是多线程处理从而实现并行流,相较于stream的单行流处理数据的速度更快,看一下其源码会发现parallelStream是使用线程池ForkJoin来调度的。

而ForkJoinPool的默认线程数是CPU核数 - 1。如果要手动实现其线程数设置,可以构建自己的ForkJoinPool;

CountDownLatch countDownLatch = new CountDownLatch(20);
        int cpu = Runtime.getRuntime().availableProcessors();
        System.out.println(cpu);
        ForkJoinPool pool = new ForkJoinPool(2);
        List<Integer> list = IntStream.range(0, 20).boxed().collect(Collectors.toList());
        pool.submit(() -> {
            list.parallelStream().forEach(s -> {
                // 业务处理
                System.out.println("thread:" + Thread.currentThread().getName() + "value" + s);
                countDownLatch.countDown();
            });
        });
        countDownLatch.await();

问题

在开发中遇到了下面这一段代码

List<String> resultList = new ArrayList<>();
List<String> codeList = new ArrayList<>();
//向codeList中添加数据
.....
codeList.parallelStream().forEach(item->{
    //过滤条件后向resultList添加数据
    resultList.add(item);
});

使用并行流去遍历codeList后经过某些过滤再将属性值添加到resultList中。

后续在调试过程中发现,resultList中的数据量会随机少一两个数据,比如codeList中数据为1,2,3,4,5. 经过过滤后本应添加到resultList中的数据为1,2,3,4. 但是发现只加进来了1,2,3或者是1,2,4 会有数据确实的情况,本来以为是过滤条件的问题,排查后发现过滤条件没有问题,开始怀疑是并行流的问题。

简单介绍下arrayList

arrayList其实就是个动态数组类,大小可以动态调整,允许在列表任意位置进行元素的增删改查。同时可自动扩展内部数组的容量,以适应存储需求的增长。

动态扩容

  • 初始容量:arrayList的默认空参构造器时,初始容量是0,使用有参构造器时,初始容量就是传入的参数initialCapacity的值,看下源码:

public ArrayList(int initialCapacity) {
        if (initialCapacity > 0) {
            this.elementData = new Object[initialCapacity];
        } else if (initialCapacity == 0) {
            this.elementData = EMPTY_ELEMENTDATA;
        } else {
            throw new IllegalArgumentException("Illegal Capacity: "+
                                               initialCapacity);
        }
    }
​
    /**
     * Constructs an empty list with an initial capacity of ten.
     */
    public ArrayList() {
        this.elementData = DEFAULTCAPACITY_EMPTY_ELEMENTDATA;
    }
  • 动态扩容:添加第一个元素时,底层会创建一个新的长度为10的数组,当存储满的时候,会扩容1.5倍。这个动作是在集合添加数据的时候进行的判断

    看下源码:

    private void add(E e, Object[] elementData, int s) {
            if (s == elementData.length)
                elementData = grow();
            elementData[s] = e;
            size = s + 1;
        }
    private Object[] grow(int minCapacity) {
            int oldCapacity = elementData.length;
            if (oldCapacity > 0 || elementData != DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {
                int newCapacity = ArraysSupport.newLength(oldCapacity,
                        minCapacity - oldCapacity, /* minimum growth */
                        oldCapacity >> 1           /* preferred growth */);
                return elementData = Arrays.copyOf(elementData, newCapacity);
            } else {
                return elementData = new Object[Math.max(DEFAULT_CAPACITY, minCapacity)];
            }
        }

回到原来的问题

最开始的时候,我觉得可能是数组大小不够了,在达到集合容量的前一个时同时有两个线程在往这个集合中添加数据,导致有一个数据没有插入进来,但是考虑了下,觉得如果是这样多少应该有个异常抛出来,但是在运行过程中并无异常。这时突然想到,arryList底层其实还是个数组。其实到这里就已经不言而喻了,多个线程在往list中添加数据时,都已经通过了验证容量的这一步,然后往一个数组的相同位置上放两个元素,最终结果肯定就是后面一个会把前面的一个给覆盖掉。

最终解决

最终的结果不管是对这个集合上锁 还是换成线程安全的list:Vector,Collections.synchronizedList(List<T> list),本质其实都还是类似单线程,同时只有一个线程进行操作。

(如果你只是遇到了我上面说的bug想解决,看到这里就可以了,建议直接换成stream串行。)

但是!

还有第三种线程安全的容器

CopyOnWriteArrayList

这个容器其实就是在写操作的时候复制数组,在使用时,读读操作和读写操作都不互斥。

看下源码:

public boolean add(E e) {
        synchronized (lock) {
            Object[] es = getArray();
            int len = es.length;
            es = Arrays.copyOf(es, len + 1);
            es[len] = e;
            setArray(es);
            return true;
        }
    }

其通过lock来实现线程同步,至于所谓的读写互斥,主要就是这里了

es = Arrays.copyOf(es, len + 1);
es[len] = e;

在添加数据时,他会先复制原来的数组然后在新的数组上面进行添加,最后再将新数组覆盖到旧的上面。如果在操作过程中切换了线程到读,此时的旧数组并未被覆盖,读取到的还是原来的数组。

虽然不会发生安全问题,但是缺陷也同样很明显,因为其每一次操作都会复制一次数组,数据量越大 操作越慢。

但是读取其实还是很快的,如果写少读多可以考虑采用这种容器。


http://www.kler.cn/a/419846.html

相关文章:

  • Linux条件变量线程池详解
  • 用于LiDAR测量的1.58um单芯片MOPA(一)
  • 基础入门-Web应用OSS存储负载均衡CDN加速反向代理WAF防护部署影响
  • 代码随想录-算法训练营day31(贪心算法01:分发饼干,摆动序列,最大子数组和)
  • 做异端中的异端 -- Emacs裸奔之路4: 你不需要IDE
  • avcodec_alloc_context3,avcodec_open2,avcodec_free_context,avcodec_close
  • 4399 Android面试题及参考答案
  • [382]基于springboot的辽B代驾管理系统
  • 论文阅读:Deep divergence-based approach to clustering
  • 【HarmonyOS】自定义相机拍照和录像 (二)之录像
  • iptables 用于设置、维护和检查 IP 数据包的过滤规则。其基本用法是通过命令行界面配置流量的过滤策略,分为以下几类规则链:INPUT(入站流量)、OU
  • WINDOWS 单链表SLIST_ENTRY使用
  • Leecode刷题C语言之N皇后②
  • gitlab自动打包python项目
  • 【vue】响应式(object.defineProperty)、可配置的参数、vue渲染机制
  • 华为HarmonyOS 让应用快速拥有账号能力 - 获取用户手机号
  • yolo11经验教训----之一
  • QT的槽函数的四种写法
  • ME6210:常用在个人通信设备电源里的低静态、低压差线性稳压器
  • @antv/x6 再vue中 ,自定义图形,画流程图、数据建模、er图等图形
  • linux网络抓包工具
  • 网际协议(IP)与其三大配套协议(ARP、ICMP、IGMP)
  • 【在Linux世界中追寻伟大的One Piece】多线程(三)
  • 为什么编程语言会设计不可变的对象?字符串不可变?NSString *s = @“hello“变量s是不可变的吗?Rust内部可变性的意义?
  • 源码分析之Openlayers中的Collection类
  • Web开发基础学习——HTML中\<div>元素的理解