
Java Concurrency Study Notes: Atomic Operation Classes

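This article summarizes what I learned from the course "JUC Concurrent Programming" taught by Zhou Yang at Atguigu (尚硅谷); the link is at the end.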

Atomic classes for primitive types

  • AtomicInteger
  • AtomicLong
  • AtomicBoolean

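The difference between AtomicInteger's getAndIncrement and incrementAndGet methods:

Both methods atomically add 1 to the current value, but they return different things: getAndIncrement returns the old value from before the increment, while incrementAndGet returns the new value after it.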

package juc.cas;

import java.util.concurrent.atomic.AtomicInteger;

public class ShareNumCas {
    private AtomicInteger num = new AtomicInteger(0);

    public int add() {
        return num.getAndIncrement();
    }

    public int add2() {
        return num.incrementAndGet();
    }

    public int getNum() {
        return num.get();
    }
}

public class NumPlusDemo {
    public static void main(String[] args) {
        ShareNumCas share = new ShareNumCas();

        System.out.println("before add num is " + share.getNum());
        System.out.println("add return " + share.add());
        System.out.println("after add num is " + share.getNum());
    }
}

public class NumPlusDemo2 {
    public static void main(String[] args) {
        ShareNumCas share = new ShareNumCas();

        System.out.println("before add num is " + share.getNum());
        System.out.println("add2 return " + share.add2());
        System.out.println("after add num is " + share.getNum());
    }
}

Output:

before add num is 0
add return 0
after add num is 1

before add num is 0
add2 return 1
after add num is 1

Atomic array classes

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

Sample code:

package juc.atomic;

import java.util.concurrent.atomic.AtomicIntegerArray;

public class AtomicIntegerArrayDemo {
    public static void main(String[] args) {
        AtomicIntegerArray array = new AtomicIntegerArray(5);
        // AtomicIntegerArray array = new AtomicIntegerArray(new int[]{1, 2, 3, 4, 5});
        for (int i = 0; i < array.length(); i++) {
            System.out.println(array.get(i));
        }

        System.out.println();

        int tmp = array.getAndSet(0, 100);
        System.out.println(tmp + " " + array.get(0));

        tmp = array.getAndIncrement(0);
        System.out.println(tmp + " " + array.get(0));

    }
}

Output:

0
0
0
0
0

0 100
100 101

Atomic reference classes

  • AtomicReference
  • AtomicStampedReference
  • AtomicMarkableReference

Implementing a spin lock with AtomicReference

package juc.cas;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class SpinLockDemo {
    AtomicReference<Thread> atomicReference = new AtomicReference<>();

    public void lock() {
        Thread thread = Thread.currentThread();
        System.out.println(thread.getName() + " come in");
        while (!atomicReference.compareAndSet(null, thread)) {

        }
    }

    public void unlock() {
        Thread thread = Thread.currentThread();
        atomicReference.compareAndSet(thread, null);
        System.out.println(thread.getName() + " task over, unlock");
    }

    public static void main(String[] args) {
        SpinLockDemo spinLockDemo = new SpinLockDemo();
        new Thread(() -> {
            spinLockDemo.lock();

            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            spinLockDemo.unlock();
        }, "A").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        new Thread(() -> {
            spinLockDemo.lock();
            spinLockDemo.unlock();
        }, "B").start();
    }
}

Output:

A come in
B come in
A task over, unlock
B task over, unlock

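AtomicStampedReference vs. AtomicMarkableReference

AtomicStampedReference pairs the reference with an int stamp (a version number), so it can record how many times the object has been modified.

AtomicMarkableReference pairs the reference with a single boolean mark, so it can only record whether the object has been modified.

AtomicMarkableReference sample code: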

package juc.atomic;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicMarkableReference;

public class AtomicMarkableReferenceDemo {
    static AtomicMarkableReference<Integer> atomicMarkableReference = new AtomicMarkableReference<>(100, false);

    public static void main(String[] args) {
        new Thread(() -> {
            boolean marked = atomicMarkableReference.isMarked();
            System.out.println(Thread.currentThread().getName() + " init marked = " + marked);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            boolean result = atomicMarkableReference.compareAndSet(100, 101, marked, !marked);
            System.out.println(Thread.currentThread().getName() + " result = " + result +
                    ", marked = " + atomicMarkableReference.isMarked() + ", value = " + atomicMarkableReference.getReference());
        }, "t1").start();

        new Thread(() -> {
            boolean marked = atomicMarkableReference.isMarked();
            System.out.println(Thread.currentThread().getName() + " init marked = " + marked);
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            boolean result = atomicMarkableReference.compareAndSet(100, 102, marked, !marked);
            System.out.println(Thread.currentThread().getName() + " result = " + result
                    + ", marked = " + atomicMarkableReference.isMarked() + ", value = " + atomicMarkableReference.getReference());
        }, "t2").start();
    }
}

Output:

t2 init marked = false
t1 init marked = false
t1 result = true, marked = true, value = 101
t2 result = false, marked = true, value = 101
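
For contrast with the AtomicMarkableReference example, here is a minimal single-threaded sketch of AtomicStampedReference. It is not from the course; the class name AtomicStampedReferenceSketch and its helper methods are made up for illustration, and the sketch assumes the caller increments the stamp by 1 on every successful update. It shows that the stamp counts the modifications, and that a CAS using a stamp read before an A -> B -> A sequence fails even though the value is back to 100.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

class AtomicStampedReferenceSketch {

    // Changes 100 -> 101 -> 100, bumping the stamp by 1 on each successful CAS.
    private static void abaSequence(AtomicStampedReference<Integer> ref) {
        int stamp = ref.getStamp();
        ref.compareAndSet(100, 101, stamp, stamp + 1);
        ref.compareAndSet(101, 100, stamp + 1, stamp + 2);
    }

    // Starts at stamp 1 and returns the stamp after the A -> B -> A sequence.
    static int stampAfterAba() {
        AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(100, 1);
        abaSequence(ref);
        return ref.getStamp();
    }

    // Tries a CAS with the stamp that was read before the A -> B -> A sequence.
    static boolean casWithStaleStamp() {
        AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(100, 1);
        int staleStamp = ref.getStamp();
        abaSequence(ref);
        // The value is 100 again, but the stamp has moved on, so this fails.
        return ref.compareAndSet(100, 102, staleStamp, staleStamp + 1);
    }

    public static void main(String[] args) {
        System.out.println("stamp after A -> B -> A: " + stampAfterAba());          // 3
        System.out.println("CAS with stale stamp succeeds: " + casWithStaleStamp()); // false
    }
}
```

The values 100, 101 and 102 are used, as in the example above, because they fall inside the Integer cache; compareAndSet compares references with ==, so larger boxed values would need to be held in variables.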

Atomic field updater classes

  • AtomicIntegerFieldUpdater
  • AtomicLongFieldUpdater
  • AtomicReferenceFieldUpdater<T,V>

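In the example below, money could simply be declared as an AtomicInteger. Since AtomicInteger already exists, when is AtomicIntegerFieldUpdater useful? It fits when a class has many instances and you want to avoid allocating a separate AtomicInteger object for each one, or when the field has to remain a plain volatile int: the updater performs atomic operations on that field of whichever instance is passed to it.

AtomicIntegerFieldUpdater sample code: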

package juc.atomic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class BankAccount {
    String bankName = "CCB";

    public volatile int money = 0;

    // static final: one updater is shared by all BankAccount instances
    static final AtomicIntegerFieldUpdater<BankAccount> updater =
            AtomicIntegerFieldUpdater.newUpdater(BankAccount.class, "money");

    public void add(BankAccount bankAccount) {
        updater.getAndIncrement(bankAccount);
    }
}
public class AtomicIntegerFieldUpdaterDemo {
    public static void main(String[] args) throws InterruptedException {
        BankAccount bankAccount = new BankAccount();
        CountDownLatch countDownLatch = new CountDownLatch(10);

        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    bankAccount.add(bankAccount);
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        System.out.println("after add money is " + bankAccount.money);
    }
}

Output:

after add money is 10000

AtomicReferenceFieldUpdater<T,V> sample code:

package juc.atomic;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

class MyVar {
    public volatile Boolean isInit = Boolean.FALSE;

    // static final: one updater is shared by all MyVar instances
    static final AtomicReferenceFieldUpdater<MyVar, Boolean> updater =
            AtomicReferenceFieldUpdater.newUpdater(MyVar.class, Boolean.class, "isInit");

    public void init(MyVar myVar) {
        if(updater.compareAndSet(myVar, Boolean.FALSE, Boolean.TRUE)) {
            System.out.println(Thread.currentThread().getName() + " start init");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName() + " init success");
        } else {
            System.out.println(Thread.currentThread().getName() + " other thread is initing...");
        }
    }
}

public class AtomicReferenceFieldUpdaterDemo {
    public static void main(String[] args) {
        MyVar myVar = new MyVar();
        for (int i = 0; i < 5; i++) {
            new Thread(() -> myVar.init(myVar), "t" + i).start();
        }
    }
}

Output:

t3 other thread is initing...
t1 other thread is initing...
t4 other thread is initing...
t2 other thread is initing...
t0 start init
t0 init success

Enhanced atomic classes

  • LongAdder
  • LongAccumulator
  • DoubleAdder
  • DoubleAccumulator

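LongAdder vs. LongAccumulator

  • LongAdder always starts at 0
  • LongAdder supports only addition and subtraction
  • LongAccumulator has neither restriction, so it is more flexible

LongAdder#sum() takes no lock, so it does not block concurrent updates to the counter.

A simple LongAdder example: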

package juc.atomic;

import java.util.concurrent.atomic.LongAdder;

public class LongAdderDemo {
    public static void main(String[] args) {
        LongAdder longAdder = new LongAdder();
        longAdder.increment();
        longAdder.increment();
        longAdder.increment();
        longAdder.increment();
        longAdder.increment();
        System.out.println(longAdder.sum());
    }
}

Output:

5

A simple LongAccumulator example:

package juc.atomic;

import java.util.concurrent.atomic.LongAccumulator;

public class LongAccumulatorDemo {
    public static void main(String[] args) {
        LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y, 0);
        // Can be shortened with a method reference
        // LongAccumulator longAccumulator = new LongAccumulator(Long::sum, 0);
        longAccumulator.accumulate(1);
        System.out.println(longAccumulator.get());
        longAccumulator.accumulate(2);
        System.out.println(longAccumulator.get());
    }
}

Output:

1
3
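
The example above uses addition with an initial value of 0, which LongAdder could do as well. As a hedged sketch of the extra flexibility, the following uses a non-zero initial value and functions other than addition. The class name LongAccumulatorSketch and the helper methods maxOf and productOf are made up for illustration.

```java
import java.util.concurrent.atomic.LongAccumulator;

class LongAccumulatorSketch {

    // Running maximum: the identity is Long.MIN_VALUE rather than 0.
    static long maxOf(long... values) {
        LongAccumulator max = new LongAccumulator(Long::max, Long.MIN_VALUE);
        for (long v : values) {
            max.accumulate(v);
        }
        return max.get();
    }

    // Running product: the identity is 1, and the function is multiplication.
    static long productOf(long... values) {
        LongAccumulator product = new LongAccumulator((x, y) -> x * y, 1);
        for (long v : values) {
            product.accumulate(v);
        }
        return product.get();
    }

    public static void main(String[] args) {
        System.out.println(maxOf(3, 9, 4));     // 9
        System.out.println(productOf(2, 3, 4)); // 24
    }
}
```

Both functions give the same result regardless of the order in which values are applied, which matters because LongAccumulator does not guarantee an accumulation order across threads.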

Performance comparison of the counting approaches

package juc.atomic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

class ClickNum {
    int number = 0;

    public synchronized void add() {
        number++;
    }

    AtomicLong atomicLong = new AtomicLong(0);

    public void addAtomic() {
        atomicLong.getAndIncrement();
    }

    LongAdder longAdder = new LongAdder();

    public void addLongAdder() {
        longAdder.increment();
    }

    LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y, 0);

    public void addLongAccumulator() {
        longAccumulator.accumulate(1);
    }
}

public class CountPerfTester {
    private static final int THREADS_COUNT = 50;

    private static final int TEN_THOUSANDS = 10000;


    public static void main(String[] args) throws InterruptedException {
        ClickNum clickNum = new ClickNum();

        long start = 0L;
        long end = 0L;

        CountDownLatch countDownLatch1 = new CountDownLatch(THREADS_COUNT);
        CountDownLatch countDownLatch2 = new CountDownLatch(THREADS_COUNT);
        CountDownLatch countDownLatch3 = new CountDownLatch(THREADS_COUNT);
        CountDownLatch countDownLatch4 = new CountDownLatch(THREADS_COUNT);

        start = System.currentTimeMillis();
        for (int i = 0; i < THREADS_COUNT; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * TEN_THOUSANDS; j++) {
                        clickNum.add();
                    }
                } finally {
                    countDownLatch1.countDown();
                }
            }).start();
        }
        countDownLatch1.await();
        end = System.currentTimeMillis();
        System.out.println("synchronized add cost time : " + (end - start) + " ms, " + "result is " + clickNum.number);

        start = System.currentTimeMillis();
        for (int i = 0; i < THREADS_COUNT; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * TEN_THOUSANDS; j++) {
                        clickNum.addAtomic();
                    }
                } finally {
                    countDownLatch2.countDown();
                }
            }).start();
        }
        countDownLatch2.await();
        end = System.currentTimeMillis();
        System.out.println("addAtomic cost time : " + (end - start) + " ms, " + "result is " + clickNum.atomicLong.get());

        start = System.currentTimeMillis();
        for (int i = 0; i < THREADS_COUNT; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * TEN_THOUSANDS; j++) {
                        clickNum.addLongAdder();
                    }
                } finally {
                    countDownLatch3.countDown();
                }
            }).start();
        }
        countDownLatch3.await();
        end = System.currentTimeMillis();
        System.out.println("addLongAdder cost time : " + (end - start) + " ms, " + "result is " + clickNum.longAdder.sum());

        start = System.currentTimeMillis();
        for (int i = 0; i < THREADS_COUNT; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * TEN_THOUSANDS; j++) {
                        clickNum.addLongAccumulator();
                    }
                } finally {
                    countDownLatch4.countDown();
                }
            }).start();
        }
        countDownLatch4.await();
        end = System.currentTimeMillis();
        System.out.println("addLongAccumulator cost time : " + (end - start) + " ms, " + "result is " + clickNum.longAccumulator.get());

    }
}

Output:

synchronized add cost time : 1352 ms, result is 50000000
addAtomic cost time : 844 ms, result is 50000000
addLongAdder cost time : 97 ms, result is 50000000
addLongAccumulator cost time : 74 ms, result is 50000000

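As the numbers show:

In this run, LongAdder and LongAccumulator took roughly one tenth of the time of synchronized and AtomicLong, or less.

How LongAdder achieves high performance

The basic idea behind LongAdder is to spread out the hot spot: the value is distributed across a base field and an array of Cells. Different threads hit different array elements, and each thread performs CAS only on the value of its own element, so the probability of conflict drops sharply. sum() adds base and the values of all Cells and returns the total.

Which Cell a thread lands on is determined by a per-thread hash: getProbe() returns the thread's probe value (a pseudo-random hash kept per thread, not the thread id itself), and that value is masked with the array length minus 1 to get the array index.

The length of the Cell array is always a power of two. The array stops growing once its length reaches or exceeds the number of CPU cores.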
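
The idea described above can be illustrated with a heavily simplified striped counter. This is a sketch under stated assumptions, not the JDK's implementation: the class name StripedCounterSketch is made up, the cell array has a fixed power-of-two length and never expands, a ThreadLocal stands in for the per-thread probe, and AtomicLongArray replaces the padded Cell objects.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

class StripedCounterSketch {
    // Stand-in for the per-thread probe: a random hash chosen once per thread.
    private static final ThreadLocal<Integer> PROBE =
            ThreadLocal.withInitial(() -> ThreadLocalRandom.current().nextInt());

    private final AtomicLong base = new AtomicLong();
    private final AtomicLongArray cells;
    private final int mask;

    StripedCounterSketch(int cellCount) {
        if (cellCount <= 0 || Integer.bitCount(cellCount) != 1) {
            throw new IllegalArgumentException("cellCount must be a power of two");
        }
        this.cells = new AtomicLongArray(cellCount);
        this.mask = cellCount - 1;
    }

    void add(long x) {
        long b = base.get();
        // Uncontended path: a single CAS on base.
        if (!base.compareAndSet(b, b + x)) {
            // Contended path: update the cell selected by this thread's probe.
            cells.addAndGet(PROBE.get() & mask, x);
        }
    }

    // Like LongAdder#sum(): base plus every cell, without locking.
    long sum() {
        long sum = base.get();
        for (int i = 0; i < cells.length(); i++) {
            sum += cells.get(i);
        }
        return sum;
    }

    // Four threads add 1 ten thousand times each; returns the final sum.
    static long demo() {
        StripedCounterSketch counter = new StripedCounterSketch(8);
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10_000; j++) {
                    counter.add(1);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return counter.sum();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 40000
    }
}
```

Because every increment lands either on base or on exactly one cell, no update is lost; the striping only changes where contention occurs.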

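LongAdder source code analysis

add(1L)

  • Initially, when there is no contention, only base is updated
  • If updating base fails, the Cell array is created for the first time
  • When several threads contend heavily for the same Cell, the Cell array may be expanded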
    public void add(long x) {
        Cell[] cs; long b, v; int m; Cell c;
        if ((cs = cells) != null || !casBase(b = base, b + x)) {
            int index = getProbe();
            boolean uncontended = true;
            if (cs == null || (m = cs.length - 1) < 0 ||
                (c = cs[index & m]) == null ||
                !(uncontended = c.cas(v = c.value, v + x)))
                longAccumulate(x, null, uncontended, index);
        }
    }

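The conditions contain both assignments and method calls, which keeps the code compact.

longAccumulate

There are 3 cases:

  • The Cell array has already been initialized

     In this case the method mainly creates and initializes a missing Cell element, retries the CAS on a Cell element, or expands the Cell array
    
  • The Cell array has not been initialized

     In this case the Cell array is created for the first time, with 2 elements
    
  • The Cell array is being initialized by another thread, so the method retries the CAS on base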

sum

    /**
     * Returns the current sum.  The returned value is <em>NOT</em> an
     * atomic snapshot; invocation in the absence of concurrent
     * updates returns an accurate result, but concurrent updates that
     * occur while the sum is being calculated might not be
     * incorporated.
     *
     * @return the sum
     */
    public long sum() {
        Cell[] cs = cells;
        long sum = base;
        if (cs != null) {
            for (Cell c : cs)
                if (c != null)
                    sum += c.value;
        }
        return sum;
    }

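sum adds up base and the value of every Cell in the array and returns the total. While sum is running, updates to base and to the Cells are not blocked, so the returned value may miss updates made during the calculation. LongAdder is not strongly consistent; it is eventually consistent.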
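
A small sketch of this behavior, with made-up names (LongAdderSumSketch, run): sum() is read once while the worker threads may still be incrementing and once after they have all finished. Only the second read is guaranteed to equal the total; the first is whatever had been recorded at that moment.

```java
import java.util.concurrent.atomic.LongAdder;

class LongAdderSumSketch {

    // Returns { sum read while workers may still be running, sum read after all joined }.
    static long[] run(int threads, int incrementsPerThread) {
        LongAdder adder = new LongAdder();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    adder.increment();
                }
            });
            workers[i].start();
        }

        // Not an atomic snapshot: concurrent increments may not be included.
        long during = adder.sum();

        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        // No concurrent updates remain, so this read is exact.
        long after = adder.sum();
        return new long[] {during, after};
    }

    public static void main(String[] args) {
        long[] result = run(4, 1_000_000);
        System.out.println("sum during updates: " + result[0]);
        System.out.println("sum after join: " + result[1]);
    }
}
```

The first number varies from run to run, which is why the sketch prints it without stating an expected value.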

References

  • Atguigu (尚硅谷) JUC Concurrent Programming (https://www.bilibili.com/video/BV1ar4y1x727?spm_id_from=333.788.videopod.episodes&vd_source=9266b9af652d5902d068c94b9d60116f&p=80)
