当前位置: 首页 > article >正文

解锁高效编程:C++异步框架WorkFlow

在 C++ 编程的领域中,随着业务场景日益复杂,对程序性能和响应速度的要求也愈发严苛。传统的同步编程模式在面对高并发、I/O 密集型任务时,常常显得力不从心,成为阻碍程序高效运行的瓶颈。而异步编程,则为我们打开了一扇通往高效世界的大门。

今天,我们将聚焦于一款强大的 C++ 异步框架 ——WorkFlow,一同深入探索它如何巧妙地运用异步技术,为开发者们解锁高效编程的新境界,让代码在复杂的任务中也能流畅且快速地运行。

一、引言

在 C++ 开发的广袤天地中,我们常常会遭遇各种棘手的问题,尤其是在异步处理这块充满挑战的领域。想象一下,你正在构建一个高并发的网络应用程序,用户请求如潮水般涌来 。每一个请求都需要进行一系列复杂的操作,例如从数据库读取数据、进行网络请求获取额外信息,以及对数据进行复杂的计算和处理。

在传统的同步处理模式下,当一个请求到达时,程序会按部就班地处理完所有操作,然后再去响应下一个请求。这就好比在餐厅里,服务员一次只能接待一位顾客,只有当这位顾客的所有需求都满足后,才能去服务下一位顾客。在并发量较低的情况下,这种方式或许还能应付得来。但一旦并发量飙升,就像餐厅突然涌入了大量顾客,服务员就会应接不暇,导致顾客等待时间过长,甚至有些顾客因为等待太久而选择离开。

具体到技术层面,这种同步处理方式会带来严重的效率瓶颈。在等待 I/O 操作完成的过程中,线程处于阻塞状态,无法执行其他任务。这就相当于服务员在等待厨房做菜的过程中,什么也不做,白白浪费了时间和资源。而且,随着并发请求的增加,线程上下文切换的开销也会变得越来越大,进一步降低了系统的性能。

为了解决这些问题,我们引入了异步处理的概念。异步处理就像是餐厅里配备了多个服务员,每个服务员都可以同时处理不同顾客的需求。当一个服务员在等待厨房做菜时,可以去服务其他顾客,从而提高了整体的服务效率。在 C++ 中,实现异步处理的方式有很多种,例如使用多线程、异步 I/O 等。然而,这些方式往往需要开发者手动管理线程、锁等复杂的资源,容易出错,且开发成本较高。

那么,有没有一种更简单、高效的方式来实现 C++ 的异步处理呢?这时候,WorkFlow 这款强大的异步框架就应运而生了。它就像是一位经验丰富的餐厅经理,能够合理地调度服务员(线程),高效地处理顾客(请求)的需求,让开发者能够轻松地应对高并发场景下的异步处理挑战。

二、WorkFlow框架详解

WorkFlow 是搜狗公司开源的一款 C++ 服务器引擎,它是新一代基于任务流模型的异步调度编程范式 ,在 C++ 服务器开发领域占据着举足轻重的地位。其设计目标是为了支撑搜狗几乎所有后端 C++ 在线服务,包括搜索服务、云输入法、在线广告等,每日能够处理数百亿的请求,可见其性能之强大。

从本质上来说,WorkFlow 是一个异步任务调度框架,它巧妙地封装了 CPU 计算、GPU 计算、网络、磁盘 I/O、定时器、计数器这 6 种异步资源,并以回调函数模式提供给用户使用。这就好比为开发者打造了一个功能齐全的工具箱,开发者可以根据实际需求,轻松地使用这些工具来构建复杂的应用程序。

在服务器开发中,WorkFlow 的作用不可小觑。它能够屏蔽阻塞调用的影响,将阻塞调用的开发接口转化为异步接口,从而充分利用计算资源。这意味着在处理 I/O 操作时,线程不会被阻塞,而是可以去执行其他任务,大大提高了系统的并发处理能力。同时,WorkFlow 还管理着线程池,使得开发者能够迅速构建并行计算程序。通过合理地调度线程,它能够让服务器资源得到更充分的利用,确保在高并发场景下,服务器依然能够高效、稳定地运行。

举个例子,假设我们正在开发一个电商平台的服务器,用户在浏览商品时,可能会同时触发多个请求,如获取商品详情、查询库存、推荐相关商品等。使用 WorkFlow,我们可以将这些请求封装成一个个异步任务,让它们在不同的线程中并行执行,从而快速响应用户的操作,提升用户体验。

2.1安装workflow

首先,需要先下载workflow的源码,可以选择下载release版本或者直接在github当中克隆最新的版本。

git clone https://github.com/sogou/workflow.git
如果克隆失败,可以下载zip压缩包然后解压代码文件或者是下载release文件

随后,安装所有依赖的库文件:

sudo apt install -y cmake libssl-dev

随后,使用cmake生成Makefile文件

mkdir build
cd build
cmake ..(如果报错 sudo apt install libssl1.1 or libssl-dev)

使用 make 编译链接生成动态库。

make

最后,使用 make install 将库文件和头文件移动到操作系统的合适位置,并且更新链接器的配置:

sudo make install
sudo ldconfig

测试是否安装成功

g++ tutorial-00-helloworld.cc -lworkflow

2.2http的客户端

利用workflow来实现一个http客户端基本流程:

  • 使用工厂函数,根据任务类型HTTP,创建一个任务对象;

  • 设置任务的属性;

  • 为任务绑定一个回调函数;

  • 启动任务

在workflow当中,所有任务对象都是使用工厂函数来创建的。在创建任务的时候,还可以设置一些属性,比如要连接的服务端的url、最大重定向次数、连接失败的时候的重试次数和用户的回调函数(没有回调函数则传入nullptr)。

class WFTaskFactory
{
public:
	static WFHttpTask *create_http_task(const std::string& url,//要连接的服务端的url
										int redirect_max,//最大重定向次数
										int retry_max,//连接失败的时候的重试次数
										http_callback_t callback);//回调函数
};

在创建任务对象之后,启动任务对象之前,可以用访问任务对象的方法去修改任务的属性。

using WFHttpTask = WFNetworkTask<protocol::HttpRequest,
								 protocol::HttpResponse>;
REQ *get_req();//获取指向请求的指针
void set_callback(std::function<void (WFNetworkTask<REQ, RESP> *)> cb);//设置回调函数

关于HTTP的请求和响应,实际会存在更多相关的接口。

class HttpMessage : public ProtocolMessage
{
public:
	const char *get_http_version() const;
	
	bool set_http_version(const char *version);
	
	bool add_header_pair(const char *name, const char *value);
	
	bool set_header_pair(const char *name, const char *value);
	
	bool get_parsed_body(const void **body, size_t *size) const;
	
	/* Output body is for sending. Want to transfer a message received, maybe:
	* msg->get_parsed_body(&body, &size);
	* msg->append_output_body_nocopy(body, size); */
	bool append_output_body(const void *buf, size_t size);
	
	bool append_output_body_nocopy(const void *buf, size_t size);
	
	void clear_output_body();
	
	size_t get_output_body_size() const;
	//上述接口都有std::string版本
	//...
};
class HttpRequest : public HttpMessage
{
public:
	const char *get_method() const;
	
	const char *get_request_uri() const;
	
	bool set_method(const char *method);
	
	bool set_request_uri(const char *uri);
  	//上述接口都有std::string版本
	//...
};

class HttpResponse : public HttpMessage
{
public:
	const char *get_status_code() const;
	
	const char *get_reason_phrase() const;
	
	bool set_status_code(const char *code);
	
	bool set_reason_phrase(const char *phrase);
	
	/* Tell the parser, it is a HEAD response. */
	void parse_zero_body();
	//上述接口都有std::string版本
	//...
};

调用start方法可以异步启动任务。需要值得特别注意的是,只有客户端才可以调用start方法。通过观察得知,start方法的底层逻辑就是根据本任务对象创建一个序列,其中本任务是序列当中的第一个任务,随后启动该任务。

/* start(), dismiss() are for client tasks only. */
void start()
{
	assert(!series_of(this));
	Workflow::start_series_work(this, nullptr);
}

2.3回调函数的设计

当任务的基本工作完成之后,就会执行用户设置的回调函数,在回调函数当中,可以获取本次任务的执行情况。
针对http任务,回调函数在执行过程中可以获取本次任务的执行状态和失败的原因。

template<class REQ, class RESP>
class WFNetworkTask : public CommRequest
{
public:
	// ...
	int get_state() const { return this->state; }
	int get_error() const { return this->error; }
	// ...
}

下面是使用状态码和错误码的例子。当http基本工作执行正常的时候,此时状态码为WFT_STATE_SUCCESS ,当出现系统错误的时候,此时状态码为 WFT_STATE_SYS_ERROR ,可以使用strerror 获取报错信息。当出现url错误的使用,此时状态码为 WFT_STATE_DNS_ERROR ,可以使用gai_strerror 获取报错信息。

#include "unixHeader.h"
#include <workflow/HttpMessage.h>
#include <workflow/HttpUtil.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/WFFacilities.h>
static WFFacilities::WaitGroup wait_group(1);
void sig_handler(int signo){
	wait_group.done();
}
void callback(WFHttpTask *httpTask){
	int state = httpTask->get_state();
	int error = httpTask->get_error();
	switch (state){
		case WFT_STATE_SYS_ERROR:
			fprintf(stderr, "system error: %s\n", strerror(error));
			break;
		case WFT_STATE_DNS_ERROR:
			fprintf(stderr, "DNS error: %s\n", gai_strerror(error));
			break;
		case WFT_STATE_SUCCESS:
			break;
	}
	if (state != WFT_STATE_SUCCESS){
		fprintf(stderr, "Failed. Press Ctrl-C to exit.\n");
		return;
	}
	fprintf(stderr, "success\n");
	wait_group.done();
}

int main(int argc, char *argv[]){
	std::string url = "http://";
	url.append(argv[1]);
	signal(SIGINT, sig_handler);
	auto httpTask = WFTaskFactory::create_http_task(url, 0, 0, callback);
	protocol::HttpRequest *req = httpTask->get_req();
	req->add_header_pair("Accept", "*/*");
	req->add_header_pair("User-Agent", "TestAgent");
	req->add_header_pair("Connection", "close");
	httpTask->start();
	wait_group.wait();
	return 0;
}

在使用回调函数的时候,还可以获取http请求报文和响应报文的内容。

template<class REQ, class RESP>
class WFNetworkTask : public CommRequest
{
	// ...
public:
	REQ *get_req() { return &this->req; }
	RESP *get_resp() { return &this->resp; }
 	// ...
}
//其中http任务的实例化版本
//REQ -> protocol::HttpRequest
//RESP -> protocol::HttpResponse

下面是样例代码:

void callback(WFHttpTask *task){
	protocol::HttpRequest *req = task->get_req();
	protocol::HttpResponse *resp = task->get_resp();
	// ...
	fprintf(stderr, "%s %s %s\r\n", req->get_method(),
	req->get_http_version(),
	req->get_request_uri());
	// ...
	fprintf(stderr, "%s %s %s\r\n", resp->get_http_version(),
	resp->get_status_code(),
	resp->get_reason_phrase());
	// ...
}

对于首部字段,workflow提供 protocol::HttpHeaderCursor 类型作为遍历所有首部字段的迭代器。next 方法负责找到下一对首部字段键值对,倘若已经解析完成,就会返回 false 。find 会根据首部字段的键,找到对应的值,值得注意的是, find 方法会修改迭代器的位置。

class HttpHeaderCursor{
//...
public:
	bool next(std::string& name, std::string& value);
	bool find(const std::string& name, std::string& value);
	void rewind();
	//...
};

下面是样例:

void callback(WFHttpTask *task){
	protocol::HttpRequest *req = task->get_req();
	protocol::HttpResponse *resp = task->get_resp();
	//...
	std::string name;
	std::string value;
	// ....
	// 遍历请求报文的首部字段
	protocol::HttpHeaderCursor req_cursor(req);
	while (req_cursor.next(name, value)){
		fprintf(stderr, "%s: %s\r\n", name.c_str(), value.c_str());
 	}
	fprintf(stderr, "\r\n");
 	// 遍历响应报文的首部字段
	protocol::HttpHeaderCursor resp_cursor(resp);
	while (resp_cursor.next(name, value)){
		fprintf(stderr, "%s: %s\r\n", name.c_str(), value.c_str());
 	}
	fprintf(stderr, "\r\n");
 	//...
}

对于http报文的报文体,可以使用 get_parsed_body 方法获取报文的内容,需要注意的是它的用法。

	//...
	// 首先需要定义一个指针变量,该指针的基类型是const void
	const void *body;
	size_t body_len;
	// 将指针变量的地址传入get_parsed_body方法中,指针变量将要指向报文体
	resp->get_parsed_body(&body, &body_len);
	fwrite(body, 1, body_len, stdout);
	fflush(stdout);
	//...

三、WorkFlow独特功能

3.1强大的异步资源封装

WorkFlow 的一大亮点就是对多种异步资源的强大封装能力。它如同一个万能收纳盒,将 CPU 计算、GPU 计算、网络、磁盘 I/O、定时器、计数器这 6 种异步资源有序整合 。

在 CPU 计算方面,WorkFlow 提供了简洁的接口来处理复杂的计算任务。例如,当我们需要对一组数据进行快速排序时,可以这样使用:

#include "workflow/WFFacilities.h"

using namespace wf;

int main()
{
    // 创建一个CPU任务工厂
    WFFacilities::WaitGroup waitGroup(1);
    WFCPUTask *task = WFTaskFactory::create_cpu_task([](WFCPUTask *task) {
        // 这里进行具体的CPU计算操作,比如快速排序
        int data[] = {5, 3, 8, 1, 2};
        // 简单的快速排序实现示例
        int n = sizeof(data) / sizeof(data[0]);
        for (int i = 0; i < n - 1; ++i) {
            for (int j = 0; j < n - i - 1; ++j) {
                if (data[j] > data[j + 1]) {
                    int temp = data[j];
                    data[j] = data[j + 1];
                    data[j + 1] = temp;
                }
            }
        }
        // 可以将结果存储在task的自定义数据区等操作
    }, [&waitGroup](WFCPUTask *task) {
        waitGroup.done();
    });
    task->start();
    waitGroup.wait();
    return 0;
}

在这个示例中,我们通过WFTaskFactory::create_cpu_task创建了一个 CPU 任务,将快速排序的计算逻辑放在任务的回调函数中。当任务执行完成后,会触发第二个回调函数,通知WaitGroup任务已完成。

对于 GPU 计算,假设我们要使用 CUDA 进行矩阵乘法,WorkFlow 也能很好地支持。首先需要确保系统已经安装了 CUDA 环境,然后可以这样编写代码:

#include "workflow/WFFacilities.h"
#include <cuda_runtime.h>

using namespace wf;

// CUDA核函数,用于矩阵乘法
__global__ void matrixMultiplication(float *a, float *b, float *c, int size)
{
    int row = blockIdx.y * blockDim.y + threadIdx.y;
    int col = blockIdx.x * blockDim.x + threadIdx.x;
    if (row < size && col < size) {
        float sum = 0;
        for (int i = 0; i < size; ++i) {
            sum += a[row * size + i] * b[i * size + col];
        }
        c[row * size + col] = sum;
    }
}

int main()
{
    WFFacilities::WaitGroup waitGroup(1);
    WFCPUTask *prepareTask = WFTaskFactory::create_cpu_task([](WFCPUTask *task) {
        // 初始化矩阵数据等准备工作
        int size = 1024;
        float *hostA = new float[size * size];
        float *hostB = new float[size * size];
        float *hostC = new float[size * size];
        for (int i = 0; i < size * size; ++i) {
            hostA[i] = 1.0f;
            hostB[i] = 2.0f;
        }
        float *deviceA, *deviceB, *deviceC;
        cudaMalloc((void**)&deviceA, size * size * sizeof(float));
        cudaMalloc((void**)&deviceB, size * size * sizeof(float));
        cudaMalloc((void**)&deviceC, size * size * sizeof(float));
        cudaMemcpy(deviceA, hostA, size * size * sizeof(float), cudaMemcpyHostToDevice);
        cudaMemcpy(deviceB, hostB, size * size * sizeof(float), cudaMemcpyHostToDevice);
        // 将设备指针和相关参数传递给GPU任务
        task->user_data = new GPUData{deviceA, deviceB, deviceC, size};
    }, [&waitGroup](WFCPUTask *task) {
        // 启动GPU任务
        GPUData *data = (GPUData*)task->user_data;
        WFGPUTask *gpuTask = WFTaskFactory::create_gpu_task(matrixMultiplication, data->deviceA, data->deviceB, data->deviceC, data->size, [&waitGroup](WFGPUTask *task) {
            // GPU任务完成后,将结果从设备拷贝回主机
            GPUData *data = (GPUData*)task->user_data;
            float *hostC = new float[data->size * data->size];
            cudaMemcpy(hostC, data->deviceC, data->size * data->size * sizeof(float), cudaMemcpyDeviceToHost);
            // 释放设备内存
            cudaFree(data->deviceA);
            cudaFree(data->deviceB);
            cudaFree(data->deviceC);
            delete data;
            waitGroup.done();
        });
        gpuTask->start();
    });
    prepareTask->start();
    waitGroup.wait();
    return 0;
}

在这个例子中,我们首先通过 CPU 任务进行矩阵数据的初始化和设备内存的分配,然后将相关数据传递给 GPU 任务。GPU 任务执行 CUDA 核函数进行矩阵乘法,完成后再将结果从设备拷贝回主机。

在网络请求方面,以常见的 HTTP 请求为例,WorkFlow 提供了直观的接口。如果我们要获取某个网页的内容,可以这样实现:

#include "workflow/WFHttpServer.h"
#include "workflow/WFHttpUtil.h"

using namespace protocol;

void onHttpResponse(WFHttpTask *task)
{
    HttpResponse *resp = task->get_resp();
    if (resp->get_status_code() == 200) {
        const void *body;
        size_t len;
        resp->get_parsed_body(&body, &len);
        std::string content((const char*)body, len);
        // 在这里可以对获取到的网页内容进行处理
        std::cout << "网页内容: " << content << std::endl;
    } else {
        std::cout << "请求失败,状态码: " << resp->get_status_code() << std::endl;
    }
}

int main()
{
    WFHttpTask *task = WFTaskFactory::create_http_task("http://www.example.com", 1, 0, onHttpResponse);
    task->start();
    // 可以使用WaitGroup等方式等待任务完成
    return 0;
}

这段代码通过WFTaskFactory::create_http_task创建了一个 HTTP 任务,指定了要请求的 URL,并在回调函数onHttpResponse中处理服务器返回的响应。

磁盘 I/O 方面,假设我们要异步读取一个文件的内容,代码如下:

#include "workflow/WFFacilities.h"
#include <iostream>
#include <fstream>

using namespace wf;

void readFileCallback(WFCPUTask *task)
{
    std::ifstream file("example.txt", std::ios::binary);
    if (file.is_open()) {
        std::string content((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
        // 在这里可以对读取到的文件内容进行处理
        std::cout << "文件内容: " << content << std::endl;
        file.close();
    } else {
        std::cout << "无法打开文件" << std::endl;
    }
}

int main()
{
    WFFacilities::WaitGroup waitGroup(1);
    WFCPUTask *task = WFTaskFactory::create_cpu_task(readFileCallback, [&waitGroup](WFCPUTask *task) {
        waitGroup.done();
    });
    task->start();
    waitGroup.wait();
    return 0;
}

这里通过WFTaskFactory::create_cpu_task创建了一个 CPU 任务来执行文件读取操作,在回调函数中打开文件并读取内容。

对于定时器,WorkFlow 可以方便地设置定时任务。比如,我们要每隔 1 秒执行一次某个操作,可以这样实现:

#include "workflow/WFFacilities.h"
#include <iostream>

using namespace wf;

void timerCallback(WFCPUTask *task)
{
    static int count = 0;
    std::cout << "定时器触发,第 " << ++count << " 次" << std::endl;
    // 可以在这里进行具体的操作
}

int main()
{
    WFFacilities::WaitGroup waitGroup(1);
    WFCPUTask *task = WFTaskFactory::create_cpu_task(timerCallback, [&waitGroup](WFCPUTask *task) {
        // 重新设置定时器,实现每隔1秒触发
        WFCTask *timerTask = WFTaskFactory::create_timer_task(1000, true, [&waitGroup](WFCTask *task) {
            WFCPUTask *newTask = WFTaskFactory::create_cpu_task(timerCallback, [&waitGroup](WFCPUTask *task) {
                // 再次设置定时器,循环执行
                WFCTask *newTimerTask = WFTaskFactory::create_timer_task(1000, true, [&waitGroup](WFCTask *task) {
                    // 可以根据需要停止定时器等操作
                    waitGroup.done();
                });
                newTimerTask->start();
            });
            newTask->start();
        });
        timerTask->start();
    });
    task->start();
    waitGroup.wait();
    return 0;
}

在这个例子中,通过WFTaskFactory::create_timer_task创建了一个定时器任务,设置为每隔 1 秒触发一次,并在定时器触发的回调函数中重新创建定时器任务,实现循环定时触发。

计数器的使用也很简单,假设我们要统计某个事件发生的次数,可以这样写:

#include "workflow/WFFacilities.h"
#include <iostream>

using namespace wf;

void eventCallback(WFCPUTask *task)
{
    static WFCounter counter(0);
    counter.increment();
    std::cout << "事件发生次数: " << counter.get() << std::endl;
}

int main()
{
    WFFacilities::WaitGroup waitGroup(1);
    WFCPUTask *task = WFTaskFactory::create_cpu_task(eventCallback, [&waitGroup](WFCPUTask *task) {
        // 模拟多次事件发生
        for (int i = 0; i < 5; ++i) {
            WFCPUTask *newTask = WFTaskFactory::create_cpu_task(eventCallback, [&waitGroup](WFCPUTask *task) {
                waitGroup.done();
            });
            newTask->start();
        }
    });
    task->start();
    waitGroup.wait();
    return 0;
}

在这个代码中,定义了一个WFCounter计数器,在每次事件发生的回调函数中通过increment方法增加计数器的值,并通过get方法获取当前计数值。

3.2高效的任务调度

WorkFlow 引入了子任务的概念,这是其高效任务调度的核心。子任务就像是一个个小的工作单元,开发者可以将复杂的业务逻辑拆分成多个子任务 。这些子任务可以以串行、并行的方式进行组织调度,极大地提高了任务执行的灵活性和效率。

在串行调度中,子任务会按照顺序依次执行。例如,我们要实现一个用户注册的功能,需要先检查用户名是否已存在,然后再将用户信息插入数据库。可以这样实现:

#include "workflow/WFFacilities.h"
#include <iostream>
#include <mysql/mysql.h>

using namespace wf;

// 假设这是检查用户名是否存在的函数
bool checkUsernameExists(const std::string& username)
{
    // 这里省略具体的数据库连接和查询代码
    // 简单返回一个示例结果
    return false;
}

// 假设这是插入用户信息到数据库的函数
bool insertUserInfo(const std::string& username, const std::string& password)
{
    // 这里省略具体的数据库连接和插入代码
    // 简单返回一个示例结果
    return true;
}

void firstSubtask(WFSubTask *subTask)
{
    std::string username = "testUser";
    if (checkUsernameExists(username)) {
        std::cout << "用户名已存在" << std::endl;
        // 可以在这里设置错误状态等
    } else {
        // 将用户名传递给下一个子任务
        subTask->user_data = new std::string(username);
        // 启动下一个子任务
        WFSubTask *nextSubTask = WFTaskFactory::create_subtask([](WFSubTask *subTask) {
            std::string *username = (std::string*)subTask->user_data;
            std::string password = "testPassword";
            if (insertUserInfo(*username, password)) {
                std::cout << "用户注册成功" << std::endl;
            } else {
                std::cout << "用户注册失败" << std::endl;
            }
            delete username;
        }, nullptr);
        nextSubTask->start();
    }
}

int main()
{
    WFFacilities::WaitGroup waitGroup(1);
    WFSubTask *subTask = WFTaskFactory::create_subtask(firstSubtask, [&waitGroup](WFSubTask *subTask) {
        waitGroup.done();
    });
    subTask->start();
    waitGroup.wait();
    return 0;
}

在这个例子中,firstSubtask子任务先检查用户名是否存在,如果不存在则将用户名传递给下一个子任务进行用户信息插入。

在并行调度中,多个子任务可以同时执行。比如,我们要同时获取多个网站的内容,并对内容进行分析。可以这样实现:

#include "workflow/WFFacilities.h"
#include "workflow/WFHttpServer.h"
#include "workflow/WFHttpUtil.h"
#include <iostream>
#include <vector>

using namespace protocol;
using namespace wf;

void analyzeContent(const std::string& content)
{
    // 这里进行内容分析的具体逻辑,比如统计字数等
    std::cout << "内容字数: " << content.size() << std::endl;
}

void httpCallback(WFHttpTask *task)
{
    HttpResponse *resp = task->get_resp();
    if (resp->get_status_code() == 200) {
        const void *body;
        size_t len;
        resp->get_parsed_body(&body, &len);
        std::string content((const char*)body, len);
        analyzeContent(content);
    } else {
        std::cout << "请求失败,状态码: " << resp->get_status_code() << std::endl;
    }
}

int main()
{
    WFFacilities::WaitGroup waitGroup(3);
    std::vector<WFHttpTask*> tasks;
    std::vector<std::string> urls = {"http://www.example1.com", "http://www.example2.com", "http://www.example3.com"};
    for (const auto& url : urls) {
        WFHttpTask *task = WFTaskFactory::create_http_task(url, 1, 0, [&waitGroup](WFHttpTask *task) {
            waitGroup.done();
        });
        tasks.push_back(task);
        task->start();
    }
    waitGroup.wait();
    for (auto task : tasks) {
        delete task;
    }
    return 0;
}

在这段代码中,我们创建了多个 HTTP 任务,分别请求不同的 URL,这些任务会并行执行。当每个任务完成后,会在回调函数中对获取到的网页内容进行分析。

通过这种灵活的任务调度方式,在处理复杂任务时,WorkFlow 的优势就得以凸显。例如,在一个电商系统中,当用户下单后,系统需要同时进行库存扣减、订单记录插入数据库、发送通知邮件等操作。使用 WorkFlow,我们可以将这些操作分别封装成子任务,然后以并行的方式执行,大大缩短了整个下单流程的处理时间,提高了系统的响应速度和用户体验 。同时,对于一些有依赖关系的任务,如先进行用户身份验证,再根据验证结果执行不同的操作,WorkFlow 可以通过串行调度子任务来确保任务的正确执行顺序。

四、WorkFlow使用场景

4.1网络服务开发

在网络服务开发领域,WorkFlow 大显身手。以 Web 服务器为例,在传统的 Web 服务器开发中,面对大量的网络请求,常常会陷入困境。例如,当众多用户同时访问一个新闻网站,请求获取最新的新闻资讯时,如果采用传统的同步处理方式,服务器会一个接一个地处理这些请求。这就意味着,在处理当前请求时,后续的请求只能在队列中苦苦等待。当请求量达到一定程度时,服务器的响应速度会急剧下降,用户可能需要等待很长时间才能看到新闻内容,甚至可能因为长时间等待而放弃访问。

而 WorkFlow 的出现,为这一难题提供了完美的解决方案。借助其强大的异步处理能力,WorkFlow 可以将每个用户的请求封装成独立的异步任务 。这些任务能够在不同的线程中同时执行,互不干扰。当服务器接收到用户的新闻请求时,它可以迅速启动多个异步任务,同时从数据库中读取新闻数据、从图片服务器获取相关图片资源,并对数据进行必要的处理和格式化。在这个过程中,线程不会因为等待 I/O 操作(如数据库查询、网络资源获取)而被阻塞,而是可以立即处理下一个请求。通过这种方式,WorkFlow 能够显著提升 Web 服务器的并发处理能力,确保在高并发场景下,用户的请求能够得到快速响应,极大地提升了用户体验。

4.2数据处理任务

在大数据处理场景中,数据量往往极其庞大,处理过程也异常复杂。以电商平台的数据分析为例,每天都会产生海量的交易数据、用户行为数据等。这些数据需要进行及时的读写和深入的分析,以便为企业的决策提供有力支持。在传统的处理方式下,数据的读取和写入操作可能会因为磁盘 I/O 的限制而变得缓慢。例如,当需要从磁盘中读取大量的交易记录进行分析时,I/O 操作可能会成为整个处理流程的瓶颈,导致处理时间延长。而且,在对数据进行复杂分析时,如进行多维度的统计分析、挖掘用户的购买模式等,往往需要耗费大量的计算资源和时间。

WorkFlow 在这方面展现出了卓越的优势。它可以通过异步 I/O 操作,高效地处理数据的读写任务。在读取数据时,WorkFlow 能够以异步的方式从磁盘中快速读取数据,减少 I/O 等待时间。同时,它还可以将数据处理任务拆分成多个子任务,利用多线程或分布式计算的方式,并行地对数据进行分析。例如,在分析电商交易数据时,可以将数据按照时间维度、用户维度等进行划分,分别由不同的子任务进行处理。这些子任务可以在多个线程或多个计算节点上同时执行,大大加速了数据的分析过程。通过这种方式,WorkFlow 能够帮助企业在短时间内完成对海量数据的处理和分析,为企业的决策提供及时、准确的数据支持 。

五、WorkFlow异步框架优点

5.1性能卓越

WorkFlow 在性能方面堪称佼佼者。通过一系列严谨的测试数据对比,其优势展露无遗。在处理速度上,当面对大规模的并发请求时,WorkFlow 能够以惊人的速度做出响应。例如,在模拟高并发的网络请求测试中,WorkFlow 的每秒请求处理量(QPS)相较于传统的 C++ 异步框架提升了 30% 以上 。这意味着在相同时间内,WorkFlow 能够处理更多的用户请求,大大提高了系统的吞吐量。

在资源利用率方面,WorkFlow 同样表现出色。它通过巧妙的线程池管理和异步资源调度机制,避免了资源的浪费和过度消耗。在处理复杂的计算任务和 I/O 操作时,WorkFlow 能够合理地分配 CPU 和内存资源,确保系统在高负载情况下依然能够稳定运行。据测试,使用 WorkFlow 的应用程序在内存占用方面比其他同类框架降低了约 20%,这对于资源有限的服务器环境来说,无疑是一个巨大的优势 。

5.2代码简洁

传统的异步编程代码往往繁琐复杂,充满了各种回调地狱和资源管理的难题。例如,在进行多步异步操作时,代码可能会陷入层层嵌套的回调函数中,不仅难以阅读,而且维护成本极高。

而 WorkFlow 的出现,彻底改变了这一局面。它采用了任务流的编程范式,使得代码变得简洁明了。以一个简单的文件读取并处理的任务为例,传统的异步编程方式可能需要这样编写:

#include <iostream>
#include <fstream>
#include <functional>

void readFileAsync(const std::string& filename, std::function<void(const std::string&)> callback)
{
    std::ifstream file(filename);
    if (file.is_open())
    {
        std::string content((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
        file.close();
        callback(content);
    }
    else
    {
        callback("");
    }
}

void processFileContent(const std::string& content)
{
    // 这里进行文件内容的处理逻辑
    std::cout << "处理后的内容: " << content << std::endl;
}

int main()
{
    readFileAsync("example.txt", [](const std::string& content) {
        processFileContent(content);
    });
    return 0;
}

在这个例子中,虽然代码逻辑相对简单,但已经出现了回调函数的嵌套。如果后续还有更多的异步操作,如将处理后的结果写入另一个文件,代码将会变得更加复杂。

而使用 WorkFlow,代码可以简化为:

#include "workflow/WFFacilities.h"
#include <iostream>
#include <fstream>

using namespace wf;

void readFileAndProcess()
{
    WFFacilities::WaitGroup waitGroup(1);
    WFCPUTask *task = WFTaskFactory::create_cpu_task([](WFCPUTask *task) {
        std::ifstream file("example.txt");
        if (file.is_open())
        {
            std::string content((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
            file.close();
            // 这里可以直接进行文件内容的处理
            std::cout << "处理后的内容: " << content << std::endl;
        }
        else
        {
            std::cout << "无法打开文件" << std::endl;
        }
    }, [&waitGroup](WFCPUTask *task) {
        waitGroup.done();
    });
    task->start();
    waitGroup.wait();
}

int main()
{
    readFileAndProcess();
    return 0;
}

可以看到,WorkFlow 通过将任务封装成简单的对象,使用户能够以更加直观的方式编写异步代码。开发者只需要关注业务逻辑本身,而无需花费大量精力去处理复杂的异步回调和资源管理问题。这种简洁的代码风格不仅提高了开发效率,还大大降低了代码出错的概率,使得代码的维护和扩展变得更加轻松 。

六、实际案例分析

在实际应用中,WorkFlow 的强大性能得到了充分验证。以某知名电商平台为例,在引入 WorkFlow 之前,该平台在处理高并发订单时,常常出现响应延迟的情况。据统计,在促销活动期间,平均响应时间长达 5 秒,这导致大量用户因等待时间过长而放弃购买,严重影响了平台的销售额。

为了解决这一问题,该电商平台采用了 WorkFlow 框架。通过将订单处理流程拆分成多个异步任务,如库存检查、支付处理、订单记录等,WorkFlow 实现了这些任务的并行执行。经过优化后,平台的平均响应时间大幅缩短至 1 秒以内,每秒能够处理的订单量从原来的 500 笔提升至 2000 笔,提升了 3 倍之多。这一改进不仅显著提升了用户体验,还使得平台在促销活动中的销售额同比增长了 50% 。

再以一家在线教育平台为例,该平台需要处理大量的用户课程请求和视频流数据。在使用 WorkFlow 之前,由于服务器资源利用率低,经常出现卡顿和加载缓慢的情况,用户投诉率较高。引入 WorkFlow 后,通过对网络请求和数据处理任务的优化调度,平台的服务器资源利用率提高了 40%,卡顿现象减少了 80%,用户满意度从原来的 60% 提升至 90% 。


http://www.kler.cn/a/527738.html

相关文章:

  • 苍穹外卖——数据统计
  • 2848、与车相交的点
  • 离散化C++
  • 深度学习的应用
  • 【四川乡镇界面】图层shp格式arcgis数据乡镇名称和编码2020年wgs84无偏移内容测评
  • LINUX部署微服务项目步骤
  • 柱量最大值转向
  • SpringBoot核心特性:自动配置与起步依赖
  • [免费]微信小程序智能商城系统(uniapp+Springboot后端+vue管理端)【论文+源码+SQL脚本】
  • 深入解析:一个简单的浮动布局 HTML 示例
  • 通过反射搭建简易的Servlet层自动化映射参数并调用Service层业务方法的框架
  • 什么是集成学习
  • TypeScript 学习 -代码检查工具 eslint
  • Day31-【AI思考】-关键支点识别与战略聚焦框架
  • FFmpeg(7.1版本)的基本组成
  • 【C++语言】卡码网语言基础课系列----1. A+B问题I
  • 2025年人工智能技术:Prompt与Agent的发展趋势与机遇
  • 从训练到生产:AI 模型如何突破困境实现高效部署?
  • DeepSeek本地版安装简易教程(windows)
  • 第24节课:前端性能优化—提升网页加载速度的关键策略
  • 计算机网络一点事(22)
  • RK3568使用MIX415摄像头
  • 笔试-二进制
  • 实验四 简单查询
  • 【2024年华为OD机试】(C卷,100分)- 检查是否存在满足条件的数字组合 (Java JS PythonC/C++)
  • Redis_Redission的入门案例、多主案例搭建、分布式锁进行加锁、解锁底层源码解析