C++ - 从零实现Json-Rpc框架-1(JsonCpp Muduo 异步操作)
RPC(Remote Procedure Call,远程过程调用) 允许程序像调用本地方法一样调用远程方法,而不需要关心底层的 网络通信细节。在你的 C++ RPC 框架 中,RPC 的核心目标就是简化远程方法调用,让开发者无需了解底层网络传输。
RPC 的核心概念
概念 解释 客户端(Client) 发送 RPC 请求,调用远程方法 服务器(Server) 接收请求并执行对应的远程方法 序列化(Serialization) 把方法名 + 参数转换为可传输的数据格式(JSON/Protobuf) 反序列化(Deserialization) 把收到的数据转换回原始方法调用 网络传输(Transport) 通过 TCP / HTTP / UDP 传输请求和响应 服务注册 & 发现 让客户端找到可用的 RPC 服务器(Zookeeper/Redis)
第三⽅库使⽤介绍
JsonCpp库
Json 是⼀种数据交换格式,它采⽤完全独⽴于编程语⾔的⽂本格式来存储和表⽰数据。
如: 我们想表⽰⼀个同学的学⽣信息
• C 代码表⽰
char *name = "xx";
int age = 18;
float score[3] = {88.5, 99, 58};
• Json 表⽰
{
"姓名" : "xx",
"年龄" : 18,
"成绩" : [88.5, 99, 58],
"爱好" :{
"书籍" : "西游记",
"运动" : "打篮球"
}
}
Json 的数据类型包括对象,数组,字符串,数字等。
• 对象:使⽤花括号 {} 括起来的表⽰⼀个对象
• 数组:使⽤中括号 [] 括起来的表⽰⼀个数组
• 字符串:使⽤常规双引号 "" 括起来的表⽰⼀个字符串
• 数字:包括整形和浮点型,直接使⽤
Json 数据对象类的表⽰
class Json::Value{ Value &operator=(const Value &other); //Value重载了[]和=,因此所有的赋值和获取 数据都可以通过 Value& operator[](const std::string& key);//简单的⽅式完成 val["name"] = "xx"; Value& operator[](const char* key); Value removeMember(const char* key);//移除元素 const Value& operator[](ArrayIndex index) const; //val["score"][0] Value& append(const Value& value);//添加数组元素val["score"].append(88); ArrayIndex size() const;//获取数组元素个数 val["score"].size(); std::string asString() const;//转string string name = val["name"].asString(); const char* asCString() const;//转char* char *name = val["name"].asCString(); Int asInt() const;//转int int age = val["age"].asInt(); float asFloat() const;//转float float weight = val["weight"].asFloat(); bool asBool() const;//转 bool bool ok = val["ok"].asBool(); };
序列化
在
JsonCpp
中,你可以将变量(如int
、float
、string
、bool
、数组等)赋值给Json::Value
类型的对象。1. 直接赋值基本数据类型 int float double bool string const char*(自动转换为 std::string)
Json::Value root; int age = 25; double height = 175.5; bool is_student = true; std::string name = "Alice"; // 直接赋值 root["年龄"] = age; root["身高"] = height; root["是否学生"] = is_student; root["姓名"] = name; std::cout << root << std::endl;
2.用append()添加数组元素 std::vector<int> C数组
float grades[] = {85.5, 78.0, 92.3}; Json::Value gradesArray; for (float grade : grades) { gradesArray.append(grade); } root["分数"] = gradesArray;
3.赋值嵌套 JSON 结构
Json::Value student; student["姓名"] = "张三"; student["年龄"] = 20; student["地址"]["省"] = "北京"; student["地址"]["市"] = "海淀区"; root["学生信息"] = student; //或者这样嵌套也可以 //Json::Value address; //address["省"]="北京"; //address["市"]="海淀区"; //student["地址"]=address;
{ "学生信息": { "姓名": "张三", "年龄": 20, "地址": { "省": "北京", "市": "海淀区" } } }
#include<iostream>
#include<string.h>
#include <sstream>
#include <memory>
#include <jsoncpp/json/json.h>
//实现序列化
void serialize()
{
const char* name="王伟";
const int age=19;
const char* sex="男";
float score[3]={80,75.6,60};
Json::Value root;
root["名字"]=name;
root["性别"]=sex;
root["年龄"]=age;
root["成绩"].append(score[0]);
root["成绩"].append(score[1]);
root["成绩"].append(score[2]);
//先实例化一个工厂类
Json::StreamWriterBuilder wbuilder;
// 来创建序列化器
std::unique_ptr<Json::StreamWriter> writer(wbuilder.newStreamWriter());
// 将 JSON 数据输出到字符串
std::stringstream ss;
writer->write(root,&ss);
std::cout << "Serialized JSON:" << std::endl;
std::cout << ss.str() << std::endl;
}
int main()
{
serialize();
}
步骤 | 代码 | 作用 |
---|---|---|
1️⃣ 创建 JSON 数据 | Json::Value root; | 生成 JSON 数据 |
2️⃣ 创建 StreamWriterBuilder (工厂类) | Json::StreamWriterBuilder wbuilder; | 准备 JSON 序列化 |
3️⃣ 通过工厂创建 StreamWriter (序列化器) | std::unique_ptr<Json::StreamWriter> writer(wbuilder.newStreamWriter()); | 生成 JSON 序列化对象 |
4️⃣ 将 JSON 数据写入 std::stringstream | writer->write(root, &ss); | 将 JSON 转换为字符串格式 |
std::stringstream 是 C++ 标准库中的 字符串流(stringstream),它允许像操作 输入/输出流 那样操作 字符串,主要用于 格式化输入输出 和 字符串转换。
在 JSON 处理(如 JsonCpp)中,std::stringstream 常用于 存储 JSON 序列化的字符串,然后再进行输出或进一步处理。
功能 代码示例 作用 写入字符串流 ss << "Hello, " << "World!";
把数据存入 stringstream
获取字符串内容 std::string result = ss.str();
获取 stringstream
的内容清空 stringstream
ss.str(""); ss.clear();
清空 stringstream
从 stringstream
解析数据ss >> root;
解析 JSON 数据 转换数据类型 ss >> num >> decimal;
从 stringstream
提取数据
反序列化
Json::CharReaderBuilder
//实现反序列化 bool unserialize(std::string& body, Json::Value&val) { body=R"({"姓名":"王伟","年龄":19,"成绩":[10,99.9,6]})"; //先实例化一个工厂类 Json::CharReaderBuilder crb; // 来创建序反列化器 std::unique_ptr<Json::CharReader> cr(crb.newCharReader()); std::string errs; bool ret=cr->parse(body.c_str(),body.c_str()+body.size(),&val,&errs); if(ret==false) { std::cout<<"json unserizlize faile:"<<errs<<std::endl; return false; } std::cout<<"姓名"<<val["姓名"].asString()<<std::endl; std::cout<<"年龄"<<val["年龄"].asInt()<<std::endl; int sz=val["成绩"].size(); for(int i=0;i<sz;i++) std::cout<<"成绩:"<<val["成绩"][i].asFloat()<<std::endl; return true; }
CharReader类中parse()
bool parse( char const* beginDoc, // JSON 字符串的起始地址 char const* endDoc, // JSON 字符串的结束地址 Json::Value* root, // 解析后的 JSON 结构存放的地方 std::string* errs // 错误信息 )
输出信息:
姓名王伟 年龄19 成绩:10 成绩:99.9 成绩:6
步骤 | 代码 | 作用 |
---|---|---|
创建解析器工厂 | Json::CharReaderBuilder crb; | 生成 JSON 解析器 |
创建 JSON 解析器 | std::unique_ptr<Json::CharReader> rc(crb.newCharReader()); | 解析 JSON 字符串 |
解析 JSON | rc->parse(body.c_str(), body.c_str() + body.size(), &val, &errs); | 解析 JSON 并存入 val |
Muduo库
Muduo 是一个 高性能的 C++ 网络库,基于 Reactor 模式 和 多线程编程,适用于高并发服务器开发。它是 基于
epoll
和非阻塞 IO
设计的,适合构建 高效的 TCP 服务器。Muduo 的核心组件 分为 4 大类:
- 事件循环 (
EventLoop
)- TCP 连接管理 (
TcpServer
、TcpClient
)- 网络数据处理 (
Buffer
)- 多线程支持 (
ThreadPool
)
Muduo核心类
1.
EventLoop
(事件循环)作用:
- 负责
epoll
事件处理,管理Channel
(文件描述符)。- 负责定时器、异步任务调度。
- 每个
EventLoop
运行在 单独的线程,通常一个线程只有一个EventLoop
。EventLoop ├── Poller(封装 epoll) ├── TimerQueue(管理定时器) ├── Channel(封装文件描述符)
2.
TcpServer
(TCP 服务器)作用:
- 监听
TCP
端口,管理多个客户端连接。- 通过
Acceptor
处理新连接,通过TcpConnection
处理数据收发。TcpServer ├── EventLoop(事件循环) ├── Acceptor(管理新连接) ├── TcpConnection(管理每个客户端) ├── Buffer(处理数据收发)
3.
TcpClient
(TCP 客户端)作用:
- 连接服务器,发送/接收数据。
- 通过
TcpConnection
进行读写数据。TcpClient ├── EventLoop(事件循环) ├── TcpConnection(管理连接) ├── Buffer(处理数据收发)
4. Buffer
(数据缓冲区)作用:
- 处理 TCP 数据收发,支持零拷贝优化。
ThreadPool ├── Thread(线程管理) ├── TaskQueue(任务队列)
5.ThreadPool
(线程池)作用:
- 用于多线程任务调度,避免线程创建销毁的开销。
ThreadPool ├── Thread(线程管理) ├── TaskQueue(任务队列)
Muduo
├── muduo::base (基础工具库)
│ ├── ThreadPool (线程池)
│ ├── Logging (日志系统)
│ ├── Timestamp (时间处理)
│
├── muduo::net (网络库)
│ ├── EventLoop (事件循环,基于 epoll)
│ ├── TcpServer (TCP 服务器)
│ ├── TcpClient (TCP 客户端)
│ ├── TcpConnection (管理每个 TCP 连接)
│ ├── Buffer (数据缓冲区)
│ ├── Acceptor (监听新连接)
│ ├── TimerQueue (定时任务)
│
├── muduo::http (HTTP 服务器)
├── HttpServer
├── HttpRequest
├── HttpResponse
1.Muduo实现字典服务端
server.cpp
#include<muduo/net/TcpServer.h> #include<muduo/net/EventLoop.h> #include<muduo/net/TcpConnection.h> #include<muduo/net/Buffer.h> #include<iostream> #include<string> #include<unordered_map> class DictServer { public: DictServer(int port):_server(&_baseloop, muduo::net::InetAddress("0.0.0.0",port), "DictServer",muduo::net::TcpServer::kNoReusePort)//kNoReusePort是否启动地址重用? { //设置回调函数 //需要的参数类型 void setConnectionCallback(const ConnectionCallback& cb) //typedef std::function<void (const TcpConnectionPtr&)> ConnectionCallback; //TcpConnectionPtr&就是onConnection函数的参数,但onConnection是类成员函数带有this指针 //用bind先绑定this指针(最先bind),其它参数按顺序传 _server.setConnectionCallback(std::bind(&DictServer::onConnection,this,std::placeholders::_1)); _server.setMessageCallback(std::bind(&DictServer::onMessage,this, std::placeholders::_1,std::placeholders::_2,std::placeholders::_3)); } void start() { _server.start();//开始监听 _baseloop.loop();//开始循环事件监控 } private: void onConnection(const muduo::net::TcpConnectionPtr&conn) //连接建立/断开的回调函数 { if(conn->connected())//判断连接是否存在 std::cout<<"连接建立"<<std::endl; else std::cout<<"连接断开"<<std::endl; } //接收到消息的回调函数 void onMessage(const muduo::net::TcpConnectionPtr&conn, muduo::net::Buffer *buf,muduo::Timestamp) { static std::unordered_map<std::string,std::string> dict_map={ {"hello","你好"}, {"coke","小猫"}, }; std::string msg=buf->retrieveAllAsString();//获取缓冲区的全部数据 std::string res; if(dict_map.find(msg)!=dict_map.end()) res=dict_map[msg]; else res="找不到"; conn->send(res); } private: muduo::net::EventLoop _baseloop; muduo::net::TcpServer _server; }; int main() { DictServer server(9090); server.start(); return 0; }
1.设置回调函数时,为什么bind要传this?
_server.setConnectionCallback(std::bind(&DictServer::onConnection, this, std::placeholders::_1));
&DictServer::onConnection 只是成员函数的地址,它并不属于任何对象。
this用于绑定 onConnection 到当前 DictServer 实例,告诉
std::bind
这个函数属于哪个对象.(静态成员函数 不依赖
this
,所以可以直接使用,而 非静态成员函数 需要this
指定对象。)2.回调函数的参数只需要一个,算上this参数不就是两个了?
这是因为 std::bind 生成了一个新的函数对象,这个函数对象的参数列表与 setConnectionCallback() 需要的回调完全匹配,Muduo 可以正确调用它。
所以最终 this 只是 std::bind 内部存储的对象指针,不会影响 setConnectionCallback() 只接受一个参数!
3.std::bind 绑定成员函数时,this 和参数的顺序
在
std::bind
绑定类成员函数时:
this
必须是第一个参数(因为成员函数需要对象实例)。- 之后的参数按照成员函数的参数顺序传递。std::placeholders::_1代表是未提供默认值的 第一个参数的位置,而不是成员函数的第一个参数。
makefile:
CXXFLAGS= -std=c++11 -I../../build/release-install-cpp11/include/ CXXFLAGS=-L../../build/release-install-cpp11/lib -lmuduo_net -lmuduo_base -pthread server: server.cpp g++ $(CXXFLAGS) $^ -o $@ $(CXXFLAGS) clean: rm -f server
CXXFLAGS
C++ 编译选项-I../../build/release-install-cpp11/include/ 指定额外的头文件搜索路径,确保能找到#include<muduo/net/Buffer.h>等相关头文件。
LDFLAGS
链接选项-L../../build/release-install-cpp11/lib
额外的库文件搜索路径,如果
server.cpp
依赖muduo
库(如libmuduo_net.so
),编译器会在../../build/release-install-cpp11/lib/
目录查找这些库。
-lmuduo_net
:链接libmuduo_net.a
或libmuduo_net.so
-lmuduo_base
:链接libmuduo_base.a
或libmuduo_base.so
-lxxx
表示链接libxxx.so
或libxxx.a
,前缀lib
可以省略。#include" " #include<>查找方式
方式 作用 #include "file.h"
先查找当前目录,再查找 -I
目录,最后查找系统路径#include <file.h>
只查找 -I
目录和系统路径,不查找当前目录-I/path/to/include
添加额外头文件目录
netstat -anput | grep 9090
查看当前系统上所有和
9090
端口相关的网络连接。
选项 含义 netstat
显示网络连接、路由表、接口统计等信息 -a
显示所有套接字(包括监听和已建立的连接) -n
以数字格式显示地址(不解析 DNS) -t
仅显示 TCP 连接 -p
显示关联的进程(需要 root
权限)-u
显示 UDP 连接 grep 9090
过滤出包含 9090
端口的结果
2.Muduo实现字典客户端
client.cpp
#include <muduo/net/TcpClient.h> #include <muduo/net/EventLoop.h> #include <muduo/net/TcpConnection.h> #include <muduo/net/EventLoopThread.h> #include <muduo/net/Buffer.h> #include <muduo/base/CountDownLatch.h> #include <iostream> #include <string> class DictClient { public: // TcpClient(EventLoop* loop, // const InetAddress& serverAddr, // const string& nameArg); // InetAddress(StringArg ip, uint16_t port, bool ipv6 = false); DictClient(const std::string &sip, int sport) : _downlactch(1), // 初始化计数器=1 _baseloop(_loopthread.startLoop()), _client(_baseloop, muduo::net::InetAddress(sip, sport), "DictClient") { // 连接事件回调 _client.setConnectionCallback(std::bind(&DictClient::onConnection, this, std::placeholders::_1)); // 连接消息回调 _client.setMessageCallback(std::bind(&DictClient::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); // 连接服务器 client的connect接口是一个非阻塞的操作 // 所有可能会出现connect还没建立连接,conntion接口就获取了连接,send()发送数据 _client.connect(); // 此时计数>0 _downlactch.wait(); // 等连接建立完成countDown()后计数-- =0继续 // //在连接建立完成后,可以直接loop吗? // //开始事件循环监控,内部是死循环,对于客户端来说不能直接使用,因为一旦循环,就走不下去了 // //一般再用一个线程单独loop // _baseloop.loop(); } bool send(const std::string &msg) { if (_conn->connected() == false) { std::cout << "连接断开" << std::endl; return false; } _conn->send(msg); return true; } private: void onConnection(const muduo::net::TcpConnectionPtr &conn) // 连接建立/断开的回调函数 { if (conn->connected()) // 判断连接是否存在 { std::cout << "连接建立" << std::endl; _conn = conn; _downlactch.countDown(); // 计数-- =0 唤醒wait } else std::cout << "连接断开" << std::endl; } // 接收到消息的回调函数 void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp) { std::string res=buf->retrieveAllAsString(); std::cout<<res<<std::endl; } private: muduo::net::TcpConnectionPtr _conn; muduo::CountDownLatch _downlactch; // 做计数同步的类 void wait()计数>0阻塞 countDown()-- 计数=0唤醒wait muduo::net::EventLoopThread _loopthread; //实例化后自动创建一个线程执行loop muduo::net::EventLoop* _baseloop; muduo::net::TcpClient _client; }; int main() { DictClient client("127.0.0.1",9090); while(1) { std::string msg; std::cin>>msg; client.send(msg); } return 0; }
1.使用
CountDownLatch类
确保连接建立后再发送数据,防止send()
失败。2.使用
EventLoopThread
让EventLoop
运行在独立线程,避免main()
被阻塞,确保可以继续处理用户输入和其他操作。如果loop()
运行在main()
线程,那么main()
线程就会被阻塞,无法执行后续代码,比如std::cin
输入等。
makefile
CXX = g++ CXXFLAGS = -std=c++11 -I/root/Json-Rpc/build/release-install-cpp11/include/ -Wall -O2 LDFLAGS = -L/root/Json-Rpc/build/release-install-cpp11/lib -lmuduo_net -lmuduo_base -pthread all: server client server: server.cpp $(CXX) $(CXXFLAGS) $^ -o $@ $(LDFLAGS) client: client.cpp $(CXX) $(CXXFLAGS) $^ -o $@ $(LDFLAGS) clean: rm -f server client
C++11 异步操作
std::future 获取异步结果
std::future是C++11标准库中的⼀个模板类,用于获取异步任务的结果,而不是异步任务本身。当我们在多线程编程中使⽤异步任务时,std::future可以帮助我们在需要的时候获取任务的执⾏结果。std::future的⼀个重要特性是能够阻塞当前线程,直到异步操作完成,从⽽确保我们在获取结果时不会遇到未完成的操作。
std::future
需要搭配其他工具使用C++11 提供了一些能够执行异步任务的工具来与
std::future
配合使用:
std::async
: 用于异步执行一个函数,并返回future
对象,以便获取该函数的执行结果。std::packaged_task
: 将一个可调用对象(如函数)封装为异步任务,使其可以在其他线程中执行,并返回future
获取结果。std::promise
: 用于在线程之间传递数据,一个线程可以设置promise
的值,另一个线程可以通过future
关联promise
来获取该值。
1.std::async关联异步任务
std::async
在 C++11 里是一个很方便的方式来启动异步任务,它的std::launch
选项提供了灵活性,使得任务可以在不同的执行策略下运行:template <typename Function, typename... Args> std::future<std::invoke_result_t<std::decay_t<Function>, std::decay_t<Args>...>> std::async(std::launch policy, Function&& func, Args&&... args);
参数解析
参数 类型 作用 policy
std::launch
指定执行策略,可选 std::launch::async
或std::launch::deferred
func
Function&&
传入的函数,可以是普通函数、lambda、函数对象等 args...
Args&&...
传递给 func
的参数,支持多个参数func传入成员函数时,需要传入this指针
class Test { public: int add(int x, int y) { return x + y; } }; Test t; std::async(std::launch::async, &Test::add, &t, 2, 3); // 传递对象指针
返回值
std::future<T>
,其中T
取决于func(args...)
的调用函数的返回类型。std::future<int> result = std::async(std::launch::async, myFunction, arg1, arg2); int value = result.get(); // 阻塞等待异步任务完成,获取结果
policy选项
std::launch::async
- 任务会在新创建的线程中立即执行,真正的异步执行方式。
- 适用于计算密集型任务或者需要并发执行的任务。
std::launch::deferred
- 任务不会立即执行,而是延迟到
future.get()
或future.wait()
被调用时才会执行。- 适用于一些非计算密集型的任务,避免创建不必要的线程。
std::launch::async | std::launch::deferred
- 由系统决定是创建一个新线程还是延迟执行。
- 如果线程资源紧张,可能会采用
deferred
方式,避免额外的线程创建开销。
std::launch::async
新创建的线程中立即执行,真正的异步执行方式#include<iostream> #include<future> #include <thread> int Add(int x,int y) { std::cout<<"into add"<<std::endl; return x+y; } int main() { std::future<int> res=std::async(std::launch::async,Add,11,22); std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout<<"------------------\n"; //std::future<int>::get(); 用于获取异步任务的结果,如果还没有就会阻塞 std::cout<<res.get()<<std::endl; return 0; }
立即进入Add函数中cout<<into add
td::launch::deferred
延迟到future.get()
或future.wait()
被调用时才会执行。int main() { std::future<int> res=std::async(std::launch::deferred,Add,11,22); std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout<<"------------------\n"; //std::future<int>::get(); 用于获取异步任务的结果,如果还没有就会阻塞 std::cout<<res.get()<<std::endl; return 0; }
等get()获取时,进入Add函数中执行
2.std::packaged_task
std::packaged_task
是 一个对任务的封装,它将一个 可调用对象(函数、lambda、函数对象等)绑定到std::future
,从而 可以异步获取任务结果。
方式 执行控制 线程管理 适用场景 std::async
系统自动管理线程,不需要手动调用 由 std::async
处理,不适合线程池适合简单异步调用 std::packaged_task
需要手动调用 operator()
启动任务开发者可以自己管理线程池 适合任务队列、线程池 template <typename R, typename... Args> class std::packaged_task<R(Args...)>;
参数
R
:被封装的可调用对象的返回类型。Args...
:被封装的可调用对象的参数类型。
使用move()移动task
#include <iostream>
#include <future>
#include <thread>
#include <chrono>
// 定义一个简单的计算函数
int compute(int x, int y) {
std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟计算
return x + y;
}
int main() {
// 1. 创建 std::packaged_task,参数为函数签名
std::packaged_task<int(int, int)> task(compute);
// 2. 获取 future 以便稍后获取结果
std::future<int> result = task.get_future();
// 3. 启动任务(这里手动调用 operator())
std::thread t(std::move(task), 10, 20); // 传递参数并在新线程执行
// 4. 主线程可以做别的事情
std::cout << "等待计算结果...\n";
// 5. 获取结果(阻塞直到任务完成)
std::cout << "计算结果: " << result.get() << std::endl;
// 6. 线程回收
t.join();
return 0;
}
使用shared_ptr<>共享task
#include<iostream>
#include<future>
#include <thread>
#include <memory>
int Add(int x,int y)
{
std::cout<<"into add"<<std::endl;
return x+y;
}
int main()
{
//1.封装任务
auto task=std::make_shared<std::packaged_task<int(int,int)>>(Add);
//2.获取任务包关联的future对象
std::future<int> res=task->get_future();
//3.创建线程执行
std::thread thr([task](){
(*task)(1,3);
});
//4.获取结果
std::cout<<res.get()<<std::endl;
thr.join();
return 0;
}
1.创建std::packaged_task。task 是 compute 的封装,但它不会自动执行,需要手动调用。
2.task.get_future()获取std::future。task.get_future() 绑定 了 std::packaged_task,它用于获取任务的 返回值。
3.创建线程 std::thread手动启动任务
4.result.get() 获取异步结果
5.join()阻塞main()直到线程结束。线程还没结束,main()就退出 程序会崩溃。
move()和shared_ptr<>操作task的不同
std::packaged_task
不能直接拷贝,原因在于它的 拷贝构造函数已被删除(std::future 只能与一个 packaged_task 关联)。所以,必须使用指针(如
std::shared_ptr
)或者std::move
进行转移。1.使用move()移动task,会让原本的task变空,不能再进行调用task(1,2);
task 只能移动一次,下次 RPC 任务就必须创建新的 packaged_task。
每次都要创建 std::thread,然后 join() 或 detach(),导致线程频繁创建和销毁.
2.如果你 需要
packaged_task
在多个地方调用,可以使用std::shared_ptr
共享packaged_task
。允许多个线程共享 task,而不会因 std::move 让 task 失效。task 只会在 shared_ptr 计数归零时销毁,确保对象存活。
方案 适用场景 优势 缺点 std::move(packaged_task)
单次任务 代码简单 每次都创建线程,影响性能 std::shared_ptr<packaged_task>
线程池 / 多任务共享 允许多个线程共享任务 额外的引用计数开销
3.std::promise
std::promise
和std::future
结合使用时,可以 手动设置值,让future
在 某个时刻变为就绪状态,进而被get()
读取。相比于std::packaged_task
,std::promise
更灵活,适用于跨线程数据通信。
std::promise
和std::future
的关系
std::future<T>
只能读取数据,但 不能主动设置数据,它依赖于某种方式让数据变为就绪(如std::async
或std::packaged_task
)。std::promise<T>
可以手动设置值,并让std::future<T>
变为就绪状态。- 通过
std::promise::get_future()
获取一个std::future
,然后在另一个线程里promise.set_value()
手动设置值,让future.get()
能够获取到结果。
#include <iostream>
#include <thread>
#include <future>
#include <chrono>
void compute(std::promise<int> resultPromise, int a, int b) {
std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟计算
int result = a + b;
resultPromise.set_value(result); // 计算完毕,手动设置值
}
int main() {
// 1. 创建 promise
std::promise<int> resultPromise;
// 2. 获取 future
std::future<int> resultFuture = resultPromise.get_future();
// 3. 启动线程,异步计算
std::thread t(compute, std::move(resultPromise), 10, 20);
std::cout << "等待计算结果...\n";
// 4. 获取结果(会阻塞直到 set_value 被调用)
int result = resultFuture.get();
std::cout << "计算结果: " << result << std::endl;
t.join();
return 0;
}
(1)
std::promise<int> resultPromise;
std::promise<int>
用来存储异步计算的结果。promise.set_value(value);
手动设置值,从而让std::future<int>
变为就绪。(2)
std::future<int> resultFuture = resultPromise.get_future();
get_future()
关联std::future<int>
和std::promise<int>
。resultFuture.get();
阻塞等待值就绪,一旦promise.set_value(value)
被调用,就能获取值。(3) 线程
compute()
计算完后set_value(result);
set_value(result);
手动让future
就绪,使得resultFuture.get();
返回result
。- 如果
set_value()
没有被调用,future.get()
就会一直阻塞,导致死锁。(4) 线程同步
t.join();
确保主线程不会在子线程完成之前结束。
packaged_task和promise对比:
1.promise是对结果的封装。
主要作用是手动设置 std::future 的值,支持跨线程通信。
不执行任务,只是用来存储任务的结果,并让 future.get() 在某个时刻可用。
2.packaged_task是对任务(函数)的封装。
用来封装可调用对象(函数、Lambda),并让 std::future 自动变为就绪。
packaged_task 负责任务的执行,而不是手动设置值。
对比项 std::promise
(封装结果)std::packaged_task
(封装任务)作用 手动设置 future
结果让 future
结果 自动可用如何让 future
获取值?promise.set_value(value);
手动赋值task(args...)
执行任务,自动赋值适用于 跨线程数据传递,RPC 服务器响应 线程池、任务队列,自动执行任务 是否执行任务? ❌ 只存数据,不执行任务 ✅ 绑定任务,并在执行时设置 future
能否重复使用? ✅ 可以,可以多次赋值 ❌ 不可以, packaged_task
只能move()
一次
async直接创建线程完成对函数的调用(
launch::async立刻进入执行,launch::deferred等到get/wait时再执行
),而packaged_task把函数进行打包封装,可以在任意地方进行调用,可以创建线程执行,甚至直接在main函数中执行。promise只对结果封装,不进行函数执行,需要手动设置值,之后才能get()获取结果。