- 1. 一条请求里要经过哪些「形态」
- 2. 流式路由:stream_message 在做什么
- 3. HTTP 请求体:JSON → Pydantic
- 4. 数据库与 ORM:MCP 服务 → 纯字典列表
- 5. 历史消息:ORM 行 → OpenAI Chat messages
- 6. MCP 工具:SDK 对象 → 内部列表 → OpenAI tools
- 7. 模型服务端点:配置 URL → OpenAI SDK base_url
- 8. 单次补全请求:内存 dict → SDK 参数 → 网络协议
- 9. 流式响应:SDK chunk → 应用内事件
- 10. 模型输出的 arguments 字符串 → Python dict(MCP 入参)
- 11. MCP call_tool 结果 → 单条 tool 角色消息文本
- 12. 应用内事件 dict → SSE 文本帧
- 13. 落库:助手回复与工具摘要
- 14. 会话标题(首条用户消息时):长文本 → 短标题
- 15. 多轮工具循环中的「消息列表」演变(概念)
- 16. 调试建议:从哪里打断点看「格式是否对」
- 17. 相关源文件索引
1. 一条请求里要经过哪些「形态」 #
可以把整条链路看成多段管道,每一段输入输出类型不同。流式入口为 POST /api/agent-chat-sessions/{session_id}/messages/stream 的 stream_message(app/routers/agent_chat.py),经 prepare_stream_chat → iter_agent_chat_sse → generate_with_tools 等。
| 阶段 | 输入形态 | 输出形态 | 主要代码位置 |
|---|---|---|---|
| HTTP 入参 | JSON 字节流 | Pydantic 模型 | FastAPI + schemas.AgentChatSendRequest |
| 流式前置上下文 | ORM + 请求体 | StreamChatContext |
prepare_stream_chat(同文件) |
| 智能体绑定的 MCP | ORM / DB 行 | 纯 dict 列表 |
mcp_service_dicts |
| 历史消息 | ORM 消息行列表 | OpenAI Chat messages 列表 |
build_llm_messages_from_history |
| MCP 工具清单 | SDK 对象 list_tools |
内部统一 dict + OpenAI tools |
list_tools_for_service → build_openai_tools |
| OpenAI 工具参数 | MCP 工具里的 inputSchema |
Chat Completions 里 tools[].function.parameters |
build_openai_tools、normalize_tool_schema |
| 模型服务地址 | 配置字符串 | OpenAI SDK 要求的 base_url |
openai_base_url |
| 单次请求体 | 内存中的 req_payload dict |
HTTP 请求(SDK 封装) | openai_chat_kwargs + AsyncOpenAI |
| 模型流式响应 | SSE/JSON 流 chunk 对象 | 应用内事件 dict | iter_chat_completion_events |
| 工具调用增量 | 流式 tool_calls 片段 |
完整的 tool_calls 数组 |
merge_stream_tool_calls |
| 工具参数 | 模型输出的 JSON 字符串 | dict 传给 MCP |
tool_call_args_dict |
| MCP 执行结果 | call_tool 返回结构 |
单行/多行文本 | call_tool_for_service |
| 推送给浏览器 | Python dict |
SSE 文本帧 | agent_chat_stream.sse(iter_agent_chat_sse 内) |
| 会话标题(首条) | 用户长文本 | 截断后的标题字符串 | build_session_title_from_question |
下面按时间顺序展开每一段为什么要转换、规则是什么、常见坑在哪。
2. 流式路由:stream_message 在做什么 #
路径:POST /api/agent-chat-sessions/{session_id}/messages/stream
签名:async def stream_message(session_id, payload: schemas.AgentChatSendRequest, session: Session = Depends(get_session))
数据流:
payload:请求体 JSON →AgentChatSendRequest(当前仅content: str,见app/schemas.py)。ctx = prepare_stream_chat(session, session_id, payload):同步阶段完成校验、用户消息落库、拼StreamChatContext(见 §3 与docs/stream_message_data_flow.md)。return StreamingResponse(iter_agent_chat_sse(...), media_type="text/event-stream", headers=SSE_STREAM_HEADERS):异步生成器产出 SSE 字符串;不在路由内直接调用generate_with_tools(由iter_agent_chat_sse封装)。
响应头:SSE_STREAM_HEADERS(Cache-Control、Connection、X-Accel-Buffering 等),见 agent_chat.py 同文件顶部常量。
3. HTTP 请求体:JSON → Pydantic #
形态:客户端发送 Content-Type: application/json 的字节流(UTF-8)。
转换:FastAPI 将 body 解析并校验为 AgentChatSendRequest。当前模型字段为 content(必填,min_length=1),无 enable_mcp_tools 等额外开关;是否注册 MCP 工具由智能体绑定的 mcp_service_ids 与 mcp_service_dicts 决定。
要点:
- 校验通过后,路由里拿到的已是结构化对象;
prepare_stream_chat中payload.content.strip()在字符串层再规范化。 - 若网关/终端编码不是 UTF-8,解析可能失败。
4. 数据库与 ORM:MCP 服务 → 纯字典列表 #
形态:Agent 上挂的是 MCP 服务 ID 列表;库中每条 MCP 服务是 ORM 行。
转换:mcp_service_dicts(session, agent) 对每个 ID 调用 mcp_repository.get_mcp_service,拼成:
{"id": <int>, "name": "<str>", "protocol": "<str>", "config": { ... }}为什么要变成 dict:
- 下层
generate_with_tools/mcp_transport_streams只依赖可序列化、可透传的配置(protocol、config),与 SQLAlchemy 会话解耦,避免把 ORM 实体传到异步 MCP 客户端里引发会话/线程问题。
细节:
name/protocol统一转成str,config缺省为{},减少None分支。
5. 历史消息:ORM 行 → OpenAI Chat messages #
函数:build_llm_messages_from_history(agent, message_rows)(app/services/agent_chat.py)。
输入:
agent:取system_prompt。message_rows:库里的多行AgentChatMessage(role、content、meta等)。
输出:形如 OpenAI Chat Completions 所需的列表:
[
{"role": "system", "content": "..."},
{"role": "user", "content": "..."},
{"role": "assistant", "content": "..."},
{"role": "tool", "content": "..."},
...
]转换规则(与当前实现一致):
- 首条固定为 system,内容来自
agent.system_prompt(strip)。 - 只保留
role ∈ {user, assistant, tool},其它跳过。 - 每条非 system 消息当前仅设置
role与content(未从meta注入tool_call_id)。若多轮工具对话要求 API 侧严格带tool_call_id,需与实现同步演进。
语义:把「业务库里的聊天表」映射到「大模型厂商认识的对话格式」;工具轮次能否连贯还取决于落库时 tool 消息是否与模型返回的 call_id 对齐。
6. MCP 工具:SDK 对象 → 内部列表 → OpenAI tools #
6.0 对应总览表「OpenAI 工具参数」一行:inputSchema → parameters #
总览表里这一行概括的是:把 MCP 声明的工具入参格式,变成 Chat Completions 里 function.parameters 能接受的 JSON Schema。
| 一侧 | 字段 / 位置 | 含义 |
|---|---|---|
MCP(list_tools 返回的每个 Tool) |
inputSchema(规范里常用驼峰;对象上也可能出现 input_schema) |
描述该工具调用时参数对象的 JSON Schema。 |
| OpenAI Chat Completions | tools[].function.parameters |
同样是一份 JSON Schema,描述模型应生成的 function.arguments(JSON 对象字符串)需符合的结构。 |
在本项目里的落点(app/services/agent_chat.py):
list_tools_for_service把 SDK 对象的inputSchema/input_schema读出来,放进内部统一结构的input_schema键。build_openai_tools构建每条function时写:"parameters": normalize_tool_schema(t.get("input_schema"))。
normalize_tool_schema 做什么:
- 若
input_schema已是dict:原样返回(对 OpenAI 透传 MCP 给出的 JSON Schema)。 - 否则:退回
{"type": "object", "properties": {}},避免非法类型导致整次请求失败。
兼容性与坑:
- OpenAI 兼容的 JSON Schema 只是 子集;MCP 若使用复杂
$ref、不支持的关键字或过深嵌套,可能在请求阶段报错。
6.1 list_tools 结果 → 统一 dict #
函数:list_tools_for_service(service)。
- 按
protocol建立传输(stdio / sse / streamable-http),ClientSession.initialize()后list_tools()。 - 取
tools_result.tools(缺省[])。 - 每个 tool 压成:
name、description、input_schema(见源码)。
6.2 内部 dict → OpenAI Tools 数组 #
函数:build_openai_tools(services)。
对每个工具生成一项:type: function,function.name / description / parameters。
同时维护 tool_mapping:暴露名 → {service, tool_name, service_name},供 call_tool_for_service 使用真实 MCP 工具名。
6.3 工具「暴露名」归一化 #
函数:normalize_tool_name(service_name, tool_name)。
- 服务名与工具名转小写,非
[a-z0-9_-]的字符压成_,合并为服务__工具,最长 64 字符。 - 原因:多 MCP 服务可能有同名工具;OpenAI 侧
function.name必须唯一。
6.4 parameters JSON Schema 归一化(实现摘要) #
函数:normalize_tool_schema(schema)。
- 已是
dict→ 原样作为parameters。 - 否则 →
{"type": "object", "properties": {}}。
概念与字段名对应关系见 6.0。
7. 模型服务端点:配置 URL → OpenAI SDK base_url #
函数:openai_base_url(api_base_url)。
- 去掉末尾
/。 - 若尚不以
/v1结尾(大小写不敏感),则追加/v1。
原因:与 OpenAI 官方 Python SDK 约定一致:base_url 指向 API 根,路径由 SDK 拼接 chat/completions。
8. 单次补全请求:内存 dict → SDK 参数 → 网络协议 #
组装(generate_with_tools 内每轮):
req_payload = {"model": model_name, "messages": messages, "temperature": 0.3}
# 若有工具:
req_payload["tools"] = tools
req_payload["tool_choice"] = "auto"转 SDK:openai_chat_kwargs(payload, stream=True) → chat.completions.create(**kwargs),含 stream=True。
网络侧:HTTP +(通常)SSE 或 chunked 流;对应用代码而言是 AsyncOpenAI 的异步流对象。
9. 流式响应:SDK chunk → 应用内事件 #
函数:iter_chat_completion_events。
9.1 文本增量:chunk → {"type": "delta", "text": "..."} #
- 从每个 chunk 的
choices[0].delta.content取字符串片段。 - 有内容则
yielddelta,供上层包成 SSE。
9.2 工具调用增量:chunk → 内存里累加 #
delta.tool_calls可能分多段到达。merge_stream_tool_calls按index槽位把字符串 += 拼到function.name和function.arguments上。
9.3 一轮结束:伪 Chat Completions 结构 #
流结束后组装 msg(role: assistant、content、可选 tool_calls),然后:
yield {"type": AGENT_CHAT_ROUND_DONE, "data": {"choices": [{"message": msg}]}}常量 AGENT_CHAT_ROUND_DONE 在 agent_chat.py 中定义为 "AGENT_CHAT_ROUND_DONE"(不是旧文档中的双下划线占位符)。
用途:与 REST 非流式 choices[0].message 形状对齐,generate_with_tools 用同一套逻辑判断本轮是否要执行工具。
10. 模型输出的 arguments 字符串 → Python dict(MCP 入参) #
函数:tool_call_args_dict(args_text)。
json.loads成功且为dict→ 作为call_tool的参数;失败或非 dict →{}。
注意:模型有时会输出残缺 JSON;MCP 侧可能报错或得到空参。
11. MCP call_tool 结果 → 单条 tool 角色消息文本 #
函数:call_tool_for_service。
mcp_transport_streams+ClientSession,call_tool(tool_name, args)。- 遍历返回的
content列表,取每项的text,用\n拼成一个字符串。
去向:generate_with_tools 追加:
{"role": "tool", "tool_call_id": call_id, "content": result_text}(工具执行的展示事件另见 iter_tool_run 产出的 tool_start / tool_end / AGENT_CHAT_TOOL_DONE。)
12. 应用内事件 dict → SSE 文本帧 #
函数:sse(data: dict)(app/services/agent_chat_stream.py,由 iter_agent_chat_sse 使用)。
json.dumps(data, ensure_ascii=False, default=str)。- 前缀
data:+ 负载 +\n\n:Server-Sent Events 单帧格式。
路由侧:agent_chat.py 中另有同名 sse,用于同文件内其它逻辑;流式对话响应以 iter_agent_chat_sse 内的 sse 为准。
响应:StreamingResponse(..., media_type="text/event-stream", headers=SSE_STREAM_HEADERS)。
12.1 不向客户端转发的内部 / 收尾类型 #
AGENT_CHAT_STREAM_RUN_COMPLETE(值为agent_chat_stream_run_complete):在iter_agent_chat_sse中读取final_text写库,不yield给浏览器。AGENT_CHAT_ROUND_DONE:在generate_with_tools内消费,不直接作为 SSE 类型推给前端。AGENT_CHAT_TOOL_DONE:在iter_tool_run/generate_with_tools内用于拿到result_text,不作为 SSE 推给前端;前端看到的是tool_start/tool_end(及delta等)。
12.2 工具事件与落库 meta #
iter_agent_chat_sse 用 tool_correlation_key 配对 tool_start / tool_end,经 tool_run_row_for_db 合并为 mcp_tool_runs 列表;流结束后 create_message(..., role="assistant", content=final_reply, meta={"mcp_tool_runs": ...})(无工具运行则 meta 为空对象 {})。
13. 落库:助手回复与工具摘要 #
流正常结束后:
final_reply:来自AGENT_CHAT_STREAM_RUN_COMPLETE的final_text(在iter_agent_chat_sse内提取)。meta:可选mcp_tool_runs(工具调用摘要数组)。- 再
touch_session更新会话活跃时间。
形态:内存中的整段最终文本 + 结构化 meta → 数据库一条 assistant 消息;在线展示的 delta 仅用于流式 UI,与最终 content 在逻辑上应对齐。
14. 会话标题(首条用户消息时):长文本 → 短标题 #
函数:build_session_title_from_question(app/routers/agent_chat.py)。
- 空白压缩为单词间单空格。
- 空 →
"新对话";否则最长 24 字符,超出加...。
15. 多轮工具循环中的「消息列表」演变(概念) #
generate_with_tools 内 messages 为 OpenAI Chat 消息数组 的副本,每轮可能追加:
- 模型无工具:追加
{"role":"assistant","content": final_text},并yield AGENT_CHAT_STREAM_RUN_COMPLETE。 - 模型有工具:追加带
tool_calls的 assistant,再对每个 call 追加role: tool,然后进入下一轮(最多 10 轮;用尽后还有一次不带tools的最终补全,见源码)。
格式约束:须满足厂商约定——先有带 tool_calls 的 assistant,再跟对应 tool_call_id 的 tool 消息。
16. 调试建议:从哪里打断点看「格式是否对」 #
build_llm_messages_from_history返回值:system 是否在首条;tool行是否与模型call_id一致(若 API 报错可核对此处是否需补tool_call_id)。build_openai_tools的tools与tool_mapping:暴露名是否唯一;parameters是否为合法 JSON Schema。iter_chat_completion_events最后一 yield:合并后的msg是否含完整tool_calls。- SSE 输出:浏览器 Network 里是否连续收到
data: {...}\n\n。 - 数据库:
assistant一条content是否与final_text一致;meta.mcp_tool_runs是否与工具 UI 一致。
17. 相关源文件索引 #
| 文件 | 职责 |
|---|---|
app/routers/agent_chat.py |
stream_message、prepare_stream_chat、mcp_service_dicts、build_session_title_from_question、SSE_STREAM_HEADERS |
app/services/agent_chat_stream.py |
StreamChatContext、iter_agent_chat_sse、sse、tool_correlation_key、tool_run_row_for_db |
app/services/agent_chat.py |
generate_with_tools、iter_chat_completion_events、merge_stream_tool_calls、build_openai_tools、iter_tool_run、常量 AGENT_CHAT_* |
app/schemas.py |
AgentChatSendRequest 等 |
docs/stream_message_data_flow.md |
stream_message 数据结构转换专篇 |