workflow笔记
workflow 介绍
搜狗公司C++服务器引擎,编程范式。支撑搜狗几乎所有后端C++在线服务,包括所有搜索服务,云输入法,在线广告等,每
日处理数百亿请求。这是一个设计轻盈优雅的企业级程序引擎,可以满足大多数后端与嵌入式开发需求。
特征:
快速搭建 http 服务器。
可异步访问常见第三方服务:http,redis,mysql 和kafka。
构建异步任务流,支持常用的串并联,也支持更加复杂的DAG结构。
作为并行计算工具使用。除了网络任务,我们也包含计算任务的调度。所有类型的任务都可以放入同一个流中。
在Linux系统下作为文件异步IO工具使用,性能超过任何标准调用。磁盘IO也是一种任务。
实现任何计算与通讯关系非常复杂的高性能高并发的后端服务。
workflow 编译安装
git clone https://github.com/sogou/workflow # From
gitee: git clone https://gitee.com/sogou/workflow
cd workflow
make
cd tutorial
make
workflow 编程范式
抽象出来的模型: 程序 = 协议 + 算法 + 任务流
协议
大多数情况下,用户使用的是内置的通用网络协议,例如http,redis或各种rpc。
用户可以方便的自定义网络协议,只需提供序列化和反序列化函数,就可以定义出自己的client/server。
算法
在我们的设计里,算法是与协议对称的概念。如果说协议的调用是rpc,算法的调用就是一次apc(Async Procedure Call)。
我们提供了一些通用算法,例如sort,merge,psort,reduce,可以直接使用。
与自定义协议相比,自定义算法的使用要常见得多。任何一次边界清晰的复杂计算,都应该包装成算法
任务流
在workflow当中所有的任务都是异步处理的
任务隐藏了若干个异步执行流
任务流就是实际的业务逻辑,就是把开发好的协议与算法放在流程图里使用起来。
典型的任务流是一个闭合的串并联图。复杂的业务逻辑,可能是一个非闭合的DAG。
任务流图可以直接构建,也可以根据每一步的结果动态生成。所有任务都是异步执行的。
请求回应模式
workflow是一种请求回应的模式,服务端不会主动给客户端推数据,必须要发起一个请求,而且这个请求是一个一个的发送,如果没有得到回应不要继续发请求。
比如 http
结构化并发与任务隐藏
- 我们系统中包含五种基础任务:通讯,计算,文件IO,定时器,计数器
- 一切任务都由任务工厂产生,用户通过调用接口组织并发结构。例如串联并联,DAG等。
- 大多数情况下,用户通过任务工厂产生的任务,都隐藏了多个异
例如,一次http请求,可能包含许多次异步过程(DNS,重定向),但对用户来讲,就是一次通信任务。
文件排序,看起来就是一个算法,但其实包括复杂的文件IO与CPU计算的交互过程。
如果把业务逻辑想象成用设计好的电子元件搭建电路,那么每个电子元件内部可能又是一个复杂电路。
任务隐藏机制大幅减少了用户需要创建的任务数量和回调深度。
任何任务都运行在某个串行流(series)里,共享series上下文,让异步任务之间数据传递变得简单
workflow的串联执行
使用tutorial里面hello world的例子
#include <stdio.h>
#include "workflow/WFHttpServer.h"
void mul(int a , int b , int& res) {
res = a * b;
}
int main()
{
WFHttpServer server([](WFHttpTask *task) {
task->get_resp()->append_output_body("<html>Hello World!</html>");
int a = 533;
int b = 7;
int res;
WFGoTask* go = WFTaskFactory::create_go_task("test" , mul , a , b , res);
go->set_callback([&](WFGoTask* t){
protocol::HttpResponse* resp = (protocol::HttpResponse*)t->user_data;
char buf[1024] = {0};
sprintf(buf , "<br> 7 * 533 = %d<br/>" , res);
resp->append_output_body(buf);
});
go->user_data = task->get_resp();
series_of(task)->push_back(go); // 串联执行
});
if (server.start(8889) == 0) { // start server on port 8888
getchar(); // press "Enter" to end.
server.stop();
}
return 0;
}
当我们再次请求的时候发现 , 响应中追加了一行乘法结果
workflow的并行执行
tutorial-06-parallel_wget.cc
workflow的DAG执行
tutorial-11-graph_task.cc
内部重载了–>
抽象层次
- 流 (添加任务,添加上下文,设置回调)
- 任务 WFTaskFactory 中定义的基础任务
- 任务流 SubTask
class Workflow
{
public:
static SeriesWork *
create_series_work(SubTask *first, series_callback_t callback); // 串行流
static void
start_series_work(SubTask *first, series_callback_t callback); // 这个任务流里面所有的任务执行完之后会调用这个回调函数
static ParallelWork *
create_parallel_work(parallel_callback_t callback); // 并行流
static ParallelWork *
create_parallel_work(SeriesWork *const all_series[], size_t n,
parallel_callback_t callback);
static void
start_parallel_work(SeriesWork *const all_series[], size_t n,
parallel_callback_t callback);
public:
static SeriesWork *
create_series_work(SubTask *first, SubTask *last,
series_callback_t callback);
static void
start_series_work(SubTask *first, SubTask *last,
series_callback_t callback);
};
回调与内存回收机制
- 一切调用都是异步执行,几乎不存在占着线程等待的操作。
- 显式的回调机制。用户清楚自己在写异步程序。
- 通过一套对象生命周期机制,大幅简化异步程序的内存管理
任何框架创建的任务,生命周期都是从创建到callback函数运行结束为止。没有泄漏风险。
如果创建了任务之后不想运行,则需要通过dismiss()接口删除。
任务中的数据,例如网络请求的resp,也会随着任务被回收。此时用户可通过std::move()把需要的数据移走。
项目中不使用任何智能指针来管理内存。代码观感清新。
- 尽量避免用户级别派生,以std::function封装用户行为,包括:
任何任务的callback。
任何server的process。符合FaaS(Function as aService)思想。
一个算法的实现,简单来讲也是一个std::function。
如果深入使用,又会发现一切皆可派生。
线程模型
主线程
4个网络线程
poller 做那么几件事情
1 检测条件是否满足,如果是网络的话,组装完整的数据,然后抛出队列
20个工作线程池
从任务队列中取出任务,执行server注册的回调函数 ,如果这个过程中新生成了net/timer/file io的时间需要去监听注册,则直接调用epoll_ctl 加入到某个poller线程里面去 , 如果有计算密集的任务则将任务加入到go线程的任务队列里面去给go线程处理
8个计算线程池
专门处理cpu类型的任务
应用场景
高扇出场景
搜索服务一般都是高扇出场景;它需要调用许多下游模块多,考验网络通信框架的调度能力。
高扇出解释:一个节点与许多其他节点存在大量连接的情况。
高扇出的痛点:吞吐与长尾;提升吞吐需要提高单个请求的响应速度,所以需要尽量少切换网络收发线程,但是不切换容易导致处理的慢的资源堆积,长尾问题就很明显。
多 client 混用场景
同时访问 redis/mysql/kafka 的数据管理需求。workflow 实现了常见的网络协议,不同协议的任务可以在底层调度层无缝打通。
SRPC
业务层协议 IDL 使用了 protobuf,需要对里边定义的service 支持简单的计算功能,同时支持可以异步执行,如何能够快速搭建这样的服务。基于Workflow做底层调度的生态项目,拥有Workflow的网络性能优势,且自生成service接口,因此可以让 service 接口底层打通 Workflow 底层的异步 server 功能。除此之外使 用案例 2 中的其他协议或者计算任务都很方便。
嵌入式领域
服务治理场景(服务发现)
自定义协议接入
场景:公网接入网需要 http 协议,但是后端服务是自定义私有协议,接入层需要自己开发。
常见的办法是使用 nginx,开发 ngx_module_t,但 nginx的网络有 11 个阶段,内部资源纯自行管理,模块代码与框架代码完全耦合到一起,开发起来非常困难,出了问题也很难排查。
workflow 可以派生基本网络层,实现消息的序列化/反序列化接口,即得到自定义协议的任务。然后启动 http server,创建自定义任务,再利用案例 4 中提到的转发功能,即可得到一个自定义协议接入层,转发性能无损耗。
workflow三板斧
抽象粒度合适的任务
通过任务流组织任务, 串联,并联,DAG
协调任务
counter 递减的“信号量”
conditional 条件变量
resource pool 类似信号量
message queue 类似消息队列
不占用线程的方式,条件触发
网络模块的设计
多线程网络
workflow会创建4个poller线程 , 将listen fd % poller线程数,落入其中的一个poller线程里面
accept一个新的连接之后 ,又会将clientfd % poller线程数量,去均衡的加入到一个poller线程当中去处理后续的网络事件
epoll三个系统调用是否线程安全
都是线程安全的
线程状态
1 epoll_wait
2 正在处理事件
工作线程调用epoll_ctl 是否安全
增加 : 一个epoll里面本来没有这个fd , 增加一个是没问题的
修改 :没有问题,epoll本身有这个fd , 把读事件改为写事件如connect ,或者把写事件改为读事件。
删除 : 有问题。 其中一个线程正在处理epoll_wait返回的事件循环,在这个过程中也可能执行删除的操作,另外一个线程同样执行删除的操作,内核就会报错。事件循环的那个线程要下一次epoll_wait的时候才能感知到另外一个线程的删除操作
删除操作通过管道pipe去解决。
工作线程把删除的操作通过管道传递给epoll_wait线程,让删除的操作串行执行。
队列设计
workflow使用有锁队列
workflow每天要处理成千上万的高并发请求,竞争相当的激烈,且任务的执行时间也比较长。
如果get队列为空,get队列会和put队里做一个转换,生产者只往put队列添加任务,消费者只从get队列拿任务。
好处:生产者线程和消费者线程碰撞的概率很低,把多对多的问题转换成了多对一的问题。只有当消费者线程发现get队列为空的时候,会锁定put队列,这时候才会发生碰撞,然后把put队列里面的任务拷贝到get队列里面去。
设计的相当优雅,提升并发性
队列元素通用性设计
1 不限制节点的类型
2 对节点只有一个约束: 节点偏移linkoff 字节之后有一个指针类型用于链接下一个节点
关于无锁队列
什么时候使用无锁队列:
1 资源竞争不那么激烈的时候
2 任务执行时间比较短
线程池设计
网络线程当中添加一个线程任务给线程池调度,context是Communicator的this指针
线程任务中执行一个死循环
如果在swap的时候发现put队列里面没有任务,就会阻塞线程池里面的线程