当前位置: 首页 > article >正文

Rust实现基于Tokio的限制内存占用的channel

Rust实现基于Tokio的限制内存占用的channel

简介

本文介绍如何基于tokio的channel实现一个限制内存占用的channel。

Tokio提供了多种协程间同步的接口,用于在不同的协程中同步数据。
常用的channel有两种:boundedunbounded,其中ubbounded的channel可以无限的发送数据,而bounded的channel则有限的发送数据。两种channel都没有对自身的内存占用做出限制。

异步网络编程中常用一个channel连接两个task,其中业务task与业务交互:将要发送的数据发送到channel,而网络task与操作系统交互:从channel中接收数据并写入socket。单有时候带宽有限或者对端接收速率过慢时,而网络task从channel中接收的速度小于业务task向channel中发送的速度时,会造成大量的数据阻塞在channel中,如果不对channel的占用内存做限制,则会造成内存占用过多甚至进程被OOM

实现

  1. 获取数据大小

    要想限制channel总的内存占用,必须要直到每个数据的大小。比较常见的作法是所有需要发送到channel的内容都必须实现一个Trait,此Trait中定义了一个get_size方法,用于获取数据的大小。

    pub trait GetSize {
     /// get total size
     fn get_size(&self) -> usize;
    }
    

    要发送的内容必须实现GetSize的Trait,并实现get_size方法。注意:get_size方法获取到的大小需包括栈空间和堆空间,例如:

     struct MyData {
         data: Vec<u8>,
     }
    
     impl GetSize for MyData {
         fn get_size(&self) -> usize {
             return std::mem::size_of::<MyData>() + self.data.len();//stack size + heap size
         }
     }
    
  2. 创建SizedSenderSizedReceiver

    SizedSenderSizedReceiver都可以基于tokio的UnboundedSenderUnboundedReceiver实现。在tokio的基础上,需要共享一个条件变量用于在sender和receiver之间同步当前是否还有可用空间。

       
    pub struct SizedSender<T: GetSize> {
        inner: mpsc::UnboundedSender<T>,
        size_semaphore: Arc<(Semaphore, usize)>,
    }   
    
    
    pub struct SizedReceiver<T: GetSize> {
        inner: mpsc::UnboundedReceiver<T>,
        size_semaphore: Arc<(Semaphore, usize)>,
    }
    
    
    /// Limit space usage but not limit the number of messages, bytes_size must bigger than 0.
    pub fn sized_channel<T: GetSize>(bytes_size: usize) -> (SizedSender<T>, SizedReceiver<T>) {
        let (tx, rx) = mpsc::unbounded_channel::<T>();
        let semaphore = Arc::new((Semaphore::new(bytes_size), bytes_size));
        (
            SizedSender::new(tx, semaphore.clone()),
            SizedReceiver::new(rx, semaphore),
        )
    }          
    
    
  3. SizedSender实现

    发送端发送时需要调用get_size方法获取数据的大小,然后调用Semaphore::available_permits方法获取可用空间,如果可用空间大于数据大小,则发送成功,否则发送失败。

    impl<T: GetSize> SizedSender<T> {
     pub fn new(inner: mpsc::UnboundedSender<T>, size_semaphore: Arc<(Semaphore, usize)>) -> Self {
         Self {
             inner,
             size_semaphore,
         }
     }
    
     fn do_send(
         &self,
         message: T,
         permits: Option<SemaphorePermit<'_>>,
     ) -> Result<(), SendError<T>> {
         match self.inner.send(message) {
             Ok(r) => {
                 if let Some(permits) = permits {
                     permits.forget();
                 }
    
                 Ok(r)
             }
             Err(e) => {
                 log::debug!("send value error!");
                 Err(e)
             }
         }
     }
     pub async fn send(&self, message: T) -> Result<(), SendError<T>> {
         let message_size = message.get_size();
    
         if message_size > self.size_semaphore.1 {
             return Err(SendError(message));
         }
         let size = match u32::try_from(message_size) {
             Ok(size) => size,
             Err(_) => {
                 return Err(SendError(message));
             }
         };
    
         if self.size_semaphore.0.available_permits() < size as usize {
             // The buffer is about to be depleted, sending may be blocked.
         }
    
         let permits = match self.size_semaphore.0.acquire_many(size).await {
             Ok(perimits) => Some(perimits),
             Err(_) => {
                 return Err(SendError(message));
             }
         };
    
         self.do_send(message, permits)
     }
     }
    
  4. SizedReceiver的实现

    接收端接收时需要调用get_size方法获取数据的大小,然后将相应大小的permits还给信号量即可。

    impl<T: GetSize> SizedReceiver<T> {
    pub fn new(inner: mpsc::UnboundedReceiver<T>, size_semaphore: Arc<(Semaphore, usize)>) -> Self {
        Self {
            inner,
            size_semaphore,
        }
    }
    
    pub async fn recv(&mut self) -> Option<T> {
        self.inner.recv().await.map(|r| {
            let message_size = r.get_size();
    
            self.size_semaphore.0.add_permits(message_size);
    
            r
        })
    }
    }
    
    
  5. 其他

    在上述实现的基础上,还可以实现更多方法,比如try_sendtry_recv等。


http://www.kler.cn/news/107619.html

相关文章:

  • 【C++】类与对象 第二篇(构造函数,析构函数,拷贝构造,赋值重载)
  • 前端小技巧: 实现 LRU 缓存算法功能
  • Kafka-Java四:Spring配置Kafka消费者提交Offset的策略
  • vue如何使用路由拦截器
  • 数据结构 C语言 2.1 线性表抽象数据类型 2.2 小议顺序表
  • Tp框架如何使用事务和锁,还有查询缓存
  • Linux UWB Stack实现——FiRa会话状态机
  • jmeter疑难杂症
  • 数据库数据恢复—Oracle数据库报错ORA-01110错误的数据恢复案例
  • Hive 常用DML操作
  • 前端移动web高级详细解析二
  • 安装虚拟机(VMware)保姆级教程及配置虚拟网络编辑器和安装WindowsServer以及宿主机访问虚拟机和配置服务器环境
  • 实体店做商城小程序如何
  • 模数转换器-ADC基础
  • 深入探究深度学习、神经网络与卷积神经网络以及它们在多个领域中的应用
  • Android-宝宝相册(第四次作业)
  • 【计算机网络】(谢希仁第八版)第一章课后习题答案
  • 软考 系统架构设计师系列知识点之设计模式(9)
  • ES6之Set集合(通俗易懂,含实践)
  • 外卖霸王餐系统 支持小程序,分站合作
  • 关于pycharm中句号变成点的问题
  • Redis 与 MySQL 一致性 实现方案
  • RSA:基于小加密指数的攻击方式与思维技巧
  • SpringCore完整学习教程5,入门级别
  • 单元测试Testng
  • vue中如何获取当时时间时分秒
  • “第五十三天”
  • 数据库批处理
  • C++学习笔记之四(标准库、标准模板库、vector类)
  • slice()和splice()用法