04、JUC并发编程之:简单概述(四)
JUC并发编程之:简单概述(四)
##本章内容:
无锁并发--乐观锁(非阻塞)
·CAS与volatile
·原子整数
·原子引用
·原子数组
·字段更新器
·原子累加器
·Unsafe
一、CAS与volatile
1、保护共享资源
·有一个账户,有1万元,现在有1000个线程,每个线程每次取10元,最后会剩下0元
##错误实现 与 加锁实现
@Slf4j
public class CasAndVolatile01 {
public static void main(String[] args) {
//unsafe实现
Account account1 = new AccountUnsafe(10000);
Account.demo(account1);
//加锁实现
Account account2 = new AccountLock(10000);
Account.demo(account2);
}
}
//加锁实现
class AccountLock implements Account{
Integer balance;
public AccountLock(Integer balance) {
this.balance = balance;
}
@Override
public Integer getBalance() {
synchronized (this){
return this.balance;
}
}
@Override
public void withDraw(Integer amount) {
synchronized (this){
this.balance -= amount;
}
}
}
//错误实现
class AccountUnsafe implements Account{
Integer balance;
public AccountUnsafe(Integer balance) {
this.balance = balance;
}
@Override
public Integer getBalance() {
return this.balance;
}
@Override
public void withDraw(Integer amount) {
this.balance -= amount;
}
}
interface Account{
//获取余额
Integer getBalance();
//取款
void withDraw(Integer amount);
//方法内启动1000个线程,每个线程-10元
//如果总额为10000,那么余额将会为0
static void demo(Account account){
List<Thread> ts = new ArrayList<>();
for(int i=0;i<1000;i++){
ts.add(new Thread(()->{
account.withDraw(10);
}));
}
//计算起始时间
long start = System.nanoTime();
ts.forEach(Thread::start);
ts.forEach(t->{
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account.getBalance()
+" cost : "+(end-start)/1000_000+"ms");
}
}
##结果:
310 cost : 80ms ##错误实现
0 cost : 69ms ##加锁实现
##无锁实现
//无锁实现
class AccountCas implements Account{
AtomicInteger balance;
public AccountCas(Integer balance) {
this.balance = new AtomicInteger(balance);
}
@Override
public Integer getBalance() {
return balance.get(); //底层是volatile
}
@Override
public void withDraw(Integer amount) {
while(true){
//余额最新值
Integer prev = balance.get();
//修改后的余额
Integer next = prev - amount;
//同步到主存--比较并设置
if(balance.compareAndSet(prev,next)){
break;
}
}
}
}
##结果:
160 cost : 85ms ##错误实现
0 cost : 73ms ##加锁实现
0 cost : 65ms ##无锁实现
2、CAS与volatile
CAS:
·上面AtomicInteger的解决方法,内部并没有使用锁来保护共享变量的线程安全,它的实现原理:
@Override
public void withDraw(Integer amount) {
while(true){
//余额最新值
Integer prev = balance.get();
//修改后的余额
Integer next = prev - amount;
//同步到主存---比较并设置
if(balance.compareAndSet(prev,next)){
break;
}
}
}
其中compareAndSet,它的简称就是CAS(也称 compare and swap),它必须是原子操作
·CAS底层是lock cmpxchg指令(X86架构),在单个CPU和多核CPU下都能够保证【比较-交换】的原子
性
·在多核状态下,某个执行到带lock的指令时,CPU会让总线锁住,当这个核把此指令执行完毕,再开启
总线,这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。
【CAS会在修改前判断 自己共享变量修改之前读到的数据和当前的数据是否一致,如果不一致则返回
·false,重新修改,如果一致则返回true修改成功】
volatile:
·获取共享变量时,为了保证该变量的可见性,需要使用volatile修饰。
·它可以用来修饰成员变量和静态成员变量,它可以避免线程从自己的工作缓存中查找变量的值,必须到
主存中获取他的值,线程操作volatile变量都是直接操作主存。即一个线程对volatile变量的修改,
对另一个线程可见。
##注意:
volatile仅保证了共享变量的可见性,让其他线程能够看到最新值,但不能解决指令交错问题
(不能保证原子性)
·【CAS必须借助volatile才能读取到共享变量的最新值来实现【比较并交换】的效果】
##AtomicInteger部分源码:
public class AtomicInteger{
private volatile int value;
}
3、CAS效率分析
##为什么无锁效率高?
·无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而synchronized会让线程在没有获得
锁的时候,发生上下文切换,进入阻塞
(打个比方:线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车减
速,熄火,等被唤醒又得重新打火、启动、加速恢复到高速运行,代价比较大)
·但无锁情况下因为线程要保持运行,需要额外CPU的支持,CPU在这里就好比高速跑道,没有额外的跑
道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,
还是会导致上下文切换。
4、CAS特点
·CAS结合volatile可以实现无锁并发,适用于线程数少,多核CPU的场景下:
>CAS是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃点亏
再重试呗
>synchronized是基于悲观锁的思想:最悲观的估计,得防着其他线程来修改共享变量,我上了锁你们
都别想改,我改完了解开锁,你们才有机会
>CAS体现的是无锁并发,无阻塞并发:
>>因为没有使用synchronized,所以县城不会陷入阻塞,这是效率提升的因素之一
>>但如果竞争激烈,可以想到重试必然发生,反而会影响效率
二、原子整数
##JUC并发包提供了一些供CAS实现工具类:
·AtomicBoolean
·AtomicInteger
·AtomicLong
##ActomicInteger常用方法:
@Slf4j
public class ActomicIntegerTest {
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(0);
System.out.println(i.incrementAndGet());//++i
System.out.println(i.getAndIncrement());//i++
System.out.println(i.addAndGet(5));//先+5然后get
System.out.println(i.getAndAdd(5));//先get然后+5
System.out.println(i.updateAndGet(x->x*5));//先乘以5后get
System.out.println(i.getAndUpdate(x->x*5));//先get然后乘以5
System.out.println(i.get());
}
}
updateAndGet( )底层源码:
public final int updateAndGet(IntUnaryOperator updateFunction) {
int prev, next;
do {
prev = get();
next = updateFunction.applyAsInt(prev);
} while (!compareAndSet(prev, next));
return next;
}
三、原子引用
##由于我们的共享变量并不一定都是基本数据类型,比如说BigDecimal,我们就可以使用原子引用
保证该共享变量的线程安全
·AtomicReference
·AtomicMarkableReference
·AtomicStampedReference 有版本号的
3.1、AtomicReference
public class CasAndVolatile02 {
public static void main(String[] args) {
DecimalAccount account = new BigDecimalAccountCas(new BigDecimal("10000"));
DecimalAccount.demo(account);
}
}
class BigDecimalAccountCas implements DecimalAccount{
private AtomicReference<BigDecimal> balance;
public BigDecimalAccountCas(BigDecimal balance) {
this.balance = new AtomicReference<>(balance);
}
@Override
public BigDecimal getBalance() {
return balance.get();
}
@Override
public void withDraw(BigDecimal amount) {
while (true){
BigDecimal prev = balance.get();
BigDecimal next = prev.subtract(amount);
if(balance.compareAndSet(prev,next)){
break;
}
}
}
}
interface DecimalAccount{
//获取余额
BigDecimal getBalance();
//取款
void withDraw(BigDecimal amount);
//方法内启动1000个线程,每个线程-10元
//如果总额为10000,那么余额将会为0
static void demo(DecimalAccount account2){
List<Thread> ts = new ArrayList<>();
for(int i=0;i<1000;i++){
ts.add(new Thread(()->{
account2.withDraw(BigDecimal.TEN);
}));
}
//计算起始时间
long start = System.nanoTime();
ts.forEach(Thread::start);
ts.forEach(t->{
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account2.getBalance()
+" cost : "+(end-start)/1000_000+"ms");
}
}
3.2、ABA问题:
##主线程将共享变量从A---修改成---->C
##但在主线程修改过程中t线程将共享变量从A---修改成---->B---又修改成--->A
##主线程还是会修改成功,但主线程是无法感知共享变量的变化的
##代码如下:
@Slf4j
public class AtomicReferenceABA01 {
static AtomicReference<String> at = new AtomicReference("A");
public static void main(String[] args) {
log.debug("main start...");
String prev = at.get();
other();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("main change A-->C : {}",at.compareAndSet(prev,"C"));
}
private static void other(){
new Thread(()->{
log.debug("t1 change A-->B : {}",at.compareAndSet("A","B"));
},"t1").start();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
log.debug("t1 change B-->A : {}",at.compareAndSet("B","A"));
},"t2").start();
}
}
##结果:
10:06:54.024 [main] DEBUG xxx - main start...
10:06:54.119 [t1] DEBUG xxx - t1 change A-->B : true
10:06:54.620 [t2] DEBUG xxx - t1 change B-->A : true
10:06:55.634 [main] DEBUG xxx - main change A-->C : true
##如何才能让main线程感知到 共享变量被修改呢?
##只要有其他线程动过了共享变量,那么自己的CAS就算失败,这时仅比较值是不够的,
需要再增加一个版本号
3.3、AtomicStampedReference
@Slf4j
public class AtomicStampedReferenceABA02 {
static AtomicStampedReference<String> asr =new AtomicStampedReference<>("A",0);
public static void main(String[] args) {
log.debug("main start....");
String prev = asr.getReference();
int stamp = asr.getStamp();
log.debug("main prev:{} , stamp:{}",prev,stamp);
other();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("main change A-->C : {}",asr.compareAndSet(prev,"C",stamp,stamp+1));
}
private static void other(){
new Thread(()->{
String prev = asr.getReference();
int stamp = asr.getStamp();
log.debug("t1 prev:{} , stamp:{}",prev,stamp);
log.debug("t1 change A-->B : {}",asr.compareAndSet(prev,"B",stamp,stamp+1));
},"t1").start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
String prev = asr.getReference();
int stamp = asr.getStamp();
log.debug("t2 prev:{} , stamp:{}",prev,stamp);
log.debug("t2 change B-->A : {}",asr.compareAndSet(prev,"A",stamp,stamp+1));
},"t2").start();
}
}
##结果:
10:25:43.158 [main] DEBUG xxx - main start....
10:25:43.172 [main] DEBUG xxx - main prev:A , stamp:0
10:25:43.217 [t1] DEBUG xxx - t1 prev:A , stamp:0
10:25:43.217 [t1] DEBUG xxx - t1 change A-->B : true
10:25:44.227 [t2] DEBUG xxx - t2 prev:B , stamp:1
10:25:44.227 [t2] DEBUG xxx - t2 change B-->A : true
10:25:45.242 [main] DEBUG xxx - main change A-->C : false
3.4、AtomicMarkableReference
##AtomicStampedReference 通过版本号 可以记录 是否被修改了,和修改了几次
##AtomicMarkableReference 通过boolean标记 是否被修改
@Slf4j
public class AtomicMarkableReferenceTest {
public static void main(String[] args) {
GarbageBag bag = new GarbageBag("一个满了的垃圾袋");
AtomicMarkableReference<GarbageBag> amr = new AtomicMarkableReference<>(bag,true);
log.debug("main start...");
GarbageBag prev = amr.getReference();
log.debug("main prev:{}",prev);
new Thread(()->{
log.debug("清洁阿姨 更换垃圾袋: {}",amr.compareAndSet(prev,new GarbageBag("清洁阿姨放了一个新的垃圾袋"),true,false));
},"清洁阿姨").start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("main 更换垃圾袋: {}",amr.compareAndSet(prev,new GarbageBag("main放了一个新的垃圾袋"),true,false));
}
}
@Data
@ToString
@AllArgsConstructor
class GarbageBag{
private String desc;
}
##结果:
10:41:53.070 [main] DEBUG xxx - main start...
10:41:53.086 [main] DEBUG xxx - main prev:GarbageBag(desc=一个满了的垃圾袋)
10:41:53.150 [清洁阿姨] DEBUG xxx - 清洁阿姨 更换垃圾袋: true
10:41:54.149 [main] DEBUG xxx - main 更换垃圾袋: false
四、原子数组
·AtomicIntegerArray
·AtomicLongArray
·AtomicReferenceArray
##原子数组可以在多线程中保护数组里的元素
@Slf4j
public class AtomicIntegerArrayTest {
public static void main(String[] args) {
//创建了容量为10个int的AtomicIntegerArray
AtomicIntegerArray array = new AtomicIntegerArray(10);
log.debug("第2个值:{}",array.get(1));
int[] array2 = new int[]{1,2,3,4,5,6,7,8,9,10};
AtomicIntegerArray array3 = new AtomicIntegerArray(array2);
log.debug("第2个值:{}",array3.get(1));
log.debug("第2个值+1:{}",array3.getAndIncrement(1));
log.debug("第2个值:{}",array3.get(1));
log.debug("第3个值+5:{}",array3.getAndAdd(2,5));
log.debug("第3个值:{}",array3.get(2));
log.debug("第4个值*7:{}",array3.getAndUpdate(3,t->t*7));
log.debug("第3个值:{}",array3.get(3));
log.debug("修改第5个值为999");
array3.set(4,999);
log.debug("第5个值:{}",array3.get(4));
}
}
##结果:
11:45:37.301 [main] DEBUG xxx - 第2个值:0
11:45:37.316 [main] DEBUG xxx - 第2个值:2
11:45:37.317 [main] DEBUG xxx - 第2个值+1:2
11:45:37.317 [main] DEBUG xxx - 第2个值:3
11:45:37.317 [main] DEBUG xxx - 第3个值+5:3
11:45:37.317 [main] DEBUG xxx - 第3个值:8
11:45:37.361 [main] DEBUG xxx - 第4个值*7:4
11:45:37.361 [main] DEBUG xxx - 第3个值:28
11:47:45.114 [main] DEBUG xxx - 修改第5个值为999
11:47:45.114 [main] DEBUG xxx - 第5个值:999
五、字段更新器
·AtomicReferenceFieldUpdater
·AtomicIntegerFieldUpdater
·AtomicLongFieldUpdater
##字段更新器可以在多线程中保护对象的某个属性\成员变量
@Slf4j
public class AtomicReferenceFieldUpdaterTest {
public static void main(String[] args) {
Student st = new Student("李四");
//参数:类,要更新字段的类型,要更新字段的名称
AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater.newUpdater(Student.class,String.class,"name");
log.debug("更新名称为张三:{}",updater.compareAndSet(st,"李四","张三"));
log.debug("st : {}",st);
}
}
@Data
@ToString
@AllArgsConstructor
class Student{
volatile String name;
}
六、原子累加器
##ActomicLong和LongAdder累加的性能比较
@Slf4j
public class LongAdderTest {
public static void main(String[] args) {
for (int i = 0; i <5 ; i++) {
demo(
()->new AtomicLong(0),
(adder)->adder.getAndIncrement()
);
}
log.debug("-------------");
for (int i = 0; i <5 ; i++) {
demo(
()->new LongAdder(),
(adder)->adder.increment()
);
}
}
private static <T> void demo(Supplier<T> supplier, Consumer<T> consumer){
T adder = supplier.get();
List<Thread> ts = new ArrayList<>();
//4个线程,每人累加50万
for (int i=0;i<4;i++){
ts.add( new Thread(()->{
for(int j=0;j<500000;j++){
consumer.accept(adder);
}
}));
}
long start = System.nanoTime();
ts.forEach(t->t.start());
ts.forEach(t->{
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
log.debug("adder {}, cost : {} ms",adder,(end-start)/1000_000);
}
}
##结果:
14:57:20.170 [main] DEBUG xxx - adder 2000000, cost : 49 ms
14:57:20.228 [main] DEBUG xxx - adder 2000000, cost : 43 ms
14:57:20.265 [main] DEBUG xxx - adder 2000000, cost : 36 ms
14:57:20.303 [main] DEBUG xxx - adder 2000000, cost : 37 ms
14:57:20.344 [main] DEBUG xxx - adder 2000000, cost : 41 ms
14:57:20.344 [main] DEBUG xxx - -------------
14:57:20.359 [main] DEBUG xxx - adder 2000000, cost : 13 ms
14:57:20.364 [main] DEBUG xxx - adder 2000000, cost : 5 ms
14:57:20.370 [main] DEBUG xxx - adder 2000000, cost : 5 ms
14:57:20.377 [main] DEBUG xxx - adder 2000000, cost : 6 ms
14:57:20.382 [main] DEBUG xxx - adder 2000000, cost : 4 ms
##LongAdder性能提升的原因:
LongAdder性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Thread-0累加Cell[0]
而Thread-1累加Cell[1]...最后将结果汇总,这样他们在累加时操作的不同的Cell变量,因此减
少了CAS重试失败,从而提高性能
七、Unsafe
·Unsafe对象提供了非常底层的,操作内存、线程的方法
·Unsafe对象不能直接调用,只能通过反射获得
获取Unsafe:
@Slf4j
public class UnsafeAccessor {
public static void main(String[] args) {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
Unsafe unsafe = (Unsafe) theUnsafe.get(null);
log.debug("unsafe:{}",unsafe);
}
}
Unsafe CAS操作:
/**
* 使用Unsafe线程安全的操作Teacher对象的成员变量
* (之前使用AtomicReferenceFieldUpdater)
*/
@Slf4j
public class UnsafeAccessor {
public static void main(String[] args) {
try {
//获取Unsafe
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
Unsafe unsafe = (Unsafe) theUnsafe.get(null);
log.debug("unsafe:{}",unsafe);
Teacher tc = new Teacher(1,"张三");
//获取域的偏移地址
long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id"));
long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name"));
//执行CAS操作
unsafe.compareAndSwapInt(tc,idOffset,1,2);
unsafe.compareAndSwapObject(tc,nameOffset,"张三","李四");
//检验
log.debug("tc : {}",tc);
} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
@Data
@ToString
@AllArgsConstructor
class Teacher{
volatile int id;
volatile String name;
}
Unsafe模拟原子整数:
@Slf4j
public class UnsafeForAtomicInteger {
public static void main(String[] args) {
MyAtomicInteger mat = new MyAtomicInteger(10000);
List<Thread> ts = new ArrayList<>();
//1000个线程,每个线程减去10
for(int i=0;i<1000;i++){
ts.add(new Thread(()->{
mat.decrement(10);
}));
}
ts.forEach(t->t.start());
ts.forEach(t->{
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
log.debug("mat : {}",mat.get());
}
}
class MyAtomicInteger{
private volatile Integer value;
private static final long valueOffset;
private static final Unsafe UNSAFE;
static{
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
UNSAFE = (Unsafe) theUnsafe.get(null);
valueOffset = UNSAFE.objectFieldOffset(MyAtomicInteger.class.getDeclaredField("value"));
} catch (NoSuchFieldException e) {
e.printStackTrace();
throw new RuntimeException();
} catch (IllegalAccessException e) {
e.printStackTrace();
throw new RuntimeException();
}
}
public MyAtomicInteger(Integer value) {
this.value = value;
}
public Integer get(){
return value;
}
public void decrement(Integer num){
while (true){
Integer prev = this.value;
Integer next = prev - num;
if(UNSAFE.compareAndSwapObject(this,valueOffset,prev,next)){
break;
}
}
}
}