深入理解C++ Workflow源码(1)
第1篇 一个 Task 是如何诞生的
很多人第一次接触 Workflow,会觉得它“用起来很轻”,但真正神奇的地方不在 API 表面,而在于一个 Task 从创建、初始化、调度到回调结束,几乎把异步编程里最麻烦的几件事都藏起来了。
这一篇我们不急着钻 epoll,也不急着看协议细节,只回答一个问题:
用户写下WFTaskFactory::create_http_task()之后,框架内部到底发生了什么?
一、先看用户代码:Task 是从工厂出来的
Workflow 的设计非常明确:任务由工厂创建,业务由任务流组织。
典型代码大概长这样:
auto*task=WFTaskFactory::create_http_task("http://www.example.com",3,2,[](WFHttpTask*task){if(task->get_state()==WFT_STATE_SUCCESS){protocol::HttpResponse*resp=task->get_resp();// 处理响应}});task->start();从这段代码里,至少能看到四个事实:
- 用户并不直接
new某个底层任务对象。 - 用户看到的是强类型任务,比如
WFHttpTask、WFMySQLTask。 - 用户只写 callback,不处理线程、fd、连接池。
- 一个任务本身就能
start(),说明它最终一定能挂进某种统一调度模型。
这正是 Workflow 的第一层抽象:所有异步操作先被包装成 SubTask,再由 Workflow 把它们编排成 Series 或 Parallel。
二、Task 的“壳”:为什么一切都先变成 SubTask
在内核层,最底层的抽象不是 HTTP,也不是 MySQL,而是SubTask:
classSubTask{public:virtualvoiddispatch()=0;private:virtualSubTask*done()=0;protected:voidsubtask_done();};这个接口很小,但语义非常重:
dispatch():把任务真正投递出去。done():任务结束后,告诉框架“下一个该跑谁”。subtask_done():统一推进整个任务流。
也就是说,Workflow 并不是“回调到回调”的设计,而是“当前 SubTask 完成后,返回下一个 SubTask”的设计。
这和很多传统异步框架最大的不同在于:控制流并没有散掉,而是被收敛在统一的任务推进器里。
三、Task 的“流”:为什么 start() 其实是在启动一个 Series
无论是线程任务、网络任务还是定时器任务,start()的实现都非常像:
voidstart(){assert(!series_of(this));Workflow::start_series_work(this,nullptr);}这段代码说明了一个非常关键的设计:
一个独立任务启动时,并不是“裸奔”的,它会先被包进一个串行流SeriesWork。
这样做有三个好处:
- 单个任务和复杂工作流共享同一套执行模型。
- callback 结束后对象销毁、上下文传递、取消语义都能统一。
- 后续动态往 series 里
push_back()新任务变得非常自然。
换句话说,Workflow 不是“任务系统 + 工作流系统”两套机制,而是一套机制从一开始就覆盖了最简单到最复杂的场景。
四、Task 工厂到底创建了什么
以 HTTP 为例,WFTaskFactory暴露的是工厂接口:
usingWFHttpTask=WFNetworkTask<protocol::HttpRequest,protocol::HttpResponse>;WFHttpTask不是一个具体类,而是一个类型别名。真正被创建出来的,通常是某个更复杂的实现类,例如HttpTaskImpl.cc里的ComplexHttpTask:
classComplexHttpTask:publicWFComplexClientTask<HttpRequest,HttpResponse>{protected:virtualCommMessageOut*message_out();virtualCommMessageIn*message_in();virtualintkeep_alive_timeout();virtualboolinit_success();virtualvoidinit_failed();virtualboolfinish_once();};这层实现很重要,因为它说明 Workflow 的任务对象并不是“请求包 + 回调”这么简单,而是一个封装了以下能力的复杂状态机:
- URI 解析
- DNS/路由选择
- 连接建立
- SSL 握手
- 请求编码
- 响应解码
- 重定向
- 重试
- keep-alive
所以,从用户视角看是“一个 HTTP 任务”,从框架视角看其实是“一个隐藏了多个异步阶段的复合任务”。
五、Task 为什么能统一成几种基类
Workflow 把任务大致收敛成三条主线:
template<classINPUT,classOUTPUT>classWFThreadTask:publicExecRequest{...};template<classREQ,classRESP>classWFNetworkTask:publicCommRequest{...};classWFTimerTask:publicSleepRequest{...};这三类分别对应:
- 计算任务:交给
Executor + thrdpool - 网络任务:交给
CommScheduler + Communicator - 定时器任务:交给
Communicator的 timer 机制
这也是 Workflow 很漂亮的一点:
上层 API 很丰富,但底层入口非常少。
丰富的协议和组件,最后都会落到少数几种可调度任务上。
六、一个 HTTP Task 的诞生流程
把源码串起来后,一个 HTTP Task 的出生过程大致是下面这样:
第 1 步:用户调用工厂
WFTaskFactory::create_http_task(...)工厂负责创建具体实现对象,并把 callback、重试次数、重定向次数这些策略参数灌进去。
第 2 步:具体任务初始化
ComplexHttpTask构造时会设置默认方法、版本等基础状态:
client_req->set_method(HttpMethodGet);client_req->set_http_version("HTTP/1.1");第 3 步:start() 包装成 Series
任务调用start()后,被Workflow::start_series_work()包成一个最小串行流。
第 4 步:dispatch() 投递到底层调度器
如果是网络任务,会进入CommRequest::dispatch():
if(this->scheduler->request(this,this->object,this->wait_timeout,&this->target)<0){this->handle(CS_STATE_ERROR,errno);}第 5 步:Communicator 驱动整个异步生命周期
后面连接、发送、接收、解析、超时、错误处理都由Communicator和poller接管。
第 6 步:done() 回到 Series
任务完成后,会执行 callback,然后回到 series 取下一个任务:
if(this->callback)this->callback(this);deletethis;returnseries->pop();这一步非常关键:Workflow 不是“回调结束就结束”,而是“回调结束后决定工作流下一步怎么走”。
七、为什么说 Workflow 的 Task 是“结构化并发单元”
很多异步框架里的 task,更像是一段“待执行回调”。
Workflow 里的 task 则更像一个完整的并发单元,因为它天然带着这些能力:
- 有明确的生命周期:创建到 callback 结束
- 有统一的归属:一定属于某个 series
- 有清晰的完成语义:通过
done()交出控制权 - 有统一的调度入口:
dispatch() - 有强类型输入输出:请求/响应对象都挂在任务上
这使得 Workflow 的任务既能表达“做一次网络请求”,也能表达“作为工作流中的一个节点存在”。
八、从“一个 Task 的诞生”里能看出什么设计思想
读到这里,其实已经能看出 Workflow 的几条核心设计哲学:
1. 用户接口丰富,内核抽象极简
上层有 HTTP/MySQL/Redis/Timer/FileIO/Graph 等等,底层却尽量收敛到:
SubTaskSeriesWorkCommRequestExecRequest
2. 把复杂异步过程隐藏到“一个任务”里
HTTP 任务里可能包含 DNS、连接、SSL、重试、重定向,但用户只看到一个 Task。
3. 通过 series 统一生命周期和控制流
单任务和复杂工作流用同一套机制,所以没有“简单场景一套逻辑,复杂场景另一套逻辑”的割裂感。
4. 不把并发模型暴露给业务代码
业务代码写 callback 即可;线程、fd、epoll、连接复用都由框架兜底。
九、这篇先记住三句话
如果你第一次读 Workflow 源码,我建议先把下面三句话记住:
- Task 的真正基类是
SubTask,不是 HTTPTask。 - 独立 Task 启动时会先被包进
SeriesWork。 - 一个“看起来简单”的任务,内部通常是一个复合状态机。
理解了这三点,后面看SeriesWork、WFNetworkTask、Communicator时,脑子里就不会把它们当成互相独立的模块,而会把它们看成同一条链路上的不同层。
十、下一篇看什么
既然 Task 一出生就会被包进SeriesWork,那下一个问题自然就是:
Series 到底是什么?为什么 Workflow 要把所有任务都放进一个串行流里?
下一篇,我们就专门拆开SeriesWork看它如何管理队列、推进后继任务、处理取消,以及如何跟并行任务拼成更大的工作流。
