Rust异步编程tokio--问题记录
最近在玩某个Rust开发的项目时,需要异步编程。对Rust很陌生,过程中遇到了很多问题。在此记录一下。
tokio::spawn 和 tokio 运行时
在 Rust 的异步编程中,tokio::spawn
和 tokio
运行时 (Runtime
) 关系密切,但它们的作用有所不同:
1. tokio::spawn
tokio::spawn
是一个用于在 tokio
运行时中并发执行异步任务的函数。它的作用类似于标准库中的 std::thread::spawn
,但它是基于 tokio
运行时的,并且用于异步任务。
-
tokio::spawn
只能在tokio
运行时的上下文中调用,否则会 panic。 -
这个函数会将传入的异步任务提交到
tokio
运行时进行调度。 -
它返回一个
JoinHandle<T>
,用于获取异步任务的返回值。
示例:
use tokio::task;
#[tokio::main]
async fn main() {
let handle = task::spawn(async {
println!("Hello from a spawned task!");
42
});
let result = handle.await.unwrap();
println!("Task returned: {}", result);
}
2. tokio
运行时 (Runtime
)
tokio
运行时(Tokio Runtime)是 Tokio 提供的异步执行环境,所有的 Tokio 任务(如 tokio::spawn
)都必须运行在 Tokio 运行时内。
-
tokio
运行时提供 任务调度器,用于管理多个异步任务的执行。 -
运行时可以是多线程(
multi-thread
,默认)或单线程(current_thread
)。 -
运行时的创建方式:
-
使用
#[tokio::main]
宏自动创建运行时。 -
使用
tokio::runtime::Runtime::new()
手动创建运行时。
-
示例(手动创建运行时):
use tokio::runtime::Runtime;
fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
tokio::spawn(async {
println!("Task running inside manually created runtime");
}).await.unwrap();
});
}
自动创建运行时的例子见上文中的
tokio::spawn的示例
。
3. 关系
-
tokio::spawn
依赖tokio
运行时,它不能独立运行,必须在 Tokio 运行时环境中执行。 -
tokio::main
和tokio::runtime::Runtime::new()
负责 创建运行时,确保tokio::spawn
有运行的上下文。 -
运行时可以同时运行多个
tokio::spawn
任务,并在后台管理它们的执行。
4. 关键点总结
Feature | tokio::spawn | tokio::runtime::Runtime |
---|---|---|
作用 | 启动异步任务 | 提供任务执行环境 |
依赖 | 必须在 Tokio 运行时内 | 需要手动或自动创建 |
运行方式 | 在现有运行时中运行 | 需要 #[tokio::main] 或 Runtime::new() |
是否创建运行时 | 否 | 是 |
如果你要运行 tokio::spawn
,确保 已经启动了 Tokio 运行时,否则程序会 panic。例如,下面的代码会 panic:
use tokio::task;
fn main() {
let handle = task::spawn(async {
println!("This will panic!");
});
// 这里没有 tokio 运行时,任务无法执行
}
要修复它,需要在 Tokio 运行时内运行:
use tokio::task;
use tokio::runtime::Runtime;
fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
task::spawn(async {
println!("This works!");
}).await.unwrap();
});
}
5. 何时使用 tokio::spawn
与 Runtime
-
使用
tokio::spawn
:-
你已经在 Tokio 运行时中,并希望并发执行任务。
-
你不关心任务的返回值,或者愿意使用
JoinHandle<T>
获取结果。
-
-
使用
tokio::runtime::Runtime
:-
你在一个非异步的
main()
函数或其他同步环境中,需要手动创建异步运行时。 -
你希望在同步代码中运行
async
代码。
-
-
不能同时使用
可能出现的错误:#[tokio::main]
和Runtime::new(),违反了Tokio的“单线程不能嵌套运行时”规则:
Cannot start a runtime from within a runtime. This happens because a function (like block_on) attempted to block the current thread while the thread is being used to drive asynchronous tasks.
如何解决数据竞争?
在
tokio::spawn
运行的异步任务中,如果多个任务需要访问和修改同一份数据,可能会出现 数据竞争(data race) 问题。由于 Rust 具有严格的所有权和借用检查机制,它会在编译时阻止数据竞争,但可能仍然需要使用 并发安全的机制 来正确地共享数据。
1. 使用 Arc<Mutex<T>>
或 Arc<RwLock<T>>
进行数据共享
当多个任务需要 可变访问 共享数据时,使用 std::sync::Arc
和 tokio::sync::Mutex
(或 tokio::sync::RwLock
)是常见的解决方案:
-
Arc<T>
(原子引用计数):允许多个任务共享所有权。 -
tokio::sync::Mutex<T>
:提供异步互斥锁,防止多个任务同时修改数据。 -
tokio::sync::RwLock<T>
:提供读写锁,支持多个读者或一个写者。
示例:使用 Arc<Mutex<T>>
use std::sync::Arc;
use tokio::sync::Mutex;
#[tokio::main]
async fn main() {
let counter = Arc::new(Mutex::new(0)); // 共享计数器
let mut handles = vec![];
for _ in 0..10 {
let counter_clone = Arc::clone(&counter);
let handle = tokio::spawn(async move {
let mut num = counter_clone.lock().await;
*num += 1; // 线程安全的修改
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
println!("Final count: {}", *counter.lock().await);
}
注意:
这里
Arc
允许多个任务共享Mutex<T>
。
lock().await
是异步的,不会阻塞整个线程。
2. 使用 tokio::sync::RwLock<T>
允许并发读取
如果数据需要 多任务并发读取,但偶尔修改,可以使用 tokio::sync::RwLock<T>
,它提供:
-
多个任务可以同时读取
-
写入时互斥
示例:
use std::sync::Arc;
use tokio::sync::RwLock;
#[tokio::main]
async fn main() {
let data = Arc::new(RwLock::new(5));
let read_data = Arc::clone(&data);
let read_task = tokio::spawn(async move {
let num = read_data.read().await;
println!("Read value: {}", *num);
});
let write_data = Arc::clone(&data);
let write_task = tokio::spawn(async move {
let mut num = write_data.write().await;
*num += 10;
println!("Updated value: {}", *num);
});
read_task.await.unwrap();
write_task.await.unwrap();
}
适用于 多读少写 的场景,如缓存或配置共享。
3. 让数据的所有权在线程内传递(避免共享)
如果设计成 每个任务都拥有数据的独立副本,可以降低数据竞争的概率。例如:
-
使用
tokio::sync::mpsc
(多生产者单消费者) 让任务之间通过 消息传递 共享数据,而不是直接修改共享数据。
示例:使用 tokio::sync::mpsc
进行任务间通信
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(10);
// 生产者任务
tokio::spawn(async move {
for i in 0..5 {
if tx.send(i).await.is_err() {
println!("Receiver dropped, stopping");
return;
}
}
});
// 消费者任务
while let Some(val) = rx.recv().await {
println!("Received: {}", val);
}
}
适用于 任务间数据传递,而不是直接共享变量。
根据应用场景选择合适的锁解决数据竞争问题
方案 | 适用场景 | 额外开销 | 适用于 Tokio 任务 |
---|---|---|---|
Arc<Mutex<T>> | 多个任务读写同一数据 | 高(锁的等待) | ✅ 适用于 Tokio(异步锁) |
Arc<RwLock<T>> | 多任务并发读,少量写 | 中等(写时锁定) | ✅ 适用于 Tokio(异步锁) |
tokio::sync::mpsc | 任务间数据传递 | 低 | ✅ 适用于 Tokio(异步通道) |
线程内数据独占 | 避免共享,所有权传递 | 低 | ✅ 适用于 async move |
如果在 tokio::spawn
任务之间遇到 数据竞争,建议:
-
如果任务需要并发写入数据,使用
Arc<Mutex<T>>
-
如果任务主要是读取数据,偶尔修改 ,使用
Arc<RwLock<T>>
-
如果数据只需要在任务间传递,不共享修改,使用
tokio::sync::mpsc
-
如果可以避免共享数据,让数据所有权随任务
async move
传递
互斥锁
锁类型
当程序中使用到了互斥锁,在编译程序时可能会遇到如下问题:
| |__________________^ future created by async block is not
Send
| = help: within
{async block@src/main.rs:126:30: 126:40}
, the traitstd::marker::Send
is not implemented forstd::sync::MutexGuard<'_, HashMap<pubkey::Pubkey, TokenTrackingInfo>>
note: future is notSend
as this value is used across an await
解决方案:使用 tokio::sync::Mutex
代替 std::sync::Mutex
- Tokio 提供了 异步互斥锁
tokio::sync::Mutex
,它适用于异步任务 - 而
std::sync::Mutex
只适用于同步代码。
锁获取
std::sync::Mutex
使用.lock().unwrap()
tokio::sync::Mutex
需要.lock().await
std::sync::Mutex<T> | tokio::sync::Mutex<T> | |
---|---|---|
锁的方式 | 同步 | 异步 |
lock() 返回值 | MutexGuard<T> (立即返回) | Future<MutexGuard<T>> (需要 .await ) |
解锁方式 | 离开作用域自动解锁 | lock().await 结束后自动解锁 |
适用场景 | 同步代码,阻塞线程 | 异步代码,不会阻塞线程 |
变量如何在函数之间传递?
答:使用Arc::clone()
处理变量。如:
let tokens = Arc::new(Mutex::new(HashMap::<Pubkey, TokenTrackingInfo>::new()));
let tokens_clone = Arc::clone(&tokens); // ✅ 共享同一个 HashMap
-
Arc::clone(&tokens)
不会复制HashMap
,只是增加Arc
的引用计数。 -
tokens_clone
和tokens
共享相同的Mutex<HashMap>
,修改tokens_clone.lock().await
也会影响tokens
。
🚨 但如果你用 std::clone()
直接克隆 HashMap
(错误做法):
let tokens = Mutex::new(HashMap::<Pubkey, TokenTrackingInfo>::new());
let tokens_clone = tokens.clone(); // ❌ 这里会复制 HashMap
-
tokens.clone()
会 直接复制HashMap
,变成一个新的HashMap
实例,而不是共享数据。 -
修改
tokens_clone.lock().await
不会影响tokens.lock().await
,因为它们是两个不同的HashMap
。 -
这样你的多个任务就会各自修改自己的副本,导致数据不一致。
总结:
方法 | 是否共享 HashMap | 影响原数据吗? |
---|---|---|
Arc::clone(&tokens) | ✅ 是 | ✅ 影响原数据 |
tokens.clone() (如果 tokens 是 Arc<T> ) | ✅ 是 | ✅ 影响原数据 |
tokens.clone() (如果 tokens 是 Mutex<HashMap> ) | ❌ 否 | ❌ 不影响原数据(复制了 HashMap) |
写在最后
本文介绍了rust异步编程时需要注意的关键事项,包括tokio运行时、tokio异步任务以及锁机制,以及异步编程过程中遇到的问题和解决方案。