1.1. agent_chat.py #
app/services/agent_chat.py
from typing import AsyncIterator
from openai import APIStatusError, AsyncOpenAI
from contextlib import asynccontextmanager
from mcp import ClientSession, StdioServerParameters
from mcp.client.sse import sse_client
from mcp.client.stdio import stdio_client
from mcp.client.streamable_http import streamable_http_client
from app.services.mcp_httpx import mcp_httpx_client_factory
import logging
import re
import json
# Module logger: use __name__ (not __file__) so log records carry the dotted
# module path, matching agent_chat_stream.py's getLogger(__name__) convention.
logger = logging.getLogger(__name__)
# Final event of generate_with_tools; the route uses it to end the whole run.
# NOTE: never forward this event to the client.
AGENT_CHAT_STREAM_RUN_COMPLETE = "AGENT_CHAT_STREAM_RUN_COMPLETE"
# One model round (a single chat-completions call) has finished.
AGENT_CHAT_ROUND_DONE = "AGENT_CHAT_ROUND_DONE"
# One tool invocation has finished.
AGENT_CHAT_TOOL_DONE = "AGENT_CHAT_TOOL_DONE"
# 工具函数,支持service是对象或字典属性的安全访问 字典obj['key'] 或者 对象的话 obj.key
def service_value(service, key, default=None):
    """Read *key* from *service*, which may be a mapping (service['key'])
    or a plain object (service.key); return *default* when absent."""
    if isinstance(service, dict):
        return service.get(key, default)
    return getattr(service, key, default)
@asynccontextmanager
async def mcp_transport_streams(mcp_service):
    """Open a (read, write) stream pair to an MCP service.

    Dispatches on the service's 'protocol' field: "stdio", "sse", or
    (default) streamable HTTP.
    """
    protocol = service_value(mcp_service, "protocol", "").strip().lower()
    # Use service_value so dict- and object-style services both work
    # (previously this indexed mcp_service["config"] directly, which would
    # crash for object-style services).
    cfg = service_value(mcp_service, "config", {}) or {}
    if protocol == "stdio":
        server_params = StdioServerParameters(
            command=str(cfg.get("command") or ""),
            args=[str(arg) for arg in (cfg.get("args", []))],
            env={str(k): str(v) for k, v in cfg.get("env", {}).items()},
        )
        async with stdio_client(server_params) as (read, write):
            yield read, write
    elif protocol == "sse":
        # Request headers, coerced to strings.
        headers = {str(k): str(v) for k, v in (cfg.get("headers") or {}).items()}
        # Open an SSE client and hand back its read/write streams.
        async with sse_client(
            str(cfg.get("url") or ""),
            headers=headers,
            httpx_client_factory=mcp_httpx_client_factory,
        ) as (read, write):
            yield read, write
    else:
        # Request headers, coerced to strings.
        headers = {str(k): str(v) for k, v in (cfg.get("headers") or {}).items()}
        # Open a streamable-HTTP client and hand back its read/write streams.
        url = cfg.get("url") or ""
        async with mcp_httpx_client_factory(headers=headers) as http_client:
            async with streamable_http_client(url, http_client=http_client) as (
                read,
                write,
                _,
            ):
                yield read, write
# 获取指定的MCP服务器对应的全部工具列表
async def list_tools_for_service(mcp_service):
    """Connect to the given MCP service and return its tools as plain dicts
    with 'name', 'description' and 'input_schema' keys."""
    async with mcp_transport_streams(mcp_service) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.list_tools()
            # The server's raw tool objects (may be absent/empty).
            raw = getattr(result, "tools", None) or []
            return [
                {
                    "name": str(getattr(t, "name", "") or ""),
                    "description": str(getattr(t, "description", "") or ""),
                    # Accept either attribute spelling for the input schema.
                    "input_schema": (
                        getattr(t, "inputSchema", None)
                        or getattr(t, "input_schema", None)
                        or {}
                    ),
                }
                for t in raw
            ]
# 把工具名称归一化,或者说规范化
def normalize_tool_name(service_name, tool_name):
    """Build the model-facing tool name from service + tool names.

    Both parts are lowercased, non [a-zA-Z0-9_-] characters become '_',
    outer underscores are stripped, empty parts fall back to 'svc'/'tool',
    and runs of underscores in the joined result are collapsed to one.
    """

    def _clean(value, fallback):
        # One-line purpose: sanitize a single name part.
        text = str(value or "").strip().lower()
        text = re.sub(r"[^a-zA-Z0-9_-]", "_", text).strip("_")
        return text or fallback

    joined = f"{_clean(service_name, 'svc')}__{_clean(tool_name, 'tool')}"
    return re.sub(r"_+", "_", joined)
def normalize_tool_schema(schema):
    """Return *schema* if it is already a dict, else a minimal empty
    JSON-schema object."""
    return schema if isinstance(schema, dict) else {"type": "object", "properties": {}}
async def build_openai_tools(mcp_services):
    """Collect every tool from the given MCP services in OpenAI
    function-tool format.

    Returns (tools, tools_mapping): *tools* is the list sent to the model;
    *tools_mapping* maps each exposed name back to its MCP service/tool.
    """
    tools = []
    tools_mapping = {}
    for mcp_service in mcp_services:
        # Service display name; fall back to "service_<id>".
        # (Single quotes inside the f-string keep this valid on Python < 3.12;
        # the original nested double quotes are a SyntaxError before 3.12.)
        mcp_service_name = (
            str(service_value(mcp_service, "name", ""))
            or f"service_{service_value(mcp_service, 'id', '')}"
        )
        # Every tool the service exposes.
        for tool in await list_tools_for_service(mcp_service):
            original_name = str(tool.get("name", "") or "").strip()
            if not original_name:
                continue
            # Normalized name as shown to (and invoked by) the model.
            exposed_name = normalize_tool_name(mcp_service_name, original_name)
            # First occurrence wins on name collisions.
            if exposed_name in tools_mapping:
                continue
            # Remember how to route this exposed name back to the real tool.
            tools_mapping[exposed_name] = {
                "mcp_service": mcp_service,  # the MCP service record
                "tool_name": original_name,  # tool name inside MCP
                "mcp_service_name": mcp_service_name,  # MCP service name
            }
            # Append the OpenAI tool structure.
            tools.append(
                {
                    "type": "function",
                    "function": {
                        "name": exposed_name,  # function name
                        "description": str(
                            tool.get("description")
                            or f"{mcp_service_name}/{original_name}"
                        ),  # function description
                        # The MCP tool's input schema maps directly onto
                        # OpenAI function parameters.
                        "parameters": normalize_tool_schema(
                            tool.get("input_schema")
                        ),
                    },
                }
            )
    return tools, tools_mapping
# 定义一个函数,将给定的API地址标准化为/v1结尾,以兼容OPENAI的官方客户端(不包括chat/completions)
def openai_base_url(api_base_url):
    """Normalize an API base URL so it ends with '/v1', as the official
    OpenAI client expects (it appends chat/completions itself)."""
    base = str(api_base_url or "").strip().rstrip("/")
    if not base:
        return ""
    return base if base.lower().endswith("/v1") else f"{base}/v1"
def payload_summary(payload):
    """Return a compact, log-friendly summary of a chat-completions payload."""
    raw_messages = payload.get("messages") if isinstance(payload, dict) else []
    msg_list = raw_messages if isinstance(raw_messages, list) else []
    tools = payload.get("tools")
    return {
        "model": payload.get("model"),
        "stream": payload.get("stream"),
        "temperature": payload.get("temperature"),
        "message_count": len(msg_list),
        # Only dict-shaped messages contribute a role.
        "roles": [m.get("role") for m in msg_list if isinstance(m, dict)],
        "has_tools": bool(tools),
        "tool_count": len(tools) if isinstance(tools, list) else 0,
    }
def openai_chat_kwargs(payload: dict, *, stream: bool) -> dict:
    """Translate our request payload into kwargs for
    client.chat.completions.create."""
    kwargs: dict = {
        "model": payload["model"],
        "messages": payload["messages"],
        "temperature": float(payload.get("temperature", 0.3)),
    }
    tools = payload.get("tools")
    # Forward tools only when present/non-empty.
    if tools:
        kwargs["tools"] = tools
    tool_choice = payload.get("tool_choice")
    # tool_choice is forwarded when set, letting the model pick its tools.
    if tool_choice is not None:
        kwargs["tool_choice"] = tool_choice
    if stream:
        kwargs["stream"] = True
    return kwargs
# 合并流式工具调用信息,将新的一批调用片段合并到已存在的索引字段中
def merge_stream_tool_calls(tool_calls_by_index: dict[int, dict], chunk_tool_calls):
    """Merge one streamed batch of tool-call fragments into the accumulator.

    Mutates *tool_calls_by_index* in place: each fragment's id/type overwrite
    the slot, while function name/arguments text is concatenated, since the
    API streams them in pieces.
    """
    for tool_call in chunk_tool_calls or []:
        # Missing index defaults to slot 0.
        index = int(tool_call.index) if tool_call.index is not None else 0
        # Initialize the slot on first sight. The id default is "" — the real
        # id arrives in a later fragment (the previous hard-coded placeholder
        # id would have been shared by every call whose id never streamed).
        slot = tool_calls_by_index.setdefault(
            index,
            {"id": "", "type": "function", "function": {"name": "", "arguments": ""}},
        )
        if getattr(tool_call, "id", None):
            slot["id"] = tool_call.id
        if getattr(tool_call, "type", None):
            slot["type"] = tool_call.type
        fn = getattr(tool_call, "function", None)
        if fn is not None:
            # Name and arguments stream incrementally: append, don't replace.
            if getattr(fn, "name", None):
                slot["function"]["name"] += fn.name
            if getattr(fn, "arguments", None):
                slot["function"]["arguments"] += fn.arguments
# 定义异步生成器函数,用于流式返回chat_completion的每个部分
async def iter_chat_completion_events(api_base_url, api_key, payload):
    """Stream one chat completion from the model.

    Yields {'type': 'delta', 'data': <text piece>} for each text fragment and
    finishes with an AGENT_CHAT_ROUND_DONE event carrying the full content and
    the merged tool calls.

    Raises RuntimeError when the base URL is empty; re-raises APIStatusError
    from the OpenAI client after logging the status code.
    """
    base = openai_base_url(api_base_url)
    if not base:
        raise RuntimeError("模型服务器地址不能为空")
    # Compact request summary for the log (force stream=True in the summary).
    # NOTE: plain %-style lazy formatting — the original had a stray f-prefix.
    summary = payload_summary({**payload, "stream": True})
    logger.info("大模型聊天请求 base_url=%s,请求摘要=%s", base, summary)
    # Request timeout in seconds.
    timeout = 90
    key = (api_key or "").strip()
    async with AsyncOpenAI(api_key=key, base_url=base, timeout=timeout) as client:
        # Build the kwargs for the OpenAI chat endpoint.
        kwargs = openai_chat_kwargs(payload, stream=True)
        # Accumulated text fragments.
        content_parts = []
        # Streamed tool-call fragments merged by index.
        tool_calls_by_index = {}
        try:
            # Start the streaming completion.
            stream = await client.chat.completions.create(**kwargs)
            try:
                # Iterate the stream chunk by chunk.
                async for chunk in stream:
                    if not chunk.choices:
                        continue
                    # Delta holds this chunk's incremental content.
                    delta = chunk.choices[0].delta
                    if delta is None:
                        continue
                    piece = getattr(delta, "content", None) or ""
                    if piece:
                        content_parts.append(piece)
                        yield {"type": "delta", "data": piece}
                    # Merge any tool-call fragments carried by this chunk.
                    tool_part = getattr(delta, "tool_calls", None)
                    if tool_part:
                        merge_stream_tool_calls(tool_calls_by_index, tool_part)
            finally:
                # Always close the stream when iteration ends.
                close = getattr(stream, "close", None)
                if close is not None:
                    await close()
        except APIStatusError as e:
            logger.info("大模型聊天响应的状态码为=%s", e.status_code)
            raise
        # Emit the assembled round result. (The original line here carried a
        # stray '+' diff marker, which was a syntax error.)
        content = "".join(content_parts)
        logger.info("大模型聊天响应:%s", content)
        yield {
            "type": AGENT_CHAT_ROUND_DONE,  # the model finished one round
            "data": {
                "content": content,
                "tool_calls": [
                    tool_calls_by_index[index] for index in sorted(tool_calls_by_index)
                ],
            },
        }
def tool_call_args_dict(args_text: str) -> dict:
    """Parse a tool-call arguments JSON string.

    Returns {} when the text is empty, not valid JSON, or not a JSON object.
    """
    if not args_text:
        return {}
    try:
        parsed = json.loads(args_text)
    except json.JSONDecodeError:
        return {}
    return parsed if isinstance(parsed, dict) else {}
def result_preview(text, limit=260):
    """Collapse all whitespace runs in *text* to single spaces and cap the
    result at *limit* characters (no ellipsis is appended)."""
    collapsed = " ".join(str(text or "").split())
    return collapsed if len(collapsed) <= limit else collapsed[:limit]
# 定义调用某个指定服务的工具函数
async def call_tool_for_service(service, tool_name, args):
    """Invoke *tool_name* on the given MCP *service* and return its text
    output (joined with newlines), or a fixed notice when no text came back."""
    async with mcp_transport_streams(service) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(tool_name, args)
            # Collect non-empty text fragments from the result content.
            texts = [
                stripped
                for item in (getattr(result, "content", None) or [])
                if (stripped := str(getattr(item, "text", "") or "").strip())
            ]
            if texts:
                return "\n".join(texts)
            return "工具调用成功,但是未返回文本内容"
# 定义一个异步生成器函数,用于执行工具的运行流程
async def iter_tool_run(mapping, call_id, args):
    """Execute one mapped MCP tool call as a small event stream.

    Yields a tool_start event, runs the tool, yields a tool_end event with a
    preview, and finally yields AGENT_CHAT_TOOL_DONE carrying the full
    result text (the caller consumes that last event, not the client).
    """
    raw_tool = mapping["tool_name"]
    # Display label shown in the UI: "<service>/<tool>".
    label = f"{mapping['mcp_service_name']}/{raw_tool}"
    yield {
        "type": "tool_start",
        "call_id": call_id,
        "name": label,
        "tool": raw_tool,
        "args": args,
    }
    # Actually invoke the tool on its MCP service and capture the text result.
    output = await call_tool_for_service(mapping["mcp_service"], raw_tool, args)
    yield {
        "type": "tool_end",
        "call_id": call_id,
        "name": label,
        "tool": raw_tool,
        "ok": True,
        "result_preview": result_preview(output),
    }
    yield {"type": AGENT_CHAT_TOOL_DONE, "result_text": output}
# 主流程 异步生成器,流式产出事件
async def generate_with_tools(
    api_base_url, api_key, model_name, base_messages, mcp_services
):
    """Main agent loop: stream model output, run requested MCP tools, feed the
    results back, and repeat until the model answers without tool calls.

    Yields pass-through events (delta / tool_start / tool_end) and ends with a
    single AGENT_CHAT_STREAM_RUN_COMPLETE event carrying the final text.

    Raises RuntimeError when a round yields no complete result, or when the
    round cap is exhausted without a final answer.

    NOTE: the original trailing '+'-marked block (a duplicated final model
    call after the tool loop) was both a syntax error and redundant — the
    round loop below already re-queries the model with the tool results — so
    it has been removed.
    """
    tools = []
    tools_mapping = {}
    # If MCP services are configured, expose their tools to the model in
    # OpenAI function-tool format.
    if mcp_services:
        tools, tools_mapping = await build_openai_tools(mcp_services)
        logger.info(
            "MCP工具已经注入模型中:服务数量=%s,工具数量=%s",
            len(mcp_services),
            len(tools),
        )
    else:
        logger.info("未向模型注册工具,智能体未绑定任何的MCP服务")
    # Copy so the caller's message list is never mutated.
    messages = list(base_messages)
    # Cap the number of model rounds to avoid infinite tool-call loops.
    for _ in range(10):
        req_payload = {"model": model_name, "messages": messages, "temperature": 0.3}
        if tools:
            req_payload["tools"] = tools
            # Let the model decide whether to call tools, and which.
            req_payload["tool_choice"] = "auto"
        # Holds the complete result of this round once it arrives.
        data = None
        async for event in iter_chat_completion_events(
            api_base_url, api_key, req_payload
        ):
            if event.get("type") == AGENT_CHAT_ROUND_DONE:
                # End-of-round marker: capture the data, don't forward it.
                data = event["data"]
            else:
                yield event
        if data is None:
            raise RuntimeError("模型流式响应异常,未收到完整的一轮结果")
        content = data["content"]
        tool_calls = data["tool_calls"]
        if not tool_calls:
            # No tool requests: this round's text is the final answer.
            messages.append({"role": "assistant", "content": content})
            yield {"type": AGENT_CHAT_STREAM_RUN_COMPLETE, "final_text": content}
            return
        # Record the assistant turn (with its tool calls) for the next round.
        messages.append(
            {"role": "assistant", "content": content, "tool_calls": tool_calls}
        )
        # Execute every tool call the model requested this round.
        for tool_call in tool_calls:
            fn = tool_call.get("function")
            exposed_name = fn.get("name")
            args_text = fn.get("arguments")
            call_id = tool_call.get("id")
            # Decode the JSON argument string into a dict ({} on failure).
            args = tool_call_args_dict(args_text)
            # Route the exposed name back to the real MCP tool.
            mapping = tools_mapping.get(exposed_name)
            if not mapping:
                result_text = f"未找到可调用的工具:{exposed_name}"
                # Emit a start/end pair so the UI shows the failed lookup.
                yield {
                    "type": "tool_start",
                    "call_id": call_id,
                    "name": exposed_name,
                    "tool": exposed_name,
                    "args": args,
                }
                yield {
                    "type": "tool_end",
                    "call_id": call_id,
                    "name": exposed_name,
                    "tool": exposed_name,
                    "ok": False,  # failed lookup
                    "message": result_text,  # shown in the UI
                    "result_preview": result_preview(result_text),
                }
            else:
                result_text = None
                # Actually run the tool, forwarding its progress events.
                async for event in iter_tool_run(mapping, call_id, args):
                    if event.get("type") == AGENT_CHAT_TOOL_DONE:
                        result_text = event["result_text"]
                    else:
                        yield event
            # Feed the tool result back so the next round can use it.
            messages.append(
                {"role": "tool", "tool_call_id": call_id, "content": result_text}
            )
        # Loop: the next iteration re-queries the model with the tool results.
    # Round cap exhausted without a final answer — surface it explicitly
    # instead of silently ending the generator.
    raise RuntimeError("模型流式响应异常,未收到完整的一轮结果")
# {
# "mcp_service": mcp_service, # MCP服务字典
# "tool_name": original_name, # MCP里的工具名
# "mcp_service_name": mcp_service_name, # MCP服务名
# }
1.2. agent_chat_stream.py #
app/services/agent_chat_stream.py
from typing import NamedTuple
from app import models
from sqlalchemy.orm import Session
import logging
import json
from app.repositories import agent_chat_repository
from app.services.agent_chat import generate_with_tools, AGENT_CHAT_STREAM_RUN_COMPLETE
logger = logging.getLogger(__name__)
# 定义一个将数据字典格式化为SSE协议规范的字符串的函数
def sse(data: dict) -> str:
    """Serialize *data* as one SSE 'data:' frame (keeps non-ASCII characters;
    non-JSON-native values fall back to str())."""
    body = json.dumps(data, ensure_ascii=False, default=str)
    return f"data: {body}\n\n"
# Data structure holding everything one streaming chat run needs.
class StreamChatContext(NamedTuple):
    """Immutable context for a single streamed agent-chat request."""

    # The current chat session row (DB model).
    session_row: models.AgentChatSession
    # Base URL of the LLM API.
    llm_api_base_url: str
    # API key for the LLM.
    llm_api_key: str
    # Model name to use.
    llm_model_name: str
    # Chat history message list.
    messages: list
    # Associated MCP service list.
    mcp_services: list
# 定义一个函数,根据事件字典生成用于工具调用相关性的唯一 key
def tool_correlation_key(ev: dict) -> str:
    """Key used to pair tool_start / tool_end events.

    Mirrors the frontend's toolCallRowKey: prefer a non-blank call_id,
    otherwise fall back to "name#tool". (The original lines carried stray
    '+' diff markers, which were syntax errors.)
    """
    cid = ev.get("call_id")
    # A present, non-blank call_id is the strongest correlator.
    if cid is not None and str(cid).strip() != "":
        return str(cid)
    # Fallback: combine display name and tool name.
    return f"{ev.get('name') or ''}#{ev.get('tool') or ''}"
# 定义一个函数,用于将工具调用的开始和结束事件合成一条用于数据库存储的摘要
def tool_run_row_for_db(tool_start: dict, tool_end: dict) -> dict:
    """Merge one tool call's start/end events into a summary row.

    The row is stored under the assistant message's meta.mcp_tool_runs.
    (The original lines carried stray '+' diff markers — syntax errors.)
    """
    # Keep args only when the start event carried a dict.
    args = tool_start.get("args") if isinstance(tool_start.get("args"), dict) else {}
    return {
        # Unique id of this tool call ("" when absent).
        "call_id": str(tool_end.get("call_id") or ""),
        # Tool display name.
        "name": tool_end.get("name"),
        # Canonical tool name.
        "tool": tool_end.get("tool"),
        # ok=True -> "done"; anything else -> "error".
        "status": "done" if tool_end.get("ok") else "error",
        # Error/info message ("" when absent).
        "message": str(tool_end.get("message") or ""),
        # Arguments the tool was invoked with.
        "args": args,
        # Short preview of the tool result ("" when absent).
        "result_preview": str(tool_end.get("result_preview") or ""),
    }
async def iter_agent_chat_sse(
    session: Session, session_id: int, ctx: StreamChatContext
):
    """Drive one streamed agent chat and yield SSE frames to the client.

    Forwards all intermediate events, pairs tool_start/tool_end into DB
    summary rows, persists the final assistant reply, and finishes with a
    'done' frame — or an 'error' frame if anything raises.

    Fixes vs. original: stray '+' diff markers removed (syntax errors);
    `type` renamed so it no longer shadows the builtin; the hard-coded
    placeholder final_reply ("AI的回答") replaced with "" so a run that never
    completes doesn't persist placeholder text.
    """
    logger.info(
        "开始流式对话 session_id=%s,模型名称=%s", session_id, ctx.llm_model_name
    )
    # Tell the client the stream has started.
    yield sse({"type": "start"})
    try:
        # tool_start events awaiting their matching tool_end, by correlation key.
        pending_tool = {}
        # Per-call summaries destined for the assistant message's meta.
        mcp_tool_runs = []
        # Final reply text; filled when the run-complete event arrives.
        final_reply = ""
        # Consume every event produced by the agent loop.
        async for event in generate_with_tools(
            ctx.llm_api_base_url,
            ctx.llm_api_key,
            ctx.llm_model_name,
            ctx.messages,
            ctx.mcp_services,
        ):
            event_type = event.get("type")
            if event_type == AGENT_CHAT_STREAM_RUN_COMPLETE:
                # Internal terminator: capture the text, never forward it.
                final_reply = event.get("final_text") or ""
                continue
            elif event_type == "tool_start":
                # Park the start event until its end event arrives.
                pending_tool[tool_correlation_key(event)] = event
            elif event_type == "tool_end":
                # Pair with the parked start event and build the DB summary.
                key = tool_correlation_key(event)
                start = pending_tool.pop(key, {})
                mcp_tool_runs.append(tool_run_row_for_db(start, event))
            # Forward the event to the client as an SSE frame.
            yield sse(event)
        # Attach tool-run summaries to the message meta when any exist.
        meta = {"mcp_tool_runs": mcp_tool_runs} if mcp_tool_runs else {}
        # Persist the assistant reply.
        agent_chat_repository.create_message(
            session, session_id, "assistant", final_reply, meta=meta
        )
        # Refresh the session's last-active timestamp.
        agent_chat_repository.touch_session(session, ctx.session_row)
        logger.info(
            "流式对话结束 session_id=%s 助手回复的字数=%s", session_id, len(final_reply)
        )
        # Tell the client the stream finished normally.
        yield sse({"type": "done"})
    except Exception as e:
        logger.exception("流式对话生成失败session_id=%s", session_id)
        yield sse({"type": "error", "message": str(e)})