当前位置: 首页 > article >正文

workflow笔记

workflow 介绍

搜狗公司C++服务器引擎,编程范式。支撑搜狗几乎所有后端C++在线服务,包括所有搜索服务,云输入法,在线广告等,每
日处理数百亿请求。这是一个设计轻盈优雅的企业级程序引擎,可以满足大多数后端与嵌入式开发需求。

特征:
快速搭建 http 服务器。
可异步访问常见第三方服务:http,redis,mysql 和kafka。
构建异步任务流,支持常用的串并联,也支持更加复杂的DAG结构。
作为并行计算工具使用。除了网络任务,我们也包含计算任务的调度。所有类型的任务都可以放入同一个流中。
在Linux系统下作为文件异步IO工具使用,性能超过任何标准调用。磁盘IO也是一种任务。
实现任何计算与通讯关系非常复杂的高性能高并发的后端服务。

workflow 编译安装

git clone https://github.com/sogou/workflow # From
gitee: git clone https://gitee.com/sogou/workflow
cd workflow
make
cd tutorial
make

workflow 编程范式

抽象出来的模型: 程序 = 协议 + 算法 + 任务流

协议

大多数情况下,用户使用的是内置的通用网络协议,例如http,redis或各种rpc。
用户可以方便的自定义网络协议,只需提供序列化和反序列化函数,就可以定义出自己的client/server。

算法

在我们的设计里,算法是与协议对称的概念。如果说协议的调用是rpc,算法的调用就是一次apc(Async Procedure Call)。
我们提供了一些通用算法,例如sort,merge,psort,reduce,可以直接使用。
与自定义协议相比,自定义算法的使用要常见得多。任何一次边界清晰的复杂计算,都应该包装成算法

任务流

在workflow当中所有的任务都是异步处理的
任务隐藏了若干个异步执行流

任务流就是实际的业务逻辑,就是把开发好的协议与算法放在流程图里使用起来。
典型的任务流是一个闭合的串并联图。复杂的业务逻辑,可能是一个非闭合的DAG。
任务流图可以直接构建,也可以根据每一步的结果动态生成。所有任务都是异步执行的。

请求回应模式

workflow是一种请求回应的模式,服务端不会主动给客户端推数据,必须要发起一个请求,而且这个请求是一个一个的发送,如果没有得到回应不要继续发请求。
比如 http

结构化并发与任务隐藏

  • 我们系统中包含五种基础任务:通讯,计算,文件IO,定时器,计数器
  • 一切任务都由任务工厂产生,用户通过调用接口组织并发结构。例如串联并联,DAG等。
  • 大多数情况下,用户通过任务工厂产生的任务,都隐藏了多个异

例如,一次http请求,可能包含许多次异步过程(DNS,重定向),但对用户来讲,就是一次通信任务。
文件排序,看起来就是一个算法,但其实包括复杂的文件IO与CPU计算的交互过程。
如果把业务逻辑想象成用设计好的电子元件搭建电路,那么每个电子元件内部可能又是一个复杂电路。
任务隐藏机制大幅减少了用户需要创建的任务数量和回调深度。

任何任务都运行在某个串行流(series)里,共享series上下文,让异步任务之间数据传递变得简单

workflow的串联执行

使用tutorial里面hello world的例子

#include <stdio.h>
#include "workflow/WFHttpServer.h"
void mul(int a , int b , int& res) {
    res = a * b;
}
int main()
{
    WFHttpServer server([](WFHttpTask *task) {
        task->get_resp()->append_output_body("<html>Hello World!</html>");
        int a = 533;
        int b = 7;
        int res;
        WFGoTask* go = WFTaskFactory::create_go_task("test" , mul , a , b , res);
        go->set_callback([&](WFGoTask* t){
            protocol::HttpResponse* resp = (protocol::HttpResponse*)t->user_data;
            char buf[1024] = {0};
            sprintf(buf , "<br> 7 * 533 = %d<br/>" , res);
            resp->append_output_body(buf);
        });
        go->user_data = task->get_resp();
        series_of(task)->push_back(go); // 串联执行
    });
    if (server.start(8889) == 0) { // start server on port 8888
        getchar(); // press "Enter" to end.
        server.stop();
    }
    return 0;
}

当我们再次请求的时候发现 , 响应中追加了一行乘法结果
在这里插入图片描述

workflow的并行执行

tutorial-06-parallel_wget.cc

workflow的DAG执行

tutorial-11-graph_task.cc
在这里插入图片描述
内部重载了–>

抽象层次

  • 流 (添加任务,添加上下文,设置回调)
  • 任务 WFTaskFactory 中定义的基础任务
  • 任务流 SubTask
class Workflow
{
public:
	static SeriesWork *
	create_series_work(SubTask *first, series_callback_t callback);  // 串行流

	static void
	start_series_work(SubTask *first, series_callback_t callback); // 这个任务流里面所有的任务执行完之后会调用这个回调函数

	static ParallelWork *
	create_parallel_work(parallel_callback_t callback);   // 并行流

	static ParallelWork *
	create_parallel_work(SeriesWork *const all_series[], size_t n,
						 parallel_callback_t callback);

	static void
	start_parallel_work(SeriesWork *const all_series[], size_t n,
						parallel_callback_t callback);

public:
	static SeriesWork *
	create_series_work(SubTask *first, SubTask *last,
					   series_callback_t callback);

	static void
	start_series_work(SubTask *first, SubTask *last,
					  series_callback_t callback);
};

在这里插入图片描述
在这里插入图片描述

回调与内存回收机制

  • 一切调用都是异步执行,几乎不存在占着线程等待的操作。
  • 显式的回调机制。用户清楚自己在写异步程序。
  • 通过一套对象生命周期机制,大幅简化异步程序的内存管理

任何框架创建的任务,生命周期都是从创建到callback函数运行结束为止。没有泄漏风险。
如果创建了任务之后不想运行,则需要通过dismiss()接口删除。
任务中的数据,例如网络请求的resp,也会随着任务被回收。此时用户可通过std::move()把需要的数据移走。
项目中不使用任何智能指针来管理内存。代码观感清新。

  • 尽量避免用户级别派生,以std::function封装用户行为,包括:

任何任务的callback。
任何server的process。符合FaaS(Function as aService)思想。
一个算法的实现,简单来讲也是一个std::function。
如果深入使用,又会发现一切皆可派生。

线程模型

在这里插入图片描述

主线程

4个网络线程

poller 做那么几件事情
1 检测条件是否满足,如果是网络的话,组装完整的数据,然后抛出队列

20个工作线程池

从任务队列中取出任务,执行server注册的回调函数 ,如果这个过程中新生成了net/timer/file io的时间需要去监听注册,则直接调用epoll_ctl 加入到某个poller线程里面去 , 如果有计算密集的任务则将任务加入到go线程的任务队列里面去给go线程处理

8个计算线程池

专门处理cpu类型的任务

应用场景

高扇出场景

搜索服务一般都是高扇出场景;它需要调用许多下游模块多,考验网络通信框架的调度能力。
高扇出解释:一个节点与许多其他节点存在大量连接的情况。
高扇出的痛点:吞吐与长尾;提升吞吐需要提高单个请求的响应速度,所以需要尽量少切换网络收发线程,但是不切换容易导致处理的慢的资源堆积,长尾问题就很明显。

多 client 混用场景

同时访问 redis/mysql/kafka 的数据管理需求。workflow 实现了常见的网络协议,不同协议的任务可以在底层调度层无缝打通。

SRPC

业务层协议 IDL 使用了 protobuf,需要对里边定义的service 支持简单的计算功能,同时支持可以异步执行,如何能够快速搭建这样的服务。基于Workflow做底层调度的生态项目,拥有Workflow的网络性能优势,且自生成service接口,因此可以让 service 接口底层打通 Workflow 底层的异步 server 功能。除此之外使 用案例 2 中的其他协议或者计算任务都很方便。

嵌入式领域

服务治理场景(服务发现)

自定义协议接入

场景:公网接入网需要 http 协议,但是后端服务是自定义私有协议,接入层需要自己开发。
常见的办法是使用 nginx,开发 ngx_module_t,但 nginx的网络有 11 个阶段,内部资源纯自行管理,模块代码与框架代码完全耦合到一起,开发起来非常困难,出了问题也很难排查。
workflow 可以派生基本网络层,实现消息的序列化/反序列化接口,即得到自定义协议的任务。然后启动 http server,创建自定义任务,再利用案例 4 中提到的转发功能,即可得到一个自定义协议接入层,转发性能无损耗。

workflow三板斧

抽象粒度合适的任务

通过任务流组织任务, 串联,并联,DAG

协调任务

counter 递减的“信号量”
conditional 条件变量
resource pool 类似信号量
message queue 类似消息队列
不占用线程的方式,条件触发

网络模块的设计

多线程网络

workflow会创建4个poller线程 , 将listen fd % poller线程数,落入其中的一个poller线程里面
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
accept一个新的连接之后 ,又会将clientfd % poller线程数量,去均衡的加入到一个poller线程当中去处理后续的网络事件

epoll三个系统调用是否线程安全

都是线程安全的

线程状态

1 epoll_wait
2 正在处理事件

工作线程调用epoll_ctl 是否安全

增加 : 一个epoll里面本来没有这个fd , 增加一个是没问题的
修改 :没有问题,epoll本身有这个fd , 把读事件改为写事件如connect ,或者把写事件改为读事件。
删除 : 有问题。 其中一个线程正在处理epoll_wait返回的事件循环,在这个过程中也可能执行删除的操作,另外一个线程同样执行删除的操作,内核就会报错。事件循环的那个线程要下一次epoll_wait的时候才能感知到另外一个线程的删除操作

删除操作通过管道pipe去解决。
在这里插入图片描述
工作线程把删除的操作通过管道传递给epoll_wait线程,让删除的操作串行执行。

队列设计

workflow使用有锁队列

workflow每天要处理成千上万的高并发请求,竞争相当的激烈,且任务的执行时间也比较长。
在这里插入图片描述
如果get队列为空,get队列会和put队里做一个转换,生产者只往put队列添加任务,消费者只从get队列拿任务。

好处:生产者线程和消费者线程碰撞的概率很低,把多对多的问题转换成了多对一的问题。只有当消费者线程发现get队列为空的时候,会锁定put队列,这时候才会发生碰撞,然后把put队列里面的任务拷贝到get队列里面去。
设计的相当优雅,提升并发性

队列元素通用性设计

在这里插入图片描述
1 不限制节点的类型
2 对节点只有一个约束: 节点偏移linkoff 字节之后有一个指针类型用于链接下一个节点
在这里插入图片描述

关于无锁队列

什么时候使用无锁队列:
1 资源竞争不那么激烈的时候
2 任务执行时间比较短

线程池设计

在这里插入图片描述
网络线程当中添加一个线程任务给线程池调度,context是Communicator的this指针
在这里插入图片描述
线程任务中执行一个死循环
在这里插入图片描述
如果在swap的时候发现put队列里面没有任务,就会阻塞线程池里面的线程
在这里插入图片描述


http://www.kler.cn/a/430112.html

相关文章:

  • 解决idea中无法拖动tab标签页的问题
  • 【钉钉在线笔试题】字符串表达式的加减法
  • C++例程:使用I/O模拟IIC接口(6)
  • HarmonyOS中实现TabBar(相当于Android中的TabLayout+ViewPager)
  • 【数据库】四、数据库管理与维护
  • es 3期 第23节-运用Pipeline实现二转聚合统计
  • WebSocket 经验与最佳实践
  • 《跨越平台壁垒:C++ 人工智能模型在移动设备的部署之路》
  • java+ssm+mysql校园物品租赁网
  • C#请求https提示未能为 SSL/TLS 安全通道建立信任关系
  • XREAL在日本AR市场成功的策略分析
  • Mysql | 尚硅谷 | 第02章_MySQL环境搭建
  • linux 安装 vsftpd 服务以及配置全攻略,vsftpd 虚拟多用户多目录配置,为每个用户配置不同的使用权限
  • 【前端】深度解析 JavaScript 中的 new 关键字与构造函数
  • 【ETCD】【源码阅读】configurePeerListeners() 函数解析
  • 数据结构——哈夫曼树
  • 【电控笔记z46】非线性磁链笔记
  • Spring Boot 3.3.4 升级导致 Logback 之前回滚策略配置不兼容问题解决
  • AI智能体Prompt预设词指令大全+GPTs应用使用
  • Vue了解
  • 使用PXE+Kickstart无人值守安装Linux操作系统
  • 正则表达式去除文本中括号()<>[]里的内容
  • BurpSuite-8(FakeIP与爬虫审计)
  • 工业—使用Flink处理Kafka中的数据_EnvironmentData1
  • 音视频入门基础:MPEG2-TS专题(12)—— FFmpeg源码中,使用Section把各个transport packet组合起来的实现
  • Oracle之表空间迁移