当前位置: 首页 > article >正文

【Rust自学】20.2. 最后的项目:多线程Web服务器

说句题外话,这篇文章非常要求Rust的各方面知识,最好看一下我的【Rust自学】专栏的所有内容。这篇文章也是整个专栏最长(4762字)的文章,需要多次阅读消化,最好点个收藏,免得刷不到了。
请添加图片描述

喜欢的话别忘了点赞、收藏加关注哦(加关注即可阅读全文),对接下来的教程有兴趣的可以关注专栏。谢谢喵!(=・ω・=)

20.2.1. 回顾

我们在上一篇文章中写了一个简单的本地服务器,但是这个服务器是单线的,也就是说请求一个一个进去之后我们得一个一个地处理,如果某个请求处理得慢,那后面的都得排队等着。这种单线程外部服务器的性能是非常差的。

20.2.2. 慢速请求

我们用代码来模拟慢速请求:

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// ...

fn handle_connection(mut stream: TcpStream) {
    // ...

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // ...
}

省略了一些原代码,但是不影响。我们增加的语句是如果用户访问的是127.0.0.1:7878/sleep时会调用thread::sleep(Duration::from_secs(5));,这句话使代码的执行休眠5秒,也就是模拟的慢速请求。

然后打开两个浏览器窗口:一个用于http://127.0.0.1:7878/另一个为http://127.0.0.1:7878/sleep。如果像以前一样,您会看到它快速响应。但是如果你输入/sleep然后加载 ,你会看到一直等到 sleep在加载前已经休眠了整整5秒。

如何改善这种情况呢?这里我们使用线程池技术,也可以选择其它技术比如fork/join模型单线程异步 I/O 模型多线程异步I/O模型

20.2.3. 使用线程池提高吞吐量

线程池是一组分配出来的线程,它们被用于等待并随时可能的任务。当程序接收到一个新任务时,它会给线程池里边一个线程分配这个任务,其余线程与此同时还可以接收其它任务。当任务执行完后,这个线程就会被重新放回线程池。

线程池通过允许并发处理连接的方式增加了服务器的吞吐量。

如何为每个连接都创建一个线程呢?看代码:

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

迭代器每迭代一次就创建一个新线程来处理。

这样写的缺点在于线程数量没有限制,每一个请求就创建一个新线程。如果黑客使用DoS(Denial of Service,拒绝服务攻击),我们的服务器就会很快瘫掉。

所以在上边代码的基础上我们进行修改,我们使用编译驱动开发编写代码(不是一个标准的开发方法论,是开发者之间的一种戏称,不同于TDD测试驱动开发):把期望调用的函数或是类型写上,再根据编译器的错误一步步修改。

使用编译驱动开发

我们把我们想写的代码直接写上,先不论对错

fn main() {  
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();  
    let pool = ThreadPool::new(4);  
  
    for stream in listener.incoming() {  
        let stream = stream.unwrap();  
  
        pool.execute(|| {  
            handle_connection(stream);  
        })  
    }  
}

虽然说并没有ThreadPool这个类型,但是根据编译驱动开发编写代码的逻辑,我觉得应该这么写就先写上,不管对错。

使用cargo check检查一下:

error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
9  |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error

这个错误告诉我们我们需要一个ThreadPool类型或模块,所以我们现在就构建一个。

我们在lib.rs中写ThreadPool的相关代码,一方面保持了main.rs足够简洁,另一方面也使ThreadPool相关代码能更加独立地存在。

打开lib.rs,写下ThreadPool的简单定义:

pub struct ThreadPool;

main.rs里把ThreadPool引入作用域:

use web_server::ThreadPool;

使用cargo check检查一下:

error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
  --> src/main.rs:10:28
   |
10 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in `ThreadPool`

这个错误表明接下来我们需要创建一个名为的关联函数 ThreadPoolnew 。我们还知道new需要有一个参数,该参数可以接受4作为参数,并且应该返回一个ThreadPool实例。让我们实现具有这些特征的最简单的new函数:

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

使用cargo check检查一下:

error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
  --> src/main.rs:17:14
   |
15 |         pool.execute(|| {
   |         -----^^^^^^^ method not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

现在发生错误是因为我们在ThreadPool上没有execute方法。那就补充一个方法:

pub fn execute<F>(&self, f: F)  
where  
    F: FnOnce() + Send + 'static,  
{  
}
  • execute函数的参数除了self的应用还有一个闭包参数,运行请求的线程只会调用闭包一次,所以使用FnOnce()()表示它是返回单位类型()的闭包。同时我们需要Send trait将闭包从一个线程传输到另一个线程,而'static是因为我们不知道线程执行需要多长时间。

  • 也可以这么想:我们使用它替代的是原代码的thread::spawn函数,所以修改时就可以借鉴它的函数签名,它的签名如下。我们主要借鉴的是泛型F和它的约束,所以excute函数的泛型约束就可以按照F来写。

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
	F: FnOnce() -> T,
	F: Send + 'static
	T: Send + 'static

使用cargo check检查没有错误,但是使用cargo run依旧会报错,因为executenew都没有实现实际需要的效果,只是满足了编译器的检查。

你可能听说过关于具有严格编译器的语言(例如 Haskell 和 Rust)的一句话是“if the code compiles, it works.如果代码可以编译,它就可以工作”。但这句话并不普遍正确。我们的项目可以编译,但它什么也没做。如果我们正在构建一个真实的、完整的项目,那么这是开始编写单元测试以检查代码是否编译并具有我们想要的行为的好时机(也就是TDD测试驱动开发)

修改new函数 Pt.1

我们先修改new函数使其具有实际意义:

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // ...
}
  • 我们使用assert!函数来判断new函数的参数要大于0,因为等于0时没有任何意义。
  • 添加了一些文档注释,这样在运行cargo doc --open时就能看到文档解释:
    请添加图片描述

修改ThreadPool类型

new函数的修改遇到瓶颈了:ThreadPool类型都没有具体字段我们实现不了创建具体线程数量的目标。所以接下来我们研究一下如何在ThreadPool里存储线程,代码如下:

use std::thread;  
  
pub struct ThreadPool{  
    threads: Vec<thread::JoinHandle<()>>,  
}

ThreadPool下有threads字段,类型是Vec<thread::JoinHandle<()>>

  • Vec<>是因为我们要存储多个线程,但是具体数量又未知,所以使用Vector
  • 之前我们看过thread::spawn函数的函数签名,其返回值是JoinHandle<T>,依葫芦画瓢,我们就也使用thread::JoinHandle<>来存储线程。
    JoinHandle<T>T是因为thread::spawn的线程有可能会有返回值,不知道具体什么类型,所以用泛型来表示。而我们的代码是确定没有返回值的,所以就写thread::JoinHandle<()>()是单元类型。

修改new函数 Pt.2

修改完ThreadPool的定义之后我们再返回来修改new函数:

pub fn new(size: usize) -> ThreadPool {  
    assert!(size > 0);  
  
    let mut threads = Vec::with_capacity(size);  
  
    for _ in 0..size {  
        // create some threads and store them in the vector  
    }  
  
    ThreadPool { threads }  
}
  • Vec::with_capacity函数传进去size来创建一个预分配好空间的Vector
  • 写了一个从0到size的循环(不包括size),里面的逻辑暂时还没写,总之这个循环是准备用来创建线程并存到Vector里的
  • 最后返回ThreadPool类型即可,threads字段的值就是这个函数中的threads

接下来我们来研究一下thread::spawn函数一遍我们更好写new里的循环。thread::spawn在线程创建后立即获取线程应运行的代码执行。然而,在我们的例子中,我们想要创建线程并让它们等待我们稍后发送的代码。标准库的线程实现不包含任何方法来做到这一点,所以我们必须手动实现它。

使用Worker数据结构

我们使用一种新的数据结构来实现这个效果,叫做Worker ,这是池实现中的常用术语。 Worker拾取需要运行的代码并在Worker的线程中运行代码。想象一下在餐厅厨房工作的人:工人们等待顾客下单,然后负责接受并履行这些订单。我们通过Worker来管理和实现我们所要的行为。

我们来创建Worker这个结构体及必要的方法:

struct Worker {  
    id: usize,  
    thread: thread::JoinHandle<()>,  
}

impl Worker {  
    fn new(id: usize) -> Worker {  
        let thread = thread::spawn(|| {});  
  
        Worker { id, thread }  
    }  
}
  • Worker一共有两个字段,一个是id,类型为usize,表示标识;还有一个thread字段,类型是thread::JoinHandle<()>,存储一个线程
  • new函数创建了Worker实例,id字段的值就是它的参数

PS:外部代码(如main.rs中的服务器)不需要知道有关在ThreadPool中使用Worker结构的实现细节,因此我们将Worker结构及其new函数设为私有。

接下来在ThreadPool里使用Worker

pub struct ThreadPool {  
    workers: Vec<Worker>,  
}

ThreadPool上的new函数和excute函数也需要修改,这里先修改new函数,excute等一下修改:

pub fn new(size: usize) -> ThreadPool {  
    assert!(size > 0);  
  
    let mut workers = Vec::with_capacity(size);  
  
    for id in 0..size {  
        workers.push(Worker::new(id));  
    }  
  
    ThreadPool { workers }  
}
  • threads相关的代码改为Workers即可
  • 由于ThreadPoolWorker字段是被Vector包裹的,所以使用Vectorpush方法即可以往Vector里添加新元素
  • 在循环中使用到了Worker上的new函数,创建了Worker实例,id字段的值就是传进去的参数

PS:如果操作系统由于没有足够的系统资源而无法创建线程, thread::spawn将会出现恐慌。我们在这个例子中不考虑这种情况,但在实际编写时最好考虑到这点,使用std::thread::builder,它会返回Result<JoinHandle<T>>

通过通道向线程发送请求

完成了线程的创建,接下来就要考虑如何接收任务了。这时就需要通道这个技术。重构一下代码:

use std::thread;  
use std::sync::mpsc;  
  
pub struct ThreadPool {  
    workers: Vec<Worker>,  
    sender: mpsc::Sender<Job>,  
}  
  
struct Job;
  • 使用use std::sync::mpsc;mpsc引入作用域以便后文使用
  • ThreadPool新建了一个字段sender,类型是mpsc::Sender<Job>(Job是一个结构体,表示要执行的工作),用于存储发送端

我们在ThreadPoolnew方法上创建通道:

impl ThreadPool {
	// ...
	pub fn new(size: usize) -> ThreadPool {  
	    assert!(size > 0);  
	    let (sender, receiver) = mpsc::channel();  
	    let mut workers = Vec::with_capacity(size);  
  
	    for id in 0..size {  
	        workers.push(Worker::new(id, receiver));  
	    }  
  
	    ThreadPool { workers, sender }  
	}
	// ...
}
// ...
impl Worker {  
    fn new(id: usize, receiver: Receiver<Job>) -> Worker {  
        let thread = thread::spawn(|| {  
            receiver;  
        });  
  
        Worker { id, thread }  
    }  
}
  • 使用mpsc::channel()函数创建通道,发送端和接收端分别命名为senderreceiver
  • sender赋给返回值的sender字段,就相当于线程池持有通道的发送端了
  • 接收者应该是Worker,所以我们把Workernew函数也要相应的更改,增加了receiver这个参数

这时候运行cargo check试试:

error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
22 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 |         for id in 0..size {
   |         ----------------- inside of this loop
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop
   |
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
  --> src/lib.rs:45:33
   |
45 |     fn new(id: usize, receiver: Receiver<Job>) -> Worker {
   |        --- in this method       ^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
   |
25 ~         let mut value = Worker::new(id, receiver);
26 ~         for id in 0..size {
27 ~             workers.push(value);
   |

报错是因为该代码尝试将一个receiver传递给多个Worker实例,这是行不通的,因为接收端只能有一个。

我们希望所有的线程都共享一个receiver,从而能在线程间分发任务。此外,从通道队列中取出receiver涉及改变 receiver ,因此线程需要一种安全的方式来共享和修改receiver 。否则,我们可能会遇到竞争条件。

针对多线程多重所有权的要求,可以使用Arc<T>Rc<T>只能用于单线程);针对多线程避免数据竞争的要求,可以使用互斥锁Mutex<T>

这下只需要在原本的receiver上套Arc<T>Mutex<T>就行了:

impl ThreadPool {  
    /// ...  
    pub fn new(size: usize) -> ThreadPool {  
        assert!(size > 0);  
        let (sender, receiver) = mpsc::channel();  
        let mut workers = Vec::with_capacity(size);  
  
        let receiver = Arc::new(Mutex::new(receiver));  
        for id in 0..size {  
            workers.push(Worker::new(id, Arc::clone(&receiver)));  
        }  
  
        ThreadPool { workers, sender }  
    }  
	//...
}  
//...
  
impl Worker {  
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {  
        let thread = thread::spawn(|| {  
            receiver;  
        });  
  
        Worker { id, thread }  
    }  
}
  • 重新声明receiver,把它用Arc<T>Mutex<T>包裹
  • 在循环中使用Arc::clone(&receiver)传给每个Worker
  • Workernew方法的receiver参数的类型需要改为Arc<Mutex<mpsc::Receiver<Job>>>

修改Job

我们的Job暂时还是一个空结构体,没有任何的实际效果,所以我们把它改为类型别名(详见19.5. 高级类型):

type Job = Box<dyn FnOnce() + Send + 'static>;

Job是一个闭包,在一个线程中只被调用一次,没有返回值(或者叫返回值是单元类型()),所以得满足FnOnce();并且这个闭包还要能够在线程间传递,所以得满足Send trait。'static是因为我们不知道线程执行需要多长时间,只好把它声明为静态生命周期。

修改execute函数

接下来我们来修改execute函数:

pub fn execute<F>(&self, f: F)  
where  
    F: FnOnce() + Send + 'static,  
{  
    let job = Box::new(f);  
  
    self.sender.send(job).unwrap();  
}
  • 因为Job的最外层是Box<T>封装,所以想把闭包f发送出去就得先用Box::new函数来封装
  • 使用selfsender字段作为发送端把job发送出去

修改Worker下的new函数

excute方法这么改了,那么作为接收端的Worker下的new函数也得改:

impl Worker {  
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {  
        let thread = thread::spawn(move || loop {  
            let job = receiver.lock().unwrap().recv().unwrap();  
            println!("Worker {} got a job; executing.", id);  
            job();  
        });  
  
        Worker { id, thread }  
    }  
}
  • 使用lock锁定了receiver(receiver被封装在互斥锁Mutex<T>里),获取互斥体,unwrap错误处理
  • 再使用recv方法从通道接收传过来的内容,再使用unwrap错误处理
  • 打印一下是哪个Worker在工作
  • 当调用job();时,编译器会自动将job解引用为其内部的闭包类型,然后调用FnOnce或其他相应的trait实现的call方法。这是因为Box<dyn FnOnce()>实现了FnOnce。也就是说,job();(*job)();的语法糖。

版本差异

我使用的是1.84.0的Rust,在早期(大概是1.0版本附近)时不能直接使用job();,也不能使用(*job)();,因为当时编译器不直接知道动态大小类型所占用的内存大小,所以不能直接解码。在后来的 Rust RFC 127(实现于 Rust 1.20,发布于 2017 年) 之后,Rust 为Box<dyn Trait>等类型添加了直接调用trait方法的能力,这背后利用了自动解引用及调用调度逻辑

总而言之,如果你写成上文代码那样要报错的话要么就升级Rust版本,要么就增加并修改一些代码:

trait FnBox {
	fn call_box(self: Box(self))
}

impl<F: FnOnce()> FnBox for F {
	fn call_box(self: Box<F>) {
		(*self)();
	}
}

type Job = Box<FnBox + Send + 'static>
  • FnBox trait这个方法使得我们可以在类型的Box上调用了
  • FnOnce()写了call_box的具体实现(因为Job实现了FnOnce()),这样就可以获得Box里边东西的所有权,从而调用
  • Job的类型从FnOnce()改成FnBox,这样其它代码就可以不用修改,所有实现了FnBox的类型肯定同时实现了FnBox

20.2.4. 试运行

终于改完了,让我们试运行一下:
请添加图片描述

如果你在浏览器里多刷新几次界面就能看到其它不同id的Worker在工作。

20.2.5. 总结

main.rs:

use std::{  
    io::{prelude::*, BufReader},  
    net::{TcpListener, TcpStream},  
    fs,  
};  
use web_server::ThreadPool;  
  
fn main() {  
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();  
    let pool = ThreadPool::new(4);  
  
    for stream in listener.incoming() {  
        let stream = stream.unwrap();  
  
        pool.execute(|| {  
            handle_connection(stream);  
        })  
    }  
}  
  
fn handle_connection(mut stream: TcpStream) {  
    let buf_reader = BufReader::new(&stream);  
    let request_line = buf_reader.lines().next().unwrap().unwrap();  
  
    let (status_line, filename) = if request_line == "GET / HTTP/1.1" {  
        ("HTTP/1.1 200 OK", "hello.html")  
    } else {  
        ("HTTP/1.1 404 NOT FOUND", "404.html")  
    };  
  
    let contents = fs::read_to_string(filename).unwrap();  
    let length = contents.len();  
  
    let response =  
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");  
  
    stream.write_all(response.as_bytes()).unwrap();  
}

lib.rs:

use std::{  
    sync::{mpsc, Arc, Mutex},  
    thread,  
};  
  
pub struct ThreadPool {  
    workers: Vec<Worker>,  
    sender: mpsc::Sender<Job>,  
}  
  
type Job = Box<dyn FnOnce() + Send + 'static>;  
  
impl ThreadPool {  
    /// Create a new ThreadPool.  
    ///    
    /// The size is the number of threads in the pool.    
    ///    
    /// # Panics  
    ///    
    /// The `new` function will panic if the size is zero.    
    pub fn new(size: usize) -> ThreadPool {  
        assert!(size > 0);  
        let (sender, receiver) = mpsc::channel();  
        let mut workers = Vec::with_capacity(size);  
  
        let receiver = Arc::new(Mutex::new(receiver));  
        for id in 0..size {  
            workers.push(Worker::new(id, Arc::clone(&receiver)));  
        }  
  
        ThreadPool { workers, sender }  
    }  
  
    pub fn execute<F>(&self, f: F)  
    where  
        F: FnOnce() + Send + 'static,  
    {  
        let job = Box::new(f);  
  
        self.sender.send(job).unwrap();  
    }  
}  
  
struct Worker {  
    id: usize,  
    thread: thread::JoinHandle<()>,  
}  
  
impl Worker {  
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {  
        let thread = thread::spawn(move || loop {  
            let job = receiver.lock().unwrap().recv().unwrap();  
            println!("Worker {} got a job; executing.", id);  
            job();  
        });  
  
        Worker { id, thread }  
    }  
}

hello.html:

<!DOCTYPE html>  
<html lang="en">  
<head>  
    <meta charset="utf-8">  
    <title>Hello!</title>  
</head>  
<body>  
<h1>Hello!</h1>  
<p>Hi from Rust</p>  
</body>  
</html>

404.html:

<!DOCTYPE html>  
<html lang="en">  
<head>  
  <meta charset="utf-8">  
  <title>Hello!</title>  
</head>  
<body>  
<h1>Oops!</h1>  
<p>Sorry, I don't know what you're asking for.</p>  
</body>  
</html>

http://www.kler.cn/a/535292.html

相关文章:

  • Win32 学习笔记目录
  • B站自研的第二代视频连麦系统(上)
  • Selenium记录RPA初阶 - 基本输入元件
  • 【3】高并发导出场景下,服务器性能瓶颈优化方案-文件压缩
  • 网络爬虫js逆向之某音乐平台案例
  • 开源AI智能名片2 + 1链动模式S2B2C商城小程序:内容价值创造与传播新引擎
  • 在远程 Linux 服务器上运行 Jupyter Notebook(.ipynb 文件)
  • idea 启动 thingsboard
  • iOS--SDWebImage源码解析
  • 《Node.js Express 框架》
  • Apache Kafka:高吞吐分布式流平台的深度解析
  • 深入解析:Python 爬虫高级技巧与实战应用
  • 前端学习-tab栏切换改造项目(三十一)
  • MATLAB中matches函数用法
  • Mysql表分区后使用主键ID做In查询性能变差分析及解决
  • QT +FFMPEG4.3 拉取 RTMP/http-flv 流播放 AVFrame转Qimage
  • MFC 学习笔记目录
  • 笔记day8
  • 利用HTML和css技术编写学校官网页面
  • LQB(0)-python-基础知识
  • SQL Server2019下载及安装教程
  • python:内置函数与高阶函数
  • qsort函数对二维数组的排序Cmp函数理解
  • 【自学笔记】Python的基础知识点总览-持续更新
  • DeepSeek服务器繁忙问题的原因分析与解决方案
  • 【从0开始】使用Flax NNX API 构建简单神经网络并训练