1.故障排除与最佳实践 #
本章目标:
- 掌握 MCP 开发中的常见问题诊断与解决方法
- 学习性能优化技巧与最佳实践
- 掌握调试技巧与工具使用方法
- 了解生产环境部署的安全考虑
2. 常见问题诊断与解决 #
2.1 连接问题 #
2.1.1 问题:服务器启动失败 #
症状:
Error: ModuleNotFoundError: No module named 'mcp'解决方案:
# 1. 检查虚拟环境是否正确激活
call .venv\Scripts\activate
# 2. 验证 MCP SDK 安装
pip list | findstr mcp
# 3. 重新安装(如果未安装)
pip install mcp2.1.2 问题:权限错误 #
症状:
PermissionError: [Errno 13] Permission denied解决方案:
# 1. 以管理员身份运行命令提示符
# 2. 检查文件权限
icacls C:\mcp-project
# 3. 修改权限(如果需要)
icacls C:\mcp-project /grant Users:F2.2 运行时错误 #
2.2.1 问题:工具调用失败 #
症状:
Error: Tool 'unknown_tool' not found解决方案:
# 检查工具注册
@mcp.tool()
async def my_tool(ctx: Context) -> str:
return "Hello World"
# 确保装饰器正确使用
# 检查函数名称拼写
# 验证参数类型注解2.2.2 问题:结构化输出错误 #
症状:
Error: Invalid structured output format解决方案:
# 1. 使用正确的类型注解
# 导入Dict和Any用于类型注解
from typing import Dict, Any
# 使用mcp.tool装饰器注册工具
@mcp.tool()
# 定义异步函数,返回类型为Dict[str, Any]
async def my_tool(ctx: Context) -> Dict[str, Any]:
# 返回一个包含状态和数据的字典
return {"status": "success", "data": "example"}
# 2. 或者使用 Pydantic 模型
# 导入Pydantic的BaseModel基类
from pydantic import BaseModel
# 定义一个Pydantic模型,包含status和data字段
class ToolResult(BaseModel):
status: str
data: str
# 使用mcp.tool装饰器注册工具
@mcp.tool()
# 定义异步函数,返回类型为ToolResult模型
async def my_tool(ctx: Context) -> ToolResult:
# 返回一个ToolResult实例,包含状态和数据
return ToolResult(status="success", data="example")2. 性能优化技巧 #
2.3 异步编程最佳实践 #
2.3.1 避免阻塞操作 #
# 错误:同步阻塞操作
@mcp.tool()
async def slow_tool(ctx: Context) -> str:
import time
time.sleep(5) # 阻塞整个事件循环
return "Done"
# 正确:异步非阻塞操作
@mcp.tool()
async def fast_tool(ctx: Context) -> str:
await asyncio.sleep(5) # 非阻塞等待
return "Done"2.3.2 并发执行多个操作 #
# 错误:顺序执行
# 依次调用异步操作,每次等待上一个完成,效率低
@mcp.tool()
async def sequential_tool(ctx: Context) -> list[str]:
# 初始化结果列表
results = []
# 循环3次,依次执行异步操作
for i in range(3):
# 等待每个异步操作完成(顺序执行)
result = await some_async_operation(i)
# 将结果添加到列表
results.append(result)
# 返回所有结果
return results
# 正确:并发执行
# 同时发起多个异步操作,提高执行效率
@mcp.tool()
async def concurrent_tool(ctx: Context) -> list[str]:
# 构建3个异步任务列表
tasks = [some_async_operation(i) for i in range(3)]
# 并发执行所有任务,等待全部完成
results = await asyncio.gather(*tasks)
# 返回所有结果
return results2.4 内存管理 #
2.4.1 避免内存泄漏 #
# 错误:可能的内存泄漏
# 定义一个缓存类,没有大小限制,可能导致内存泄漏
class Cache:
# 初始化方法,创建一个空字典用于缓存
def __init__(self):
self._cache = {}
# 添加缓存项的方法
def add(self, key: str, value: Any):
self._cache[key] = value # 无限制增长
# 正确:限制缓存大小
# 定义一个带有最大容量限制的缓存类
class LimitedCache:
# 初始化方法,设置最大缓存大小,默认1000
def __init__(self, max_size: int = 1000):
self._cache = {}
self._max_size = max_size
# 添加缓存项的方法
def add(self, key: str, value: Any):
# 如果缓存已满,则移除最旧的条目
if len(self._cache) >= self._max_size:
# 获取最早插入的key
oldest_key = next(iter(self._cache))
# 删除最早插入的缓存项
del self._cache[oldest_key]
# 添加新的缓存项
self._cache[key] = value2.4.2 资源清理 #
# 正确:使用上下文管理器
# 装饰器,声明该函数为MCP工具
@mcp.tool()
# 定义异步文件操作函数,接收上下文和文件路径参数,返回字符串
async def file_operation(ctx: Context, file_path: str) -> str:
# 使用异步上下文管理器打开文件,模式为只读
async with aiofiles.open(file_path, 'r') as f:
# 异步读取文件内容
content = await f.read()
# 返回读取到的内容
return content
# 正确:手动清理资源
# 装饰器,声明该函数为MCP工具
@mcp.tool()
# 定义异步数据库操作函数,接收上下文参数,返回字符串
async def database_operation(ctx: Context) -> str:
# 异步创建数据库连接
db = await create_database_connection()
try:
# 执行异步查询操作
result = await db.query("SELECT * FROM table")
# 返回查询结果的字符串表示
return str(result)
finally:
# 无论是否异常,最后都要关闭数据库连接,防止资源泄漏
await db.close()3. 调试技巧与工具 #
2.5 日志记录 #
2.5.1 结构化日志 #
# 导入日志模块
import logging
# 导入json模块,用于序列化日志为json格式
import json
# 导入datetime模块,用于获取当前UTC时间
from datetime import datetime
# 定义结构化日志格式化器,继承自logging.Formatter
class StructuredFormatter(logging.Formatter):
# 重写format方法,实现自定义日志格式
def format(self, record):
# 构建日志条目的字典
log_entry = {
"timestamp": datetime.utcnow().isoformat(), # 当前UTC时间,ISO格式
"level": record.levelname, # 日志级别
"logger": record.name, # 日志记录器名称
"message": record.getMessage(), # 日志消息内容
"module": record.module, # 模块名
"function": record.funcName, # 函数名
"line": record.lineno # 行号
}
# 如果有异常信息,则添加到日志条目中
if record.exc_info:
log_entry["exception"] = self.formatException(record.exc_info)
# 如果有额外字段,则合并到日志条目中
if hasattr(record, "extra_fields"):
log_entry.update(record.extra_fields)
# 返回json格式的日志字符串,确保中文不转义
return json.dumps(log_entry, ensure_ascii=False)
# 结构化日志配置示例
def setup_structured_logging():
# 获取名为"mcp_server"的日志记录器
logger = logging.getLogger("mcp_server")
# 创建一个流式处理器(输出到控制台)
handler = logging.StreamHandler()
# 设置处理器的格式化器为结构化格式化器
handler.setFormatter(StructuredFormatter())
# 将处理器添加到日志记录器
logger.addHandler(handler)
# 设置日志级别为DEBUG
logger.setLevel(logging.DEBUG)
# 返回配置好的日志记录器
return logger2.5.2 上下文感知日志 #
# 在工具中使用上下文日志
# 装饰器,声明该函数为mcp工具
@mcp.tool()
# 定义异步工具函数,接收上下文和消息参数
async def debug_tool(ctx: Context, message: str) -> str:
# 记录工具调用开始的日志,包含工具名、用户ID和请求ID等上下文信息
ctx.info(f"工具调用开始", extra={
"tool_name": "debug_tool",
"user_id": getattr(ctx, "user_id", "unknown"),
"request_id": getattr(ctx, "request_id", "unknown")
})
try:
# 执行消息处理操作,调用异步处理函数
result = await process_message(message)
# 记录工具调用成功的日志,包含结果长度和处理耗时
ctx.info("工具调用成功", extra={
"result_length": len(result),
"processing_time": "100ms"
})
# 返回处理结果
return result
except Exception as e:
# 捕获异常并记录错误日志,包含错误类型和错误信息
ctx.error("工具调用失败", extra={
"error_type": type(e).__name__,
"error_message": str(e)
})
# 重新抛出异常
raise2.6 性能监控 #
2.6.1 执行时间监控 #
# 导入time模块用于计时
import time
# 导入functools模块用于装饰器
import functools
# 从typing模块导入类型注解
from typing import Callable, Any
# 定义性能监控装饰器
def monitor_performance(func: Callable) -> Callable:
"""性能监控装饰器"""
# 保留原函数元信息
@functools.wraps(func)
# 定义异步包装函数
async def wrapper(*args, **kwargs) -> Any:
# 记录开始时间
start_time = time.time()
# 记录开始时的内存使用量
start_memory = get_memory_usage()
try:
# 执行被装饰的异步函数
result = await func(*args, **kwargs)
# 返回函数执行结果
return result
finally:
# 记录结束时间
end_time = time.time()
# 记录结束时的内存使用量
end_memory = get_memory_usage()
# 计算执行时间
execution_time = end_time - start_time
# 计算内存变化量
memory_delta = end_memory - start_memory
# 记录性能指标
log_performance_metrics(
function_name=func.__name__,
execution_time=execution_time,
memory_delta=memory_delta
)
# 返回包装后的函数
return wrapper
# 使用示例
# 注册为mcp工具
@mcp.tool()
# 应用性能监控装饰器
@monitor_performance
# 定义异步工具函数
async def monitored_tool(ctx: Context) -> str:
# 模拟工作,休眠1秒
await asyncio.sleep(1)
# 返回结果
return "Performance monitored"2.6.2 内存使用监控 #
# 导入psutil库,用于获取系统和进程信息
import psutil
# 导入os库,用于获取当前进程ID
import os
# 获取当前进程的内存使用量(以字节为单位)
def get_memory_usage() -> int:
"""获取当前进程内存使用量(字节)"""
# 创建当前进程的psutil进程对象
process = psutil.Process(os.getpid())
# 返回当前进程的常驻内存集(RSS)大小
return process.memory_info().rss
# 记录性能指标,包括函数名、执行时间和内存变化量
def log_performance_metrics(function_name: str, execution_time: float, memory_delta: int):
"""记录性能指标"""
# 获取名为"performance"的日志记录器
logger = logging.getLogger("performance")
# 记录性能指标信息,包含函数名、执行时间(毫秒)、内存变化(KB)和时间戳
logger.info("性能指标", extra={
"function_name": function_name,
"execution_time_ms": execution_time * 1000,
"memory_delta_kb": memory_delta / 1024,
"timestamp": datetime.utcnow().isoformat()
})3. 安全最佳实践 #
3.1 输入验证 #
3.1.1 参数验证 #
# 导入pydantic的BaseModel和validator用于数据验证
from pydantic import BaseModel, validator
# 导入正则表达式模块
import re
# 定义文件操作请求的数据模型
class FileOperationRequest(BaseModel):
# 文件路径,类型为字符串
file_path: str
# 文件内容,类型为字符串
content: str
# 对file_path字段进行自定义验证
@validator('file_path')
def validate_file_path(cls, v):
# 防止路径遍历攻击,禁止出现'..'和以'/'或'\'开头的路径
if '..' in v or v.startswith('/') or v.startswith('\\'):
raise ValueError("无效的文件路径")
# 只允许特定的文件扩展名
allowed_extensions = ['.txt', '.log', '.json']
# 检查文件扩展名是否在允许列表中
if not any(v.endswith(ext) for ext in allowed_extensions):
raise ValueError("不支持的文件类型")
# 返回验证通过的文件路径
return v
# 对content字段进行自定义验证
@validator('content')
def validate_content(cls, v):
# 限制内容长度不能超过1MB
if len(v) > 1024 * 1024: # 1MB
raise ValueError("内容过长")
# 返回验证通过的内容
return v
# 注册为mcp工具
@mcp.tool()
# 定义异步安全文件操作函数
async def safe_file_operation(ctx: Context, request: FileOperationRequest) -> str:
# 使用验证后的参数,返回安全操作提示
return f"安全操作:{request.file_path}"3.2 访问控制 #
3.2.1 基于角色的权限控制 #
# 导入枚举类型Enum
from enum import Enum
# 导入可选类型Optional
from typing import Optional
# 定义用户角色的枚举类
class UserRole(Enum):
# 只读用户
READER = "reader"
# 可写用户
WRITER = "writer"
# 管理员用户
ADMIN = "admin"
# 定义权限检查器类
class PermissionChecker:
# 初始化方法
def __init__(self):
# 定义每种操作对应的允许角色
self.permissions = {
"read_file": [UserRole.READER, UserRole.WRITER, UserRole.ADMIN], # 读文件权限
"write_file": [UserRole.WRITER, UserRole.ADMIN], # 写文件权限
"delete_file": [UserRole.ADMIN] # 删除文件权限
}
# 检查指定角色是否有权限执行某操作
def check_permission(self, operation: str, user_role: UserRole) -> bool:
# 判断用户角色是否在允许列表中
return user_role in self.permissions.get(operation, [])
# 创建权限检查器实例
permission_checker = PermissionChecker()
# 注册为mcp工具
@mcp.tool()
# 定义异步安全文件操作函数
async def secure_file_operation(
ctx: Context,
operation: str,
file_path: str,
user_role: UserRole
) -> str:
# 检查用户角色是否有权限执行该操作
if not permission_checker.check_permission(operation, user_role):
# 如果无权限则抛出异常
raise PermissionError(f"用户角色 {user_role.value} 无权执行操作 {operation}")
# 权限通过后执行操作
return f"执行操作:{operation} on {file_path}"5. 生产环境部署 #
4.1 进程管理 #
4.1.1 使用 Supervisor 管理进程 #
创建 C:\mcp-project\supervisor.conf:
; 定义名为 mcp-server 的进程
[program:mcp-server]
; 指定启动命令,运行 integration_server.py
command=python C:\mcp-project\integration_server.py
; 设置工作目录为 C:\mcp-project
directory=C:\mcp-project
; 以 SYSTEM 用户身份运行进程
user=SYSTEM
; 启动 supervisor 时自动启动该进程
autostart=true
; 进程异常退出时自动重启
autorestart=true
; 标准错误重定向到标准输出
redirect_stderr=true
; 指定标准输出日志文件路径
stdout_logfile=C:\mcp-project\logs\mcp_server.log
; 单个日志文件最大为 50MB
stdout_logfile_maxbytes=50MB
; 最多保留 10 个日志文件备份
stdout_logfile_backups=10使用 Windows 服务 #
创建 C:\mcp-project\install_service.bat:
@echo off
:: 输出安装提示信息
echo 安装 MCP 服务器为 Windows 服务...
:: 创建名为 "MCP Server" 的 Windows 服务,指定 Python 解释器和主程序路径,设置为自动启动
sc create "MCP Server" binPath= "C:\mcp-project\.venv\Scripts\python.exe C:\mcp-project\integration_server.py" start= auto
:: 设置服务的描述信息
sc description "MCP Server" "Model Context Protocol Server for AI Integration"
:: 启动刚刚创建的服务
sc start "MCP Server"
:: 输出安装完成提示
echo 服务安装完成!
:: 暂停,等待用户按任意键继续
pause4.2 健康检查 #
4.2.1 健康检查端点 #
# 注册为 mcp 的工具
@mcp.tool()
# 定义异步健康检查函数,接收上下文参数,返回字典类型
async def health_check(ctx: Context) -> dict[str, Any]:
"""系统健康检查"""
try:
# 异步检查数据库连接状态
db_status = await check_database_connection()
# 异步检查文件系统状态
fs_status = await check_file_system()
# 检查内存使用情况
memory_status = check_memory_usage()
# 如果所有组件都健康,则整体状态为 healthy,否则为 unhealthy
overall_status = "healthy" if all([
db_status["healthy"],
fs_status["healthy"],
memory_status["healthy"]
]) else "unhealthy"
# 返回健康检查结果,包括状态、时间戳和各组件状态
return {
"status": overall_status,
"timestamp": datetime.utcnow().isoformat(),
"components": {
"database": db_status,
"filesystem": fs_status,
"memory": memory_status
}
}
except Exception as e:
# 捕获异常并记录错误信息到上下文
ctx.error(f"健康检查失败:{e}")
# 返回错误状态和异常信息
return {
"status": "error",
"error": str(e),
"timestamp": datetime.utcnow().isoformat()
}