当前位置: 首页 > article >正文

多线程之间的通讯

Java中线程通信协作的最常见的两种方式:

一.synchronized加锁的线程:通过Object类的wait()/notify()/notifyAll()

二.ReentrantLock类加锁的线程:通过Condition接口的await()/signal()/signalAll()

线程间直接的数据交换:

三.通过管道进行线程间通信:1)字节流;2)字符流 

1、同步:即多个线程通过synchronized关键字来实现线程间的通信

public class MyObject {
    public void synchronized methodA() {
        //do something....
    }
    public void synchronized methodB() {
        //do some other thing
    }
}

public class ThreadA extends Thread {
    private MyObject object;
    //省略构造方法
    @Override
    public void run() {
        super.run();
        object.methodA();
    }
}

public class ThreadB extends Thread {
    private MyObject object;
    //省略构造方法
    @Override
    public void run() {
        super.run();
        object.methodB();
    }
}

public class Run {
    public static void main(String[] args) {
        MyObject object = new MyObject();
        //线程A与线程B 持有的是同一个对象:object
        ThreadA a = new ThreadA(object);
        ThreadB b = new ThreadB(object);
        a.start();
        b.start();
    }
}

由于线程A和线程B持有同一个MyObject类的对象object,尽管这两个线程需要调用不同的方法,但是它们是同步执行的,假如线程A先执行methodA方法,那么线程B需要等待线程A执行完了methodA()方法之后,它才能执行methodB()方法。这样,线程A和线程B就实现了通信。

这种方式,本质上就是“共享内存”式的通信。多个线程需要访问同一个共享变量,谁拿到了锁(获得了访问权限),谁就可以执行。

2、while轮询的方式

import java.util.ArrayList;
import java.util.List;
public class MyList {
    private List<String> list = new ArrayList<String>();
    public void add() {
        list.add("elements");
    }
    public int size() {
        return list.size();
    }
}

import mylist.MyList;
public class ThreadA extends Thread {
    private MyList list;
    public ThreadA(MyList list) {
        super();
        this.list = list;
    }
    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                list.add();
                System.out.println("添加了" + (i + 1) + "个元素");
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

import mylist.MyList;
public class ThreadB extends Thread {
    private MyList list;
    public ThreadB(MyList list) {
        super();
        this.list = list;
    }
    @Override
    public void run() {
        try {
            while (true) {
                if (list.size() == 5) {
                    System.out.println("==5, 线程b准备退出了");
                    throw new InterruptedException();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

import mylist.MyList;
import extthread.ThreadA;
import extthread.ThreadB;
public class Test {
    public static void main(String[] args) {
        MyList service = new MyList();
        ThreadA a = new ThreadA(service);
        a.setName("A");
        a.start();
        ThreadB b = new ThreadB(service);
        b.setName("B");
        b.start();
    }
}

在这种方式下,线程A不断地改变条件,线程ThreadB不停地通过while语句检测这个条件(list.size()==5)是否成立 ,从而实现了线程间的通信。但是这种方式会浪费CPU资源。之所以说它浪费资源,是因为JVM调度器将CPU交给线程B执行时,它没做啥“有用”的工作,只是在不断地测试 某个条件是否成立。就类似于现实生活中,某个人一直看着手机屏幕是否有电话来了,而不是: 在干别的事情,当有电话来时,响铃通知TA电话来了。

3、wait/notify机制

import java.util.ArrayList;
import java.util.List;
public class MyList {
    private static List<String> list = new ArrayList<String>();
    public static void add() {
        list.add("anyString");
    }
    public static int size() {
        return list.size();
    }
}

public class ThreadA extends Thread {
    private Object lock;
    public ThreadA(Object lock) {
        super();
        this.lock = lock;
    }
    @Override
    public void run() {
        try {
            synchronized (lock) {
                if (MyList.size() != 5) {
                    System.out.println("wait begin " + System.currentTimeMillis());
                    lock.wait();
                    System.out.println("wait end  " + System.currentTimeMillis());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class ThreadB extends Thread {
    private Object lock;
    public ThreadB(Object lock) {
        super();
        this.lock = lock;
    }
    @Override
    public void run() {
        try {
            synchronized (lock) {
                for (int i = 0; i < 10; i++) {
                    MyList.add();
                    if (MyList.size() == 5) {
                        lock.notify();
                        System.out.println("已经发出了通知");
                    }
                    System.out.println("添加了" + (i + 1) + "个元素!");
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Run {
    public static void main(String[] args) {
        try {
            Object lock = new Object();
            ThreadA a = new ThreadA(lock);
            a.start();
            Thread.sleep(50);
            ThreadB b = new ThreadB(lock);
            b.start();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

线程A要等待某个条件满足时(list.size()==5),才执行操作。线程B则向list中添加元素,改变list 的size。A,B之间如何通信的呢?也就是说,线程A如何知道 list.size() 已经为5了呢?

这里用到了Object类的 wait() 和 notify() 方法。当条件未满足时(list.size() !=5),线程A调用wait() 放弃CPU,并进入等待状态。不像while轮询那样占用CPU。

当条件满足时,线程B调用 notify()通知线程A,所谓通知线程A,就是唤醒线程A,让它尝试获取锁,若获取成功,则进入可运行状态,否则进入阻塞状态。由于线程B还未执行完同步代码块,所以线程A会进入阻塞状态,等待线程B释放锁。这种方式的一个好处就是CPU的利用率提高了。

但是也有一些缺点:比如,线程B先执行,一下子添加了5个元素并调用了notify()发送了通知,而此时线程A还未执行;当线程A执行并调用wait()时,那它永远就不可能被唤醒了。因为,线程B已经发了通知了,以后不再发通知了。这说明:通知过早,会打乱程序的执行逻辑。

相比使用Object的wait()/notify(),使用Condition的await()/signal()这种方式能够更加安全和高效地实现线程间协作。Condition是个接口,基本的方法就是await()和signal()方法。Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition() 。必须要注意的是,Condition的await()/signal() 使用都必须在lock保护之内,也就是说,必须在lock.lock()和lock.unlock之间才可以使用。

等待/通知机制 最经典的应用就是 生产者-消费者模型

使用 Condition 方式实现生产者-消费者模型:

package com.tx.study.others.thread;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ProduceConsumeByLock {

    public static void main(String[] args) {
        MyService service = new MyService();
        BootProduceConsume bpc = new BootProduceConsume(service);
        bpc.bootService();
    }

}

class BootProduceConsume{

    private MyService service;

    public BootProduceConsume(MyService service) {
        this.service = service;
    }

    public void bootService(){
        Runnable r1 = new Runnable() {
            @Override
            public void run() {
                while (true){
                    service.produce();
                }
            }
        };
        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                while (true) {
                    service.consume();
                }
            }
        };
        Thread t1 = new Thread(r1,"生产者线程");
        Thread t2 = new Thread(r2,"消费者线程");
        t1.start();
        t2.start();
    }

}

class MyService{

    private ReentrantLock lock = new ReentrantLock();

    private Condition produceCdt = lock.newCondition();

    private Condition consumeCdt = lock.newCondition();

    private ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(5);

    public void produce(){
        //锁[lock.lock()]必须紧跟try代码块,且unlock()要放到finally第一行。
        lock.lock();
        try {
            String s;
            while (queue.offer(s = this.createRandomString())){
                System.out.println(String.format("[%s]生产数据+++++[%s],队列大小==%d",
                        Thread.currentThread().getName(),s,queue.size()));
                this.waitOneTime();
                consumeCdt.signalAll();
            }
            produceCdt.await();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void consume(){
        lock.lock();
        try {
            String s;
            while (!queue.isEmpty()) {
                s = queue.poll();
                System.out.println(String.format("[%s]消费数据-----[%s],队列大小==%d",
                        Thread.currentThread().getName(),s,queue.size()));
                this.waitOneTime();
                produceCdt.signalAll();
            }
            consumeCdt.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private String createRandomString(){
        int stringLength = 10;
        String str = "abcdefghijklmnopqrstuvwxyz0123456789_";
        int len = str.length();
        StringBuilder builder = new StringBuilder();
        Random random = new Random();
        for (int i=0;i<stringLength;i++){
            int j = random.nextInt(len);
            builder.append(str.charAt(j));
        }
        return builder.toString();
    }

    private void waitOneTime(){
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

打印结果:

[生产者线程]生产数据+++++[jms6jyh_2i],队列大小==1
[生产者线程]生产数据+++++[ng7el5wwj2],队列大小==2
[生产者线程]生产数据+++++[0tdbghhq2r],队列大小==3
[生产者线程]生产数据+++++[4zn1vcda_6],队列大小==4
[生产者线程]生产数据+++++[ei5pteugv7],队列大小==5
[消费者线程]消费数据-----[jms6jyh_2i],队列大小==4
[消费者线程]消费数据-----[ng7el5wwj2],队列大小==3
[消费者线程]消费数据-----[0tdbghhq2r],队列大小==2
[消费者线程]消费数据-----[4zn1vcda_6],队列大小==1
[消费者线程]消费数据-----[ei5pteugv7],队列大小==0
……

4、管道通信:使用java.io.PipedInputStream 和 java.io.PipedOutputStream进行通信 

PipedInputStream类与PipedOutputStream类用于在应用程序中创建管道通信。一个PipedInputStream实例对象必须和一个PipedOutputStream实例对象进行连接而产生一个通信管道。PipedOutputStream可以向管道中写入数据,PipedIntputStream可以读取PipedOutputStream向管道中写入的数据,这两个类主要用来完成线程之间的通信。一个线程的PipedInputStream对象能够从另外一个线程的PipedOutputStream对象中读取数据,如下图所示:

PipedInputStream和PipedOutputStream的实现原理类似于”生产者-消费者”原理,PipedOutputStream是生产者,PipedInputStream是消费者。在PipedInputStream中,有一个buffer字节数组,默认大小为1024,作为缓冲区,存放”生产者”生产出来的东西。此外,还有两个变量in和out —— in用来记录”生产者”生产了多少,out是用来记录”消费者”消费了多少,in为-1表示消费完了,in==out表示生产满了。当消费者没东西可消费的时候,也就是当in为-1的时候,消费者会一直等待,直到有东西可消费。

在 Java 的 JDK 中,提供了四个类用于线程间通信:

字节流:PipedInputStream 和 PipedOutputStream;

字符流:PipedReader 和 PipedWriter;

//读线程
class ThreadRead extends Thread {
    private ReadData read;
    private PipedInputStream input;
    public ThreadRead(ReadData read, PipedInputStream input) {
        super();
        this.read = read;
        this.input = input;
    }
    public void readMethod(PipedInputStream input) {
        try {
            System.out.println("read  :");
            byte[] byteArray = new byte[20];
            int readLength = input.read(byteArray);
            while (readLength != -1) {
                String newData = new String(byteArray, 0, readLength);
                System.out.print(newData);
                readLength = input.read(byteArray);
            }
            System.out.println();
            input.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void run() {
        this.readMethod(input);
    }
}

//写线程
class ThreadWrite extends Thread {
    private WriteData write;
    private PipedOutputStream out;
    public ThreadWrite(WriteData write, PipedOutputStream out) {
        super();
        this.write = write;
        this.out = out;
    }
    public void writeMethod(PipedOutputStream out) {
        try {
            System.out.println("write :");
            for (int i = 0; i < 30; i++) {
                String outData = "" + (i + 1);
                out.write(outData.getBytes());
                System.out.print(outData);
            }
            System.out.println();
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void run() {
        this.writeMethod(out);
    }
}

//测试
public class Run {
    public static void main(String[] args) {
        try {
            WriteData writeData = new WriteData();
            ReadData readData = new ReadData();
            PipedInputStream inputStream = new PipedInputStream();
            PipedOutputStream outputStream = new PipedOutputStream();

            // inputStream.connect(outputStream);   // 效果相同
            outputStream.connect(inputStream);

            ThreadRead threadRead = new ThreadRead(readData, inputStream);
            threadRead.start();
            Thread.sleep(2000);
            ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream);
            threadWrite.start();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


http://www.kler.cn/a/378010.html

相关文章:

  • TecoPytorch
  • SpringCloud Alibaba-05 Seata分布式事务处理
  • Linux:网络协议socket
  • git rebase 使用 - 【nolankywu】
  • 集合进阶------泛型(JAVA笔记第三十期)
  • Python小游戏19——滑雪小游戏
  • 项目复盘:TapTap聚光灯Gamejam
  • 【1】Excel快速入门的核心概念
  • 视频点播系统扩展示例
  • <项目代码>YOLOv8 夜间车辆识别<目标检测>
  • CSPJ2019-1数字游戏 (Number Games)
  • tcp shutdown, fin_wait1, fin_wait2, close_wait, last_ack, 谢特!
  • Spring源码学习(三):finishBeanFactoryInitialization
  • 线程安全的单例模式(Singleton)。
  • 轮廓图【HTML+CSS+JavaScript】
  • Java日志脱敏(二)——fastjson Filter + 注解 + 工具类实现
  • 统信UOS开发环境支持php
  • 使用Vite构建现代化前端应用
  • 使用AIM对SAP PO核心指标的自动化巡检监控
  • SQL,力扣题目1285,找到连续区间的开始和结束数字【窗口函数】
  • VsCode前端常用快捷键
  • 冷钱包与热钱包的差异 | 加密货币存储的安全方案
  • 03.DDD六边形架构
  • apache pdfbox 设置PDF表单域,Java生成PDF模板简单案例。
  • 【ARCGIS实验】地形特征线的提取
  • Spring Boot框架下校园社团信息管理的创新实践