当前位置: 首页 > article >正文

Rust : FnOnce、线程池与多策略执行

一、问题:mpsc如何发送各类不同的函数?

3个关键词:闭包、Box与FnOnce;请细品。

use std::sync::{mpsc,Arc,Mutex};
use std::thread;
fn process<F>(old:f32,name:String,f:F) where F: FnOnce(f32,String) {
    f(old,name);
}
fn add_f32(a:f32,b:f32) ->f32{
    a+b
}
fn doit(amount: f32,code:String){
    println!("amount:{:?} code:{:?}",amount,code);
}
fn workit(amount:f32){
    println!("amount:{:?}",amount);
}
fn test(){
    process(2.0, "b".into(), doit);
    let fn1: Box<dyn FnOnce()> = Box::new(move || doit(2.0,"hello cat!".into()));
    fn1();
    let fn2: Box<dyn FnOnce(f32,String)>  = Box::new(move |amount:f32,code:String| doit(amount,code));
    fn2(2.0,"hello john!".into());
    let fn3:Box<dyn FnOnce(f32,String)> = Box::new(move |amount:f32,code:String| process(amount,code,doit));
    fn3(2.0,"hello rose!".into());
    let fn4: Box<dyn FnOnce()>  = Box::new(move ||workit(3.0));
    fn4();
    let fn5: Box<dyn FnOnce()>  = Box::new(move ||println!("hello world!"));
    fn5();
    // 带类型返还的结构
    let fn6:Box<dyn FnOnce()->f32> = Box::new(move ||add_f32(1.0,2.0));
    fn6();
}
// 定义闭包中没有参数输入的函数类型,做为发送对象
type box_fn = Box<dyn FnOnce() + Send>;
fn main(){
    //test();
    let (sender, receiver) = mpsc::channel::<box_fn>();
    let receiver = Arc::new(Mutex::new(receiver));
    // 注意:||中均没有参数,故是FnOnce(); 具体参考test()
    let vec_fn:Vec<Box<dyn FnOnce()+Send>> = vec![Box::new(move ||println!("hello world!")),
                                                Box::new(move ||workit(3.0)),
                                                Box::new(move || doit(2.0,"hello cat!".into()))];
    for f in vec_fn {
        sender.send(f).unwrap();
    }
    let rx = receiver.clone();
    let handle = thread::spawn(move ||{
       loop{
            let box_fn  = rx.lock().unwrap().recv().unwrap() ;
            println!("from son thread!");
            box_fn();
       }
    });
    println!("main thread sended all box_fn!");
    handle.join().unwrap();
   
}

二、线程池的应用:发送函数有什么用处?

如果需要让每一个函数都分配一个线程来执行这些函数(任务),或者用一个线程池来执行函数,这个时侯就可以用上场了。

在线程池中,FnOnce是一个其中的灵魂。他可以把所有的函数进行抽象统一,便一管理和执行。

说明:如果你想写一个更专业的,可以参考 github rust-threadpool库:

https://github.com/rust-threadpool/rust-threadpool/blob/master/src/lib.rs
// 这个线程池是一个比较粗的框架;可以执行不同策略类型:多周期、趋势、套利、市场中性等各类策略组的运行。
use std::sync::atomic::AtomicU32;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;
// 存在不均衡的情况 => steal work todo!
// 简易的threadpool,没有用condvar.
// send_num可以不要,这里只是记录一下发送的情况
pub struct ThreadPool {
    threads_num: usize,
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
    send_num: Arc<Mutex<usize>>,
}
// 定义执行各类函数的参数结构类型,带不带参数,带参数的类型,多少
// 其中参数只是举例。比如某类策略只需要输入vec<bar>或vec<tick>; 某类策略需要输入hashmap<string,vec<bar>>;
// 或者套利策略中:两组或多组pair: (vec<bar>,vec<bar>)......
// 一组1min vec<bar>,3min vec<bar>,5min vec<bar>......
// 不同的策略类型,都可以在这里进行抽象;
// 最后线程池可以执行各类不同的策略;
enum StrategyFn
{
    T(Box<dyn FnOnce() + Send +'static>),
    U(Box<dyn FnOnce(f32, f32) + Send +'static>,Arc<f32>,Arc<f32>),
    W(Box<dyn FnOnce(Vec<f32>, f32) + Send +'static>,Arc<Vec<f32>>,Arc<f32>),
}

// 定义发送内容
enum Message{
    Task(Task),
    Shutdown,
}
struct Task{
    job :Job,
    task_id :usize,
}

type Job = Box<dyn FnOnce() + Send +'static>;

impl ThreadPool {
    pub fn new(threads_num: usize) -> ThreadPool {
        assert!(threads_num > 0);
        let (sender, receiver) = mpsc::channel::<Message>();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(threads_num);
        let send_num = Arc::new(Mutex::new(0_usize));
        let execute_num = Arc::new(Mutex::new(0_usize));
        for id in 0..threads_num {
            let mut worker = Worker::new(id);
            worker.run(&receiver, &execute_num);
            workers.push(worker);
        }
        ThreadPool { threads_num,workers, sender,send_num}
    }

    pub fn execute(&mut self, task_id:usize,strategy_fn: StrategyFn)
    {
        let mut job:Box<dyn FnOnce()+Send +'static> = Box::new(move||{});
        match strategy_fn{
            StrategyFn::U(f, input1, input2) => {
                job = Box::new(move || f(input1.as_ref().clone(),input2.as_ref().clone()));
            },
            StrategyFn::T(f) =>{
                job = Box::new(move || f());
            }
            StrategyFn::W(f,input1,input2)=> {
                job = Box::new(move || f(input1.as_ref().clone(),input2.as_ref().clone()));
            },

        }
        let task = Task{job:job,task_id:task_id};
        let mut send_num = self.send_num.lock().unwrap();
        *send_num = *send_num +1;
        self.sender.send(Message::Task(task)).unwrap();

    }

}

pub struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,

}

impl Worker {
    pub fn new(id: usize)-> Self{
        Worker {
            id:id,
            thread: None,
        }
    }
    pub fn run(&mut self, receiver: &Arc<Mutex<mpsc::Receiver<Message>>>,execute_num: &Arc<Mutex<usize>>){
        let id = self.id;
        let receiver = receiver.clone();
        let execute_num = execute_num.clone();
        let thread = thread::spawn(move || loop {
            let msg = receiver.lock().unwrap().recv().unwrap();
            match msg{
                Message::Task(task) => {
                    println!("Worker {} got a task; executing task {}.", id,task.task_id);
                    let mut execute_num = execute_num.lock().unwrap();
                    *execute_num = *execute_num + 1;
                    println!("work {} = >execute_num: {}", id,*execute_num);
                    (task.job)();
                },
                Message::Shutdown => {
                    println!("Worker {} received shutdown message.", id);
                    break;//很关键
                }
             }
        });
        self.thread = Some(thread);

    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for _ in 0..self.threads_num{
            self.sender.send(Message::Shutdown).unwrap();
        }
        println!("drop worker :{:?}",self.workers.len());
        for (i,worker) in (&mut self.workers).into_iter().enumerate() {
            println!("------Shutting down worker {}  i:{} -----------", worker.id,i);
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}
// 这里只是通过宏生成一类:U(Box<dyn FnOnce(f32, f32) + Send +'static>,Arc<f32>,Arc<f32>)策略数据类型;
// 
macro_rules! create_strategy_U{
    ($($s:ident),*) => (
        $(
            pub fn $s(_a:f32,_b:f32){
                println!("run 【U】 strategy {:?}",stringify!($s));
                //thread::sleep(Duration::from_millis(1));
            }
        )*      
    );
}
//这里只是通过宏生成一类:T(Box<dyn FnOnce() + Send +'static>)策略数据类型;
macro_rules! create_strategy_T{
    ($($s:ident),*) => (
        $(
            pub fn $s(){
                println!("run 【T】 strategy {:?}",stringify!($s));
                //thread::sleep(Duration::from_millis(1));
            }
        )*      
    );
}
// 生成U策略组若干
create_strategy_U!(A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q);
// 生成T策略组若干
create_strategy_T!(AA,BB,CC,DD,EE,FF,GG);
pub fn main() {
    let mut pool = ThreadPool::new(2);
    let input1 = Arc::new(1.0);
    let input2 = Arc::new(2.0);
    let u_strategies = vec![A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q];
    let t_strategies = vec![AA,BB,CC,DD,EE,FF,GG];
    println!("u_strategies num : {:?}",u_strategies.len());
    for (task_id,u_strategy) in u_strategies.into_iter().enumerate(){
        let strategy_fn = StrategyFn::U(Box::new(u_strategy),input1.clone(), input2.clone());
        pool.execute(task_id as usize,strategy_fn);
    }
    println!("t_strategies num : {:?}",t_strategies.len());
    for (task_id,t_strategy) in t_strategies.into_iter().enumerate(){
        let strategy_fn = StrategyFn::T(Box::new(t_strategy));
        pool.execute(task_id as usize,strategy_fn);
    }
    println!("toal send_num :  {:?}",*pool.send_num.lock().unwrap());
}

输出:
能过U和T类型策略运行实例,说明只要事先进行枚举定义,可以看出,这个线程池可以执行不同类型的策略。
这里有一个问题:这些线程池中线程中如何抢到这个任务,并完整执行完这些任务?这里依靠的是Arc<Mutex<mpsc::Receiver>>结构来保证。
具体如下:

u_strategies num : 17
t_strategies num : 7
toal send_num :  24
drop worker :4
------Shutting down worker 0  i:0 -----------
Worker 2 got a task; executing task 3.
work 2 = >execute_num: 1
run 【U】 strategy "D"
Worker 2 got a task; executing task 4.
work 2 = >execute_num: 2
run 【U】 strategy "E"
Worker 2 got a task; executing task 5.
work 2 = >execute_num: 3
Worker 1 got a task; executing task 1.
Worker 3 got a task; executing task 2.
Worker 0 got a task; executing task 0.
run 【U】 strategy "F"
Worker 2 got a task; executing task 6.
work 1 = >execute_num: 4
run 【U】 strategy "B"
Worker 1 got a task; executing task 7.
work 3 = >execute_num: 5
run 【U】 strategy "C"
work 0 = >execute_num: 6
run 【U】 strategy "A"
Worker 0 got a task; executing task 9.
Worker 3 got a task; executing task 8.
work 2 = >execute_num: 7
run 【U】 strategy "G"
Worker 2 got a task; executing task 10.
work 1 = >execute_num: 8
run 【U】 strategy "H"
Worker 1 got a task; executing task 11.
work 0 = >execute_num: 9
run 【U】 strategy "J"
Worker 0 got a task; executing task 12.
work 3 = >execute_num: 10
run 【U】 strategy "I"
Worker 3 got a task; executing task 13.
work 2 = >execute_num: 11
run 【U】 strategy "K"
Worker 2 got a task; executing task 14.
work 1 = >execute_num: 12
run 【U】 strategy "L"
Worker 1 got a task; executing task 15.
work 0 = >execute_num: 13
run 【U】 strategy "M"
Worker 0 got a task; executing task 16.
work 3 = >execute_num: 14
run 【U】 strategy "N"
Worker 3 got a task; executing task 0.
work 2 = >execute_num: 15
run 【U】 strategy "O"
Worker 2 got a task; executing task 1.
work 1 = >execute_num: 16
run 【U】 strategy "P"
Worker 1 got a task; executing task 2.
work 0 = >execute_num: 17
run 【U】 strategy "Q"
Worker 0 got a task; executing task 3.
work 3 = >execute_num: 18
run 【T】 strategy "AA"
Worker 3 got a task; executing task 4.
work 2 = >execute_num: 19
run 【T】 strategy "BB"
Worker 2 got a task; executing task 5.
work 1 = >execute_num: 20
run 【T】 strategy "CC"
Worker 1 got a task; executing task 6.
work 0 = >execute_num: 21
run 【T】 strategy "DD"
Worker 0 received shutdown message.
work 3 = >execute_num: 22
run 【T】 strategy "EE"
Worker 3 received shutdown message.
work 2 = >execute_num: 23
run 【T】 strategy "FF"
------Shutting down worker 1  i:1 -----------
Worker 2 received shutdown message.
work 1 = >execute_num: 24
run 【T】 strategy "GG"
Worker 1 received shutdown message.
------Shutting down worker 2  i:2 -----------
------Shutting down worker 3  i:3 -----------

http://www.kler.cn/news/362303.html

相关文章:

  • 科研进展 | RSE:全波形高光谱激光雷达数据Rclonte系列处理算法一
  • 数据结构——顺序表的基本操作
  • Redis JSON介绍和命令大全
  • 如何彻底销毁硬盘数据
  • 基于深度学习的声纹识别
  • 浪潮云启操作系统(InLinux)bcache缓存实践:理解OpenStack环境下虚拟机卷、Ceph OSD、bcache设备之间的映射关系
  • 11.useComponentDidMount
  • neo4j 中日期时间 时间戳处理
  • Android状态栏/通知栏图标白底问题
  • 归并排序 - 非递归实现
  • 代码随想录刷题Day8
  • 基于SSM汽车零部件加工系统的设计
  • bindService 流程学习总结
  • PTA L1系列题解(C语言)(L1_089 -- L1_096)
  • JZ2440开发板——MMU与Cache
  • 如何使用Git推送本地搭建的仓库以及远程克隆的仓库
  • golang中的上下文
  • 滚雪球学Redis[7.4讲]:Redis在分布式系统中的应用:微服务与跨数据中心策略
  • 016_基于python+django网络爬虫及数据分析可视化系统2024_kyz52ks2
  • Python 应用可观测重磅上线:解决 LLM 应用落地的“最后一公里”问题
  • python如何基于numpy pandas完成复杂的数据分析操作?
  • 华企盾对当前网络安全挑战与应对策略探讨
  • LeetCode102. 二叉树的层序遍历(2024秋季每日一题 43)
  • 毕业设计项目系统:基于Springboot框架的心理咨询评估管理系统,完整源代码+数据库+毕设文档+部署说明
  • python将1格式化为01
  • 思科网络设备命令