当前位置: 首页 > article >正文

共享模型之无锁(乐观锁,CAS,原子类,LongAdder)

目录

  • 共享模型之无锁
    • 一:保护共享资源
      • 1:加锁实现
      • 2:不加锁实现
    • 二:cas
      • cas与volatile
      • 为什么无锁的效率高
      • cas的特点
    • 三:原子类
      • 1:原子整数(AtomicInteger)
        • updateAndGet
      • 2:原子引用
        • ABA问题
        • AtomicStampedReference(版本号解决ABA)
        • AtomicMarkableReference
      • 3:原子数组(AtomicXXXArray)
      • 4:原子更新器(AtomicXXXXFieldUpdater)
      • 5:原子累加器
    • 四:LongAdder
      • 1:cas锁
      • 2:伪共享

共享模型之无锁

一:保护共享资源

1:加锁实现

  • 问题:
  • 定义一个接口:
interface Account {

    // 获取余额
    Integer getBalance();

    // 取款
    void withdraw(Integer amount);

    /**
     * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
     * 如果初始余额为 10000 那么正确的结果应当是 0
     */
    static void demo(Account account) {
        List<Thread> ts = new ArrayList<>();

        long start = System.nanoTime();
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(() -> {
                account.withdraw(10);
            }));
        }
        ts.forEach(Thread::start);

        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();

        System.out.println(account.getBalance()
                + " cost: " + (end-start)/1000_000 + " ms");
    }
}

这个接口里有一个静态方法是使用1000个线程,每个线程都对账户的余额减去10元;

我们先来实现这个接口:

class AccountUnsafe implements Account{
    private int balance;

    public AccountUnsafe(int balance) {
        this.balance = balance;
    }

    @Override
    public Integer getBalance() {
        return this.balance;
    }

    @Override
    public void withdraw(Integer amount) {
        balance-=amount;
    }
}

然后进行测试:

public static void main(String[] args) {
    AccountUnsafe accountUnsafe = new AccountUnsafe(10000);
    Account.demo(accountUnsafe);
}

这里初始余额是10000,我们1000个线程执行完静态方法之后应该余额为0;

我们执行:

40 cost: 70 ms

可以看到跟我们预期的不一样,说明出现了线程安全问题;

我们可以通过加锁来保证线程安全,再withdraw方法中,会有多个线程对共享资源进行修改,是临界区,所以要加锁:

修改之后:

@Override
public synchronized Integer getBalance() {
    return this.balance;
}

@Override
public synchronized void withdraw(Integer amount) {
    balance-=amount;
}

这里为什么读取余额的方法也需要加锁呢,因为为了防止读的时候有线程对余额进行修改,所以要加锁;

2:不加锁实现

使用cas:

实现一个safe类:

class AccountSafe implements Account{
    private AtomicInteger balance;

    public AccountSafe(int balance) {
        this.balance = new AtomicInteger(balance);
    }

    @Override
    public Integer getBalance() {
        return balance.get();
    }

    @Override
    public void withdraw(Integer amount) {
        while (true){
            int pre=balance.get();
            int next=pre-amount;
            if (balance.compareAndSet(pre,next)) {
                break;
            }

        }
    }
}

二:cas

这里使用的是AtomicInteger,赋值比较的逻辑如下:

 while (true){
            int pre=balance.get();
            int next=pre-amount;
            if (balance.compareAndSet(pre,next)) {
                break;
            }

        }
  • cas就是compareAndSet:比较并设置值。或者是compareAndSwap:比较并交换值;

image.png

其实实现的原理是这样的,使用compareAndSet方法时,会进行原子性操作,比较当前线程获取的最新值和共享变量的最新值是否相等如果相等就进行赋值返回true,如果不相等就返回false;

其实cas底层是执行的lock cmxchg指令(x86架构),在多核和单核的情况下都会保证(比较-交换)的原子性;

在多核情况下,执行到lock指令,cpu会将总线锁住,等待指令执行完再开启总线;

cas与volatile

  • 获取共享变量时,为了保证共享的可见性,必须使用volatile修饰
  • volatile可以用来修饰成员变量和静态成员变量,他可以避免直接从工作缓存中读取数据,而是从主存中读取最新数据,操作volatile修饰的变量也是直接修改主存。即一个线程对共享变量的修改对另一个线程可见;
  • cas必须要配合volatile使用,通过获取最新值来比较和交换;
  • 例如aomicInterger中的value变量就是使用volatile修饰的;

为什么无锁的效率高

  • 无锁的状态,即使重试失败,线程仍在高速运转,没有停歇;但是在加锁之后,如果获取锁失败,就会进行上下文切换,上下文切换是非常耗费性能的;

  • 在无锁的情况下,也会进行上下文切换。如cpu的时间片用完;而且无锁更适合线程数小于cpu核心数的情况;

cas的特点

  • 基于cas和volatile可以实现无锁并发,适用于线程数少,多核cpu的情况下:
  • cas是基于乐观锁的思想:最乐观的估计,共享变量不会被其他线程修改,即使修改了也没关系,再重试;
  • synchronized是基于悲观锁的思想:最悲观的估计,防止其他线程对共享变量进行修改,上上锁,其他线程无法修改,只有锁释放了,其他线程才有机会;
  • cas体现的是无锁并发,无阻塞并发
  • 因为没有synchronized修饰,所以线程不会陷入阻塞这是效率提升的原因之一;
  • 但是如果竞争激烈,重试比较频繁,也会影响效率;

三:原子类

1:原子整数(AtomicInteger)

一些api方法:

public static void main(String[] args) {
    AtomicInteger atomicInteger=new AtomicInteger(0);
    System.out.println(atomicInteger.incrementAndGet());//++i
    System.out.println(atomicInteger.getAndIncrement());//i++
    System.out.println(atomicInteger.addAndGet(5));
    System.out.println(atomicInteger.getAndAdd(5));
}

这里的加法操作都是原子的,内部也是一个循环,一直在尝试获取当前线程的值和共享变量的最新值进行比较,所以我们可以使用AtomicInteger来代替我们之前的compareAndSet方法:

@Override
public void withdraw(Integer amount) {
   /* while (true){
        int pre=balance.get();
        int next=pre-amount;
        if (balance.compareAndSet(pre,next)) {
            break;
        }

    }*/
    balance.addAndGet(-1*amount);
}

这样也能保证是线程安全的;

updateAndGet
public static void main(String[] args) {
    AtomicInteger atomicInteger=new AtomicInteger(5);
    System.out.println(atomicInteger.updateAndGet(x -> x * 5));
    /*System.out.println(atomicInteger.incrementAndGet());//++i
    System.out.println(atomicInteger.getAndIncrement());//i++
    System.out.println(atomicInteger.addAndGet(5));
    System.out.println(atomicInteger.getAndAdd(5));*/
}

在updateAndGet方法中传入的是一个lamda表达式,可以做任意的运算然后有返回值;

  • updateAndSet其实底层也是用了CompareAndSet,只不过和之前有区别的是,获取运算之后的值,这个运算可以通过函数式编程实现。

  • public static void main(String[] args) {
        AtomicInteger atomicInteger=new AtomicInteger(5);
        System.out.println(test1.updateAndGet(atomicInteger,x -> x * 5));
        /*System.out.println(atomicInteger.incrementAndGet());//++i
        System.out.println(atomicInteger.getAndIncrement());//i++
        System.out.println(atomicInteger.addAndGet(5));
        System.out.println(atomicInteger.getAndAdd(5));*/
    }
    public static int updateAndGet(AtomicInteger atomicInteger, IntUnaryOperator operator){
        while (true){
            int prev=atomicInteger.get();
            int next=operator.applyAsInt(atomicInteger.get());
            if (atomicInteger.compareAndSet(prev,next)){
                return next;
            }
        }
    }
    

    看一下updateAndGet:

    public final int updateAndGet(IntUnaryOperator updateFunction) {
        int prev = get(), next = 0;
        for (boolean haveNext = false;;) {
            if (!haveNext)
                next = updateFunction.applyAsInt(prev);
            if (weakCompareAndSetVolatile(prev, next))
                return next;
            haveNext = (prev == (prev = get()));
        }
    }
    

    其实也是这么实现的;

2:原子引用

  • 实际使用就是将引用类型当成AtomicReference类的泛型,这样就能调用compareAndSet来保证对共享变量的原子性操作;

我们将原先例子中的整数余额替换成BigDecimal,因为BigDecimal是引用类型,所以我们使用原子引用:

接口类:

interface DecimalAccount {
    // 获取余额
    BigDecimal getBalance();

    // 取款
    void withdraw(BigDecimal amount);

    /**
     * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
     * 如果初始余额为 10000 那么正确的结果应当是 0
     */
    static void demo(DecimalAccount account) {
        List<Thread> ts = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(() -> {
                account.withdraw(BigDecimal.TEN);
            }));
        }
        ts.forEach(Thread::start);

        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(account.getBalance());
    }

}

原子引用实现类:

class DecimalAccountSafe implements DecimalAccount{
    private AtomicReference<BigDecimal> bigDecimal;

    public DecimalAccountSafe(BigDecimal bigDecimal) {
        this.bigDecimal = new AtomicReference<>(bigDecimal);
    }

    @Override
    public BigDecimal getBalance() {
        return bigDecimal.get();
    }

    @Override
    public void withdraw(BigDecimal amount) {
        while (true){
            BigDecimal perv=bigDecimal.get();
            BigDecimal next=perv.subtract(amount);
            if (bigDecimal.compareAndSet(perv,next)) {
                break;
            }
        }
    }
}
  • 减余额时使用了compareAndSet;

最后测试:

public static void main(String[] args) {
    DecimalAccountSafe decimalAccountSafe = new DecimalAccountSafe(new BigDecimal(10000));
    DecimalAccount.demo(decimalAccountSafe);
}
ABA问题
static AtomicReference<String> ref = new AtomicReference<>("A");

public static void main(String[] args) throws InterruptedException {
    log.debug("main start...");
    // 获取值 A
    // 这个共享变量被它线程修改过?
    String prev = ref.get();

    other();

    Thread.sleep(1000);
    // 尝试改为 C
    log.debug("change A->C {}", ref.compareAndSet(prev, "C"));
}

private static void other() throws InterruptedException {

    new Thread(() -> {
        log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));
    }, "t1").start();

    Thread.sleep(500);

    new Thread(() -> {
        log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));
    }, "t2").start();

}
  • 主线程只能判断当前的值和共享变量现在的值是否相等,而不能感知到共享变量的值从a修改到b再从b到a;如果主线程希望,只要其他线程动用过共享变量,那么cas就算失败;这样的话还需要加上一个版本号;
AtomicStampedReference(版本号解决ABA)
  • 每一次对共享变量进行操作都会变更版本号,这样就能使共享变量变更时主线程有感应,如果自己获取的版本号和当前版本号不一致,cas就会失败;

对之前的代码进行修改:

static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A",0);

public static void main(String[] args) throws InterruptedException {
    log.debug("main start...");
    // 获取值 A
    // 这个共享变量被它线程修改过?
    int stamp = ref.getStamp();
    String prev = ref.getReference();
    log.debug("版本号:{}",stamp);
    other();

    Thread.sleep(1000);
    // 尝试改为 C
    log.debug("change A->C {}", ref.compareAndSet(prev, "C",stamp,stamp+1));
}

private static void other() throws InterruptedException {

    new Thread(() -> {
        int stamp = ref.getStamp();
        log.debug("版本号:{}",stamp);
        log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B",stamp,stamp+1));
    }, "t1").start();

    Thread.sleep(500);

    new Thread(() -> {
        int stamp = ref.getStamp();
        log.debug("版本号:{}",stamp);
        log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A",stamp,stamp+1));
    }, "t2").start();

}

这里就是在每次更新前获取版本号,然后再compareAndSet的时候会判断当前版本号和线程获取的版本号时候相同,相同就返回true,然后更新版本号;

这样就解决了ABA问题,只要其他线程对共享变量进行了操作那么cas就会失败;

AtomicMarkableReference
  • 如果只是想记录共享变量是否被更改过,而不去担心共享变量变更过几次可以使用AtomicMarkableReference
@Slf4j
public class test4 {
    public static void main(String[] args) throws InterruptedException {
        GarbageBag bag = new GarbageBag("装满了垃圾");
        // 参数2 mark 可以看作一个标记,表示垃圾袋满了
        AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);

        log.debug("主线程 start...");
        GarbageBag prev = ref.getReference();
        log.debug(prev.toString());

        new Thread(() -> {
            log.debug("打扫卫生的线程 start...");
            bag.setDesc("空垃圾袋");
            while (!ref.compareAndSet(bag, bag, true, false)) {}
            log.debug(bag.toString());
        }).start();

        Thread.sleep(1000);
        log.debug("主线程想换一只新垃圾袋?");
        boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
        log.debug("换了么?" + success);

        log.debug(ref.getReference().toString());
    }
}

class GarbageBag {
    String desc;

    public GarbageBag(String desc) {
        this.desc = desc;
    }

    public void setDesc(String desc) {
        this.desc = desc;
    }

    @Override
    public String toString() {
        return super.toString() + " " + desc;
    }

}

3:原子数组(AtomicXXXArray)

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray
// supplier 提供者 无中生有 ()->结果
// function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果
// consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)->
    private static <T> void demo(
            Supplier<T> arraySupplier,
            Function<T, Integer> lengthFun,
            BiConsumer<T, Integer> putConsumer,
            Consumer<T> printConsumer ) {

        List<Thread> ts = new ArrayList<>();
        T array = arraySupplier.get();
        int length = lengthFun.apply(array);
        for (int i = 0; i < length; i++) {
            // 每个线程对数组作 10000 次操作
            ts.add(new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    putConsumer.accept(array, j%length);
                }
            }));
        }
        ts.forEach(t -> t.start()); // 启动所有线程

        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }); // 等所有线程结束
        printConsumer.accept(array);
    }

一个方法用来测试,为了可以既测试普通数组,又测试原子数组,我们使用函数式接口的方式来传参:

  • 参数一:Supplier arraySupplier,提供数组。
  • 参数二: Function<T, Integer> lengthFun,返回数组的长度;
  • 参数三:BiConsumer<T, Integer> putConsumer,对数组的索引位置进行操作(自增)
  • 参数四:Consumer printConsumer,打印数组的元素;

看到在循环中我们每个线程都对数组的索引位置进行自增操作,因为是取模均摊,所以最后应该每个元素都是10000:

public static void main(String[] args) {
    demo(
            ()->new int[10],
            array->array.length,
            (array,index)->array[index]++,
            (array)-> System.out.print(Arrays.toString(array))
    );
}

我们使用普通数组进行测试:

[7127, 6868, 7474, 7599, 7579, 7645, 7678, 7696, 7765, 7760]

结果发现,并没有都是10000,说明发送了线程安全问题,我们换成原子数组,原子数组能保证cas的原子性:

public static void main(String[] args) {
    demo(
            ()->new int[10],
            array->array.length,
            (array,index)->array[index]++,
            (array)-> System.out.println(Arrays.toString(array))
    );
    demo(
            ()->new AtomicIntegerArray(10),
            (array)->array.length(),
            (array,index)->array.incrementAndGet(index),
            (array)-> System.out.print(array)
    );

测试:

[6645, 6679, 6658, 6671, 6683, 6646, 6650, 6669, 6643, 6693]
[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]

4:原子更新器(AtomicXXXXFieldUpdater)

  • 原子更新器是对类的某一个字段进行原子操作;
  • 进行原子操作的字段必须由volatile修饰,否则会保错;
class Student{
    volatile String name;

   /* public Student(String name) {
        this.name = name;
    }*/

    @Override
    public String toString() {
        return "Student{" +
                "name='" + name + '\'' +
                '}';
    }
}
public static void main(String[] args) {
    Student student = new Student();
    AtomicReferenceFieldUpdater updater=
            AtomicReferenceFieldUpdater.newUpdater(Student.class,String.class,"name");
    System.out.println(updater.compareAndSet(student, null, "张三"));
    System.out.println(student);
}

使用AtomicReferenceFieldUpdater,因为要修改的字段的类型是引用类型;

5:原子累加器

测试方法

private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
    T adder = adderSupplier.get();

    long start = System.nanoTime();

    List<Thread> ts = new ArrayList<>();
    // 4 个线程,每人累加 50 万
    for (int i = 0; i < 40; i++) {
        ts.add(new Thread(() -> {
            for (int j = 0; j < 500000; j++) {
                action.accept(adder);
            }
        }));
    }
    ts.forEach(t -> t.start());

    ts.forEach(t -> {
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    long end = System.nanoTime();
    System.out.println(adder + " cost:" + (end - start)/1000_000);
}
  • 参数一:Supplier adderSupplier提供累加器对象
  • 参数二:Consumer action:累加操作

实验:

public static void main(String[] args) {
    demo(
            ()->new AtomicLong(0),
            (addr)->addr.getAndIncrement()
    );
}

目的是测试AtomicLong和原子累加器的性能区别;

20000000 cost:201

原子累加器:

  • 原子累加器是专门用来做累加的
public static void main(String[] args) {
    demo(
            ()->new AtomicLong(0),
            (addr)->addr.getAndIncrement()
    );
    demo(
            ()->new LongAdder(),
            (addr)->addr.increment()
    );

}

测试结果:

20000000 cost:227
20000000 cost:23

发现原子累加器的性能提升很大;

  • 性能提升的原因也很简单就是设置多个累加单元,对多个累加单元进行累加,最后进行汇总,这样线程不是只在一个累加单元进行操作,减少了重试的次数,从而提升了性能;

四:LongAdder

  • 几个关键域:
// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;

// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;

// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy;

1:cas锁

LongAdder底层锁的实现:

模拟一下:

@Slf4j
class MyLock{
    //0没加锁
    //1加锁
    private AtomicInteger state=new AtomicInteger(0);
    public void lock(){
        while (true){
            if (state.compareAndSet(0,1)){
                break;
            }
        }
    }
    public void unLock(){
        log.debug("释放锁");
        state.set(0);
    }
}
  • 就是使用AtomicInteger来实现锁,0表示不加锁,1表示加锁,然后在lock方法中需要循环将0设为1,因为只有一个线程设值成功,其他线程没有成功只能循环等待,直到第一个线程释放锁;
public static void main(String[] args) {
    MyLock lock = new MyLock();

    new Thread(() -> {
        log.debug("begin...");
        lock.lock();
        try {
            log.debug("lock...");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unLock();
        }
    }).start();

    new Thread(() -> {
        log.debug("begin...");
        lock.lock();
        try {
            log.debug("lock...");
        } finally {
            lock.unLock();
        }
    }).start();
}
18:30:33.270 [Thread-0] DEBUG com.hbu.LongAdder.test1 - begin...
18:30:33.272 [Thread-0] DEBUG com.hbu.LongAdder.test1 - lock...
18:30:33.270 [Thread-1] DEBUG com.hbu.LongAdder.test1 - begin...
18:30:34.281 [Thread-0] DEBUG com.hbu.LongAdder.MyLock - 释放锁
18:30:34.281 [Thread-1] DEBUG com.hbu.LongAdder.test1 - lock...
18:30:34.281 [Thread-1] DEBUG com.hbu.LongAdder.MyLock - 释放锁

2:伪共享

一个缓存行存放多个cell数据就是伪共享:

// 防止缓存行伪共享  
// Contended: v.(尤指在争论中)声称,主张,认为;竞争;争夺 ,contend的过去分词和过去式
@sun.misc.Contended
static final class Cell {

    volatile long value;
    
    Cell(long x) { 
        value = x; 
    }

    // 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
    final boolean cas(long prev, long next) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
    }
// 省略不重要代码
}

先从缓存说起,cpu从缓存读取数据比直接从内存中读取数据的速度要更快;

而缓存是以缓存行为单位的,一个缓存行对应一块内存,一般是64byte(8个long)

缓存的加入会产生数据副本,同一个数据要被缓存到多个cpu核心的缓存行中,cpu为了保证一致性,当一个核心的数据修改时,其他核心对应的缓存行都会失效;

image.png

transient volatile Cell[] cells;

因为cell是数组类型,在内存中是连续存在的,而一个cell对象是24个字节(16个对象头,8个value),因此一个缓存行可以存放两个cell对象;

这样的话,上图的核心1和核心2无论谁修改了数据都会导致对方的缓存行失效;

如何解决:

加上@sun.misc.Contended注解:这个注解的作用是:加在字段或者类上,可以使使用注解的对象或字段前后各加128字节的padding,这样就可以使每个cell占用不同的缓存行,不会使缓存失效;


http://www.kler.cn/a/445680.html

相关文章:

  • Go 语言常量
  • 不会心理描写,神态描写怎么办?
  • 启用WSL后,使用ssh通道连接ubuntu
  • 纯前端实现更新检测
  • 有关异步场景的 10 大 Spring Boot 面试问题
  • TypeScript进阶实战:构建可维护的企业级应用
  • postman-9.12.2–安装包及汉化
  • 轨迹优化 | 基于Savitzky-Golay滤波的无约束路径平滑(附ROS C++/Python仿真)
  • OpenGL ES 01 渲染一个四边形
  • [Unity]【图形渲染】【游戏开发】Shader数学基础4-更多矢量运算
  • PC寄存器(Program Counter Register)jvm
  • 2024年云计算的发展趋势如何?
  • 【图像处理lec7】图像恢复、去噪
  • SSM 框架结合 Vue 实现电脑测评系统:助力用户明智选择
  • 在M系列芯片的Mac上使用Uniapp开发的依赖安装指南
  • 裸金属服务器的作用都有哪些?
  • GitHub年度报告发布!Python首次超越JavaScript
  • 高校教师成果管理小程序的设计与实现springboot+论文源码调试讲解
  • 全国青少年信息学奥林匹克竞赛(信奥赛)备考实战之分支结构(多分支结构)
  • 在VBA中结合正则表达式和查找功能给文档添加交叉连接
  • css 动画实现从中间到两边亮度逐渐变暗的流水灯效果
  • DNS 服务器是什么?有什么作用
  • MQTT入门:在Spring Boot中建立连接及测试
  • 面试题整理10----k8s集群架构是什么
  • 数据库管理系统——数据库设计
  • 【Linux】基础IO------理解文件系统(inode)