tokio tcp通信
引入crate
tokio = { version = "1.35.1", features = ["full"] }
服务端
use std::time::Duration;
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt},
net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpListener, TcpStream,
},
sync::mpsc,
};
#[tokio::main]
async fn main() {
println!("Begin Start Server...");
let server = TcpListener::bind("127.0.0.1:10888").await.unwrap();
while let Ok((client_stream, client_addr)) = server.accept().await{
println!("accept client: {}", client_addr);
tokio::spawn(async move{
process_client(client_stream).await;
});
}
}
async fn process_client(client_stream: TcpStream){
let (client_reader, client_writer) = client_stream.into_split();
let (msg_tx, msg_rx) = mpsc::channel::<String>(100);
let mut read_task = tokio::spawn(async move {
read_from_client(client_reader, msg_tx).await;
});
let mut write_task = tokio::spawn(async move{
write_to_client(client_writer, msg_rx).await;
});
if tokio::try_join!(&mut read_task, &mut write_task).is_err() {
read_task.abort();
write_task.abort();
};
}
async fn read_from_client(reader: OwnedReadHalf, mst_tx: mpsc::Sender<String>){
let mut buf_reader = tokio::io::BufReader::new(reader);
let mut buf = String::new();
loop{
match buf_reader.read_line(&mut buf).await{
Err(_e) =>{
eprintln!("read from client error");
break;
}
Ok(0) =>{
println!("client closed");
break;
}
Ok(n) => {
buf.pop(); //去除末尾的回车符
let mut content = buf.drain(..).as_str().to_string();
println!("read {} bytes from client. content: {}", n, content);
tokio::time::sleep(Duration::from_secs(1)).await;
content.push('\n');
if mst_tx.send(content).await.is_err(){
eprintln!("receiver closed");
break;
}
}
}
}
}
async fn write_to_client(writer: OwnedWriteHalf, mut msg_rx: mpsc::Receiver<String>){
let mut buf_writer = tokio::io::BufWriter::new(writer);
while let Some(mut str) = msg_rx.recv().await{
//str.push('\n');
if let Err(e) = buf_writer.write_all(str.as_bytes()).await {
eprintln!("write to client failed: {}", e);
break;
}
buf_writer.flush().await;
print!("write to client: {}", str);
}
}
客户端
use std::sync;
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
use chrono::{Local, NaiveDateTime};
use tokio::{
io::{Interest, AsyncBufReadExt, AsyncWriteExt},
net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpStream,
},
sync::mpsc,
};
#[tokio::main]
async fn main() {
let stream = TcpStream::connect("127.0.0.1:10888").await.unwrap();
let (reader, writer) = stream.into_split();
let mut buf = String::new();//[0u8, 12];
let mut buf_reader = tokio::io::BufReader::new(reader);
let mut buf_writer = tokio::io::BufWriter::new(writer);
loop{
let now=Local::now();
let formatted=now.format("%Y-%m-%d %H:%M:%S");
let content = format!("hello world {}\n", formatted);
//buf_writer.write_all(b"hello world\n").await;
buf_writer.write_all(content.as_bytes()).await;
buf_writer.flush().await;
println!("send:{}", content);
match buf_reader.read_line(&mut buf).await {
Err(_e) =>{
eprintln!("read from server error");
break;
}
Ok(n) =>{
buf.pop();
let content = buf.as_str().to_string();
println!("received: {} {}", n, content);
}
};
}
}
客户端、服务端都使用TcpStream的into_split方法获取网络通信读和写实例,进而获取buffer读写对象,通过channel实现线程执行同步。子线程使用tokio::spawn函数启动。