1. 参考资料 #
3. 配置MCP #
3.1 配置cline MCP #
{
"mcpServers": {
"get_weather_forecast_stdio": {
"disabled": false,
"timeout": 60,
"type": "stdio",
"command": "uv",
"args": [
"run",
"--directory",
"D:/aprepare/mcp/mcp-server",
"stdio_server.py"
]
}
}
}3.2 配置sse #

3.3 配置streamable #

4.STDIO #
4.1 服务器 #
# 导入json模块,用于处理JSON数据
import json
# 导入httpx模块,用于异步HTTP请求
import httpx
# 导入sys模块,用于标准错误输出
import sys
# 从mcp.server模块导入FastMCP类
from mcp.server import FastMCP
# 创建FastMCP服务器实例,名称为"weather-forecast-server"
server = FastMCP("weather-forecast-server")
# 定义高德地图API的基础URL
API_BASE_URL = "https://restapi.amap.com"
# 定义高德地图API的密钥
API_KEY = "7592015e86c61bdae7d5b04b8fb5a00f"
# 注册名为"get_weather_forecast"的工具到FastMCP服务器
@server.tool("get_weather_forecast")
# 定义异步函数,根据地址查询天气预报信息
async def get_weather_forecast(location: str) -> str:
"""
根据地址查询天气预报信息
Args:
location: 需要查询天气的地址或城市名称
Returns:
天气信息的JSON字符串
"""
# 创建异步HTTP客户端
async with httpx.AsyncClient() as http_client:
try:
# 第一步:地理编码,获取城市代码
geo_response = await http_client.get(
f"{API_BASE_URL}/v3/geocode/geo",
params={"key": API_KEY, "address": location},
)
# 将查询地址和返回内容写入标准错误输出,便于调试
sys.stderr.write(location + geo_response.text)
# 如果地理编码请求失败,返回错误信息
if geo_response.status_code != 200:
return f"地理编码请求失败: {geo_response.status_code} - {geo_response.text}"
# 解析地理编码返回的JSON数据
geo_data = json.loads(geo_response.text)
# 如果API返回状态不是"1",说明有错误,返回错误信息
if geo_data["status"] != "1":
return f"地理编码API错误: {geo_data['info']}"
# 提取城市代码(adcode)
city_code = geo_data["geocodes"][0]["adcode"]
# 第二步:根据城市代码获取天气信息
weather_response = await http_client.get(
f"{API_BASE_URL}/v3/weather/weatherInfo",
params={"key": API_KEY, "city": city_code},
)
# 如果天气查询请求失败,返回错误信息
if weather_response.status_code != 200:
return f"天气查询请求失败: {weather_response.status_code} - {weather_response.text}"
# 解析天气查询返回的JSON数据
weather_data = json.loads(weather_response.text)
# 如果API返回状态不是"1",说明有错误,返回错误信息
if weather_data["status"] != "1":
return f"天气查询API错误: {weather_data['info']}"
# 返回天气数据的原始JSON字符串
return weather_response.text
# 捕获异常并返回异常信息
except Exception as error:
return f"查询天气时发生异常: {str(error)}"
# 判断当前模块是否为主程序入口
if __name__ == "__main__":
# 启动FastMCP服务器
server.run(transport="stdio")
4.2 客户端 #
# 导入json模块,用于处理JSON数据
import json
# 导入asyncio模块,用于异步编程
import asyncio
# 从mcp模块导入ClientSession和StdioServerParameters
from mcp import ClientSession, StdioServerParameters
# 从mcp.client.stdio导入stdio_client,用于建立stdio连接
from mcp.client.stdio import stdio_client
# 导入OpenAI SDK
from openai import OpenAI
# 定义天气MCP客户端类
class WeatherMCPClient:
"""天气查询MCP客户端类"""
# 初始化方法
def __init__(self):
"""初始化客户端"""
# 客户端会话对象
self.session = None
# stdio流上下文
self._stream_context = None
# 会话上下文
self._session_context = None
# 可用工具列表
self.available_tools = []
# 初始化OpenAI客户端
self.openai_client = OpenAI(
base_url="https://api.deepseek.com",
api_key="sk-129a4b9a8eeb48d3a7648dc81fca0de2",
)
# 指定AI模型
self.ai_model = "deepseek-chat"
# 异步方法:连接到MCP服务端
async def connect_to_server(self) -> None:
"""连接到MCP服务端"""
try:
# 配置服务端连接参数
server_params = StdioServerParameters(
command="python", args=["stdio_server.py"], env={}
)
# 建立stdio连接
self._stream_context = stdio_client(server_params)
stream = await self._stream_context.__aenter__()
# 创建客户端会话
self._session_context = ClientSession(*stream)
self.session = await self._session_context.__aenter__()
# 初始化会话
await self.session.initialize()
print("MCP客户端初始化成功!")
# 获取可用工具列表
await self._load_available_tools()
# 捕获异常并输出错误信息
except Exception as error:
print(f"连接服务端失败: {error}")
raise
# 异步方法:加载服务端提供的工具列表
async def _load_available_tools(self) -> None:
"""加载服务端提供的工具列表"""
try:
# 获取工具列表响应
tools_response = await self.session.list_tools()
# 保存工具列表
self.available_tools = tools_response.tools
print(f"发现 {len(self.available_tools)} 个可用工具:")
# 遍历并打印每个工具的信息
for tool in self.available_tools:
print(f" - {tool.name}: {tool.description}")
# 捕获异常并输出错误信息
except Exception as error:
print(f"加载工具列表失败: {error}")
# 异步方法:处理用户请求,调用AI和MCP工具
async def process_request(self, user_input: str) -> str:
"""处理用户请求,使用OpenAI进行智能分析"""
# 如果未连接到服务端,返回提示
if not self.session:
return "客户端未连接到服务端"
try:
# 构建用户消息
conversation_messages = [{"role": "user", "content": user_input}]
# 获取可用工具列表
tools_response = await self.session.list_tools()
tool_definitions = []
# 构建工具定义列表
for tool in tools_response.tools:
tool_definitions.append(
{
"type": "function",
"name": tool.name,
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema,
},
}
)
# 第一次调用OpenAI API,获取AI回复
first_response = self.openai_client.chat.completions.create(
model=self.ai_model,
messages=conversation_messages,
max_tokens=512,
tools=tool_definitions,
)
# 用于存储最终回复内容
response_texts = []
# 获取AI的第一个回复选项
first_choice = first_response.choices[0]
# 如果AI要求调用工具
if first_choice.finish_reason == "tool_calls":
# 获取第一个工具调用
first_tool_call = first_choice.message.tool_calls[0]
# 工具名称
tool_name = first_tool_call.function.name
# 工具参数
tool_arguments = json.loads(first_tool_call.function.arguments)
# 添加AI的回复到消息历史
conversation_messages.append(
{
"role": "assistant",
"content": None,
"tool_calls": [first_tool_call],
}
)
# 调用MCP工具
tool_result = await self.session.call_tool(tool_name, tool_arguments)
# 添加工具调用结果到消息历史
conversation_messages.append(
{
"role": "tool",
"content": tool_result.content[0].text,
"tool_call_id": first_tool_call.id,
}
)
# 再次调用OpenAI获取最终回复
final_response = self.openai_client.chat.completions.create(
model=self.ai_model,
messages=conversation_messages,
max_tokens=512,
)
# 记录工具调用及最终AI回复
response_texts.append(f"调用工具 {tool_name} 参数: {tool_arguments}")
response_texts.append(final_response.choices[0].message.content)
# 如果AI直接给出最终回复
elif first_choice.finish_reason == "stop":
response_texts.append(first_choice.message.content)
# 返回拼接后的回复内容
return "\n".join(response_texts)
# 捕获异常并返回错误信息
except Exception as error:
return f"处理请求时发生错误: {error}"
# 异步方法:清理资源
async def cleanup(self) -> None:
"""清理资源"""
try:
# 关闭会话上下文
if self._session_context:
await self._session_context.__aexit__(None, None, None)
# 关闭流上下文
if self._stream_context:
await self._stream_context.__aexit__(None, None, None)
print("资源清理完成")
# 捕获异常并输出错误信息
except Exception as error:
print(f"清理资源时发生错误: {error}")
# 异步方法:智能聊天循环
async def chat_loop(self) -> None:
"""智能聊天循环模式"""
print("MCP客户端已启动!")
print("请输入您的内容,输入 'quit' 退出程序")
print("==================================================")
# 循环读取用户输入
while True:
try:
# 获取用户输入
user_message = input("\n您的内容: ").strip()
# 如果输入quit则退出
if user_message == "quit":
print("感谢使用,再见!")
break
# 如果输入为空,提示重新输入
if not user_message:
print("请输入有效内容")
continue
# 处理用户请求并输出AI回复
ai_response = await self.process_request(user_message)
print("\n" + ai_response)
# 捕获Ctrl+C中断
except KeyboardInterrupt:
print("\n\n程序被用户中断,正在退出...")
break
# 捕获其他异常
except Exception as error:
print(f"发生未预期的错误: {error}")
# 定义主函数
async def main():
"""主函数"""
# 创建MCP客户端实例
mcp_client = WeatherMCPClient()
try:
# 连接服务端
await mcp_client.connect_to_server()
# 启动智能聊天模式
await mcp_client.chat_loop()
# 捕获主流程异常
except Exception as error:
print(f"程序运行失败: {error}")
finally:
# 清理资源
await mcp_client.cleanup()
# 判断是否为主程序入口
if __name__ == "__main__":
# 运行主程序
asyncio.run(main())5.SSE #
5.1 服务器 #
# 导入json模块,用于处理JSON数据
import json
# 导入httpx模块,用于异步HTTP请求
import httpx
# 导入sys模块,用于标准错误输出
import sys
# 从mcp.server模块导入FastMCP类
from mcp.server import FastMCP
# 创建FastMCP服务器实例,名称为"weather-forecast-server"
server = FastMCP("weather-forecast-server")
# 定义高德地图API的基础URL
API_BASE_URL = "https://restapi.amap.com"
# 定义高德地图API的密钥
API_KEY = "7592015e86c61bdae7d5b04b8fb5a00f"
# 注册名为"get_weather_forecast"的工具到FastMCP服务器
@server.tool("get_weather_forecast")
# 定义异步函数,根据地址查询天气预报信息
async def get_weather_forecast(location: str) -> str:
"""
根据地址查询天气预报信息
Args:
location: 需要查询天气的地址或城市名称
Returns:
天气信息的JSON字符串
"""
# 创建异步HTTP客户端
async with httpx.AsyncClient() as http_client:
try:
# 第一步:地理编码,获取城市代码
geo_response = await http_client.get(
f"{API_BASE_URL}/v3/geocode/geo",
params={"key": API_KEY, "address": location},
)
# 将查询地址和返回内容写入标准错误输出,便于调试
sys.stderr.write(location + geo_response.text)
# 如果地理编码请求失败,返回错误信息
if geo_response.status_code != 200:
return f"地理编码请求失败: {geo_response.status_code} - {geo_response.text}"
# 解析地理编码返回的JSON数据
geo_data = json.loads(geo_response.text)
# 如果API返回状态不是"1",说明有错误,返回错误信息
if geo_data["status"] != "1":
return f"地理编码API错误: {geo_data['info']}"
# 提取城市代码(adcode)
city_code = geo_data["geocodes"][0]["adcode"]
# 第二步:根据城市代码获取天气信息
weather_response = await http_client.get(
f"{API_BASE_URL}/v3/weather/weatherInfo",
params={"key": API_KEY, "city": city_code},
)
# 如果天气查询请求失败,返回错误信息
if weather_response.status_code != 200:
return f"天气查询请求失败: {weather_response.status_code} - {weather_response.text}"
# 解析天气查询返回的JSON数据
weather_data = json.loads(weather_response.text)
# 如果API返回状态不是"1",说明有错误,返回错误信息
if weather_data["status"] != "1":
return f"天气查询API错误: {weather_data['info']}"
# 返回天气数据的原始JSON字符串
return weather_response.text
# 捕获异常并返回异常信息
except Exception as error:
return f"查询天气时发生异常: {str(error)}"
# 判断当前模块是否为主程序入口
if __name__ == "__main__":
# 启动FastMCP服务器
server.run(transport="streamable-http")
5.2 客户端 #
# 导入json模块,用于处理JSON数据
import json
# 导入asyncio模块,用于异步编程
import asyncio
# 从mcp模块导入ClientSession和StdioServerParameters
from mcp import ClientSession, StdioServerParameters
# 从mcp.client.stdio导入stdio_client,用于建立stdio连接
from mcp.client.sse import sse_client
# 导入OpenAI SDK
from openai import OpenAI
# 定义天气MCP客户端类
class WeatherMCPClient:
"""天气查询MCP客户端类"""
# 初始化方法
def __init__(self):
"""初始化客户端"""
# 客户端会话对象
self.session = None
# stdio流上下文
self._stream_context = None
# 会话上下文
self._session_context = None
# 可用工具列表
self.available_tools = []
# 初始化OpenAI客户端
self.openai_client = OpenAI(
base_url="https://api.deepseek.com",
api_key="sk-129a4b9a8eeb48d3a7648dc81fca0de2",
)
# 指定AI模型
self.ai_model = "deepseek-chat"
# 异步方法:连接到MCP服务端
async def connect_to_server(self, server_url: str) -> None:
"""连接到MCP服务端"""
try:
# 建立stdio连接
self._stream_context = sse_client(server_url)
stream = await self._stream_context.__aenter__()
# 创建客户端会话
self._session_context = ClientSession(*stream)
self.session = await self._session_context.__aenter__()
# 初始化会话
await self.session.initialize()
print("MCP客户端初始化成功!")
# 获取可用工具列表
await self._load_available_tools()
# 捕获异常并输出错误信息
except Exception as error:
print(f"连接服务端失败: {error}")
raise
# 异步方法:加载服务端提供的工具列表
async def _load_available_tools(self) -> None:
"""加载服务端提供的工具列表"""
try:
# 获取工具列表响应
tools_response = await self.session.list_tools()
# 保存工具列表
self.available_tools = tools_response.tools
print(f"发现 {len(self.available_tools)} 个可用工具:")
# 遍历并打印每个工具的信息
for tool in self.available_tools:
print(f" - {tool.name}: {tool.description}")
# 捕获异常并输出错误信息
except Exception as error:
print(f"加载工具列表失败: {error}")
# 异步方法:处理用户请求,调用AI和MCP工具
async def process_request(self, user_input: str) -> str:
"""处理用户请求,使用OpenAI进行智能分析"""
# 如果未连接到服务端,返回提示
if not self.session:
return "客户端未连接到服务端"
try:
# 构建用户消息
conversation_messages = [{"role": "user", "content": user_input}]
# 获取可用工具列表
tools_response = await self.session.list_tools()
tool_definitions = []
# 构建工具定义列表
for tool in tools_response.tools:
tool_definitions.append(
{
"type": "function",
"name": tool.name,
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema,
},
}
)
# 第一次调用OpenAI API,获取AI回复
first_response = self.openai_client.chat.completions.create(
model=self.ai_model,
messages=conversation_messages,
max_tokens=512,
tools=tool_definitions,
)
# 用于存储最终回复内容
response_texts = []
# 获取AI的第一个回复选项
first_choice = first_response.choices[0]
# 如果AI要求调用工具
if first_choice.finish_reason == "tool_calls":
# 获取第一个工具调用
first_tool_call = first_choice.message.tool_calls[0]
# 工具名称
tool_name = first_tool_call.function.name
# 工具参数
tool_arguments = json.loads(first_tool_call.function.arguments)
# 添加AI的回复到消息历史
conversation_messages.append(
{
"role": "assistant",
"content": None,
"tool_calls": [first_tool_call],
}
)
# 调用MCP工具
tool_result = await self.session.call_tool(tool_name, tool_arguments)
# 添加工具调用结果到消息历史
conversation_messages.append(
{
"role": "tool",
"content": tool_result.content[0].text,
"tool_call_id": first_tool_call.id,
}
)
# 再次调用OpenAI获取最终回复
final_response = self.openai_client.chat.completions.create(
model=self.ai_model,
messages=conversation_messages,
max_tokens=512,
)
# 记录工具调用及最终AI回复
response_texts.append(f"调用工具 {tool_name} 参数: {tool_arguments}")
response_texts.append(final_response.choices[0].message.content)
# 如果AI直接给出最终回复
elif first_choice.finish_reason == "stop":
response_texts.append(first_choice.message.content)
# 返回拼接后的回复内容
return "\n".join(response_texts)
# 捕获异常并返回错误信息
except Exception as error:
return f"处理请求时发生错误: {error}"
# 异步方法:清理资源
async def cleanup(self) -> None:
"""清理资源"""
try:
# 关闭会话上下文
if self._session_context:
await self._session_context.__aexit__(None, None, None)
# 关闭流上下文
if self._stream_context:
await self._stream_context.__aexit__(None, None, None)
print("资源清理完成")
# 捕获异常并输出错误信息
except Exception as error:
print(f"清理资源时发生错误: {error}")
# 异步方法:智能聊天循环
async def chat_loop(self) -> None:
"""智能聊天循环模式"""
print("MCP客户端已启动!")
print("请输入您的内容,输入 'quit' 退出程序")
print("==================================================")
# 循环读取用户输入
while True:
try:
# 获取用户输入
user_message = input("\n您的内容: ").strip()
# 如果输入quit则退出
if user_message == "quit":
print("感谢使用,再见!")
break
# 如果输入为空,提示重新输入
if not user_message:
print("请输入有效内容")
continue
# 处理用户请求并输出AI回复
ai_response = await self.process_request(user_message)
print("\n" + ai_response)
# 捕获Ctrl+C中断
except KeyboardInterrupt:
print("\n\n程序被用户中断,正在退出...")
break
# 捕获其他异常
except Exception as error:
print(f"发生未预期的错误: {error}")
# 定义主函数
async def main():
"""主函数"""
# 创建MCP客户端实例
mcp_client = WeatherMCPClient()
try:
# 连接服务端
await mcp_client.connect_to_server(server_url="http://127.0.0.1:8000/sse")
# 启动智能聊天模式
await mcp_client.chat_loop()
# 捕获主流程异常
except Exception as error:
print(f"程序运行失败: {error}")
finally:
# 清理资源
await mcp_client.cleanup()
# 判断是否为主程序入口
if __name__ == "__main__":
# 运行主程序
asyncio.run(main())
6.Streamable #
6.1 服务器 #
# 导入json模块,用于处理JSON数据
import json
# 导入httpx模块,用于异步HTTP请求
import httpx
# 导入sys模块,用于标准错误输出
import sys
# 从mcp.server模块导入FastMCP类
from mcp.server import FastMCP
# 创建FastMCP服务器实例,名称为"weather-forecast-server"
server = FastMCP("weather-forecast-server")
# 定义高德地图API的基础URL
API_BASE_URL = "https://restapi.amap.com"
# 定义高德地图API的密钥
API_KEY = "7592015e86c61bdae7d5b04b8fb5a00f"
# 注册名为"get_weather_forecast"的工具到FastMCP服务器
@server.tool("get_weather_forecast")
# 定义异步函数,根据地址查询天气预报信息
async def get_weather_forecast(location: str) -> str:
"""
根据地址查询天气预报信息
Args:
location: 需要查询天气的地址或城市名称
Returns:
天气信息的JSON字符串
"""
# 创建异步HTTP客户端
async with httpx.AsyncClient() as http_client:
try:
# 第一步:地理编码,获取城市代码
geo_response = await http_client.get(
f"{API_BASE_URL}/v3/geocode/geo",
params={"key": API_KEY, "address": location},
)
# 将查询地址和返回内容写入标准错误输出,便于调试
sys.stderr.write(location + geo_response.text)
# 如果地理编码请求失败,返回错误信息
if geo_response.status_code != 200:
return f"地理编码请求失败: {geo_response.status_code} - {geo_response.text}"
# 解析地理编码返回的JSON数据
geo_data = json.loads(geo_response.text)
# 如果API返回状态不是"1",说明有错误,返回错误信息
if geo_data["status"] != "1":
return f"地理编码API错误: {geo_data['info']}"
# 提取城市代码(adcode)
city_code = geo_data["geocodes"][0]["adcode"]
# 第二步:根据城市代码获取天气信息
weather_response = await http_client.get(
f"{API_BASE_URL}/v3/weather/weatherInfo",
params={"key": API_KEY, "city": city_code},
)
# 如果天气查询请求失败,返回错误信息
if weather_response.status_code != 200:
return f"天气查询请求失败: {weather_response.status_code} - {weather_response.text}"
# 解析天气查询返回的JSON数据
weather_data = json.loads(weather_response.text)
# 如果API返回状态不是"1",说明有错误,返回错误信息
if weather_data["status"] != "1":
return f"天气查询API错误: {weather_data['info']}"
# 返回天气数据的原始JSON字符串
return weather_response.text
# 捕获异常并返回异常信息
except Exception as error:
return f"查询天气时发生异常: {str(error)}"
# 判断当前模块是否为主程序入口
if __name__ == "__main__":
# 启动FastMCP服务器
server.run(transport="streamable-http")
6.2 客户端 #
# 导入json模块,用于处理JSON数据
import json
# 导入asyncio模块,用于异步编程
import asyncio
# 从mcp模块导入ClientSession
from mcp import ClientSession
# 从mcp.client.stdio导入stdio_client,用于建立stdio连接
from mcp.client.streamable_http import streamablehttp_client
# 导入OpenAI SDK
from openai import OpenAI
# 定义天气MCP客户端类
class WeatherMCPClient:
"""天气查询MCP客户端类"""
# 初始化方法
def __init__(self):
"""初始化客户端"""
# 客户端会话对象
self.session = None
# stdio流上下文
self._stream_context = None
# 会话上下文
self._session_context = None
# 可用工具列表
self.available_tools = []
# 初始化OpenAI客户端
self.openai_client = OpenAI(
base_url="https://api.deepseek.com",
api_key="sk-129a4b9a8eeb48d3a7648dc81fca0de2",
)
# 指定AI模型
self.ai_model = "deepseek-chat"
# 异步方法:连接到MCP服务端
async def connect_to_server(self, server_url: str) -> None:
"""连接到MCP服务端"""
try:
# 建立stdio连接
self._stream_context = streamablehttp_client(server_url)
stream = await self._stream_context.__aenter__()
# 创建客户端会话
self._session_context = ClientSession(stream[0], stream[1])
self.session = await self._session_context.__aenter__()
# 初始化会话
await self.session.initialize()
print("MCP客户端初始化成功!")
# 获取可用工具列表
await self._load_available_tools()
# 捕获异常并输出错误信息
except Exception as error:
print(f"连接服务端失败: {error}")
raise
# 异步方法:加载服务端提供的工具列表
async def _load_available_tools(self) -> None:
"""加载服务端提供的工具列表"""
try:
# 获取工具列表响应
tools_response = await self.session.list_tools()
# 保存工具列表
self.available_tools = tools_response.tools
print(f"发现 {len(self.available_tools)} 个可用工具:")
# 遍历并打印每个工具的信息
for tool in self.available_tools:
print(f" - {tool.name}: {tool.description}")
# 捕获异常并输出错误信息
except Exception as error:
print(f"加载工具列表失败: {error}")
# 异步方法:处理用户请求,调用AI和MCP工具
async def process_request(self, user_input: str) -> str:
"""处理用户请求,使用OpenAI进行智能分析"""
# 如果未连接到服务端,返回提示
if not self.session:
return "客户端未连接到服务端"
try:
# 构建用户消息
conversation_messages = [{"role": "user", "content": user_input}]
# 获取可用工具列表
tools_response = await self.session.list_tools()
tool_definitions = []
# 构建工具定义列表
for tool in tools_response.tools:
tool_definitions.append(
{
"type": "function",
"name": tool.name,
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema,
},
}
)
# 第一次调用OpenAI API,获取AI回复
first_response = self.openai_client.chat.completions.create(
model=self.ai_model,
messages=conversation_messages,
max_tokens=512,
tools=tool_definitions,
)
# 用于存储最终回复内容
response_texts = []
# 获取AI的第一个回复选项
first_choice = first_response.choices[0]
# 如果AI要求调用工具
if first_choice.finish_reason == "tool_calls":
# 获取第一个工具调用
first_tool_call = first_choice.message.tool_calls[0]
# 工具名称
tool_name = first_tool_call.function.name
# 工具参数
tool_arguments = json.loads(first_tool_call.function.arguments)
# 添加AI的回复到消息历史
conversation_messages.append(
{
"role": "assistant",
"content": None,
"tool_calls": [first_tool_call],
}
)
# 调用MCP工具
tool_result = await self.session.call_tool(tool_name, tool_arguments)
# 添加工具调用结果到消息历史
conversation_messages.append(
{
"role": "tool",
"content": tool_result.content[0].text,
"tool_call_id": first_tool_call.id,
}
)
# 再次调用OpenAI获取最终回复
final_response = self.openai_client.chat.completions.create(
model=self.ai_model,
messages=conversation_messages,
max_tokens=512,
)
# 记录工具调用及最终AI回复
response_texts.append(f"调用工具 {tool_name} 参数: {tool_arguments}")
response_texts.append(final_response.choices[0].message.content)
# 如果AI直接给出最终回复
elif first_choice.finish_reason == "stop":
response_texts.append(first_choice.message.content)
# 返回拼接后的回复内容
return "\n".join(response_texts)
# 捕获异常并返回错误信息
except Exception as error:
return f"处理请求时发生错误: {error}"
# 异步方法:清理资源
async def cleanup(self) -> None:
"""清理资源"""
try:
# 关闭会话上下文
if self._session_context:
await self._session_context.__aexit__(None, None, None)
# 关闭流上下文
if self._stream_context:
await self._stream_context.__aexit__(None, None, None)
print("资源清理完成")
# 捕获异常并输出错误信息
except Exception as error:
print(f"清理资源时发生错误: {error}")
# 异步方法:智能聊天循环
async def chat_loop(self) -> None:
"""智能聊天循环模式"""
print("MCP客户端已启动!")
print("请输入您的内容,输入 'quit' 退出程序")
print("==================================================")
# 循环读取用户输入
while True:
try:
# 获取用户输入
user_message = input("\n您的内容: ").strip()
# 如果输入quit则退出
if user_message == "quit":
print("感谢使用,再见!")
break
# 如果输入为空,提示重新输入
if not user_message:
print("请输入有效内容")
continue
# 处理用户请求并输出AI回复
ai_response = await self.process_request(user_message)
print("\n" + ai_response)
# 捕获Ctrl+C中断
except KeyboardInterrupt:
print("\n\n程序被用户中断,正在退出...")
break
# 捕获其他异常
except Exception as error:
print(f"发生未预期的错误: {error}")
# 定义主函数
async def main():
"""主函数"""
# 创建MCP客户端实例
mcp_client = WeatherMCPClient()
try:
# 连接服务端
await mcp_client.connect_to_server(
server_url="http://127.0.0.1:8000/mcp?key=7592015e86c61bdae7d5b04b8fb5a00f"
)
# 启动智能聊天模式
await mcp_client.chat_loop()
# 捕获主流程异常
except Exception as error:
print(f"程序运行失败: {error}")
finally:
# 清理资源
await mcp_client.cleanup()
# 判断是否为主程序入口
if __name__ == "__main__":
# 运行主程序
asyncio.run(main())
7.发布 #
7.1 账号申请 #
- pypi.org
- 注册
- 登录 hongqishiq @
- 生成API Token
7.2 安装 #
pip install twine7.3 打包 #
uv build7.4 发布 #
python -m twine upload dist/*
pypi-AgEIcHlwaS5vcmcCJDNmZDJlNWFkLWYyNzYtNGU1Zi1hYTcxLTg0OTIyNDhlYWUxNAACKlszLCIyYThjZTYwNy03YWY4LTQwYzYtYmEyMS1hMTE1NmEzZWE4ODciXQAABiCw6syPpy96mhfrSKDUmB5NVwkWdpLJEFeVHePoZKZopg7.5 安装 #
pip install mcp_rs_publish
pip install mcp_rs_publish --upgrade7.6 启动 #
mcp_rs_publish7.7 代码 #
7.7.1 pyproject.toml #
pyproject.toml
[project]
name = "mcp_rs_publish"
version = "0.1.7"
description = "mcp_rs_publish"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"mcp[cli]>=1.13.1",
]
[project.scripts]
mcp_rs_publish = "mcp_rs_publish.command:main"7.7.2 mcp_rs_publish__init__.py #
mcp_rs_publish__init__.py
7.7.3 command.py #
mcp_rs_publish\command.py
# 从mcp.server.fastmcp模块导入FastMCP类
from mcp.server.fastmcp import FastMCP
# 创建一个MCP服务器实例,名称为"Demo"
mcp = FastMCP("Demo")
# 添加一个加法工具
@mcp.tool()
# 定义加法函数,接收两个整数参数a和b,返回它们的和
def add(a: int, b: int) -> int:
"""Add two numbers"""
# 返回a和b的和
return a + b
# 添加一个动态问候资源
@mcp.resource("greeting://{name}")
# 定义获取问候语的函数,接收一个字符串参数name,返回个性化问候语
def get_greeting(name: str) -> str:
"""Get a personalized greeting"""
# 返回格式化后的问候语
return f"Hello, {name}!"
# 添加一个提示(prompt)工具
@mcp.prompt()
# 定义问候用户的函数,接收name和style参数,style有默认值"friendly"
def greet_user(name: str, style: str = "friendly") -> str:
"""Generate a greeting prompt"""
# 定义不同风格的问候语模板
styles = {
"friendly": "Please write a warm, friendly greeting",
"formal": "Please write a formal, professional greeting",
"casual": "Please write a casual, relaxed greeting",
}
# 根据style选择对应的模板,默认使用"friendly",并返回格式化后的提示语
return f"{styles.get(style, styles['friendly'])} for someone named {name}."
# 定义主函数
def main():
# 以SSE(Server-Sent Events)方式运行MCP服务器
mcp.run(transport="sse")
# 如果当前模块是主程序入口,则执行main函数
if __name__ == "__main__":
main()
7.7.4 配置MCP #
7.7.4.1 SSE #

7.7.4.2 STDIO #

8. email #
8.1. config.py #
config.py
# 配置文件
# 请根据您的实际情况修改以下配置
# 邮件SMTP配置
EMAIL_CONFIG = {
"smtp_server": "smtp.qq.com", # QQ邮箱SMTP服务器
"smtp_port": 465, # SSL端口
"sender_email": "1959583119@qq.com", # 发件人邮箱
"sender_password": "vsppezhmjihyciai", # 发件人应用密码(不是登录密码)
"use_ssl": True, # 使用SSL连接
"use_tls": False, # 不使用TLS
}
# 高德地图API配置
AMAP_CONFIG = {
"api_key": "7592015e86c61bdae7d5b04b8fb5a00f", # 高德地图API密钥
"base_url": "https://restapi.amap.com", # API基础URL
}
# OpenAI配置
OPENAI_CONFIG = {
"base_url": "https://api.deepseek.com", # API基础URL
"api_key": "sk-129a4b9a8eeb48d3a7648dc81fca0de2", # API密钥
"model": "deepseek-chat", # 模型名称
}
# 定时任务配置
SCHEDULE_CONFIG = {
"check_interval": 60, # 定时任务检查间隔(秒)
"max_tasks": 100, # 最大定时任务数量
}
# 日志配置
LOG_CONFIG = {
"level": "INFO", # 日志级别
"format": "%(asctime)s - %(levelname)s - %(message)s",
}
8.2. weather_client.py #
weather_client.py
# 导入json模块,用于处理JSON数据
import json
# 导入asyncio模块,用于异步编程
import asyncio
# 从mcp模块导入ClientSession和StdioServerParameters
from mcp import ClientSession, StdioServerParameters
# 从mcp.client.stdio导入stdio_client,用于建立stdio连接
from mcp.client.stdio import stdio_client
# 导入OpenAI SDK
from openai import OpenAI
# 导入smtplib用于发送邮件
+import smtplib
# 导入MIMEText用于构建邮件正文
+from email.mime.text import MIMEText
# 导入MIMEMultipart用于构建多部分邮件
+from email.mime.multipart import MIMEMultipart
# 导入MIMEBase用于添加附件
+from email.mime.base import MIMEBase
# 导入encoders用于对附件进行编码
+from email import encoders
# 导入schedule用于定时任务
+import schedule
# 导入time用于时间相关操作
+import time
# 导入datetime用于获取当前时间
+from datetime import datetime
# 导入threading用于多线程
+import threading
# 导入re用于正则表达式
+import re
# 定义天气MCP客户端类
class WeatherMCPClient:
"""天气查询MCP客户端类"""
# 初始化方法
def __init__(self):
"""初始化客户端"""
# 客户端会话对象
self.session = None
# stdio流上下文
self._stream_context = None
# 会话上下文
self._session_context = None
# 可用工具列表
self.available_tools = []
# 尝试导入配置文件
+ try:
# 从config.py导入OPENAI_CONFIG和EMAIL_CONFIG
+ from config import OPENAI_CONFIG, EMAIL_CONFIG
# 初始化OpenAI客户端
+ self.openai_client = OpenAI(
+ base_url=OPENAI_CONFIG["base_url"],
+ api_key=OPENAI_CONFIG["api_key"],
+ )
# 指定AI模型
+ self.ai_model = OPENAI_CONFIG["model"]
# 邮件配置
+ self.email_config = EMAIL_CONFIG.copy()
# 如果邮件配置中没有use_ssl字段,默认True
+ if "use_ssl" not in self.email_config:
+ self.email_config["use_ssl"] = True
# 如果邮件配置中没有use_tls字段,默认False
+ if "use_tls" not in self.email_config:
+ self.email_config["use_tls"] = False
# 如果导入失败,使用默认配置
+ except ImportError:
# 打印警告信息
+ print("警告: 未找到config.py文件,使用默认配置")
# 初始化OpenAI客户端,使用默认base_url和api_key
+ self.openai_client = OpenAI(
+ base_url="https://api.deepseek.com",
+ api_key="your_openai_api_key",
+ )
# 指定默认AI模型
+ self.ai_model = "deepseek-chat"
# 默认邮件配置
+ self.email_config = {
+ "smtp_server": "smtp.qq.com",
+ "smtp_port": 465,
+ "sender_email": "your_email@qq.com",
+ "sender_password": "your_app_password",
+ "use_ssl": True,
+ "use_tls": False,
+ }
# 定时任务存储字典
+ self.scheduled_tasks = {}
# 初始化markdown转换器
+ self._init_markdown_converter()
# 初始化markdown转换器
+ def _init_markdown_converter(self):
+ """初始化markdown转换器"""
# 定义markdown到HTML的转换规则
+ self.markdown_rules = [
# 标题转换
+ (r"^### (.*?)$", r"<h3>\1</h3>", re.MULTILINE),
+ (r"^## (.*?)$", r"<h2>\1</h2>", re.MULTILINE),
+ (r"^# (.*?)$", r"<h1>\1</h1>", re.MULTILINE),
# 粗体和斜体
+ (r"\*\*(.*?)\*\*", r"<strong>\1</strong>"),
+ (r"\*(.*?)\*", r"<em>\1</em>"),
# 代码块
+ (r"```(.*?)```", r"<pre><code>\1</code></pre>", re.DOTALL),
+ (r"`(.*?)`", r"<code>\1</code>"),
# 链接
+ (r"$$([^$$]+)\]$([^)]+)$", r'<a href="\2">\1</a>'),
# 列表
+ (r"^\* (.*?)$", r"<li>\1</li>", re.MULTILINE),
+ (r"^- (.*?)$", r"<li>\1</li>", re.MULTILINE),
+ (r"^\d+\. (.*?)$", r"<li>\1</li>", re.MULTILINE),
# 段落
+ (r"\n\n", r"</p><p>"),
# 换行
+ (r"\n", r"<br>"),
+ ]
# 将markdown文本转换为HTML
+ def _convert_markdown_to_html(self, markdown_text):
+ """将markdown文本转换为HTML"""
# 如果markdown文本为空,返回空字符串
+ if not markdown_text:
+ return ""
# 赋值原始markdown文本
+ html_text = markdown_text
# 应用所有转换规则
+ for pattern, replacement, *flags in self.markdown_rules:
+ if flags:
+ html_text = re.sub(pattern, replacement, html_text, flags=flags[0])
+ else:
+ html_text = re.sub(pattern, replacement, html_text)
# 包装在完整HTML结构中
+ html_content = f"""
+ <!DOCTYPE html>
+ <html>
+ <head>
+ <meta charset="utf-8">
+ <meta name="viewport" content="width=device-width, initial-scale=1.0">
+ <title>天气报告</title>
+ <style>
+ body {{
+ font-family: 'Microsoft YaHei', Arial, sans-serif;
+ line-height: 1.6;
+ color: #333;
+ max-width: 800px;
+ margin: 0 auto;
+ padding: 20px;
+ background-color: #f5f5f5;
+ }}
+ .container {{
+ background-color: white;
+ padding: 30px;
+ border-radius: 10px;
+ box-shadow: 0 2px 10px rgba(0,0,0,0.1);
+ }}
+ h1, h2, h3 {{
+ color: #2c3e50;
+ border-bottom: 2px solid #3498db;
+ padding-bottom: 10px;
+ }}
+ h1 {{ font-size: 24px; }}
+ h2 {{ font-size: 20px; }}
+ h3 {{ font-size: 18px; }}
+ p {{
+ margin: 15px 0;
+ text-align: justify;
+ }}
+ code {{
+ background-color: #f8f9fa;
+ padding: 2px 6px;
+ border-radius: 4px;
+ font-family: 'Courier New', monospace;
+ color: #e83e8c;
+ }}
+ pre {{
+ background-color: #f8f9fa;
+ padding: 15px;
+ border-radius: 5px;
+ overflow-x: auto;
+ border-left: 4px solid #3498db;
+ }}
+ li {{
+ margin: 8px 0;
+ }}
+ a {{
+ color: #3498db;
+ text-decoration: none;
+ }}
+ a:hover {{
+ text-decoration: underline;
+ }}
+ .weather-info {{
+ background-color: #ecf0f1;
+ padding: 20px;
+ border-radius: 8px;
+ margin: 20px 0;
+ border-left: 5px solid #3498db;
+ }}
+ .footer {{
+ margin-top: 30px;
+ padding-top: 20px;
+ border-top: 1px solid #ecf0f1;
+ text-align: center;
+ color: #7f8c8d;
+ }}
+ </style>
+ </head>
+ <body>
+ <div class="container">
+ <div class="weather-info">
+ {html_text}
+ </div>
+ <div class="footer">
+ <p>此邮件由天气服务系统自动生成</p>
+ <p>发送时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
+ </div>
+ </div>
+ </body>
+ </html>
+ """
# 返回HTML内容
+ return html_content
# 异步方法:连接到MCP服务端
async def connect_to_server(self) -> None:
"""连接到MCP服务端"""
try:
# 配置服务端连接参数
server_params = StdioServerParameters(
+ command="python", args=["weather_server.py"], env={}
)
# 建立stdio连接
self._stream_context = stdio_client(server_params)
# 进入异步上下文,获取流
stream = await self._stream_context.__aenter__()
# 创建客户端会话
self._session_context = ClientSession(*stream)
# 进入异步上下文,获取session
self.session = await self._session_context.__aenter__()
# 初始化会话
await self.session.initialize()
print("MCP客户端初始化成功!")
# 获取可用工具列表
await self._load_available_tools()
# 捕获异常并输出错误信息
except Exception as error:
print(f"连接服务端失败: {error}")
raise
# 异步方法:加载服务端提供的工具列表
async def _load_available_tools(self) -> None:
"""加载服务端提供的工具列表"""
try:
# 获取工具列表响应
tools_response = await self.session.list_tools()
# 保存工具列表
self.available_tools = tools_response.tools
print(f"发现 {len(self.available_tools)} 个可用工具:")
# 遍历并打印每个工具的信息
for tool in self.available_tools:
print(f" - {tool.name}: {tool.description}")
# 捕获异常并输出错误信息
except Exception as error:
print(f"加载工具列表失败: {error}")
# 异步方法:处理用户请求,调用AI和MCP工具
async def process_request(self, user_input: str) -> str:
"""处理用户请求,使用OpenAI进行智能分析"""
# 如果未连接到服务端,返回提示
if not self.session:
return "客户端未连接到服务端"
try:
# 构建用户消息
conversation_messages = [{"role": "user", "content": user_input}]
# 获取可用工具列表
tools_response = await self.session.list_tools()
tool_definitions = []
# 构建工具定义列表
for tool in tools_response.tools:
tool_definitions.append(
{
"type": "function",
"name": tool.name,
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema,
},
}
)
# 第一次调用OpenAI API,获取AI回复
first_response = self.openai_client.chat.completions.create(
model=self.ai_model,
messages=conversation_messages,
max_tokens=512,
tools=tool_definitions,
)
# 用于存储最终回复内容
response_texts = []
# 获取AI的第一个回复选项
first_choice = first_response.choices[0]
# 如果AI要求调用工具
if first_choice.finish_reason == "tool_calls":
# 获取第一个工具调用
first_tool_call = first_choice.message.tool_calls[0]
# 工具名称
tool_name = first_tool_call.function.name
# 工具参数
tool_arguments = json.loads(first_tool_call.function.arguments)
# 添加AI的回复到消息历史
conversation_messages.append(
{
"role": "assistant",
"content": None,
"tool_calls": [first_tool_call],
}
)
# 调用MCP工具
tool_result = await self.session.call_tool(tool_name, tool_arguments)
# 添加工具调用结果到消息历史
conversation_messages.append(
{
"role": "tool",
"content": tool_result.content[0].text,
"tool_call_id": first_tool_call.id,
}
)
# 再次调用OpenAI获取最终回复
final_response = self.openai_client.chat.completions.create(
model=self.ai_model,
messages=conversation_messages,
max_tokens=512,
)
# 记录工具调用及最终AI回复
response_texts.append(f"调用工具 {tool_name} 参数: {tool_arguments}")
response_texts.append(final_response.choices[0].message.content)
# 如果AI直接给出最终回复
elif first_choice.finish_reason == "stop":
response_texts.append(first_choice.message.content)
# 返回拼接后的回复内容
return "\n".join(response_texts)
# 捕获异常并返回错误信息
except Exception as error:
return f"处理请求时发生错误: {error}"
# 异步方法:发送邮件
+ async def send_email(
+ self,
+ recipient_email: str,
+ subject: str,
+ body: str,
+ attachment_path: str = None,
+ is_html: bool = False,
+ ) -> str:
+ """发送邮件功能"""
+ try:
# 打印正在连接SMTP服务器
+ print(
+ f"正在连接SMTP服务器: {self.email_config['smtp_server']}:{self.email_config['smtp_port']}"
+ )
# 创建邮件对象
+ msg = MIMEMultipart()
# 设置发件人
+ msg["From"] = self.email_config["sender_email"]
# 设置收件人
+ msg["To"] = recipient_email
# 设置主题
+ msg["Subject"] = subject
# 添加邮件正文
+ if is_html:
# 如果是HTML格式
+ msg.attach(MIMEText(body, "html", "utf-8"))
+ else:
# 普通文本格式
+ msg.attach(MIMEText(body, "plain", "utf-8"))
# 如果有附件,添加附件
+ if attachment_path:
+ try:
# 打开附件文件
+ with open(attachment_path, "rb") as attachment:
+ part = MIMEBase("application", "octet-stream")
+ part.set_payload(attachment.read())
# 对附件进行base64编码
+ encoders.encode_base64(part)
# 添加附件头
+ part.add_header(
+ "Content-Disposition",
+ f'attachment; filename= {attachment_path.split("/")[-1]}',
+ )
# 添加附件到邮件
+ msg.attach(part)
+ except FileNotFoundError:
# 附件文件未找到
+ return f"附件文件未找到: {attachment_path}"
# 连接SMTP服务器并发送邮件
+ if self.email_config.get("use_ssl", False):
# 如果使用SSL
+ print("正在建立SSL SMTP连接...")
+ import ssl
# 创建SSL上下文
+ context = ssl.create_default_context()
# 建立SSL SMTP连接
+ server_smtp = smtplib.SMTP_SSL(
+ self.email_config["smtp_server"],
+ self.email_config["smtp_port"],
+ context=context,
+ timeout=30,
+ )
+ print(" ✓ SSL连接建立成功")
+ else:
# 标准SMTP连接
+ print("正在建立标准SMTP连接...")
+ server_smtp = smtplib.SMTP(
+ self.email_config["smtp_server"],
+ self.email_config["smtp_port"],
+ timeout=30,
+ )
# 如果需要TLS加密
+ if self.email_config.get("use_tls", False):
+ print("正在启用TLS加密...")
+ server_smtp.starttls()
+ print(" ✓ TLS启用成功")
# 登录邮箱
+ print("正在登录邮箱...")
+ server_smtp.login(
+ self.email_config["sender_email"],
+ self.email_config["sender_password"],
+ )
# 发送邮件
+ print("正在发送邮件...")
+ text = msg.as_string()
+ server_smtp.sendmail(
+ self.email_config["sender_email"], recipient_email, text
+ )
# 关闭SMTP连接
+ print("邮件发送完成,正在关闭连接...")
+ server_smtp.quit()
# 返回成功信息
+ return f"邮件发送成功!收件人: {recipient_email}, 主题: {subject}"
# 邮箱认证失败
+ except smtplib.SMTPAuthenticationError as auth_error:
+ return f"邮箱认证失败: {str(auth_error)}。请检查邮箱和密码是否正确。"
# SMTP连接失败
+ except smtplib.SMTPConnectError as connect_error:
+ return (
+ f"SMTP连接失败: {str(connect_error)}。请检查网络连接和SMTP服务器设置。"
+ )
# 其他SMTP异常
+ except smtplib.SMTPException as smtp_error:
+ return f"SMTP错误: {str(smtp_error)}"
# 其他异常
+ except Exception as error:
+ return f"邮件发送失败: {str(error)}"
# 异步方法:创建定时天气邮件任务
+ async def schedule_weather_email(
+ self,
+ recipient_email: str,
+ location: str,
+ schedule_time: str,
+ frequency: str = "daily",
+ ) -> str:
+ """创建定时发送天气邮件的任务"""
+ try:
# 生成任务ID
+ task_id = f"weather_email_{recipient_email}_{location}_{int(time.time())}"
# 定义定时任务函数
+ def send_weather_email():
+ try:
# 获取天气信息
+ weather_info = asyncio.run(
+ self.process_request(f"查询{location}的天气")
+ )
# 将markdown转换为HTML
+ html_body = self._convert_markdown_to_html(weather_info)
# 构建邮件主题
+ subject = f"天气预报 - {location}"
# 发送HTML邮件
+ asyncio.run(
+ self.send_email(
+ recipient_email, subject, html_body, is_html=True
+ )
+ )
+ except Exception as e:
# 打印定时任务执行失败信息
+ print(f"定时任务执行失败: {e}")
# 根据频率设置定时任务
+ if frequency == "daily":
# 每天定时
+ schedule.every().day.at(schedule_time).do(send_weather_email)
+ elif frequency == "weekly":
# 每周定时
+ schedule.every().week.at(schedule_time).do(send_weather_email)
+ elif frequency == "monthly":
# 每月定时
+ schedule.every().month.at(schedule_time).do(send_weather_email)
+ else:
# 不支持的频率
+ return f"不支持的频率: {frequency}"
# 存储任务信息
+ self.scheduled_tasks[task_id] = {
+ "type": "weather_email",
+ "recipient": recipient_email,
+ "location": location,
+ "time": schedule_time,
+ "frequency": frequency,
+ "created_at": datetime.now().isoformat(),
+ }
# 定义运行定时任务的线程函数
+ def run_schedule():
+ while True:
# 检查并运行待执行的任务
+ schedule.run_pending()
# 每分钟检查一次
+ time.sleep(60)
# 如果定时任务线程未启动,则启动
+ if (
+ not hasattr(self, "_schedule_thread")
+ or not self._schedule_thread.is_alive()
+ ):
+ self._schedule_thread = threading.Thread(
+ target=run_schedule, daemon=True
+ )
+ self._schedule_thread.start()
# 返回任务创建成功信息
+ return f"定时任务创建成功!任务ID: {task_id}, 将在每天 {schedule_time} 发送 {location} 的天气信息到 {recipient_email}"
# 捕获异常并返回错误信息
+ except Exception as error:
+ return f"创建定时任务失败: {str(error)}"
# 异步方法:列出所有定时任务
+ async def list_scheduled_tasks(self) -> str:
+ """列出所有定时任务"""
+ try:
# 导入json模块
+ import json
# 返回定时任务的json字符串
+ return json.dumps(self.scheduled_tasks, ensure_ascii=False, indent=2)
# 捕获异常并返回错误信息
+ except Exception as error:
+ return f"获取定时任务列表失败: {str(error)}"
# 异步方法:取消定时任务
+ async def cancel_scheduled_task(self, task_id: str) -> str:
+ """取消指定的定时任务"""
+ try:
# 如果任务ID存在
+ if task_id in self.scheduled_tasks:
# 清除定时任务
+ schedule.clear(task_id)
# 删除任务记录
+ del self.scheduled_tasks[task_id]
# 返回取消成功信息
+ return f"任务 {task_id} 已成功取消"
+ else:
# 未找到任务ID
+ return f"未找到任务ID: {task_id}"
# 捕获异常并返回错误信息
+ except Exception as error:
+ return f"取消任务失败: {str(error)}"
# 异步方法:发送天气报告邮件
+ async def send_weather_report(
+ self, recipient_email: str, location: str, report_type: str = "current"
+ ) -> str:
+ """发送天气报告邮件"""
+ try:
# 获取天气信息
+ weather_info = await self.process_request(f"查询{location}的天气")
# 将markdown转换为HTML
+ html_body = self._convert_markdown_to_html(weather_info)
# 根据报告类型构建邮件主题
+ if report_type == "current":
+ subject = f"当前天气 - {location}"
+ else: # forecast
+ subject = f"天气预报 - {location}"
# 发送HTML邮件
+ result = await self.send_email(
+ recipient_email, subject, html_body, is_html=True
+ )
# 返回发送结果
+ return result
# 捕获异常并返回错误信息
+ except Exception as error:
+ return f"发送天气报告失败: {str(error)}"
# 异步方法:清理资源
async def cleanup(self) -> None:
"""清理资源"""
try:
# 关闭会话上下文
if self._session_context:
await self._session_context.__aexit__(None, None, None)
# 关闭流上下文
if self._stream_context:
await self._stream_context.__aexit__(None, None, None)
# 打印资源清理完成
print("资源清理完成")
# 捕获异常并输出错误信息
except Exception as error:
print(f"清理资源时发生错误: {error}")
# 异步方法:智能聊天循环
async def chat_loop(self) -> None:
"""智能聊天循环模式"""
# 打印欢迎信息和命令提示
print("MCP客户端已启动!")
print("请输入您的内容,输入 'quit' 退出程序")
print("==================================================")
+ print("新增功能命令:")
+ print(" 发送邮件 收件人邮箱 主题 内容")
+ print(" 定时天气邮件 收件人邮箱 地点 时间(HH:MM) [频率]")
+ print(" 查看定时任务")
+ print(" 取消定时任务 任务ID")
+ print(" 天气报告邮件 收件人邮箱 地点 [类型:current/forecast]")
+ print("==================================================")
# 循环读取用户输入
while True:
try:
# 获取用户输入
user_message = input("\n您的内容: ").strip()
# 如果输入quit则退出
if user_message == "quit":
print("感谢使用,再见!")
break
# 如果输入为空,提示重新输入
if not user_message:
print("请输入有效内容")
continue
# 检查是否是特殊命令
+ if user_message.startswith("发送邮件"):
# 解析邮件命令:发送邮件 收件人 主题 内容
+ parts = user_message.split(" ", 3)
+ if len(parts) >= 4:
+ _, recipient, subject, body = parts
+ result = await self.send_email(recipient, subject, body)
+ print("\n" + result)
+ else:
+ print("\n邮件命令格式:发送邮件 收件人邮箱 主题 内容")
+ elif user_message.startswith("定时天气邮件"):
# 解析定时任务命令:定时天气邮件 收件人 地点 时间 频率
+ parts = user_message.split(" ", 4)
+ if len(parts) >= 4:
+ _, recipient, location, time_str = parts[:4]
+ frequency = parts[4] if len(parts) > 4 else "daily"
+ result = await self.schedule_weather_email(
+ recipient, location, time_str, frequency
+ )
+ print("\n" + result)
+ else:
+ print(
+ "\n定时任务命令格式:定时天气邮件 收件人邮箱 地点 时间(HH:MM) [频率]"
+ )
+ elif user_message.startswith("查看定时任务"):
# 查看所有定时任务
+ result = await self.list_scheduled_tasks()
+ print("\n" + result)
+ elif user_message.startswith("取消定时任务"):
# 解析取消命令:取消定时任务 任务ID
+ parts = user_message.split(" ", 2)
+ if len(parts) >= 2:
+ _, task_id = parts
+ result = await self.cancel_scheduled_task(task_id)
+ print("\n" + result)
+ else:
+ print("\n取消命令格式:取消定时任务 任务ID")
+ elif user_message.startswith("天气报告邮件"):
# 解析报告命令:天气报告邮件 收件人 地点 [类型]
+ parts = user_message.split(" ", 3)
+ if len(parts) >= 3:
+ _, recipient, location = parts[:3]
+ report_type = parts[3] if len(parts) > 3 else "current"
+ result = await self.send_weather_report(
+ recipient, location, report_type
+ )
+ print("\n" + result)
+ else:
+ print(
+ "\n报告命令格式:天气报告邮件 收件人邮箱 地点 [类型:current/forecast]"
+ )
+ else:
# 处理普通用户请求并输出AI回复
+ ai_response = await self.process_request(user_message)
+ print("\n" + ai_response)
# 捕获Ctrl+C中断
except KeyboardInterrupt:
print("\n\n程序被用户中断,正在退出...")
break
# 捕获其他异常
except Exception as error:
print(f"发生未预期的错误: {error}")
# 定义主函数
async def main():
"""主函数"""
# 创建MCP客户端实例
mcp_client = WeatherMCPClient()
try:
# 连接服务端
await mcp_client.connect_to_server()
# 启动智能聊天模式
await mcp_client.chat_loop()
# 捕获主流程异常
except Exception as error:
print(f"程序运行失败: {error}")
finally:
# 清理资源
await mcp_client.cleanup()
# 判断是否为主程序入口
if __name__ == "__main__":
# 运行主程序
asyncio.run(main())
8.3. weather_server.py #
weather_server.py
# 导入json模块,用于处理JSON数据
import json
# 导入httpx模块,用于异步HTTP请求
import httpx
# 导入sys模块,用于标准错误输出
import sys
# 导入smtplib模块,用于发送邮件
+import smtplib
# 导入MIMEText类,用于构建邮件正文
+from email.mime.text import MIMEText
# 导入MIMEMultipart类,用于构建带附件的邮件
+from email.mime.multipart import MIMEMultipart
# 导入MIMEBase类,用于构建邮件附件
+from email.mime.base import MIMEBase
# 导入encoders模块,用于对附件进行编码
+from email import encoders
# 导入异步、定时任务和时间相关模块
+import asyncio
+import schedule
+import time
+from datetime import datetime
+import threading
# 从mcp.server模块导入FastMCP类
from mcp.server import FastMCP
# 创建FastMCP服务器实例,名称为"weather-forecast-server"
server = FastMCP("weather-forecast-server")
# 尝试导入配置文件中的EMAIL_CONFIG和AMAP_CONFIG
+try:
+ from config import EMAIL_CONFIG, AMAP_CONFIG
+except ImportError:
# 如果导入失败,使用默认配置并打印警告
+ print("警告: 未找到config.py文件,使用默认配置")
+ EMAIL_CONFIG = {
+ "smtp_server": "smtp.qq.com",
+ "smtp_port": 465,
+ "sender_email": "your_email@qq.com",
+ "sender_password": "your_app_password",
+ "use_ssl": True,
+ "use_tls": False,
+ }
+ AMAP_CONFIG = {
+ "api_key": "your_amap_api_key",
+ "base_url": "https://restapi.amap.com",
+ }
# 定义高德地图API的基础URL
+API_BASE_URL = AMAP_CONFIG["base_url"]
# 定义高德地图API的密钥
+API_KEY = AMAP_CONFIG["api_key"]
# 定义定时任务的全局存储字典
+scheduled_tasks = {}
# 注册名为"get_weather_forecast"的工具到FastMCP服务器
@server.tool("get_weather_forecast")
# 定义异步函数,根据地址查询天气预报信息
async def get_weather_forecast(location: str) -> str:
"""
根据地址查询天气预报信息
Args:
location: 需要查询天气的地址或城市名称
Returns:
天气信息的JSON字符串
"""
# 创建异步HTTP客户端
async with httpx.AsyncClient() as http_client:
try:
# 第一步:地理编码,获取城市代码
geo_response = await http_client.get(
f"{API_BASE_URL}/v3/geocode/geo",
params={"key": API_KEY, "address": location},
)
# 将查询地址和返回内容写入标准错误输出,便于调试
sys.stderr.write(location + geo_response.text)
# 如果地理编码请求失败,返回错误信息
if geo_response.status_code != 200:
return f"地理编码请求失败: {geo_response.status_code} - {geo_response.text}"
# 解析地理编码返回的JSON数据
geo_data = json.loads(geo_response.text)
# 如果API返回状态不是"1",说明有错误,返回错误信息
if geo_data["status"] != "1":
return f"地理编码API错误: {geo_data['info']}"
# 提取城市代码(adcode)
city_code = geo_data["geocodes"][0]["adcode"]
# 第二步:根据城市代码获取天气信息
weather_response = await http_client.get(
f"{API_BASE_URL}/v3/weather/weatherInfo",
params={"key": API_KEY, "city": city_code},
)
# 如果天气查询请求失败,返回错误信息
if weather_response.status_code != 200:
return f"天气查询请求失败: {weather_response.status_code} - {weather_response.text}"
# 解析天气查询返回的JSON数据
weather_data = json.loads(weather_response.text)
# 如果API返回状态不是"1",说明有错误,返回错误信息
if weather_data["status"] != "1":
return f"天气查询API错误: {weather_data['info']}"
# 返回天气数据的原始JSON字符串
return weather_response.text
# 捕获异常并返回异常信息
except Exception as error:
return f"查询天气时发生异常: {str(error)}"
# 注册名为"send_email"的工具到FastMCP服务器
+@server.tool("send_email")
# 定义异步函数,发送邮件
+async def send_email(
+ recipient_email: str, subject: str, body: str, attachment_path: str = None
+) -> str:
+ """
+ 发送邮件功能
+ Args:
+ recipient_email: 收件人邮箱
+ subject: 邮件主题
+ body: 邮件正文
+ attachment_path: 附件路径(可选)
+ Returns:
+ 发送结果信息
+ """
+ try:
# 创建邮件对象
+ msg = MIMEMultipart()
# 设置发件人
+ msg["From"] = EMAIL_CONFIG["sender_email"]
# 设置收件人
+ msg["To"] = recipient_email
# 设置邮件主题
+ msg["Subject"] = subject
# 添加邮件正文
+ msg.attach(MIMEText(body, "plain", "utf-8"))
# 如果有附件,添加附件
+ if attachment_path:
+ try:
# 以二进制方式打开附件文件
+ with open(attachment_path, "rb") as attachment:
+ part = MIMEBase("application", "octet-stream")
+ part.set_payload(attachment.read())
# 对附件内容进行base64编码
+ encoders.encode_base64(part)
# 添加附件头信息
+ part.add_header(
+ "Content-Disposition",
+ f'attachment; filename= {attachment_path.split("/")[-1]}',
+ )
# 将附件添加到邮件对象
+ msg.attach(part)
+ except FileNotFoundError:
# 如果附件文件未找到,返回错误信息
+ return f"附件文件未找到: {attachment_path}"
# 判断是否使用SSL方式连接SMTP服务器
+ if EMAIL_CONFIG.get("use_ssl", False):
# 打印正在建立SSL连接的信息
+ print(
+ f"正在建立SSL SMTP连接: {EMAIL_CONFIG['smtp_server']}:{EMAIL_CONFIG['smtp_port']}"
+ )
# 导入ssl模块
+ import ssl
# 创建SSL上下文
+ context = ssl.create_default_context()
# 建立SSL SMTP连接
+ server_smtp = smtplib.SMTP_SSL(
+ EMAIL_CONFIG["smtp_server"],
+ EMAIL_CONFIG["smtp_port"],
+ context=context,
+ timeout=30,
+ )
# 打印SSL连接建立成功
+ print(" ✓ SSL连接建立成功")
+ else:
# 打印正在建立标准SMTP连接的信息
+ print(
+ f"正在建立标准SMTP连接: {EMAIL_CONFIG['smtp_server']}:{EMAIL_CONFIG['smtp_port']}"
+ )
# 建立普通SMTP连接
+ server_smtp = smtplib.SMTP(
+ EMAIL_CONFIG["smtp_server"], EMAIL_CONFIG["smtp_port"], timeout=30
+ )
# 判断是否启用TLS加密
+ if EMAIL_CONFIG.get("use_tls", False):
# 打印启用TLS加密的信息
+ print("正在启用TLS加密...")
# 启用TLS
+ server_smtp.starttls()
# 打印TLS启用成功
+ print(" ✓ TLS启用成功")
# 打印正在登录邮箱
+ print("正在登录邮箱...")
# 登录邮箱
+ server_smtp.login(EMAIL_CONFIG["sender_email"], EMAIL_CONFIG["sender_password"])
# 打印正在发送邮件
+ print("正在发送邮件...")
# 将邮件对象转为字符串
+ text = msg.as_string()
# 发送邮件
+ server_smtp.sendmail(EMAIL_CONFIG["sender_email"], recipient_email, text)
# 打印邮件发送完成,正在关闭连接
+ print("邮件发送完成,正在关闭连接...")
# 关闭SMTP连接
+ server_smtp.quit()
# 返回发送成功信息
+ return f"邮件发送成功!收件人: {recipient_email}, 主题: {subject}"
+ except Exception as error:
# 捕获异常并返回错误信息
+ return f"邮件发送失败: {str(error)}"
# 注册名为"schedule_weather_email"的工具到FastMCP服务器
+@server.tool("schedule_weather_email")
# 定义异步函数,定时发送天气邮件
+async def schedule_weather_email(
+ recipient_email: str, location: str, schedule_time: str, frequency: str = "daily"
+) -> str:
+ """
+ 定时发送天气邮件功能
+ Args:
+ recipient_email: 收件人邮箱
+ location: 天气查询地点
+ schedule_time: 发送时间(格式:HH:MM)
+ frequency: 频率(daily, weekly, monthly)
+ Returns:
+ 定时任务创建结果
+ """
+ try:
# 生成任务ID,包含收件人、地点和当前时间戳
+ task_id = f"weather_email_{recipient_email}_{location}_{int(time.time())}"
# 定义定时任务的执行函数
+ def send_weather_email():
+ try:
# 获取天气信息
+ weather_info = asyncio.run(get_weather_forecast(location))
# 解析天气信息为字典
+ weather_data = json.loads(weather_info)
# 构建邮件主题
+ subject = f"天气预报 - {location}"
# 构建邮件正文
+ body = f"""
+您好!
+以下是 {location} 的天气预报信息:
+{json.dumps(weather_data, ensure_ascii=False, indent=2)}
+祝您生活愉快!
+天气服务系统
+ """
# 发送邮件
+ asyncio.run(send_email(recipient_email, subject, body))
+ except Exception as e:
# 打印定时任务执行失败信息
+ print(f"定时任务执行失败: {e}")
# 根据频率设置定时任务
+ if frequency == "daily":
# 每天定时
+ schedule.every().day.at(schedule_time).do(send_weather_email)
+ elif frequency == "weekly":
# 每周定时
+ schedule.every().week.at(schedule_time).do(send_weather_email)
+ elif frequency == "monthly":
# 每月定时
+ schedule.every().month.at(schedule_time).do(send_weather_email)
+ else:
# 不支持的频率,返回错误信息
+ return f"不支持的频率: {frequency}"
# 存储任务信息到全局字典
+ scheduled_tasks[task_id] = {
+ "type": "weather_email",
+ "recipient": recipient_email,
+ "location": location,
+ "time": schedule_time,
+ "frequency": frequency,
+ "created_at": datetime.now().isoformat(),
+ }
# 定义运行定时任务的线程函数
+ def run_schedule():
+ while True:
# 检查并运行待执行的任务
+ schedule.run_pending()
# 每分钟检查一次
+ time.sleep(60) # 每分钟检查一次
# 如果定时任务线程未启动,则启动
+ if (
+ not hasattr(server, "_schedule_thread")
+ or not server._schedule_thread.is_alive()
+ ):
# 创建并启动定时任务线程
+ server._schedule_thread = threading.Thread(target=run_schedule, daemon=True)
+ server._schedule_thread.start()
# 返回任务创建成功信息
+ return f"定时任务创建成功!任务ID: {task_id}, 将在每天 {schedule_time} 发送 {location} 的天气信息到 {recipient_email}"
+ except Exception as error:
# 捕获异常并返回错误信息
+ return f"创建定时任务失败: {str(error)}"
# 注册名为"list_scheduled_tasks"的工具到FastMCP服务器
+@server.tool("list_scheduled_tasks")
# 定义异步函数,列出所有定时任务
+async def list_scheduled_tasks() -> str:
+ """
+ 列出所有定时任务
+ Returns:
+ 定时任务列表的JSON字符串
+ """
+ try:
# 返回定时任务的json字符串
+ return json.dumps(scheduled_tasks, ensure_ascii=False, indent=2)
+ except Exception as error:
# 捕获异常并返回错误信息
+ return f"获取定时任务列表失败: {str(error)}"
# 注册名为"cancel_scheduled_task"的工具到FastMCP服务器
+@server.tool("cancel_scheduled_task")
# 定义异步函数,取消指定的定时任务
+async def cancel_scheduled_task(task_id: str) -> str:
+ """
+ 取消指定的定时任务
+ Args:
+ task_id: 任务ID
+ Returns:
+ 取消结果信息
+ """
+ try:
# 判断任务ID是否存在
+ if task_id in scheduled_tasks:
# 清除定时任务
+ schedule.clear(task_id)
# 删除任务记录
+ del scheduled_tasks[task_id]
# 返回取消成功信息
+ return f"任务 {task_id} 已成功取消"
+ else:
# 未找到任务ID
+ return f"未找到任务ID: {task_id}"
+ except Exception as error:
# 捕获异常并返回错误信息
+ return f"取消任务失败: {str(error)}"
# 注册名为"send_weather_report"的工具到FastMCP服务器
+@server.tool("send_weather_report")
# 定义异步函数,发送天气报告邮件
+async def send_weather_report(
+ recipient_email: str, location: str, report_type: str = "current"
+) -> str:
+ """
+ 发送天气报告邮件
+ Args:
+ recipient_email: 收件人邮箱
+ location: 天气查询地点
+ report_type: 报告类型(current: 当前天气, forecast: 天气预报)
+ Returns:
+ 发送结果信息
+ """
+ try:
# 获取天气信息
+ weather_info = await get_weather_forecast(location)
# 解析天气信息为字典
+ weather_data = json.loads(weather_info)
# 根据报告类型构建邮件内容
+ if report_type == "current":
# 当前天气邮件主题
+ subject = f"当前天气 - {location}"
# 当前天气邮件正文
+ body = f"""
+您好!
+以下是 {location} 的当前天气信息:
+{json.dumps(weather_data, ensure_ascii=False, indent=2)}
+祝您生活愉快!
+天气服务系统
+ """
+ else: # forecast
# 天气预报邮件主题
+ subject = f"天气预报 - {location}"
# 天气预报邮件正文
+ body = f"""
+您好!
+以下是 {location} 的天气预报信息:
+{json.dumps(weather_data, ensure_ascii=False, indent=2)}
+祝您生活愉快!
+天气服务系统
+ """
# 发送邮件
+ result = await send_email(recipient_email, subject, body)
# 返回发送结果
+ return result
+ except Exception as error:
# 捕获异常并返回错误信息
+ return f"发送天气报告失败: {str(error)}"
# 判断当前模块是否为主程序入口
if __name__ == "__main__":
# 启动FastMCP服务器,使用stdio作为传输方式
server.run(transport="stdio")