1. 什么是 Function Calling? #
Function Calling(函数调用)是大型语言模型(如 GPT、Claude)提供的一种能力:你可以事先告诉模型「有哪些函数可以调用」,模型在回答用户问题时,如果觉得需要用到某个函数,就会返回「我要调用某某函数,参数是……」的结构化信息;你的程序收到后,真正去执行这个函数,再把结果交给模型,由模型生成最终回答。
通俗理解:模型不会真的执行代码,它只会「决定要调用什么」。真正执行函数的是你的程序。就像顾问:顾问说「去查一下天气」,你(程序)去查,再把结果告诉顾问,顾问根据结果给出建议。
2. AI 模型如何「知道」有哪些函数? #
模型本身不会自动知道你的业务函数。你需要用结构化描述告诉它,一般包括:
- 函数名:如
get_weather - 功能说明:这个函数做什么,模型根据说明决定何时调用
- 参数:参数名、类型、是否必填、说明
模型根据这些描述,在合适的时候返回「调用请求」(函数名 + 参数值),由你的代码去真正执行。
3. Function Calling 的完整流程 #
- 定义函数:在代码里实现业务逻辑(如查天气、算加法)
- 描述函数:把函数名、说明、参数 schema 发给模型(通常在对话开始时或每次请求时)
- 用户提问:用户问「北京今天天气怎么样?」
- 模型决策:模型判断需要查天气,返回
{"name": "get_weather", "arguments": {"city": "北京"}} - 执行函数:你的程序解析这个结构,调用
get_weather(city="北京"),得到结果 - 回传结果:把结果再发给模型
- 生成回答:模型根据结果生成自然语言回复给用户
4. 什么是 MCP? #
MCP(Model Context Protocol,模型上下文协议)是由 Anthropic 提出的开放协议,用来统一 AI 应用与外部工具、数据源之间的交互方式。
核心思想:不依赖某一家模型厂商的私有 API,而是定义一套通用协议。任何支持 MCP 的客户端(如 Cursor、Claude Desktop)都可以连接任何 MCP 服务器,自动发现并调用其中的工具,无需为每个模型单独写适配代码。
MCP 中的主要概念:
| 概念 | 说明 |
|---|---|
| 工具(Tool) | 可被模型调用的函数,相当于 function calling 中的「函数」 |
| 资源(Resource) | 静态或动态数据,如文件、数据库记录 |
| 提示(Prompt) | 可复用的提示模板 |
5. MCP 与 Function Calling #
在实际应用中,MCP 客户端通常作为「桥梁」,把 MCP 服务器的工具暴露给大模型,再把模型的调用请求转发给 MCP 服务器执行。完整流程如下:
5.1 流程概览 #
| 步骤 | 名称 | 说明 |
|---|---|---|
| 1 | 工具发现 | MCP 客户端通过协议,从 MCP 服务器获取可用的工具列表(如 get_weather、send_email) |
| 2 | 工具描述转换 | 客户端将 MCP 工具描述转换成 Function Calling 所需格式,传给大模型 |
| 3 | 模型决策 | 大模型根据用户提问,决定调用哪个工具,并通过 Function Calling 返回调用请求 |
| 4 | 请求路由 | 客户端收到调用请求后,通过 MCP 协议转发给对应的 MCP 服务器执行 |
| 5 | 结果返回 | MCP 服务器执行完毕,通过 MCP 返回结果,客户端再通过 Function Calling 流程把结果交给模型,生成自然语言回答 |
5.2 流程示意 #
- Function Calling 是模型层面的能力:模型厂商在 API 里提供「描述函数 + 返回调用请求」的机制,各家格式可能不同。
- MCP 是协议层面的规范:把「工具」的声明、发现、调用、结果返回都标准化,任何兼容 MCP 的模型/客户端都能用同一套工具。
5.4 关系总结 #
- MCP 标准化了 Function Calling:工具用统一格式描述,调用用统一协议,不再依赖某一家 API。
- MCP 中的「工具」就是 Function Calling 的体现:模型通过 MCP 发现工具、发出调用请求,服务器执行并返回结果。
- 可以一起用:用 MCP 暴露工具,支持 MCP 的客户端(包括支持 function calling 的模型)都能直接使用,一次开发,多处复用。
5.5. 两者对比 #
| 维度 | Function Calling | MCP |
|---|---|---|
| 定位 | 模型 API 提供的能力 | 跨模型的通用协议 |
| 工具定义 | 各厂商格式不同 | 统一格式 |
| 适用场景 | 对接单一模型(如只用 OpenAI) | 多模型、多客户端共用同一套工具 |
| 扩展性 | 主要围绕函数调用 | 工具 + 资源 + 提示 |
简单记:Function Calling 是「模型会调用函数」的能力;MCP 是「让所有模型都能用同一套工具」的协议。用 MCP 暴露工具,可以一次开发、多处复用。
1.配置日志 #
uv add openai "mcp[cli]" python-dotenv1.1. main.py #
main.py
# 导入操作系统相关库
+import os
# 导入日志处理库
+import logging
# 定义配置类,用于读取和保存环境变量中的配置信息
+class Config:
# 从环境变量读取的配置,与 .env 配合使用
+ def __init__(self):
# 读取日志等级(默认 INFO)
+ self.log_level = os.environ.get("LOG_LEVEL", "INFO")
# 设置当前模块日志记录器
+logger = logging.getLogger(__name__)
# 配置日志格式,支持 level 配置
+def setup_logging(level="INFO"):
# 调用 logging.basicConfig 配置日志
+ logging.basicConfig(
+ level=getattr(logging, level.upper(), logging.INFO),
+ format="%(asctime)s [%(levelname)s] %(message)s",
+ datefmt="%H:%M:%S",
+ )
# 判断是否为主程序入口diff
if __name__ == "__main__":
# 载入配置
+ config = Config()
# 设置日志等级和格式
+ setup_logging(config.log_level)2.MCP客户端和服务器 #
2.1. .env #
.env
DEEPSEEK_API_KEY=sk-614848d97c0c49588e142274eb09e3042.2. main.py #
main.py
# 导入操作系统相关库
import os
# 导入日志处理库
import logging
# 导入系统参数和函数库
+import sys
# 导入异步相关库
+import asyncio
# 导入 OpenAI 认证异常
+from openai import AuthenticationError
# 导入 MCP 桥接通信相关类
+from mcp import ClientSession, StdioServerParameters
# 导入 MCP stdio 客户端工具
+from mcp.client.stdio import stdio_client
# 导入快速 MCP 服务器类
+from mcp.server.fastmcp import FastMCP
# 定义配置类,用于读取和保存环境变量中的配置信息
class Config:
# 从环境变量读取的配置,与 .env 配合使用
def __init__(self):
# 读取日志等级(默认 INFO)
self.log_level = os.environ.get("LOG_LEVEL", "INFO")
# 读取 MCP 命令(默认是 python)
+ self.mcp_command = os.environ.get("MCP_COMMAND", "python")
# 获取 MCP 服务器的启动参数
+ def get_mcp_server_params(self):
# 构造 MCP 服务器启动参数
+ args = [__file__, "serve"]
# 返回 MCP 服务器启动参数
+ return StdioServerParameters(command=self.mcp_command, args=args)
# 设置当前模块日志记录器
logger = logging.getLogger(__name__)
# 配置日志格式,支持 level 配置
def setup_logging(level="INFO"):
# 调用 logging.basicConfig 配置日志
logging.basicConfig(
level=getattr(logging, level.upper(), logging.INFO),
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
+ )
# MCP Bridge 类,负责连接 MCP 工具和大模型
+class MCPBridge:
+ """
+ 连接 MCP 服务器与大模型,负责:
+ - 从 MCP 获取工具列表并转成 Function Calling 格式
+ - 调用大模型
+ - 将模型返回的工具调用请求转发给 MCP 执行
+ """
# 初始化 MCPBridge (保存配置和对象句柄)
+ def __init__(self, config):
+ self.config = config
# 异步函数,桥接主入口:调用 MCP 子进程并与大模型对话
+async def run_bridge(user_message, config):
# 获取 MCP 服务器启动参数
+ server_params = config.get_mcp_server_params()
# 使用 stdio_client 启动 MCP 服务器并建立通信流
+ async with stdio_client(server_params) as (read_stream, write_stream):
# 建立 MCP 会话
+ async with ClientSession(read_stream, write_stream) as session:
# 初始化会话
+ await session.initialize()
# 获取已注册的工具
+ tools = await session.list_tools()
# 日志记录已发现工具
+ logger.info("工具发现: %s", [t.name for t in tools.tools])
# 创建并注册工具的 MCP 服务器
+def create_mcp_server():
# 实例化 FastMCP,命名为 MCP-Bridge
+ mcp = FastMCP(name="MCP-Bridge")
# 注册获取天气工具
+ @mcp.tool()
+ def get_weather(city):
# 查询指定城市的天气
+ return f"{city}今天晴,气温 25℃"
# 注册发送邮件工具
+ @mcp.tool()
+ def send_email(to, subject, body):
# 发送邮件到指定收件人
+ return f"已发送邮件给 {to},主题:{subject}"
# 注册加法计算工具
+ @mcp.tool()
+ def add(a, b):
# 计算两个整数的和
+ return a + b
# 返回 mcp 服务器对象
+ return mcp
# 创建 MCP 服务器实例
+mcp = create_mcp_server()
# 以 stdio 模式运行 MCP 服务器,供子进程或编辑器调用
+def run_server():
# 输出日志信息:
+ logger.info("MCP 服务器已启动(stdio 模式)")
# 运行 MCP 服务(stdio 传输方式)
+ mcp.run(transport="stdio")
# 配置日志格式,支持 level 配置
+def setup_logging(level="INFO"):
# 调用 logging.basicConfig 配置日志
+ logging.basicConfig(
+ level=getattr(logging, level.upper(), logging.INFO),
+ format="%(asctime)s [%(levelname)s] %(message)s",
+ datefmt="%H:%M:%S",
+ )
# 判断是否为主程序入口diff
if __name__ == "__main__":
# 载入配置
config = Config()
# 设置日志等级和格式
setup_logging(config.log_level)
# 判断参数是否要求以 serve 启动服务器
+ if len(sys.argv) >= 2 and sys.argv[1] == "serve":
# 启动 stdio MCP 服务器
+ run_server()
+ else:
# 否则当作命令行问答客户端
+ question = "北京今天天气怎么样?"
+ try:
# 执行桥接对话
+ reply = asyncio.run(run_bridge(question, config))
# 输出回复
+ print(reply)
# 认证异常处理
+ except AuthenticationError:
+ logger.error("API 认证失败:请检查 DEEPSEEK_API_KEY 是否正确,可在 .env 中配置或设置环境变量")
+ sys.exit(1)3.将 MCP Tool 转为 OpenAI Function Calling 格式 #
3.1. main.py #
main.py
# 导入操作系统相关库
import os
# 导入日志处理库
import logging
# 导入系统参数和函数库
import sys
# 导入异步相关库
import asyncio
# 导入 OpenAI 认证异常
from openai import AuthenticationError
# 导入 MCP 桥接通信相关类
from mcp import ClientSession, StdioServerParameters
# 导入 MCP stdio 客户端工具
from mcp.client.stdio import stdio_client
# 导入快速 MCP 服务器类
from mcp.server.fastmcp import FastMCP
# 定义配置类,用于读取和保存环境变量中的配置信息
class Config:
# 从环境变量读取的配置,与 .env 配合使用
def __init__(self):
# 读取日志等级(默认 INFO)
self.log_level = os.environ.get("LOG_LEVEL", "INFO")
# 读取 MCP 命令(默认是 python)
self.mcp_command = os.environ.get("MCP_COMMAND", "python")
# 获取 MCP 服务器的启动参数
def get_mcp_server_params(self):
# 构造 MCP 服务器启动参数
args = [__file__, "serve"]
# 返回 MCP 服务器启动参数
return StdioServerParameters(command=self.mcp_command, args=args)
# 设置当前模块日志记录器
logger = logging.getLogger(__name__)
# 配置日志格式,支持 level 配置
def setup_logging(level="INFO"):
# 调用 logging.basicConfig 配置日志
logging.basicConfig(
level=getattr(logging, level.upper(), logging.INFO),
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
# 将 MCP Tool 转为 OpenAI Function Calling 格式
+def mcp_tools_to_openai_format(mcp_tools):
# 定义保存结果的列表
+ result = []
# 遍历每个工具
+ for tool in mcp_tools:
# 获取工具的输入 schema(参数说明)
+ schema = getattr(tool, "inputSchema", None) or getattr(tool, "input_schema", None) or {}
# 若 schema 为空,则使用默认格式
+ if not schema:
+ schema = {"type": "object", "properties": {}, "required": []}
# 将该工具转为 openai 的 function call 格式并加入结果
+ result.append({
+ "name": tool.name,
+ "description": tool.description or f"调用工具 {tool.name}",
+ "parameters": schema,
+ })
# 返回转换后的工具列表
+ return result
# MCP Bridge 类,负责连接 MCP 工具和大模型
class MCPBridge:
"""
连接 MCP 服务器与大模型,负责:
- 从 MCP 获取工具列表并转成 Function Calling 格式
- 调用大模型
- 将模型返回的工具调用请求转发给 MCP 执行
"""
# 初始化 MCPBridge (保存配置和对象句柄)
def __init__(self, config):
self.config = config
# 异步单轮/多轮对话,实现桥接逻辑
+ async def chat(self, user_message, session):
+ """
+ 执行单轮对话:用户输入 -> 模型决策 -> 可选工具调用 -> 最终回答。
+ 支持多轮工具调用(模型可能连续多次调用工具)。
+ """
# 获取 MCP 工具列表
+ mcp_tools = list((await session.list_tools()).tools)
# 转为 OpenAI function calling 格式
+ function_defs = mcp_tools_to_openai_format(mcp_tools)
# 构造 tools 参数
+ tools = [{"type": "function", "function": f} for f in function_defs]
# 构造 tools 参数
+ tools = [{"type": "function", "function": f} for f in function_defs]
+ logger.info("tools: %s", tools)
# 异步函数,桥接主入口:调用 MCP 子进程并与大模型对话
async def run_bridge(user_message, config):
# 获取 MCP 服务器启动参数
server_params = config.get_mcp_server_params()
# 生成 MCPBridge 实例
+ bridge = MCPBridge(config)
# 使用 stdio_client 启动 MCP 服务器并建立通信流
async with stdio_client(server_params) as (read_stream, write_stream):
# 建立 MCP 会话
async with ClientSession(read_stream, write_stream) as session:
# 初始化会话
await session.initialize()
# 获取已注册的工具
tools = await session.list_tools()
# 日志记录已发现工具
logger.info("工具发现: %s", [t.name for t in tools.tools])
# 调用桥接逻辑
+ return await bridge.chat(user_message, session)
# 创建并注册工具的 MCP 服务器
def create_mcp_server():
# 实例化 FastMCP,命名为 MCP-Bridge
mcp = FastMCP(name="MCP-Bridge")
# 注册获取天气工具
@mcp.tool()
def get_weather(city):
# 查询指定城市的天气
return f"{city}今天晴,气温 25℃"
# 注册发送邮件工具
@mcp.tool()
def send_email(to, subject, body):
# 发送邮件到指定收件人
return f"已发送邮件给 {to},主题:{subject}"
# 注册加法计算工具
@mcp.tool()
def add(a, b):
# 计算两个整数的和
return a + b
# 返回 mcp 服务器对象
return mcp
# 创建 MCP 服务器实例
mcp = create_mcp_server()
# 以 stdio 模式运行 MCP 服务器,供子进程或编辑器调用
def run_server():
# 输出日志信息:
logger.info("MCP 服务器已启动(stdio 模式)")
# 运行 MCP 服务(stdio 传输方式)
mcp.run(transport="stdio")
# 配置日志格式,支持 level 配置
def setup_logging(level="INFO"):
# 调用 logging.basicConfig 配置日志
logging.basicConfig(
level=getattr(logging, level.upper(), logging.INFO),
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
# 判断是否为主程序入口diff
if __name__ == "__main__":
# 载入配置
config = Config()
# 设置日志等级和格式
setup_logging(config.log_level)
# 判断参数是否要求以 serve 启动服务器
if len(sys.argv) >= 2 and sys.argv[1] == "serve":
# 启动 stdio MCP 服务器
run_server()
else:
# 否则当作命令行问答客户端
question = "北京今天天气怎么样?"
try:
# 执行桥接对话
reply = asyncio.run(run_bridge(question, config))
# 输出回复
print(reply)
# 认证异常处理
except AuthenticationError:
logger.error("API 认证失败:请检查 DEEPSEEK_API_KEY 是否正确,可在 .env 中配置或设置环境变量")
sys.exit(1)4.聊天 #
4.1. main.py #
main.py
# 导入操作系统相关库
import os
# 导入日志处理库
import logging
# 导入系统参数和函数库
import sys
# 导入异步相关库
import asyncio
# 导入 OpenAI 认证异常
from openai import AuthenticationError
# 导入 MCP 桥接通信相关类
from mcp import ClientSession, StdioServerParameters
# 导入 MCP stdio 客户端工具
from mcp.client.stdio import stdio_client
# 导入快速 MCP 服务器类
from mcp.server.fastmcp import FastMCP
# 导入 OpenAI 异步客户端
+from openai import AsyncOpenAI
# 导入 json 处理库
+import json
# 定义配置类,用于读取和保存环境变量中的配置信息
class Config:
# 从环境变量读取的配置,与 .env 配合使用
def __init__(self):
# 读取日志等级(默认 INFO)
self.log_level = os.environ.get("LOG_LEVEL", "INFO")
# 读取 MCP 命令(默认是 python)
self.mcp_command = os.environ.get("MCP_COMMAND", "python")
# 读取 LLM API Key
+ self.llm_api_key = os.environ.get("DEEPSEEK_API_KEY", "")
# 读取 LLM 模型名称
+ self.llm_model = os.environ.get("LLM_MODEL", "deepseek-chat")
# 读取 LLM 基础 API 地址
+ self.llm_base_url = os.environ.get("LLM_BASE_URL", "https://api.deepseek.com")
# 获取 MCP 服务器的启动参数
def get_mcp_server_params(self):
# 构造 MCP 服务器启动参数
args = [__file__, "serve"]
# 返回 MCP 服务器启动参数
return StdioServerParameters(command=self.mcp_command, args=args)
# 设置当前模块日志记录器
logger = logging.getLogger(__name__)
# 配置日志格式,支持 level 配置
def setup_logging(level="INFO"):
# 调用 logging.basicConfig 配置日志
logging.basicConfig(
level=getattr(logging, level.upper(), logging.INFO),
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
# 将 MCP Tool 转为 OpenAI Function Calling 格式
def mcp_tools_to_openai_format(mcp_tools):
# 定义保存结果的列表
result = []
# 遍历每个工具
for tool in mcp_tools:
# 获取工具的输入 schema(参数说明)
schema = getattr(tool, "inputSchema", None) or getattr(tool, "input_schema", None) or {}
# 若 schema 为空,则使用默认格式
if not schema:
schema = {"type": "object", "properties": {}, "required": []}
# 将该工具转为 openai 的 function call 格式并加入结果
result.append({
"name": tool.name,
"description": tool.description or f"调用工具 {tool.name}",
"parameters": schema,
})
# 返回转换后的工具列表
return result
# 辅助函数:将 OpenAI 返回的 message 对象转为 API dict 格式
+def _message_to_dict(msg):
# 初始化字典,角色为助手,内容取 message 的 content(若为 None 则设为 None)
+ d = {"role": "assistant", "content": msg.content or None}
# 如果 message 包含工具调用(tool_calls)
+ if msg.tool_calls:
# 如果有工具调用,则设置 content 为 None
+ d["content"] = None
# 构建 tool_calls 列表,每个调用包含 id、类型、函数名和参数
+ d["tool_calls"] = [
+ {
+ "id": tc.id,
+ "type": "function",
+ "function": {
+ "name": tc.function.name,
+ "arguments": tc.function.arguments or "{}"
+ }
+ }
+ for tc in msg.tool_calls
+ ]
# 返回处理后的字典
+ return d
# MCP Bridge 类,负责连接 MCP 工具和大模型
class MCPBridge:
"""
连接 MCP 服务器与大模型,负责:
- 从 MCP 获取工具列表并转成 Function Calling 格式
- 调用大模型
- 将模型返回的工具调用请求转发给 MCP 执行
"""
# 初始化 MCPBridge (保存配置和对象句柄)
def __init__(self, config):
self.config = config
+ self._llm_client = AsyncOpenAI(
+ api_key=self.config.llm_api_key,
+ base_url=self.config.llm_base_url,
+ )
# 异步单轮/多轮对话,实现桥接逻辑
async def chat(self, user_message, session):
"""
执行单轮对话:用户输入 -> 模型决策 -> 可选工具调用 -> 最终回答。
支持多轮工具调用(模型可能连续多次调用工具)。
"""
# 获取 MCP 工具列表
mcp_tools = list((await session.list_tools()).tools)
# 转为 OpenAI function calling 格式
function_defs = mcp_tools_to_openai_format(mcp_tools)
# 构造 tools 参数
tools = [{"type": "function", "function": f} for f in function_defs]
# 构造 tools 参数
tools = [{"type": "function", "function": f} for f in function_defs]
logger.info("tools: %s", tools)
# 构造会话历史
+ messages = [
# 系统消息:说明助手行为
+ {"role": "system", "content": "你是一个有帮助的助手。当用户需要查询天气、发邮件、计算时,请调用相应工具。"},
# 用户输入
+ {"role": "user", "content": user_message},
+ ]
# 设置最多工具调用轮数,防止死循环
+ max_tool_rounds = 5 # 防止无限循环
+ round_count = 0
# 循环进行多轮(最多5轮)交互
+ while round_count < max_tool_rounds:
# 增加交互轮数
+ round_count += 1
# 向大语言模型发送当前的会话消息和可用工具信息,获取模型的回复
+ response = await self._llm_client.chat.completions.create(
+ model=self.config.llm_model, # 指定要使用的语言模型
+ messages=messages, # 传递对话历史消息
+ tools=tools, # 传递工具定义用于 Function Calling
+ tool_choice="auto", # 让模型自动决定是否调用工具
+ )
# 获取模型回复的 message
+ msg = response.choices[0].message
# 转换为 openai API 需要的 dict 格式并添加到消息历史
+ messages.append(_message_to_dict(msg))
# 当没有需要调用工具时,直接返回答案
+ if not msg.tool_calls:
+ return (msg.content or "").strip()
# 执行模型要求的所有工具调用
+ for tc in msg.tool_calls:
# 获取要调用的工具名称
+ name = tc.function.name
# 获取工具调用参数字符串
+ args_str = tc.function.arguments or "{}"
+ try:
# 解析参数字符串为 dict
+ arguments = json.loads(args_str)
+ except json.JSONDecodeError:
# 解析失败则使用空参数
+ arguments = {}
# 记录日志
+ logger.info("执行工具: %s", name)
# 调用 MCP 工具
+ result = await session.call_tool(name, arguments=arguments)
# 提取工具调用返回的文本内容
+ text = result.content[0].text if result.content else ""
# 如果执行失败,记录错误信息
+ if result.isError:
+ text = f"执行错误: {result.content}"
# 将工具调用结果添加至消息历史
+ messages.append({
+ "role": "tool",
+ "tool_call_id": tc.id,
+ "content": text,
+ })
# 超过最大轮数未终止,返回提示信息
+ return "工具调用次数过多,已终止。"
# 异步函数,桥接主入口:调用 MCP 子进程并与大模型对话
async def run_bridge(user_message, config):
# 获取 MCP 服务器启动参数
server_params = config.get_mcp_server_params()
# 生成 MCPBridge 实例
bridge = MCPBridge(config)
# 使用 stdio_client 启动 MCP 服务器并建立通信流
async with stdio_client(server_params) as (read_stream, write_stream):
# 建立 MCP 会话
async with ClientSession(read_stream, write_stream) as session:
# 初始化会话
await session.initialize()
# 获取已注册的工具
tools = await session.list_tools()
# 日志记录已发现工具
logger.info("工具发现: %s", [t.name for t in tools.tools])
# 调用桥接逻辑
return await bridge.chat(user_message, session)
# 创建并注册工具的 MCP 服务器
def create_mcp_server():
# 实例化 FastMCP,命名为 MCP-Bridge
mcp = FastMCP(name="MCP-Bridge")
# 注册获取天气工具
@mcp.tool()
def get_weather(city):
# 查询指定城市的天气
return f"{city}今天晴,气温 25℃"
# 注册发送邮件工具
@mcp.tool()
def send_email(to, subject, body):
# 发送邮件到指定收件人
return f"已发送邮件给 {to},主题:{subject}"
# 注册加法计算工具
@mcp.tool()
def add(a, b):
# 计算两个整数的和
return a + b
# 返回 mcp 服务器对象
return mcp
# 创建 MCP 服务器实例
mcp = create_mcp_server()
# 以 stdio 模式运行 MCP 服务器,供子进程或编辑器调用
def run_server():
# 输出日志信息:
logger.info("MCP 服务器已启动(stdio 模式)")
# 运行 MCP 服务(stdio 传输方式)
mcp.run(transport="stdio")
# 配置日志格式,支持 level 配置
def setup_logging(level="INFO"):
# 调用 logging.basicConfig 配置日志
logging.basicConfig(
level=getattr(logging, level.upper(), logging.INFO),
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
# 判断是否为主程序入口diff
if __name__ == "__main__":
# 载入配置
config = Config()
# 设置日志等级和格式
setup_logging(config.log_level)
# 判断参数是否要求以 serve 启动服务器
if len(sys.argv) >= 2 and sys.argv[1] == "serve":
# 启动 stdio MCP 服务器
run_server()
else:
# 否则当作命令行问答客户端
question = "北京今天天气怎么样?"
try:
# 执行桥接对话
reply = asyncio.run(run_bridge(question, config))
# 输出回复
print(reply)
# 认证异常处理
except AuthenticationError:
logger.error("API 认证失败:请检查 DEEPSEEK_API_KEY 是否正确,可在 .env 中配置或设置环境变量")
sys.exit(1)5.执行过程 #
5.1 程序入口如何分流 #
入口在 if __name__ == "__main__"::
- 创建配置
config = Config()(读取环境变量) - 初始化日志
setup_logging(config.log_level) - 判断命令行参数:
- 有
serve->run_server() - 否则 ->
asyncio.run(run_bridge(question, config))
- 有
5.2 客户端模式的完整执行链路 #
当你直接运行 python main.py 时,流程是:
run_bridge()先通过config.get_mcp_server_params()生成子进程启动参数:本文件 +servestdio_client(...)启动一个子进程(即python main.py serve)- 与子进程建立
ClientSession,initialize()并list_tools() - 创建
MCPBridge,进入bridge.chat(...) chat()中:- 拉取 MCP 工具列表
- 转换成 OpenAI function calling 格式
- 组装
messages(system + user) - 进入最多 5 轮循环:
- 调 LLM(
tool_choice="auto") - 若 LLM 返回普通文本:直接结束并返回
- 若返回
tool_calls:逐个调用session.call_tool(...) - 把工具结果以
role="tool"塞回messages,继续下一轮
- 调 LLM(
- 最终得到答案并
print(reply)
5.3 服务模式在做什么 #
python main.py serve 时执行 run_server(),本质是把 create_mcp_server() 注册的工具暴露出来(stdio 传输):
get_weather(city)send_email(to, subject, body)add(a, b)
这部分是被上面客户端模式“拉起并调用”的工具端。
5.4 时序图(Mermaid) #
1.旅行计划 #
1.1. .env #
.env
DEEPSEEK_API_KEY=sk-614848d97c0c49588e142274eb09e3041.2. mcp_client.py #
mcp_client.py
# 导入操作系统相关库
import os
# 导入日志处理库
import logging
# 导入系统参数和函数库
import sys
# 导入异步相关库
import asyncio
# 导入 OpenAI 认证异常
from openai import AuthenticationError
# 导入 MCP 桥接通信相关类
from mcp import ClientSession, StdioServerParameters
# 导入 MCP stdio 客户端工具
from mcp.client.stdio import stdio_client
# 导入快速 MCP 服务器类
from mcp.server.fastmcp import FastMCP
# 导入 OpenAI 异步客户端
from openai import AsyncOpenAI
# 导入 json 处理库
import json
# 定义配置类,用于读取和保存环境变量中的配置信息
class Config:
# 从环境变量读取的配置,与 .env 配合使用
def __init__(self):
# 读取日志等级(默认 INFO)
self.log_level = os.environ.get("LOG_LEVEL", "INFO")
# 读取 MCP 命令(默认是 python)
self.mcp_command = os.environ.get("MCP_COMMAND", "python")
# 读取 LLM API Key
self.llm_api_key = os.environ.get("DEEPSEEK_API_KEY", "sk-614848d97c0c49588e142274eb09e304")
# 读取 LLM 模型名称
self.llm_model = os.environ.get("LLM_MODEL", "deepseek-chat")
# 读取 LLM 基础 API 地址
self.llm_base_url = os.environ.get("LLM_BASE_URL", "https://api.deepseek.com")
# 获取 MCP 服务器的启动参数
def get_mcp_server_params(self):
# 构造 MCP 服务器启动参数
args = ["mcp_server.py"]
# 返回 MCP 服务器启动参数
return StdioServerParameters(command=self.mcp_command, args=args)
# 设置当前模块日志记录器
logger = logging.getLogger(__name__)
# 配置日志格式,支持 level 配置
def setup_logging(level="INFO"):
# 调用 logging.basicConfig 配置日志
logging.basicConfig(
level=getattr(logging, level.upper(), logging.INFO),
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
# 将 MCP Tool 转为 OpenAI Function Calling 格式
def mcp_tools_to_openai_format(mcp_tools):
# 定义保存结果的列表
result = []
# 遍历每个工具
for tool in mcp_tools:
# 获取工具的输入 schema(参数说明)
schema = getattr(tool, "inputSchema", None) or getattr(tool, "input_schema", None) or {}
# 若 schema 为空,则使用默认格式
if not schema:
schema = {"type": "object", "properties": {}, "required": []}
# 将该工具转为 openai 的 function call 格式并加入结果
result.append({
"name": tool.name,
"description": tool.description or f"调用工具 {tool.name}",
"parameters": schema,
})
# 返回转换后的工具列表
return result
# 辅助函数:将 OpenAI 返回的 message 对象转为 API dict 格式
def _message_to_dict(msg):
# 初始化字典,角色为助手,内容取 message 的 content(若为 None 则设为 None)
d = {"role": "assistant", "content": msg.content or None}
# 如果 message 包含工具调用(tool_calls)
if msg.tool_calls:
# 如果有工具调用,则设置 content 为 None
d["content"] = None
# 构建 tool_calls 列表,每个调用包含 id、类型、函数名和参数
d["tool_calls"] = [
{
"id": tc.id,
"type": "function",
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments or "{}"
}
}
for tc in msg.tool_calls
]
# 返回处理后的字典
return d
# MCP Bridge 类,负责连接 MCP 工具和大模型
class MCPBridge:
"""
连接 MCP 服务器与大模型,负责:
- 从 MCP 获取工具列表并转成 Function Calling 格式
- 调用大模型
- 将模型返回的工具调用请求转发给 MCP 执行
"""
# 初始化 MCPBridge (保存配置和对象句柄)
def __init__(self, config):
self.config = config
self._llm_client = AsyncOpenAI(
api_key=self.config.llm_api_key,
base_url=self.config.llm_base_url,
)
# 异步单轮/多轮对话,实现桥接逻辑
async def chat(self, user_message, session):
"""
执行单轮对话:用户输入 -> 模型决策 -> 可选工具调用 -> 最终回答。
支持多轮工具调用(模型可能连续多次调用工具)。
"""
# 获取 MCP 工具列表
mcp_tools = list((await session.list_tools()).tools)
# 转为 OpenAI function calling 格式
function_defs = mcp_tools_to_openai_format(mcp_tools)
# 构造 tools 参数
tools = [{"type": "function", "function": f} for f in function_defs]
# 构造 tools 参数
tools = [{"type": "function", "function": f} for f in function_defs]
logger.info("tools: %s", tools)
# 构造会话历史
messages = [
# 系统消息:说明助手行为
{"role": "system", "content": "你是一个有帮助的助手。当用户需要查询天气、发邮件、计算时,请调用相应工具。"},
# 用户输入
{"role": "user", "content": user_message},
]
# 设置最多工具调用轮数,防止死循环
max_tool_rounds = 5 # 防止无限循环
round_count = 0
# 循环进行多轮(最多5轮)交互
while round_count < max_tool_rounds:
# 增加交互轮数
round_count += 1
# 向大语言模型发送当前的会话消息和可用工具信息,获取模型的回复
response = await self._llm_client.chat.completions.create(
model=self.config.llm_model, # 指定要使用的语言模型
messages=messages, # 传递对话历史消息
tools=tools, # 传递工具定义用于 Function Calling
tool_choice="auto", # 让模型自动决定是否调用工具
)
# 获取模型回复的 message
msg = response.choices[0].message
logger.info("msg: %s", msg)
# 转换为 openai API 需要的 dict 格式并添加到消息历史
messages.append(_message_to_dict(msg))
# 当没有需要调用工具时,直接返回答案
if not msg.tool_calls:
return (msg.content or "").strip()
# 执行模型要求的所有工具调用
for tc in msg.tool_calls:
# 获取要调用的工具名称
name = tc.function.name
# 获取工具调用参数字符串
args_str = tc.function.arguments or "{}"
try:
# 解析参数字符串为 dict
arguments = json.loads(args_str)
except json.JSONDecodeError:
# 解析失败则使用空参数
arguments = {}
# 记录日志
logger.info("执行工具: %s", name)
# 调用 MCP 工具
result = await session.call_tool(name, arguments=arguments)
# 提取工具调用返回的文本内容
text = result.content[0].text if result.content else ""
# 如果执行失败,记录错误信息
if result.isError:
text = f"执行错误: {result.content}"
# 将工具调用结果添加至消息历史
messages.append({
"role": "tool",
"tool_call_id": tc.id,
"content": text,
})
# 超过最大轮数未终止,返回提示信息
return "工具调用次数过多,已终止。"
# 异步函数,桥接主入口:调用 MCP 子进程并与大模型对话
async def run_bridge(user_message, config):
# 获取 MCP 服务器启动参数
server_params = config.get_mcp_server_params()
# 生成 MCPBridge 实例
bridge = MCPBridge(config)
# 使用 stdio_client 启动 MCP 服务器并建立通信流
async with stdio_client(server_params) as (read_stream, write_stream):
# 建立 MCP 会话
async with ClientSession(read_stream, write_stream) as session:
# 初始化会话
await session.initialize()
# 获取已注册的工具
tools = await session.list_tools()
# 日志记录已发现工具
logger.info("工具发现: %s", [t.name for t in tools.tools])
# 调用桥接逻辑
return await bridge.chat(user_message, session)
# 配置日志格式,支持 level 配置
def setup_logging(level="INFO"):
# 调用 logging.basicConfig 配置日志
logging.basicConfig(
level=getattr(logging, level.upper(), logging.INFO),
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
# 判断是否为主程序入口diff
if __name__ == "__main__":
# 载入配置
config = Config()
# 设置日志等级和格式
setup_logging(config.log_level)
question = "北京今天天气怎么样?"
try:
# 执行桥接对话
reply = asyncio.run(run_bridge(question, config))
# 输出回复
print(reply)
# 认证异常处理
except AuthenticationError:
logger.error("API 认证失败:请检查 DEEPSEEK_API_KEY 是否正确,可在 .env 中配置或设置环境变量")
sys.exit(1)
1.3. mcp_server.py #
mcp_server.py
# 旅行规划 MCP 服务器
"""
旅行规划 MCP 服务器
整合三类能力:
- 旅行服务器:航班、酒店、行程
- 天气服务器:气候、预报
- 日历/邮件服务器:日程、通知
提供工具、资源、提示,供 AI 应用调用。
"""
# 导入日志模块
import logging
# 导入 Path 类用于处理文件路径
from pathlib import Path
# 导入 dotenv 用于加载环境变量
from dotenv import load_dotenv
# 导入 FastMCP 用于创建 MCP 服务器
from mcp.server.fastmcp import FastMCP
# 加载上层目录中的 .env 文件中的环境变量
load_dotenv(Path(__file__).resolve().parent.parent / ".env")
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
# 实例化一个 FastMCP 服务器,名称为 Travel-Planning
mcp = FastMCP(name="Travel-Planning")
# 定义运行 MCP 服务器的函数
def run_server():
# 输出服务器启动日志信息
logger.info("旅行规划 MCP 服务器已启动(stdio 模式)")
# 以 stdio 方式运行 MCP 服务器
mcp.run(transport="stdio")
# 判断脚本是否作为主模块运行
if __name__ == "__main__":
# 配置日志基本格式和等级
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S")
# 启动 MCP 服务器
run_server()
2.资源 #
2.1. mcp_client.py #
mcp_client.py
# 导入操作系统相关库
import os
# 导入日志处理库
import logging
# 导入系统参数和函数库
import sys
# 导入异步相关库
import asyncio
# 导入 OpenAI 认证异常
from openai import AuthenticationError
# 导入 MCP 桥接通信相关类
from mcp import ClientSession, StdioServerParameters
# 导入 MCP stdio 客户端工具
from mcp.client.stdio import stdio_client
# 导入快速 MCP 服务器类
from mcp.server.fastmcp import FastMCP
# 导入 OpenAI 异步客户端
from openai import AsyncOpenAI
# 导入 json 处理库
import json
# 定义配置类,用于读取和保存环境变量中的配置信息
class Config:
# 从环境变量读取的配置,与 .env 配合使用
def __init__(self):
# 读取日志等级(默认 INFO)
self.log_level = os.environ.get("LOG_LEVEL", "INFO")
# 读取 MCP 命令(默认是 python)
self.mcp_command = os.environ.get("MCP_COMMAND", "python")
# 读取 LLM API Key
self.llm_api_key = os.environ.get("DEEPSEEK_API_KEY", "sk-614848d97c0c49588e142274eb09e304")
# 读取 LLM 模型名称
self.llm_model = os.environ.get("LLM_MODEL", "deepseek-chat")
# 读取 LLM 基础 API 地址
self.llm_base_url = os.environ.get("LLM_BASE_URL", "https://api.deepseek.com")
# 获取 MCP 服务器的启动参数
def get_mcp_server_params(self):
# 构造 MCP 服务器启动参数
args = ["mcp_server.py"]
# 返回 MCP 服务器启动参数
return StdioServerParameters(command=self.mcp_command, args=args)
# 设置当前模块日志记录器
logger = logging.getLogger(__name__)
# 配置日志格式,支持 level 配置
def setup_logging(level="INFO"):
# 调用 logging.basicConfig 配置日志
logging.basicConfig(
level=getattr(logging, level.upper(), logging.INFO),
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
# 将 MCP Tool 转为 OpenAI Function Calling 格式
def mcp_tools_to_openai_format(mcp_tools):
# 定义保存结果的列表
result = []
# 遍历每个工具
for tool in mcp_tools:
# 获取工具的输入 schema(参数说明)
schema = getattr(tool, "inputSchema", None) or getattr(tool, "input_schema", None) or {}
# 若 schema 为空,则使用默认格式
if not schema:
schema = {"type": "object", "properties": {}, "required": []}
# 将该工具转为 openai 的 function call 格式并加入结果
result.append({
"name": tool.name,
"description": tool.description or f"调用工具 {tool.name}",
"parameters": schema,
})
# 返回转换后的工具列表
return result
# 辅助函数:将 OpenAI 返回的 message 对象转为 API dict 格式
def _message_to_dict(msg):
# 初始化字典,角色为助手,内容取 message 的 content(若为 None 则设为 None)
d = {"role": "assistant", "content": msg.content or None}
# 如果 message 包含工具调用(tool_calls)
if msg.tool_calls:
# 如果有工具调用,则设置 content 为 None
d["content"] = None
# 构建 tool_calls 列表,每个调用包含 id、类型、函数名和参数
d["tool_calls"] = [
{
"id": tc.id,
"type": "function",
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments or "{}"
}
}
for tc in msg.tool_calls
]
# 返回处理后的字典
return d
# MCP Bridge 类,负责连接 MCP 工具和大模型
class MCPBridge:
"""
连接 MCP 服务器与大模型,负责:
- 从 MCP 获取工具列表并转成 Function Calling 格式
- 调用大模型
- 将模型返回的工具调用请求转发给 MCP 执行
"""
# 初始化 MCPBridge (保存配置和对象句柄)
def __init__(self, config):
self.config = config
self._llm_client = AsyncOpenAI(
api_key=self.config.llm_api_key,
base_url=self.config.llm_base_url,
)
# 异步单轮/多轮对话,实现桥接逻辑
async def chat(self, user_message, session):
"""
执行单轮对话:用户输入 -> 模型决策 -> 可选工具调用 -> 最终回答。
支持多轮工具调用(模型可能连续多次调用工具)。
"""
# 获取 MCP 工具列表
mcp_tools = list((await session.list_tools()).tools)
# 转为 OpenAI function calling 格式
function_defs = mcp_tools_to_openai_format(mcp_tools)
# 构造 tools 参数
tools = [{"type": "function", "function": f} for f in function_defs]
# 构造 tools 参数
tools = [{"type": "function", "function": f} for f in function_defs]
logger.info("tools: %s", tools)
# 构造会话历史
messages = [
# 系统消息:说明助手行为
{"role": "system", "content": "你是一个有帮助的助手。当用户需要查询天气、发邮件、计算时,请调用相应工具。"},
# 用户输入
{"role": "user", "content": user_message},
]
# 设置最多工具调用轮数,防止死循环
max_tool_rounds = 5 # 防止无限循环
round_count = 0
# 循环进行多轮(最多5轮)交互
while round_count < max_tool_rounds:
# 增加交互轮数
round_count += 1
# 向大语言模型发送当前的会话消息和可用工具信息,获取模型的回复
response = await self._llm_client.chat.completions.create(
model=self.config.llm_model, # 指定要使用的语言模型
messages=messages, # 传递对话历史消息
tools=tools, # 传递工具定义用于 Function Calling
tool_choice="auto", # 让模型自动决定是否调用工具
)
# 获取模型回复的 message
msg = response.choices[0].message
logger.info("msg: %s", msg)
# 转换为 openai API 需要的 dict 格式并添加到消息历史
messages.append(_message_to_dict(msg))
# 当没有需要调用工具时,直接返回答案
if not msg.tool_calls:
return (msg.content or "").strip()
# 执行模型要求的所有工具调用
for tc in msg.tool_calls:
# 获取要调用的工具名称
name = tc.function.name
# 获取工具调用参数字符串
args_str = tc.function.arguments or "{}"
try:
# 解析参数字符串为 dict
arguments = json.loads(args_str)
except json.JSONDecodeError:
# 解析失败则使用空参数
arguments = {}
# 记录日志
logger.info("执行工具: %s", name)
# 调用 MCP 工具
result = await session.call_tool(name, arguments=arguments)
# 提取工具调用返回的文本内容
text = result.content[0].text if result.content else ""
# 如果执行失败,记录错误信息
if result.isError:
text = f"执行错误: {result.content}"
# 将工具调用结果添加至消息历史
messages.append({
"role": "tool",
"tool_call_id": tc.id,
"content": text,
})
# 超过最大轮数未终止,返回提示信息
return "工具调用次数过多,已终止。"
# 异步函数,桥接主入口:调用 MCP 子进程并与大模型对话
+async def run_bridge(user_message, config, resource_uris=None):
+ """
+ resource_uris: 可选,要预读的资源 URI 列表,如 ["calendar://events/2024", "preferences://travel"]
+ """
# 获取 MCP 服务器启动参数
server_params = config.get_mcp_server_params()
# 生成 MCPBridge 实例
bridge = MCPBridge(config)
# 使用 stdio_client 启动 MCP 服务器并建立通信流
async with stdio_client(server_params) as (read_stream, write_stream):
# 建立 MCP 会话
async with ClientSession(read_stream, write_stream) as session:
# 初始化会话
await session.initialize()
# 获取已注册的工具
tools = await session.list_tools()
# 日志记录已发现工具
logger.info("工具发现: %s", [t.name for t in tools.tools])
# 如果 resource_uris 列表非空,则遍历每个资源 URI
+ if resource_uris:
# 遍历资源 URI 列表
+ for uri in resource_uris:
+ try:
# 异步读取指定 URI 的资源内容
+ result = await session.read_resource(uri)
# 如果资源内容不为空
+ if result.contents:
# 获取资源的文本内容,如果没有则默认为空字符串
+ text = result.contents[0].text or ""
# 将读取到的资源内容插入到 user_message 字符串中,便于后续处理
+ user_message = f"[预读资源 {uri}]\n{text}\n\n[用户请求]\n{user_message}"
# 记录成功读取资源的日志
+ logger.info("已读取资源: %s", uri)
# 如果读取资源过程中发生异常
+ except Exception as e:
# 记录读取资源失败的警告日志
+ logger.warning("读取资源 %s 失败: %s", uri, e)
# 调用桥接逻辑
return await bridge.chat(user_message, session)
# 配置日志格式,支持 level 配置
def setup_logging(level="INFO"):
# 调用 logging.basicConfig 配置日志
logging.basicConfig(
level=getattr(logging, level.upper(), logging.INFO),
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
# 判断是否为主程序入口diff
if __name__ == "__main__":
# 载入配置
config = Config()
# 设置日志等级和格式
setup_logging(config.log_level)
# 默认情况下,使用一条预设的出行规划提示(前往杭州,7天,预算5000元)
+ default_prompt = "请帮我规划一次前往杭州的 7 天假期,预算约 5000 元。请先搜索航班和酒店,查询当地天气,然后给出行程建议,并创建日历事件。"
# 如果用户在命令行传入了问题,则使用用户的输入,否则使用默认提示
+ question = sys.argv[1] if len(sys.argv) > 1 else default_prompt
# 预读资源列表(如日历事件、旅行偏好、历史行程),可为空或自定义
+ resource_uris = ["calendar://events/2024", "preferences://travel", "trips://history/杭州"]
try:
# 异步运行主桥接函数,传入当前问题、配置信息和预读资源
+ reply = asyncio.run(run_bridge(question, config, resource_uris=resource_uris))
# 输出视觉分隔符和标题“最终规划”
+ print("\n" + "=" * 50 + "\n最终规划:\n" + "=" * 50)
# 输出最终生成的旅行规划结果
print(reply)
except AuthenticationError:
# 若发生认证错误(如API KEY无效),记录错误日志并提醒用户检查密钥配置
+ logger.error("API 认证失败:请检查 DEEPSEEK_API_KEY,可在 .env 中配置")
# 系统退出,错误码1
sys.exit(1)
2.2. mcp_server.py #
mcp_server.py
# 旅行规划 MCP 服务器
"""
旅行规划 MCP 服务器
整合三类能力:
- 旅行服务器:航班、酒店、行程
- 天气服务器:气候、预报
- 日历/邮件服务器:日程、通知
提供工具、资源、提示,供 AI 应用调用。
"""
+import json
# 导入日志模块
import logging
# 导入 Path 类用于处理文件路径
from pathlib import Path
# 导入 dotenv 用于加载环境变量
from dotenv import load_dotenv
# 导入 FastMCP 用于创建 MCP 服务器
from mcp.server.fastmcp import FastMCP
# 加载上层目录中的 .env 文件中的环境变量
load_dotenv(Path(__file__).resolve().parent.parent / ".env")
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
# 实例化一个 FastMCP 服务器,名称为 Travel-Planning
mcp = FastMCP(name="Travel-Planning")
# ========== 资源 ==========
# 将此函数注册为 calendar://events/{year} 类型的资源
+@mcp.resource("calendar://events/{year}")
# 定义获取指定年份日历资源的函数
+def get_calendar_resource(year):
# 返回包含该年份可用周、忙碌日期和建议的JSON字符串
+ return json.dumps({
+ "year": year, # 当前年份
+ "available_weeks": ["2024-06-17", "2024-06-24", "2024-07-01", "2024-07-08"], # 可出行的周
+ "busy_dates": ["2024-06-15", "2024-06-22"], # 忙碌不可选的日期
+ "note": "建议选择 available_weeks 中的日期出行", # 备注建议
+ }, ensure_ascii=False)
# 将此函数注册为 trips://history/{destination} 类型的资源
+@mcp.resource("trips://history/{destination}")
# 定义获取某目的地历史行程信息的函数
+def get_trip_history(destination):
# 中国城市历史记录
+ history = {
+ "杭州": {"last_visit": "2023-05", "preferences": ["西湖", "龙井茶", "宋城"], "notes": "喜欢住在西湖附近"},
+ "成都": {"last_visit": "2023-08", "preferences": ["火锅", "熊猫", "宽窄巷子"], "notes": "偏好春熙路商圈"},
+ "西安": {"last_visit": "2022-10", "preferences": ["兵马俑", "肉夹馍", "古城墙"], "notes": "喜欢回民街美食"},
+ "北京": {"last_visit": "2023-03", "preferences": ["故宫", "长城", "烤鸭"], "notes": "建议避开节假日"},
+ "上海": {"last_visit": "2024-01", "preferences": ["外滩", "迪士尼", "豫园"], "notes": "浦东交通方便"},
+ }
# 根据目的地查找历史记录,若无则提供默认信息
+ data = history.get(destination, {"last_visit": "未去过", "preferences": ["文化景点", "当地美食"], "notes": "首次前往"})
# 返回合成后的JSON字符串
+ return json.dumps({"destination": destination, **data}, ensure_ascii=False)
# 将此函数注册为 preferences://travel 类型的资源
+@mcp.resource("preferences://travel")
# 定义获取用户旅⾏偏好的函数
+def get_travel_preferences():
# 返回预算、交通、住宿、兴趣爱好和偏好城市等信息的JSON字符串
+ return json.dumps({
+ "budget_level": "中等", # 预算档位
+ "preferred_transport": "高铁/飞机", # 优先交通方式
+ "accommodation": "酒店", # 首选住宿类型
+ "interests": ["文化古迹", "当地美食", "自然风光"], # 兴趣点
+ "preferred_cities": ["杭州", "成都", "西安", "北京", "上海"], # 偏好城市列表
+ }, ensure_ascii=False)
# 定义运行 MCP 服务器的函数
def run_server():
# 输出服务器启动日志信息
logger.info("旅行规划 MCP 服务器已启动(stdio 模式)")
# 以 stdio 方式运行 MCP 服务器
mcp.run(transport="stdio")
# 判断脚本是否作为主模块运行
if __name__ == "__main__":
# 配置日志基本格式和等级
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S")
# 启动 MCP 服务器
run_server()
3.旅行规划 #
3.1. mcp_client.py #
mcp_client.py
# 导入操作系统相关库
import os
# 导入日志处理库
import logging
# 导入系统参数和函数库
import sys
# 导入异步相关库
import asyncio
# 导入 OpenAI 认证异常
from openai import AuthenticationError
# 导入 MCP 桥接通信相关类
from mcp import ClientSession, StdioServerParameters
# 导入 MCP stdio 客户端工具
from mcp.client.stdio import stdio_client
# 导入快速 MCP 服务器类
from mcp.server.fastmcp import FastMCP
# 导入 OpenAI 异步客户端
from openai import AsyncOpenAI
# 导入 json 处理库
import json
# 定义配置类,用于读取和保存环境变量中的配置信息
class Config:
# 从环境变量读取的配置,与 .env 配合使用
def __init__(self):
# 读取日志等级(默认 INFO)
self.log_level = os.environ.get("LOG_LEVEL", "INFO")
# 读取 MCP 命令(默认是 python)
self.mcp_command = os.environ.get("MCP_COMMAND", "python")
# 读取 LLM API Key
self.llm_api_key = os.environ.get("DEEPSEEK_API_KEY", "sk-614848d97c0c49588e142274eb09e304")
# 读取 LLM 模型名称
self.llm_model = os.environ.get("LLM_MODEL", "deepseek-chat")
# 读取 LLM 基础 API 地址
self.llm_base_url = os.environ.get("LLM_BASE_URL", "https://api.deepseek.com")
# 获取 MCP 服务器的启动参数
def get_mcp_server_params(self):
# 构造 MCP 服务器启动参数
+ args = ["mcp_server.py", "serve"]
# 返回 MCP 服务器启动参数
return StdioServerParameters(command=self.mcp_command, args=args)
# 设置当前模块日志记录器
logger = logging.getLogger(__name__)
# 配置日志格式,支持 level 配置
def setup_logging(level="INFO"):
# 调用 logging.basicConfig 配置日志
logging.basicConfig(
level=getattr(logging, level.upper(), logging.INFO),
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
# 将 MCP Tool 转为 OpenAI Function Calling 格式
def mcp_tools_to_openai_format(mcp_tools):
# 定义保存结果的列表
result = []
# 遍历每个工具
for tool in mcp_tools:
# 获取工具的输入 schema(参数说明)
schema = getattr(tool, "inputSchema", None) or getattr(tool, "input_schema", None) or {}
# 若 schema 为空,则使用默认格式
if not schema:
schema = {"type": "object", "properties": {}, "required": []}
# 将该工具转为 openai 的 function call 格式并加入结果
result.append({
"name": tool.name,
"description": tool.description or f"调用工具 {tool.name}",
"parameters": schema,
})
# 返回转换后的工具列表
return result
# 定义函数,用于根据 MCP 实时工具列表生成系统提示
+def build_dynamic_system_prompt(mcp_tools):
# 初始化保存工具描述信息的列表
+ tool_lines = []
# 遍历传入的工具列表
+ for tool in mcp_tools:
# 如果工具有描述内容,则将名称和描述一起加入列表
+ if tool.description:
+ tool_lines.append(f"- {tool.name}: {tool.description}")
# 如果没有描述,只加入工具名称
+ else:
+ tool_lines.append(f"- {tool.name}")
# 将所有工具信息拼接成多行字符串,如果列表为空则提示“当前无可用工具”
+ tools_text = "\n".join(tool_lines) if tool_lines else "- 当前无可用工具"
# 构造最终的系统提示内容,介绍身份及可用工具,并给出建议说明
+ return (
+ "你是旅行规划助手。\n"
+ "你当前可调用的 MCP 工具如下:\n"
+ f"{tools_text}\n\n"
+ "请根据用户需求选择合适工具,可多步调用,并在最后给出清晰、可执行的中文建议。"
+ )
# 辅助函数:将 OpenAI 返回的 message 对象转为 API dict 格式
def _message_to_dict(msg):
# 初始化字典,角色为助手,内容取 message 的 content(若为 None 则设为 None)
d = {"role": "assistant", "content": msg.content or None}
# 如果 message 包含工具调用(tool_calls)
if msg.tool_calls:
# 如果有工具调用,则设置 content 为 None
d["content"] = None
# 构建 tool_calls 列表,每个调用包含 id、类型、函数名和参数
d["tool_calls"] = [
{
"id": tc.id,
"type": "function",
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments or "{}"
}
}
for tc in msg.tool_calls
]
# 返回处理后的字典
return d
# MCP Bridge 类,负责连接 MCP 工具和大模型
class MCPBridge:
"""
连接 MCP 服务器与大模型,负责:
- 从 MCP 获取工具列表并转成 Function Calling 格式
- 调用大模型
- 将模型返回的工具调用请求转发给 MCP 执行
"""
# 初始化 MCPBridge (保存配置和对象句柄)
def __init__(self, config):
self.config = config
self._llm_client = AsyncOpenAI(
api_key=self.config.llm_api_key,
base_url=self.config.llm_base_url,
)
# 异步单轮/多轮对话,实现桥接逻辑
async def chat(self, user_message, session):
"""
执行单轮对话:用户输入 -> 模型决策 -> 可选工具调用 -> 最终回答。
支持多轮工具调用(模型可能连续多次调用工具)。
"""
# 获取 MCP 工具列表
mcp_tools = list((await session.list_tools()).tools)
# 转为 OpenAI function calling 格式
function_defs = mcp_tools_to_openai_format(mcp_tools)
# 构造 tools 参数
tools = [{"type": "function", "function": f} for f in function_defs]
logger.info("tools: %s", tools)
+ system_prompt = build_dynamic_system_prompt(mcp_tools)
+ logger.info("system_prompt: %s", system_prompt)
# 构造会话历史
messages = [
+ {"role": "system", "content": system_prompt},
+ {"role": "user", "content": user_message},
]
# 设置最多工具调用轮数,防止死循环
+ max_tool_rounds = 20 # 防止无限循环
round_count = 0
# 循环进行多轮(最多5轮)交互
while round_count < max_tool_rounds:
# 增加交互轮数
round_count += 1
# 向大语言模型发送当前的会话消息和可用工具信息,获取模型的回复
response = await self._llm_client.chat.completions.create(
model=self.config.llm_model, # 指定要使用的语言模型
messages=messages, # 传递对话历史消息
tools=tools, # 传递工具定义用于 Function Calling
tool_choice="auto", # 让模型自动决定是否调用工具
)
# 获取模型回复的 message
msg = response.choices[0].message
# 转换为 openai API 需要的 dict 格式并添加到消息历史
messages.append(_message_to_dict(msg))
# 当没有需要调用工具时,直接返回答案
if not msg.tool_calls:
return (msg.content or "").strip()
# 执行模型要求的所有工具调用
for tc in msg.tool_calls:
# 获取要调用的工具名称
name = tc.function.name
# 获取工具调用参数字符串
args_str = tc.function.arguments or "{}"
try:
# 解析参数字符串为 dict
arguments = json.loads(args_str)
except json.JSONDecodeError:
# 解析失败则使用空参数
arguments = {}
# 记录日志
logger.info("执行工具: %s", name)
# 调用 MCP 工具
result = await session.call_tool(name, arguments=arguments)
# 提取工具调用返回的文本内容
text = result.content[0].text if result.content else ""
# 如果执行失败,记录错误信息
if result.isError:
text = f"执行错误: {result.content}"
# 将工具调用结果添加至消息历史
messages.append({
"role": "tool",
"tool_call_id": tc.id,
"content": text,
})
# 超过最大轮数未终止,返回提示信息
return "工具调用次数过多,已终止。"
# 异步函数,桥接主入口:调用 MCP 子进程并与大模型对话
async def run_bridge(user_message, config, resource_uris=None):
"""
resource_uris: 可选,要预读的资源 URI 列表,如 ["calendar://events/2024", "preferences://travel"]
"""
# 获取 MCP 服务器启动参数
server_params = config.get_mcp_server_params()
# 生成 MCPBridge 实例
bridge = MCPBridge(config)
# 使用 stdio_client 启动 MCP 服务器并建立通信流
async with stdio_client(server_params) as (read_stream, write_stream):
# 建立 MCP 会话
async with ClientSession(read_stream, write_stream) as session:
# 初始化会话
await session.initialize()
# 获取已注册的工具
tools = await session.list_tools()
# 日志记录已发现工具
logger.info("工具发现: %s", [t.name for t in tools.tools])
# 如果 resource_uris 列表非空,则遍历每个资源 URI
if resource_uris:
# 遍历资源 URI 列表
for uri in resource_uris:
try:
# 异步读取指定 URI 的资源内容
result = await session.read_resource(uri)
# 如果资源内容不为空
if result.contents:
# 获取资源的文本内容,如果没有则默认为空字符串
text = result.contents[0].text or ""
# 将读取到的资源内容插入到 user_message 字符串中,便于后续处理
user_message = f"[预读资源 {uri}]\n{text}\n\n[用户请求]\n{user_message}"
# 记录成功读取资源的日志
logger.info("已读取资源: %s", uri)
# 如果读取资源过程中发生异常
except Exception as e:
# 记录读取资源失败的警告日志
logger.warning("读取资源 %s 失败: %s", uri, e)
# 调用桥接逻辑
return await bridge.chat(user_message, session)
# 配置日志格式,支持 level 配置
def setup_logging(level="INFO"):
# 调用 logging.basicConfig 配置日志
logging.basicConfig(
level=getattr(logging, level.upper(), logging.INFO),
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
# 判断是否为主程序入口diff
if __name__ == "__main__":
# 若sys.stdout支持reconfigure,则设置编码为utf-8,避免中文乱码
+ if hasattr(sys.stdout, "reconfigure"):
+ sys.stdout.reconfigure(encoding="utf-8")
+ sys.stdin.reconfigure(encoding="utf-8")
# 载入配置
config = Config()
# 设置日志等级和格式
setup_logging(config.log_level)
# 默认情况下,使用一条预设的出行规划提示(前往杭州,7天,预算5000元)
default_prompt = "请帮我规划一次前往杭州的 7 天假期,预算约 5000 元。请先搜索航班和酒店,查询当地天气,然后给出行程建议,并创建日历事件。"
# 如果用户在命令行传入了问题,则使用用户的输入,否则使用默认提示
question = sys.argv[1] if len(sys.argv) > 1 else default_prompt
# 预读资源列表(如日历事件、旅行偏好、历史行程),可为空或自定义
resource_uris = ["calendar://events/2024", "preferences://travel", "trips://history/杭州"]
try:
# 异步运行主桥接函数,传入当前问题、配置信息和预读资源
reply = asyncio.run(run_bridge(question, config, resource_uris=resource_uris))
# 输出视觉分隔符和标题“最终规划”
print("\n" + "=" * 50 + "\n最终规划:\n" + "=" * 50)
# 输出最终生成的旅行规划结果
print(reply)
except AuthenticationError:
# 若发生认证错误(如API KEY无效),记录错误日志并提醒用户检查密钥配置
logger.error("API 认证失败:请检查 DEEPSEEK_API_KEY,可在 .env 中配置")
# 系统退出,错误码1
sys.exit(1)
3.2. mcp_server.py #
mcp_server.py
# 旅行规划 MCP 服务器
"""
旅行规划 MCP 服务器
整合三类能力:
- 旅行服务器:航班、酒店、行程
- 天气服务器:气候、预报
- 日历/邮件服务器:日程、通知
提供工具、资源、提示,供 AI 应用调用。
"""
import json
# 导入日志模块
import logging
# 导入 Path 类用于处理文件路径
from pathlib import Path
# 导入 dotenv 用于加载环境变量
from dotenv import load_dotenv
# 导入 FastMCP 用于创建 MCP 服务器
from mcp.server.fastmcp import FastMCP
# 加载上层目录中的 .env 文件中的环境变量
load_dotenv(Path(__file__).resolve().parent.parent / ".env")
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
# 实例化一个 FastMCP 服务器,名称为 Travel-Planning
mcp = FastMCP(name="Travel-Planning")
# ========== 旅行工具 ==========
+@mcp.tool()
+def search_flights(origin, destination, date):
+ """搜索可用航班。"""
# 模拟国内航班数据(国航、东航、南航)
+ flights = [
+ {"id": "F001", "airline": "国航 CA", "departure": "08:00", "arrival": "10:30", "price": 680},
+ {"id": "F002", "airline": "东航 MU", "departure": "14:20", "arrival": "16:50", "price": 520},
+ {"id": "F003", "airline": "南航 CZ", "departure": "19:00", "arrival": "21:30", "price": 450},
+ ]
+ return json.dumps({"flights": flights, "route": f"{origin} -> {destination}", "date": date}, ensure_ascii=False)
+@mcp.tool()
+def book_hotel(city, check_in, check_out, guests=1):
+ """预订酒店。"""
+ g = int(guests) if isinstance(guests, (str, float)) else guests
+ return json.dumps({
+ "status": "已预订",
+ "hotel": f"{city}市中心酒店",
+ "check_in": check_in,
+ "check_out": check_out,
+ "guests": g,
+ "total": 1200,
+ }, ensure_ascii=False)
+@mcp.tool()
+def get_trip_suggestions(destination, duration, interests=None):
+ """获取目的地行程建议。"""
+ d = 7
+ if isinstance(duration, str):
+ m = re.search(r"\d+", str(duration))
+ d = int(m.group()) if m else 7
+ else:
+ d = int(duration) if duration else 7
# 中国热门目的地行程建议
+ city_suggestions = {
+ "杭州": ["第1天:西湖、灵隐寺", "第2天:西溪湿地", "第3天:宋城、河坊街", "第4天:千岛湖一日游", "第5天:龙井村、九溪烟树", "第6天:乌镇一日游", "第7天:返程准备、购物"],
+ "成都": ["第1天:宽窄巷子、锦里", "第2天:大熊猫基地", "第3天:都江堰", "第4天:青城山", "第5天:春熙路、太古里", "第6天:乐山大佛", "第7天:返程准备"],
+ "西安": ["第1天:兵马俑", "第2天:大雁塔、大唐不夜城", "第3天:城墙、回民街", "第4天:华清池", "第5天:陕西历史博物馆", "第6天:华山一日游", "第7天:返程准备"],
+ "北京": ["第1天:故宫、天安门", "第2天:长城", "第3天:颐和园", "第4天:南锣鼓巷、什刹海", "第5天:天坛", "第6天:798艺术区、雍和宫", "第7天:返程准备"],
+ "上海": ["第1天:外滩、南京路", "第2天:迪士尼乐园", "第3天:豫园、田子坊", "第4天:新天地、淮海路", "第5天:朱家角古镇", "第6天:崇明岛或周边古镇", "第7天:返程准备"],
+ }
+ base = city_suggestions.get(destination, [f"第{i+1}天:{destination}市区游览" for i in range(7)])
+ suggestions = (base + [f"第{i+1}天:自由活动" for i in range(len(base), d)])[:d]
+ return json.dumps({"destination": destination, "duration": d, "suggestions": suggestions[:d]}, ensure_ascii=False)
# ========== 天气工具 ==========
# 将该函数注册为 MCP 工具,用于天气查询
+@mcp.tool()
# 定义查询城市天气的函数
+def check_weather(city, date=None):
+ """查询指定城市天气。"""
# 构造天气预报的字典,包括城市、日期(默认为"今天")、温度范围、天气状况和湿度
+ forecast = {"city": city, "date": date or "今天", "temp": "22-28℃", "condition": "晴", "humidity": "65%"}
# 返回天气信息的 JSON 字符串(确保中文可正常显示)
+ return json.dumps(forecast, ensure_ascii=False)
# 将该函数注册为 MCP 工具,用于查询多日天气预报
+@mcp.tool()
# 定义获取多日天气预报的函数
+def get_weather_forecast(city, days=7):
+ """获取多日天气预报。"""
# 构造指定天数(最多7天)的天气预报列表,每一天包括序号、温度区间和天气状况
+ days_data = [{"day": i + 1, "temp": "20-26℃", "condition": "晴"} for i in range(min(days, 7))]
# 返回城市及多天预报内容的 JSON 字符串
+ return json.dumps({"city": city, "forecast": days_data}, ensure_ascii=False)
# ========== 日历/邮件工具 ==========
# 注册日历事件创建工具函数为 MCP 工具
+@mcp.tool()
# 定义在日历中创建行程事件的函数
+def create_calendar_event(title, start_time, end_time, description=""):
+ """在日历中创建行程事件。"""
# 返回事件创建状态以及事件详细信息的 JSON 字符串
+ return json.dumps({
+ "status": "已创建",
+ "event": {"title": title, "start": start_time, "end": end_time, "description": description},
+ }, ensure_ascii=False)
# 注册邮件发送功能为 MCP 工具
+@mcp.tool()
# 定义发送邮件通知的函数
+def send_email(to, subject, body):
+ """发送邮件通知。"""
# 返回邮件发送状态、接收者、主题的 JSON 字符串
+ return json.dumps({"status": "已发送", "to": to, "subject": subject}, ensure_ascii=False)
# 注册日历事件查询工具为 MCP 工具
+@mcp.tool()
# 定义获取日历事件列表的函数
+def get_calendar_events(year, month=None):
+ """获取日历中的事件列表。"""
# 构造一个包含3个示例事件的列表
+ events = [
+ {"title": "工作会议", "date": "2024-06-15", "time": "10:00"},
+ {"title": "空闲", "date": "2024-06-20", "time": "全天"},
+ {"title": "周末休息", "date": "2024-06-22", "time": "全天"},
+ ]
# 返回年、月、事件列表的 JSON 字符串
+ return json.dumps({"year": year, "month": month, "events": events}, ensure_ascii=False)
# ========== 资源 ==========
# 将此函数注册为 calendar://events/{year} 类型的资源
@mcp.resource("calendar://events/{year}")
# 定义获取指定年份日历资源的函数
def get_calendar_resource(year):
+ """获取指定年份日历资源。"""
# 返回包含该年份可用周、忙碌日期和建议的JSON字符串
return json.dumps({
"year": year, # 当前年份
"available_weeks": ["2024-06-17", "2024-06-24", "2024-07-01", "2024-07-08"], # 可出行的周
"busy_dates": ["2024-06-15", "2024-06-22"], # 忙碌不可选的日期
"note": "建议选择 available_weeks 中的日期出行", # 备注建议
}, ensure_ascii=False)
# 将此函数注册为 trips://history/{destination} 类型的资源
@mcp.resource("trips://history/{destination}")
# 定义获取某目的地历史行程信息的函数
def get_trip_history(destination):
+ """获取某目的地历史行程信息。"""
# 中国城市历史记录
history = {
"杭州": {"last_visit": "2023-05", "preferences": ["西湖", "龙井茶", "宋城"], "notes": "喜欢住在西湖附近"},
"成都": {"last_visit": "2023-08", "preferences": ["火锅", "熊猫", "宽窄巷子"], "notes": "偏好春熙路商圈"},
"西安": {"last_visit": "2022-10", "preferences": ["兵马俑", "肉夹馍", "古城墙"], "notes": "喜欢回民街美食"},
"北京": {"last_visit": "2023-03", "preferences": ["故宫", "长城", "烤鸭"], "notes": "建议避开节假日"},
"上海": {"last_visit": "2024-01", "preferences": ["外滩", "迪士尼", "豫园"], "notes": "浦东交通方便"},
}
# 根据目的地查找历史记录,若无则提供默认信息
data = history.get(destination, {"last_visit": "未去过", "preferences": ["文化景点", "当地美食"], "notes": "首次前往"})
# 返回合成后的JSON字符串
return json.dumps({"destination": destination, **data}, ensure_ascii=False)
# 将此函数注册为 preferences://travel 类型的资源
@mcp.resource("preferences://travel")
# 定义获取用户旅⾏偏好的函数
def get_travel_preferences():
+ """获取用户旅行偏好。"""
# 返回预算、交通、住宿、兴趣爱好和偏好城市等信息的JSON字符串
return json.dumps({
"budget_level": "中等", # 预算档位
"preferred_transport": "高铁/飞机", # 优先交通方式
"accommodation": "酒店", # 首选住宿类型
"interests": ["文化古迹", "当地美食", "自然风光"], # 兴趣点
"preferred_cities": ["杭州", "成都", "西安", "北京", "上海"], # 偏好城市列表
}, ensure_ascii=False)
# 定义运行 MCP 服务器的函数
def run_server():
+ """运行 MCP 服务器。"""
# 输出服务器启动日志信息
+ logger.info("旅行规划 MCP 服务器已启动(stdio 模式)")
# 以 stdio 方式运行 MCP 服务器
mcp.run(transport="stdio")
# 判断脚本是否作为主模块运行
if __name__ == "__main__":
+ """主程序入口。"""
# 配置日志基本格式和等级
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S")
# 启动 MCP 服务器
run_server()