ai
  • index
  • 1.首页
  • 2.介绍
  • 3.架构概览
  • 4.服务器概念
  • 5.客户端概念
  • 6.版本控制
  • 7.连接到远程MCP服务器
  • 8.连接到本地MCP服务器
  • json_rpc
  • 9.构建一个MCP服务器
  • 10.检查员
  • 11.构建一个MCP客户端
  • 14.架构
  • 15.基础协议概述
  • 16.生命周期
  • 17.传输
  • 18.授权
  • 19.安全最佳实践
  • 20.取消
  • 21.Ping
  • 22.进展
  • 23.Roots
  • 24.采样
  • 25.启发
  • 26.服务器特性
  • 27.提示词
  • 28.资源
  • 29.工具
  • 30.完成
  • 31.日志记录
  • 32.分页
  • 33.架构参考
  • URI模板
  • 12.实现
  • http.server
  • 动态客户端注册协议
  • 受保护资源元数据
  • 授权服务器元数据
  • JWKS
  • PKCE
  • PyJWT
  • secrets
  • watchfiles
  • 实现authorization
  • 实现cancel
  • 实现completion
  • 实现logging
  • 实现pagination
  • 实现process
  • 实现transport
  • psutil
  • pytz
  • zoneinfo
  • contextlib
  • Starlette
  • mcp.1.starter
  • mcp.2.Resource
  • mcp.3.structured_output
  • mcp.4.prompts
  • mcp.5.context
  • mcp.6.streamable
  • mcp.7.lowlevel
  • mcp.8.Completion
  • mcp.9.Elicitation
  • mcp.10.oauth
  • mcp.11.integration
  • mcp.12.best
  • mysql-mcp
  • databases
  • uvicorn
  • asynccontextmanager
  • AsyncExitStack
  • streamable
  • aiohttp
  • publish
  • email
  • schedule
  • twine
  • 1.教学文档总览
  • 2.教师使用指南
  • 3.教学系统快速参考
  • 4.新生入门指南
  • 5.学生使用指南
  • 1.故障排除与最佳实践
  • 2. 常见问题诊断与解决
    • 2.1 连接问题
      • 2.1.1 问题:服务器启动失败
      • 2.1.2 问题:权限错误
    • 2.2 运行时错误
      • 2.2.1 问题:工具调用失败
      • 2.2.2 问题:结构化输出错误
  • 2. 性能优化技巧
    • 2.3 异步编程最佳实践
      • 2.3.1 避免阻塞操作
      • 2.3.2 并发执行多个操作
    • 2.4 内存管理
      • 2.4.1 避免内存泄漏
      • 2.4.2 资源清理
  • 3. 调试技巧与工具
    • 2.5 日志记录
      • 2.5.1 结构化日志
      • 2.5.2 上下文感知日志
    • 2.6 性能监控
      • 2.6.1 执行时间监控
      • 2.6.2 内存使用监控
  • 3. 安全最佳实践
    • 3.1 输入验证
      • 3.1.1 参数验证
    • 3.2 访问控制
      • 3.2.1 基于角色的权限控制
  • 5. 生产环境部署
    • 4.1 进程管理
      • 4.1.1 使用 Supervisor 管理进程
      • 使用 Windows 服务
    • 4.2 健康检查
      • 4.2.1 健康检查端点

1.故障排除与最佳实践 #

本章目标:

  • 掌握 MCP 开发中的常见问题诊断与解决方法
  • 学习性能优化技巧与最佳实践
  • 掌握调试技巧与工具使用方法
  • 了解生产环境部署的安全考虑

2. 常见问题诊断与解决 #

2.1 连接问题 #

2.1.1 问题:服务器启动失败 #

症状:

Error: ModuleNotFoundError: No module named 'mcp'

解决方案:

# 1. 检查虚拟环境是否正确激活
call .venv\Scripts\activate

# 2. 验证 MCP SDK 安装
pip list | findstr mcp

# 3. 重新安装(如果未安装)
pip install mcp

2.1.2 问题:权限错误 #

症状:

PermissionError: [Errno 13] Permission denied

解决方案:

# 1. 以管理员身份运行命令提示符
# 2. 检查文件权限
icacls C:\mcp-project

# 3. 修改权限(如果需要)
icacls C:\mcp-project /grant Users:F

2.2 运行时错误 #

2.2.1 问题:工具调用失败 #

症状:

Error: Tool 'unknown_tool' not found

解决方案:

# 检查工具注册
@mcp.tool()
async def my_tool(ctx: Context) -> str:
    return "Hello World"

# 确保装饰器正确使用
# 检查函数名称拼写
# 验证参数类型注解

2.2.2 问题:结构化输出错误 #

症状:

Error: Invalid structured output format

解决方案:

# 1. 使用正确的类型注解
# 导入Dict和Any用于类型注解
from typing import Dict, Any

# 使用mcp.tool装饰器注册工具
@mcp.tool()
# 定义异步函数,返回类型为Dict[str, Any]
async def my_tool(ctx: Context) -> Dict[str, Any]:
    # 返回一个包含状态和数据的字典
    return {"status": "success", "data": "example"}

# 2. 或者使用 Pydantic 模型
# 导入Pydantic的BaseModel基类
from pydantic import BaseModel

# 定义一个Pydantic模型,包含status和data字段
class ToolResult(BaseModel):
    status: str
    data: str

# 使用mcp.tool装饰器注册工具
@mcp.tool()
# 定义异步函数,返回类型为ToolResult模型
async def my_tool(ctx: Context) -> ToolResult:
    # 返回一个ToolResult实例,包含状态和数据
    return ToolResult(status="success", data="example")

2. 性能优化技巧 #

2.3 异步编程最佳实践 #

2.3.1 避免阻塞操作 #

#  错误:同步阻塞操作
@mcp.tool()
async def slow_tool(ctx: Context) -> str:
    import time
    time.sleep(5)  # 阻塞整个事件循环
    return "Done"

#  正确:异步非阻塞操作
@mcp.tool()
async def fast_tool(ctx: Context) -> str:
    await asyncio.sleep(5)  # 非阻塞等待
    return "Done"

2.3.2 并发执行多个操作 #

#  错误:顺序执行
# 依次调用异步操作,每次等待上一个完成,效率低
@mcp.tool()
async def sequential_tool(ctx: Context) -> list[str]:
    # 初始化结果列表
    results = []
    # 循环3次,依次执行异步操作
    for i in range(3):
        # 等待每个异步操作完成(顺序执行)
        result = await some_async_operation(i)
        # 将结果添加到列表
        results.append(result)
    # 返回所有结果
    return results

#  正确:并发执行
# 同时发起多个异步操作,提高执行效率
@mcp.tool()
async def concurrent_tool(ctx: Context) -> list[str]:
    # 构建3个异步任务列表
    tasks = [some_async_operation(i) for i in range(3)]
    # 并发执行所有任务,等待全部完成
    results = await asyncio.gather(*tasks)
    # 返回所有结果
    return results

2.4 内存管理 #

2.4.1 避免内存泄漏 #

#  错误:可能的内存泄漏
# 定义一个缓存类,没有大小限制,可能导致内存泄漏
class Cache:
    # 初始化方法,创建一个空字典用于缓存
    def __init__(self):
        self._cache = {}

    # 添加缓存项的方法
    def add(self, key: str, value: Any):
        self._cache[key] = value  # 无限制增长

#  正确:限制缓存大小
# 定义一个带有最大容量限制的缓存类
class LimitedCache:
    # 初始化方法,设置最大缓存大小,默认1000
    def __init__(self, max_size: int = 1000):
        self._cache = {}
        self._max_size = max_size

    # 添加缓存项的方法
    def add(self, key: str, value: Any):
        # 如果缓存已满,则移除最旧的条目
        if len(self._cache) >= self._max_size:
            # 获取最早插入的key
            oldest_key = next(iter(self._cache))
            # 删除最早插入的缓存项
            del self._cache[oldest_key]
        # 添加新的缓存项
        self._cache[key] = value

2.4.2 资源清理 #

#  正确:使用上下文管理器

# 装饰器,声明该函数为MCP工具
@mcp.tool()
# 定义异步文件操作函数,接收上下文和文件路径参数,返回字符串
async def file_operation(ctx: Context, file_path: str) -> str:
    # 使用异步上下文管理器打开文件,模式为只读
    async with aiofiles.open(file_path, 'r') as f:
        # 异步读取文件内容
        content = await f.read()
        # 返回读取到的内容
        return content

#  正确:手动清理资源

# 装饰器,声明该函数为MCP工具
@mcp.tool()
# 定义异步数据库操作函数,接收上下文参数,返回字符串
async def database_operation(ctx: Context) -> str:
    # 异步创建数据库连接
    db = await create_database_connection()
    try:
        # 执行异步查询操作
        result = await db.query("SELECT * FROM table")
        # 返回查询结果的字符串表示
        return str(result)
    finally:
        # 无论是否异常,最后都要关闭数据库连接,防止资源泄漏
        await db.close()

3. 调试技巧与工具 #

2.5 日志记录 #

2.5.1 结构化日志 #

# 导入日志模块
import logging
# 导入json模块,用于序列化日志为json格式
import json
# 导入datetime模块,用于获取当前UTC时间
from datetime import datetime

# 定义结构化日志格式化器,继承自logging.Formatter
class StructuredFormatter(logging.Formatter):
    # 重写format方法,实现自定义日志格式
    def format(self, record):
        # 构建日志条目的字典
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),  # 当前UTC时间,ISO格式
            "level": record.levelname,                  # 日志级别
            "logger": record.name,                      # 日志记录器名称
            "message": record.getMessage(),             # 日志消息内容
            "module": record.module,                    # 模块名
            "function": record.funcName,                # 函数名
            "line": record.lineno                       # 行号
        }

        # 如果有异常信息,则添加到日志条目中
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)

        # 如果有额外字段,则合并到日志条目中
        if hasattr(record, "extra_fields"):
            log_entry.update(record.extra_fields)

        # 返回json格式的日志字符串,确保中文不转义
        return json.dumps(log_entry, ensure_ascii=False)

# 结构化日志配置示例
def setup_structured_logging():
    # 获取名为"mcp_server"的日志记录器
    logger = logging.getLogger("mcp_server")
    # 创建一个流式处理器(输出到控制台)
    handler = logging.StreamHandler()
    # 设置处理器的格式化器为结构化格式化器
    handler.setFormatter(StructuredFormatter())
    # 将处理器添加到日志记录器
    logger.addHandler(handler)
    # 设置日志级别为DEBUG
    logger.setLevel(logging.DEBUG)
    # 返回配置好的日志记录器
    return logger

2.5.2 上下文感知日志 #

# 在工具中使用上下文日志

# 装饰器,声明该函数为mcp工具
@mcp.tool()
# 定义异步工具函数,接收上下文和消息参数
async def debug_tool(ctx: Context, message: str) -> str:
    # 记录工具调用开始的日志,包含工具名、用户ID和请求ID等上下文信息
    ctx.info(f"工具调用开始", extra={
        "tool_name": "debug_tool",
        "user_id": getattr(ctx, "user_id", "unknown"),
        "request_id": getattr(ctx, "request_id", "unknown")
    })

    try:
        # 执行消息处理操作,调用异步处理函数
        result = await process_message(message)

        # 记录工具调用成功的日志,包含结果长度和处理耗时
        ctx.info("工具调用成功", extra={
            "result_length": len(result),
            "processing_time": "100ms"
        })

        # 返回处理结果
        return result
    except Exception as e:
        # 捕获异常并记录错误日志,包含错误类型和错误信息
        ctx.error("工具调用失败", extra={
            "error_type": type(e).__name__,
            "error_message": str(e)
        })
        # 重新抛出异常
        raise

2.6 性能监控 #

2.6.1 执行时间监控 #

# 导入time模块用于计时
import time
# 导入functools模块用于装饰器
import functools
# 从typing模块导入类型注解
from typing import Callable, Any

# 定义性能监控装饰器
def monitor_performance(func: Callable) -> Callable:
    """性能监控装饰器"""
    # 保留原函数元信息
    @functools.wraps(func)
    # 定义异步包装函数
    async def wrapper(*args, **kwargs) -> Any:
        # 记录开始时间
        start_time = time.time()
        # 记录开始时的内存使用量
        start_memory = get_memory_usage()

        try:
            # 执行被装饰的异步函数
            result = await func(*args, **kwargs)
            # 返回函数执行结果
            return result
        finally:
            # 记录结束时间
            end_time = time.time()
            # 记录结束时的内存使用量
            end_memory = get_memory_usage()

            # 计算执行时间
            execution_time = end_time - start_time
            # 计算内存变化量
            memory_delta = end_memory - start_memory

            # 记录性能指标
            log_performance_metrics(
                function_name=func.__name__,
                execution_time=execution_time,
                memory_delta=memory_delta
            )

    # 返回包装后的函数
    return wrapper

# 使用示例
# 注册为mcp工具
@mcp.tool()
# 应用性能监控装饰器
@monitor_performance
# 定义异步工具函数
async def monitored_tool(ctx: Context) -> str:
    # 模拟工作,休眠1秒
    await asyncio.sleep(1)
    # 返回结果
    return "Performance monitored"

2.6.2 内存使用监控 #

# 导入psutil库,用于获取系统和进程信息
import psutil
# 导入os库,用于获取当前进程ID
import os

# 获取当前进程的内存使用量(以字节为单位)
def get_memory_usage() -> int:
    """获取当前进程内存使用量(字节)"""
    # 创建当前进程的psutil进程对象
    process = psutil.Process(os.getpid())
    # 返回当前进程的常驻内存集(RSS)大小
    return process.memory_info().rss

# 记录性能指标,包括函数名、执行时间和内存变化量
def log_performance_metrics(function_name: str, execution_time: float, memory_delta: int):
    """记录性能指标"""
    # 获取名为"performance"的日志记录器
    logger = logging.getLogger("performance")
    # 记录性能指标信息,包含函数名、执行时间(毫秒)、内存变化(KB)和时间戳
    logger.info("性能指标", extra={
        "function_name": function_name,
        "execution_time_ms": execution_time * 1000,
        "memory_delta_kb": memory_delta / 1024,
        "timestamp": datetime.utcnow().isoformat()
    })

3. 安全最佳实践 #

3.1 输入验证 #

3.1.1 参数验证 #

# 导入pydantic的BaseModel和validator用于数据验证
from pydantic import BaseModel, validator
# 导入正则表达式模块
import re

# 定义文件操作请求的数据模型
class FileOperationRequest(BaseModel):
    # 文件路径,类型为字符串
    file_path: str
    # 文件内容,类型为字符串
    content: str

    # 对file_path字段进行自定义验证
    @validator('file_path')
    def validate_file_path(cls, v):
        # 防止路径遍历攻击,禁止出现'..'和以'/'或'\'开头的路径
        if '..' in v or v.startswith('/') or v.startswith('\\'):
            raise ValueError("无效的文件路径")

        # 只允许特定的文件扩展名
        allowed_extensions = ['.txt', '.log', '.json']
        # 检查文件扩展名是否在允许列表中
        if not any(v.endswith(ext) for ext in allowed_extensions):
            raise ValueError("不支持的文件类型")

        # 返回验证通过的文件路径
        return v

    # 对content字段进行自定义验证
    @validator('content')
    def validate_content(cls, v):
        # 限制内容长度不能超过1MB
        if len(v) > 1024 * 1024:  # 1MB
            raise ValueError("内容过长")
        # 返回验证通过的内容
        return v

# 注册为mcp工具
@mcp.tool()
# 定义异步安全文件操作函数
async def safe_file_operation(ctx: Context, request: FileOperationRequest) -> str:
    # 使用验证后的参数,返回安全操作提示
    return f"安全操作:{request.file_path}"

3.2 访问控制 #

3.2.1 基于角色的权限控制 #

# 导入枚举类型Enum
from enum import Enum
# 导入可选类型Optional
from typing import Optional

# 定义用户角色的枚举类
class UserRole(Enum):
    # 只读用户
    READER = "reader"
    # 可写用户
    WRITER = "writer"
    # 管理员用户
    ADMIN = "admin"

# 定义权限检查器类
class PermissionChecker:
    # 初始化方法
    def __init__(self):
        # 定义每种操作对应的允许角色
        self.permissions = {
            "read_file": [UserRole.READER, UserRole.WRITER, UserRole.ADMIN],  # 读文件权限
            "write_file": [UserRole.WRITER, UserRole.ADMIN],                  # 写文件权限
            "delete_file": [UserRole.ADMIN]                                   # 删除文件权限
        }

    # 检查指定角色是否有权限执行某操作
    def check_permission(self, operation: str, user_role: UserRole) -> bool:
        # 判断用户角色是否在允许列表中
        return user_role in self.permissions.get(operation, [])

# 创建权限检查器实例
permission_checker = PermissionChecker()

# 注册为mcp工具
@mcp.tool()
# 定义异步安全文件操作函数
async def secure_file_operation(
    ctx: Context, 
    operation: str, 
    file_path: str,
    user_role: UserRole
) -> str:
    # 检查用户角色是否有权限执行该操作
    if not permission_checker.check_permission(operation, user_role):
        # 如果无权限则抛出异常
        raise PermissionError(f"用户角色 {user_role.value} 无权执行操作 {operation}")

    # 权限通过后执行操作
    return f"执行操作:{operation} on {file_path}"

5. 生产环境部署 #

4.1 进程管理 #

4.1.1 使用 Supervisor 管理进程 #

创建 C:\mcp-project\supervisor.conf:

; 定义名为 mcp-server 的进程
[program:mcp-server]
; 指定启动命令,运行 integration_server.py
command=python C:\mcp-project\integration_server.py
; 设置工作目录为 C:\mcp-project
directory=C:\mcp-project
; 以 SYSTEM 用户身份运行进程
user=SYSTEM
; 启动 supervisor 时自动启动该进程
autostart=true
; 进程异常退出时自动重启
autorestart=true
; 标准错误重定向到标准输出
redirect_stderr=true
; 指定标准输出日志文件路径
stdout_logfile=C:\mcp-project\logs\mcp_server.log
; 单个日志文件最大为 50MB
stdout_logfile_maxbytes=50MB
; 最多保留 10 个日志文件备份
stdout_logfile_backups=10

使用 Windows 服务 #

创建 C:\mcp-project\install_service.bat:

@echo off
:: 输出安装提示信息
echo 安装 MCP 服务器为 Windows 服务...

:: 创建名为 "MCP Server" 的 Windows 服务,指定 Python 解释器和主程序路径,设置为自动启动
sc create "MCP Server" binPath= "C:\mcp-project\.venv\Scripts\python.exe C:\mcp-project\integration_server.py" start= auto

:: 设置服务的描述信息
sc description "MCP Server" "Model Context Protocol Server for AI Integration"

:: 启动刚刚创建的服务
sc start "MCP Server"

:: 输出安装完成提示
echo 服务安装完成!

:: 暂停,等待用户按任意键继续
pause

4.2 健康检查 #

4.2.1 健康检查端点 #

# 注册为 mcp 的工具
@mcp.tool()
# 定义异步健康检查函数,接收上下文参数,返回字典类型
async def health_check(ctx: Context) -> dict[str, Any]:
    """系统健康检查"""
    try:
        # 异步检查数据库连接状态
        db_status = await check_database_connection()

        # 异步检查文件系统状态
        fs_status = await check_file_system()

        # 检查内存使用情况
        memory_status = check_memory_usage()

        # 如果所有组件都健康,则整体状态为 healthy,否则为 unhealthy
        overall_status = "healthy" if all([
            db_status["healthy"],
            fs_status["healthy"],
            memory_status["healthy"]
        ]) else "unhealthy"

        # 返回健康检查结果,包括状态、时间戳和各组件状态
        return {
            "status": overall_status,
            "timestamp": datetime.utcnow().isoformat(),
            "components": {
                "database": db_status,
                "filesystem": fs_status,
                "memory": memory_status
            }
        }
    except Exception as e:
        # 捕获异常并记录错误信息到上下文
        ctx.error(f"健康检查失败:{e}")
        # 返回错误状态和异常信息
        return {
            "status": "error",
            "error": str(e),
            "timestamp": datetime.utcnow().isoformat()
        }

访问验证

请输入访问令牌

Token不正确,请重新输入