Java并发线程 共享模型之管程 5
1. 生产者消费者
package cn.itcast.testcopy;
import cn.itcast.n2copy.util.Sleeper;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
/**
* ClassName: Test21
* Package: cn.itcast.testcopy
* Description: 生产者消费者
*
* @Author: 1043
* @Create: 2024/9/4 - 11:11
* @Version: v1.0
*/
public class Test21 {
public static void main(String[] args) {
MessgeQueue queue=new MessgeQueue(2);
for (int i = 0; i < 3; i++) {
int id=i;
new Thread(()->{
queue.put(new Message(id,"值"+id));
},"生产者"+i).start();
}
new Thread(()->{
while (true){
Sleeper.sleep(1);
Message take = queue.take();
}
},"消费者").start();
}
}
// 消息队列类,rabbitmq是进程间通信,这个类比较简单是线程间通信
@Slf4j(topic = "c.MessgeQueue")
class MessgeQueue {
// 消息的队列集合
private LinkedList<Message> list = new LinkedList<>();
// 队列容量
private int capcity;
public MessgeQueue(int capcity) {
this.capcity = capcity;
}
public Message take() {
// 检查队列是否为空
synchronized (list) {
while (list.isEmpty()) {
try {
log.debug("队列为空, 消费者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 从队列头部获取消息并返回
Message message = list.removeFirst();
log.debug("已消费消息 {}", message);
list.notifyAll();
return message;
}
}
public void put(Message message) {
synchronized (list) {
// 检查对象是否已满
while (list.size() == capcity) {
try {
log.debug("队列已满, 生产者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 将消息加入队列尾部
list.addLast(message);
log.debug("已生产消息 {}", message);
list.notifyAll();
}
}
}
final class Message {
private int id;
private Object message;
public Message(int id, Object message) {
this.id = id;
this.message = message;
}
public int getId() {
return id;
}
public Object getMessage() {
return message;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", message=" + message +
'}';
}
}
2. park和unpark
先看一段代码
@Slf4j(topic = "c.Testpark")
public class Testpark {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
log.debug("start...");
sleep(2);
log.debug("park...");
LockSupport.park();
log.debug("resume...");
}, "t1");
t1.start();
sleep(1);
log.debug("unpark...");
LockSupport.unpark(t1);
}
}
运作结果如图
与 Object 的 wait & notify 相比
wait
、notify
和notifyAll
必须配合Object Monitor
一起使用,而park
、unpark
不必。park
和unpark
是以线程为单位来【阻塞】和【唤醒】线程,而notify
只能随机唤醒一个等待线程,notifyAll
是唤醒所有等待线程,就不那么【精确】。park
和unpark
可以先unpark
,而wait
和notify
不能先notify
。
原理
每个线程都有自己的一个 Parker 对象,由三部分组成 _counter , _cond 和 _mutex 打个比喻线程就像一个旅人,Parker 就像他随身携带的背包,条件变量就好比背包中的帐篷。_counter 就好比背包中的备用干粮(0 为耗尽,1 为充足)
调用 park 就是要看需不需要停下来歇息
如果备用干粮耗尽,那么钻进帐篷歇息
如果备用干粮充足,那么不需停留,继续前进
调用 unpark,就好比令干粮充足
如果这时线程还在帐篷,就唤醒让他继续前进
如果这时线程还在运行,那么下次他调用 park 时,仅是消耗掉备用干粮,不需停留,继续前进,因为背包空间有限,多次调用 unpark 仅会补充一份备用干粮
3. 多把锁
一间大屋子有两个功能:睡觉、学习,互不相干。
现在小南要学习,小女要睡觉,但如果只用一间屋子(一个对象锁)的话,那么并发度很低
解决方法是准备多个房间(多个对象锁)例如
class BigRoom2 {
public void sleep() {
synchronized (this) {
log.debug("sleeping 2 小时");
Sleeper.sleep(2);
}
}
public void study() {
synchronized (this) {
log.debug("study 1 小时");
Sleeper.sleep(1);
}
}
}
BigRoom bigRoom = new BigRoom();
new Thread(() -> {
bigRoom.study();
}, "小南").start();
new Thread(() -> {
bigRoom.sleep();
}, "小女").start();
此时效率很低,可改进降低锁的粒度
public class TestMultiLock {
public static void main(String[] args) {
BigRoom bigRoom = new BigRoom();
new Thread(() -> {
bigRoom.study();
}, "小南").start();
new Thread(() -> {
bigRoom.sleep();
}, "小女").start();
}
}
@Slf4j(topic = "c.BigRoom")
class BigRoom {
private final Object studyRoom = new Object();
private final Object bedRoom = new Object();
public void sleep() {
synchronized (bedRoom) {
log.debug("sleeping 2 小时");
Sleeper.sleep(2);
}
}
public void study() {
synchronized (studyRoom) {
log.debug("study 1 小时");
Sleeper.sleep(1);
}
}
}
此时两个小房间互不影响。不过这种情况容易造成死锁。
死锁
有这样的情况:一个线程需要同时获取多把锁,这时就容易发生死锁t1 线程 获得 A对象 锁,接下来想获取 B对象的锁 t2 线程 获得 B对象 锁,接下来想获取 A对象的锁 例:
@Slf4j(topic = "c.TestDeadLock")
public class TestDeadLock {
public static void main(String[] args) {
test1();
}
private static void test1() {
Object A = new Object();
Object B = new Object();
Thread t1 = new Thread(() -> {
synchronized (A) {
log.debug("lock A");
sleep(1);
synchronized (B) {
log.debug("lock B");
log.debug("操作...");
}
}
}, "t1");
Thread t2 = new Thread(() -> {
synchronized (B) {
log.debug("lock B");
sleep(0.5);
synchronized (A) {
log.debug("lock A");
log.debug("操作...");
}
}
}, "t2");
t1.start();
t2.start();
}
}
检测死锁可以使用 jconsole工具,或者使用 jps 定位进程 id,再用 jstack 定位死锁:
活锁
活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束,例如
@Slf4j(topic = "c.TestLiveLock")
public class TestLiveLock {
static volatile int count = 10;
static final Object lock = new Object();
public static void main(String[] args) {
new Thread(() -> {
// 期望减到 0 退出循环
while (count > 0) {
sleep(0.2);
count--;
log.debug("count: {}", count);
}
}, "t1").start();
new Thread(() -> {
// 期望超过 20 退出循环
while (count < 20) {
sleep(0.2);
count++;
log.debug("count: {}", count);
}
}, "t2").start();
}
}
一个希望向下减一个希望向上加,互相正直在一个范围内。也是被锁住了。
饥饿
很多教程中把饥饿定义为,一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束,饥饿的情况不易演示,讲读写锁时会涉及饥饿问题。如哲学家就餐问题将最后一个哲学家的左右手顺序颠倒一个就不会死锁了,但会由此引发饥饿问题。
4.ReentrantLock
相对于 synchronized 它具备如下特点
可中断
可以设置超时时间
可以设置为公平锁
支持多个条件变量
与 synchronized 一样,都支持可重入,不过synchronized是在关键字级别保护临界区而ReentrantLock在对象级别保护临界区。
基本语法
// 获取锁
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}
第一行加锁的语句放try里跟上面效果完全等价。
特性1 可重入
示例
package cn.itcast.testcopy;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.ReentrantLock;
/**
* ClassName: Test22
* Package: cn.itcast.testcopy
* Description:
*
* @Author: 1043
* @Create: 2024/9/4 - 16:09
* @Version: v1.0
*/
@Slf4j(topic = "c.Test22")
public class Test22 {
private static ReentrantLock lock=new ReentrantLock();
public static void main(String[] args) {
lock.lock();
try {
log.debug("enter main");
m1();
}finally {
lock.unlock();
}
}
public static void m1() {
lock.lock();
try {
log.debug("enter m1");
m2();
}finally {
lock.unlock();
}
}
public static void m2() {
lock.lock();
try {
log.debug("enter m2");
}finally {
lock.unlock();
}
}
}
特性2 可打断
package cn.itcast.testcopy;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.ReentrantLock;
/**
* ClassName: Test22kedaduan
* Package: cn.itcast.testcopy
* Description:
*
* @Author: 1043
* @Create: 2024/9/4 - 16:13
* @Version: v1.0
*/
@Slf4j(topic = "c.Test22kedaduan")
public class Test22kedaduan {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
try {
// 如果没有竞争那么此方法就会获取 Lock 对象锁
// 如果有竞争就进入阻塞队列,可以被其它线程用 interrupt 方法打断
log.debug("尝试获取锁");
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("没有获得锁,返回");
return;
}
try {
log.debug("获取到锁");
} finally {
lock.unlock();
}
},"t1");
t1.start();
}
}
lock.lock();//主线程先加锁
t1.start();
Sleeper.sleep(1);
log.debug("打断t1");
t1.interrupt();
t1可以被打断,不会死等下去, 这就为防止死锁发生提供了方式。
特性3 锁超时
@Slf4j(topic = "c.Test22")
public class Test22 {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
log.debug("尝试获得锁");
try {
if (!lock.tryLock(2, TimeUnit.SECONDS)) {
log.debug("获取不到锁");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("获取不到锁");
return;
}
try {
log.debug("获得到锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得到锁");
t1.start();
sleep(3);
log.debug("释放了锁");
lock.unlock();
}
}
主线程3s后才释放锁,t1的tryLock只等2s,2s后还获取不到就放弃等待。tryLock若是没参数就只尝试一次获取不到立即放弃。
应用 解决哲学家就餐问题
public class TestDeadLock {
public static void main(String[] args) {
Chopstick c1 = new Chopstick("1");
Chopstick c2 = new Chopstick("2");
Chopstick c3 = new Chopstick("3");
Chopstick c4 = new Chopstick("4");
Chopstick c5 = new Chopstick("5");
new Philosopher("苏格拉底", c1, c2).start();
new Philosopher("柏拉图", c2, c3).start();
new Philosopher("亚里士多德", c3, c4).start();
new Philosopher("赫拉克利特", c4, c5).start();
new Philosopher("阿基米德", c5, c1).start();
}
}
@Slf4j(topic = "c.Philosopher")
class Philosopher extends Thread {
Chopstick left;
Chopstick right;
public Philosopher(String name, Chopstick left, Chopstick right) {
super(name);
this.left = left;
this.right = right;
}
@Override
public void run() {
while (true) {
// 尝试获得左手筷子
if (left.tryLock()) {
try {
// 尝试获得右手筷子
if (right.tryLock()) {
try {
eat();
} finally {
right.unlock();
}
}
} finally {
left.unlock();
}
}
}
}
private void eat() {
log.debug("eating...");
Sleeper.sleep(1);
}
}
class Chopstick extends ReentrantLock {
String name;
public Chopstick(String name) {
this.name = name;
}
@Override
public String toString() {
return "筷子{" + name + '}';
}
}
核心在于获取右手筷子失败后放开左手的筷子,这样别人就有机会获取筷子,就不会发生死锁与饥饿了。
特性4. 公平锁
ReentrantLock 默认是不公平的,公平锁一般没有必要,会降低并发度。
特性5 条件变量
synchronized 中也有条件变量,就是我们前面分析原理时那个 waitSet 休息室,当条件不满足时进入 waitSet 等待。
ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比 synchronized 是那些不满足条件的线程都在一间休息室等消息。而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒
使用要点:
await 前需要获得锁
await 执行后,会释放锁,进入 conditionObject 等待
await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
竞争 lock 锁成功后,从 await 后继续执行