导航菜单

  • 1.什么是MCP
  • 2.MCP架构
  • 3.MCP服务器
  • 4.MCP客户端
  • 5.版本控制
  • 6.连接MCP服务器
  • 7.SDKs
  • 8.Inspector
  • 9.规范
  • 10.架构
  • 11.协议
  • 12.生命周期
  • 13.工具
  • 14.资源
  • 15.提示
  • 16.日志
  • 17.进度
  • 18.传输
  • 19.补全
  • 20.引导
  • 21.采样
  • 22.任务
  • 23.取消
  • 24.Ping
  • 25.根
  • 26.分页
  • 27.授权
  • 28.初始化
  • 29.工具
  • 30.资源
  • 31.结构化输出
  • 32.提示词
  • 33.上下文
  • 34.StreamableHTTP
  • 35.参数补全
  • 36.引导
  • 37.采样
  • 38.LowLevel
  • 39.任务
  • 40.取消
  • 41.ping
  • 42.根
  • 43.分页
  • 44.授权
  • 45.FunctionCalling
  • starlette
  • FastAPI
  • Keycloak
  • asyncio
  • contextlib
  • httpx
  • pathlib
  • pydantic
  • queue
  • subprocess
  • threading
  • uvicorn
  • JSON-RPC
  • LiteLLM
  • pydantic-settings
  • ai_agent
  • format
  • diff
  • mcp_server
  • 1.1. agent_chat.py
  • 1.2. agent_chat_stream.py

1.1. agent_chat.py #

app/services/agent_chat.py

from typing import AsyncIterator
from openai import APIStatusError, AsyncOpenAI
from contextlib import asynccontextmanager
from mcp import ClientSession, StdioServerParameters
from mcp.client.sse import sse_client
from mcp.client.stdio import stdio_client
from mcp.client.streamable_http import streamable_http_client
from app.services.mcp_httpx import mcp_httpx_client_factory
import logging
import re
import json

logger = logging.getLogger(__file__)
# 这是generate_with_tools的最后一个事件,路由据此结尾,表示整个调用结束,注意不要把这个事件发给客户端
AGENT_CHAT_STREAM_RUN_COMPLETE = "AGENT_CHAT_STREAM_RUN_COMPLETE"
# 一轮对话结束
AGENT_CHAT_ROUND_DONE = "AGENT_CHAT_ROUND_DONE"
# 工具调用结束
AGENT_CHAT_TOOL_DONE = "AGENT_CHAT_TOOL_DONE"


# 工具函数,支持service是对象或字典属性的安全访问   字典obj['key'] 或者 对象的话 obj.key
def service_value(service, key, default=None):
    if isinstance(service, dict):
        return service.get(key, default)
    return getattr(service, key, default)


@asynccontextmanager
async def mcp_transport_streams(mcp_service):
    # 获取MCP服务器的协议
    protocol = service_value(mcp_service, "protocol", "").strip().lower()
    # 获取MCP服务器的配置字典
    cfg = mcp_service["config"]
    if protocol == "stdio":
        stdioServerParameters = StdioServerParameters(
            command=str(cfg.get("command") or ""),
            args=[str(arg) for arg in (cfg.get("args", []))],
            env={str(k): str(v) for k, v in cfg.get("env", {}).items()},
        )
        async with stdio_client(stdioServerParameters) as (read, write):
            yield read, write
    elif protocol == "sse":
        # 请求头
        headers = {str(k): str(v) for k, v in (cfg.get("headers") or {}).items()}
        # 创建SSE客户端,获取读写流
        async with sse_client(
            str(cfg.get("url") or ""),
            headers=headers,
            httpx_client_factory=mcp_httpx_client_factory,
        ) as (read, write):
            yield read, write
    else:
        # 请求头
        headers = {str(k): str(v) for k, v in (cfg.get("headers") or {}).items()}
        # 创建Streamable HTTP客户端,获取读写流
        url = cfg.get("url") or ""
        async with mcp_httpx_client_factory(headers=headers) as http_client:
            async with streamable_http_client(url, http_client=http_client) as (
                read,
                write,
                _,
            ):
                yield read, write


# 获取指定的MCP服务器对应的全部工具列表
async def list_tools_for_service(mcp_service):
    # 通过mcp_transport_streams可能 获取服务器的连接
    async with mcp_transport_streams(mcp_service) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools_result = await session.list_tools()
            # 获取MCP服务器原始的工具列表
            raw_tools = getattr(tools_result, "tools", None) or []
            out = []
            for tool in raw_tools:
                out.append(
                    {
                        "name": str(getattr(tool, "name", "") or ""),
                        "description": str(getattr(tool, "description", "") or ""),
                        "input_schema": getattr(tool, "inputSchema", None)
                        or getattr(tool, "input_schema", None)
                        or {},
                    }
                )
            return out


# 把工具名称归一化,或者说规范化
def normalize_tool_name(service_name, tool_name):
    raw_s = str(service_name or "").strip().lower()
    raw_t = str(tool_name or "").strip().lower()
    s = re.sub(r"[^a-zA-Z0-9_-]", "_", raw_s).strip("_")
    t = re.sub(r"[^a-zA-Z0-9_-]", "_", raw_t).strip("_")
    if not s:
        s = "svc"
    if not t:
        t = "tool"
    merged = f"{s}__{t}"
    merged = re.sub(r"_+", "_", merged)
    return merged


def normalize_tool_schema(schema):
    if isinstance(schema, dict):
        return schema
    return {"type": "object", "properties": {}}


async def build_openai_tools(mcp_services):
    tools = []
    tools_mapping = {}
    for mcp_service in mcp_services:
        # 获取服务名
        mcp_service_name = (
            str(service_value(mcp_service, "name", ""))
            or f"service_{service_value(mcp_service, "id", "")}"
        )
        # 获取服务器暴露的所有的全部工具
        for tool in await list_tools_for_service(mcp_service):
            # 原始的工具名称
            original_name = str(tool.get("name", "") or "").strip()
            if not original_name:
                continue
            # 归一化后的工具名称,这个名字是准备发给大模型,让大模型调用的
            exposed_name = normalize_tool_name(mcp_service_name, original_name)
            if exposed_name in tools_mapping:
                continue
            # 记录映射关系
            tools_mapping[exposed_name] = {
                "mcp_service": mcp_service,  # MCP服务字典
                "tool_name": original_name,  # MCP里的工具名
                "mcp_service_name": mcp_service_name,  # MCP服务名
            }
            # 追加 OPENAI 工具结构
            tools.append(
                {
                    "type": "function",
                    "function": {
                        "name": exposed_name,  # 函数的名称
                        "description": str(
                            tool.get("description")
                            or f"{mcp_service_name}/{original_name}"
                        ),  # 函数描述
                        # MCP服务里工具的输入模型刚好就等于OPENAI工具函数的参数
                        "parameters": normalize_tool_schema(
                            tool.get("input_schema")
                        ),  ##函数参数
                    },
                }
            )

    return tools, tools_mapping


# 定义一个函数,将给定的API地址标准化为/v1结尾,以兼容OPENAI的官方客户端(不包括chat/completions)
def openai_base_url(api_base_url):
    # 将输入的api_base_url转为字符串,再去掉首尾的空格和末尾的/
    base = str(api_base_url or "").strip().rstrip("/")
    if not base:
        return ""
    lower = base.lower()
    if lower.endswith("/v1"):
        return base
    return f"{base}/v1"


def payload_summary(payload):
    messages = payload.get("messages") if isinstance(payload, dict) else []
    msg_list = messages if isinstance(messages, list) else []
    return {
        "model": payload.get("model"),
        "stream": payload.get("stream"),
        "temperature": payload.get("temperature"),
        "message_count": len(msg_list),
        "roles": [
            message.get("role") for message in msg_list if isinstance(message, dict)
        ],
        "has_tools": bool(payload.get("tools")),
        "tool_count": (
            len(payload.get("tools", []))
            if isinstance(payload.get("tools"), list)
            else 0
        ),
    }


def openai_chat_kwargs(payload: dict, *, stream: bool) -> dict:
    kwargs: dict = {
        "model": payload["model"],
        "messages": payload["messages"],
        "temperature": float(payload.get("temperature", 0.3)),
    }
    # 如果有tools字段,则添加到参数中
    if payload.get("tools"):
        kwargs["tools"] = payload.get("tools")
    # 如果payload里有tool_choice属性字段,也添加了,表示让大模型自己选择使用工具
    if payload.get("tool_choice") is not None:
        kwargs["tool_choice"] = payload.get("tool_choice")
    if stream:
        kwargs["stream"] = True
    return kwargs


# 合并流式工具调用信息,将新的一批调用片段合并到已存在的索引字段中
def merge_stream_tool_calls(tool_calls_by_index: dict[int, dict], chunk_tool_calls):
    # 遍历本次分片中的每个工具调用对象,如果没有则是空列表
    for tool_call in chunk_tool_calls or []:
        # 获取当前调用的索引index,如果index不存在,则默认为0
        index = int(tool_call.index) if tool_call.index is not None else 0
        # 如果该索引还没有对应的条目数据,则先初始化一个默认结构
        if index not in tool_calls_by_index:
            tool_calls_by_index[index] = {
                "id": "call_00_hEzZvoDEROnklQ86a3eC1EVt",  # 工具调用ID
                "type": "function",  # 大模型想调用函数
                "function": {"name": "", "arguments": ""},  # 调用函数的信息
            }
        # 获取当前索引对应的结构槽(里面放着已经合并的工具调用信息)
        slot = tool_calls_by_index[index]
        # 如果tool_call有ID的话,把ID放到结构槽里
        if getattr(tool_call, "id", None):
            slot["id"] = tool_call.id
        # 如果tool_call有类型的话,也放进来结构槽
        if getattr(tool_call, "type", None):
            slot["type"] = tool_call.type
        # 获取tool_call想要调用函数
        fn = getattr(tool_call, "function", None)
        if fn is not None:
            # 如果fn有name属性,则追加到function name中
            if getattr(fn, "name", None):
                slot["function"]["name"] += fn.name
            # 如果fn有arguments属性,则追加到function arguments中
            if getattr(fn, "arguments", None):
                slot["function"]["arguments"] += fn.arguments


# 定义异步生成器函数,用于流式返回chat_completion的每个部分
async def iter_chat_completion_events(api_base_url, api_key, payload):
    base = openai_base_url(api_base_url)
    if not base:
        raise RuntimeError("模型服务器地址不能为空")
    # 获取请求摘要信息,并添加stream:true
    summary = payload_summary({**payload, "stream": True})
    logger.info(f"大模型聊天请求 base_url=%s,请求摘要=%s", base, summary)
    # 设置超时时间为90秒
    timeout = 90
    key = (api_key or "").strip()
    async with AsyncOpenAI(api_key=key, base_url=base, timeout=timeout) as client:
        # 根据请求参数生成OPENAI聊天接口所需的参数
        kwargs = openai_chat_kwargs(payload, stream=True)
        # 初始化内容部分的列表,用于保存每一块返回的数据
        content_parts = []
        # 初始化工具调用索引字典,用于合并流式工具调用信息
        tool_calls_by_index = {}
        try:
            # 发起聊天的流式响应,获取异步流对象
            stream = await client.chat.completions.create(**kwargs)
            try:
                # 异步迭代stream,逐步获取结果块
                async for chunk in stream:
                    # 如果没有choices字段,就跳过
                    if not chunk.choices:
                        continue
                    # 获取当前块的delta字段,表示文本的增量
                    delta = chunk.choices[0].delta
                    # 如果说delta对空,则跳过
                    if delta is None:
                        continue
                    # 提取delta里的内容片段
                    piece = getattr(delta, "content", None) or ""
                    if piece:
                        # 把这一片数据添加到content_parts里
                        content_parts.append(piece)
                        # 通过yield 返回本次的delta
                        yield {"type": "delta", "data": piece}
                    # 提取delta里的tool_calls字段
                    tool_part = getattr(delta, "tool_calls", None)
                    # 如果说tool_part不为空,则合并到tool_calls_by_index
                    if tool_part:
                        merge_stream_tool_calls(tool_calls_by_index, tool_part)
            finally:
                # 请求结束后把流关闭掉
                close = getattr(stream, "close", None)
                if close is not None:
                    await close()

        except APIStatusError as e:
            logger.info("大模型聊天响应的状态码为=%s", e.status_code)
            raise e
        # 最终返回完整的文本
        content = "".join(content_parts)
+       logger.info("大模型聊天响应:%s", content)
        yield {
            "type": AGENT_CHAT_ROUND_DONE,  # 代表大模型结束一轮调用
            "data": {
                "content": content,
                "tool_calls": [
                    tool_calls_by_index[index] for index in sorted(tool_calls_by_index)
                ],
                # [{"id":"call_id_001","type": "function","function": {"name": "search_place, "arguments": '{"query": "故宫博物院","region": "北京"}'}]
            },  # 这是大模型返回的消息
        }


def tool_call_args_dict(args_text: str) -> dict:
    try:
        args = json.loads(args_text) if args_text else {}
        return args if isinstance(args, dict) else {}
    except json.JSONDecodeError:
        return {}


def result_preview(text, limit=260):
    # 将文本转成字符串,再按空白拆分后再重组,可以去掉多余的空格
    clean = " ".join(str(text or "").split())
    if len(clean) <= limit:
        return clean
    return f"{clean[:limit]}"


# 定义调用某个指定服务的工具函数
async def call_tool_for_service(service, tool_name, args):
    async with mcp_transport_streams(service) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(tool_name, args)
        parts = []
        for item in getattr(result, "content", None) or []:
            txt = str(getattr(item, "text", "") or "").strip()
            if txt:
                parts.append(txt)
        if parts:
            return "\n".join(parts)
        return "工具调用成功,但是未返回文本内容"


# 定义一个异步生成器函数,用于执行工具的运行流程
async def iter_tool_run(mapping, call_id, args):
    # 格式化显示的字符串,显示名称为 服务名/工具名
    display_name = f"{mapping['mcp_service_name']}/{mapping['tool_name']}"
    # 获取原始的工具名
    tool = mapping["tool_name"]
    # 产出工具调用开始的事件
    yield {
        "type": "tool_start",
        "call_id": call_id,
        "name": display_name,
        "tool": tool,
        "args": args,
    }
    # 调用实际的MCP服务的对应的工具,获取返回的文本结果
    result_text = await call_tool_for_service(
        mapping["mcp_service"], mapping["tool_name"], args
    )
    # 产出工具调用成功的结束事件,包含预览文本
    yield {
        "type": "tool_end",
        "call_id": call_id,
        "name": display_name,
        "tool": tool,
        "ok": True,
        "result_preview": result_preview(result_text),
    }
    # 工具执行流程完成,产出最终工具执行完成事件,包含实际的结果文本
    yield {"type": AGENT_CHAT_TOOL_DONE, "result_text": result_text}


# 主流程 异步生成器,流式产出事件
async def generate_with_tools(
    api_base_url, api_key, model_name, base_messages, mcp_services
):
    # 在这个过程中,可能会多轮调用工具
    tools = []
    tools_mapping = {}
    # 如果此Agent配置了对应的MCP服务器,我们就要获取这些MCP服务器提供的工具并且转换为openai的标准格式发送给大模型,让大模型调用
    if mcp_services:
        tools, tools_mapping = await build_openai_tools(mcp_services)
        logger.info(
            "MCP工具已经注入模型中:服务数量=%s,工具数量=%s",
            len(mcp_services),
            len(tools),
        )
    else:
        logger.info("未向模型注册工具,智能体未绑定任何的MCP服务")
    # 将base_messages转为列表,避免原始的消息被修改
    messages = list(base_messages)
    # 最多循环10次(防止死循环或者多轮嵌套工具调用)
    for _ in range(10):
        # 构建请求payload
        req_payload = {"model": model_name, "messages": messages, "temperature": 0.3}
        if tools:
            # 如果工具列表不为空,则将tools注入到告诉大模型可用的工具
            req_payload["tools"] = tools
            # 设置tool_choice字段的值为auto,允许 模型自主决定是否调用工具以及调用哪个工具
            req_payload["tool_choice"] = "auto"
        # 初始化data变量为None,用来后续接收完整的一轮对话的结果
        data = None
        # 异步遍历模型的流式事件生成器,处理每个事件
        async for event in iter_chat_completion_events(
            api_base_url, api_key, req_payload
        ):
            # 如果事件的类型为 AGENT_CHAT_ROUND_DONE,说明一轮会话已经结束,需要提取数据
            if event.get("type") == AGENT_CHAT_ROUND_DONE:
                data = event["data"]
            else:
                # 否则直接将事件产出给调用方
                yield event
        if data is None:
            raise RuntimeError(f"模型流式响应异常,未收到完整的一轮结果")
        # 这是大模型返回的数据
        content = data["content"]
        # 这是大模型返回的工具调用
        tool_calls = data["tool_calls"]
        if not tool_calls:
            # 将最终的消息添加到消息列表中,方便后续进行多轮对话
            messages.append({"role": "assistant", "content": content})
            # 产出流式工具调用完成的事件,包含最终的文本
            yield {"type": AGENT_CHAT_STREAM_RUN_COMPLETE, "final_text": content}
            return
        # 将最终的消息添加到消息列表中,方便后续进行多轮对话
        messages.append(
            {"role": "assistant", "content": content, "tool_calls": tool_calls}
        )
        # 遍历本轮需要调用的工具
        for tool_call in tool_calls:
            # 获取 想调用 函数
            fn = tool_call.get("function")
            # 获取函数的名称
            exposed_name = fn.get("name")
            # 获取函数的字段串参数
            args_text = fn.get("arguments")
            # 获取本次调用的ID
            call_id = tool_call.get("id")
            # 把字符串参数转成字典参数
            args = tool_call_args_dict(args_text)
            # 查找exposed_name映射到的工具的实现
            mapping = tools_mapping.get(exposed_name)
            if not mapping:
                result_text = f"未找到可调用的工具:{exposed_name}"
                yield {
                    "type": "tool_start",  # 表示工具调用开始的事件
                    "call_id": call_id,  # 用于标识工具调用的唯一ID
                    "name": exposed_name,  # 工具显示名称
                    "tool": exposed_name,  # 工具名
                    "args": args,  # 工具的参数
                }
                yield {
                    "type": "tool_end",  # 表示工具调用开始的事件
                    "call_id": call_id,  # 用于标识工具调用的唯一ID
                    "name": exposed_name,  # 工具显示名称
                    "tool": exposed_name,  # 工具名
                    "ok": False,  # 表示工具是否调用成功 True就是成功,False就是失败
                    "message": result_text,  # 错误消息,会在UI中进行显示
                    "result_preview": result_preview(result_text),  # 预览结果的文本
                }
            else:
                result_text = None
                # 实际调用工具
                async for event in iter_tool_run(mapping, call_id, args):
                    # 如果工具调用流程走完了,则保存最终的文本
                    if event.get("type") == AGENT_CHAT_TOOL_DONE:
                        result_text = event["result_text"]
                    else:
                        yield event
                # 工具调用的结果放入消息列表中,供下一轮决策参考
                messages.append(
                    {"role": "tool", "tool_call_id": call_id, "content": result_text}
                )
        # 初始化data变量为None,用来后续接收完整的一轮对话的结果
+       data = None
        # 异步遍历模型的流式事件生成器,处理每个事件
+       async for event in iter_chat_completion_events(
+           api_base_url, api_key, req_payload
+       ):
            # 如果事件的类型为 AGENT_CHAT_ROUND_DONE,说明一轮会话已经结束,需要提取数据
+           if event.get("type") == AGENT_CHAT_ROUND_DONE:
+               data = event["data"]
+           else:
                # 否则直接将事件产出给调用方
+               yield event
+       if data is None:
+           raise RuntimeError(f"模型流式响应异常,未收到完整的一轮结果")
        # 这是大模型返回的数据
+       content = data["content"]
        # 将最终的消息添加到消息列表中,方便后续进行多轮对话
+       messages.append({"role": "assistant", "content": content})
        # 产出流式工具调用完成的事件,包含最终的文本
+       yield {"type": AGENT_CHAT_STREAM_RUN_COMPLETE, "final_text": content}

# {
#   "mcp_service": mcp_service,  # MCP服务字典
#   "tool_name": original_name,  # MCP里的工具名
#   "mcp_service_name": mcp_service_name,  # MCP服务名
# }

1.2. agent_chat_stream.py #

app/services/agent_chat_stream.py

from typing import NamedTuple
from app import models
from sqlalchemy.orm import Session
import logging
import json
from app.repositories import agent_chat_repository
from app.services.agent_chat import generate_with_tools, AGENT_CHAT_STREAM_RUN_COMPLETE

logger = logging.getLogger(__name__)


# 定义一个将数据字典格式化为SSE协议规范的字符串的函数
def sse(data: dict) -> str:
    return f"data: {json.dumps(data,ensure_ascii=False,default=str)}\n\n"


# 定义流式聊天的上下文的数据结构
class StreamChatContext(NamedTuple):
    # 当前的聊天会话对话
    session_row: models.AgentChatSession
    # LLM API基础地址
    llm_api_base_url: str
    # LLM APIKEY
    llm_api_key: str
    # LLM 模型名称
    llm_model_name: str
    # 聊天历史消息列表
    messages: list
    # 相关联的MCP服务列表
    mcp_services: list

# 定义一个函数,根据事件字典生成用于工具调用相关性的唯一 key
+def tool_correlation_key(ev: dict) -> str:
+ """与前端 toolCallRowKey 一致,用于配对 tool_start / tool_end。"""
    # 尝试从事件字典中获取 call_id
+ cid = ev.get("call_id")
    # 如果 call_id 非空且去除空白后不为"",则直接返回字符串形式的 call_id 作为 key
+ if cid is not None and str(cid).strip() != "":
+     return str(cid)
    # 否则,用 name 和 tool 两个字段拼接为 key,格式为 "name#tool"
+ return f"{ev.get('name') or ''}#{ev.get('tool') or ''}"

# 定义一个函数,用于将工具调用的开始和结束事件合成一条用于数据库存储的摘要
+def tool_run_row_for_db(tool_start: dict, tool_end: dict) -> dict:
    # """合并一次工具调用的起止事件,写入 assistant 消息的 meta.mcp_tool_runs。"""
    # 获取工具调用参数,如果 tool_start 中 "args" 字段是字典就用它,否则用空字典
+ args = tool_start.get("args") if isinstance(tool_start.get("args"), dict) else {}
    # 返回一个字典,汇总这次工具调用的关键信息
+ return {
        # 工具调用的唯一标识,将 call_id 转为字符串(为 None 时转空字符串)
+     "call_id": str(tool_end.get("call_id") or ""),
        # 工具显示名
+     "name": tool_end.get("name"),
        # 工具的唯一名称
+     "tool": tool_end.get("tool"),
        # 工具调用状态标志,ok 为 True 则为 "done",否则为 "error"
+     "status": "done" if tool_end.get("ok") else "error",
        # 若有 message 字段则转为字符串,否则为 ""
+     "message": str(tool_end.get("message") or ""),
        # 工具调用时传递的参数
+     "args": args,
        # 工具调用的结果简要预览,若不存在则为 ""
+     "result_preview": str(tool_end.get("result_preview") or ""),
+ }


async def iter_agent_chat_sse(
    session: Session, session_id: int, ctx: StreamChatContext
):
    logger.info(
        "开始流式对话 session_id=%s,模型名称=%s", session_id, ctx.llm_model_name
    )
    # 发送流式对话开始事件
    yield sse({"type": "start"})
    try:
        # 定义一个变量用于存储工具调用的开始事件
+       pending_tool = {}
        # 定义一个变量用于存储工具调用的摘要
+       mcp_tool_runs = []
        # 定义一个变量用于存储最终的回复内容
        final_reply = "AI的回答"
        # 定义一个异步遍历generate_with_tools生成的每个事件
        async for event in generate_with_tools(
            ctx.llm_api_base_url,
            ctx.llm_api_key,
            ctx.llm_model_name,
            ctx.messages,
            ctx.mcp_services,
        ):
            # 获取每个事件的类型
            type = event.get("type")
            if type == AGENT_CHAT_STREAM_RUN_COMPLETE:
                final_reply = event.get("final_text") or ""
                continue
            # 如果事件类型是工具开始,则将事件存入 pending_tool,等待配对结束事件
+           elif type == "tool_start":
+               pending_tool[tool_correlation_key(event)] = event
            # 如果事件类型是工具结束,则从 pending_tool 匹配拿出开始事件,并生成工具调用摘要
+           elif type == "tool_end":
              # 获取工具调用的相关性唯一 key
+               key = tool_correlation_key(event)
                # 从 pending_tool 匹配拿出开始事件
+               start = pending_tool.pop(key, {})
                # 生成工具调用摘要
+               mcp_tool_runs.append(tool_run_row_for_db(start, event))  
            # 将每个事件以SSE字符串的方式流式输出
            yield sse(event)
        # 如果有工具调用摘要则写入 meta,否则为空字典
+       meta = {"mcp_tool_runs": mcp_tool_runs} if mcp_tool_runs else {}     
        # 在数据库里添加一条助手消息
        agent_chat_repository.create_message(
+           session, session_id, "assistant", final_reply,  meta=meta
        )
        # 更新会话的活跃时间
        agent_chat_repository.touch_session(session, ctx.session_row)
        logger.info(
            "流式对话结束 session_id=%s 助手回复的字数=%s", session_id, len(final_reply)
        )
        # 给客户端发送完成事件
        yield sse({"type": "done"})
    except Exception as e:
        logger.exception("流式对话生成失败session_id=%s", session_id)
        yield sse({"type": "error", "message": str(e)})
← 上一节 contextlib 下一节 FastAPI →

访问验证

请输入访问令牌

Token不正确,请重新输入