当前位置: 首页 > news >正文

【C++ AI 大模型接入 SDK】 - DeepSeek 模型接入(下)

大家好,我是Halcyon.平安

欢迎文末添加好友交流,共同进步!

    • 一、本篇概述
    • 二、什么是 SSE?
      • 2.1 SSE vs 普通请求
      • 2.2 SSE 数据格式
    • 三、与 sendMessage 的相同部分
      • 3.1 请求体多了 "stream": true
      • 3.2 读取超时更长
      • 3.3 请求头多了 Accept
    • 四、流式处理变量
    • 五、构造 Request 对象
      • 5.1 为什么不用 client.Post()?
      • 5.2 response_handler — 响应头处理器
      • 5.3 content_receiver — 数据接收处理器
    • 六、content_receiver 逐行解析
      • 6.1 错误检查
      • 6.2 追加数据到 buffer
      • 6.3 按 \n\n 分割并处理每个事件
      • 6.4 过滤无效事件
      • 6.5 提取 data 字段
      • 6.6 检查结束标记
      • 6.7 解析 JSON 增量数据
      • 6.8 多重检查提取增量内容
      • 6.9 累积并回调
      • 6.10 JSON 解析失败的容错
      • 6.11 content_receiver 返回
    • 七、发送请求与收尾
      • 7.1 发送请求
      • 7.2 确保流式正常结束
      • 7.3 返回完整回复
    • 八、完整流程图
    • 九、全量 vs 流式对比总结
    • 十、总结

一、本篇概述

上篇讲了initModel()sendMessage()(全量请求),本篇聚焦sendMessageStream()(流式请求)。这是整个 SDK最复杂的函数,涉及 SSE 协议解析、数据缓冲、逐块回调。

sendMessageStream() 的工作方式: 用户提问 → SDK 发起 HTTP 请求 → DeepSeek 边生成边返回 → SDK 逐块解析 → 每解析出一段文字就通过 callback 通知上层 → 上层(ChatServer)立即推送给前端 → 用户看到"打字机"效果

二、什么是 SSE?

在深入代码之前,需要先理解SSE(Server-Sent Events,服务器推送事件)

2.1 SSE vs 普通请求

普通 HTTP 请求(sendMessage 全量): 客户端 ──请求──→ 服务器 客户端 ←──等待──────── 客户端 ←──────────── 完整响应(一次性) SSE 流式请求(sendMessageStream): 客户端 ──请求──→ 服务器 客户端 ←──chunk1── "你" 客户端 ←──chunk2── "好" 客户端 ←──chunk3── "!" 客户端 ←──chunk4── "我是AI助手" 客户端 ←──[DONE]── 结束

2.2 SSE 数据格式

SSE 是一种文本协议,每条消息用\n\n(两个换行)分隔。DeepSeek 返回的原始数据大概长这样:

data: {"choices":[{"delta":{"content":"你"}}]}\n\n data: {"choices":[{"delta":{"content":"好"}}]}\n\n data: {"choices":[{"delta":{"content":"!"}}]}\n\n : comment\n\n data: {"choices":[{"delta":{"content":"我是AI助手"}}]}\n\n data: [DONE]\n\n

规则:

  • 每条消息以data:开头
  • :开头的是注释行,需要忽略
  • data: [DONE]结束标记
  • 每条消息之间用\n\n分隔

三、与 sendMessage 的相同部分

sendMessageStream()的前半部分(构造请求参数、历史消息、请求体)和sendMessage()几乎一样,只有两处区别:

3.1 请求体多了 “stream”: true

requestBody["stream"]=true;

这一行告诉 DeepSeek API:请用流式方式返回结果。如果设为false或不设,就返回完整结果。

3.2 读取超时更长

// sendMessage 全量 client.set_read_timeout(60, 0); // 60秒 // sendMessageStream 流式 client.set_read_timeout(300, 0); // 300秒

流式响应是边生成边传输的,模型生成一段文字就发一段,整个过程的持续时间远比全量请求长。如果设太短,长文本生成还没结束就超时了。

3.3 请求头多了 Accept

{"Accept","text/event-stream"}

告诉服务器客户端期望接收 SSE 格式的流式数据。


四、流式处理变量

在发送请求之前,先声明一组用于流式处理的变量:

std::string buffer;// 接受流式响应的数据块boolgotError=false;// 标记响应是否成功std::string errorMsg;// 错误描述符intstatusCode=0;// 响应状态码boolstreamFinish=false;// 标记流式响应是否完成std::string fullResponse;// 累积完整的响应
变量类型作用
bufferstring数据缓冲区。网络传输中,一次 recv 可能收到不完整的数据,需要缓冲拼接
gotErrorbool一旦 HTTP 状态码非 200,设为true,后续接收器直接终止
errorMsgstring存储错误描述(如"HTTP status code: 401"
statusCodeint存储响应状态码(本代码中声明了但未实际使用)
streamFinishbool收到[DONE]标记后设为true,用于最后检查流是否正常结束
fullResponsestring累积所有增量文本,最终作为函数返回值返回给调用方

为什么需要 buffer?

网络传输的特点是:一次 recv 调用可能收到任意数量的数据。可能一次收到半个 SSE 事件,也可能一次收到三个完整事件。所以需要一个缓冲区来拼接数据,等凑够一个完整事件(找到\n\n分隔符)再解析。

第 1 次 recv: "data: {\"choices\":[{\"delta\":{\"content\":\"你\"}}]}\n\nda" ↑ 分隔符 ↑ 不完整 buffer 中: "data: {\"choices\":[{\"delta\":{\"content\":\"你\"}}]}\n\nda" ├────── 完整事件,可以解析 ──────┤└─ 留在 buffer 等下次 ─┘ 第 2 次 recv: "ta: {\"choices\":[{\"delta\":{\"content\":\"好\"}}]}\n\n" 与 buffer 中残留的 "da" 拼接成完整事件

五、构造 Request 对象

5.1 为什么不用 client.Post()?

上篇的全量请求直接用client.Post(),它是一次性发送请求、等待完整响应。但流式请求需要边接收边处理,所以要用更底层的httplib::Request对象,手动设置两个回调:

httplib::Request req;req.method="POST";req.path="/v1/chat/completions";req.headers=headers;req.body=requestBodyStr;
字段赋值说明
method"POST"HTTP 方法
path"/v1/chat/completions"请求路径
headers之前构造的 headers包含认证、内容类型、SSE 接受头
body序列化后的 JSON 字符串请求体

client.Post(path, headers, body, type)的参数是对应的,只是拆成了结构体的字段。

5.2 response_handler — 响应头处理器

req.response_handler=[&](consthttplib::Response&res){if(res.status!=200){gotError=true;errorMsg="HTTP status code: "+std::to_string(res.status);returnfalse;// 终止请求}returntrue;// 继续接收后续数据};

这个回调在收到 HTTP 响应头时被调用(注意:此时还没收到 body 数据):

  • [&]— Lambda 按引用捕获所有外部变量,这样可以修改gotErrorerrorMsg
  • res.status != 200— 如果状态码不是 200(比如 401 认证失败),说明请求出错了
  • gotError = true— 设置错误标记,后续content_receiver会检查这个标记
  • return false— 告诉 cpp-httplib终止请求,不需要继续接收 body 数据了
  • return true— 状态码正常,继续接收后续的 body 数据

执行时序:

client.send(req) │ ├── TCP 连接建立 ├── 发送请求数据 ├── 收到响应头 ──→ response_handler 被调用 │ ├── status != 200 → gotError=true, return false → 终止 │ └── status == 200 → return true → 继续 │ ├── 收到第 1 块 body ──→ content_receiver 被调用 ├── 收到第 2 块 body ──→ content_receiver 被调用 ├── ... └── 接收完毕 ──→ client.send() 返回

5.3 content_receiver — 数据接收处理器

这是流式处理的核心,每收到一块数据就会被 cpp-httplib 调用:

req.content_receiver=[&](constchar*data,size_t len,size_t offset,size_t totalLength){

四个参数的含义:

参数类型说明
dataconst char*指向本次收到的原始数据的指针
lensize_t本次收到数据的字节长度
offsetsize_t当前数据在整个响应中的偏移量(本代码未使用)
totalLengthsize_t响应体总长度(本代码未使用)

下面逐块解析 content_receiver 的内部逻辑。


六、content_receiver 逐行解析

6.1 错误检查

if(gotError){returnfalse;}

入口先检查response_handler是否标记了错误。如果 HTTP 状态码不是 200,gotError已经是true,直接返回false终止接收。

6.2 追加数据到 buffer

buffer.append(data,len);

std::string::append(const char* s, size_t n)— 将data指向的前len个字节追加到buffer末尾。

为什么不直接用buffer += data?因为data是原始的字节指针,不一定以\0结尾。operator+=遇到\0就停了,而append(data, len)精确地追加len个字节,即使中间有\0

6.3 按 \n\n 分割并处理每个事件

size_t pos=0;while((pos=buffer.find("\n\n"))!=std::string::npos){std::string chunk=buffer.substr(0,pos);buffer.erase(0,pos+2);
  • buffer.find("\n\n")— 在缓冲区中查找 SSE 事件分隔符\n\n
  • std::string::npos— 表示没找到,是string::find失败时的返回值(值为-1,但类型是size_t即无符号最大值)
  • buffer.substr(0, pos)— 从位置 0 开始截取pos个字符,得到一个完整的 SSE 事件文本
  • buffer.erase(0, pos + 2)— 从 buffer 中删除已处理的部分,pos + 2是数据长度加上\n\n的 2 个字符

用 while 而不是 if 的原因:一次 recv 可能收到多个完整事件,需要循环处理直到 buffer 中没有完整的\n\n分隔的事件为止。

buffer 内容: "data: {...}\n\ndata: {...}\n\ndata: {...Incomplete" ↑ 第 1 个 \n\n ↑ 第 2 个 \n\n while 循环 3 次: 第 1 次:提取 "data: {...}",删除已处理部分 第 2 次:提取 "data: {...}",删除已处理部分 第 3 次:find("\n\n") == npos → 退出循环 buffer 中剩余 "data: {...Incomplete" 等待下次 recv 拼接

6.4 过滤无效事件

if(chunk.empty()||chunk[0]==':'){continue;}
  • chunk.empty()— 空事件,跳过
  • chunk[0] == ':'— SSE 协议规定以:开头的行是注释(comment),服务器可能用来维持连接(心跳),需要忽略
可能收到的数据: "\n\n" → chunk 为空,跳过 ": keep-alive\n\n" → chunk 以 ':' 开头,跳过 "data: {...}\n\n" → 有效数据,继续处理

6.5 提取 data 字段

if(chunk.compare(0,6,"data: ")==0){std::string modelData=chunk.substr(6);
  • chunk.compare(0, 6, "data: ")— 从 chunk 的第 0 个位置开始,取 6 个字符,与"data: "比较。等于 0 表示匹配
  • chunk.substr(6)— 去掉"data: "前缀(6 个字符),得到后面的 JSON 数据

等价于:

if(chunk.starts_with("data: ")){// C++20 写法std::string modelData=chunk.substr(6);}

compare是 C++98 就有的方法,兼容性更好。

6.6 检查结束标记

if(modelData=="[DONE]"){callback("",true);streamFinish=true;returntrue;}
  • modelData == "[DONE]"— DeepSeek 在流式数据最后会发送data: [DONE],表示生成结束
  • callback("", true)— 调用回调通知上层:没有更多内容了(第一个参数为空),流式结束(第二个参数为true
  • streamFinish = true— 标记流式正常结束
  • return true— 告诉 cpp-httplib 继续接收(虽然后面不会有什么有效数据了)

6.7 解析 JSON 增量数据

Json::Value modelDataJson;Json::CharReaderBuilder reader;std::string errors;std::istringstreammodelDataStream(modelData);if(Json::parseFromStream(reader,modelDataStream,&modelDataJson,&errors)){

sendMessage()中的 JSON 解析逻辑完全一样:

  • Json::CharReaderBuilder— JSON 解析器构建器
  • std::istringstream modelDataStream(modelData)— 将data:后面的 JSON 字符串包装成流
  • Json::parseFromStream()— 解析 JSON,成功返回true

每个 SSE 事件中的 JSON 长这样:

{"choices":[{"delta":{"content":"你"}}]}

注意流式响应用的是**delta**而不是message

对比全量响应(sendMessage)流式响应(sendMessageStream)
字段路径choices[0].message.contentchoices[0].delta.content
含义完整的回复内容本次增量内容(一个字或几个字)
出现次数一次多次,逐段返回

6.8 多重检查提取增量内容

if(modelDataJson.isMember("choices")&&modelDataJson["choices"].isArray()&&!modelDataJson["choices"].empty()&&modelDataJson["choices"][0].isMember("delta")&&modelDataJson["choices"][0]["delta"].isMember("content")){std::string content=modelDataJson["choices"][0]["delta"]["content"].asString();

5 层检查,逐层深入:

  1. isMember("choices")— 有choices字段吗?
  2. isArray()— 是数组吗?
  3. !empty()— 数组不为空吗?
  4. choices[0].isMember("delta")— 第一个元素有delta字段吗?
  5. delta.isMember("content")delta里有content字段吗?

这样做的原因:流式响应的某些事件可能没有**content**字段。比如模型生成结束前的最后一个事件可能只包含finish_reason: "stop"而没有content。不做检查直接访问会崩溃。

6.9 累积并回调

fullResponse+=content;callback(content,false);
  • fullResponse += content— 将增量内容追加到完整回复字符串中,最终作为函数返回值
  • callback(content, false)— 调用回调通知上层:收到一段新内容(第一个参数),还没结束(第二个参数为false

这两个操作是并行的:一边把碎片拼成完整回复(fullResponse),一边实时通知上层(callback)

6.10 JSON 解析失败的容错

}else{WARN("DeepSeekProvider sendMessageStream parse modelDataJson error: {}",errors);}

如果某个 chunk 的 JSON 解析失败,只打一条 WARN 日志,不中断流式接收。这是合理的,因为个别事件格式异常不应影响整体流程,后续事件可能仍然正常。

6.11 content_receiver 返回

returntrue;

while 循环处理完 buffer 中所有完整事件后,返回true告诉 cpp-httplib继续接收后续数据。如果 buffer 中还有不完整的数据(没找到\n\n),会留在 buffer 中等下次content_receiver被调用时拼接。


七、发送请求与收尾

7.1 发送请求

autoresult=client.send(req);if(!result){ERR("Network error {}",to_string(result.error()));return"";}
  • client.send(req)— 发送请求。与client.Post()不同,send()接受一个Request对象,会使用我们设置的response_handlercontent_receiver
  • resulthttplib::Result类型,重载了operator bool()
  • 网络失败时(DNS 解析失败、连接超时等)resultfalseresult.error()返回具体的错误类型

7.2 确保流式正常结束

if(!streamFinish){WARN("stream ended without [DONE] marker");callback("",true);}

如果整个请求结束了但streamFinish仍然是false,说明没有收到**[DONE]**标记。可能的原因:

  • 网络中断,数据没传完
  • 服务器异常,提前关闭了连接

不管什么原因,都要调用callback("", true)通知上层流式已结束,避免上层一直在等。

7.3 返回完整回复

returnfullResponse;

fullResponsecontent_receiver中逐段累积,到这里包含了模型的完整回复文本,和sendMessage()返回的内容一样,只是获取方式不同(一个是直接从响应体提取,一个是逐步拼接)。


八、完整流程图

sendMessageStream(messages, requestParam, callback) │ ├─ 1. 检查 isAvailable() │ ├─ 2. 构造 JSON 请求体(stream: true) │ ├─ 3. 创建 HTTP Client(超时 300 秒) │ ├─ 4. 构造 Request,设置两个回调: │ ├── response_handler → 检查 HTTP 状态码 │ └── content_receiver → 逐块处理 SSE 数据 │ ├─ 5. client.send(req) ──→ 请求发出 │ │ │ ├── 收到响应头 → response_handler │ │ └── status != 200 → gotError = true │ │ │ └── 收到每块 body → content_receiver │ │ │ ├── 检查 gotError → 有错则终止 │ ├── 追加到 buffer │ ├── while (找到 \n\n) │ │ ├── 提取 chunk │ │ ├── 空行/注释 → 跳过 │ │ ├── "data: [DONE]" → callback("", true), 结束 │ │ └── "data: {json}" → 解析 JSON │ │ └── choices[0].delta.content │ │ ├── fullResponse += content │ │ └── callback(content, false) │ └── return true(继续接收) │ ├─ 6. 检查 streamFinish │ └── 未收到 [DONE] → callback("", true) 兜底 │ └─ 7. return fullResponse(累积的完整回复)

九、全量 vs 流式对比总结

对比项sendMessage(全量)sendMessageStream(流式)
请求体stream字段"stream": true
读取超时60 秒300 秒
请求头Authorization+Content-Type多一个Accept: text/event-stream
请求方式client.Post()client.send(req)+ 回调
响应格式完整 JSON多个 SSE 事件,\n\n分隔
内容字段choices[0].message.contentchoices[0].delta.content
结束标记无(整个响应就是完整的)data: [DONE]
返回方式一次性返回完整文本callback 逐段回调 + 最终返回完整文本

核心区别就一句话:全量是等完再给,流式是边收边给


十、总结

sendMessageStream()的难点在于:

  1. SSE 协议解析— 需要理解data:前缀、\n\n分隔符、[DONE]结束标记
  2. Buffer 缓冲机制— 网络传输的数据边界不确定,必须缓冲拼接后按\n\n分割
  3. 双重回调架构response_handler检查状态码,content_receiver逐块处理数据
  4. 容错处理— 注释行过滤、JSON 解析失败不中断、缺少[DONE]的兜底回调

后续将实现ChatGPT 和 Gemini 模型接入,它们的整体结构与 DeepSeek 类似,但在 API 端点、SSE 数据格式上有重要差异。

http://www.cnnetsun.cn/news/2495251.html

相关文章:

  • AI教材写作大揭秘!低查重工具,为教材编写保驾护航!
  • AI教材生成秘籍!揭秘低查重的AI教材编写工具,高效产出优质教材
  • Tidal-Media-Downloader:3分钟掌握终极Tidal音乐下载方案
  • 华为OD机试真题 新系统【小学英语老师批改作文】
  • 每天节省25分钟:淘宝淘金币自动化脚本全攻略
  • USBIPD-Win终极指南:3步实现Windows与WSL 2的USB设备无缝共享
  • 如何在IDEA中高效编辑JAR文件:JarEditor插件完整指南
  • AhabAssistantLimbusCompany:PC端《Limbus Company》自动化助手终极指南
  • 快速制作系统启动盘:Rufus实战指南与高级配置技巧
  • AI编程助手的现状与未来:Copilot、CodeLlama与GPT-4
  • Cursor Free VIP技术架构深度解析:设备标识重置与多平台兼容实现
  • 如何在GTA V中安全使用YimMenu:新手完全指南与防封技巧
  • ElevenLabs东北话语音API调用失败率高达41%?一线工程师紧急封存的6个底层HTTP头配置方案
  • Obsidian Full Calendar插件完整指南:在笔记中高效管理你的日程
  • 告别数据锁定:用youdaonote-pull实现有道云笔记的本地化自由
  • 5分钟搞定歌词管理:LDDC免费歌词下载工具完全指南
  • cann/asc-devkit:内置数据类型
  • 如何通过CDCS项目快速提升数据科学实战能力:中国数据竞赛优胜解集锦的终极指南 [特殊字符]
  • TextShot多语言OCR配置指南:如何轻松识别中文、英文、法文等100+语言
  • requests-oauthlib实战:构建完整的第三方应用集成方案
  • fltk-rs主题定制技巧:打造个性化GUI界面的10个实用方法
  • 如何在Windows上快速运行安卓应用:APK Installer终极指南
  • 如何高效使用Mihon漫画阅读器:Android平台上的开源漫画管理解决方案
  • 如何为老款Mac安装最新macOS?OCLP-Mod技术深度解析
  • 5分钟快速搭建Windows RTMP流媒体服务器:新手完整指南
  • Axure RP 中文语言包:3分钟告别英文界面困扰
  • OpCore-Simplify:10分钟搞定黑苹果配置的终极指南
  • 开发AI应用时如何利用Taotoken实现多模型降级容灾策略
  • 终极指南:如何快速配置org-brain概念映射工具
  • 如何在Windows电脑上高效刷酷安?酷安UWP终极指南帮你告别小屏时代