当前位置: 首页 > article >正文

C++ - 从零实现Json-Rpc框架-1(JsonCpp Muduo 异步操作)

RPC(Remote Procedure Call,远程过程调用) 允许程序像调用本地方法一样调用远程方法,而不需要关心底层的 网络通信细节。在你的 C++ RPC 框架 中,RPC 的核心目标就是简化远程方法调用,让开发者无需了解底层网络传输

RPC 的核心概念

概念解释
客户端(Client)发送 RPC 请求,调用远程方法
服务器(Server)接收请求并执行对应的远程方法
序列化(Serialization)把方法名 + 参数转换为可传输的数据格式(JSON/Protobuf)
反序列化(Deserialization)把收到的数据转换回原始方法调用
网络传输(Transport)通过 TCP / HTTP / UDP 传输请求和响应
服务注册 & 发现让客户端找到可用的 RPC 服务器(Zookeeper/Redis)

第三⽅库使⽤介绍

JsonCpp库

Json 是⼀种数据交换格式,它采⽤完全独⽴于编程语⾔的⽂本格式来存储和表⽰数据。

如: 我们想表⽰⼀个同学的学⽣信息
• C 代码表⽰
char *name = "xx";
int age = 18;
float score[3] = {88.5, 99, 58};

• Json 表⽰
{
"姓名" : "xx",
"年龄" : 18,
"成绩" : [88.5, 99, 58],
"爱好" :{
"书籍" : "西游记",
"运动" : "打篮球"
}
}

Json 的数据类型包括对象,数组,字符串,数字等。
• 对象:使⽤花括号 {} 括起来的表⽰⼀个对象
• 数组:使⽤中括号 [] 括起来的表⽰⼀个数组
• 字符串:使⽤常规双引号 "" 括起来的表⽰⼀个字符串
• 数字:包括整形和浮点型,直接使⽤

Json 数据对象类的表⽰

class Json::Value{
Value &operator=(const Value &other); //Value重载了[]和=,因此所有的赋值和获取
数据都可以通过
Value& operator[](const std::string& key);//简单的⽅式完成 val["name"] =
"xx";
Value& operator[](const char* key);
Value removeMember(const char* key);//移除元素
const Value& operator[](ArrayIndex index) const; //val["score"][0]
Value& append(const Value& value);//添加数组元素val["score"].append(88);
ArrayIndex size() const;//获取数组元素个数 val["score"].size();
std::string asString() const;//转string string name =
val["name"].asString();
const char* asCString() const;//转char* char *name =
val["name"].asCString();
Int asInt() const;//转int int age = val["age"].asInt();
float asFloat() const;//转float float weight = val["weight"].asFloat();
bool asBool() const;//转 bool bool ok = val["ok"].asBool();
};

序列化

JsonCpp 中,你可以将变量(如 intfloatstringbool、数组等)赋值给 Json::Value 类型的对象。

1. 直接赋值基本数据类型  int float double bool string const char*(自动转换为 std::string)

Json::Value root;
int age = 25;
double height = 175.5;
bool is_student = true;
std::string name = "Alice";

// 直接赋值
root["年龄"] = age;
root["身高"] = height;
root["是否学生"] = is_student;
root["姓名"] = name;

std::cout << root << std::endl;

2.用append()添加数组元素 std::vector<int> C数组

float grades[] = {85.5, 78.0, 92.3};
Json::Value gradesArray;

for (float grade : grades) {
    gradesArray.append(grade);
}

root["分数"] = gradesArray;

3.赋值嵌套 JSON 结构

Json::Value student;
student["姓名"] = "张三";
student["年龄"] = 20;
student["地址"]["省"] = "北京";
student["地址"]["市"] = "海淀区";

root["学生信息"] = student;

//或者这样嵌套也可以
//Json::Value address;
//address["省"]="北京";
//address["市"]="海淀区";
//student["地址"]=address;
{
  "学生信息": {
    "姓名": "张三",
    "年龄": 20,
    "地址": {
      "省": "北京",
      "市": "海淀区"
    }
  }
}

#include<iostream>
#include<string.h>
#include <sstream>
#include <memory>
#include <jsoncpp/json/json.h>

//实现序列化
void serialize()
{
    const char* name="王伟";
    const int age=19;
    const char* sex="男";
    float score[3]={80,75.6,60};

    Json::Value root;
    root["名字"]=name;
    root["性别"]=sex;
    root["年龄"]=age;
    root["成绩"].append(score[0]);
    root["成绩"].append(score[1]);
    root["成绩"].append(score[2]);

    //先实例化一个工厂类
    Json::StreamWriterBuilder wbuilder;
    // 来创建序列化器
    std::unique_ptr<Json::StreamWriter> writer(wbuilder.newStreamWriter());

    // 将 JSON 数据输出到字符串
    std::stringstream ss;
    writer->write(root,&ss);

    std::cout << "Serialized JSON:" << std::endl;
    std::cout << ss.str() << std::endl;

}

int main()
{
    serialize();
}
步骤代码作用
1️⃣ 创建 JSON 数据Json::Value root;生成 JSON 数据
2️⃣ 创建 StreamWriterBuilder(工厂类)Json::StreamWriterBuilder wbuilder;准备 JSON 序列化
3️⃣ 通过工厂创建 StreamWriter(序列化器)std::unique_ptr<Json::StreamWriter> writer(wbuilder.newStreamWriter());生成 JSON 序列化对象
4️⃣ 将 JSON 数据写入 std::stringstreamwriter->write(root, &ss);将 JSON 转换为字符串格式

std::stringstream 是 C++ 标准库中的 字符串流(stringstream),它允许像操作 输入/输出流 那样操作 字符串,主要用于 格式化输入输出 和 字符串转换。

在 JSON 处理(如 JsonCpp)中,std::stringstream 常用于 存储 JSON 序列化的字符串,然后再进行输出或进一步处理。

功能代码示例作用
写入字符串流ss << "Hello, " << "World!";把数据存入 stringstream
获取字符串内容std::string result = ss.str();获取 stringstream 的内容
清空 stringstreamss.str(""); ss.clear();清空 stringstream
stringstream 解析数据ss >> root;解析 JSON 数据
转换数据类型ss >> num >> decimal;stringstream 提取数据

反序列化

 Json::CharReaderBuilder

//实现反序列化
bool unserialize(std::string& body, Json::Value&val)
{
    body=R"({"姓名":"王伟","年龄":19,"成绩":[10,99.9,6]})";
    //先实例化一个工厂类
    Json::CharReaderBuilder crb;
    // 来创建序反列化器
    std::unique_ptr<Json::CharReader> cr(crb.newCharReader());

    std::string errs;
    bool ret=cr->parse(body.c_str(),body.c_str()+body.size(),&val,&errs);
    if(ret==false)
    {
        std::cout<<"json unserizlize faile:"<<errs<<std::endl;
        return false;
    }
    std::cout<<"姓名"<<val["姓名"].asString()<<std::endl;
    std::cout<<"年龄"<<val["年龄"].asInt()<<std::endl;
    int sz=val["成绩"].size();
    for(int i=0;i<sz;i++)
        std::cout<<"成绩:"<<val["成绩"][i].asFloat()<<std::endl;
    return true;
}

CharReader类中parse()

​
bool parse(
    char const* beginDoc,  // JSON 字符串的起始地址
    char const* endDoc,    // JSON 字符串的结束地址
    Json::Value* root,     // 解析后的 JSON 结构存放的地方
    std::string* errs      // 错误信息
) 

输出信息:

姓名王伟
年龄19
成绩:10
成绩:99.9
成绩:6

步骤代码作用
创建解析器工厂Json::CharReaderBuilder crb;生成 JSON 解析器
创建 JSON 解析器std::unique_ptr<Json::CharReader> rc(crb.newCharReader());解析 JSON 字符串
解析 JSONrc->parse(body.c_str(), body.c_str() + body.size(), &val, &errs);解析 JSON 并存入 val

Muduo库

Muduo 是一个 高性能的 C++ 网络库,基于 Reactor 模式多线程编程,适用于高并发服务器开发。它是 基于 epoll非阻塞 IO 设计的,适合构建 高效的 TCP 服务器

Muduo 的核心组件 分为 4 大类

  1. 事件循环 (EventLoop)
  2. TCP 连接管理 (TcpServerTcpClient)
  3. 网络数据处理 (Buffer)
  4. 多线程支持 (ThreadPool)

Muduo核心类

1. EventLoop(事件循环)

作用

  • 负责 epoll 事件处理,管理 Channel(文件描述符)。
  • 负责定时器、异步任务调度。
  • 每个 EventLoop 运行在 单独的线程,通常一个线程只有一个 EventLoop
EventLoop
 ├── Poller(封装 epoll)
 ├── TimerQueue(管理定时器)
 ├── Channel(封装文件描述符)

2. TcpServer(TCP 服务器)

作用

  • 监听 TCP 端口,管理多个客户端连接。
  • 通过 Acceptor 处理新连接,通过 TcpConnection 处理数据收发。
TcpServer
 ├── EventLoop(事件循环)
 ├── Acceptor(管理新连接)
 ├── TcpConnection(管理每个客户端)
 ├── Buffer(处理数据收发)

3. TcpClient(TCP 客户端)

作用

  • 连接服务器,发送/接收数据。
  • 通过 TcpConnection 进行读写数据。
TcpClient
 ├── EventLoop(事件循环)
 ├── TcpConnection(管理连接)
 ├── Buffer(处理数据收发)

4. Buffer(数据缓冲区)

作用

  • 处理 TCP 数据收发,支持零拷贝优化。
ThreadPool
 ├── Thread(线程管理)
 ├── TaskQueue(任务队列)

5.ThreadPool(线程池)

作用

  • 用于多线程任务调度,避免线程创建销毁的开销。
ThreadPool
 ├── Thread(线程管理)
 ├── TaskQueue(任务队列)
Muduo
 ├── muduo::base  (基础工具库)
 │   ├── ThreadPool      (线程池)
 │   ├── Logging         (日志系统)
 │   ├── Timestamp       (时间处理)
 │
 ├── muduo::net   (网络库)
 │   ├── EventLoop       (事件循环,基于 epoll)
 │   ├── TcpServer       (TCP 服务器)
 │   ├── TcpClient       (TCP 客户端)
 │   ├── TcpConnection   (管理每个 TCP 连接)
 │   ├── Buffer          (数据缓冲区)
 │   ├── Acceptor        (监听新连接)
 │   ├── TimerQueue      (定时任务)
 │
 ├── muduo::http  (HTTP 服务器)
     ├── HttpServer
     ├── HttpRequest
     ├── HttpResponse

1.Muduo实现字典服务端

server.cpp

#include<muduo/net/TcpServer.h>
#include<muduo/net/EventLoop.h>
#include<muduo/net/TcpConnection.h>
#include<muduo/net/Buffer.h>
#include<iostream>
#include<string>
#include<unordered_map>

class DictServer
{
public:
    DictServer(int port):_server(&_baseloop, muduo::net::InetAddress("0.0.0.0",port),
    "DictServer",muduo::net::TcpServer::kNoReusePort)//kNoReusePort是否启动地址重用?
    {
        //设置回调函数 
        //需要的参数类型 void setConnectionCallback(const ConnectionCallback& cb)
        //typedef std::function<void (const TcpConnectionPtr&)> ConnectionCallback;
        //TcpConnectionPtr&就是onConnection函数的参数,但onConnection是类成员函数带有this指针
        //用bind先绑定this指针(最先bind),其它参数按顺序传
        _server.setConnectionCallback(std::bind(&DictServer::onConnection,this,std::placeholders::_1));
        _server.setMessageCallback(std::bind(&DictServer::onMessage,this,
            std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));
    }
    void start()
    {
        _server.start();//开始监听
        _baseloop.loop();//开始循环事件监控
    }
private:
    void onConnection(const muduo::net::TcpConnectionPtr&conn) //连接建立/断开的回调函数
    {
        if(conn->connected())//判断连接是否存在
            std::cout<<"连接建立"<<std::endl;
        else 
            std::cout<<"连接断开"<<std::endl;
    }
    //接收到消息的回调函数
    void onMessage(const muduo::net::TcpConnectionPtr&conn,
        muduo::net::Buffer *buf,muduo::Timestamp)
    {
        static std::unordered_map<std::string,std::string> dict_map={
            {"hello","你好"},
            {"coke","小猫"},
        };
        std::string msg=buf->retrieveAllAsString();//获取缓冲区的全部数据
        std::string res;
        if(dict_map.find(msg)!=dict_map.end())
            res=dict_map[msg];
        else 
            res="找不到";
        conn->send(res);
    }
private:
    muduo::net::EventLoop _baseloop;
    muduo::net::TcpServer _server;
};

int main()
{
    DictServer server(9090);
    server.start();
    return 0;
}

1.设置回调函数时,为什么bind要传this?

_server.setConnectionCallback(std::bind(&DictServer::onConnection, this, std::placeholders::_1));

&DictServer::onConnection 只是成员函数的地址,它并不属于任何对象。

this用于绑定 onConnection 到当前 DictServer 实例,告诉 std::bind 这个函数属于哪个对象.

(静态成员函数 不依赖 this,所以可以直接使用,而 非静态成员函数 需要 this 指定对象。)

2.回调函数的参数只需要一个,算上this参数不就是两个了?

这是因为 std::bind 生成了一个新的函数对象,这个函数对象的参数列表与 setConnectionCallback() 需要的回调完全匹配,Muduo 可以正确调用它。

所以最终 this 只是 std::bind 内部存储的对象指针,不会影响 setConnectionCallback() 只接受一个参数!

3.std::bind 绑定成员函数时,this 和参数的顺序

std::bind 绑定类成员函数时:

  • this 必须是第一个参数(因为成员函数需要对象实例)。
  • 之后的参数按照成员函数的参数顺序传递。std::placeholders::_1代表是未提供默认值的 第一个参数的位置,而不是成员函数的第一个参数

makefile:

CXXFLAGS= -std=c++11 -I../../build/release-install-cpp11/include/
CXXFLAGS=-L../../build/release-install-cpp11/lib -lmuduo_net -lmuduo_base -pthread
server: server.cpp
	g++ $(CXXFLAGS) $^ -o $@ $(CXXFLAGS)
clean:
	rm -f server

CXXFLAGS C++ 编译选项 

-I../../build/release-install-cpp11/include/ 指定额外的头文件搜索路径,确保能找到#include<muduo/net/Buffer.h>等相关头文件。

LDFLAGS 链接选项

-L../../build/release-install-cpp11/lib 

额外的库文件搜索路径,如果 server.cpp 依赖 muduo 库(如 libmuduo_net.so),编译器会在 ../../build/release-install-cpp11/lib/ 目录查找这些库。

  • -lmuduo_net:链接 libmuduo_net.alibmuduo_net.so
  • -lmuduo_base:链接 libmuduo_base.alibmuduo_base.so
  • -lxxx 表示链接 libxxx.solibxxx.a,前缀 lib 可以省略。

#include" " #include<>查找方式

方式作用
#include "file.h"先查找当前目录,再查找 -I 目录,最后查找系统路径
#include <file.h>只查找 -I 目录和系统路径,不查找当前目录
-I/path/to/include添加额外头文件目录

netstat -anput | grep 9090

查看当前系统上所有和 9090 端口相关的网络连接

选项含义
netstat显示网络连接、路由表、接口统计等信息
-a显示所有套接字(包括监听和已建立的连接)
-n数字格式显示地址(不解析 DNS)
-t仅显示 TCP 连接
-p显示关联的进程(需要 root 权限)
-u显示 UDP 连接
grep 9090过滤出包含 9090 端口的结果

2.Muduo实现字典客户端

client.cpp

#include <muduo/net/TcpClient.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/EventLoopThread.h>
#include <muduo/net/Buffer.h>
#include <muduo/base/CountDownLatch.h>
#include <iostream>
#include <string>

class DictClient
{
public:
    // TcpClient(EventLoop* loop,
    //     const InetAddress& serverAddr,
    //     const string& nameArg);
    // InetAddress(StringArg ip, uint16_t port, bool ipv6 = false);
    DictClient(const std::string &sip, int sport) : 
    _downlactch(1), // 初始化计数器=1
    _baseloop(_loopthread.startLoop()),
    _client(_baseloop, muduo::net::InetAddress(sip, sport), "DictClient")             
    {
        // 连接事件回调
        _client.setConnectionCallback(std::bind(&DictClient::onConnection, this, std::placeholders::_1));
        // 连接消息回调
        _client.setMessageCallback(std::bind(&DictClient::onMessage, this,
                                             std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

        // 连接服务器 client的connect接口是一个非阻塞的操作
        // 所有可能会出现connect还没建立连接,conntion接口就获取了连接,send()发送数据
        _client.connect();  // 此时计数>0
        _downlactch.wait(); // 等连接建立完成countDown()后计数-- =0继续

        // //在连接建立完成后,可以直接loop吗?
        // //开始事件循环监控,内部是死循环,对于客户端来说不能直接使用,因为一旦循环,就走不下去了
        // //一般再用一个线程单独loop
        // _baseloop.loop();
    }
    bool send(const std::string &msg)
    {
        if (_conn->connected() == false)
        {
            std::cout << "连接断开" << std::endl;
            return false;
        }
        _conn->send(msg);
        return true;
    }

private:
    void onConnection(const muduo::net::TcpConnectionPtr &conn) // 连接建立/断开的回调函数
    {
        if (conn->connected()) // 判断连接是否存在
        {
            std::cout << "连接建立" << std::endl;
            _conn = conn;
            _downlactch.countDown(); // 计数-- =0 唤醒wait
        }
        else
            std::cout << "连接断开" << std::endl;
    }
    // 接收到消息的回调函数
    void onMessage(const muduo::net::TcpConnectionPtr &conn,
                   muduo::net::Buffer *buf, muduo::Timestamp)
    {
        std::string res=buf->retrieveAllAsString();
        std::cout<<res<<std::endl;
    }

private:
    muduo::net::TcpConnectionPtr _conn;
    muduo::CountDownLatch _downlactch; // 做计数同步的类 void wait()计数>0阻塞 countDown()-- 计数=0唤醒wait
    muduo::net::EventLoopThread _loopthread; //实例化后自动创建一个线程执行loop
    muduo::net::EventLoop* _baseloop;
    muduo::net::TcpClient _client;
};

int main()
{
    DictClient client("127.0.0.1",9090);
    while(1)
    {
        std::string msg;
        std::cin>>msg;
        client.send(msg);
    }
    return 0;
}

1.使用 CountDownLatch类 确保连接建立后再发送数据,防止 send() 失败。

2.使用 EventLoopThreadEventLoop 运行在独立线程,避免 main() 被阻塞,确保可以继续处理用户输入和其他操作。如果 loop() 运行在 main() 线程,那么 main() 线程就会被阻塞,无法执行后续代码,比如 std::cin 输入等。

makefile

CXX = g++
CXXFLAGS = -std=c++11 -I/root/Json-Rpc/build/release-install-cpp11/include/ -Wall -O2
LDFLAGS = -L/root/Json-Rpc/build/release-install-cpp11/lib -lmuduo_net -lmuduo_base -pthread
all: server client
server: server.cpp
	$(CXX) $(CXXFLAGS) $^ -o $@ $(LDFLAGS)

client: client.cpp
	$(CXX) $(CXXFLAGS) $^ -o $@ $(LDFLAGS)
clean:
	rm -f server client

C++11 异步操作

std::future 获取异步结果

std::future是C++11标准库中的⼀个模板类,用于获取异步任务的结果,而不是异步任务本身当我们在多线程编程中使⽤异步任务时,std::future可以帮助我们在需要的时候获取任务的执⾏结果。std::future的⼀个重要特性是能够阻塞当前线程,直到异步操作完成,从⽽确保我们在获取结果时不会遇到未完成的操作。

std::future 需要搭配其他工具使用

C++11 提供了一些能够执行异步任务的工具来与 std::future 配合使用:

  • std::async: 用于异步执行一个函数,并返回 future 对象,以便获取该函数的执行结果。
  • std::packaged_task: 将一个可调用对象(如函数)封装为异步任务,使其可以在其他线程中执行,并返回 future 获取结果。
  • std::promise: 用于在线程之间传递数据,一个线程可以设置 promise 的值,另一个线程可以通过 future 关联 promise 来获取该值。

1.std::async关联异步任务

std::async 在 C++11 里是一个很方便的方式来启动异步任务,它的 std::launch 选项提供了灵活性,使得任务可以在不同的执行策略下运行:

template <typename Function, typename... Args>
std::future<std::invoke_result_t<std::decay_t<Function>, std::decay_t<Args>...>>
std::async(std::launch policy, Function&& func, Args&&... args);

参数解析

参数类型作用
policystd::launch指定执行策略,可选 std::launch::asyncstd::launch::deferred
funcFunction&&传入的函数,可以是普通函数、lambda、函数对象等
args...Args&&...传递给 func 的参数,支持多个参数

func传入成员函数时,需要传入this指针

class Test {
public:
    int add(int x, int y) { return x + y; }
};
Test t;
std::async(std::launch::async, &Test::add, &t, 2, 3);  // 传递对象指针

返回值

std::future<T>,其中 T 取决于 func(args...) 的调用函数的返回类型。

std::future<int> result = std::async(std::launch::async, myFunction, arg1, arg2);
int value = result.get(); // 阻塞等待异步任务完成,获取结果

policy选项

  1. std::launch::async

    • 任务会在新创建的线程中立即执行,真正的异步执行方式。
    • 适用于计算密集型任务或者需要并发执行的任务。
  2. std::launch::deferred

    • 任务不会立即执行,而是延迟到 future.get()future.wait() 被调用时才会执行。
    • 适用于一些非计算密集型的任务,避免创建不必要的线程。
  3. std::launch::async | std::launch::deferred

    • 由系统决定是创建一个新线程还是延迟执行。
    • 如果线程资源紧张,可能会采用 deferred 方式,避免额外的线程创建开销。

 std::launch::async 新创建的线程中立即执行,真正的异步执行方式

#include<iostream>
#include<future>
#include <thread>


int Add(int x,int y)
{
    std::cout<<"into add"<<std::endl;
    return x+y;
}

int main()
{
    std::future<int> res=std::async(std::launch::async,Add,11,22);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout<<"------------------\n";
    //std::future<int>::get(); 用于获取异步任务的结果,如果还没有就会阻塞
    std::cout<<res.get()<<std::endl;
    return 0;
}

立即进入Add函数中cout<<into add

 td::launch::deferred延迟到 future.get()future.wait() 被调用时才会执行。

int main()
{
    std::future<int> res=std::async(std::launch::deferred,Add,11,22);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout<<"------------------\n";
    //std::future<int>::get(); 用于获取异步任务的结果,如果还没有就会阻塞
    std::cout<<res.get()<<std::endl;
    return 0;
}

等get()获取时,进入Add函数中执行

2.std::packaged_task

std::packaged_task一个对任务的封装,它将一个 可调用对象(函数、lambda、函数对象等)绑定到 std::future,从而 可以异步获取任务结果

方式执行控制线程管理适用场景
std::async系统自动管理线程,不需要手动调用std::async 处理,不适合线程池适合简单异步调用
std::packaged_task需要手动调用 operator() 启动任务开发者可以自己管理线程池适合任务队列、线程池
template <typename R, typename... Args>
class std::packaged_task<R(Args...)>;

参数

  • R:被封装的可调用对象的返回类型。
  • Args...:被封装的可调用对象的参数类型。

使用move()移动task

#include <iostream>
#include <future>
#include <thread>
#include <chrono>

// 定义一个简单的计算函数
int compute(int x, int y) {
    std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟计算
    return x + y;
}

int main() {
    // 1. 创建 std::packaged_task,参数为函数签名
    std::packaged_task<int(int, int)> task(compute);

    // 2. 获取 future 以便稍后获取结果
    std::future<int> result = task.get_future();

    // 3. 启动任务(这里手动调用 operator())
    std::thread t(std::move(task), 10, 20); // 传递参数并在新线程执行

    // 4. 主线程可以做别的事情
    std::cout << "等待计算结果...\n";

    // 5. 获取结果(阻塞直到任务完成)
    std::cout << "计算结果: " << result.get() << std::endl;

    // 6. 线程回收
    t.join();
    return 0;
}

使用shared_ptr<>共享task

#include<iostream>
#include<future>
#include <thread>
#include <memory>


int Add(int x,int y)
{
    std::cout<<"into add"<<std::endl;
    return x+y;
}
int main()
{
    //1.封装任务
    auto task=std::make_shared<std::packaged_task<int(int,int)>>(Add);
    //2.获取任务包关联的future对象
    std::future<int> res=task->get_future();
    //3.创建线程执行
    std::thread thr([task](){
        (*task)(1,3);
    });
    //4.获取结果
    std::cout<<res.get()<<std::endl;
    thr.join();
    return 0;
}

1.创建std::packaged_task。task 是 compute 的封装,但它不会自动执行,需要手动调用。

2.task.get_future()获取std::future。task.get_future() 绑定 了 std::packaged_task,它用于获取任务的 返回值。

3.创建线程 std::thread手动启动任务

4.result.get() 获取异步结果

5.join()阻塞main()直到线程结束。线程还没结束,main()就退出 程序会崩溃。

move()和shared_ptr<>操作task的不同

std::packaged_task 不能直接拷贝,原因在于它的 拷贝构造函数已被删除(std::future 只能与一个 packaged_task 关联)

所以,必须使用指针(如 std::shared_ptr)或者 std::move 进行转移

1.使用move()移动task,会让原本的task变空,不能再进行调用task(1,2);

task 只能移动一次,下次 RPC 任务就必须创建新的 packaged_task。

每次都要创建 std::thread,然后 join() 或 detach(),导致线程频繁创建和销毁.

2.如果你 需要 packaged_task 在多个地方调用,可以使用 std::shared_ptr 共享 packaged_task 。允许多个线程共享 task,而不会因 std::move 让 task 失效。task 只会在 shared_ptr 计数归零时销毁,确保对象存活。

方案适用场景优势缺点
std::move(packaged_task)单次任务代码简单每次都创建线程,影响性能
std::shared_ptr<packaged_task>线程池 / 多任务共享允许多个线程共享任务额外的引用计数开销

3.std::promise

std::promisestd::future 结合使用时,可以 手动设置值,让 future某个时刻变为就绪状态,进而被 get() 读取。相比于 std::packaged_taskstd::promise 更灵活,适用于跨线程数据通信

std::promisestd::future 的关系

  • std::future<T> 只能读取数据,但 不能主动设置数据,它依赖于某种方式让数据变为就绪(如 std::asyncstd::packaged_task)。
  • std::promise<T> 可以手动设置值,并让 std::future<T> 变为就绪状态。
  • 通过 std::promise::get_future() 获取一个 std::future,然后在另一个线程promise.set_value() 手动设置值,让 future.get() 能够获取到结果。
#include <iostream>
#include <thread>
#include <future>
#include <chrono>

void compute(std::promise<int> resultPromise, int a, int b) {
    std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟计算
    int result = a + b;
    resultPromise.set_value(result); // 计算完毕,手动设置值
}

int main() {
    // 1. 创建 promise
    std::promise<int> resultPromise;

    // 2. 获取 future
    std::future<int> resultFuture = resultPromise.get_future();

    // 3. 启动线程,异步计算
    std::thread t(compute, std::move(resultPromise), 10, 20);

    std::cout << "等待计算结果...\n";
    
    // 4. 获取结果(会阻塞直到 set_value 被调用)
    int result = resultFuture.get();
    
    std::cout << "计算结果: " << result << std::endl;

    t.join();
    return 0;
}

(1) std::promise<int> resultPromise;

  • std::promise<int> 用来存储异步计算的结果
  • promise.set_value(value); 手动设置值,从而让 std::future<int> 变为就绪。

(2) std::future<int> resultFuture = resultPromise.get_future();

  • get_future() 关联 std::future<int>std::promise<int>
  • resultFuture.get(); 阻塞等待值就绪,一旦 promise.set_value(value) 被调用,就能获取值。

(3) 线程 compute() 计算完后 set_value(result);

  • set_value(result); 手动让 future 就绪,使得 resultFuture.get(); 返回 result
  • 如果 set_value() 没有被调用,future.get() 就会一直阻塞,导致死锁

(4) 线程同步

  • t.join(); 确保主线程不会在子线程完成之前结束

packaged_task和promise对比:

1.promise是对结果的封装。

主要作用是手动设置 std::future 的值,支持跨线程通信。

不执行任务,只是用来存储任务的结果并让 future.get() 在某个时刻可用。

2.packaged_task是对任务(函数)的封装。

用来封装可调用对象(函数、Lambda),并让 std::future 自动变为就绪

packaged_task 负责任务的执行,而不是手动设置值。

对比项std::promise(封装结果)std::packaged_task(封装任务)
作用手动设置 future 结果future 结果 自动可用
如何让 future 获取值?promise.set_value(value); 手动赋值task(args...) 执行任务,自动赋值
适用于跨线程数据传递,RPC 服务器响应线程池、任务队列,自动执行任务
是否执行任务?❌ 只存数据,不执行任务✅ 绑定任务,并在执行时设置 future
能否重复使用?可以,可以多次赋值不可以packaged_task 只能 move() 一次

async直接创建线程完成对函数的调用(launch::async立刻进入执行,launch::deferred等到get/wait时再执行),而packaged_task把函数进行打包封装,可以在任意地方进行调用,可以创建线程执行,甚至直接在main函数中执行。promise只对结果封装,不进行函数执行,需要手动设置值,之后才能get()获取结果。


http://www.kler.cn/a/594774.html

相关文章:

  • 四、小白学JAVA-石头剪刀布游戏
  • YZi Labs 谈对 Plume 的投资:利用区块链创造现实价值的典范项目
  • 【Linux】Makefile秘籍
  • 前端技巧:精准判断登录设备是移动端还是 PC 端
  • 数据可视化(matplotlib)-------辅助图标的设置
  • 一键融合,尽享全能生活:三网融合系统在酒店弱电方案中的应用探索
  • 【嵌入式】复刻SQFMI开源的Watchy墨水屏电子表——(2)软件部分
  • NineData云原生智能数据管理平台新功能发布|2025年2月版
  • ​《引力透镜:Relax Max用哈勃光学系统重构排泄物天体力学》​
  • MapStruct 使用教程
  • 技术分享 | MySQL内存使用率高问题排查
  • 如何用C++封装纯C写的函数库,如何处理C函数调用返回错误
  • OpenNJet:下一代云原生应用引擎,支持动态配置与高效管理,简化运维任务,提升应用灵活性与安全性。
  • 【Docker入门】用Docker启动项目
  • Leetcode 378. 有序矩阵中第 K 小的元素 二分查找
  • 【uni-app】集成SQLite,无服务数据库
  • 上海蒂正科技有限公司:技术驱动数字化,打造高端企业门户新标杆
  • Web-Machine-N7靶机攻略
  • C# 派生 详解
  • ONE Deep模型:LG AI Research的开源突破