ai
  • index
  • 1.首页
  • 2.介绍
  • 3.架构概览
  • 4.服务器概念
  • 5.客户端概念
  • 6.版本控制
  • 7.连接到远程MCP服务器
  • 8.连接到本地MCP服务器
  • json_rpc
  • 9.构建一个MCP服务器
  • 10.检查员
  • 11.构建一个MCP客户端
  • 14.架构
  • 15.基础协议概述
  • 16.生命周期
  • 17.传输
  • 18.授权
  • 19.安全最佳实践
  • 20.取消
  • 21.Ping
  • 22.进展
  • 23.Roots
  • 24.采样
  • 25.启发
  • 26.服务器特性
  • 27.提示词
  • 28.资源
  • 29.工具
  • 30.完成
  • 31.日志记录
  • 32.分页
  • 33.架构参考
  • URI模板
  • 12.实现
  • http.server
  • 动态客户端注册协议
  • 受保护资源元数据
  • 授权服务器元数据
  • JWKS
  • PKCE
  • PyJWT
  • secrets
  • watchfiles
  • 实现authorization
  • 实现cancel
  • 实现completion
  • 实现logging
  • 实现pagination
  • 实现process
  • 实现transport
  • psutil
  • pytz
  • zoneinfo
  • contextlib
  • Starlette
  • mcp.1.starter
  • mcp.2.Resource
  • mcp.3.structured_output
  • mcp.4.prompts
  • mcp.5.context
  • mcp.6.streamable
  • mcp.7.lowlevel
  • mcp.8.Completion
  • mcp.9.Elicitation
  • mcp.10.oauth
  • mcp.11.integration
  • mcp.12.best
  • mysql-mcp
  • databases
  • uvicorn
  • asynccontextmanager
  • AsyncExitStack
  • streamable
  • aiohttp
  • publish
  • email
  • schedule
  • twine
  • 1.教学文档总览
  • 2.教师使用指南
  • 3.教学系统快速参考
  • 4.新生入门指南
  • 5.学生使用指南
  • 1. 参考资料
  • 3. 配置MCP
    • 3.1 配置cline MCP
    • 3.2 配置sse
    • 3.3 配置streamable
  • 4.STDIO
    • 4.1 服务器
    • 4.2 客户端
  • 5.SSE
    • 5.1 服务器
    • 5.2 客户端
  • 6.Streamable
    • 6.1 服务器
    • 6.2 客户端
  • 7.发布
    • 7.1 账号申请
    • 7.2 安装
    • 7.3 打包
    • 7.4 发布
    • 7.5 安装
    • 7.6 启动
    • 7.7 代码
      • 7.7.1 pyproject.toml
      • 7.7.2 mcp_rs_publish__init__.py
      • 7.7.3 command.py
      • 7.7.4 配置MCP
        • 7.7.4.1 SSE
        • 7.7.4.2 STDIO
  • 8. email
    • 8.1. config.py
    • 8.2. weather_client.py
    • 8.3. weather_server.py

1. 参考资料 #

  • 高德地图API
  • python-sdk
  • 地理/逆地理编码
  • 天气查询

3. 配置MCP #

3.1 配置cline MCP #

{
  "mcpServers": {
    "get_weather_forecast_stdio": {
      "disabled": false,
      "timeout": 60,
      "type": "stdio",
      "command": "uv",
      "args": [
        "run",
        "--directory",
        "D:/aprepare/mcp/mcp-server",
        "stdio_server.py"
      ]
    }
  }
}

3.2 配置sse #

3.3 配置streamable #

4.STDIO #

4.1 服务器 #

# 导入json模块,用于处理JSON数据
import json

# 导入httpx模块,用于异步HTTP请求
import httpx

# 导入sys模块,用于标准错误输出
import sys

# 从mcp.server模块导入FastMCP类
from mcp.server import FastMCP

# 创建FastMCP服务器实例,名称为"weather-forecast-server"
server = FastMCP("weather-forecast-server")

# 定义高德地图API的基础URL
API_BASE_URL = "https://restapi.amap.com"
# 定义高德地图API的密钥
API_KEY = "7592015e86c61bdae7d5b04b8fb5a00f"


# 注册名为"get_weather_forecast"的工具到FastMCP服务器
@server.tool("get_weather_forecast")
# 定义异步函数,根据地址查询天气预报信息
async def get_weather_forecast(location: str) -> str:
    """
    根据地址查询天气预报信息

    Args:
        location: 需要查询天气的地址或城市名称

    Returns:
        天气信息的JSON字符串
    """
    # 创建异步HTTP客户端
    async with httpx.AsyncClient() as http_client:
        try:
            # 第一步:地理编码,获取城市代码
            geo_response = await http_client.get(
                f"{API_BASE_URL}/v3/geocode/geo",
                params={"key": API_KEY, "address": location},
            )
            # 将查询地址和返回内容写入标准错误输出,便于调试
            sys.stderr.write(location + geo_response.text)

            # 如果地理编码请求失败,返回错误信息
            if geo_response.status_code != 200:
                return f"地理编码请求失败: {geo_response.status_code} - {geo_response.text}"

            # 解析地理编码返回的JSON数据
            geo_data = json.loads(geo_response.text)
            # 如果API返回状态不是"1",说明有错误,返回错误信息
            if geo_data["status"] != "1":
                return f"地理编码API错误: {geo_data['info']}"

            # 提取城市代码(adcode)
            city_code = geo_data["geocodes"][0]["adcode"]

            # 第二步:根据城市代码获取天气信息
            weather_response = await http_client.get(
                f"{API_BASE_URL}/v3/weather/weatherInfo",
                params={"key": API_KEY, "city": city_code},
            )

            # 如果天气查询请求失败,返回错误信息
            if weather_response.status_code != 200:
                return f"天气查询请求失败: {weather_response.status_code} - {weather_response.text}"

            # 解析天气查询返回的JSON数据
            weather_data = json.loads(weather_response.text)
            # 如果API返回状态不是"1",说明有错误,返回错误信息
            if weather_data["status"] != "1":
                return f"天气查询API错误: {weather_data['info']}"

            # 返回天气数据的原始JSON字符串
            return weather_response.text

        # 捕获异常并返回异常信息
        except Exception as error:
            return f"查询天气时发生异常: {str(error)}"


# 判断当前模块是否为主程序入口
if __name__ == "__main__":
    # 启动FastMCP服务器
    server.run(transport="stdio")

4.2 客户端 #

# 导入json模块,用于处理JSON数据
import json

# 导入asyncio模块,用于异步编程
import asyncio

# 从mcp模块导入ClientSession和StdioServerParameters
from mcp import ClientSession, StdioServerParameters

# 从mcp.client.stdio导入stdio_client,用于建立stdio连接
from mcp.client.stdio import stdio_client

# 导入OpenAI SDK
from openai import OpenAI


# 定义天气MCP客户端类
class WeatherMCPClient:
    """天气查询MCP客户端类"""

    # 初始化方法
    def __init__(self):
        """初始化客户端"""
        # 客户端会话对象
        self.session = None
        # stdio流上下文
        self._stream_context = None
        # 会话上下文
        self._session_context = None
        # 可用工具列表
        self.available_tools = []

        # 初始化OpenAI客户端
        self.openai_client = OpenAI(
            base_url="https://api.deepseek.com",
            api_key="sk-129a4b9a8eeb48d3a7648dc81fca0de2",
        )
        # 指定AI模型
        self.ai_model = "deepseek-chat"

    # 异步方法:连接到MCP服务端
    async def connect_to_server(self) -> None:
        """连接到MCP服务端"""
        try:
            # 配置服务端连接参数
            server_params = StdioServerParameters(
                command="python", args=["stdio_server.py"], env={}
            )

            # 建立stdio连接
            self._stream_context = stdio_client(server_params)
            stream = await self._stream_context.__aenter__()

            # 创建客户端会话
            self._session_context = ClientSession(*stream)
            self.session = await self._session_context.__aenter__()

            # 初始化会话
            await self.session.initialize()
            print("MCP客户端初始化成功!")

            # 获取可用工具列表
            await self._load_available_tools()

        # 捕获异常并输出错误信息
        except Exception as error:
            print(f"连接服务端失败: {error}")
            raise

    # 异步方法:加载服务端提供的工具列表
    async def _load_available_tools(self) -> None:
        """加载服务端提供的工具列表"""
        try:
            # 获取工具列表响应
            tools_response = await self.session.list_tools()
            # 保存工具列表
            self.available_tools = tools_response.tools
            print(f"发现 {len(self.available_tools)} 个可用工具:")
            # 遍历并打印每个工具的信息
            for tool in self.available_tools:
                print(f"   - {tool.name}: {tool.description}")
        # 捕获异常并输出错误信息
        except Exception as error:
            print(f"加载工具列表失败: {error}")

    # 异步方法:处理用户请求,调用AI和MCP工具
    async def process_request(self, user_input: str) -> str:
        """处理用户请求,使用OpenAI进行智能分析"""
        # 如果未连接到服务端,返回提示
        if not self.session:
            return "客户端未连接到服务端"

        try:
            # 构建用户消息
            conversation_messages = [{"role": "user", "content": user_input}]

            # 获取可用工具列表
            tools_response = await self.session.list_tools()
            tool_definitions = []

            # 构建工具定义列表
            for tool in tools_response.tools:
                tool_definitions.append(
                    {
                        "type": "function",
                        "name": tool.name,
                        "function": {
                            "name": tool.name,
                            "description": tool.description,
                            "parameters": tool.inputSchema,
                        },
                    }
                )

            # 第一次调用OpenAI API,获取AI回复
            first_response = self.openai_client.chat.completions.create(
                model=self.ai_model,
                messages=conversation_messages,
                max_tokens=512,
                tools=tool_definitions,
            )

            # 用于存储最终回复内容
            response_texts = []
            # 获取AI的第一个回复选项
            first_choice = first_response.choices[0]

            # 如果AI要求调用工具
            if first_choice.finish_reason == "tool_calls":
                # 获取第一个工具调用
                first_tool_call = first_choice.message.tool_calls[0]
                # 工具名称
                tool_name = first_tool_call.function.name
                # 工具参数
                tool_arguments = json.loads(first_tool_call.function.arguments)

                # 添加AI的回复到消息历史
                conversation_messages.append(
                    {
                        "role": "assistant",
                        "content": None,
                        "tool_calls": [first_tool_call],
                    }
                )

                # 调用MCP工具
                tool_result = await self.session.call_tool(tool_name, tool_arguments)

                # 添加工具调用结果到消息历史
                conversation_messages.append(
                    {
                        "role": "tool",
                        "content": tool_result.content[0].text,
                        "tool_call_id": first_tool_call.id,
                    }
                )

                # 再次调用OpenAI获取最终回复
                final_response = self.openai_client.chat.completions.create(
                    model=self.ai_model,
                    messages=conversation_messages,
                    max_tokens=512,
                )

                # 记录工具调用及最终AI回复
                response_texts.append(f"调用工具 {tool_name} 参数: {tool_arguments}")
                response_texts.append(final_response.choices[0].message.content)

            # 如果AI直接给出最终回复
            elif first_choice.finish_reason == "stop":
                response_texts.append(first_choice.message.content)

            # 返回拼接后的回复内容
            return "\n".join(response_texts)

        # 捕获异常并返回错误信息
        except Exception as error:
            return f"处理请求时发生错误: {error}"

    # 异步方法:清理资源
    async def cleanup(self) -> None:
        """清理资源"""
        try:
            # 关闭会话上下文
            if self._session_context:
                await self._session_context.__aexit__(None, None, None)
            # 关闭流上下文
            if self._stream_context:
                await self._stream_context.__aexit__(None, None, None)
            print("资源清理完成")
        # 捕获异常并输出错误信息
        except Exception as error:
            print(f"清理资源时发生错误: {error}")

    # 异步方法:智能聊天循环
    async def chat_loop(self) -> None:
        """智能聊天循环模式"""
        print("MCP客户端已启动!")
        print("请输入您的内容,输入 'quit' 退出程序")
        print("==================================================")

        # 循环读取用户输入
        while True:
            try:
                # 获取用户输入
                user_message = input("\n您的内容: ").strip()
                # 如果输入quit则退出
                if user_message == "quit":
                    print("感谢使用,再见!")
                    break

                # 如果输入为空,提示重新输入
                if not user_message:
                    print("请输入有效内容")
                    continue

                # 处理用户请求并输出AI回复
                ai_response = await self.process_request(user_message)
                print("\n" + ai_response)

            # 捕获Ctrl+C中断
            except KeyboardInterrupt:
                print("\n\n程序被用户中断,正在退出...")
                break
            # 捕获其他异常
            except Exception as error:
                print(f"发生未预期的错误: {error}")


# 定义主函数
async def main():
    """主函数"""
    # 创建MCP客户端实例
    mcp_client = WeatherMCPClient()

    try:
        # 连接服务端
        await mcp_client.connect_to_server()

        # 启动智能聊天模式
        await mcp_client.chat_loop()

    # 捕获主流程异常
    except Exception as error:
        print(f"程序运行失败: {error}")
    finally:
        # 清理资源
        await mcp_client.cleanup()


# 判断是否为主程序入口
if __name__ == "__main__":
    # 运行主程序
    asyncio.run(main())

5.SSE #

5.1 服务器 #

# 导入json模块,用于处理JSON数据
import json

# 导入httpx模块,用于异步HTTP请求
import httpx

# 导入sys模块,用于标准错误输出
import sys

# 从mcp.server模块导入FastMCP类
from mcp.server import FastMCP

# 创建FastMCP服务器实例,名称为"weather-forecast-server"
server = FastMCP("weather-forecast-server")

# 定义高德地图API的基础URL
API_BASE_URL = "https://restapi.amap.com"
# 定义高德地图API的密钥
API_KEY = "7592015e86c61bdae7d5b04b8fb5a00f"


# 注册名为"get_weather_forecast"的工具到FastMCP服务器
@server.tool("get_weather_forecast")
# 定义异步函数,根据地址查询天气预报信息
async def get_weather_forecast(location: str) -> str:
    """
    根据地址查询天气预报信息

    Args:
        location: 需要查询天气的地址或城市名称

    Returns:
        天气信息的JSON字符串
    """
    # 创建异步HTTP客户端
    async with httpx.AsyncClient() as http_client:
        try:
            # 第一步:地理编码,获取城市代码
            geo_response = await http_client.get(
                f"{API_BASE_URL}/v3/geocode/geo",
                params={"key": API_KEY, "address": location},
            )
            # 将查询地址和返回内容写入标准错误输出,便于调试
            sys.stderr.write(location + geo_response.text)

            # 如果地理编码请求失败,返回错误信息
            if geo_response.status_code != 200:
                return f"地理编码请求失败: {geo_response.status_code} - {geo_response.text}"

            # 解析地理编码返回的JSON数据
            geo_data = json.loads(geo_response.text)
            # 如果API返回状态不是"1",说明有错误,返回错误信息
            if geo_data["status"] != "1":
                return f"地理编码API错误: {geo_data['info']}"

            # 提取城市代码(adcode)
            city_code = geo_data["geocodes"][0]["adcode"]

            # 第二步:根据城市代码获取天气信息
            weather_response = await http_client.get(
                f"{API_BASE_URL}/v3/weather/weatherInfo",
                params={"key": API_KEY, "city": city_code},
            )

            # 如果天气查询请求失败,返回错误信息
            if weather_response.status_code != 200:
                return f"天气查询请求失败: {weather_response.status_code} - {weather_response.text}"

            # 解析天气查询返回的JSON数据
            weather_data = json.loads(weather_response.text)
            # 如果API返回状态不是"1",说明有错误,返回错误信息
            if weather_data["status"] != "1":
                return f"天气查询API错误: {weather_data['info']}"

            # 返回天气数据的原始JSON字符串
            return weather_response.text

        # 捕获异常并返回异常信息
        except Exception as error:
            return f"查询天气时发生异常: {str(error)}"


# 判断当前模块是否为主程序入口
if __name__ == "__main__":
    # 启动FastMCP服务器
    server.run(transport="streamable-http")

5.2 客户端 #

# 导入json模块,用于处理JSON数据
import json

# 导入asyncio模块,用于异步编程
import asyncio

# 从mcp模块导入ClientSession和StdioServerParameters
from mcp import ClientSession, StdioServerParameters

# 从mcp.client.stdio导入stdio_client,用于建立stdio连接
from mcp.client.sse import sse_client

# 导入OpenAI SDK
from openai import OpenAI


# 定义天气MCP客户端类
class WeatherMCPClient:
    """天气查询MCP客户端类"""

    # 初始化方法
    def __init__(self):
        """初始化客户端"""
        # 客户端会话对象
        self.session = None
        # stdio流上下文
        self._stream_context = None
        # 会话上下文
        self._session_context = None
        # 可用工具列表
        self.available_tools = []

        # 初始化OpenAI客户端
        self.openai_client = OpenAI(
            base_url="https://api.deepseek.com",
            api_key="sk-129a4b9a8eeb48d3a7648dc81fca0de2",
        )
        # 指定AI模型
        self.ai_model = "deepseek-chat"

    # 异步方法:连接到MCP服务端
    async def connect_to_server(self, server_url: str) -> None:
        """连接到MCP服务端"""
        try:

            # 建立stdio连接
            self._stream_context = sse_client(server_url)
            stream = await self._stream_context.__aenter__()

            # 创建客户端会话
            self._session_context = ClientSession(*stream)
            self.session = await self._session_context.__aenter__()

            # 初始化会话
            await self.session.initialize()
            print("MCP客户端初始化成功!")

            # 获取可用工具列表
            await self._load_available_tools()

        # 捕获异常并输出错误信息
        except Exception as error:
            print(f"连接服务端失败: {error}")
            raise

    # 异步方法:加载服务端提供的工具列表
    async def _load_available_tools(self) -> None:
        """加载服务端提供的工具列表"""
        try:
            # 获取工具列表响应
            tools_response = await self.session.list_tools()
            # 保存工具列表
            self.available_tools = tools_response.tools
            print(f"发现 {len(self.available_tools)} 个可用工具:")
            # 遍历并打印每个工具的信息
            for tool in self.available_tools:
                print(f"   - {tool.name}: {tool.description}")
        # 捕获异常并输出错误信息
        except Exception as error:
            print(f"加载工具列表失败: {error}")

    # 异步方法:处理用户请求,调用AI和MCP工具
    async def process_request(self, user_input: str) -> str:
        """处理用户请求,使用OpenAI进行智能分析"""
        # 如果未连接到服务端,返回提示
        if not self.session:
            return "客户端未连接到服务端"

        try:
            # 构建用户消息
            conversation_messages = [{"role": "user", "content": user_input}]

            # 获取可用工具列表
            tools_response = await self.session.list_tools()
            tool_definitions = []

            # 构建工具定义列表
            for tool in tools_response.tools:
                tool_definitions.append(
                    {
                        "type": "function",
                        "name": tool.name,
                        "function": {
                            "name": tool.name,
                            "description": tool.description,
                            "parameters": tool.inputSchema,
                        },
                    }
                )

            # 第一次调用OpenAI API,获取AI回复
            first_response = self.openai_client.chat.completions.create(
                model=self.ai_model,
                messages=conversation_messages,
                max_tokens=512,
                tools=tool_definitions,
            )

            # 用于存储最终回复内容
            response_texts = []
            # 获取AI的第一个回复选项
            first_choice = first_response.choices[0]

            # 如果AI要求调用工具
            if first_choice.finish_reason == "tool_calls":
                # 获取第一个工具调用
                first_tool_call = first_choice.message.tool_calls[0]
                # 工具名称
                tool_name = first_tool_call.function.name
                # 工具参数
                tool_arguments = json.loads(first_tool_call.function.arguments)

                # 添加AI的回复到消息历史
                conversation_messages.append(
                    {
                        "role": "assistant",
                        "content": None,
                        "tool_calls": [first_tool_call],
                    }
                )

                # 调用MCP工具
                tool_result = await self.session.call_tool(tool_name, tool_arguments)

                # 添加工具调用结果到消息历史
                conversation_messages.append(
                    {
                        "role": "tool",
                        "content": tool_result.content[0].text,
                        "tool_call_id": first_tool_call.id,
                    }
                )

                # 再次调用OpenAI获取最终回复
                final_response = self.openai_client.chat.completions.create(
                    model=self.ai_model,
                    messages=conversation_messages,
                    max_tokens=512,
                )

                # 记录工具调用及最终AI回复
                response_texts.append(f"调用工具 {tool_name} 参数: {tool_arguments}")
                response_texts.append(final_response.choices[0].message.content)

            # 如果AI直接给出最终回复
            elif first_choice.finish_reason == "stop":
                response_texts.append(first_choice.message.content)

            # 返回拼接后的回复内容
            return "\n".join(response_texts)

        # 捕获异常并返回错误信息
        except Exception as error:
            return f"处理请求时发生错误: {error}"

    # 异步方法:清理资源
    async def cleanup(self) -> None:
        """清理资源"""
        try:
            # 关闭会话上下文
            if self._session_context:
                await self._session_context.__aexit__(None, None, None)
            # 关闭流上下文
            if self._stream_context:
                await self._stream_context.__aexit__(None, None, None)
            print("资源清理完成")
        # 捕获异常并输出错误信息
        except Exception as error:
            print(f"清理资源时发生错误: {error}")

    # 异步方法:智能聊天循环
    async def chat_loop(self) -> None:
        """智能聊天循环模式"""
        print("MCP客户端已启动!")
        print("请输入您的内容,输入 'quit' 退出程序")
        print("==================================================")

        # 循环读取用户输入
        while True:
            try:
                # 获取用户输入
                user_message = input("\n您的内容: ").strip()
                # 如果输入quit则退出
                if user_message == "quit":
                    print("感谢使用,再见!")
                    break

                # 如果输入为空,提示重新输入
                if not user_message:
                    print("请输入有效内容")
                    continue

                # 处理用户请求并输出AI回复
                ai_response = await self.process_request(user_message)
                print("\n" + ai_response)

            # 捕获Ctrl+C中断
            except KeyboardInterrupt:
                print("\n\n程序被用户中断,正在退出...")
                break
            # 捕获其他异常
            except Exception as error:
                print(f"发生未预期的错误: {error}")


# 定义主函数
async def main():
    """主函数"""
    # 创建MCP客户端实例
    mcp_client = WeatherMCPClient()

    try:
        # 连接服务端
        await mcp_client.connect_to_server(server_url="http://127.0.0.1:8000/sse")

        # 启动智能聊天模式
        await mcp_client.chat_loop()

    # 捕获主流程异常
    except Exception as error:
        print(f"程序运行失败: {error}")
    finally:
        # 清理资源
        await mcp_client.cleanup()


# 判断是否为主程序入口
if __name__ == "__main__":
    # 运行主程序
    asyncio.run(main())

6.Streamable #

6.1 服务器 #

# 导入json模块,用于处理JSON数据
import json

# 导入httpx模块,用于异步HTTP请求
import httpx

# 导入sys模块,用于标准错误输出
import sys

# 从mcp.server模块导入FastMCP类
from mcp.server import FastMCP

# 创建FastMCP服务器实例,名称为"weather-forecast-server"
server = FastMCP("weather-forecast-server")

# 定义高德地图API的基础URL
API_BASE_URL = "https://restapi.amap.com"
# 定义高德地图API的密钥
API_KEY = "7592015e86c61bdae7d5b04b8fb5a00f"


# 注册名为"get_weather_forecast"的工具到FastMCP服务器
@server.tool("get_weather_forecast")
# 定义异步函数,根据地址查询天气预报信息
async def get_weather_forecast(location: str) -> str:
    """
    根据地址查询天气预报信息

    Args:
        location: 需要查询天气的地址或城市名称

    Returns:
        天气信息的JSON字符串
    """
    # 创建异步HTTP客户端
    async with httpx.AsyncClient() as http_client:
        try:
            # 第一步:地理编码,获取城市代码
            geo_response = await http_client.get(
                f"{API_BASE_URL}/v3/geocode/geo",
                params={"key": API_KEY, "address": location},
            )
            # 将查询地址和返回内容写入标准错误输出,便于调试
            sys.stderr.write(location + geo_response.text)

            # 如果地理编码请求失败,返回错误信息
            if geo_response.status_code != 200:
                return f"地理编码请求失败: {geo_response.status_code} - {geo_response.text}"

            # 解析地理编码返回的JSON数据
            geo_data = json.loads(geo_response.text)
            # 如果API返回状态不是"1",说明有错误,返回错误信息
            if geo_data["status"] != "1":
                return f"地理编码API错误: {geo_data['info']}"

            # 提取城市代码(adcode)
            city_code = geo_data["geocodes"][0]["adcode"]

            # 第二步:根据城市代码获取天气信息
            weather_response = await http_client.get(
                f"{API_BASE_URL}/v3/weather/weatherInfo",
                params={"key": API_KEY, "city": city_code},
            )

            # 如果天气查询请求失败,返回错误信息
            if weather_response.status_code != 200:
                return f"天气查询请求失败: {weather_response.status_code} - {weather_response.text}"

            # 解析天气查询返回的JSON数据
            weather_data = json.loads(weather_response.text)
            # 如果API返回状态不是"1",说明有错误,返回错误信息
            if weather_data["status"] != "1":
                return f"天气查询API错误: {weather_data['info']}"

            # 返回天气数据的原始JSON字符串
            return weather_response.text

        # 捕获异常并返回异常信息
        except Exception as error:
            return f"查询天气时发生异常: {str(error)}"


# 判断当前模块是否为主程序入口
if __name__ == "__main__":
    # 启动FastMCP服务器
    server.run(transport="streamable-http")

6.2 客户端 #

# 导入json模块,用于处理JSON数据
import json

# 导入asyncio模块,用于异步编程
import asyncio

# 从mcp模块导入ClientSession
from mcp import ClientSession

# 从mcp.client.stdio导入stdio_client,用于建立stdio连接
from mcp.client.streamable_http import streamablehttp_client

# 导入OpenAI SDK
from openai import OpenAI


# 定义天气MCP客户端类
class WeatherMCPClient:
    """天气查询MCP客户端类"""

    # 初始化方法
    def __init__(self):
        """初始化客户端"""
        # 客户端会话对象
        self.session = None
        # stdio流上下文
        self._stream_context = None
        # 会话上下文
        self._session_context = None
        # 可用工具列表
        self.available_tools = []

        # 初始化OpenAI客户端
        self.openai_client = OpenAI(
            base_url="https://api.deepseek.com",
            api_key="sk-129a4b9a8eeb48d3a7648dc81fca0de2",
        )
        # 指定AI模型
        self.ai_model = "deepseek-chat"

    # 异步方法:连接到MCP服务端
    async def connect_to_server(self, server_url: str) -> None:
        """连接到MCP服务端"""
        try:

            # 建立stdio连接
            self._stream_context = streamablehttp_client(server_url)
            stream = await self._stream_context.__aenter__()

            # 创建客户端会话
            self._session_context = ClientSession(stream[0], stream[1])
            self.session = await self._session_context.__aenter__()

            # 初始化会话
            await self.session.initialize()
            print("MCP客户端初始化成功!")

            # 获取可用工具列表
            await self._load_available_tools()

        # 捕获异常并输出错误信息
        except Exception as error:
            print(f"连接服务端失败: {error}")
            raise

    # 异步方法:加载服务端提供的工具列表
    async def _load_available_tools(self) -> None:
        """加载服务端提供的工具列表"""
        try:
            # 获取工具列表响应
            tools_response = await self.session.list_tools()
            # 保存工具列表
            self.available_tools = tools_response.tools
            print(f"发现 {len(self.available_tools)} 个可用工具:")
            # 遍历并打印每个工具的信息
            for tool in self.available_tools:
                print(f"   - {tool.name}: {tool.description}")
        # 捕获异常并输出错误信息
        except Exception as error:
            print(f"加载工具列表失败: {error}")

    # 异步方法:处理用户请求,调用AI和MCP工具
    async def process_request(self, user_input: str) -> str:
        """处理用户请求,使用OpenAI进行智能分析"""
        # 如果未连接到服务端,返回提示
        if not self.session:
            return "客户端未连接到服务端"

        try:
            # 构建用户消息
            conversation_messages = [{"role": "user", "content": user_input}]

            # 获取可用工具列表
            tools_response = await self.session.list_tools()
            tool_definitions = []

            # 构建工具定义列表
            for tool in tools_response.tools:
                tool_definitions.append(
                    {
                        "type": "function",
                        "name": tool.name,
                        "function": {
                            "name": tool.name,
                            "description": tool.description,
                            "parameters": tool.inputSchema,
                        },
                    }
                )

            # 第一次调用OpenAI API,获取AI回复
            first_response = self.openai_client.chat.completions.create(
                model=self.ai_model,
                messages=conversation_messages,
                max_tokens=512,
                tools=tool_definitions,
            )

            # 用于存储最终回复内容
            response_texts = []
            # 获取AI的第一个回复选项
            first_choice = first_response.choices[0]

            # 如果AI要求调用工具
            if first_choice.finish_reason == "tool_calls":
                # 获取第一个工具调用
                first_tool_call = first_choice.message.tool_calls[0]
                # 工具名称
                tool_name = first_tool_call.function.name
                # 工具参数
                tool_arguments = json.loads(first_tool_call.function.arguments)

                # 添加AI的回复到消息历史
                conversation_messages.append(
                    {
                        "role": "assistant",
                        "content": None,
                        "tool_calls": [first_tool_call],
                    }
                )

                # 调用MCP工具
                tool_result = await self.session.call_tool(tool_name, tool_arguments)

                # 添加工具调用结果到消息历史
                conversation_messages.append(
                    {
                        "role": "tool",
                        "content": tool_result.content[0].text,
                        "tool_call_id": first_tool_call.id,
                    }
                )

                # 再次调用OpenAI获取最终回复
                final_response = self.openai_client.chat.completions.create(
                    model=self.ai_model,
                    messages=conversation_messages,
                    max_tokens=512,
                )

                # 记录工具调用及最终AI回复
                response_texts.append(f"调用工具 {tool_name} 参数: {tool_arguments}")
                response_texts.append(final_response.choices[0].message.content)

            # 如果AI直接给出最终回复
            elif first_choice.finish_reason == "stop":
                response_texts.append(first_choice.message.content)

            # 返回拼接后的回复内容
            return "\n".join(response_texts)

        # 捕获异常并返回错误信息
        except Exception as error:
            return f"处理请求时发生错误: {error}"

    # 异步方法:清理资源
    async def cleanup(self) -> None:
        """清理资源"""
        try:
            # 关闭会话上下文
            if self._session_context:
                await self._session_context.__aexit__(None, None, None)
            # 关闭流上下文
            if self._stream_context:
                await self._stream_context.__aexit__(None, None, None)
            print("资源清理完成")
        # 捕获异常并输出错误信息
        except Exception as error:
            print(f"清理资源时发生错误: {error}")

    # 异步方法:智能聊天循环
    async def chat_loop(self) -> None:
        """智能聊天循环模式"""
        print("MCP客户端已启动!")
        print("请输入您的内容,输入 'quit' 退出程序")
        print("==================================================")

        # 循环读取用户输入
        while True:
            try:
                # 获取用户输入
                user_message = input("\n您的内容: ").strip()
                # 如果输入quit则退出
                if user_message == "quit":
                    print("感谢使用,再见!")
                    break

                # 如果输入为空,提示重新输入
                if not user_message:
                    print("请输入有效内容")
                    continue

                # 处理用户请求并输出AI回复
                ai_response = await self.process_request(user_message)
                print("\n" + ai_response)

            # 捕获Ctrl+C中断
            except KeyboardInterrupt:
                print("\n\n程序被用户中断,正在退出...")
                break
            # 捕获其他异常
            except Exception as error:
                print(f"发生未预期的错误: {error}")


# 定义主函数
async def main():
    """主函数"""
    # 创建MCP客户端实例
    mcp_client = WeatherMCPClient()

    try:
        # 连接服务端
        await mcp_client.connect_to_server(
            server_url="http://127.0.0.1:8000/mcp?key=7592015e86c61bdae7d5b04b8fb5a00f"
        )

        # 启动智能聊天模式
        await mcp_client.chat_loop()

    # 捕获主流程异常
    except Exception as error:
        print(f"程序运行失败: {error}")
    finally:
        # 清理资源
        await mcp_client.cleanup()


# 判断是否为主程序入口
if __name__ == "__main__":
    # 运行主程序
    asyncio.run(main())

7.发布 #

7.1 账号申请 #

  • pypi.org
  • 注册
  • 登录 hongqishiq @
  • 生成API Token

7.2 安装 #

pip install twine

7.3 打包 #

uv build

7.4 发布 #

python -m twine upload dist/*
pypi-AgEIcHlwaS5vcmcCJDNmZDJlNWFkLWYyNzYtNGU1Zi1hYTcxLTg0OTIyNDhlYWUxNAACKlszLCIyYThjZTYwNy03YWY4LTQwYzYtYmEyMS1hMTE1NmEzZWE4ODciXQAABiCw6syPpy96mhfrSKDUmB5NVwkWdpLJEFeVHePoZKZopg

7.5 安装 #

pip install mcp_rs_publish
pip install mcp_rs_publish --upgrade

7.6 启动 #

mcp_rs_publish

7.7 代码 #

7.7.1 pyproject.toml #

pyproject.toml

[project]
name = "mcp_rs_publish"
version = "0.1.7"
description = "mcp_rs_publish"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
    "mcp[cli]>=1.13.1",
]
[project.scripts]
mcp_rs_publish = "mcp_rs_publish.command:main"

7.7.2 mcp_rs_publish__init__.py #

mcp_rs_publish__init__.py

7.7.3 command.py #

mcp_rs_publish\command.py

# 从mcp.server.fastmcp模块导入FastMCP类
from mcp.server.fastmcp import FastMCP

# 创建一个MCP服务器实例,名称为"Demo"
mcp = FastMCP("Demo")


# 添加一个加法工具
@mcp.tool()
# 定义加法函数,接收两个整数参数a和b,返回它们的和
def add(a: int, b: int) -> int:
    """Add two numbers"""
    # 返回a和b的和
    return a + b


# 添加一个动态问候资源
@mcp.resource("greeting://{name}")
# 定义获取问候语的函数,接收一个字符串参数name,返回个性化问候语
def get_greeting(name: str) -> str:
    """Get a personalized greeting"""
    # 返回格式化后的问候语
    return f"Hello, {name}!"


# 添加一个提示(prompt)工具
@mcp.prompt()
# 定义问候用户的函数,接收name和style参数,style有默认值"friendly"
def greet_user(name: str, style: str = "friendly") -> str:
    """Generate a greeting prompt"""
    # 定义不同风格的问候语模板
    styles = {
        "friendly": "Please write a warm, friendly greeting",
        "formal": "Please write a formal, professional greeting",
        "casual": "Please write a casual, relaxed greeting",
    }

    # 根据style选择对应的模板,默认使用"friendly",并返回格式化后的提示语
    return f"{styles.get(style, styles['friendly'])} for someone named {name}."


# 定义主函数
def main():
    # 以SSE(Server-Sent Events)方式运行MCP服务器
    mcp.run(transport="sse")


# 如果当前模块是主程序入口,则执行main函数
if __name__ == "__main__":
    main()

7.7.4 配置MCP #

7.7.4.1 SSE #

7.7.4.2 STDIO #

8. email #

8.1. config.py #

config.py

# 配置文件
# 请根据您的实际情况修改以下配置

# 邮件SMTP配置
EMAIL_CONFIG = {
    "smtp_server": "smtp.qq.com",  # QQ邮箱SMTP服务器
    "smtp_port": 465,  # SSL端口
    "sender_email": "1959583119@qq.com",  # 发件人邮箱
    "sender_password": "vsppezhmjihyciai",  # 发件人应用密码(不是登录密码)
    "use_ssl": True,  # 使用SSL连接
    "use_tls": False,  # 不使用TLS
}

# 高德地图API配置
AMAP_CONFIG = {
    "api_key": "7592015e86c61bdae7d5b04b8fb5a00f",  # 高德地图API密钥
    "base_url": "https://restapi.amap.com",  # API基础URL
}

# OpenAI配置
OPENAI_CONFIG = {
    "base_url": "https://api.deepseek.com",  # API基础URL
    "api_key": "sk-129a4b9a8eeb48d3a7648dc81fca0de2",  # API密钥
    "model": "deepseek-chat",  # 模型名称
}

# 定时任务配置
SCHEDULE_CONFIG = {
    "check_interval": 60,  # 定时任务检查间隔(秒)
    "max_tasks": 100,  # 最大定时任务数量
}

# 日志配置
LOG_CONFIG = {
    "level": "INFO",  # 日志级别
    "format": "%(asctime)s - %(levelname)s - %(message)s",
}

8.2. weather_client.py #

weather_client.py

# 导入json模块,用于处理JSON数据
import json

# 导入asyncio模块,用于异步编程
import asyncio

# 从mcp模块导入ClientSession和StdioServerParameters
from mcp import ClientSession, StdioServerParameters

# 从mcp.client.stdio导入stdio_client,用于建立stdio连接
from mcp.client.stdio import stdio_client

# 导入OpenAI SDK
from openai import OpenAI

# 导入smtplib用于发送邮件
+import smtplib
# 导入MIMEText用于构建邮件正文
+from email.mime.text import MIMEText
# 导入MIMEMultipart用于构建多部分邮件
+from email.mime.multipart import MIMEMultipart
# 导入MIMEBase用于添加附件
+from email.mime.base import MIMEBase
# 导入encoders用于对附件进行编码
+from email import encoders
# 导入schedule用于定时任务
+import schedule
# 导入time用于时间相关操作
+import time
# 导入datetime用于获取当前时间
+from datetime import datetime
# 导入threading用于多线程
+import threading
# 导入re用于正则表达式
+import re

# 定义天气MCP客户端类
class WeatherMCPClient:
    """天气查询MCP客户端类"""

    # 初始化方法
    def __init__(self):
        """初始化客户端"""
        # 客户端会话对象
        self.session = None
        # stdio流上下文
        self._stream_context = None
        # 会话上下文
        self._session_context = None
        # 可用工具列表
        self.available_tools = []

        # 尝试导入配置文件
+       try:
            # 从config.py导入OPENAI_CONFIG和EMAIL_CONFIG
+           from config import OPENAI_CONFIG, EMAIL_CONFIG

            # 初始化OpenAI客户端
+           self.openai_client = OpenAI(
+               base_url=OPENAI_CONFIG["base_url"],
+               api_key=OPENAI_CONFIG["api_key"],
+           )
            # 指定AI模型
+           self.ai_model = OPENAI_CONFIG["model"]

            # 邮件配置
+           self.email_config = EMAIL_CONFIG.copy()
            # 如果邮件配置中没有use_ssl字段,默认True
+           if "use_ssl" not in self.email_config:
+               self.email_config["use_ssl"] = True
            # 如果邮件配置中没有use_tls字段,默认False
+           if "use_tls" not in self.email_config:
+               self.email_config["use_tls"] = False

        # 如果导入失败,使用默认配置
+       except ImportError:
            # 打印警告信息
+           print("警告: 未找到config.py文件,使用默认配置")
            # 初始化OpenAI客户端,使用默认base_url和api_key
+           self.openai_client = OpenAI(
+               base_url="https://api.deepseek.com",
+               api_key="your_openai_api_key",
+           )
            # 指定默认AI模型
+           self.ai_model = "deepseek-chat"

            # 默认邮件配置
+           self.email_config = {
+               "smtp_server": "smtp.qq.com",
+               "smtp_port": 465,
+               "sender_email": "your_email@qq.com",
+               "sender_password": "your_app_password",
+               "use_ssl": True,
+               "use_tls": False,
+           }

        # 定时任务存储字典
+       self.scheduled_tasks = {}

        # 初始化markdown转换器
+       self._init_markdown_converter()

    # 初始化markdown转换器
+   def _init_markdown_converter(self):
+       """初始化markdown转换器"""
        # 定义markdown到HTML的转换规则
+       self.markdown_rules = [
            # 标题转换
+           (r"^### (.*?)$", r"<h3>\1</h3>", re.MULTILINE),
+           (r"^## (.*?)$", r"<h2>\1</h2>", re.MULTILINE),
+           (r"^# (.*?)$", r"<h1>\1</h1>", re.MULTILINE),
            # 粗体和斜体
+           (r"\*\*(.*?)\*\*", r"<strong>\1</strong>"),
+           (r"\*(.*?)\*", r"<em>\1</em>"),
            # 代码块
+           (r"```(.*?)```", r"<pre><code>\1</code></pre>", re.DOTALL),
+           (r"`(.*?)`", r"<code>\1</code>"),
            # 链接
+           (r"$$([^$$]+)\]$([^)]+)$", r'<a href="\2">\1</a>'),
            # 列表
+           (r"^\* (.*?)$", r"<li>\1</li>", re.MULTILINE),
+           (r"^- (.*?)$", r"<li>\1</li>", re.MULTILINE),
+           (r"^\d+\. (.*?)$", r"<li>\1</li>", re.MULTILINE),
            # 段落
+           (r"\n\n", r"</p><p>"),
            # 换行
+           (r"\n", r"<br>"),
+       ]

    # 将markdown文本转换为HTML
+   def _convert_markdown_to_html(self, markdown_text):
+       """将markdown文本转换为HTML"""
        # 如果markdown文本为空,返回空字符串
+       if not markdown_text:
+           return ""

        # 赋值原始markdown文本
+       html_text = markdown_text

        # 应用所有转换规则
+       for pattern, replacement, *flags in self.markdown_rules:
+           if flags:
+               html_text = re.sub(pattern, replacement, html_text, flags=flags[0])
+           else:
+               html_text = re.sub(pattern, replacement, html_text)

        # 包装在完整HTML结构中
+       html_content = f"""
+       <!DOCTYPE html>
+       <html>
+       <head>
+           <meta charset="utf-8">
+           <meta name="viewport" content="width=device-width, initial-scale=1.0">
+           <title>天气报告</title>
+           <style>
+               body {{
+                   font-family: 'Microsoft YaHei', Arial, sans-serif;
+                   line-height: 1.6;
+                   color: #333;
+                   max-width: 800px;
+                   margin: 0 auto;
+                   padding: 20px;
+                   background-color: #f5f5f5;
+               }}
+               .container {{
+                   background-color: white;
+                   padding: 30px;
+                   border-radius: 10px;
+                   box-shadow: 0 2px 10px rgba(0,0,0,0.1);
+               }}
+               h1, h2, h3 {{
+                   color: #2c3e50;
+                   border-bottom: 2px solid #3498db;
+                   padding-bottom: 10px;
+               }}
+               h1 {{ font-size: 24px; }}
+               h2 {{ font-size: 20px; }}
+               h3 {{ font-size: 18px; }}
+               p {{
+                   margin: 15px 0;
+                   text-align: justify;
+               }}
+               code {{
+                   background-color: #f8f9fa;
+                   padding: 2px 6px;
+                   border-radius: 4px;
+                   font-family: 'Courier New', monospace;
+                   color: #e83e8c;
+               }}
+               pre {{
+                   background-color: #f8f9fa;
+                   padding: 15px;
+                   border-radius: 5px;
+                   overflow-x: auto;
+                   border-left: 4px solid #3498db;
+               }}
+               li {{
+                   margin: 8px 0;
+               }}
+               a {{
+                   color: #3498db;
+                   text-decoration: none;
+               }}
+               a:hover {{
+                   text-decoration: underline;
+               }}
+               .weather-info {{
+                   background-color: #ecf0f1;
+                   padding: 20px;
+                   border-radius: 8px;
+                   margin: 20px 0;
+                   border-left: 5px solid #3498db;
+               }}
+               .footer {{
+                   margin-top: 30px;
+                   padding-top: 20px;
+                   border-top: 1px solid #ecf0f1;
+                   text-align: center;
+                   color: #7f8c8d;
+               }}
+           </style>
+       </head>
+       <body>
+           <div class="container">
+               <div class="weather-info">
+                   {html_text}
+               </div>
+               <div class="footer">
+                   <p>此邮件由天气服务系统自动生成</p>
+                   <p>发送时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
+               </div>
+           </div>
+       </body>
+       </html>
+       """

        # 返回HTML内容
+       return html_content

    # 异步方法:连接到MCP服务端
    async def connect_to_server(self) -> None:
        """连接到MCP服务端"""
        try:
            # 配置服务端连接参数
            server_params = StdioServerParameters(
+               command="python", args=["weather_server.py"], env={}
            )

            # 建立stdio连接
            self._stream_context = stdio_client(server_params)
            # 进入异步上下文,获取流
            stream = await self._stream_context.__aenter__()

            # 创建客户端会话
            self._session_context = ClientSession(*stream)
            # 进入异步上下文,获取session
            self.session = await self._session_context.__aenter__()

            # 初始化会话
            await self.session.initialize()
            print("MCP客户端初始化成功!")

            # 获取可用工具列表
            await self._load_available_tools()

        # 捕获异常并输出错误信息
        except Exception as error:
            print(f"连接服务端失败: {error}")
            raise

    # 异步方法:加载服务端提供的工具列表
    async def _load_available_tools(self) -> None:
        """加载服务端提供的工具列表"""
        try:
            # 获取工具列表响应
            tools_response = await self.session.list_tools()
            # 保存工具列表
            self.available_tools = tools_response.tools
            print(f"发现 {len(self.available_tools)} 个可用工具:")
            # 遍历并打印每个工具的信息
            for tool in self.available_tools:
                print(f"   - {tool.name}: {tool.description}")
        # 捕获异常并输出错误信息
        except Exception as error:
            print(f"加载工具列表失败: {error}")

    # 异步方法:处理用户请求,调用AI和MCP工具
    async def process_request(self, user_input: str) -> str:
        """处理用户请求,使用OpenAI进行智能分析"""
        # 如果未连接到服务端,返回提示
        if not self.session:
            return "客户端未连接到服务端"

        try:
            # 构建用户消息
            conversation_messages = [{"role": "user", "content": user_input}]

            # 获取可用工具列表
            tools_response = await self.session.list_tools()
            tool_definitions = []

            # 构建工具定义列表
            for tool in tools_response.tools:
                tool_definitions.append(
                    {
                        "type": "function",
                        "name": tool.name,
                        "function": {
                            "name": tool.name,
                            "description": tool.description,
                            "parameters": tool.inputSchema,
                        },
                    }
                )

            # 第一次调用OpenAI API,获取AI回复
            first_response = self.openai_client.chat.completions.create(
                model=self.ai_model,
                messages=conversation_messages,
                max_tokens=512,
                tools=tool_definitions,
            )

            # 用于存储最终回复内容
            response_texts = []
            # 获取AI的第一个回复选项
            first_choice = first_response.choices[0]

            # 如果AI要求调用工具
            if first_choice.finish_reason == "tool_calls":
                # 获取第一个工具调用
                first_tool_call = first_choice.message.tool_calls[0]
                # 工具名称
                tool_name = first_tool_call.function.name
                # 工具参数
                tool_arguments = json.loads(first_tool_call.function.arguments)

                # 添加AI的回复到消息历史
                conversation_messages.append(
                    {
                        "role": "assistant",
                        "content": None,
                        "tool_calls": [first_tool_call],
                    }
                )

                # 调用MCP工具
                tool_result = await self.session.call_tool(tool_name, tool_arguments)

                # 添加工具调用结果到消息历史
                conversation_messages.append(
                    {
                        "role": "tool",
                        "content": tool_result.content[0].text,
                        "tool_call_id": first_tool_call.id,
                    }
                )

                # 再次调用OpenAI获取最终回复
                final_response = self.openai_client.chat.completions.create(
                    model=self.ai_model,
                    messages=conversation_messages,
                    max_tokens=512,
                )

                # 记录工具调用及最终AI回复
                response_texts.append(f"调用工具 {tool_name} 参数: {tool_arguments}")
                response_texts.append(final_response.choices[0].message.content)

            # 如果AI直接给出最终回复
            elif first_choice.finish_reason == "stop":
                response_texts.append(first_choice.message.content)

            # 返回拼接后的回复内容
            return "\n".join(response_texts)

        # 捕获异常并返回错误信息
        except Exception as error:
            return f"处理请求时发生错误: {error}"

    # 异步方法:发送邮件
+   async def send_email(
+       self,
+       recipient_email: str,
+       subject: str,
+       body: str,
+       attachment_path: str = None,
+       is_html: bool = False,
+   ) -> str:
+       """发送邮件功能"""
+       try:
            # 打印正在连接SMTP服务器
+           print(
+               f"正在连接SMTP服务器: {self.email_config['smtp_server']}:{self.email_config['smtp_port']}"
+           )

            # 创建邮件对象
+           msg = MIMEMultipart()
            # 设置发件人
+           msg["From"] = self.email_config["sender_email"]
            # 设置收件人
+           msg["To"] = recipient_email
            # 设置主题
+           msg["Subject"] = subject

            # 添加邮件正文
+           if is_html:
                # 如果是HTML格式
+               msg.attach(MIMEText(body, "html", "utf-8"))
+           else:
                # 普通文本格式
+               msg.attach(MIMEText(body, "plain", "utf-8"))

            # 如果有附件,添加附件
+           if attachment_path:
+               try:
                    # 打开附件文件
+                   with open(attachment_path, "rb") as attachment:
+                       part = MIMEBase("application", "octet-stream")
+                       part.set_payload(attachment.read())

                    # 对附件进行base64编码
+                   encoders.encode_base64(part)
                    # 添加附件头
+                   part.add_header(
+                       "Content-Disposition",
+                       f'attachment; filename= {attachment_path.split("/")[-1]}',
+                   )
                    # 添加附件到邮件
+                   msg.attach(part)
+               except FileNotFoundError:
                    # 附件文件未找到
+                   return f"附件文件未找到: {attachment_path}"

            # 连接SMTP服务器并发送邮件
+           if self.email_config.get("use_ssl", False):
                # 如果使用SSL
+               print("正在建立SSL SMTP连接...")
+               import ssl

                # 创建SSL上下文
+               context = ssl.create_default_context()
                # 建立SSL SMTP连接
+               server_smtp = smtplib.SMTP_SSL(
+                   self.email_config["smtp_server"],
+                   self.email_config["smtp_port"],
+                   context=context,
+                   timeout=30,
+               )
+               print("   ✓ SSL连接建立成功")
+           else:
                # 标准SMTP连接
+               print("正在建立标准SMTP连接...")
+               server_smtp = smtplib.SMTP(
+                   self.email_config["smtp_server"],
+                   self.email_config["smtp_port"],
+                   timeout=30,
+               )

                # 如果需要TLS加密
+               if self.email_config.get("use_tls", False):
+                   print("正在启用TLS加密...")
+                   server_smtp.starttls()
+                   print("   ✓ TLS启用成功")

            # 登录邮箱
+           print("正在登录邮箱...")
+           server_smtp.login(
+               self.email_config["sender_email"],
+               self.email_config["sender_password"],
+           )

            # 发送邮件
+           print("正在发送邮件...")
+           text = msg.as_string()
+           server_smtp.sendmail(
+               self.email_config["sender_email"], recipient_email, text
+           )

            # 关闭SMTP连接
+           print("邮件发送完成,正在关闭连接...")
+           server_smtp.quit()

            # 返回成功信息
+           return f"邮件发送成功!收件人: {recipient_email}, 主题: {subject}"

        # 邮箱认证失败
+       except smtplib.SMTPAuthenticationError as auth_error:
+           return f"邮箱认证失败: {str(auth_error)}。请检查邮箱和密码是否正确。"
        # SMTP连接失败
+       except smtplib.SMTPConnectError as connect_error:
+           return (
+               f"SMTP连接失败: {str(connect_error)}。请检查网络连接和SMTP服务器设置。"
+           )
        # 其他SMTP异常
+       except smtplib.SMTPException as smtp_error:
+           return f"SMTP错误: {str(smtp_error)}"
        # 其他异常
+       except Exception as error:
+           return f"邮件发送失败: {str(error)}"

    # 异步方法:创建定时天气邮件任务
+   async def schedule_weather_email(
+       self,
+       recipient_email: str,
+       location: str,
+       schedule_time: str,
+       frequency: str = "daily",
+   ) -> str:
+       """创建定时发送天气邮件的任务"""
+       try:
            # 生成任务ID
+           task_id = f"weather_email_{recipient_email}_{location}_{int(time.time())}"

            # 定义定时任务函数
+           def send_weather_email():
+               try:
                    # 获取天气信息
+                   weather_info = asyncio.run(
+                       self.process_request(f"查询{location}的天气")
+                   )

                    # 将markdown转换为HTML
+                   html_body = self._convert_markdown_to_html(weather_info)

                    # 构建邮件主题
+                   subject = f"天气预报 - {location}"

                    # 发送HTML邮件
+                   asyncio.run(
+                       self.send_email(
+                           recipient_email, subject, html_body, is_html=True
+                       )
+                   )

+               except Exception as e:
                    # 打印定时任务执行失败信息
+                   print(f"定时任务执行失败: {e}")

            # 根据频率设置定时任务
+           if frequency == "daily":
                # 每天定时
+               schedule.every().day.at(schedule_time).do(send_weather_email)
+           elif frequency == "weekly":
                # 每周定时
+               schedule.every().week.at(schedule_time).do(send_weather_email)
+           elif frequency == "monthly":
                # 每月定时
+               schedule.every().month.at(schedule_time).do(send_weather_email)
+           else:
                # 不支持的频率
+               return f"不支持的频率: {frequency}"

            # 存储任务信息
+           self.scheduled_tasks[task_id] = {
+               "type": "weather_email",
+               "recipient": recipient_email,
+               "location": location,
+               "time": schedule_time,
+               "frequency": frequency,
+               "created_at": datetime.now().isoformat(),
+           }

            # 定义运行定时任务的线程函数
+           def run_schedule():
+               while True:
                    # 检查并运行待执行的任务
+                   schedule.run_pending()
                    # 每分钟检查一次
+                   time.sleep(60)

            # 如果定时任务线程未启动,则启动
+           if (
+               not hasattr(self, "_schedule_thread")
+               or not self._schedule_thread.is_alive()
+           ):
+               self._schedule_thread = threading.Thread(
+                   target=run_schedule, daemon=True
+               )
+               self._schedule_thread.start()

            # 返回任务创建成功信息
+           return f"定时任务创建成功!任务ID: {task_id}, 将在每天 {schedule_time} 发送 {location} 的天气信息到 {recipient_email}"

        # 捕获异常并返回错误信息
+       except Exception as error:
+           return f"创建定时任务失败: {str(error)}"

    # 异步方法:列出所有定时任务
+   async def list_scheduled_tasks(self) -> str:
+       """列出所有定时任务"""
+       try:
            # 导入json模块
+           import json

            # 返回定时任务的json字符串
+           return json.dumps(self.scheduled_tasks, ensure_ascii=False, indent=2)
        # 捕获异常并返回错误信息
+       except Exception as error:
+           return f"获取定时任务列表失败: {str(error)}"

    # 异步方法:取消定时任务
+   async def cancel_scheduled_task(self, task_id: str) -> str:
+       """取消指定的定时任务"""
+       try:
            # 如果任务ID存在
+           if task_id in self.scheduled_tasks:
                # 清除定时任务
+               schedule.clear(task_id)
                # 删除任务记录
+               del self.scheduled_tasks[task_id]
                # 返回取消成功信息
+               return f"任务 {task_id} 已成功取消"
+           else:
                # 未找到任务ID
+               return f"未找到任务ID: {task_id}"
        # 捕获异常并返回错误信息
+       except Exception as error:
+           return f"取消任务失败: {str(error)}"

    # 异步方法:发送天气报告邮件
+   async def send_weather_report(
+       self, recipient_email: str, location: str, report_type: str = "current"
+   ) -> str:
+       """发送天气报告邮件"""
+       try:
            # 获取天气信息
+           weather_info = await self.process_request(f"查询{location}的天气")

            # 将markdown转换为HTML
+           html_body = self._convert_markdown_to_html(weather_info)

            # 根据报告类型构建邮件主题
+           if report_type == "current":
+               subject = f"当前天气 - {location}"
+           else:  # forecast
+               subject = f"天气预报 - {location}"

            # 发送HTML邮件
+           result = await self.send_email(
+               recipient_email, subject, html_body, is_html=True
+           )
            # 返回发送结果
+           return result

        # 捕获异常并返回错误信息
+       except Exception as error:
+           return f"发送天气报告失败: {str(error)}"

    # 异步方法:清理资源
    async def cleanup(self) -> None:
        """清理资源"""
        try:
            # 关闭会话上下文
            if self._session_context:
                await self._session_context.__aexit__(None, None, None)
            # 关闭流上下文
            if self._stream_context:
                await self._stream_context.__aexit__(None, None, None)
            # 打印资源清理完成
            print("资源清理完成")
        # 捕获异常并输出错误信息
        except Exception as error:
            print(f"清理资源时发生错误: {error}")

    # 异步方法:智能聊天循环
    async def chat_loop(self) -> None:
        """智能聊天循环模式"""
        # 打印欢迎信息和命令提示
        print("MCP客户端已启动!")
        print("请输入您的内容,输入 'quit' 退出程序")
        print("==================================================")
+       print("新增功能命令:")
+       print("  发送邮件 收件人邮箱 主题 内容")
+       print("  定时天气邮件 收件人邮箱 地点 时间(HH:MM) [频率]")
+       print("  查看定时任务")
+       print("  取消定时任务 任务ID")
+       print("  天气报告邮件 收件人邮箱 地点 [类型:current/forecast]")
+       print("==================================================")

        # 循环读取用户输入
        while True:
            try:
                # 获取用户输入
                user_message = input("\n您的内容: ").strip()
                # 如果输入quit则退出
                if user_message == "quit":
                    print("感谢使用,再见!")
                    break

                # 如果输入为空,提示重新输入
                if not user_message:
                    print("请输入有效内容")
                    continue

                # 检查是否是特殊命令
+               if user_message.startswith("发送邮件"):
                    # 解析邮件命令:发送邮件 收件人 主题 内容
+                   parts = user_message.split(" ", 3)
+                   if len(parts) >= 4:
+                       _, recipient, subject, body = parts
+                       result = await self.send_email(recipient, subject, body)
+                       print("\n" + result)
+                   else:
+                       print("\n邮件命令格式:发送邮件 收件人邮箱 主题 内容")

+               elif user_message.startswith("定时天气邮件"):
                    # 解析定时任务命令:定时天气邮件 收件人 地点 时间 频率
+                   parts = user_message.split(" ", 4)
+                   if len(parts) >= 4:
+                       _, recipient, location, time_str = parts[:4]
+                       frequency = parts[4] if len(parts) > 4 else "daily"
+                       result = await self.schedule_weather_email(
+                           recipient, location, time_str, frequency
+                       )
+                       print("\n" + result)
+                   else:
+                       print(
+                           "\n定时任务命令格式:定时天气邮件 收件人邮箱 地点 时间(HH:MM) [频率]"
+                       )

+               elif user_message.startswith("查看定时任务"):
                    # 查看所有定时任务
+                   result = await self.list_scheduled_tasks()
+                   print("\n" + result)

+               elif user_message.startswith("取消定时任务"):
                    # 解析取消命令:取消定时任务 任务ID
+                   parts = user_message.split(" ", 2)
+                   if len(parts) >= 2:
+                       _, task_id = parts
+                       result = await self.cancel_scheduled_task(task_id)
+                       print("\n" + result)
+                   else:
+                       print("\n取消命令格式:取消定时任务 任务ID")

+               elif user_message.startswith("天气报告邮件"):
                    # 解析报告命令:天气报告邮件 收件人 地点 [类型]
+                   parts = user_message.split(" ", 3)
+                   if len(parts) >= 3:
+                       _, recipient, location = parts[:3]
+                       report_type = parts[3] if len(parts) > 3 else "current"
+                       result = await self.send_weather_report(
+                           recipient, location, report_type
+                       )
+                       print("\n" + result)
+                   else:
+                       print(
+                           "\n报告命令格式:天气报告邮件 收件人邮箱 地点 [类型:current/forecast]"
+                       )

+               else:
                    # 处理普通用户请求并输出AI回复
+                   ai_response = await self.process_request(user_message)
+                   print("\n" + ai_response)

            # 捕获Ctrl+C中断
            except KeyboardInterrupt:
                print("\n\n程序被用户中断,正在退出...")
                break
            # 捕获其他异常
            except Exception as error:
                print(f"发生未预期的错误: {error}")

# 定义主函数
async def main():
    """主函数"""
    # 创建MCP客户端实例
    mcp_client = WeatherMCPClient()

    try:
        # 连接服务端
        await mcp_client.connect_to_server()

        # 启动智能聊天模式
        await mcp_client.chat_loop()

    # 捕获主流程异常
    except Exception as error:
        print(f"程序运行失败: {error}")
    finally:
        # 清理资源
        await mcp_client.cleanup()

# 判断是否为主程序入口
if __name__ == "__main__":
    # 运行主程序
    asyncio.run(main())

8.3. weather_server.py #

weather_server.py

# 导入json模块,用于处理JSON数据
import json

# 导入httpx模块,用于异步HTTP请求
import httpx

# 导入sys模块,用于标准错误输出
import sys

# 导入smtplib模块,用于发送邮件
+import smtplib

# 导入MIMEText类,用于构建邮件正文
+from email.mime.text import MIMEText

# 导入MIMEMultipart类,用于构建带附件的邮件
+from email.mime.multipart import MIMEMultipart

# 导入MIMEBase类,用于构建邮件附件
+from email.mime.base import MIMEBase

# 导入encoders模块,用于对附件进行编码
+from email import encoders

# 导入异步、定时任务和时间相关模块
+import asyncio
+import schedule
+import time
+from datetime import datetime
+import threading

# 从mcp.server模块导入FastMCP类
from mcp.server import FastMCP

# 创建FastMCP服务器实例,名称为"weather-forecast-server"
server = FastMCP("weather-forecast-server")

# 尝试导入配置文件中的EMAIL_CONFIG和AMAP_CONFIG
+try:
+   from config import EMAIL_CONFIG, AMAP_CONFIG
+except ImportError:
    # 如果导入失败,使用默认配置并打印警告
+   print("警告: 未找到config.py文件,使用默认配置")
+   EMAIL_CONFIG = {
+       "smtp_server": "smtp.qq.com",
+       "smtp_port": 465,
+       "sender_email": "your_email@qq.com",
+       "sender_password": "your_app_password",
+       "use_ssl": True,
+       "use_tls": False,
+   }
+   AMAP_CONFIG = {
+       "api_key": "your_amap_api_key",
+       "base_url": "https://restapi.amap.com",
+   }

# 定义高德地图API的基础URL
+API_BASE_URL = AMAP_CONFIG["base_url"]
# 定义高德地图API的密钥
+API_KEY = AMAP_CONFIG["api_key"]

# 定义定时任务的全局存储字典
+scheduled_tasks = {}


# 注册名为"get_weather_forecast"的工具到FastMCP服务器
@server.tool("get_weather_forecast")
# 定义异步函数,根据地址查询天气预报信息
async def get_weather_forecast(location: str) -> str:
    """
    根据地址查询天气预报信息

    Args:
        location: 需要查询天气的地址或城市名称

    Returns:
        天气信息的JSON字符串
    """
    # 创建异步HTTP客户端
    async with httpx.AsyncClient() as http_client:
        try:
            # 第一步:地理编码,获取城市代码
            geo_response = await http_client.get(
                f"{API_BASE_URL}/v3/geocode/geo",
                params={"key": API_KEY, "address": location},
            )
            # 将查询地址和返回内容写入标准错误输出,便于调试
            sys.stderr.write(location + geo_response.text)

            # 如果地理编码请求失败,返回错误信息
            if geo_response.status_code != 200:
                return f"地理编码请求失败: {geo_response.status_code} - {geo_response.text}"

            # 解析地理编码返回的JSON数据
            geo_data = json.loads(geo_response.text)
            # 如果API返回状态不是"1",说明有错误,返回错误信息
            if geo_data["status"] != "1":
                return f"地理编码API错误: {geo_data['info']}"

            # 提取城市代码(adcode)
            city_code = geo_data["geocodes"][0]["adcode"]

            # 第二步:根据城市代码获取天气信息
            weather_response = await http_client.get(
                f"{API_BASE_URL}/v3/weather/weatherInfo",
                params={"key": API_KEY, "city": city_code},
            )

            # 如果天气查询请求失败,返回错误信息
            if weather_response.status_code != 200:
                return f"天气查询请求失败: {weather_response.status_code} - {weather_response.text}"

            # 解析天气查询返回的JSON数据
            weather_data = json.loads(weather_response.text)
            # 如果API返回状态不是"1",说明有错误,返回错误信息
            if weather_data["status"] != "1":
                return f"天气查询API错误: {weather_data['info']}"

            # 返回天气数据的原始JSON字符串
            return weather_response.text

        # 捕获异常并返回异常信息
        except Exception as error:
            return f"查询天气时发生异常: {str(error)}"


# 注册名为"send_email"的工具到FastMCP服务器
+@server.tool("send_email")
# 定义异步函数,发送邮件
+async def send_email(
+   recipient_email: str, subject: str, body: str, attachment_path: str = None
+) -> str:
+   """
+   发送邮件功能

+   Args:
+       recipient_email: 收件人邮箱
+       subject: 邮件主题
+       body: 邮件正文
+       attachment_path: 附件路径(可选)

+   Returns:
+       发送结果信息
+   """
+   try:
        # 创建邮件对象
+       msg = MIMEMultipart()
        # 设置发件人
+       msg["From"] = EMAIL_CONFIG["sender_email"]
        # 设置收件人
+       msg["To"] = recipient_email
        # 设置邮件主题
+       msg["Subject"] = subject

        # 添加邮件正文
+       msg.attach(MIMEText(body, "plain", "utf-8"))

        # 如果有附件,添加附件
+       if attachment_path:
+           try:
                # 以二进制方式打开附件文件
+               with open(attachment_path, "rb") as attachment:
+                   part = MIMEBase("application", "octet-stream")
+                   part.set_payload(attachment.read())

                # 对附件内容进行base64编码
+               encoders.encode_base64(part)
                # 添加附件头信息
+               part.add_header(
+                   "Content-Disposition",
+                   f'attachment; filename= {attachment_path.split("/")[-1]}',
+               )
                # 将附件添加到邮件对象
+               msg.attach(part)
+           except FileNotFoundError:
                # 如果附件文件未找到,返回错误信息
+               return f"附件文件未找到: {attachment_path}"

        # 判断是否使用SSL方式连接SMTP服务器
+       if EMAIL_CONFIG.get("use_ssl", False):
            # 打印正在建立SSL连接的信息
+           print(
+               f"正在建立SSL SMTP连接: {EMAIL_CONFIG['smtp_server']}:{EMAIL_CONFIG['smtp_port']}"
+           )
            # 导入ssl模块
+           import ssl

            # 创建SSL上下文
+           context = ssl.create_default_context()
            # 建立SSL SMTP连接
+           server_smtp = smtplib.SMTP_SSL(
+               EMAIL_CONFIG["smtp_server"],
+               EMAIL_CONFIG["smtp_port"],
+               context=context,
+               timeout=30,
+           )
            # 打印SSL连接建立成功
+           print("   ✓ SSL连接建立成功")
+       else:
            # 打印正在建立标准SMTP连接的信息
+           print(
+               f"正在建立标准SMTP连接: {EMAIL_CONFIG['smtp_server']}:{EMAIL_CONFIG['smtp_port']}"
+           )
            # 建立普通SMTP连接
+           server_smtp = smtplib.SMTP(
+               EMAIL_CONFIG["smtp_server"], EMAIL_CONFIG["smtp_port"], timeout=30
+           )

            # 判断是否启用TLS加密
+           if EMAIL_CONFIG.get("use_tls", False):
                # 打印启用TLS加密的信息
+               print("正在启用TLS加密...")
                # 启用TLS
+               server_smtp.starttls()
                # 打印TLS启用成功
+               print("   ✓ TLS启用成功")

        # 打印正在登录邮箱
+       print("正在登录邮箱...")
        # 登录邮箱
+       server_smtp.login(EMAIL_CONFIG["sender_email"], EMAIL_CONFIG["sender_password"])

        # 打印正在发送邮件
+       print("正在发送邮件...")
        # 将邮件对象转为字符串
+       text = msg.as_string()
        # 发送邮件
+       server_smtp.sendmail(EMAIL_CONFIG["sender_email"], recipient_email, text)

        # 打印邮件发送完成,正在关闭连接
+       print("邮件发送完成,正在关闭连接...")
        # 关闭SMTP连接
+       server_smtp.quit()

        # 返回发送成功信息
+       return f"邮件发送成功!收件人: {recipient_email}, 主题: {subject}"

+   except Exception as error:
        # 捕获异常并返回错误信息
+       return f"邮件发送失败: {str(error)}"


# 注册名为"schedule_weather_email"的工具到FastMCP服务器
+@server.tool("schedule_weather_email")
# 定义异步函数,定时发送天气邮件
+async def schedule_weather_email(
+   recipient_email: str, location: str, schedule_time: str, frequency: str = "daily"
+) -> str:
+   """
+   定时发送天气邮件功能

+   Args:
+       recipient_email: 收件人邮箱
+       location: 天气查询地点
+       schedule_time: 发送时间(格式:HH:MM)
+       frequency: 频率(daily, weekly, monthly)

+   Returns:
+       定时任务创建结果
+   """
+   try:
        # 生成任务ID,包含收件人、地点和当前时间戳
+       task_id = f"weather_email_{recipient_email}_{location}_{int(time.time())}"

        # 定义定时任务的执行函数
+       def send_weather_email():
+           try:
                # 获取天气信息
+               weather_info = asyncio.run(get_weather_forecast(location))
                # 解析天气信息为字典
+               weather_data = json.loads(weather_info)

                # 构建邮件主题
+               subject = f"天气预报 - {location}"
                # 构建邮件正文
+               body = f"""
+您好!

+以下是 {location} 的天气预报信息:

+{json.dumps(weather_data, ensure_ascii=False, indent=2)}

+祝您生活愉快!
+天气服务系统
+               """

                # 发送邮件
+               asyncio.run(send_email(recipient_email, subject, body))

+           except Exception as e:
                # 打印定时任务执行失败信息
+               print(f"定时任务执行失败: {e}")

        # 根据频率设置定时任务
+       if frequency == "daily":
            # 每天定时
+           schedule.every().day.at(schedule_time).do(send_weather_email)
+       elif frequency == "weekly":
            # 每周定时
+           schedule.every().week.at(schedule_time).do(send_weather_email)
+       elif frequency == "monthly":
            # 每月定时
+           schedule.every().month.at(schedule_time).do(send_weather_email)
+       else:
            # 不支持的频率,返回错误信息
+           return f"不支持的频率: {frequency}"

        # 存储任务信息到全局字典
+       scheduled_tasks[task_id] = {
+           "type": "weather_email",
+           "recipient": recipient_email,
+           "location": location,
+           "time": schedule_time,
+           "frequency": frequency,
+           "created_at": datetime.now().isoformat(),
+       }

        # 定义运行定时任务的线程函数
+       def run_schedule():
+           while True:
                # 检查并运行待执行的任务
+               schedule.run_pending()
                # 每分钟检查一次
+               time.sleep(60)  # 每分钟检查一次

        # 如果定时任务线程未启动,则启动
+       if (
+           not hasattr(server, "_schedule_thread")
+           or not server._schedule_thread.is_alive()
+       ):
            # 创建并启动定时任务线程
+           server._schedule_thread = threading.Thread(target=run_schedule, daemon=True)
+           server._schedule_thread.start()

        # 返回任务创建成功信息
+       return f"定时任务创建成功!任务ID: {task_id}, 将在每天 {schedule_time} 发送 {location} 的天气信息到 {recipient_email}"

+   except Exception as error:
        # 捕获异常并返回错误信息
+       return f"创建定时任务失败: {str(error)}"


# 注册名为"list_scheduled_tasks"的工具到FastMCP服务器
+@server.tool("list_scheduled_tasks")
# 定义异步函数,列出所有定时任务
+async def list_scheduled_tasks() -> str:
+   """
+   列出所有定时任务

+   Returns:
+       定时任务列表的JSON字符串
+   """
+   try:
        # 返回定时任务的json字符串
+       return json.dumps(scheduled_tasks, ensure_ascii=False, indent=2)
+   except Exception as error:
        # 捕获异常并返回错误信息
+       return f"获取定时任务列表失败: {str(error)}"


# 注册名为"cancel_scheduled_task"的工具到FastMCP服务器
+@server.tool("cancel_scheduled_task")
# 定义异步函数,取消指定的定时任务
+async def cancel_scheduled_task(task_id: str) -> str:
+   """
+   取消指定的定时任务

+   Args:
+       task_id: 任务ID

+   Returns:
+       取消结果信息
+   """
+   try:
        # 判断任务ID是否存在
+       if task_id in scheduled_tasks:
            # 清除定时任务
+           schedule.clear(task_id)
            # 删除任务记录
+           del scheduled_tasks[task_id]
            # 返回取消成功信息
+           return f"任务 {task_id} 已成功取消"
+       else:
            # 未找到任务ID
+           return f"未找到任务ID: {task_id}"
+   except Exception as error:
        # 捕获异常并返回错误信息
+       return f"取消任务失败: {str(error)}"


# 注册名为"send_weather_report"的工具到FastMCP服务器
+@server.tool("send_weather_report")
# 定义异步函数,发送天气报告邮件
+async def send_weather_report(
+   recipient_email: str, location: str, report_type: str = "current"
+) -> str:
+   """
+   发送天气报告邮件

+   Args:
+       recipient_email: 收件人邮箱
+       location: 天气查询地点
+       report_type: 报告类型(current: 当前天气, forecast: 天气预报)

+   Returns:
+       发送结果信息
+   """
+   try:
        # 获取天气信息
+       weather_info = await get_weather_forecast(location)
        # 解析天气信息为字典
+       weather_data = json.loads(weather_info)

        # 根据报告类型构建邮件内容
+       if report_type == "current":
            # 当前天气邮件主题
+           subject = f"当前天气 - {location}"
            # 当前天气邮件正文
+           body = f"""
+您好!

+以下是 {location} 的当前天气信息:

+{json.dumps(weather_data, ensure_ascii=False, indent=2)}

+祝您生活愉快!
+天气服务系统
+           """
+       else:  # forecast
            # 天气预报邮件主题
+           subject = f"天气预报 - {location}"
            # 天气预报邮件正文
+           body = f"""
+您好!

+以下是 {location} 的天气预报信息:

+{json.dumps(weather_data, ensure_ascii=False, indent=2)}

+祝您生活愉快!
+天气服务系统
+           """

        # 发送邮件
+       result = await send_email(recipient_email, subject, body)
        # 返回发送结果
+       return result

+   except Exception as error:
        # 捕获异常并返回错误信息
+       return f"发送天气报告失败: {str(error)}"


# 判断当前模块是否为主程序入口
if __name__ == "__main__":
    # 启动FastMCP服务器,使用stdio作为传输方式
    server.run(transport="stdio")

访问验证

请输入访问令牌

Token不正确,请重新输入