导航菜单

  • 1.什么是MCP
  • 2.MCP架构
  • 3.MCP服务器
  • 4.MCP客户端
  • 5.版本控制
  • 6.连接MCP服务器
  • 7.SDKs
  • 8.Inspector
  • 9.规范
  • 10.架构
  • 11.协议
  • 12.生命周期
  • 13.工具
  • 14.资源
  • 15.提示
  • 16.日志
  • 17.进度
  • 18.传输
  • 19.补全
  • 20.引导
  • 21.采样
  • 22.任务
  • 23.取消
  • 24.Ping
  • 25.根
  • 26.分页
  • 27.授权
  • 28.初始化
  • 29.工具
  • 30.资源
  • 31.结构化输出
  • 32.提示词
  • 33.上下文
  • 34.StreamableHTTP
  • 35.参数补全
  • 36.引导
  • 37.采样
  • 38.LowLevel
  • 39.任务
  • 40.取消
  • 41.ping
  • 42.根
  • 43.分页
  • 44.授权
  • 45.FunctionCalling
  • starlette
  • FastAPI
  • Keycloak
  • asyncio
  • contextlib
  • httpx
  • pathlib
  • pydantic
  • queue
  • subprocess
  • threading
  • uvicorn
  • JSON-RPC
  • LiteLLM
  • pydantic-settings
  • ai_agent
  • format
  • diff
  • mcp_server
  • 1. 一条请求里要经过哪些「形态」
  • 2. 流式路由:stream_message 在做什么
  • 3. HTTP 请求体:JSON → Pydantic
  • 4. 数据库与 ORM:MCP 服务 → 纯字典列表
  • 5. 历史消息:ORM 行 → OpenAI Chat messages
  • 6. MCP 工具:SDK 对象 → 内部列表 → OpenAI tools
    • 6.0 对应总览表「OpenAI 工具参数」一行:inputSchema → parameters
    • 6.1 list_tools 结果 → 统一 dict
    • 6.2 内部 dict → OpenAI Tools 数组
    • 6.3 工具「暴露名」归一化
    • 6.4 parameters JSON Schema 归一化(实现摘要)
  • 7. 模型服务端点:配置 URL → OpenAI SDK base_url
  • 8. 单次补全请求:内存 dict → SDK 参数 → 网络协议
  • 9. 流式响应:SDK chunk → 应用内事件
    • 9.1 文本增量:chunk → {"type": "delta", "text": "..."}
    • 9.2 工具调用增量:chunk → 内存里累加
    • 9.3 一轮结束:伪 Chat Completions 结构
  • 10. 模型输出的 arguments 字符串 → Python dict(MCP 入参)
  • 11. MCP call_tool 结果 → 单条 tool 角色消息文本
  • 12. 应用内事件 dict → SSE 文本帧
    • 12.1 不向客户端转发的内部 / 收尾类型
    • 12.2 工具事件与落库 meta
  • 13. 落库:助手回复与工具摘要
  • 14. 会话标题(首条用户消息时):长文本 → 短标题
  • 15. 多轮工具循环中的「消息列表」演变(概念)
  • 16. 调试建议:从哪里打断点看「格式是否对」
  • 17. 相关源文件索引

1. 一条请求里要经过哪些「形态」 #

可以把整条链路看成多段管道,每一段输入输出类型不同。流式入口为 POST /api/agent-chat-sessions/{session_id}/messages/stream 的 stream_message(app/routers/agent_chat.py),经 prepare_stream_chat → iter_agent_chat_sse → generate_with_tools 等。

阶段 输入形态 输出形态 主要代码位置
HTTP 入参 JSON 字节流 Pydantic 模型 FastAPI + schemas.AgentChatSendRequest
流式前置上下文 ORM + 请求体 StreamChatContext prepare_stream_chat(同文件)
智能体绑定的 MCP ORM / DB 行 纯 dict 列表 mcp_service_dicts
历史消息 ORM 消息行列表 OpenAI Chat messages 列表 build_llm_messages_from_history
MCP 工具清单 SDK 对象 list_tools 内部统一 dict + OpenAI tools list_tools_for_service → build_openai_tools
OpenAI 工具参数 MCP 工具里的 inputSchema Chat Completions 里 tools[].function.parameters build_openai_tools、normalize_tool_schema
模型服务地址 配置字符串 OpenAI SDK 要求的 base_url openai_base_url
单次请求体 内存中的 req_payload dict HTTP 请求(SDK 封装) openai_chat_kwargs + AsyncOpenAI
模型流式响应 SSE/JSON 流 chunk 对象 应用内事件 dict iter_chat_completion_events
工具调用增量 流式 tool_calls 片段 完整的 tool_calls 数组 merge_stream_tool_calls
工具参数 模型输出的 JSON 字符串 dict 传给 MCP tool_call_args_dict
MCP 执行结果 call_tool 返回结构 单行/多行文本 call_tool_for_service
推送给浏览器 Python dict SSE 文本帧 agent_chat_stream.sse(iter_agent_chat_sse 内)
会话标题(首条) 用户长文本 截断后的标题字符串 build_session_title_from_question

下面按时间顺序展开每一段为什么要转换、规则是什么、常见坑在哪。

2. 流式路由:stream_message 在做什么 #

路径:POST /api/agent-chat-sessions/{session_id}/messages/stream
签名:async def stream_message(session_id, payload: schemas.AgentChatSendRequest, session: Session = Depends(get_session))

数据流:

  1. payload:请求体 JSON → AgentChatSendRequest(当前仅 content: str,见 app/schemas.py)。
  2. ctx = prepare_stream_chat(session, session_id, payload):同步阶段完成校验、用户消息落库、拼 StreamChatContext(见 §3 与 docs/stream_message_data_flow.md)。
  3. return StreamingResponse(iter_agent_chat_sse(...), media_type="text/event-stream", headers=SSE_STREAM_HEADERS):异步生成器产出 SSE 字符串;不在路由内直接调用 generate_with_tools(由 iter_agent_chat_sse 封装)。

响应头:SSE_STREAM_HEADERS(Cache-Control、Connection、X-Accel-Buffering 等),见 agent_chat.py 同文件顶部常量。

3. HTTP 请求体:JSON → Pydantic #

形态:客户端发送 Content-Type: application/json 的字节流(UTF-8)。

转换:FastAPI 将 body 解析并校验为 AgentChatSendRequest。当前模型字段为 content(必填,min_length=1),无 enable_mcp_tools 等额外开关;是否注册 MCP 工具由智能体绑定的 mcp_service_ids 与 mcp_service_dicts 决定。

要点:

  • 校验通过后,路由里拿到的已是结构化对象;prepare_stream_chat 中 payload.content.strip() 在字符串层再规范化。
  • 若网关/终端编码不是 UTF-8,解析可能失败。

4. 数据库与 ORM:MCP 服务 → 纯字典列表 #

形态:Agent 上挂的是 MCP 服务 ID 列表;库中每条 MCP 服务是 ORM 行。

转换:mcp_service_dicts(session, agent) 对每个 ID 调用 mcp_repository.get_mcp_service,拼成:

{"id": <int>, "name": "<str>", "protocol": "<str>", "config": { ... }}

为什么要变成 dict:

  • 下层 generate_with_tools / mcp_transport_streams 只依赖可序列化、可透传的配置(protocol、config),与 SQLAlchemy 会话解耦,避免把 ORM 实体传到异步 MCP 客户端里引发会话/线程问题。

细节:

  • name/protocol 统一转成 str,config 缺省为 {},减少 None 分支。

5. 历史消息:ORM 行 → OpenAI Chat messages #

函数:build_llm_messages_from_history(agent, message_rows)(app/services/agent_chat.py)。

输入:

  • agent:取 system_prompt。
  • message_rows:库里的多行 AgentChatMessage(role、content、meta 等)。

输出:形如 OpenAI Chat Completions 所需的列表:

[
  {"role": "system", "content": "..."},
  {"role": "user", "content": "..."},
  {"role": "assistant", "content": "..."},
  {"role": "tool", "content": "..."},
  ...
]

转换规则(与当前实现一致):

  1. 首条固定为 system,内容来自 agent.system_prompt(strip)。
  2. 只保留 role ∈ {user, assistant, tool},其它跳过。
  3. 每条非 system 消息当前仅设置 role 与 content(未从 meta 注入 tool_call_id)。若多轮工具对话要求 API 侧严格带 tool_call_id,需与实现同步演进。

语义:把「业务库里的聊天表」映射到「大模型厂商认识的对话格式」;工具轮次能否连贯还取决于落库时 tool 消息是否与模型返回的 call_id 对齐。

6. MCP 工具:SDK 对象 → 内部列表 → OpenAI tools #

6.0 对应总览表「OpenAI 工具参数」一行:inputSchema → parameters #

总览表里这一行概括的是:把 MCP 声明的工具入参格式,变成 Chat Completions 里 function.parameters 能接受的 JSON Schema。

一侧 字段 / 位置 含义
MCP(list_tools 返回的每个 Tool) inputSchema(规范里常用驼峰;对象上也可能出现 input_schema) 描述该工具调用时参数对象的 JSON Schema。
OpenAI Chat Completions tools[].function.parameters 同样是一份 JSON Schema,描述模型应生成的 function.arguments(JSON 对象字符串)需符合的结构。

在本项目里的落点(app/services/agent_chat.py):

  1. list_tools_for_service 把 SDK 对象的 inputSchema / input_schema 读出来,放进内部统一结构的 input_schema 键。
  2. build_openai_tools 构建每条 function 时写:"parameters": normalize_tool_schema(t.get("input_schema"))。

normalize_tool_schema 做什么:

  • 若 input_schema 已是 dict:原样返回(对 OpenAI 透传 MCP 给出的 JSON Schema)。
  • 否则:退回 {"type": "object", "properties": {}},避免非法类型导致整次请求失败。

兼容性与坑:

  • OpenAI 兼容的 JSON Schema 只是 子集;MCP 若使用复杂 $ref、不支持的关键字或过深嵌套,可能在请求阶段报错。

6.1 list_tools 结果 → 统一 dict #

函数:list_tools_for_service(service)。

  1. 按 protocol 建立传输(stdio / sse / streamable-http),ClientSession.initialize() 后 list_tools()。
  2. 取 tools_result.tools(缺省 [])。
  3. 每个 tool 压成:name、description、input_schema(见源码)。

6.2 内部 dict → OpenAI Tools 数组 #

函数:build_openai_tools(services)。

对每个工具生成一项:type: function,function.name / description / parameters。

同时维护 tool_mapping:暴露名 → {service, tool_name, service_name},供 call_tool_for_service 使用真实 MCP 工具名。

6.3 工具「暴露名」归一化 #

函数:normalize_tool_name(service_name, tool_name)。

  • 服务名与工具名转小写,非 [a-z0-9_-] 的字符压成 _,合并为 服务__工具,最长 64 字符。
  • 原因:多 MCP 服务可能有同名工具;OpenAI 侧 function.name 必须唯一。

6.4 parameters JSON Schema 归一化(实现摘要) #

函数:normalize_tool_schema(schema)。

  • 已是 dict → 原样作为 parameters。
  • 否则 → {"type": "object", "properties": {}}。

概念与字段名对应关系见 6.0。

7. 模型服务端点:配置 URL → OpenAI SDK base_url #

函数:openai_base_url(api_base_url)。

  • 去掉末尾 /。
  • 若尚不以 /v1 结尾(大小写不敏感),则追加 /v1。

原因:与 OpenAI 官方 Python SDK 约定一致:base_url 指向 API 根,路径由 SDK 拼接 chat/completions。

8. 单次补全请求:内存 dict → SDK 参数 → 网络协议 #

组装(generate_with_tools 内每轮):

req_payload = {"model": model_name, "messages": messages, "temperature": 0.3}
# 若有工具:
req_payload["tools"] = tools
req_payload["tool_choice"] = "auto"

转 SDK:openai_chat_kwargs(payload, stream=True) → chat.completions.create(**kwargs),含 stream=True。

网络侧:HTTP +(通常)SSE 或 chunked 流;对应用代码而言是 AsyncOpenAI 的异步流对象。

9. 流式响应:SDK chunk → 应用内事件 #

函数:iter_chat_completion_events。

9.1 文本增量:chunk → {"type": "delta", "text": "..."} #

  • 从每个 chunk 的 choices[0].delta.content 取字符串片段。
  • 有内容则 yield delta,供上层包成 SSE。

9.2 工具调用增量:chunk → 内存里累加 #

  • delta.tool_calls 可能分多段到达。
  • merge_stream_tool_calls 按 index 槽位把字符串 += 拼到 function.name 和 function.arguments 上。

9.3 一轮结束:伪 Chat Completions 结构 #

流结束后组装 msg(role: assistant、content、可选 tool_calls),然后:

yield {"type": AGENT_CHAT_ROUND_DONE, "data": {"choices": [{"message": msg}]}}

常量 AGENT_CHAT_ROUND_DONE 在 agent_chat.py 中定义为 "AGENT_CHAT_ROUND_DONE"(不是旧文档中的双下划线占位符)。

用途:与 REST 非流式 choices[0].message 形状对齐,generate_with_tools 用同一套逻辑判断本轮是否要执行工具。

10. 模型输出的 arguments 字符串 → Python dict(MCP 入参) #

函数:tool_call_args_dict(args_text)。

  • json.loads 成功且为 dict → 作为 call_tool 的参数;失败或非 dict → {}。

注意:模型有时会输出残缺 JSON;MCP 侧可能报错或得到空参。

11. MCP call_tool 结果 → 单条 tool 角色消息文本 #

函数:call_tool_for_service。

  • mcp_transport_streams + ClientSession,call_tool(tool_name, args)。
  • 遍历返回的 content 列表,取每项的 text,用 \n 拼成一个字符串。

去向:generate_with_tools 追加:

{"role": "tool", "tool_call_id": call_id, "content": result_text}

(工具执行的展示事件另见 iter_tool_run 产出的 tool_start / tool_end / AGENT_CHAT_TOOL_DONE。)

12. 应用内事件 dict → SSE 文本帧 #

函数:sse(data: dict)(app/services/agent_chat_stream.py,由 iter_agent_chat_sse 使用)。

  • json.dumps(data, ensure_ascii=False, default=str)。
  • 前缀 data: + 负载 + \n\n:Server-Sent Events 单帧格式。

路由侧:agent_chat.py 中另有同名 sse,用于同文件内其它逻辑;流式对话响应以 iter_agent_chat_sse 内的 sse 为准。

响应:StreamingResponse(..., media_type="text/event-stream", headers=SSE_STREAM_HEADERS)。

12.1 不向客户端转发的内部 / 收尾类型 #

  • AGENT_CHAT_STREAM_RUN_COMPLETE(值为 agent_chat_stream_run_complete):在 iter_agent_chat_sse 中读取 final_text 写库,不 yield 给浏览器。
  • AGENT_CHAT_ROUND_DONE:在 generate_with_tools 内消费,不直接作为 SSE 类型推给前端。
  • AGENT_CHAT_TOOL_DONE:在 iter_tool_run / generate_with_tools 内用于拿到 result_text,不作为 SSE 推给前端;前端看到的是 tool_start / tool_end(及 delta 等)。

12.2 工具事件与落库 meta #

iter_agent_chat_sse 用 tool_correlation_key 配对 tool_start / tool_end,经 tool_run_row_for_db 合并为 mcp_tool_runs 列表;流结束后 create_message(..., role="assistant", content=final_reply, meta={"mcp_tool_runs": ...})(无工具运行则 meta 为空对象 {})。

13. 落库:助手回复与工具摘要 #

流正常结束后:

  • final_reply:来自 AGENT_CHAT_STREAM_RUN_COMPLETE 的 final_text(在 iter_agent_chat_sse 内提取)。
  • meta:可选 mcp_tool_runs(工具调用摘要数组)。
  • 再 touch_session 更新会话活跃时间。

形态:内存中的整段最终文本 + 结构化 meta → 数据库一条 assistant 消息;在线展示的 delta 仅用于流式 UI,与最终 content 在逻辑上应对齐。

14. 会话标题(首条用户消息时):长文本 → 短标题 #

函数:build_session_title_from_question(app/routers/agent_chat.py)。

  • 空白压缩为单词间单空格。
  • 空 → "新对话";否则最长 24 字符,超出加 ...。

15. 多轮工具循环中的「消息列表」演变(概念) #

generate_with_tools 内 messages 为 OpenAI Chat 消息数组 的副本,每轮可能追加:

  1. 模型无工具:追加 {"role":"assistant","content": final_text},并 yield AGENT_CHAT_STREAM_RUN_COMPLETE。
  2. 模型有工具:追加带 tool_calls 的 assistant,再对每个 call 追加 role: tool,然后进入下一轮(最多 10 轮;用尽后还有一次不带 tools 的最终补全,见源码)。

格式约束:须满足厂商约定——先有带 tool_calls 的 assistant,再跟对应 tool_call_id 的 tool 消息。

16. 调试建议:从哪里打断点看「格式是否对」 #

  1. build_llm_messages_from_history 返回值:system 是否在首条;tool 行是否与模型 call_id 一致(若 API 报错可核对此处是否需补 tool_call_id)。
  2. build_openai_tools 的 tools 与 tool_mapping:暴露名是否唯一;parameters 是否为合法 JSON Schema。
  3. iter_chat_completion_events 最后一 yield:合并后的 msg 是否含完整 tool_calls。
  4. SSE 输出:浏览器 Network 里是否连续收到 data: {...}\n\n。
  5. 数据库:assistant 一条 content 是否与 final_text 一致;meta.mcp_tool_runs 是否与工具 UI 一致。

17. 相关源文件索引 #

文件 职责
app/routers/agent_chat.py stream_message、prepare_stream_chat、mcp_service_dicts、build_session_title_from_question、SSE_STREAM_HEADERS
app/services/agent_chat_stream.py StreamChatContext、iter_agent_chat_sse、sse、tool_correlation_key、tool_run_row_for_db
app/services/agent_chat.py generate_with_tools、iter_chat_completion_events、merge_stream_tool_calls、build_openai_tools、iter_tool_run、常量 AGENT_CHAT_*
app/schemas.py AgentChatSendRequest 等
docs/stream_message_data_flow.md stream_message 数据结构转换专篇
← 上一节 FastAPI 下一节 httpx →

访问验证

请输入访问令牌

Token不正确,请重新输入