J.U.C(2)
目录
- JUC
- 一:stamedLock
- 乐观读
- 二:stamphore
- 应用
- 原理
- 三:countDownLatch
- 应用
- 四:CyclicBarrier
JUC
一:stamedLock
- 该类从jdk8加入,目的是进一步提高读的性能,他的特点是在使用读,写锁时都要配合【戳】来进行使用;
加解读锁:
long stamp = lock.readLock();
lock.unlockRead(stamp);
加解写锁:
long stamp = lock.writeLock();
lock.unlockWrite(stamp);
乐观读
- 乐观读,stamedLock支持使用tryOptimisticRead方法进行乐观读,在使用之后还要进行检验,使用validate,如果检验成功,说明期间没有线程进行写操作,数据有效。如果检验失败,则要进行锁升级,重新获取读锁,保证数据安全
long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)){
// 锁升级
}
示例:
class DataContainerStamped {
private int data;
private final StampedLock lock = new StampedLock();
public DataContainerStamped(int data) {
this.data = data;
}
public int read(int readTime) {
long stamp = lock.tryOptimisticRead();
log.debug("optimistic read locking...{}", stamp);
sleep(readTime);
if (lock.validate(stamp)) {
log.debug("read finish...{}, data:{}", stamp, data);
return data;
}
// 锁升级 - 读锁
log.debug("updating to read lock... {}", stamp);
try {
stamp = lock.readLock();
log.debug("read lock {}", stamp);
sleep(readTime);
log.debug("read finish...{}, data:{}", stamp, data);
return data;
} finally {
log.debug("read unlock {}", stamp);
lock.unlockRead(stamp);
}
}
public void write(int newData) {
long stamp = lock.writeLock();
log.debug("write lock {}", stamp);
try {
sleep(2);
this.data = newData;
} finally {
log.debug("write unlock {}", stamp);
lock.unlockWrite(stamp);
}
}
}
读 - 读不影响并发:
public static void main(String[] args) {
DataContainerStamped dataContainer = new DataContainerStamped(1);
new Thread(() -> {
dataContainer.read(1);
}, "t1").start();
sleep(0.5);
new Thread(() -> {
dataContainer.read(0);
}, "t2").start();
}
15:58:50.217 c.DataContainerStamped [t1] - optimistic read locking...256
15:58:50.717 c.DataContainerStamped [t2] - optimistic read locking...256
15:58:50.717 c.DataContainerStamped [t2] - read finish...256, data:1
15:58:51.220 c.DataContainerStamped [t1] - read finish...256, data:1
读-写的时候会进行锁升级:
public static void main(String[] args) {
DataContainerStamped dataContainer = new DataContainerStamped(1);
new Thread(() -> {
dataContainer.read(1);
}, "t1").start();
sleep(0.5);
new Thread(() -> {
dataContainer.write(100);
}, "t2").start();
}
15:57:00.219 c.DataContainerStamped [t1] - optimistic read locking...256
15:57:00.717 c.DataContainerStamped [t2] - write lock 384
15:57:01.225 c.DataContainerStamped [t1] - updating to read lock... 256
15:57:02.719 c.DataContainerStamped [t2] - write unlock 384
15:57:02.719 c.DataContainerStamped [t1] - read lock 513
15:57:03.719 c.DataContainerStamped [t1] - read finish...513, data:1000
15:57:03.719 c.DataContainerStamped [t1] - read unlock 513
虽然stamedLock在读的性能上较为优秀,但是还要几个缺点:
1:不可重入;
2:不支持条件变量;
二:stamphore
- 中文是信号量,可以控制访问资源的线程数:
public static void main(String[] args) {
// 1. 创建 semaphore 对象
Semaphore semaphore = new Semaphore(3);
// 2. 10个线程同时运行
for (int i = 0; i < 10; i++) {
new Thread(() -> {
// 3. 获取许可
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("running...");
sleep(1);
log.debug("end...");
} finally {
// 4. 释放许可
semaphore.release();
}
}).start();
}
}
- 这里初始化semaphore的大小是3,所以一次只能有三个线程获得许可,剩余的线程陷入阻塞;
输出:
07:35:15.485 c.TestSemaphore [Thread-2] - running...
07:35:15.485 c.TestSemaphore [Thread-1] - running...
07:35:15.485 c.TestSemaphore [Thread-0] - running...
07:35:16.490 c.TestSemaphore [Thread-2] - end...
07:35:16.490 c.TestSemaphore [Thread-0] - end...
07:35:16.490 c.TestSemaphore [Thread-1] - end...
07:35:16.490 c.TestSemaphore [Thread-3] - running...
07:35:16.490 c.TestSemaphore [Thread-5] - running...
07:35:16.490 c.TestSemaphore [Thread-4] - running...
07:35:17.490 c.TestSemaphore [Thread-5] - end...
07:35:17.490 c.TestSemaphore [Thread-4] - end...
07:35:17.490 c.TestSemaphore [Thread-3] - end...
07:35:17.490 c.TestSemaphore [Thread-6] - running...
07:35:17.490 c.TestSemaphore [Thread-7] - running...
07:35:17.490 c.TestSemaphore [Thread-9] - running...
07:35:18.491 c.TestSemaphore [Thread-6] - end...
07:35:18.491 c.TestSemaphore [Thread-7] - end...
07:35:18.491 c.TestSemaphore [Thread-9] - end...
07:35:18.491 c.TestSemaphore [Thread-8] - running...
07:35:19.492 c.TestSemaphore [Thread-8] - end...
应用
-
semaphore可以用来限流,在访问高峰的时候,将请求阻塞住,等高峰过去之后再将线程释放,但只支持单机,并且这里限制的线程数,而不是资源数;
-
semaphore也可以实现简单的连接池,替代awit,signal,提高了性能和可读性;(线程数和数据库连接数相同)
@Slf4j(topic = "c.Pool")
class Pool {
// 1. 连接池大小
private final int poolSize;
// 2. 连接对象数组
private Connection[] connections;
// 3. 连接状态数组 0 表示空闲, 1 表示繁忙
private AtomicIntegerArray states;
private Semaphore semaphore;
// 4. 构造方法初始化
public Pool(int poolSize) {
this.poolSize = poolSize;
// 让许可数与资源数一致
this.semaphore = new Semaphore(poolSize);
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
connections[i] = new MockConnection("连接" + (i+1));
}
}
// 5. 借连接
public Connection borrow() {// t1, t2, t3
// 获取许可
try {
semaphore.acquire(); // 没有许可的线程,在此等待
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < poolSize; i++) {
// 获取空闲连接
if(states.get(i) == 0) {
if (states.compareAndSet(i, 0, 1)) {
log.debug("borrow {}", connections[i]);
return connections[i];
}
}
}
// 不会执行到这里
return null;
}
// 6. 归还连接
public void free(Connection conn) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == conn) {
states.set(i, 0);
log.debug("free {}", conn);
semaphore.release();
break;
}
}
}
}
原理
- aquire
aquire方法->acquireSharedInterruptibly方法->tryAcquireShared方法->nonfairTryAcquireShared方法
我们之前提起过,tryAcquireShared,-1是加锁失败,0成功,正数还剩多少state可以获取
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();//获取state
int remaining = available - acquires;//state-1
if (remaining < 0 ||//如果减了之后等于负数,直接返会负数。如果没有呢,就进行cas赋值
compareAndSetState(available, remaining))
return remaining;
}
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
当tryAcquireShared返回值为负数时进入doAcquireSharedInterruptibly方法;
因为没有位置可以使用了,就像之前的reentrantLock一样进入等待队列:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
- 加入队列后,再尝试获取锁,然后就会将前驱节点的值设为-1之后就park阻塞住;
- 释放锁的原理
release方法->releaseShared方法->tryReleaseShared方法
tryReleaseShared方法中,会让state+1;
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
加一之后进入if块执行doReleaseShared方法:
- 将AQS队列中的头节点的状态设为0,然后唤醒后继节点,然后要判断自己是否是老二节点,如果是就去尝试获取锁,就是让state减一,如果返回结果大于等于0,就将自己的节点作为头节点,然后再判断后面的节点是否是share类型的,如果是的话也会唤醒,尝试获取锁失败还是会陷入阻塞
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
三:countDownLatch
- countDownLatch用于多线程的同步协作,等待全部线程倒计时
- 构造参数是初始化的等待计数值,await方法使线程阻塞等待计数器的值归0,countDown用来将计数值减一;
基本用法示例:
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
new Thread(() -> {
log.debug("start...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("end.....");
latch.countDown();
}, "t1").start();
new Thread(() -> {
log.debug("start...");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("end.....");
latch.countDown();
}, "t2").start();
new Thread(() -> {
log.debug("start...");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("end.....");
latch.countDown();
}, "t3").start();
log.debug("wait....");
latch.await();
log.debug("ok");
}
结果:
19:42:26.333 [t3] DEBUG com.hbu.aqs.testCount - start...
19:42:26.333 [t1] DEBUG com.hbu.aqs.testCount - start...
19:42:26.333 [t2] DEBUG com.hbu.aqs.testCount - start...
19:42:26.333 [main] DEBUG com.hbu.aqs.testCount - wait....
19:42:26.841 [t2] DEBUG com.hbu.aqs.testCount - end.....
19:42:27.340 [t1] DEBUG com.hbu.aqs.testCount - end.....
19:42:27.845 [t3] DEBUG com.hbu.aqs.testCount - end.....
19:42:27.846 [main] DEBUG com.hbu.aqs.testCount - ok
等待所有线程运行结束,即计数器减为0时,主线程才停止等待继续运行;
应用
等待所有线程运行完毕:
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(10);
CountDownLatch latch=new CountDownLatch(10);
Random random=new Random();
String[]all=new String[10];
for (int j = 0; j < 10; j++) {
int k = j;
pool.submit(()->{
for (int i = 0; i <= 100; i++) {
all[k]=i+"%";
try {
Thread.sleep(random.nextInt(80));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.print("\r"+Arrays.toString(all));
}
latch.countDown();
});
}
pool.shutdown();
latch.await();
System.out.println("\n"+"欢迎来到王者荣耀!");
}
这里面“\r”和随机睡眠时间是进度条显示的关键;
- 应用之同步等待多个远程调用结束
每次调用都开一个线程,每个调用结束都countDown,最后所有的调用结束,主线程结束等待;
@RestController
public class TestCountDownlatchController {
@GetMapping("/order/{id}")
public Map<String, Object> order(@PathVariable int id) {
HashMap<String, Object> map = new HashMap<>();
map.put("id", id);
map.put("total", "2300.00");
sleep(2000);
return map;
}
@GetMapping("/product/{id}")
public Map<String, Object> product(@PathVariable int id) {
HashMap<String, Object> map = new HashMap<>();
if (id == 1) {
map.put("name", "小爱音箱");
map.put("price", 300);
} else if (id == 2) {
map.put("name", "小米手机");
map.put("price", 2000);
}
map.put("id", id);
sleep(1000);
return map;
}
@GetMapping("/logistics/{id}")
public Map<String, Object> logistics(@PathVariable int id) {
HashMap<String, Object> map = new HashMap<>();
map.put("id", id);
map.put("name", "中通快递");
sleep(2500);
return map;
}
private void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
可以使用future阻塞接收有返回值的submit。
RestTemplate restTemplate = new RestTemplate();
log.debug("begin");
ExecutorService service = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(4);
Future<Map<String,Object>> f1 = service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);
latch.countDown();
return r;
});
Future<Map<String, Object>> f2 = service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);
latch.countDown();
return r;
});
Future<Map<String, Object>> f3 = service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2);
latch.countDown();
return r;
});
Future<Map<String, Object>> f4 = service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1);
latch.countDown();
return r;
});
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());
System.out.println(f4.get());
latch.await();
log.debug("执行完毕");
service.shutdown();
19:51:39.711 c.TestCountDownLatch [main] - begin
{total=2300.00, id=1}
{price=300, name=小爱音箱, id=1}
{price=2000, name=小米手机, id=2}
{name=中通快递, id=1}
19:51:42.407 c.TestCountDownLatch [main] - 执行完毕
四:CyclicBarrier
[ˈsaɪklɪk ˈbæriɚ] 循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行.
调用await方法减一,直到计数器为0,才会调用构造参数中的runnable方法,然后调用完了还会重置计数器的参数;
计数器的初始值要与线程数保持一致;
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(2);
CyclicBarrier cyclicBarrier=new CyclicBarrier(2,()->{
log.debug("task 1.task2 running again...");
});
for (int i = 0; i < 3; i++) {
pool.submit(()->{
log.debug("task1 ..begin");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
log.debug("task1..end");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
},"t1");
pool.submit(()->{
log.debug("task2 ..begin");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
log.debug("task2..end");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
},"t2");
}
pool.shutdown();
}