当前位置: 首页 > article >正文

Rust从入门到精通之精通篇:23.高级并发模式

高级并发模式

在 Rust 精通篇中,我们将深入探索 Rust 的高级并发编程模式。Rust 的所有权系统和类型系统为并发编程提供了强大的安全保障,使我们能够在编译时捕获大多数并发错误。在本章中,我们将超越基本的并发原语,探索更复杂的并发模式和无锁数据结构。

并发模型回顾

在深入高级主题之前,让我们简要回顾 Rust 的并发模型:

use std::thread;
use std::sync::{Arc, Mutex};
use std::time::Duration;

fn main() {
    // 共享状态并发
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
            thread::sleep(Duration::from_millis(10));
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    println!("最终计数: {}", *counter.lock().unwrap());
}

Rust 的并发模型基于以下核心概念:

  1. 线程安全:通过所有权和类型系统在编译时防止数据竞争
  2. 消息传递:使用通道(channels)在线程间传递消息
  3. 共享状态:使用 MutexRwLock 等同步原语安全地共享数据
  4. Send 和 Sync trait:控制类型在线程间的安全传递和共享

高级同步原语

读写锁(RwLock)

RwLock 允许多个读取器或单个写入器同时访问数据,适用于读多写少的场景:

use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
    let data = Arc::new(RwLock::new(vec![1, 2, 3]));
    let mut handles = vec![];
    
    // 创建多个读取线程
    for i in 0..3 {
        let data = Arc::clone(&data);
        handles.push(thread::spawn(move || {
            let data = data.read().unwrap();
            println!("读取线程 {}: {:?}", i, *data);
        }));
    }
    
    // 创建一个写入线程
    let data = Arc::clone(&data);
    handles.push(thread::spawn(move || {
        let mut data = data.write().unwrap();
        data.push(4);
        println!("写入线程: {:?}", *data);
    }));
    
    for handle in handles {
        handle.join().unwrap();
    }
}

条件变量(Condvar)

条件变量允许线程等待特定条件发生:

use std::sync::{Arc, Mutex, Condvar};
use std::thread;

fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair_clone = Arc::clone(&pair);
    
    // 消费者线程
    let consumer = thread::spawn(move || {
        let (lock, cvar) = &*pair_clone;
        let mut started = lock.lock().unwrap();
        
        // 等待直到条件变为 true
        while !*started {
            started = cvar.wait(started).unwrap();
        }
        
        println!("消费者: 条件已满足,继续执行");
    });
    
    // 给生产者一些时间启动
    thread::sleep(std::time::Duration::from_secs(1));
    
    // 生产者线程
    let (lock, cvar) = &*pair;
    let mut started = lock.lock().unwrap();
    *started = true;
    cvar.notify_one();
    println!("生产者: 条件已设置,通知消费者");
    
    consumer.join().unwrap();
}

屏障(Barrier)

屏障用于同步多个线程,确保它们同时到达某个点:

use std::sync::{Arc, Barrier};
use std::thread;

fn main() {
    let barrier = Arc::new(Barrier::new(3));
    let mut handles = vec![];
    
    for i in 0..3 {
        let barrier = Arc::clone(&barrier);
        handles.push(thread::spawn(move || {
            println!("线程 {} 正在准备...", i);
            thread::sleep(std::time::Duration::from_secs(i));
            
            // 等待所有线程到达屏障
            let wait_result = barrier.wait();
            
            // 只有一个线程会收到 BarrierWaitResult::Leader
            if wait_result.is_leader() {
                println!("所有线程已就绪,继续执行!");
            }
            
            println!("线程 {} 继续执行", i);
        }));
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

无锁数据结构

无锁(lock-free)数据结构通过原子操作而非互斥锁实现线程安全,通常具有更好的性能和可伸缩性。

原子类型

Rust 标准库提供了多种原子类型,如 AtomicBoolAtomicUsize 等:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::sync::Arc;

fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];
    
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            for _ in 0..1000 {
                // 无锁递增
                counter.fetch_add(1, Ordering::SeqCst);
            }
        }));
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    println!("最终计数: {}", counter.load(Ordering::SeqCst));
}

内存排序(Memory Ordering)

原子操作的内存排序模型控制操作的可见性和顺序:

  • Ordering::Relaxed:最弱的排序,只保证原子性
  • Ordering::Release:写操作使用,确保之前的操作不会被重排到此操作之后
  • Ordering::Acquire:读操作使用,确保之后的操作不会被重排到此操作之前
  • Ordering::AcqRel:结合 Acquire 和 Release 语义
  • Ordering::SeqCst:最强的排序,提供全局一致的顺序
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

fn main() {
    static READY: AtomicBool = AtomicBool::new(false);
    static DATA: AtomicBool = AtomicBool::new(false);
    
    thread::spawn(|| {
        // 准备数据
        DATA.store(true, Ordering::Release);
        // 发出信号表明数据已准备好
        READY.store(true, Ordering::Release);
    });
    
    // 等待数据准备好
    while !READY.load(Ordering::Acquire) {
        thread::yield_now();
    }
    
    // 数据现在可以安全访问
    assert!(DATA.load(Ordering::Acquire));
    println!("数据已成功同步");
}

实现简单的无锁队列

下面是一个基于原子操作的简单无锁队列实现:

use std::sync::atomic::{AtomicPtr, Ordering};
use std::ptr::null_mut;

pub struct Node<T> {
    data: T,
    next: AtomicPtr<Node<T>>,
}

pub struct LockFreeQueue<T> {
    head: AtomicPtr<Node<T>>,
    tail: AtomicPtr<Node<T>>,
}

impl<T> LockFreeQueue<T> {
    pub fn new() -> Self {
        let sentinel = Box::into_raw(Box::new(Node {
            data: unsafe { std::mem::zeroed() },
            next: AtomicPtr::new(null_mut()),
        }));
        
        LockFreeQueue {
            head: AtomicPtr::new(sentinel),
            tail: AtomicPtr::new(sentinel),
        }
    }
    
    pub fn enqueue(&self, data: T) {
        let new_node = Box::into_raw(Box::new(Node {
            data,
            next: AtomicPtr::new(null_mut()),
        }));
        
        loop {
            let tail = self.tail.load(Ordering::Acquire);
            let next = unsafe { (*tail).next.load(Ordering::Acquire) };
            
            if tail == self.tail.load(Ordering::Acquire) {
                if next.is_null() {
                    // 尝试将新节点添加到尾部
                    if unsafe { (*tail).next.compare_exchange(
                        null_mut(),
                        new_node,
                        Ordering::Release,
                        Ordering::Relaxed,
                    ) }.is_ok() {
                        // 更新尾指针
                        let _ = self.tail.compare_exchange(
                            tail,
                            new_node,
                            Ordering::Release,
                            Ordering::Relaxed,
                        );
                        return;
                    }
                } else {
                    // 尾指针落后,帮助更新
                    let _ = self.tail.compare_exchange(
                        tail,
                        next,
                        Ordering::Release,
                        Ordering::Relaxed,
                    );
                }
            }
        }
    }
    
    pub fn dequeue(&self) -> Option<T> {
        loop {
            let head = self.head.load(Ordering::Acquire);
            let tail = self.tail.load(Ordering::Acquire);
            let next = unsafe { (*head).next.load(Ordering::Acquire) };
            
            if head == self.head.load(Ordering::Acquire) {
                if head == tail {
                    if next.is_null() {
                        // 队列为空
                        return None;
                    }
                    // 尾指针落后,帮助更新
                    let _ = self.tail.compare_exchange(
                        tail,
                        next,
                        Ordering::Release,
                        Ordering::Relaxed,
                    );
                } else {
                    // 读取数据
                    let data = unsafe { std::ptr::read(&(*next).data) };
                    
                    // 尝试更新头指针
                    if self.head.compare_exchange(
                        head,
                        next,
                        Ordering::Release,
                        Ordering::Relaxed,
                    ).is_ok() {
                        // 释放旧的哨兵节点
                        unsafe { Box::from_raw(head) };
                        return Some(data);
                    }
                }
            }
        }
    }
}

impl<T> Drop for LockFreeQueue<T> {
    fn drop(&mut self) {
        while self.dequeue().is_some() {}
        
        // 释放哨兵节点
        let sentinel = self.head.load(Ordering::Relaxed);
        if !sentinel.is_null() {
            unsafe { Box::from_raw(sentinel) };
        }
    }
}

Actor 模型

Actor 模型是一种并发编程范式,其中每个 actor 是一个独立的计算单元,通过消息传递进行通信。

使用 Actix 实现 Actor 系统

Actix 是 Rust 中流行的 Actor 框架:

use actix::prelude::*;

// 定义消息
#[derive(Message)]
#[rtype(result = "String")]
struct Ping(String);

// 定义 Actor
struct MyActor;

impl Actor for MyActor {
    type Context = Context<Self>;
    
    fn started(&mut self, ctx: &mut Self::Context) {
        println!("Actor 已启动");
    }
}

// 实现消息处理
impl Handler<Ping> for MyActor {
    type Result = String;
    
    fn handle(&mut self, msg: Ping, _ctx: &mut Context<Self>) -> Self::Result {
        println!("收到消息: {}", msg.0);
        format!("Pong: {}", msg.0)
    }
}

#[actix_rt::main]
async fn main() {
    // 创建 actor
    let addr = MyActor.start();
    
    // 发送消息并等待响应
    let res = addr.send(Ping("Hello Actor".to_string())).await;
    
    match res {
        Ok(result) => println!("收到响应: {}", result),
        Err(err) => println!("错误: {}", err),
    }
    
    System::current().stop();
}

工作窃取(Work Stealing)

工作窃取是一种任务调度算法,允许空闲线程从繁忙线程的队列中


http://www.kler.cn/a/612259.html

相关文章:

  • FPGA中串行执行方式之使用时钟分频或延迟的方式
  • 光流 | 基于KLT算法的人脸检测与跟踪原理及公式,算法改进,matlab代码
  • Git入门——常用指令汇总
  • STM32 ADC 温度采集 可穿戴体温测量仪LMT70
  • Qt弹出新窗口并关闭(两个按钮)
  • 资本运营:基于Python实现的资本运作模拟
  • Java中用Stream流取出分组后每组最大值对象的ID
  • AI编辑器-Trae 玩转AI 编程
  • 在rockylinux9.4安装mongodb报错:缺少:libcrypto.so.10文件库
  • 【docker】Dockerfile中ENTRYPOINT和CMD区别理解
  • 如何使用DeepSeek编写测试用例?
  • 2025年前端八股文整理持续更新中(css+js+vue)
  • 23种设计模式-创建型模式-建造者
  • Linux 指令篇:tar 命令详解与实战
  • ADB->查看具体应用包名、安装路径、所有应用包名输出到文件
  • 蓝桥杯--bfs专题第二个题目(leetcode103二叉树)
  • [操作系统] 进程间通信:命名管道原理与操作
  • 使用ProcessBuilder执行FFmpeg命令,进程一直处于阻塞状态,一直没有返回执行结果
  • PHP MySQL 预处理语句
  • 基于yolov11的铁路轨道铁轨缺陷检测系统python源码+pytorch模型+评估指标曲线+精美GUI界面