【C++ AI 大模型接入 SDK】 - DeepSeek 模型接入(下)
欢迎文末添加好友交流,共同进步!
- 一、本篇概述
- 二、什么是 SSE?
- 2.1 SSE vs 普通请求
- 2.2 SSE 数据格式
- 三、与 sendMessage 的相同部分
- 3.1 请求体多了 "stream": true
- 3.2 读取超时更长
- 3.3 请求头多了 Accept
- 四、流式处理变量
- 五、构造 Request 对象
- 5.1 为什么不用 client.Post()?
- 5.2 response_handler — 响应头处理器
- 5.3 content_receiver — 数据接收处理器
- 六、content_receiver 逐行解析
- 6.1 错误检查
- 6.2 追加数据到 buffer
- 6.3 按 \n\n 分割并处理每个事件
- 6.4 过滤无效事件
- 6.5 提取 data 字段
- 6.6 检查结束标记
- 6.7 解析 JSON 增量数据
- 6.8 多重检查提取增量内容
- 6.9 累积并回调
- 6.10 JSON 解析失败的容错
- 6.11 content_receiver 返回
- 七、发送请求与收尾
- 7.1 发送请求
- 7.2 确保流式正常结束
- 7.3 返回完整回复
- 八、完整流程图
- 九、全量 vs 流式对比总结
- 十、总结
一、本篇概述
上篇讲了initModel()和sendMessage()(全量请求),本篇聚焦sendMessageStream()(流式请求)。这是整个 SDK最复杂的函数,涉及 SSE 协议解析、数据缓冲、逐块回调。
sendMessageStream() 的工作方式: 用户提问 → SDK 发起 HTTP 请求 → DeepSeek 边生成边返回 → SDK 逐块解析 → 每解析出一段文字就通过 callback 通知上层 → 上层(ChatServer)立即推送给前端 → 用户看到"打字机"效果二、什么是 SSE?
在深入代码之前,需要先理解SSE(Server-Sent Events,服务器推送事件)。
2.1 SSE vs 普通请求
普通 HTTP 请求(sendMessage 全量): 客户端 ──请求──→ 服务器 客户端 ←──等待──────── 客户端 ←──────────── 完整响应(一次性) SSE 流式请求(sendMessageStream): 客户端 ──请求──→ 服务器 客户端 ←──chunk1── "你" 客户端 ←──chunk2── "好" 客户端 ←──chunk3── "!" 客户端 ←──chunk4── "我是AI助手" 客户端 ←──[DONE]── 结束2.2 SSE 数据格式
SSE 是一种文本协议,每条消息用\n\n(两个换行)分隔。DeepSeek 返回的原始数据大概长这样:
data: {"choices":[{"delta":{"content":"你"}}]}\n\n data: {"choices":[{"delta":{"content":"好"}}]}\n\n data: {"choices":[{"delta":{"content":"!"}}]}\n\n : comment\n\n data: {"choices":[{"delta":{"content":"我是AI助手"}}]}\n\n data: [DONE]\n\n规则:
- 每条消息以
data:开头 - 以
:开头的是注释行,需要忽略 data: [DONE]是结束标记- 每条消息之间用
\n\n分隔
三、与 sendMessage 的相同部分
sendMessageStream()的前半部分(构造请求参数、历史消息、请求体)和sendMessage()几乎一样,只有两处区别:
3.1 请求体多了 “stream”: true
requestBody["stream"]=true;这一行告诉 DeepSeek API:请用流式方式返回结果。如果设为false或不设,就返回完整结果。
3.2 读取超时更长
// sendMessage 全量 client.set_read_timeout(60, 0); // 60秒 // sendMessageStream 流式 client.set_read_timeout(300, 0); // 300秒流式响应是边生成边传输的,模型生成一段文字就发一段,整个过程的持续时间远比全量请求长。如果设太短,长文本生成还没结束就超时了。
3.3 请求头多了 Accept
{"Accept","text/event-stream"}告诉服务器客户端期望接收 SSE 格式的流式数据。
四、流式处理变量
在发送请求之前,先声明一组用于流式处理的变量:
std::string buffer;// 接受流式响应的数据块boolgotError=false;// 标记响应是否成功std::string errorMsg;// 错误描述符intstatusCode=0;// 响应状态码boolstreamFinish=false;// 标记流式响应是否完成std::string fullResponse;// 累积完整的响应| 变量 | 类型 | 作用 |
|---|---|---|
buffer | string | 数据缓冲区。网络传输中,一次 recv 可能收到不完整的数据,需要缓冲拼接 |
gotError | bool | 一旦 HTTP 状态码非 200,设为true,后续接收器直接终止 |
errorMsg | string | 存储错误描述(如"HTTP status code: 401") |
statusCode | int | 存储响应状态码(本代码中声明了但未实际使用) |
streamFinish | bool | 收到[DONE]标记后设为true,用于最后检查流是否正常结束 |
fullResponse | string | 累积所有增量文本,最终作为函数返回值返回给调用方 |
为什么需要 buffer?
网络传输的特点是:一次 recv 调用可能收到任意数量的数据。可能一次收到半个 SSE 事件,也可能一次收到三个完整事件。所以需要一个缓冲区来拼接数据,等凑够一个完整事件(找到\n\n分隔符)再解析。
第 1 次 recv: "data: {\"choices\":[{\"delta\":{\"content\":\"你\"}}]}\n\nda" ↑ 分隔符 ↑ 不完整 buffer 中: "data: {\"choices\":[{\"delta\":{\"content\":\"你\"}}]}\n\nda" ├────── 完整事件,可以解析 ──────┤└─ 留在 buffer 等下次 ─┘ 第 2 次 recv: "ta: {\"choices\":[{\"delta\":{\"content\":\"好\"}}]}\n\n" 与 buffer 中残留的 "da" 拼接成完整事件五、构造 Request 对象
5.1 为什么不用 client.Post()?
上篇的全量请求直接用client.Post(),它是一次性发送请求、等待完整响应。但流式请求需要边接收边处理,所以要用更底层的httplib::Request对象,手动设置两个回调:
httplib::Request req;req.method="POST";req.path="/v1/chat/completions";req.headers=headers;req.body=requestBodyStr;| 字段 | 赋值 | 说明 |
|---|---|---|
method | "POST" | HTTP 方法 |
path | "/v1/chat/completions" | 请求路径 |
headers | 之前构造的 headers | 包含认证、内容类型、SSE 接受头 |
body | 序列化后的 JSON 字符串 | 请求体 |
和client.Post(path, headers, body, type)的参数是对应的,只是拆成了结构体的字段。
5.2 response_handler — 响应头处理器
req.response_handler=[&](consthttplib::Response&res){if(res.status!=200){gotError=true;errorMsg="HTTP status code: "+std::to_string(res.status);returnfalse;// 终止请求}returntrue;// 继续接收后续数据};这个回调在收到 HTTP 响应头时被调用(注意:此时还没收到 body 数据):
[&]— Lambda 按引用捕获所有外部变量,这样可以修改gotError和errorMsgres.status != 200— 如果状态码不是 200(比如 401 认证失败),说明请求出错了gotError = true— 设置错误标记,后续content_receiver会检查这个标记return false— 告诉 cpp-httplib终止请求,不需要继续接收 body 数据了return true— 状态码正常,继续接收后续的 body 数据
执行时序:
client.send(req) │ ├── TCP 连接建立 ├── 发送请求数据 ├── 收到响应头 ──→ response_handler 被调用 │ ├── status != 200 → gotError=true, return false → 终止 │ └── status == 200 → return true → 继续 │ ├── 收到第 1 块 body ──→ content_receiver 被调用 ├── 收到第 2 块 body ──→ content_receiver 被调用 ├── ... └── 接收完毕 ──→ client.send() 返回5.3 content_receiver — 数据接收处理器
这是流式处理的核心,每收到一块数据就会被 cpp-httplib 调用:
req.content_receiver=[&](constchar*data,size_t len,size_t offset,size_t totalLength){四个参数的含义:
| 参数 | 类型 | 说明 |
|---|---|---|
data | const char* | 指向本次收到的原始数据的指针 |
len | size_t | 本次收到数据的字节长度 |
offset | size_t | 当前数据在整个响应中的偏移量(本代码未使用) |
totalLength | size_t | 响应体总长度(本代码未使用) |
下面逐块解析 content_receiver 的内部逻辑。
六、content_receiver 逐行解析
6.1 错误检查
if(gotError){returnfalse;}入口先检查response_handler是否标记了错误。如果 HTTP 状态码不是 200,gotError已经是true,直接返回false终止接收。
6.2 追加数据到 buffer
buffer.append(data,len);std::string::append(const char* s, size_t n)— 将data指向的前len个字节追加到buffer末尾。
为什么不直接用buffer += data?因为data是原始的字节指针,不一定以\0结尾。operator+=遇到\0就停了,而append(data, len)精确地追加len个字节,即使中间有\0。
6.3 按 \n\n 分割并处理每个事件
size_t pos=0;while((pos=buffer.find("\n\n"))!=std::string::npos){std::string chunk=buffer.substr(0,pos);buffer.erase(0,pos+2);buffer.find("\n\n")— 在缓冲区中查找 SSE 事件分隔符\n\nstd::string::npos— 表示没找到,是string::find失败时的返回值(值为-1,但类型是size_t即无符号最大值)buffer.substr(0, pos)— 从位置 0 开始截取pos个字符,得到一个完整的 SSE 事件文本buffer.erase(0, pos + 2)— 从 buffer 中删除已处理的部分,pos + 2是数据长度加上\n\n的 2 个字符
用 while 而不是 if 的原因:一次 recv 可能收到多个完整事件,需要循环处理直到 buffer 中没有完整的\n\n分隔的事件为止。
buffer 内容: "data: {...}\n\ndata: {...}\n\ndata: {...Incomplete" ↑ 第 1 个 \n\n ↑ 第 2 个 \n\n while 循环 3 次: 第 1 次:提取 "data: {...}",删除已处理部分 第 2 次:提取 "data: {...}",删除已处理部分 第 3 次:find("\n\n") == npos → 退出循环 buffer 中剩余 "data: {...Incomplete" 等待下次 recv 拼接6.4 过滤无效事件
if(chunk.empty()||chunk[0]==':'){continue;}chunk.empty()— 空事件,跳过chunk[0] == ':'— SSE 协议规定以:开头的行是注释(comment),服务器可能用来维持连接(心跳),需要忽略
可能收到的数据: "\n\n" → chunk 为空,跳过 ": keep-alive\n\n" → chunk 以 ':' 开头,跳过 "data: {...}\n\n" → 有效数据,继续处理6.5 提取 data 字段
if(chunk.compare(0,6,"data: ")==0){std::string modelData=chunk.substr(6);chunk.compare(0, 6, "data: ")— 从 chunk 的第 0 个位置开始,取 6 个字符,与"data: "比较。等于 0 表示匹配chunk.substr(6)— 去掉"data: "前缀(6 个字符),得到后面的 JSON 数据
等价于:
if(chunk.starts_with("data: ")){// C++20 写法std::string modelData=chunk.substr(6);}compare是 C++98 就有的方法,兼容性更好。
6.6 检查结束标记
if(modelData=="[DONE]"){callback("",true);streamFinish=true;returntrue;}modelData == "[DONE]"— DeepSeek 在流式数据最后会发送data: [DONE],表示生成结束callback("", true)— 调用回调通知上层:没有更多内容了(第一个参数为空),流式结束(第二个参数为true)streamFinish = true— 标记流式正常结束return true— 告诉 cpp-httplib 继续接收(虽然后面不会有什么有效数据了)
6.7 解析 JSON 增量数据
Json::Value modelDataJson;Json::CharReaderBuilder reader;std::string errors;std::istringstreammodelDataStream(modelData);if(Json::parseFromStream(reader,modelDataStream,&modelDataJson,&errors)){和sendMessage()中的 JSON 解析逻辑完全一样:
Json::CharReaderBuilder— JSON 解析器构建器std::istringstream modelDataStream(modelData)— 将data:后面的 JSON 字符串包装成流Json::parseFromStream()— 解析 JSON,成功返回true
每个 SSE 事件中的 JSON 长这样:
{"choices":[{"delta":{"content":"你"}}]}注意流式响应用的是**delta**而不是message:
| 对比 | 全量响应(sendMessage) | 流式响应(sendMessageStream) |
|---|---|---|
| 字段路径 | choices[0].message.content | choices[0].delta.content |
| 含义 | 完整的回复内容 | 本次增量内容(一个字或几个字) |
| 出现次数 | 一次 | 多次,逐段返回 |
6.8 多重检查提取增量内容
if(modelDataJson.isMember("choices")&&modelDataJson["choices"].isArray()&&!modelDataJson["choices"].empty()&&modelDataJson["choices"][0].isMember("delta")&&modelDataJson["choices"][0]["delta"].isMember("content")){std::string content=modelDataJson["choices"][0]["delta"]["content"].asString();5 层检查,逐层深入:
isMember("choices")— 有choices字段吗?isArray()— 是数组吗?!empty()— 数组不为空吗?choices[0].isMember("delta")— 第一个元素有delta字段吗?delta.isMember("content")—delta里有content字段吗?
这样做的原因:流式响应的某些事件可能没有**content**字段。比如模型生成结束前的最后一个事件可能只包含finish_reason: "stop"而没有content。不做检查直接访问会崩溃。
6.9 累积并回调
fullResponse+=content;callback(content,false);fullResponse += content— 将增量内容追加到完整回复字符串中,最终作为函数返回值callback(content, false)— 调用回调通知上层:收到一段新内容(第一个参数),还没结束(第二个参数为false)
这两个操作是并行的:一边把碎片拼成完整回复(fullResponse),一边实时通知上层(callback)。
6.10 JSON 解析失败的容错
}else{WARN("DeepSeekProvider sendMessageStream parse modelDataJson error: {}",errors);}如果某个 chunk 的 JSON 解析失败,只打一条 WARN 日志,不中断流式接收。这是合理的,因为个别事件格式异常不应影响整体流程,后续事件可能仍然正常。
6.11 content_receiver 返回
returntrue;while 循环处理完 buffer 中所有完整事件后,返回true告诉 cpp-httplib继续接收后续数据。如果 buffer 中还有不完整的数据(没找到\n\n),会留在 buffer 中等下次content_receiver被调用时拼接。
七、发送请求与收尾
7.1 发送请求
autoresult=client.send(req);if(!result){ERR("Network error {}",to_string(result.error()));return"";}client.send(req)— 发送请求。与client.Post()不同,send()接受一个Request对象,会使用我们设置的response_handler和content_receiverresult—httplib::Result类型,重载了operator bool()- 网络失败时(DNS 解析失败、连接超时等)
result为false,result.error()返回具体的错误类型
7.2 确保流式正常结束
if(!streamFinish){WARN("stream ended without [DONE] marker");callback("",true);}如果整个请求结束了但streamFinish仍然是false,说明没有收到**[DONE]**标记。可能的原因:
- 网络中断,数据没传完
- 服务器异常,提前关闭了连接
不管什么原因,都要调用callback("", true)通知上层流式已结束,避免上层一直在等。
7.3 返回完整回复
returnfullResponse;fullResponse在content_receiver中逐段累积,到这里包含了模型的完整回复文本,和sendMessage()返回的内容一样,只是获取方式不同(一个是直接从响应体提取,一个是逐步拼接)。
八、完整流程图
sendMessageStream(messages, requestParam, callback) │ ├─ 1. 检查 isAvailable() │ ├─ 2. 构造 JSON 请求体(stream: true) │ ├─ 3. 创建 HTTP Client(超时 300 秒) │ ├─ 4. 构造 Request,设置两个回调: │ ├── response_handler → 检查 HTTP 状态码 │ └── content_receiver → 逐块处理 SSE 数据 │ ├─ 5. client.send(req) ──→ 请求发出 │ │ │ ├── 收到响应头 → response_handler │ │ └── status != 200 → gotError = true │ │ │ └── 收到每块 body → content_receiver │ │ │ ├── 检查 gotError → 有错则终止 │ ├── 追加到 buffer │ ├── while (找到 \n\n) │ │ ├── 提取 chunk │ │ ├── 空行/注释 → 跳过 │ │ ├── "data: [DONE]" → callback("", true), 结束 │ │ └── "data: {json}" → 解析 JSON │ │ └── choices[0].delta.content │ │ ├── fullResponse += content │ │ └── callback(content, false) │ └── return true(继续接收) │ ├─ 6. 检查 streamFinish │ └── 未收到 [DONE] → callback("", true) 兜底 │ └─ 7. return fullResponse(累积的完整回复)九、全量 vs 流式对比总结
| 对比项 | sendMessage(全量) | sendMessageStream(流式) |
|---|---|---|
| 请求体 | 无stream字段 | "stream": true |
| 读取超时 | 60 秒 | 300 秒 |
| 请求头 | Authorization+Content-Type | 多一个Accept: text/event-stream |
| 请求方式 | client.Post() | client.send(req)+ 回调 |
| 响应格式 | 完整 JSON | 多个 SSE 事件,\n\n分隔 |
| 内容字段 | choices[0].message.content | choices[0].delta.content |
| 结束标记 | 无(整个响应就是完整的) | data: [DONE] |
| 返回方式 | 一次性返回完整文本 | callback 逐段回调 + 最终返回完整文本 |
核心区别就一句话:全量是等完再给,流式是边收边给。
十、总结
sendMessageStream()的难点在于:
- SSE 协议解析— 需要理解
data:前缀、\n\n分隔符、[DONE]结束标记 - Buffer 缓冲机制— 网络传输的数据边界不确定,必须缓冲拼接后按
\n\n分割 - 双重回调架构—
response_handler检查状态码,content_receiver逐块处理数据 - 容错处理— 注释行过滤、JSON 解析失败不中断、缺少
[DONE]的兜底回调
后续将实现ChatGPT 和 Gemini 模型接入,它们的整体结构与 DeepSeek 类似,但在 API 端点、SSE 数据格式上有重要差异。
