下面按当前仓库里的实现(Depends(get_session)、参数名 session)说明 stream_message 的完整执行过程与数据流,并配上 Mermaid 图。
1. 路由函数在做什么 #
@router.post("/api/agent-chat-sessions/{session_id}/messages/stream")
async def stream_message(session_id: int, payload: schemas.AgentChatSendRequest, session: Session = Depends(get_session)):
ctx = _prepare_stream_chat(session, session_id, payload)
return StreamingResponse(
iter_agent_chat_sse(session, session_id, ctx),
media_type="text/event-stream",
headers=SSE_STREAM_HEADERS
)可以拆成四步理解:
入参
- 路径:
session_id - 体:经 Pydantic 校验的
AgentChatSendRequest(主要是content) - 依赖:
session= 本请求内的 SQLAlchemySession(见下文get_session)
- 路径:
_prepare_stream_chat(在返回StreamingResponse之前同步执行完)
读库校验、可能改标题、写入用户消息、拼出给模型的messages和StreamChatContext。StreamingResponse(...)
响应体不是一次性字节串,而是异步迭代器iter_agent_chat_sse(session, session_id, ctx):客户端读 body 时,框架逐步拉取生成器,每次yield的字符串作为一块 SSE 数据写出。响应头
media_type="text/event-stream"+SSE_STREAM_HEADERS(禁用缓存、长连接、X-Accel-Buffering: no等),便于浏览器/代理不把流缓冲死。
依赖 get_session 的生命周期(与流式是否安全相关):
def get_session():
session = SessionLocal()
try:
yield session
finally:
session.close()FastAPI 对 yield 型依赖 的约定是:yield 之后的清理(这里是 session.close())一般在响应结束后执行;对 StreamingResponse 而言,通常要等流发送完毕后才算响应结束。因此同一个 session 既用于准备阶段写用户消息,也用于流结束后写助手消息,在常见用法下是连贯的(若你遇到 “session is closed” 类错误,再单独排查中间件/超时/断开连接)。
2. 端到端时序图 #
3. 准备阶段:数据形态变化(流程图) #
字段级转换要点(_prepare_stream_chat):
| 阶段 | 输入 | 输出 / 副作用 |
|---|---|---|
| 校验 | session_id、payload.content.strip() |
会话/智能体/LLM 存在;内容非空 |
| 首条消息标题 | 无历史且标题为默认 | update_session_title |
| 用户消息 | 字符串 | create_message(session, ..., "user", user_content) |
| 历史 → LLM | list_messages 全量(已含新用户消息) |
build_llm_messages_from_history:system + 各条 user/assistant/tool(tool 从 meta.tool_call_id 映射到 tool_call_id) |
| MCP | agent.mcp_service_ids |
_mcp_service_dicts → [{id,name,protocol,config}, ...] |
| 上下文对象 | 上述 + LLM 连接信息 | StreamChatContext(session_row, llm_*, messages, mcp_services) |
4. 流阶段:内部事件 → SSE → 再落库 #
_sse:dict→json.dumps(ensure_ascii=False,default=str)→ 前缀data:+ 结尾\n\n,符合 SSE 单行 JSON 习惯。AGENT_CHAT_STREAM_RUN_COMPLETE:只用于取出final_text写入final_reply,不再包一层发给前端(前端靠前面的 token/事件拼 UI)。tool_start/tool_end:用_tool_correlation_key(优先call_id)配对,合并为mcp_tool_runs里的一条;原始tool_*事件仍会yield _sse(evt)给前端。- 成功结束:
create_message(session, session_id, "assistant", final_reply, meta),meta在有工具时为{"mcp_tool_runs": [...]};touch_session;最后{"type":"done"}。 - 异常:
{"type":"error","message":...},且不会走成功路径的助手落库。
5. generate_with_tools 与路由的关系(简图) #
路由与流服务不实现多轮工具细节,只消费统一的事件流:转发(除 RUN_COMPLETE)→ 聚 mcp_tool_runs → 用 final_reply 写库。
6. 一句话总结 #
stream_message:用 get_session 拿到的数据库会话 先做 _prepare_stream_chat(校验 + 写用户消息 + 拼 StreamChatContext),再返回 StreamingResponse,由 iter_agent_chat_sse 把 generate_with_tools 的 dict 事件 转成 SSE 文本推给客户端,并在流正常结束时把 助手正文 + mcp_tool_runs 写入数据库。