当前位置: 首页 > article >正文

【第十一课】Rust并发编程(二)

目录

前言

Channel

多生产者


前言

在上一节中,我们介绍了Rust中并发编程的方式之一:Fork和Join,通过新建线程提升代码的效率,这节课我们介绍并发编程的第二种方式:通道。Channel就类似于水管,通过Channel可以连接多个线程,达到多个线程之间协调作业。

Channel

我们以一个简单的需求为例来解释下Channel的使用方法。完成WordCount,使用2个线程,线程1从文件中读取数据,将数据通过Channel发送给线程2,线程2负责计算wordcount。

注意:

(1)下面代码中文件路径使用的是绝对路径

(2)使用mpsc::channel()可以创建一个通道,下面代码还指定了泛型,即在下面的代码中通道中只可以发送String类型的数据

(3)通道的返回值是一个发送者,一个消费者,发送者给线程1中的闭包,因为线程1负责读数据发送到通道中,消费者给线程2中的闭包,线程2负责读取数据处理wordcount。

(4)mpsc:指的是多生产者,单消费者的意思,即multi-producer,single-consumer。

use std::collections::HashMap;
use std::sync::mpsc;
use std::thread::JoinHandle;
use std::{fs, io, thread};

fn main() {

    let paths = vec!["/Users/xxxxx/rustproject/lesson11/src/test.txt".to_string()];

    let (receiver, handler1) = start_file_reader_thread(paths);

    let handler2 = start_file_word_count_thread(receiver);

    let _ = handler1.join().unwrap();
    let _ = handler2.join().unwrap();
}

// 读取文件内容
fn start_file_reader_thread(
    documents: Vec<String>,
) -> (mpsc::Receiver<String>, JoinHandle<Result<(), io::Error>>) {
    let (sender, receiver) = mpsc::channel::<String>();

    let handle = thread::spawn(move || -> Result<(), io::Error> {
        for filename in documents {
            let text = fs::read_to_string(filename)?;
            if sender.send(text).is_err() {
                break;
            }
        }
        Ok(())
    });

    (receiver, handle)
}

// word count
fn start_file_word_count_thread(texts: mpsc::Receiver<String>) -> JoinHandle<()> {
    let handle = thread::spawn(move || {
        // 处理
        let mut wc: HashMap<String, u32> = HashMap::new();
        for line in texts {
            let words: Vec<String> = line.split(" ").map(|x| x.to_string()).collect();
            for word in words {
                match wc.get(&word) {
                    None => {
                        wc.insert(word, 1);
                    }
                    Some(old) => {
                        wc.insert(word, old + 1);
                    }
                }
            }
        }
        // 打印
        for (word, cnt) in wc {
            println!("key = {}, count = {}", word, cnt);
        }
    });

    handle
}

多生产者

上面是单生产者的例子,我们扩展一下,使用多生产者。

注意:
(1)使用生产者的clone方法,扩展出多个生产者传递给不同的线程发送消息。

use std::collections::HashMap;
use std::sync::mpsc;
use std::{fs, io, thread};

fn main() {
    let path1 = vec!["/Users/xxx/rustproject/lesson11/src/test1.txt".to_string()];
    let path2 = vec!["/Users/xxx/rustproject/lesson11/src/test2.txt".to_string()];

    let (producer, consumer) = mpsc::channel::<String>();

    let producer1 = producer.clone();

    // 生产者
    let handler1 = thread::spawn(move || -> Result<(), io::Error> {
        for filename in path1 {
            let text = fs::read_to_string(filename)?;
            if producer.send(text).is_err() {
                break;
            }
        }
        Ok(())
    });

    let handler2 = thread::spawn(move || -> Result<(), io::Error>{
        for filename in path2 {
            let text = fs::read_to_string(filename)?;
            if producer1.send(text).is_err() {
                break;
            }
        }
        Ok(())
    });

    // 消费者
    let handler3 = thread::spawn(move || {
        // 处理
        let mut wc: HashMap<String, u32> = HashMap::new();
        for line in consumer {
            let words: Vec<String> = line.split(" ").map(|x| x.to_string()).collect();
            for word in words {
                match wc.get(&word) {
                    None => {
                        wc.insert(word, 1);
                    }
                    Some(old) => {
                        wc.insert(word, old + 1);
                    }
                }
            }
        }
        // 打印
        for (word, cnt) in wc {
            println!("key = {}, count = {}", word, cnt);
        }
    });


    //
    handler1.join();
    handler2.join();
    handler3.join();
}


http://www.kler.cn/a/414940.html

相关文章:

  • Mac 系统上控制台常用性能查看命令
  • MTK 展锐 高通 sensorhub架构
  • 利用zabbix自定义脚本监控MySQL基础状态
  • Linux三剑客-awk
  • 【Qt】QDateTimeEdit控件实现清空(不保留默认时间/最小时间)
  • 【AI绘画】Midjourney进阶:色调详解(下)
  • Linux(ubuntu)系统的一些基本操作和命令(持续更新)
  • 平安科技大数据面试题及参考答案
  • React前端面试题详解(一)
  • 泷羽sec---shell作业
  • JVM系列之OOM实战
  • 【论文阅读】Federated learning backdoor attack detection with persistence diagram
  • idea新建springboot web项目
  • YOLOv8-ultralytics-8.2.103部分代码阅读笔记-autobatch.py
  • 【UE5 C++课程系列笔记】05——组件和碰撞
  • Ubuntu nvidia-cuda-toolkit 升级
  • Chrome://常用的内部页面地址
  • java回文数
  • MySQL 启动失败问题分析与解决方案:`mysqld.service failed to run ‘start-pre‘ task`
  • 在 Ubuntu 18.04 上安装 MySQL 5.7和MySQL 8
  • 【网络安全 | 漏洞挖掘】绕过SAML认证获得管理员面板访问权限
  • Python知识分享第九天补充
  • rocylinux9.4安装prometheus监控
  • js:循环、数组
  • 网络技术-VRRP(虚拟路由冗余协议)部署介绍
  • Element UI Collapse 折叠面板和表格结合高度闪动问题