当前位置: 首页 > article >正文

tokio tcp通信

引入crate

tokio = { version = "1.35.1", features = ["full"] }

服务端

use std::time::Duration;
use tokio::{
    io::{AsyncBufReadExt, AsyncWriteExt},
    net::{
        tcp::{OwnedReadHalf, OwnedWriteHalf},
        TcpListener, TcpStream,
    },
    sync::mpsc,
};

#[tokio::main]
async fn main() {
    println!("Begin Start Server...");
    let server = TcpListener::bind("127.0.0.1:10888").await.unwrap();
    while let Ok((client_stream, client_addr)) = server.accept().await{
        println!("accept client: {}", client_addr);
        tokio::spawn(async move{
           process_client(client_stream).await;
        });
    }
}

async fn process_client(client_stream: TcpStream){
    let (client_reader, client_writer) = client_stream.into_split();
    let (msg_tx, msg_rx) = mpsc::channel::<String>(100);

    let mut read_task = tokio::spawn(async move {
        read_from_client(client_reader, msg_tx).await;
    });

    let mut write_task = tokio::spawn(async move{
        write_to_client(client_writer, msg_rx).await;
    });

    if tokio::try_join!(&mut read_task, &mut write_task).is_err() {
        read_task.abort();
        write_task.abort();
    };
}

async fn read_from_client(reader: OwnedReadHalf, mst_tx: mpsc::Sender<String>){
    let mut buf_reader = tokio::io::BufReader::new(reader);
    let mut buf = String::new();
    loop{
        match buf_reader.read_line(&mut buf).await{
            Err(_e) =>{
                eprintln!("read from client error");
                break;
            }
            Ok(0) =>{
                println!("client closed");
                break;
            }
            Ok(n) => {
                buf.pop(); //去除末尾的回车符
                let mut content = buf.drain(..).as_str().to_string();
                println!("read {} bytes from client. content: {}", n, content);
                tokio::time::sleep(Duration::from_secs(1)).await;
                content.push('\n');
                if mst_tx.send(content).await.is_err(){
                    eprintln!("receiver closed");
                    break;
                }
            }
        }
    }
}

async fn write_to_client(writer: OwnedWriteHalf, mut msg_rx: mpsc::Receiver<String>){
    let mut buf_writer = tokio::io::BufWriter::new(writer);
    while let Some(mut str) = msg_rx.recv().await{
        //str.push('\n');
        if let Err(e) = buf_writer.write_all(str.as_bytes()).await {
            eprintln!("write to client failed: {}", e);
            break;
        }
        buf_writer.flush().await;
        print!("write to client: {}", str);
    }
}

客户端

use std::sync;
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
use chrono::{Local, NaiveDateTime};

use tokio::{
    io::{Interest, AsyncBufReadExt, AsyncWriteExt},
    net::{
        tcp::{OwnedReadHalf, OwnedWriteHalf},
        TcpStream,
    },
    sync::mpsc,
};


#[tokio::main]
async fn main() {
    let stream = TcpStream::connect("127.0.0.1:10888").await.unwrap();

    let (reader, writer) = stream.into_split();
    let mut buf = String::new();//[0u8, 12];
    let mut buf_reader = tokio::io::BufReader::new(reader);
    let mut buf_writer = tokio::io::BufWriter::new(writer);
     loop{
         let now=Local::now();
         let formatted=now.format("%Y-%m-%d %H:%M:%S");
         let content = format!("hello world {}\n", formatted);
         //buf_writer.write_all(b"hello world\n").await;
         buf_writer.write_all(content.as_bytes()).await;
         buf_writer.flush().await;
         println!("send:{}", content);
         match buf_reader.read_line(&mut buf).await {
             Err(_e) =>{
                 eprintln!("read from server error");
                 break;
             }
             Ok(n) =>{
                 buf.pop();
                 let content = buf.as_str().to_string();
                 println!("received: {}  {}", n, content);
             }
         };
     }

}

客户端、服务端都使用TcpStream的into_split方法获取网络通信读和写实例,进而获取buffer读写对象,通过channel实现线程执行同步。子线程使用tokio::spawn函数启动。


http://www.kler.cn/a/234415.html

相关文章:

  • GitLab创建用户,设置访问SSH Key
  • 递归构建树菜单节点
  • 从零开始:使用VSCode搭建Python数据科学开发环境
  • 『SQLite』解释执行(Explain)
  • 高等数学学习笔记 ☞ 一元函数微分的基础知识
  • 《Spring Framework实战》9:4.1.4.依赖注入
  • Peter算法小课堂—背包问题
  • Mongodb聚合:$planCacheStats
  • 基于Java (spring-boot)的电子商城管理系统
  • Elasticsearch:使用查询规则(query rules)进行搜索
  • 单片机学习笔记---串口通信(1)
  • 测试:JMeter如何获取非json格式的响应参数
  • 《剑指Offer》笔记题解思路技巧优化 Java版本——新版leetcode_Part_1
  • 如何使用 sqlalchemy declarative base 多层次继承
  • springboot/ssm档案管理系统公司设备管理系统Java系统企业配件系统
  • OpenCV基础
  • 每日OJ题_位运算③_力扣面试题 01.01. 判定字符是否唯一
  • java实战:销售订单30分钟未支付自动取消
  • JDK新特性
  • 【MySQL】MySQL表的增删改查(进阶)
  • 计算机网络基本知识(二)
  • Blazor SSR/WASM IDS/OIDC 单点登录授权实例5 - Winform 端授权
  • Cetnos7之修改open files大小引发的故障及处理方式
  • java中使用Lambda表达式实现参数化方法
  • 【操作系统】Ubuntu Swap内存扩容
  • iOS平台如何实现RTSP|RTMP播放端录像?