【C/C++】用 epoll 写一个 Reactor:连接对象、回调和状态机
【C/C++】用 epoll 写一个 Reactor:连接对象、回调和状态机
1. Reactor 解决了什么问题
裸epoll版本里,主循环通常会写成这样:
if(events[i].data.fd==sockfd){accept(...);}else{recv(...);send(...);}这种写法适合演示 API,但业务一复杂,主循环会越来越臃肿。比如 HTTP 要分“响应头”和“响应体”;WebSocket 要分“握手阶段”和“帧数据阶段”;长响应还要处理一次send()没写完的情况。
Reactor 模式的核心思路是:主循环只负责等事件和分发事件,真正的业务处理放到回调函数里。
本项目的reactor.c已经体现了这个结构:
epoll_wait()等待事件。- 监听 fd 触发
accept_cb()。 - 客户端 fd 读事件触发
recv_cb()。 - 客户端 fd 写事件触发
send_cb()。 - 每个连接的数据缓冲、写偏移、状态都放在
connections[fd]里。
2. connection:把连接上下文集中管理
server.h里的struct connection是整个 Reactor 的核心数据结构:
#defineBUFFER_SIZE1024typedefint(*callback_t)(intfd);structconnection{intfd;charrbuffer[BUFFER_SIZE];intrlength;charwbuffer[BUFFER_SIZE];intwlength;intwoffset;callback_tsend_callback;union{callback_trecv_callback;callback_taccept_callback;}rcallback;FILE*fp;longfile_size;longfile_offset;charpayload[BUFFER_SIZE];intpayload_length;intstate;};这里有几个字段很关键:
rbuffer/rlength:保存本次读到的数据。wbuffer/wlength/woffset:保存待发送数据和当前发送偏移。recv_callback/send_callback:把事件和处理函数绑定起来。state:给 HTTP 或 WebSocket 这种分阶段协议使用。fp/file_offset/file_size:用于 HTTP 大文件响应分块发送。
项目里直接用connections[fd]作为连接表,这样通过 fd 可以 O(1) 找到连接上下文。
3. 事件注册:epoll_ctl 封装成 set_event
reactor.c把epoll_ctl()封装成了set_event():
intset_event(intfd,uint32_tevents,intopt){structepoll_eventev;ev.events=events;ev.data.fd=fd;if(epoll_ctl(epoll_fd,opt,fd,&ev)<0){perror("epoll_ctl");close(fd);return-1;}return0;}这样添加、修改、删除事件都可以复用同一个函数:
set_event(client_fd,EPOLLIN,EPOLL_CTL_ADD);set_event(fd,EPOLLOUT,EPOLL_CTL_MOD);set_event(fd,EPOLLIN,EPOLL_CTL_DEL);在 Reactor 中,事件不是一次性写死的。比如读到请求后,业务生成了响应数据,就应该把连接从“监听可读”切换到“监听可写”。
4. event_register:绑定 fd、事件和回调
新连接建立后,项目通过event_register()初始化连接上下文:
intevent_register(intfd,uint32_tevents,callback_trecv_callback,callback_tsend_callback){if(set_event(fd,events,EPOLL_CTL_ADD)<0){return-1;}connections[fd].fd=fd;connections[fd].rcallback.recv_callback=recv_cb;connections[fd].send_callback=send_cb;memset(connections[fd].rbuffer,0,BUFFER_SIZE);connections[fd].rlength=0;memset(connections[fd].wbuffer,0,BUFFER_SIZE);connections[fd].wlength=0;connections[fd].woffset=0;connections[fd].fp=NULL;connections[fd].file_offset=0;connections[fd].file_size=0;connections[fd].payload_length=0;connections[fd].state=0;if(events&EPOLLIN){connections[fd].rcallback.recv_callback=recv_callback;}if(events&EPOLLOUT){connections[fd].send_callback=send_callback;}return0;}这段代码做了三件事:
- 把 fd 加入 epoll。
- 初始化连接的读写缓存和状态。
- 绑定读写回调函数。
5. 主循环只做事件分发
Reactor 的主循环不再直接写业务逻辑,而是判断 fd 类型和事件类型,然后调用对应回调:
while(1){intn=epoll_wait(epoll_fd,events,MAX_EVENTS,-1);if(n<0){perror("epoll_wait");break;}for(inti=0;i<n;i++){intfd=events[i].data.fd;if(find_server_fd(fd)!=-1){connections[fd].rcallback.accept_callback(fd);}else{if(events[i].events&EPOLLIN){connections[fd].rcallback.recv_callback(fd);}if(events[i].events&EPOLLOUT){connections[fd].send_callback(fd);}}}}这种结构的好处是清晰:事件循环是事件循环,协议处理是协议处理,两者不混在一起。
6. recv_cb 和 send_cb:读写事件如何切换
读事件回调把数据读入rbuffer,然后交给业务函数处理。当前项目里接入的是 WebSocket:
intrecv_cb(intfd){ssize_tbytes_read=recv(fd,connections[fd].rbuffer,BUFFER_SIZE,0);if(bytes_read<=0){set_event(fd,EPOLLIN,EPOLL_CTL_DEL);close(fd);return-1;}connections[fd].rlength=bytes_read;websocket_request(&connections[fd]);set_event(fd,EPOLLOUT,EPOLL_CTL_MOD);return0;}当业务处理后需要响应客户端,就把事件改成EPOLLOUT。写事件回调负责把wbuffer中的数据写出去:
ssize_tbytes_sent=send(fd,connections[fd].wbuffer+connections[fd].woffset,connections[fd].wlength-connections[fd].woffset,MSG_NOSIGNAL);connections[fd].woffset+=bytes_sent;if(connections[fd].woffset>=connections[fd].wlength){connections[fd].woffset=0;connections[fd].wlength=0;}这里的woffset很重要。真实网络里一次send()不一定能把所有数据写完,必须记录已经写了多少。
7. 状态机示例:HTTP 图片响应
webserver.c展示了另一个典型业务:HTTP 返回一张c1000k.jpg。它把响应拆成两个阶段:
if(conn->state==0){conn->fp=fopen("c1000k.jpg","r");fseek(conn->fp,0,SEEK_END);conn->file_size=ftell(conn->fp);fseek(conn->fp,0,SEEK_SET);conn->file_offset=0;intn=sprintf(conn->wbuffer,"HTTP/1.1 200 OK\r\n""Content-Type: image/jpeg\r\n""Content-Length: %ld\r\n\r\n",conn->file_size);conn->wlength=n;conn->state=1;}elseif(conn->state==1){intn=fread(conn->wbuffer,1,BUFFER_SIZE,conn->fp);conn->wlength=n;conn->file_offset+=n;}state == 0时准备响应头,state == 1时分块读取图片内容。这个例子说明 Reactor 不是只能处理简单 echo,它能自然承载“多次读写才能完成”的协议。
8. 编译运行
当前reactor.c中接入的是 WebSocket 业务:
gcc reactor.c websocket.c-owebsocket-lssl-lcrypto./websocket服务端默认监听 8080:
Server is listening on port 8080如果你要把 HTTP 业务也接进 Reactor,可以把recv_cb()/send_cb()中的业务函数从websocket_request()/websocket_response()替换或抽象成可配置回调,再链接webserver.c。
9. 小结
Reactor 的核心不是某个 API,而是一种代码组织方式:
epoll负责发现事件。- Reactor 主循环负责分发事件。
- callback 负责处理事件。
connection保存每个连接的上下文。state负责表达协议阶段。
学习链接: https://github.com/0voice
