当前位置: 首页 > article >正文

Rust异步编程tokio--问题记录

最近在玩某个Rust开发的项目时,需要异步编程。对Rust很陌生,过程中遇到了很多问题。在此记录一下。


tokio::spawn 和 tokio 运行时

在 Rust 的异步编程中,tokio::spawntokio 运行时 (Runtime) 关系密切,但它们的作用有所不同:

1. tokio::spawn

tokio::spawn 是一个用于在 tokio 运行时中并发执行异步任务的函数。它的作用类似于标准库中的 std::thread::spawn,但它是基于 tokio 运行时的,并且用于异步任务。

  • tokio::spawn 只能在 tokio 运行时的上下文中调用,否则会 panic。

  • 这个函数会将传入的异步任务提交到 tokio 运行时进行调度。

  • 它返回一个 JoinHandle<T>,用于获取异步任务的返回值。

示例:

use tokio::task;

#[tokio::main]
async fn main() {
    let handle = task::spawn(async {
        println!("Hello from a spawned task!");
        42
    });

    let result = handle.await.unwrap();
    println!("Task returned: {}", result);
}

2. tokio 运行时 (Runtime)

tokio 运行时(Tokio Runtime)是 Tokio 提供的异步执行环境,所有的 Tokio 任务(如 tokio::spawn)都必须运行在 Tokio 运行时内

  • tokio 运行时提供 任务调度器,用于管理多个异步任务的执行。

  • 运行时可以是多线程(multi-thread,默认)或单线程(current_thread)。

  • 运行时的创建方式:

    1. 使用 #[tokio::main] 宏自动创建运行时。

    2. 使用 tokio::runtime::Runtime::new() 手动创建运行时。

示例(手动创建运行时):

use tokio::runtime::Runtime;

fn main() {
    let rt = Runtime::new().unwrap();
    
    rt.block_on(async {
        tokio::spawn(async {
            println!("Task running inside manually created runtime");
        }).await.unwrap();
    });
}

自动创建运行时的例子见上文中的tokio::spawn的示例


3. 关系

  • tokio::spawn 依赖 tokio 运行时,它不能独立运行,必须在 Tokio 运行时环境中执行。

  • tokio::maintokio::runtime::Runtime::new() 负责 创建运行时,确保 tokio::spawn 有运行的上下文。

  • 运行时可以同时运行多个 tokio::spawn 任务,并在后台管理它们的执行。


4. 关键点总结

Featuretokio::spawntokio::runtime::Runtime
作用启动异步任务提供任务执行环境
依赖必须在 Tokio 运行时内需要手动或自动创建
运行方式在现有运行时中运行需要 #[tokio::main]Runtime::new()
是否创建运行时

如果你要运行 tokio::spawn,确保 已经启动了 Tokio 运行时,否则程序会 panic。例如,下面的代码会 panic:

use tokio::task;

fn main() {
    let handle = task::spawn(async {
        println!("This will panic!");
    });

    // 这里没有 tokio 运行时,任务无法执行
}

要修复它,需要在 Tokio 运行时内运行:

use tokio::task;
use tokio::runtime::Runtime;

fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        task::spawn(async {
            println!("This works!");
        }).await.unwrap();
    });
}

5. 何时使用 tokio::spawnRuntime

  • 使用 tokio::spawn

    • 你已经在 Tokio 运行时中,并希望并发执行任务。

    • 你不关心任务的返回值,或者愿意使用 JoinHandle<T> 获取结果。

  • 使用 tokio::runtime::Runtime

    • 你在一个非异步的 main() 函数或其他同步环境中,需要手动创建异步运行时。

    • 你希望在同步代码中运行 async 代码。

  • 不能同时使用 #[tokio::main] 和 Runtime::new(),违反了Tokio的“单线程不能嵌套运行时”规则

    可能出现的错误:
Cannot start a runtime from within a runtime. This happens because a function (like block_on) attempted to block the current thread while the thread is being used to drive asynchronous tasks.


如何解决数据竞争?

tokio::spawn 运行的异步任务中,如果多个任务需要访问和修改同一份数据,可能会出现 数据竞争(data race) 问题。由于 Rust 具有严格的所有权和借用检查机制,它会在编译时阻止数据竞争,但可能仍然需要使用 并发安全的机制 来正确地共享数据。

1. 使用 Arc<Mutex<T>>Arc<RwLock<T>> 进行数据共享

当多个任务需要 可变访问 共享数据时,使用 std::sync::Arctokio::sync::Mutex(或 tokio::sync::RwLock)是常见的解决方案:

  • Arc<T>(原子引用计数):允许多个任务共享所有权。

  • tokio::sync::Mutex<T>:提供异步互斥锁,防止多个任务同时修改数据。

  • tokio::sync::RwLock<T>:提供读写锁,支持多个读者或一个写者。

示例:使用 Arc<Mutex<T>>

use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let counter = Arc::new(Mutex::new(0)); // 共享计数器

    let mut handles = vec![];
    for _ in 0..10 {
        let counter_clone = Arc::clone(&counter);
        let handle = tokio::spawn(async move {
            let mut num = counter_clone.lock().await;
            *num += 1; // 线程安全的修改
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.await.unwrap();
    }

    println!("Final count: {}", *counter.lock().await);
}

注意

  • 这里 Arc 允许多个任务共享 Mutex<T>

  • lock().await 是异步的,不会阻塞整个线程。


2. 使用 tokio::sync::RwLock<T> 允许并发读取

如果数据需要 多任务并发读取,但偶尔修改,可以使用 tokio::sync::RwLock<T>,它提供:

  • 多个任务可以同时读取

  • 写入时互斥

示例:

use std::sync::Arc;
use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
    let data = Arc::new(RwLock::new(5));

    let read_data = Arc::clone(&data);
    let read_task = tokio::spawn(async move {
        let num = read_data.read().await;
        println!("Read value: {}", *num);
    });

    let write_data = Arc::clone(&data);
    let write_task = tokio::spawn(async move {
        let mut num = write_data.write().await;
        *num += 10;
        println!("Updated value: {}", *num);
    });

    read_task.await.unwrap();
    write_task.await.unwrap();
}

适用于 多读少写 的场景,如缓存或配置共享。


3. 让数据的所有权在线程内传递(避免共享)

如果设计成 每个任务都拥有数据的独立副本,可以降低数据竞争的概率。例如:

  • 使用 tokio::sync::mpsc(多生产者单消费者) 让任务之间通过 消息传递 共享数据,而不是直接修改共享数据。

示例:使用 tokio::sync::mpsc 进行任务间通信

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(10);

    // 生产者任务
    tokio::spawn(async move {
        for i in 0..5 {
            if tx.send(i).await.is_err() {
                println!("Receiver dropped, stopping");
                return;
            }
        }
    });

    // 消费者任务
    while let Some(val) = rx.recv().await {
        println!("Received: {}", val);
    }
}

适用于 任务间数据传递,而不是直接共享变量。


根据应用场景选择合适的锁解决数据竞争问题

方案适用场景额外开销适用于 Tokio 任务
Arc<Mutex<T>>多个任务读写同一数据高(锁的等待)✅ 适用于 Tokio(异步锁)
Arc<RwLock<T>>多任务并发读,少量写中等(写时锁定)✅ 适用于 Tokio(异步锁)
tokio::sync::mpsc任务间数据传递✅ 适用于 Tokio(异步通道)
线程内数据独占避免共享,所有权传递✅ 适用于 async move

如果在 tokio::spawn 任务之间遇到 数据竞争,建议:

  1. 如果任务需要并发写入数据,使用 Arc<Mutex<T>>

  2. 如果任务主要是读取数据,偶尔修改 ,使用 Arc<RwLock<T>>

  3. 如果数据只需要在任务间传递,不共享修改,使用 tokio::sync::mpsc

  4. 如果可以避免共享数据,让数据所有权随任务 async move 传递


互斥锁

锁类型

当程序中使用到了互斥锁,在编译程序时可能会遇到如下问题:

| |__________________^ future created by async block is not Send

| = help: within {async block@src/main.rs:126:30: 126:40}, the trait std::marker::Send is not implemented for std::sync::MutexGuard<'_, HashMap<pubkey::Pubkey, TokenTrackingInfo>> note: future is not Send as this value is used across an await

解决方案:使用 tokio::sync::Mutex 代替 std::sync::Mutex

  • Tokio 提供了 异步互斥锁 tokio::sync::Mutex,它适用于异步任务
  • std::sync::Mutex 只适用于同步代码

锁获取

  • std::sync::Mutex 使用 .lock().unwrap()
  • tokio::sync::Mutex 需要 .lock().await

std::sync::Mutex<T>tokio::sync::Mutex<T>
锁的方式同步异步
lock() 返回值MutexGuard<T>(立即返回)Future<MutexGuard<T>>(需要 .await
解锁方式离开作用域自动解锁lock().await 结束后自动解锁
适用场景同步代码,阻塞线程异步代码,不会阻塞线程

变量如何在函数之间传递?

答:使用Arc::clone() 处理变量。如:

let tokens = Arc::new(Mutex::new(HashMap::<Pubkey, TokenTrackingInfo>::new()));
let tokens_clone = Arc::clone(&tokens);  // ✅ 共享同一个 HashMap
  • Arc::clone(&tokens) 不会复制 HashMap,只是增加 Arc引用计数

  • tokens_clone tokens 共享相同的 Mutex<HashMap>,修改 tokens_clone.lock().await 也会影响 tokens

🚨 但如果你用 std::clone() 直接克隆 HashMap(错误做法):

let tokens = Mutex::new(HashMap::<Pubkey, TokenTrackingInfo>::new());
let tokens_clone = tokens.clone();  // ❌ 这里会复制 HashMap
  • tokens.clone()直接复制 HashMap,变成一个新的 HashMap 实例,而不是共享数据。

  • 修改 tokens_clone.lock().await 不会影响 tokens.lock().await,因为它们是两个不同的 HashMap

  • 这样你的多个任务就会各自修改自己的副本,导致数据不一致。

总结:

方法是否共享 HashMap影响原数据吗?
Arc::clone(&tokens)✅ 是✅ 影响原数据
tokens.clone()(如果 tokensArc<T>✅ 是✅ 影响原数据
tokens.clone()(如果 tokensMutex<HashMap>❌ 否❌ 不影响原数据(复制了 HashMap)

写在最后

本文介绍了rust异步编程时需要注意的关键事项,包括tokio运行时、tokio异步任务以及锁机制,以及异步编程过程中遇到的问题和解决方案。


http://www.kler.cn/a/600985.html

相关文章:

  • 数智读书笔记系列024《主数据驱动的数据治理 —— 原理、技术与实践》
  • 基于SpringBoot的电影售票系统
  • 【网络】HTTP 和 HTTPS
  • [网鼎杯 2020 白虎组]PicDown1 [反弹shell] [敏感文件路径] [文件描述符]
  • 2、二分和贪心
  • S32K3 RAM ECC 的问题
  • 《似锦》:曹兴昱—残暴和孝顺并不冲突家庭成长环境分析以命抵命逻辑悖论
  • 代码随想录Day23
  • Scrapy——Redis空闲超时关闭扩展
  • Spring 源码硬核解析系列专题(三十二):Spring Cloud LoadBalancer 的负载均衡源码解析
  • 数据库的操作,以及sql之DML
  • Linux输入系统应用编程
  • 字符串常量,数组和指针的不同形式
  • uv:Rust 驱动的 Python 包管理新时代
  • 飞书只有阅读权限的文档下载,飞书文档下载没有权限的文件
  • Qt 线程类
  • 详解c++20的协程,自定义可等待对象,生成器详解
  • <tauri><rust><GUI>基于rust和tauri,实现多窗口与窗口间通信
  • ISIS-2 邻居建立关系
  • Python 编程中函数嵌套的相关解析