多线程之间的通讯
Java中线程通信协作的最常见的两种方式:
一.synchronized加锁的线程:通过Object类的wait()/notify()/notifyAll()
二.ReentrantLock类加锁的线程:通过Condition接口的await()/signal()/signalAll()
线程间直接的数据交换:
三.通过管道进行线程间通信:1)字节流;2)字符流
1、同步:即多个线程通过synchronized关键字来实现线程间的通信
public class MyObject {
public void synchronized methodA() {
//do something....
}
public void synchronized methodB() {
//do some other thing
}
}
public class ThreadA extends Thread {
private MyObject object;
//省略构造方法
@Override
public void run() {
super.run();
object.methodA();
}
}
public class ThreadB extends Thread {
private MyObject object;
//省略构造方法
@Override
public void run() {
super.run();
object.methodB();
}
}
public class Run {
public static void main(String[] args) {
MyObject object = new MyObject();
//线程A与线程B 持有的是同一个对象:object
ThreadA a = new ThreadA(object);
ThreadB b = new ThreadB(object);
a.start();
b.start();
}
}
由于线程A和线程B持有同一个MyObject类的对象object,尽管这两个线程需要调用不同的方法,但是它们是同步执行的,假如线程A先执行methodA方法,那么线程B需要等待线程A执行完了methodA()方法之后,它才能执行methodB()方法。这样,线程A和线程B就实现了通信。
这种方式,本质上就是“共享内存”式的通信。多个线程需要访问同一个共享变量,谁拿到了锁(获得了访问权限),谁就可以执行。
2、while轮询的方式
import java.util.ArrayList;
import java.util.List;
public class MyList {
private List<String> list = new ArrayList<String>();
public void add() {
list.add("elements");
}
public int size() {
return list.size();
}
}
import mylist.MyList;
public class ThreadA extends Thread {
private MyList list;
public ThreadA(MyList list) {
super();
this.list = list;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
list.add();
System.out.println("添加了" + (i + 1) + "个元素");
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
import mylist.MyList;
public class ThreadB extends Thread {
private MyList list;
public ThreadB(MyList list) {
super();
this.list = list;
}
@Override
public void run() {
try {
while (true) {
if (list.size() == 5) {
System.out.println("==5, 线程b准备退出了");
throw new InterruptedException();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
import mylist.MyList;
import extthread.ThreadA;
import extthread.ThreadB;
public class Test {
public static void main(String[] args) {
MyList service = new MyList();
ThreadA a = new ThreadA(service);
a.setName("A");
a.start();
ThreadB b = new ThreadB(service);
b.setName("B");
b.start();
}
}
在这种方式下,线程A不断地改变条件,线程ThreadB不停地通过while语句检测这个条件(list.size()==5)是否成立 ,从而实现了线程间的通信。但是这种方式会浪费CPU资源。之所以说它浪费资源,是因为JVM调度器将CPU交给线程B执行时,它没做啥“有用”的工作,只是在不断地测试 某个条件是否成立。就类似于现实生活中,某个人一直看着手机屏幕是否有电话来了,而不是: 在干别的事情,当有电话来时,响铃通知TA电话来了。
3、wait/notify机制
import java.util.ArrayList;
import java.util.List;
public class MyList {
private static List<String> list = new ArrayList<String>();
public static void add() {
list.add("anyString");
}
public static int size() {
return list.size();
}
}
public class ThreadA extends Thread {
private Object lock;
public ThreadA(Object lock) {
super();
this.lock = lock;
}
@Override
public void run() {
try {
synchronized (lock) {
if (MyList.size() != 5) {
System.out.println("wait begin " + System.currentTimeMillis());
lock.wait();
System.out.println("wait end " + System.currentTimeMillis());
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadB extends Thread {
private Object lock;
public ThreadB(Object lock) {
super();
this.lock = lock;
}
@Override
public void run() {
try {
synchronized (lock) {
for (int i = 0; i < 10; i++) {
MyList.add();
if (MyList.size() == 5) {
lock.notify();
System.out.println("已经发出了通知");
}
System.out.println("添加了" + (i + 1) + "个元素!");
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Run {
public static void main(String[] args) {
try {
Object lock = new Object();
ThreadA a = new ThreadA(lock);
a.start();
Thread.sleep(50);
ThreadB b = new ThreadB(lock);
b.start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
线程A要等待某个条件满足时(list.size()==5),才执行操作。线程B则向list中添加元素,改变list 的size。A,B之间如何通信的呢?也就是说,线程A如何知道 list.size() 已经为5了呢?
这里用到了Object类的 wait() 和 notify() 方法。当条件未满足时(list.size() !=5),线程A调用wait() 放弃CPU,并进入等待状态。不像while轮询那样占用CPU。
当条件满足时,线程B调用 notify()通知线程A,所谓通知线程A,就是唤醒线程A,让它尝试获取锁,若获取成功,则进入可运行状态,否则进入阻塞状态。由于线程B还未执行完同步代码块,所以线程A会进入阻塞状态,等待线程B释放锁。这种方式的一个好处就是CPU的利用率提高了。
但是也有一些缺点:比如,线程B先执行,一下子添加了5个元素并调用了notify()发送了通知,而此时线程A还未执行;当线程A执行并调用wait()时,那它永远就不可能被唤醒了。因为,线程B已经发了通知了,以后不再发通知了。这说明:通知过早,会打乱程序的执行逻辑。
相比使用Object的wait()/notify(),使用Condition的await()/signal()这种方式能够更加安全和高效地实现线程间协作。Condition是个接口,基本的方法就是await()和signal()方法。Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition() 。必须要注意的是,Condition的await()/signal() 使用都必须在lock保护之内,也就是说,必须在lock.lock()和lock.unlock之间才可以使用。
等待/通知机制 最经典的应用就是 生产者-消费者模型。
使用 Condition 方式实现生产者-消费者模型:
package com.tx.study.others.thread;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ProduceConsumeByLock {
public static void main(String[] args) {
MyService service = new MyService();
BootProduceConsume bpc = new BootProduceConsume(service);
bpc.bootService();
}
}
class BootProduceConsume{
private MyService service;
public BootProduceConsume(MyService service) {
this.service = service;
}
public void bootService(){
Runnable r1 = new Runnable() {
@Override
public void run() {
while (true){
service.produce();
}
}
};
Runnable r2 = new Runnable() {
@Override
public void run() {
while (true) {
service.consume();
}
}
};
Thread t1 = new Thread(r1,"生产者线程");
Thread t2 = new Thread(r2,"消费者线程");
t1.start();
t2.start();
}
}
class MyService{
private ReentrantLock lock = new ReentrantLock();
private Condition produceCdt = lock.newCondition();
private Condition consumeCdt = lock.newCondition();
private ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
public void produce(){
//锁[lock.lock()]必须紧跟try代码块,且unlock()要放到finally第一行。
lock.lock();
try {
String s;
while (queue.offer(s = this.createRandomString())){
System.out.println(String.format("[%s]生产数据+++++[%s],队列大小==%d",
Thread.currentThread().getName(),s,queue.size()));
this.waitOneTime();
consumeCdt.signalAll();
}
produceCdt.await();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void consume(){
lock.lock();
try {
String s;
while (!queue.isEmpty()) {
s = queue.poll();
System.out.println(String.format("[%s]消费数据-----[%s],队列大小==%d",
Thread.currentThread().getName(),s,queue.size()));
this.waitOneTime();
produceCdt.signalAll();
}
consumeCdt.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private String createRandomString(){
int stringLength = 10;
String str = "abcdefghijklmnopqrstuvwxyz0123456789_";
int len = str.length();
StringBuilder builder = new StringBuilder();
Random random = new Random();
for (int i=0;i<stringLength;i++){
int j = random.nextInt(len);
builder.append(str.charAt(j));
}
return builder.toString();
}
private void waitOneTime(){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
打印结果:
[生产者线程]生产数据+++++[jms6jyh_2i],队列大小==1
[生产者线程]生产数据+++++[ng7el5wwj2],队列大小==2
[生产者线程]生产数据+++++[0tdbghhq2r],队列大小==3
[生产者线程]生产数据+++++[4zn1vcda_6],队列大小==4
[生产者线程]生产数据+++++[ei5pteugv7],队列大小==5
[消费者线程]消费数据-----[jms6jyh_2i],队列大小==4
[消费者线程]消费数据-----[ng7el5wwj2],队列大小==3
[消费者线程]消费数据-----[0tdbghhq2r],队列大小==2
[消费者线程]消费数据-----[4zn1vcda_6],队列大小==1
[消费者线程]消费数据-----[ei5pteugv7],队列大小==0
……
4、管道通信:使用java.io.PipedInputStream 和 java.io.PipedOutputStream进行通信
PipedInputStream类与PipedOutputStream类用于在应用程序中创建管道通信。一个PipedInputStream实例对象必须和一个PipedOutputStream实例对象进行连接而产生一个通信管道。PipedOutputStream可以向管道中写入数据,PipedIntputStream可以读取PipedOutputStream向管道中写入的数据,这两个类主要用来完成线程之间的通信。一个线程的PipedInputStream对象能够从另外一个线程的PipedOutputStream对象中读取数据,如下图所示:
PipedInputStream和PipedOutputStream的实现原理类似于”生产者-消费者”原理,PipedOutputStream是生产者,PipedInputStream是消费者。在PipedInputStream中,有一个buffer字节数组,默认大小为1024,作为缓冲区,存放”生产者”生产出来的东西。此外,还有两个变量in和out —— in用来记录”生产者”生产了多少,out是用来记录”消费者”消费了多少,in为-1表示消费完了,in==out表示生产满了。当消费者没东西可消费的时候,也就是当in为-1的时候,消费者会一直等待,直到有东西可消费。
在 Java 的 JDK 中,提供了四个类用于线程间通信:
字节流:PipedInputStream 和 PipedOutputStream;
字符流:PipedReader 和 PipedWriter;
//读线程
class ThreadRead extends Thread {
private ReadData read;
private PipedInputStream input;
public ThreadRead(ReadData read, PipedInputStream input) {
super();
this.read = read;
this.input = input;
}
public void readMethod(PipedInputStream input) {
try {
System.out.println("read :");
byte[] byteArray = new byte[20];
int readLength = input.read(byteArray);
while (readLength != -1) {
String newData = new String(byteArray, 0, readLength);
System.out.print(newData);
readLength = input.read(byteArray);
}
System.out.println();
input.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
this.readMethod(input);
}
}
//写线程
class ThreadWrite extends Thread {
private WriteData write;
private PipedOutputStream out;
public ThreadWrite(WriteData write, PipedOutputStream out) {
super();
this.write = write;
this.out = out;
}
public void writeMethod(PipedOutputStream out) {
try {
System.out.println("write :");
for (int i = 0; i < 30; i++) {
String outData = "" + (i + 1);
out.write(outData.getBytes());
System.out.print(outData);
}
System.out.println();
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
this.writeMethod(out);
}
}
//测试
public class Run {
public static void main(String[] args) {
try {
WriteData writeData = new WriteData();
ReadData readData = new ReadData();
PipedInputStream inputStream = new PipedInputStream();
PipedOutputStream outputStream = new PipedOutputStream();
// inputStream.connect(outputStream); // 效果相同
outputStream.connect(inputStream);
ThreadRead threadRead = new ThreadRead(readData, inputStream);
threadRead.start();
Thread.sleep(2000);
ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream);
threadWrite.start();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}