导航菜单

  • 1.什么是MCP
  • 2.MCP架构
  • 3.MCP服务器
  • 4.MCP客户端
  • 5.版本控制
  • 6.连接MCP服务器
  • 7.SDKs
  • 8.Inspector
  • 9.规范
  • 10.架构
  • 11.协议
  • 12.生命周期
  • 13.工具
  • 14.资源
  • 15.提示
  • 16.日志
  • 17.进度
  • 18.传输
  • 19.补全
  • 20.引导
  • 21.采样
  • 22.任务
  • 23.取消
  • 24.Ping
  • 25.根
  • 26.分页
  • 27.授权
  • 28.初始化
  • 29.工具
  • 30.资源
  • 31.结构化输出
  • 32.提示词
  • 33.上下文
  • 34.StreamableHTTP
  • 35.参数补全
  • 36.引导
  • 37.采样
  • 38.LowLevel
  • 39.任务
  • 40.取消
  • 41.ping
  • 42.根
  • 43.分页
  • 44.授权
  • 45.FunctionCalling
  • starlette
  • FastAPI
  • Keycloak
  • asyncio
  • contextlib
  • httpx
  • pathlib
  • pydantic
  • queue
  • subprocess
  • threading
  • uvicorn
  • JSON-RPC
  • LiteLLM
  • pydantic-settings
  • ai_agent
  • format
  • stream_message
  • 1. 路由函数在做什么
  • 2. 端到端时序图
  • 3. 准备阶段:数据形态变化(流程图)
  • 4. 流阶段:内部事件 → SSE → 再落库
  • 5. generate_with_tools 与路由的关系(简图)
  • 6. 一句话总结

下面按当前仓库里的实现(Depends(get_session)、参数名 session)说明 stream_message 的完整执行过程与数据流,并配上 Mermaid 图。


1. 路由函数在做什么 #

@router.post("/api/agent-chat-sessions/{session_id}/messages/stream")
async def stream_message(session_id: int, payload: schemas.AgentChatSendRequest, session: Session = Depends(get_session)):
    ctx = _prepare_stream_chat(session, session_id, payload)
    return StreamingResponse(
        iter_agent_chat_sse(session, session_id, ctx),
        media_type="text/event-stream",
        headers=SSE_STREAM_HEADERS
    )

可以拆成四步理解:

  1. 入参

    • 路径:session_id
    • 体:经 Pydantic 校验的 AgentChatSendRequest(主要是 content)
    • 依赖:session = 本请求内的 SQLAlchemy Session(见下文 get_session)
  2. _prepare_stream_chat(在返回 StreamingResponse 之前同步执行完)
    读库校验、可能改标题、写入用户消息、拼出给模型的 messages 和 StreamChatContext。

  3. StreamingResponse(...)
    响应体不是一次性字节串,而是异步迭代器 iter_agent_chat_sse(session, session_id, ctx):客户端读 body 时,框架逐步拉取生成器,每次 yield 的字符串作为一块 SSE 数据写出。

  4. 响应头
    media_type="text/event-stream" + SSE_STREAM_HEADERS(禁用缓存、长连接、X-Accel-Buffering: no 等),便于浏览器/代理不把流缓冲死。

依赖 get_session 的生命周期(与流式是否安全相关):

def get_session():
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()

FastAPI 对 yield 型依赖 的约定是:yield 之后的清理(这里是 session.close())一般在响应结束后执行;对 StreamingResponse 而言,通常要等流发送完毕后才算响应结束。因此同一个 session 既用于准备阶段写用户消息,也用于流结束后写助手消息,在常见用法下是连贯的(若你遇到 “session is closed” 类错误,再单独排查中间件/超时/断开连接)。


2. 端到端时序图 #

sequenceDiagram participant C as 客户端 participant F as FastAPI participant R as stream_message participant P as _prepare_stream_chat participant DB as Repository / 数据库 participant G as iter_agent_chat_sse participant GW as generate_with_tools participant LLM as LLM API participant MCP as MCP 服务 C->>F: POST .../messages/stream {content} F->>F: get_session → yield Session F->>R: stream_message(session_id, payload, session) R->>P: _prepare_stream_chat(session, session_id, payload) P->>DB: get_session / get_agent / get_llm ... alt 校验失败 P-->>R: HTTPException R-->>C: 4xx end P->>DB: create_message(user) P->>DB: list_messages P-->>R: StreamChatContext R-->>F: StreamingResponse(iter_agent_chat_sse(...)) F-->>C: 200 + SSE 头,开始 chunked 写 body Note over C,G: 客户端开始消费 body 后,异步生成器才持续运行 G->>C: data: {"type":"start"}\n\n loop async for evt in generate_with_tools G->>GW: 拉取下一事件 GW->>LLM: 流式 Chat Completions(可能多轮) LLM-->>GW: 分片 / 聚合结果 opt 模型要求调用工具 GW->>MCP: 执行 MCP 工具 MCP-->>GW: 结果文本 GW-->>G: tool_start / tool_end 等 end GW-->>G: 其它流式 dict;末尾内部事件 RUN_COMPLETE(含 final_text) alt 非 RUN_COMPLETE G->>C: data: JSON(evt)\n\n end end G->>DB: create_message(assistant) + touch_session G->>C: data: {"type":"done"}\n\n opt 异常 G->>C: data: {"type":"error",...}\n\n end F->>F: get_session finally: session.close()

3. 准备阶段:数据形态变化(流程图) #

flowchart LR subgraph HTTP A[session_id 路径参数] B[JSON body content] end subgraph 校验与 ORM C[AgentChatSendRequest] D[session_row / agent / llm] E[user 消息行落库] end subgraph 模型输入 F["messages: OpenAI Chat 列表"] G["mcp_services: dict 列表"] end H[StreamChatContext] A --> P[_prepare_stream_chat] B --> C --> P P --> D P --> E P --> F P --> G D --> H F --> H G --> H E -.->|含于 list_messages| F

字段级转换要点(_prepare_stream_chat):

阶段 输入 输出 / 副作用
校验 session_id、payload.content.strip() 会话/智能体/LLM 存在;内容非空
首条消息标题 无历史且标题为默认 update_session_title
用户消息 字符串 create_message(session, ..., "user", user_content)
历史 → LLM list_messages 全量(已含新用户消息) build_llm_messages_from_history:system + 各条 user/assistant/tool(tool 从 meta.tool_call_id 映射到 tool_call_id)
MCP agent.mcp_service_ids _mcp_service_dicts → [{id,name,protocol,config}, ...]
上下文对象 上述 + LLM 连接信息 StreamChatContext(session_row, llm_*, messages, mcp_services)

4. 流阶段:内部事件 → SSE → 再落库 #

flowchart TB subgraph GW["generate_with_tools"] E1[delta 等流式 dict] E2[tool_start / tool_end] E3["RUN_COMPLETE\nfinal_text"] end subgraph G["iter_agent_chat_sse"] SSESTR["_sse(dict) → 字符串"] PT[pending_tool] MR[mcp_tool_runs] FT[final_reply] end subgraph 客户端 OUT[SSE 帧] end subgraph DB AM[assistant + meta] end E1 --> SSESTR E2 --> PT E2 --> MR E2 --> SSESTR E3 --> FT E3 -.->|continue 不 yield| X[不发给客户端] SSESTR --> OUT FT --> AM MR --> AM
  • _sse:dict → json.dumps(ensure_ascii=False, default=str)→ 前缀 data: + 结尾 \n\n,符合 SSE 单行 JSON 习惯。
  • AGENT_CHAT_STREAM_RUN_COMPLETE:只用于取出 final_text 写入 final_reply,不再包一层发给前端(前端靠前面的 token/事件拼 UI)。
  • tool_start / tool_end:用 _tool_correlation_key(优先 call_id)配对,合并为 mcp_tool_runs 里的一条;原始 tool_* 事件仍会 yield _sse(evt) 给前端。
  • 成功结束:create_message(session, session_id, "assistant", final_reply, meta),meta 在有工具时为 {"mcp_tool_runs": [...]};touch_session;最后 {"type":"done"}。
  • 异常:{"type":"error","message":...},且不会走成功路径的助手落库。

5. generate_with_tools 与路由的关系(简图) #

stateDiagram-v2 [*] --> CallModel: messages + 可选 tools CallModel --> NoTools: 无 tool_calls CallModel --> WithTools: 有 tool_calls WithTools --> CallModel: 追加 assistant+tool 消息再请求 NoTools --> [*]: yield RUN_COMPLETE

路由与流服务不实现多轮工具细节,只消费统一的事件流:转发(除 RUN_COMPLETE)→ 聚 mcp_tool_runs → 用 final_reply 写库。


6. 一句话总结 #

stream_message:用 get_session 拿到的数据库会话 先做 _prepare_stream_chat(校验 + 写用户消息 + 拼 StreamChatContext),再返回 StreamingResponse,由 iter_agent_chat_sse 把 generate_with_tools 的 dict 事件 转成 SSE 文本推给客户端,并在流正常结束时把 助手正文 + mcp_tool_runs 写入数据库。

← 上一节 starlette 下一节 subprocess →

访问验证

请输入访问令牌

Token不正确,请重新输入