1.初始化项目 #
uv init mcp_server2. 运行本地服务器 #
npx @modelcontextprotocol/inspector python mcp_server.py3.启动服务器和Inspector #
3.1 mcp_server.py #
mcp_server.py
# 导入os模块,用于获取环境变量
import os
# 导入sys模块,用于标准输入输出
import sys
# 导入logging模块,用于日志记录
import logging
# 导入json模块,用于处理JSON数据
import json
# 设置日志配置
def setup_logging():
"""设置日志配置"""
# 获取日志级别,默认为INFO,并转换为大写
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# 创建日志格式化器
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 创建控制台日志处理器,输出到标准错误
console_handler = logging.StreamHandler(sys.stderr)
# 设置处理器的格式
console_handler.setFormatter(formatter)
# 配置根日志记录器
logging.basicConfig(
level=getattr(logging, log_level),
handlers=[console_handler],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回当前模块的日志记录器
return logging.getLogger(__name__)
# 定义MCPServer类
class MCPServer:
# 初始化方法
def __init__(self):
# 服务器信息,包含名称和版本
self.server_info = {"name": "MCPServer", "version": "1.0.0"}
self.initialized = False
# 处理initialize请求的方法
def initialize(self, params):
"""处理initialize请求"""
# 返回协议版本、能力和服务器信息
return {
"protocolVersion": "2025-06-18",
"capabilities": {},
"serverInfo": self.server_info,
}
def handle_notification(self, method):
"""处理通知类请求"""
if method == "notifications/initialized":
self.initialized = True
logger.info("Client initialized successfully")
return None # 通知不需要响应
return None
# 处理JSON-RPC请求的方法
def handle_request(self, request):
"""处理JSON-RPC请求"""
# 获取请求的方法名
method = request.get("method")
# 获取请求ID,默认为0
request_id = request.get("id", 0)
# 获取请求参数,默认为空字典
params = request.get("params", {})
try:
# 处理通知类请求(没有id字段)
if "id" not in request:
return self.handle_notification(method)
# 处理有id的请求
# 如果方法为initialize
if method == "initialize":
# 调用initialize方法处理
result = self.initialize(params)
# 返回成功响应
return {"jsonrpc": "2.0", "id": request_id, "result": result}
else:
# 方法未找到,返回错误
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32601, "message": f"Method {method} not found"},
}
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Request handling error: {e}")
# 返回内部错误响应
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Internal error: {str(e)}"},
}
# 主函数
def main():
"""主函数"""
# 记录服务器启动信息
logger.info("Starting server...")
# 创建MCPServer实例
mcp_server = MCPServer()
try:
# 循环读取标准输入的每一行
for line in sys.stdin:
# 如果行为空,跳过
if not line.strip():
continue
try:
# 解析JSON请求
request = json.loads(line.strip())
# 处理请求,获取响应
response = mcp_server.handle_request(request)
# 如果有响应,输出到标准输出
if response is not None:
print(json.dumps(response, ensure_ascii=False))
# 刷新标准输出缓冲区
sys.stdout.flush()
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Unexpected error: {e}")
except KeyboardInterrupt:
# 捕获键盘中断,记录服务器停止信息
logger.info("Server stopped")
except Exception as e:
# 服务器发生其他异常,记录错误并退出
logger.error(f"Server error: {e}")
sys.exit(1)
# 初始化日志记录器
logger = setup_logging()
# 如果作为主程序运行,则调用main函数
if __name__ == "__main__":
main()
3.2 test_mcp_server.py #
test_mcp_server.py
# 导入json模块,用于处理JSON数据
import json
# 导入subprocess模块,用于创建和管理子进程
import subprocess
# 导入sys模块,用于访问与Python解释器相关的变量和函数
import sys
# 定义测试MCP服务器的函数
def test_mcp_server():
"""测试MCP服务器功能"""
# 打印测试开始信息
print("启动MCP服务器测试...")
# 启动mcp_server.py子进程,设置标准输入、输出和错误为管道,文本模式
process = subprocess.Popen(
[sys.executable, "mcp_server.py"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
try:
# 1. 发送initialize请求
initialize_request = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2025-06-18",
"capabilities": {},
"clientInfo": {"name": "test-client", "version": "1.0.0"},
},
}
print("1. 发送initialize请求...")
process.stdin.write(json.dumps(initialize_request) + "\n")
process.stdin.flush()
response = process.stdout.readline()
print(f"收到响应: {response}")
# 2. 发送initialized通知
initialized_notification = {
"jsonrpc": "2.0",
"method": "notifications/initialized",
}
print("2. 发送initialized通知...")
process.stdin.write(json.dumps(initialized_notification) + "\n")
process.stdin.flush()
# 3. 测试未知方法
unknown_request = {
"jsonrpc": "2.0",
"id": 2,
"method": "unknown_method",
"params": {},
}
print("3. 发送未知方法请求...")
process.stdin.write(json.dumps(unknown_request) + "\n")
process.stdin.flush()
response = process.stdout.readline()
print(f"收到响应: {response}")
except Exception as e:
print(f"测试过程中出错: {e}")
finally:
process.terminate()
process.wait()
print("测试完成")
# 如果当前脚本作为主程序运行,则执行测试函数
if __name__ == "__main__":
test_mcp_server()4.实现ping #
4.1. mcp_server.py #
mcp_server.py
# 导入os模块,用于获取环境变量
import os
# 导入sys模块,用于标准输入输出
import sys
# 导入logging模块,用于日志记录
import logging
# 导入json模块,用于处理JSON数据
import json
# 设置日志配置
def setup_logging():
"""设置日志配置"""
# 获取日志级别,默认为INFO,并转换为大写
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# 创建日志格式化器
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 创建控制台日志处理器,输出到标准错误
console_handler = logging.StreamHandler(sys.stderr)
# 设置处理器的格式
console_handler.setFormatter(formatter)
# 配置根日志记录器
logging.basicConfig(
level=getattr(logging, log_level),
handlers=[console_handler],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回当前模块的日志记录器
return logging.getLogger(__name__)
# 定义MCPServer类
class MCPServer:
# 初始化方法
def __init__(self):
# 服务器信息,包含名称和版本
self.server_info = {"name": "MCPServer", "version": "1.0.0"}
self.initialized = False
# 处理initialize请求的方法
def initialize(self, params):
"""处理initialize请求"""
# 返回协议版本、能力和服务器信息
return {
"protocolVersion": "2025-06-18",
"capabilities": {},
"serverInfo": self.server_info,
}
def handle_notification(self, method):
"""处理通知类请求"""
if method == "notifications/initialized":
self.initialized = True
logger.info("Client initialized successfully")
return None # 通知不需要响应
return None
+ def _handle_initialize(self, request_id):
+ """处理initialize请求"""
+ return {"jsonrpc": "2.0", "id": request_id, "result": self.initialize({})}
+ def _handle_ping(self, request_id):
+ """处理ping请求"""
+ return {"jsonrpc": "2.0", "id": request_id, "result": {}}
# 处理JSON-RPC请求的方法
def handle_request(self, request):
"""处理JSON-RPC请求"""
# 获取请求的方法名
method = request.get("method")
# 获取请求ID,默认为0
request_id = request.get("id", 0)
# 获取请求参数,默认为空字典
params = request.get("params", {})
try:
# 处理通知类请求(没有id字段)
if "id" not in request:
return self.handle_notification(method)
# 处理有id的请求
if method == "initialize":
+ return self._handle_initialize(request_id)
+ elif method == "ping":
+ return self._handle_ping(request_id)
else:
# 方法未找到,返回错误
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32601, "message": f"Method {method} not found"},
}
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Request handling error: {e}")
# 返回内部错误响应
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Internal error: {str(e)}"},
}
# 主函数
def main():
"""主函数"""
# 记录服务器启动信息
logger.info("Starting server...")
# 创建MCPServer实例
mcp_server = MCPServer()
try:
# 循环读取标准输入的每一行
for line in sys.stdin:
# 如果行为空,跳过
if not line.strip():
continue
try:
# 解析JSON请求
request = json.loads(line.strip())
# 处理请求,获取响应
response = mcp_server.handle_request(request)
# 如果有响应,输出到标准输出
if response is not None:
print(json.dumps(response, ensure_ascii=False))
# 刷新标准输出缓冲区
sys.stdout.flush()
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Unexpected error: {e}")
except KeyboardInterrupt:
# 捕获键盘中断,记录服务器停止信息
logger.info("Server stopped")
except Exception as e:
# 服务器发生其他异常,记录错误并退出
logger.error(f"Server error: {e}")
sys.exit(1)
# 初始化日志记录器
logger = setup_logging()
# 如果作为主程序运行,则调用main函数
if __name__ == "__main__":
main()
5.工具列表 #
5.1. mcp_client.py #
mcp_client.py
# 导入subprocess模块,用于创建和管理子进程
import subprocess
# 导入sys模块,提供对Python解释器相关的变量和函数的访问
import sys
# 导入json模块,用于处理JSON数据
import json
# 导入os模块,提供与操作系统交互的功能
import os
# 导入logging模块,用于日志记录
import logging
# 从dotenv包导入load_dotenv函数,用于加载环境变量
from dotenv import load_dotenv
# 加载.env文件中的环境变量
load_dotenv() # 从.env加载环境变量
# 配置日志记录的函数
def setup_logging():
"""设置日志配置"""
# 获取日志级别,默认为INFO,并转换为大写
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# 创建日志格式化器,指定日志输出格式
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 创建控制台日志处理器,输出到标准错误
console_handler = logging.StreamHandler(sys.stderr)
# 设置控制台处理器的日志格式
console_handler.setFormatter(formatter)
# 配置根日志记录器,设置日志级别和处理器
logging.basicConfig(
level=getattr(logging, log_level),
handlers=[console_handler],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回当前模块的日志记录器
return logging.getLogger(__name__)
# 设置日志记录器
logger = setup_logging()
# 定义MCPClient类
class MCPClient:
# 构造函数,初始化客户端对象
def __init__(self):
# 服务器进程对象
self.process = None
# 请求ID,每次请求自增
self.request_id = 1
# 可用工具列表
self.available_tools = []
# 连接状态标志
self._connected = False
# 关闭状态标志
self._shutdown = False
# 初始化服务器连接
def _initialize_server(self):
"""初始化服务器连接"""
# 记录调试日志,发送初始化请求
logger.debug("发送初始化请求...")
# 发送初始化请求
init_response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "initialize",
"params": {
"protocolVersion": "2025-06-18",
"capabilities": {},
"clientInfo": {"name": "mcp-client", "version": "1.0.0"},
},
}
)
# 请求ID自增
self.request_id += 1
# 检查响应中是否有result字段
if "result" not in init_response:
logger.error(f"初始化失败: {init_response}")
return False
# 发送initialized通知
logger.debug("发送initialized通知...")
self._send_request(
{"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}}
)
# 记录服务器初始化成功
logger.info("服务器初始化成功")
return True
# ping服务器,测试连通性
def ping(self):
"""Ping服务器"""
# 发送ping请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "ping",
"params": {},
}
)
# 请求ID自增
self.request_id += 1
# 返回响应
return response
# 发送JSON-RPC请求到服务器
def _send_request(self, request):
"""发送JSON-RPC请求到服务器"""
# 检查服务器进程是否存在且未退出
if not self.process or self.process.poll() is not None:
logger.error("服务器连接已断开")
return {"error": "Server connection lost"}
# 将请求对象序列化为JSON字符串
request_str = json.dumps(request, ensure_ascii=False)
# 记录调试日志,请求方法和ID
logger.debug(
f"发送请求: {request.get('method', 'unknown')} (ID: {request.get('id', 'none')})"
)
try:
# 向服务器进程写入请求
self.process.stdin.write(request_str + "\n")
self.process.stdin.flush()
except Exception as e:
# 发送请求时出错,记录错误日志
logger.error(f"发送请求时出错: {e}")
return {"error": f"发送失败: {e}"}
# 检查是否为通知类请求(没有id字段)
if "id" not in request:
logger.debug("通知类请求,不等待响应")
return {"result": "notification_sent"}
# 读取服务器响应
logger.debug("等待响应...")
try:
# 读取一行响应
response_line = self.process.stdout.readline()
if response_line:
try:
# 解析JSON响应
response = json.loads(response_line.strip())
logger.debug(f"收到响应: {response.get('result', 'error')}")
return response
except json.JSONDecodeError as e:
# JSON解析错误
logger.error(f"JSON解析错误: {e}")
return {"error": f"JSON解析失败: {e}"}
else:
# 没有收到响应
logger.error("没有收到响应")
return {"error": "没有响应"}
except Exception as e:
# 读取响应时出错
logger.error(f"读取响应时出错: {e}")
return {"error": f"读取失败: {e}"}
# 加载工具列表
def _load_tools(self):
"""加载工具列表"""
# 记录调试日志
logger.debug("获取工具列表...")
# 发送tools/list请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "tools/list",
"params": {},
}
)
# 请求ID自增
self.request_id += 1
# 检查响应中是否包含工具列表
if "result" in response and "tools" in response["result"]:
self.available_tools = response["result"]["tools"]
logger.info(f"可用工具: {[tool['name'] for tool in self.available_tools]}")
else:
logger.error(f"获取工具列表失败: {response}")
# 加载所有可用功能
def _load_all_capabilities(self):
"""加载所有可用功能"""
# 记录日志,开始加载功能
logger.info("正在加载服务器功能...")
# 初始化服务器
if not self._initialize_server():
return
# 加载工具列表
self._load_tools()
# 记录日志,功能加载完成
logger.info("功能加载完成!")
# 连接到MCP服务器
def connect_to_server(self, server_script_path):
"""连接到MCP服务器"""
# 检查服务器脚本路径是否存在
if not os.path.exists(server_script_path):
raise FileNotFoundError(f"服务器脚本不存在: {server_script_path}")
# 检查脚本文件是否为.py结尾
if not server_script_path.endswith(".py"):
logger.warning(f"服务器脚本可能不是Python文件: {server_script_path}")
# 启动服务器进程
self.process = subprocess.Popen(
[sys.executable, server_script_path],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=0, # 改为无缓冲
)
# 加载所有可用功能
self._load_all_capabilities()
# 更新连接状态
self._connected = True
# 列出所有可用工具
def _list_tools(self):
# 工具列表字符串数组
tool_list = []
# 遍历所有可用工具
for tool in self.available_tools:
# 获取工具名称
name = tool.get("name", "Unknown")
# 获取工具描述
description = tool.get("description", "No description")
# 添加到工具列表
tool_list.append(f" - {name}: {description}")
# 返回格式化后的工具列表字符串
return f"可用工具:\n" + "\n".join(tool_list)
# 处理用户输入的命令
def process_query(self, query):
"""处理用户查询"""
# 去除首尾空白并按空格分割
parts = query.strip().split()
# 如果没有输入内容,提示输入有效命令
if not parts:
return "请输入有效的命令"
# 获取命令关键字并转为小写
command = parts[0].lower()
try:
# 判断命令类型并调用相应方法
if command == "help":
return self._show_help()
elif command == "tools":
return self._list_tools()
elif command == "ping":
result = self.ping()
return f"Ping结果: {result}"
else:
return f"未知命令: {command}\n输入 'help' 查看帮助"
except Exception as e:
# 执行命令时出错,记录日志并返回错误信息
logger.error(f"执行命令时出错: {str(e)}")
return f"执行命令时出错: {str(e)}"
# 显示帮助信息
def _show_help(self):
return """
=== MCP客户端帮助 ===
基础命令:
help - 显示此帮助
ping - 测试服务器连接
工具相关:
tools - 列出所有工具
示例:
help
ping
tools
"""
# 运行交互式聊天循环
def chat_loop(self):
"""运行交互式聊天循环"""
# 记录客户端启动日志
logger.info("=== MCP客户端已启动! ===")
# 提示用户输入help查看命令
print("输入 'help' 查看所有可用命令")
# 提示用户输入quit退出
print("输入 'quit' 退出")
# 循环读取用户输入
while True:
try:
# 读取用户输入命令
query = input("\n命令: ").strip()
# 如果输入quit则退出循环
if query.lower() == "quit":
logger.info("用户退出客户端")
break
# 处理用户命令并输出结果
response = self.process_query(query)
print(f"\n{response}")
except Exception as e:
# 处理命令时出错,记录日志并输出错误
logger.error(f"处理命令时出错: {str(e)}")
print(f"\n错误: {str(e)}")
# 清理资源
def cleanup(self):
"""清理资源"""
# 设置关闭和断开标志
self._shutdown = True
self._connected = False
try:
# 如果服务器进程存在
if self.process:
logger.debug("终止服务器进程")
# 终止服务器进程
self.process.terminate()
# 等待进程结束,设置超时
try:
self.process.wait(timeout=5)
logger.info("服务器进程已正常终止")
except subprocess.TimeoutExpired:
logger.warning("服务器进程未在5秒内终止,强制杀死")
self.process.kill()
self.process.wait()
logger.info("服务器进程已强制终止")
except Exception as e:
# 清理资源时出错,记录日志
logger.error(f"清理资源时出错: {e}")
finally:
# 释放进程对象
self.process = None
# 主函数,程序入口
def main():
# 检查命令行参数数量
if len(sys.argv) < 2:
logger.error("用法: python client.py <服务器脚本路径>")
print("用法: python client.py <服务器脚本路径>")
sys.exit(1)
# 记录启动日志
logger.info(f"启动MCP客户端,连接服务器: {sys.argv[1]}")
# 创建客户端对象
client = MCPClient()
try:
# 连接服务器并进入交互循环
client.connect_to_server(sys.argv[1])
client.chat_loop()
except Exception as e:
# 客户端运行出错,记录日志并输出错误
logger.error(f"客户端运行出错: {e}")
print(f"错误: {e}")
finally:
# 清理客户端资源
logger.info("清理客户端资源")
client.cleanup()
# 判断是否为主程序入口
if __name__ == "__main__":
main()
5.2. tools.py #
tools.py
tools = {
"get_current_time": {
"name": "get_current_time",
"description": "Get current time",
"inputSchema": {
"type": "object",
"properties": {
"format": {
"type": "string",
"description": "Time format (optional)",
"enum": ["iso", "timestamp", "readable"],
}
},
},
},
"calculate": {
"name": "calculate",
"description": "Perform simple mathematical calculations",
"inputSchema": {
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "Mathematical expression, e.g. '2 + 3 * 4'",
}
},
"required": ["expression"],
},
},
"echo": {
"name": "echo",
"description": "Echo input message",
"inputSchema": {
"type": "object",
"properties": {
"message": {"type": "string", "description": "Message to echo"}
},
"required": ["message"],
},
},
}
5.3. mcp_server.py #
mcp_server.py
# 导入os模块,用于获取环境变量
import os
# 导入sys模块,用于标准输入输出
import sys
# 导入logging模块,用于日志记录
import logging
# 导入json模块,用于处理JSON数据
import json
+from tools import tools
# 设置日志配置
def setup_logging():
"""设置日志配置"""
# 获取日志级别,默认为INFO,并转换为大写
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# 创建日志格式化器
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 创建控制台日志处理器,输出到标准错误
console_handler = logging.StreamHandler(sys.stderr)
# 设置处理器的格式
console_handler.setFormatter(formatter)
# 配置根日志记录器
logging.basicConfig(
level=getattr(logging, log_level),
handlers=[console_handler],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回当前模块的日志记录器
return logging.getLogger(__name__)
# 定义MCPServer类
class MCPServer:
# 初始化方法
def __init__(self):
# 服务器信息,包含名称和版本
self.server_info = {"name": "MCPServer", "version": "1.0.0"}
self.initialized = False
+ self.tools = tools
# 处理initialize请求的方法
def initialize(self, params):
"""处理initialize请求"""
# 返回协议版本、能力和服务器信息
return {
"protocolVersion": "2025-06-18",
+ "capabilities": {"tools": {"listChanged": True}},
"serverInfo": self.server_info,
}
def handle_notification(self, method):
"""处理通知类请求"""
if method == "notifications/initialized":
self.initialized = True
logger.info("Client initialized successfully")
return None # 通知不需要响应
return None
def _handle_initialize(self, request_id):
"""处理initialize请求"""
return {"jsonrpc": "2.0", "id": request_id, "result": self.initialize({})}
def _handle_ping(self, request_id):
"""处理ping请求"""
return {"jsonrpc": "2.0", "id": request_id, "result": {}}
+ def _handle_list_tools(self, request_id):
+ """Handle tools list request"""
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "result": {"tools": list(self.tools.values())},
+ }
# 处理JSON-RPC请求的方法
def handle_request(self, request):
"""处理JSON-RPC请求"""
# 获取请求的方法名
method = request.get("method")
# 获取请求ID,默认为0
request_id = request.get("id", 0)
# 获取请求参数,默认为空字典
params = request.get("params", {})
try:
# 处理通知类请求(没有id字段)
if "id" not in request:
return self.handle_notification(method)
# 处理有id的请求
if method == "initialize":
return self._handle_initialize(request_id)
elif method == "ping":
return self._handle_ping(request_id)
+ elif method == "tools/list":
+ return self._handle_list_tools(request_id)
else:
# 方法未找到,返回错误
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32601, "message": f"Method {method} not found"},
}
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Request handling error: {e}")
# 返回内部错误响应
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Internal error: {str(e)}"},
}
# 主函数
def main():
"""主函数"""
# 记录服务器启动信息
logger.info("Starting server...")
# 创建MCPServer实例
mcp_server = MCPServer()
try:
# 循环读取标准输入的每一行
for line in sys.stdin:
# 如果行为空,跳过
if not line.strip():
continue
try:
# 解析JSON请求
request = json.loads(line.strip())
# 处理请求,获取响应
response = mcp_server.handle_request(request)
# 如果有响应,输出到标准输出
if response is not None:
print(json.dumps(response, ensure_ascii=False))
# 刷新标准输出缓冲区
sys.stdout.flush()
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Unexpected error: {e}")
except KeyboardInterrupt:
# 捕获键盘中断,记录服务器停止信息
logger.info("Server stopped")
except Exception as e:
# 服务器发生其他异常,记录错误并退出
logger.error(f"Server error: {e}")
sys.exit(1)
# 初始化日志记录器
logger = setup_logging()
# 如果作为主程序运行,则调用main函数
if __name__ == "__main__":
main()6.执行工具 #
6.1. mcp_client.py #
mcp_client.py
# 导入subprocess模块,用于创建和管理子进程
import subprocess
# 导入sys模块,提供对Python解释器相关的变量和函数的访问
import sys
# 导入json模块,用于处理JSON数据
import json
# 导入os模块,提供与操作系统交互的功能
import os
# 导入logging模块,用于日志记录
import logging
# 从dotenv包导入load_dotenv函数,用于加载环境变量
from dotenv import load_dotenv
# 加载.env文件中的环境变量
load_dotenv() # 从.env加载环境变量
# 配置日志记录的函数
def setup_logging():
"""设置日志配置"""
# 获取日志级别,默认为INFO,并转换为大写
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# 创建日志格式化器,指定日志输出格式
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 创建控制台日志处理器,输出到标准错误
console_handler = logging.StreamHandler(sys.stderr)
# 设置控制台处理器的日志格式
console_handler.setFormatter(formatter)
# 配置根日志记录器,设置日志级别和处理器
logging.basicConfig(
level=getattr(logging, log_level),
handlers=[console_handler],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回当前模块的日志记录器
return logging.getLogger(__name__)
# 设置日志记录器
logger = setup_logging()
# 定义MCPClient类
class MCPClient:
# 构造函数,初始化客户端对象
def __init__(self):
# 服务器进程对象
self.process = None
# 请求ID,每次请求自增
self.request_id = 1
# 可用工具列表
self.available_tools = []
# 连接状态标志
self._connected = False
# 关闭状态标志
self._shutdown = False
# 初始化服务器连接
def _initialize_server(self):
"""初始化服务器连接"""
# 记录调试日志,发送初始化请求
logger.debug("发送初始化请求...")
# 发送初始化请求
init_response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "initialize",
"params": {
"protocolVersion": "2025-06-18",
"capabilities": {},
"clientInfo": {"name": "mcp-client", "version": "1.0.0"},
},
}
)
# 请求ID自增
self.request_id += 1
# 检查响应中是否有result字段
if "result" not in init_response:
logger.error(f"初始化失败: {init_response}")
return False
# 发送initialized通知
logger.debug("发送initialized通知...")
self._send_request(
{"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}}
)
# 记录服务器初始化成功
logger.info("服务器初始化成功")
return True
# ping服务器,测试连通性
def ping(self):
"""Ping服务器"""
# 发送ping请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "ping",
"params": {},
}
)
# 请求ID自增
self.request_id += 1
# 返回响应
return response
# 发送JSON-RPC请求到服务器
def _send_request(self, request):
"""发送JSON-RPC请求到服务器"""
# 检查服务器进程是否存在且未退出
if not self.process or self.process.poll() is not None:
logger.error("服务器连接已断开")
return {"error": "Server connection lost"}
# 将请求对象序列化为JSON字符串
request_str = json.dumps(request, ensure_ascii=False)
# 记录调试日志,请求方法和ID
logger.debug(
f"发送请求: {request.get('method', 'unknown')} (ID: {request.get('id', 'none')})"
)
try:
# 向服务器进程写入请求
self.process.stdin.write(request_str + "\n")
self.process.stdin.flush()
except Exception as e:
# 发送请求时出错,记录错误日志
logger.error(f"发送请求时出错: {e}")
return {"error": f"发送失败: {e}"}
# 检查是否为通知类请求(没有id字段)
if "id" not in request:
logger.debug("通知类请求,不等待响应")
return {"result": "notification_sent"}
# 读取服务器响应
logger.debug("等待响应...")
try:
# 读取一行响应
response_line = self.process.stdout.readline()
if response_line:
try:
# 解析JSON响应
response = json.loads(response_line.strip())
logger.debug(f"收到响应: {response.get('result', 'error')}")
return response
except json.JSONDecodeError as e:
# JSON解析错误
logger.error(f"JSON解析错误: {e}")
return {"error": f"JSON解析失败: {e}"}
else:
# 没有收到响应
logger.error("没有收到响应")
return {"error": "没有响应"}
except Exception as e:
# 读取响应时出错
logger.error(f"读取响应时出错: {e}")
return {"error": f"读取失败: {e}"}
# 加载工具列表
def _load_tools(self):
"""加载工具列表"""
# 记录调试日志
logger.debug("获取工具列表...")
# 发送tools/list请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "tools/list",
"params": {},
}
)
# 请求ID自增
self.request_id += 1
# 检查响应中是否包含工具列表
if "result" in response and "tools" in response["result"]:
self.available_tools = response["result"]["tools"]
logger.info(f"可用工具: {[tool['name'] for tool in self.available_tools]}")
else:
logger.error(f"获取工具列表失败: {response}")
# 加载所有可用功能
def _load_all_capabilities(self):
"""加载所有可用功能"""
# 记录日志,开始加载功能
logger.info("正在加载服务器功能...")
# 初始化服务器
if not self._initialize_server():
return
# 加载工具列表
self._load_tools()
# 记录日志,功能加载完成
logger.info("功能加载完成!")
# 连接到MCP服务器
def connect_to_server(self, server_script_path):
"""连接到MCP服务器"""
# 检查服务器脚本路径是否存在
if not os.path.exists(server_script_path):
raise FileNotFoundError(f"服务器脚本不存在: {server_script_path}")
# 检查脚本文件是否为.py结尾
if not server_script_path.endswith(".py"):
logger.warning(f"服务器脚本可能不是Python文件: {server_script_path}")
# 启动服务器进程
self.process = subprocess.Popen(
[sys.executable, server_script_path],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=0, # 改为无缓冲
)
# 加载所有可用功能
self._load_all_capabilities()
# 更新连接状态
self._connected = True
# 列出所有可用工具
def _list_tools(self):
# 工具列表字符串数组
tool_list = []
# 遍历所有可用工具
for tool in self.available_tools:
# 获取工具名称
name = tool.get("name", "Unknown")
# 获取工具描述
description = tool.get("description", "No description")
# 添加到工具列表
tool_list.append(f" - {name}: {description}")
# 返回格式化后的工具列表字符串
return f"可用工具:\n" + "\n".join(tool_list)
# 调用工具的方法
+ def call_tool(self, tool_name, arguments):
+ """调用工具"""
# 发送调用工具请求
+ response = self._send_request(
+ {
+ "jsonrpc": "2.0",
+ "id": self.request_id,
+ "method": "tools/call",
+ "params": {"name": tool_name, "arguments": arguments},
+ }
+ )
# 请求ID递增
+ self.request_id += 1
+ return response
# 调用工具的内部方法
+ def _call_tool(self, tool_name, args):
+ """调用工具"""
+ arguments = {}
# 针对不同工具解析参数
+ if tool_name == "get_current_time":
+ if args and args[0] in ["iso", "timestamp", "readable"]:
+ arguments["format"] = args[0]
+ elif tool_name == "calculate":
+ if args:
# 对于calculate,直接使用第一个参数作为表达式
+ arguments["expression"] = args[0]
+ else:
+ return "calculate 工具需要表达式参数"
+ elif tool_name == "echo":
+ if args:
# 对于echo,直接使用第一个参数作为消息
+ arguments["message"] = args[0]
+ else:
+ return "echo 工具需要消息参数"
# 记录日志:调用工具
+ logger.debug(f"调用工具: {tool_name} 参数: {arguments}")
# 调用工具
+ result = self.call_tool(tool_name, arguments)
# 检查结果
+ if "result" in result and "content" in result["result"]:
+ logger.debug(f"工具 {tool_name} 调用成功")
+ return result["result"]["content"]
+ else:
+ logger.error(f"工具 {tool_name} 调用失败: {result}")
+ return f"工具调用失败: {result}"
# 处理用户输入的命令
def process_query(self, query):
"""处理用户查询"""
# 去除首尾空白并按空格分割
parts = query.strip().split()
# 如果没有输入内容,提示输入有效命令
if not parts:
return "请输入有效的命令"
# 获取命令关键字并转为小写
command = parts[0].lower()
try:
# 判断命令类型并调用相应方法
if command == "help":
return self._show_help()
elif command == "tools":
return self._list_tools()
elif command == "ping":
result = self.ping()
return f"Ping结果: {result}"
+ elif command in ["get_current_time", "calculate", "echo"]:
+ return self._call_tool(command, parts[1:])
else:
return f"未知命令: {command}\n输入 'help' 查看帮助"
except Exception as e:
# 执行命令时出错,记录日志并返回错误信息
logger.error(f"执行命令时出错: {str(e)}")
return f"执行命令时出错: {str(e)}"
# 显示帮助信息
def _show_help(self):
return """
=== MCP客户端帮助 ===
基础命令:
help - 显示此帮助
ping - 测试服务器连接
工具相关:
tools - 列出所有工具
+ get_current_time [format] - 获取当前时间 (format: iso/timestamp/readable)
+ calculate <expression> - 计算表达式
+ echo <message> - 回显消息
示例:
help
ping
tools
"""
# 运行交互式聊天循环
def chat_loop(self):
"""运行交互式聊天循环"""
# 记录客户端启动日志
logger.info("=== MCP客户端已启动! ===")
# 提示用户输入help查看命令
print("输入 'help' 查看所有可用命令")
# 提示用户输入quit退出
print("输入 'quit' 退出")
# 循环读取用户输入
while True:
try:
# 读取用户输入命令
query = input("\n命令: ").strip()
# 如果输入quit则退出循环
if query.lower() == "quit":
logger.info("用户退出客户端")
break
# 处理用户命令并输出结果
response = self.process_query(query)
print(f"\n{response}")
except Exception as e:
# 处理命令时出错,记录日志并输出错误
logger.error(f"处理命令时出错: {str(e)}")
print(f"\n错误: {str(e)}")
# 清理资源
def cleanup(self):
"""清理资源"""
# 设置关闭和断开标志
self._shutdown = True
self._connected = False
try:
# 如果服务器进程存在
if self.process:
logger.debug("终止服务器进程")
# 终止服务器进程
self.process.terminate()
# 等待进程结束,设置超时
try:
self.process.wait(timeout=5)
logger.info("服务器进程已正常终止")
except subprocess.TimeoutExpired:
logger.warning("服务器进程未在5秒内终止,强制杀死")
self.process.kill()
self.process.wait()
logger.info("服务器进程已强制终止")
except Exception as e:
# 清理资源时出错,记录日志
logger.error(f"清理资源时出错: {e}")
finally:
# 释放进程对象
self.process = None
# 主函数,程序入口
def main():
# 检查命令行参数数量
if len(sys.argv) < 2:
logger.error("用法: python client.py <服务器脚本路径>")
print("用法: python client.py <服务器脚本路径>")
sys.exit(1)
# 记录启动日志
logger.info(f"启动MCP客户端,连接服务器: {sys.argv[1]}")
# 创建客户端对象
client = MCPClient()
try:
# 连接服务器并进入交互循环
client.connect_to_server(sys.argv[1])
client.chat_loop()
except Exception as e:
# 客户端运行出错,记录日志并输出错误
logger.error(f"客户端运行出错: {e}")
print(f"错误: {e}")
finally:
# 清理客户端资源
logger.info("清理客户端资源")
client.cleanup()
# 判断是否为主程序入口
if __name__ == "__main__":
main()
6.2. mcp_server.py #
mcp_server.py
# 导入os模块,用于获取环境变量
+from datetime import datetime
import os
# 导入sys模块,用于标准输入输出
import sys
# 导入logging模块,用于日志记录
import logging
# 导入json模块,用于处理JSON数据
import json
from tools import tools
# 设置日志配置
def setup_logging():
"""设置日志配置"""
# 获取日志级别,默认为INFO,并转换为大写
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# 创建日志格式化器
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 创建控制台日志处理器,输出到标准错误
console_handler = logging.StreamHandler(sys.stderr)
# 设置处理器的格式
console_handler.setFormatter(formatter)
# 配置根日志记录器
logging.basicConfig(
level=getattr(logging, log_level),
handlers=[console_handler],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回当前模块的日志记录器
return logging.getLogger(__name__)
# 定义MCPServer类
class MCPServer:
# 初始化方法
def __init__(self):
# 服务器信息,包含名称和版本
self.server_info = {"name": "MCPServer", "version": "1.0.0"}
self.initialized = False
self.tools = tools
# 处理initialize请求的方法
def initialize(self, params):
"""处理initialize请求"""
# 返回协议版本、能力和服务器信息
return {
"protocolVersion": "2025-06-18",
"capabilities": {"tools": {"listChanged": True}},
"serverInfo": self.server_info,
}
def handle_notification(self, method):
"""处理通知类请求"""
if method == "notifications/initialized":
self.initialized = True
logger.info("Client initialized successfully")
return None # 通知不需要响应
return None
def _handle_initialize(self, request_id):
"""处理initialize请求"""
return {"jsonrpc": "2.0", "id": request_id, "result": self.initialize({})}
def _handle_ping(self, request_id):
"""处理ping请求"""
return {"jsonrpc": "2.0", "id": request_id, "result": {}}
def _handle_list_tools(self, request_id):
"""Handle tools list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"tools": list(self.tools.values())},
}
# 处理获取时间工具
+ def _handle_get_time(self, arguments):
+ """Handle get time tool"""
# 获取格式类型,默认为readable
+ format_type = arguments.get("format", "readable")
# 获取当前时间
+ now = datetime.now()
# 根据格式类型返回不同格式
+ if format_type == "iso":
+ result = now.isoformat()
+ elif format_type == "timestamp":
+ result = str(now.timestamp())
+ else: # readable
+ result = now.strftime("%Y-%m-%d %H:%M:%S")
# 返回时间文本
+ return [{"type": "text", "text": f"Current time: {result}"}]
# 处理计算工具
+ def _handle_calculate(self, arguments):
+ """Handle calculate tool"""
# 获取表达式
+ expression = arguments.get("expression", "")
+ try:
# 安全检查表达式字符
+ allowed_chars = set("0123456789+-*/.() ")
+ if not all(c in allowed_chars for c in expression):
+ raise ValueError("Expression contains disallowed characters")
# 计算表达式
+ result = eval(expression)
# 返回计算结果
+ return [
+ {"type": "text", "text": f"Calculation result: {expression} = {result}"}
+ ]
+ except Exception as e:
# 计算失败
+ return [{"type": "text", "text": f"Calculation failed: {str(e)}"}]
# 处理回显工具
+ def _handle_echo(self, arguments):
+ """Handle echo tool"""
# 获取消息
+ message = arguments.get("message", "")
# 返回回显内容
+ return [{"type": "text", "text": f"Echo: {message}"}]
# 处理工具调用请求
+ def _handle_call_tool(self, request_id, params):
+ """Handle tool call request"""
# 获取工具名称
+ name = params.get("name")
# 获取参数
+ arguments = params.get("arguments", {})
+ try:
# 记录工具调用
+ logger.debug(f"Tool call: {name} with arguments: {arguments}")
# 根据工具名称分发
+ if name == "get_current_time":
+ result = self._handle_get_time(arguments)
+ elif name == "calculate":
+ result = self._handle_calculate(arguments)
+ elif name == "echo":
+ result = self._handle_echo(arguments)
+ else:
# 未知工具
+ logger.warning(f"Unknown tool requested: {name}")
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "error": {"code": -32601, "message": f"Unknown tool: {name}"},
+ }
# 记录工具调用成功
+ logger.debug(f"Tool {name} executed successfully")
# 返回工具调用结果
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "result": {"content": result},
+ }
+ except Exception as e:
# 工具调用失败
+ logger.error(f"Tool {name} call failed: {str(e)}")
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "error": {"code": -32603, "message": f"Tool call failed: {str(e)}"},
+ }
# 处理JSON-RPC请求的方法
def handle_request(self, request):
"""处理JSON-RPC请求"""
# 获取请求的方法名
method = request.get("method")
# 获取请求ID,默认为0
request_id = request.get("id", 0)
# 获取请求参数,默认为空字典
params = request.get("params", {})
try:
# 处理通知类请求(没有id字段)
if "id" not in request:
return self.handle_notification(method)
# 处理有id的请求
if method == "initialize":
return self._handle_initialize(request_id)
elif method == "ping":
return self._handle_ping(request_id)
elif method == "tools/list":
return self._handle_list_tools(request_id)
+ elif method == "tools/call":
+ return self._handle_call_tool(request_id, params)
else:
# 方法未找到,返回错误
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32601, "message": f"Method {method} not found"},
}
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Request handling error: {e}")
# 返回内部错误响应
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Internal error: {str(e)}"},
}
# 主函数
def main():
"""主函数"""
# 记录服务器启动信息
logger.info("Starting server...")
# 创建MCPServer实例
mcp_server = MCPServer()
try:
# 循环读取标准输入的每一行
for line in sys.stdin:
# 如果行为空,跳过
if not line.strip():
continue
try:
# 解析JSON请求
request = json.loads(line.strip())
# 处理请求,获取响应
response = mcp_server.handle_request(request)
# 如果有响应,输出到标准输出
if response is not None:
print(json.dumps(response, ensure_ascii=False))
# 刷新标准输出缓冲区
sys.stdout.flush()
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Unexpected error: {e}")
except KeyboardInterrupt:
# 捕获键盘中断,记录服务器停止信息
logger.info("Server stopped")
except Exception as e:
# 服务器发生其他异常,记录错误并退出
logger.error(f"Server error: {e}")
sys.exit(1)
# 初始化日志记录器
logger = setup_logging()
# 如果作为主程序运行,则调用main函数
if __name__ == "__main__":
main()
7.资源 #
7.1. resources.py #
resources.py
from datetime import datetime
resources = {
"mcp://server-info": {
"uri": "mcp://server-info",
"name": "Server Information",
"description": "Basic information about current server",
"mimeType": "application/json",
},
"mcp://sample-data": {
"uri": "mcp://sample-data",
"name": "Sample Data",
"description": "Some sample JSON data",
"mimeType": "application/json",
},
}
server_info = {
"server_name": "Demo MCP Server",
"version": "1.0.0",
"status": "running",
"start_time": datetime.now().isoformat(),
"features": ["tools", "resources", "prompts"],
}
sample_data = {
"items": [
{"id": 1, "name": "Sample Project 1", "status": "active"},
{"id": 2, "name": "Sample Project 2", "status": "pending"},
{"id": 3, "name": "Sample Project 3", "status": "completed"},
],
"total": 3,
"last_updated": datetime.now().isoformat(),
}
7.2. mcp_client.py #
mcp_client.py
# 导入subprocess模块,用于创建和管理子进程
import subprocess
# 导入sys模块,提供对Python解释器相关的变量和函数的访问
import sys
# 导入json模块,用于处理JSON数据
import json
# 导入os模块,提供与操作系统交互的功能
import os
# 导入logging模块,用于日志记录
import logging
# 从dotenv包导入load_dotenv函数,用于加载环境变量
from dotenv import load_dotenv
# 加载.env文件中的环境变量
load_dotenv() # 从.env加载环境变量
# 配置日志记录的函数
def setup_logging():
"""设置日志配置"""
# 获取日志级别,默认为INFO,并转换为大写
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# 创建日志格式化器,指定日志输出格式
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 创建控制台日志处理器,输出到标准错误
console_handler = logging.StreamHandler(sys.stderr)
# 设置控制台处理器的日志格式
console_handler.setFormatter(formatter)
# 配置根日志记录器,设置日志级别和处理器
logging.basicConfig(
level=getattr(logging, log_level),
handlers=[console_handler],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回当前模块的日志记录器
return logging.getLogger(__name__)
# 设置日志记录器
logger = setup_logging()
# 定义MCPClient类
class MCPClient:
# 构造函数,初始化客户端对象
def __init__(self):
# 服务器进程对象
self.process = None
# 请求ID,每次请求自增
self.request_id = 1
# 可用工具列表
self.available_tools = []
# 可用资源列表
+ self.available_resources = []
# 连接状态标志
self._connected = False
# 关闭状态标志
self._shutdown = False
# 初始化服务器连接
def _initialize_server(self):
"""初始化服务器连接"""
# 记录调试日志,发送初始化请求
logger.debug("发送初始化请求...")
# 发送初始化请求
init_response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "initialize",
"params": {
"protocolVersion": "2025-06-18",
"capabilities": {},
"clientInfo": {"name": "mcp-client", "version": "1.0.0"},
},
}
)
# 请求ID自增
self.request_id += 1
# 检查响应中是否有result字段
if "result" not in init_response:
logger.error(f"初始化失败: {init_response}")
return False
# 发送initialized通知
logger.debug("发送initialized通知...")
self._send_request(
{"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}}
)
# 记录服务器初始化成功
logger.info("服务器初始化成功")
return True
# ping服务器,测试连通性
def ping(self):
"""Ping服务器"""
# 发送ping请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "ping",
"params": {},
}
)
# 请求ID自增
self.request_id += 1
# 返回响应
return response
# 发送JSON-RPC请求到服务器
def _send_request(self, request):
"""发送JSON-RPC请求到服务器"""
# 检查服务器进程是否存在且未退出
if not self.process or self.process.poll() is not None:
logger.error("服务器连接已断开")
return {"error": "Server connection lost"}
# 将请求对象序列化为JSON字符串
request_str = json.dumps(request, ensure_ascii=False)
# 记录调试日志,请求方法和ID
logger.debug(
f"发送请求: {request.get('method', 'unknown')} (ID: {request.get('id', 'none')})"
)
try:
# 向服务器进程写入请求
self.process.stdin.write(request_str + "\n")
self.process.stdin.flush()
except Exception as e:
# 发送请求时出错,记录错误日志
logger.error(f"发送请求时出错: {e}")
return {"error": f"发送失败: {e}"}
# 检查是否为通知类请求(没有id字段)
if "id" not in request:
logger.debug("通知类请求,不等待响应")
return {"result": "notification_sent"}
# 读取服务器响应
logger.debug("等待响应...")
try:
# 读取一行响应
response_line = self.process.stdout.readline()
if response_line:
try:
# 解析JSON响应
response = json.loads(response_line.strip())
logger.debug(f"收到响应: {response.get('result', 'error')}")
return response
except json.JSONDecodeError as e:
# JSON解析错误
logger.error(f"JSON解析错误: {e}")
return {"error": f"JSON解析失败: {e}"}
else:
# 没有收到响应
logger.error("没有收到响应")
return {"error": "没有响应"}
except Exception as e:
# 读取响应时出错
logger.error(f"读取响应时出错: {e}")
return {"error": f"读取失败: {e}"}
# 加载工具列表
def _load_tools(self):
"""加载工具列表"""
# 记录调试日志
logger.debug("获取工具列表...")
# 发送tools/list请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "tools/list",
"params": {},
}
)
# 请求ID自增
self.request_id += 1
# 检查响应中是否包含工具列表
if "result" in response and "tools" in response["result"]:
self.available_tools = response["result"]["tools"]
logger.info(f"可用工具: {[tool['name'] for tool in self.available_tools]}")
else:
logger.error(f"获取工具列表失败: {response}")
# 加载资源列表的方法
+ def _load_resources(self):
+ """加载资源列表"""
# 记录日志:获取资源列表
+ logger.debug("获取资源列表...")
# 发送获取资源列表请求
+ response = self._send_request(
+ {
+ "jsonrpc": "2.0",
+ "id": self.request_id,
+ "method": "resources/list",
+ "params": {},
+ }
+ )
# 请求ID递增
+ self.request_id += 1
# 检查响应并保存资源列表
+ if "result" in response and "resources" in response["result"]:
+ self.available_resources = response["result"]["resources"]
+ logger.info(
+ f"可用资源: {[res['name'] for res in self.available_resources]}"
+ )
+ else:
+ logger.error(f"获取资源列表失败: {response}")
# 加载所有可用功能
def _load_all_capabilities(self):
"""加载所有可用功能"""
# 记录日志,开始加载功能
logger.info("正在加载服务器功能...")
# 初始化服务器
if not self._initialize_server():
return
# 加载工具列表
self._load_tools()
# 加载资源列表
+ self._load_resources()
# 记录日志,功能加载完成
logger.info("功能加载完成!")
# 连接到MCP服务器
def connect_to_server(self, server_script_path):
"""连接到MCP服务器"""
# 检查服务器脚本路径是否存在
if not os.path.exists(server_script_path):
raise FileNotFoundError(f"服务器脚本不存在: {server_script_path}")
# 检查脚本文件是否为.py结尾
if not server_script_path.endswith(".py"):
logger.warning(f"服务器脚本可能不是Python文件: {server_script_path}")
# 启动服务器进程
self.process = subprocess.Popen(
[sys.executable, server_script_path],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=0, # 改为无缓冲
)
# 加载所有可用功能
self._load_all_capabilities()
# 更新连接状态
self._connected = True
# 列出所有可用工具
def _list_tools(self):
# 工具列表字符串数组
tool_list = []
# 遍历所有可用工具
for tool in self.available_tools:
# 获取工具名称
name = tool.get("name", "Unknown")
# 获取工具描述
description = tool.get("description", "No description")
# 添加到工具列表
tool_list.append(f" - {name}: {description}")
# 返回格式化后的工具列表字符串
return f"可用工具:\n" + "\n".join(tool_list)
# 调用工具的方法
def call_tool(self, tool_name, arguments):
"""调用工具"""
# 发送调用工具请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "tools/call",
"params": {"name": tool_name, "arguments": arguments},
}
)
# 请求ID递增
self.request_id += 1
return response
# 调用工具的内部方法
def _call_tool(self, tool_name, args):
"""调用工具"""
arguments = {}
# 针对不同工具解析参数
if tool_name == "get_current_time":
if args and args[0] in ["iso", "timestamp", "readable"]:
arguments["format"] = args[0]
elif tool_name == "calculate":
if args:
# 对于calculate,直接使用第一个参数作为表达式
arguments["expression"] = args[0]
else:
return "calculate 工具需要表达式参数"
elif tool_name == "echo":
if args:
# 对于echo,直接使用第一个参数作为消息
arguments["message"] = args[0]
else:
return "echo 工具需要消息参数"
# 记录日志:调用工具
logger.debug(f"调用工具: {tool_name} 参数: {arguments}")
# 调用工具
result = self.call_tool(tool_name, arguments)
# 检查结果
if "result" in result and "content" in result["result"]:
logger.debug(f"工具 {tool_name} 调用成功")
return result["result"]["content"]
else:
logger.error(f"工具 {tool_name} 调用失败: {result}")
return f"工具调用失败: {result}"
# 列出所有资源的方法
+ def _list_resources(self):
+ resource_list = []
+ for res in self.available_resources:
+ name = res.get("name", "Unknown")
+ uri = res.get("uri", "No URI")
+ resource_list.append(f" - {name}: {uri}")
+ return f"可用资源:\n" + "\n".join(resource_list)
# 读取资源的方法
+ def read_resource(self, uri):
+ """读取资源"""
# 发送读取资源请求
+ response = self._send_request(
+ {
+ "jsonrpc": "2.0",
+ "id": self.request_id,
+ "method": "resources/read",
+ "params": {"uri": uri},
+ }
+ )
# 请求ID递增
+ self.request_id += 1
+ return response
# 处理用户输入的命令
def process_query(self, query):
"""处理用户查询"""
# 去除首尾空白并按空格分割
parts = query.strip().split()
# 如果没有输入内容,提示输入有效命令
if not parts:
return "请输入有效的命令"
# 获取命令关键字并转为小写
command = parts[0].lower()
try:
# 判断命令类型并调用相应方法
if command == "help":
return self._show_help()
elif command == "tools":
return self._list_tools()
elif command == "ping":
result = self.ping()
return f"Ping结果: {result}"
elif command in ["get_current_time", "calculate", "echo"]:
return self._call_tool(command, parts[1:])
+ elif command == "resources":
+ return self._list_resources()
+ elif command == "read":
+ if len(parts) < 2:
+ return "用法: read <资源URI>"
+ uri = parts[1]
+ result = self.read_resource(uri)
+ return f"资源内容: {json.dumps(result, ensure_ascii=False, indent=2)}"
else:
return f"未知命令: {command}\n输入 'help' 查看帮助"
except Exception as e:
# 执行命令时出错,记录日志并返回错误信息
logger.error(f"执行命令时出错: {str(e)}")
return f"执行命令时出错: {str(e)}"
# 显示帮助信息
def _show_help(self):
return """
=== MCP客户端帮助 ===
基础命令:
help - 显示此帮助
ping - 测试服务器连接
工具相关:
tools - 列出所有工具
get_current_time [format] - 获取当前时间 (format: iso/timestamp/readable)
calculate <expression> - 计算表达式
echo <message> - 回显消息
+ 资源相关:
+ resources - 列出所有资源
+ read <uri> - 读取资源内容
示例:
help
ping
tools
"""
# 运行交互式聊天循环
def chat_loop(self):
"""运行交互式聊天循环"""
# 记录客户端启动日志
logger.info("=== MCP客户端已启动! ===")
# 提示用户输入help查看命令
print("输入 'help' 查看所有可用命令")
# 提示用户输入quit退出
print("输入 'quit' 退出")
# 循环读取用户输入
while True:
try:
# 读取用户输入命令
query = input("\n命令: ").strip()
# 如果输入quit则退出循环
if query.lower() == "quit":
logger.info("用户退出客户端")
break
# 处理用户命令并输出结果
response = self.process_query(query)
print(f"\n{response}")
except Exception as e:
# 处理命令时出错,记录日志并输出错误
logger.error(f"处理命令时出错: {str(e)}")
print(f"\n错误: {str(e)}")
# 清理资源
def cleanup(self):
"""清理资源"""
# 设置关闭和断开标志
self._shutdown = True
self._connected = False
try:
# 如果服务器进程存在
if self.process:
logger.debug("终止服务器进程")
# 终止服务器进程
self.process.terminate()
# 等待进程结束,设置超时
try:
self.process.wait(timeout=5)
logger.info("服务器进程已正常终止")
except subprocess.TimeoutExpired:
logger.warning("服务器进程未在5秒内终止,强制杀死")
self.process.kill()
self.process.wait()
logger.info("服务器进程已强制终止")
except Exception as e:
# 清理资源时出错,记录日志
logger.error(f"清理资源时出错: {e}")
finally:
# 释放进程对象
self.process = None
# 主函数,程序入口
def main():
# 检查命令行参数数量
if len(sys.argv) < 2:
logger.error("用法: python client.py <服务器脚本路径>")
print("用法: python client.py <服务器脚本路径>")
sys.exit(1)
# 记录启动日志
logger.info(f"启动MCP客户端,连接服务器: {sys.argv[1]}")
# 创建客户端对象
client = MCPClient()
try:
# 连接服务器并进入交互循环
client.connect_to_server(sys.argv[1])
client.chat_loop()
except Exception as e:
# 客户端运行出错,记录日志并输出错误
logger.error(f"客户端运行出错: {e}")
print(f"错误: {e}")
finally:
# 清理客户端资源
logger.info("清理客户端资源")
client.cleanup()
# 判断是否为主程序入口
if __name__ == "__main__":
main()
7.3. mcp_server.py #
mcp_server.py
# 导入os模块,用于获取环境变量
from datetime import datetime
import os
# 导入sys模块,用于标准输入输出
import sys
# 导入logging模块,用于日志记录
import logging
# 导入json模块,用于处理JSON数据
import json
+from resources import resources, server_info, sample_data
from tools import tools
# 设置日志配置
def setup_logging():
"""设置日志配置"""
# 获取日志级别,默认为INFO,并转换为大写
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# 创建日志格式化器
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 创建控制台日志处理器,输出到标准错误
console_handler = logging.StreamHandler(sys.stderr)
# 设置处理器的格式
console_handler.setFormatter(formatter)
# 配置根日志记录器
logging.basicConfig(
level=getattr(logging, log_level),
handlers=[console_handler],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回当前模块的日志记录器
return logging.getLogger(__name__)
# 定义MCPServer类
class MCPServer:
# 初始化方法
def __init__(self):
# 服务器信息,包含名称和版本
self.server_info = {"name": "MCPServer", "version": "1.0.0"}
self.initialized = False
self.tools = tools
# 定义可用资源
+ self.resources = resources
# 处理initialize请求的方法
def initialize(self, params):
"""处理initialize请求"""
# 返回协议版本、能力和服务器信息
return {
"protocolVersion": "2025-06-18",
+ "capabilities": {
+ "tools": {"listChanged": True},
+ "resources": {"listChanged": True},
+ },
"serverInfo": self.server_info,
}
def handle_notification(self, method):
"""处理通知类请求"""
if method == "notifications/initialized":
self.initialized = True
logger.info("Client initialized successfully")
return None # 通知不需要响应
return None
def _handle_initialize(self, request_id):
"""处理initialize请求"""
return {"jsonrpc": "2.0", "id": request_id, "result": self.initialize({})}
def _handle_ping(self, request_id):
"""处理ping请求"""
return {"jsonrpc": "2.0", "id": request_id, "result": {}}
def _handle_list_tools(self, request_id):
"""Handle tools list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"tools": list(self.tools.values())},
}
# 处理获取时间工具
def _handle_get_time(self, arguments):
"""Handle get time tool"""
# 获取格式类型,默认为readable
format_type = arguments.get("format", "readable")
# 获取当前时间
now = datetime.now()
# 根据格式类型返回不同格式
if format_type == "iso":
result = now.isoformat()
elif format_type == "timestamp":
result = str(now.timestamp())
else: # readable
result = now.strftime("%Y-%m-%d %H:%M:%S")
# 返回时间文本
return [{"type": "text", "text": f"Current time: {result}"}]
# 处理计算工具
def _handle_calculate(self, arguments):
"""Handle calculate tool"""
# 获取表达式
expression = arguments.get("expression", "")
try:
# 安全检查表达式字符
allowed_chars = set("0123456789+-*/.() ")
if not all(c in allowed_chars for c in expression):
raise ValueError("Expression contains disallowed characters")
# 计算表达式
result = eval(expression)
# 返回计算结果
return [
{"type": "text", "text": f"Calculation result: {expression} = {result}"}
]
except Exception as e:
# 计算失败
return [{"type": "text", "text": f"Calculation failed: {str(e)}"}]
# 处理回显工具
def _handle_echo(self, arguments):
"""Handle echo tool"""
# 获取消息
message = arguments.get("message", "")
# 返回回显内容
return [{"type": "text", "text": f"Echo: {message}"}]
# 处理工具调用请求
def _handle_call_tool(self, request_id, params):
"""Handle tool call request"""
# 获取工具名称
name = params.get("name")
# 获取参数
arguments = params.get("arguments", {})
try:
# 记录工具调用
logger.debug(f"Tool call: {name} with arguments: {arguments}")
# 根据工具名称分发
if name == "get_current_time":
result = self._handle_get_time(arguments)
elif name == "calculate":
result = self._handle_calculate(arguments)
elif name == "echo":
result = self._handle_echo(arguments)
else:
# 未知工具
logger.warning(f"Unknown tool requested: {name}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32601, "message": f"Unknown tool: {name}"},
}
# 记录工具调用成功
logger.debug(f"Tool {name} executed successfully")
# 返回工具调用结果
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"content": result},
}
except Exception as e:
# 工具调用失败
logger.error(f"Tool {name} call failed: {str(e)}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Tool call failed: {str(e)}"},
}
# 处理资源列表请求
+ def _handle_list_resources(self, request_id):
+ """Handle resources list request"""
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "result": {"resources": list(self.resources.values())},
+ }
# 处理读取资源请求
+ def _handle_read_resource(self, request_id, params):
+ """Handle resource read request"""
# 获取资源URI
+ uri = params.get("uri")
+ try:
# 判断资源类型
+ if uri == "mcp://server-info":
# 构造服务器信息内容
+ content = server_info
# 返回内容
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "result": {
+ "contents": [
+ {
+ "uri": uri,
+ "mimeType": "application/json",
+ "text": json.dumps(content, indent=2),
+ }
+ ]
+ },
+ }
+ elif uri == "mcp://sample-data":
# 构造示例数据内容
+ content = sample_data
# 返回内容
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "result": {
+ "contents": [
+ {
+ "uri": uri,
+ "mimeType": "application/json",
+ "text": json.dumps(content, indent=2),
+ }
+ ]
+ },
+ }
+ else:
# 未找到资源
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "error": {"code": -32601, "message": f"Resource not found: {uri}"},
+ }
+ except Exception as e:
# 读取资源失败
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "error": {"code": -32603, "message": f"Read resource failed: {str(e)}"},
+ }
# 处理JSON-RPC请求的方法
def handle_request(self, request):
"""处理JSON-RPC请求"""
# 获取请求的方法名
method = request.get("method")
# 获取请求ID,默认为0
request_id = request.get("id", 0)
# 获取请求参数,默认为空字典
params = request.get("params", {})
try:
# 处理通知类请求(没有id字段)
if "id" not in request:
return self.handle_notification(method)
# 处理有id的请求
if method == "initialize":
return self._handle_initialize(request_id)
elif method == "ping":
return self._handle_ping(request_id)
elif method == "tools/list":
return self._handle_list_tools(request_id)
elif method == "tools/call":
return self._handle_call_tool(request_id, params)
+ elif method == "resources/list":
+ return self._handle_list_resources(request_id)
+ elif method == "resources/read":
+ return self._handle_read_resource(request_id, params)
else:
# 方法未找到,返回错误
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32601, "message": f"Method {method} not found"},
}
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Request handling error: {e}")
# 返回内部错误响应
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Internal error: {str(e)}"},
}
# 主函数
def main():
"""主函数"""
# 记录服务器启动信息
logger.info("Starting server...")
# 创建MCPServer实例
mcp_server = MCPServer()
try:
# 循环读取标准输入的每一行
for line in sys.stdin:
# 如果行为空,跳过
if not line.strip():
continue
try:
# 解析JSON请求
request = json.loads(line.strip())
# 处理请求,获取响应
response = mcp_server.handle_request(request)
# 如果有响应,输出到标准输出
if response is not None:
print(json.dumps(response, ensure_ascii=False))
# 刷新标准输出缓冲区
sys.stdout.flush()
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Unexpected error: {e}")
except KeyboardInterrupt:
# 捕获键盘中断,记录服务器停止信息
logger.info("Server stopped")
except Exception as e:
# 服务器发生其他异常,记录错误并退出
logger.error(f"Server error: {e}")
sys.exit(1)
# 初始化日志记录器
logger = setup_logging()
# 如果作为主程序运行,则调用main函数
if __name__ == "__main__":
main()
8.资源模板 #
8.1. mcp_server.py #
mcp_server.py
# 导入os模块,用于获取环境变量
from datetime import datetime
import os
# 导入sys模块,用于标准输入输出
import sys
# 导入logging模块,用于日志记录
import logging
# 导入json模块,用于处理JSON数据
import json
+from resources import (
+ meeting_notes_template,
+ project_plan_template,
+ weekly_report_template,
+ resources,
+ server_info,
+ sample_data,
+ templates,
+)
from tools import tools
# 设置日志配置
def setup_logging():
"""设置日志配置"""
# 获取日志级别,默认为INFO,并转换为大写
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# 创建日志格式化器
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 创建控制台日志处理器,输出到标准错误
console_handler = logging.StreamHandler(sys.stderr)
# 设置处理器的格式
console_handler.setFormatter(formatter)
# 配置根日志记录器
logging.basicConfig(
level=getattr(logging, log_level),
handlers=[console_handler],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回当前模块的日志记录器
return logging.getLogger(__name__)
# 定义MCPServer类
class MCPServer:
# 初始化方法
def __init__(self):
# 服务器信息,包含名称和版本
self.server_info = {"name": "MCPServer", "version": "1.0.0"}
self.initialized = False
self.tools = tools
# 定义可用资源
self.resources = resources
# 定义可用资源模板
+ self.templates = templates
# 处理initialize请求的方法
def initialize(self, params):
"""处理initialize请求"""
# 返回协议版本、能力和服务器信息
return {
"protocolVersion": "2025-06-18",
"capabilities": {
"tools": {"listChanged": True},
"resources": {"listChanged": True},
},
"serverInfo": self.server_info,
+ "resourceTemplates": list(self.templates.values()),
}
def handle_notification(self, method):
"""处理通知类请求"""
if method == "notifications/initialized":
self.initialized = True
logger.info("Client initialized successfully")
return None # 通知不需要响应
return None
def _handle_initialize(self, request_id):
"""处理initialize请求"""
return {"jsonrpc": "2.0", "id": request_id, "result": self.initialize({})}
def _handle_ping(self, request_id):
"""处理ping请求"""
return {"jsonrpc": "2.0", "id": request_id, "result": {}}
def _handle_list_tools(self, request_id):
"""Handle tools list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"tools": list(self.tools.values())},
}
# 处理获取时间工具
def _handle_get_time(self, arguments):
"""Handle get time tool"""
# 获取格式类型,默认为readable
format_type = arguments.get("format", "readable")
# 获取当前时间
now = datetime.now()
# 根据格式类型返回不同格式
if format_type == "iso":
result = now.isoformat()
elif format_type == "timestamp":
result = str(now.timestamp())
else: # readable
result = now.strftime("%Y-%m-%d %H:%M:%S")
# 返回时间文本
return [{"type": "text", "text": f"Current time: {result}"}]
# 处理计算工具
def _handle_calculate(self, arguments):
"""Handle calculate tool"""
# 获取表达式
expression = arguments.get("expression", "")
try:
# 安全检查表达式字符
allowed_chars = set("0123456789+-*/.() ")
if not all(c in allowed_chars for c in expression):
raise ValueError("Expression contains disallowed characters")
# 计算表达式
result = eval(expression)
# 返回计算结果
return [
{"type": "text", "text": f"Calculation result: {expression} = {result}"}
]
except Exception as e:
# 计算失败
return [{"type": "text", "text": f"Calculation failed: {str(e)}"}]
# 处理回显工具
def _handle_echo(self, arguments):
"""Handle echo tool"""
# 获取消息
message = arguments.get("message", "")
# 返回回显内容
return [{"type": "text", "text": f"Echo: {message}"}]
# 处理工具调用请求
def _handle_call_tool(self, request_id, params):
"""Handle tool call request"""
# 获取工具名称
name = params.get("name")
# 获取参数
arguments = params.get("arguments", {})
try:
# 记录工具调用
logger.debug(f"Tool call: {name} with arguments: {arguments}")
# 根据工具名称分发
if name == "get_current_time":
result = self._handle_get_time(arguments)
elif name == "calculate":
result = self._handle_calculate(arguments)
elif name == "echo":
result = self._handle_echo(arguments)
else:
# 未知工具
logger.warning(f"Unknown tool requested: {name}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32601, "message": f"Unknown tool: {name}"},
}
# 记录工具调用成功
logger.debug(f"Tool {name} executed successfully")
# 返回工具调用结果
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"content": result},
}
except Exception as e:
# 工具调用失败
logger.error(f"Tool {name} call failed: {str(e)}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Tool call failed: {str(e)}"},
}
# 处理资源列表请求
def _handle_list_resources(self, request_id):
"""Handle resources list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"resources": list(self.resources.values())},
}
# 处理读取资源请求
def _handle_read_resource(self, request_id, params):
"""Handle resource read request"""
# 获取资源URI
uri = params.get("uri")
try:
# 判断资源类型
if uri == "mcp://server-info":
# 返回内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
+ "text": json.dumps(server_info, indent=2),
}
]
},
}
elif uri == "mcp://sample-data":
# 返回内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
+ "text": json.dumps(sample_data, indent=2),
}
]
},
}
else:
# 未找到资源
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32601, "message": f"Resource not found: {uri}"},
}
except Exception as e:
# 读取资源失败
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Read resource failed: {str(e)}"},
}
# 处理模板列表请求
+ def _handle_list_templates(self, request_id):
+ """Handle template list request"""
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "result": {"resourceTemplates": list(self.templates.values())},
+ }
# 处理读取模板请求
+ def _handle_read_template(self, request_id, params):
# 获取模板URI
+ uri = params.get("uri")
+ try:
# 判断模板类型
+ if uri == "mcp://templates/project-plan":
# 返回模板内容
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "result": {
+ "contents": [
+ {
+ "uri": uri,
+ "mimeType": "application/json",
+ "text": json.dumps(project_plan_template, indent=2),
+ }
+ ]
+ },
+ }
+ elif uri == "mcp://templates/meeting-notes":
# 返回模板内容
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "result": {
+ "contents": [
+ {
+ "uri": uri,
+ "mimeType": "application/json",
+ "text": json.dumps(meeting_notes_template, indent=2),
+ }
+ ]
+ },
+ }
+ elif uri == "mcp://templates/weekly-report":
# 返回模板内容
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "result": {
+ "contents": [
+ {
+ "uri": uri,
+ "mimeType": "application/json",
+ "text": json.dumps(weekly_report_template, indent=2),
+ }
+ ]
+ },
+ }
+ else:
# 未找到模板
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "error": {"code": -32601, "message": f"Template not found: {uri}"},
+ }
+ except Exception as e:
# 读取模板失败
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "error": {"code": -32603, "message": f"Read template failed: {str(e)}"},
+ }
# 处理JSON-RPC请求的方法
def handle_request(self, request):
"""处理JSON-RPC请求"""
# 获取请求的方法名
method = request.get("method")
# 获取请求ID,默认为0
request_id = request.get("id", 0)
# 获取请求参数,默认为空字典
params = request.get("params", {})
try:
# 处理通知类请求(没有id字段)
if "id" not in request:
return self.handle_notification(method)
# 处理有id的请求
if method == "initialize":
return self._handle_initialize(request_id)
elif method == "ping":
return self._handle_ping(request_id)
elif method == "tools/list":
return self._handle_list_tools(request_id)
elif method == "tools/call":
return self._handle_call_tool(request_id, params)
elif method == "resources/list":
return self._handle_list_resources(request_id)
elif method == "resources/read":
return self._handle_read_resource(request_id, params)
+ elif method == "resources/templates/list":
+ return self._handle_list_templates(request_id)
+ elif method == "resources/templates/read":
+ return self._handle_read_template(request_id, params)
else:
# 方法未找到,返回错误
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32601, "message": f"Method {method} not found"},
}
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Request handling error: {e}")
# 返回内部错误响应
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Internal error: {str(e)}"},
}
# 主函数
def main():
"""主函数"""
# 记录服务器启动信息
logger.info("Starting server...")
# 创建MCPServer实例
mcp_server = MCPServer()
try:
# 循环读取标准输入的每一行
for line in sys.stdin:
# 如果行为空,跳过
if not line.strip():
continue
try:
# 解析JSON请求
request = json.loads(line.strip())
# 处理请求,获取响应
response = mcp_server.handle_request(request)
# 如果有响应,输出到标准输出
if response is not None:
print(json.dumps(response, ensure_ascii=False))
# 刷新标准输出缓冲区
sys.stdout.flush()
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Unexpected error: {e}")
except KeyboardInterrupt:
# 捕获键盘中断,记录服务器停止信息
logger.info("Server stopped")
except Exception as e:
# 服务器发生其他异常,记录错误并退出
logger.error(f"Server error: {e}")
sys.exit(1)
# 初始化日志记录器
logger = setup_logging()
# 如果作为主程序运行,则调用main函数
if __name__ == "__main__":
main()
8.2. resources.py #
resources.py
from datetime import datetime
resources = {
"mcp://server-info": {
"uri": "mcp://server-info",
"name": "Server Information",
"description": "Basic information about current server",
"mimeType": "application/json",
},
"mcp://sample-data": {
"uri": "mcp://sample-data",
"name": "Sample Data",
"description": "Some sample JSON data",
"mimeType": "application/json",
},
}
server_info = {
"server_name": "Demo MCP Server",
"version": "1.0.0",
"status": "running",
"start_time": datetime.now().isoformat(),
"features": ["tools", "resources", "prompts"],
}
sample_data = {
"items": [
{"id": 1, "name": "Sample Project 1", "status": "active"},
{"id": 2, "name": "Sample Project 2", "status": "pending"},
{"id": 3, "name": "Sample Project 3", "status": "completed"},
],
"total": 3,
"last_updated": datetime.now().isoformat(),
}
+templates = {
+ "mcp://templates/project-plan": {
+ "uri": "mcp://templates/project-plan",
+ "uriTemplate": "mcp://templates/project-plan",
+ "name": "Project Plan Template",
+ "description": "Template for creating project plans",
+ "mimeType": "application/json",
+ },
+ "mcp://templates/meeting-notes": {
+ "uri": "mcp://templates/meeting-notes",
+ "uriTemplate": "mcp://templates/meeting-notes",
+ "name": "Meeting Notes Template",
+ "description": "Template for meeting notes",
+ "mimeType": "application/json",
+ },
+ "mcp://templates/weekly-report": {
+ "uri": "mcp://templates/weekly-report",
+ "uriTemplate": "mcp://templates/weekly-report",
+ "name": "Weekly Report Template",
+ "description": "Template for weekly reports",
+ "mimeType": "application/json",
+ },
+}
+project_plan_template = {
+ "template_name": "Project Plan Template",
+ "sections": [
+ {
+ "title": "Project Overview",
+ "fields": [
+ "project_name",
+ "description",
+ "objectives",
+ "timeline",
+ ],
+ },
+ {
+ "title": "Team",
+ "fields": ["project_manager", "team_members", "roles"],
+ },
+ {
+ "title": "Milestones",
+ "fields": [
+ "milestone_name",
+ "due_date",
+ "deliverables",
+ "status",
+ ],
+ },
+ {
+ "title": "Risk Management",
+ "fields": [
+ "risk_description",
+ "probability",
+ "impact",
+ "mitigation",
+ ],
+ },
+ ],
+ "created": datetime.now().isoformat(),
+ "version": "1.0",
+}
+meeting_notes_template = {
+ "template_name": "Meeting Notes Template",
+ "sections": [
+ {
+ "title": "Meeting Information",
+ "fields": [
+ "meeting_title",
+ "date",
+ "time",
+ "location",
+ "attendees",
+ ],
+ },
+ {
+ "title": "Agenda",
+ "fields": ["agenda_items", "time_allocation", "presenter"],
+ },
+ {
+ "title": "Discussion Points",
+ "fields": [
+ "topic",
+ "discussion",
+ "decisions",
+ "action_items",
+ ],
+ },
+ {
+ "title": "Action Items",
+ "fields": ["action", "assignee", "due_date", "status"],
+ },
+ ],
+ "created": datetime.now().isoformat(),
+ "version": "1.0",
+}
+weekly_report_template = {
+ "template_name": "Weekly Report Template",
+ "sections": [
+ {
+ "title": "Report Period",
+ "fields": ["week_ending", "report_date", "reporter"],
+ },
+ {
+ "title": "Accomplishments",
+ "fields": [
+ "completed_tasks",
+ "milestones_achieved",
+ "deliverables",
+ ],
+ },
+ {
+ "title": "Challenges",
+ "fields": ["issues_encountered", "blockers", "risks"],
+ },
+ {
+ "title": "Next Week Plan",
+ "fields": ["planned_tasks", "goals", "priorities"],
+ },
+ {
+ "title": "Metrics",
+ "fields": ["kpi_name", "current_value", "target", "trend"],
+ },
+ ],
+ "created": datetime.now().isoformat(),
+ "version": "1.0",
+}
9.提示词 #
9.1. prompts.py #
prompts.py
prompts = {
"greeting": {
"name": "greeting",
"description": "Generate greeting message",
"arguments": [
{
"name": "name",
"description": "Name to greet",
"type": "string",
"required": True,
},
{
"name": "time_of_day",
"description": "Time of day",
"type": "string",
"enum": ["morning", "afternoon", "evening", "night"],
"required": False,
},
],
},
"status_report": {
"name": "status_report",
"description": "Generate status report",
"arguments": [
{
"name": "project",
"description": "Project name",
"type": "string",
"required": True,
},
{
"name": "status",
"description": "Project status",
"type": "string",
"enum": ["on_track", "at_risk", "behind_schedule"],
"required": True,
},
],
},
}
9.2. mcp_client.py #
mcp_client.py
# 导入subprocess模块,用于创建和管理子进程
import subprocess
# 导入sys模块,提供对Python解释器相关的变量和函数的访问
import sys
# 导入json模块,用于处理JSON数据
import json
# 导入os模块,提供与操作系统交互的功能
import os
# 导入logging模块,用于日志记录
import logging
# 从dotenv包导入load_dotenv函数,用于加载环境变量
from dotenv import load_dotenv
# 加载.env文件中的环境变量
load_dotenv() # 从.env加载环境变量
# 配置日志记录的函数
def setup_logging():
"""设置日志配置"""
# 获取日志级别,默认为INFO,并转换为大写
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# 创建日志格式化器,指定日志输出格式
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 创建控制台日志处理器,输出到标准错误
console_handler = logging.StreamHandler(sys.stderr)
# 设置控制台处理器的日志格式
console_handler.setFormatter(formatter)
# 配置根日志记录器,设置日志级别和处理器
logging.basicConfig(
level=getattr(logging, log_level),
handlers=[console_handler],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回当前模块的日志记录器
return logging.getLogger(__name__)
# 设置日志记录器
logger = setup_logging()
# 定义MCPClient类
class MCPClient:
# 构造函数,初始化客户端对象
def __init__(self):
# 服务器进程对象
self.process = None
# 请求ID,每次请求自增
self.request_id = 1
# 可用工具列表
self.available_tools = []
# 可用资源列表
self.available_resources = []
# 可用模板列表
+ self.available_templates = []
# 可用提示列表
+ self.available_prompts = []
# 连接状态标志
self._connected = False
# 关闭状态标志
self._shutdown = False
# 初始化服务器连接
def _initialize_server(self):
"""初始化服务器连接"""
# 记录调试日志,发送初始化请求
logger.debug("发送初始化请求...")
# 发送初始化请求
init_response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "initialize",
"params": {
"protocolVersion": "2025-06-18",
"capabilities": {},
"clientInfo": {"name": "mcp-client", "version": "1.0.0"},
},
}
)
# 请求ID自增
self.request_id += 1
# 检查响应中是否有result字段
if "result" not in init_response:
logger.error(f"初始化失败: {init_response}")
return False
# 发送initialized通知
logger.debug("发送initialized通知...")
self._send_request(
{"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}}
)
# 记录服务器初始化成功
logger.info("服务器初始化成功")
return True
# ping服务器,测试连通性
def ping(self):
"""Ping服务器"""
# 发送ping请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "ping",
"params": {},
}
)
# 请求ID自增
self.request_id += 1
# 返回响应
return response
# 发送JSON-RPC请求到服务器
def _send_request(self, request):
"""发送JSON-RPC请求到服务器"""
# 检查服务器进程是否存在且未退出
if not self.process or self.process.poll() is not None:
logger.error("服务器连接已断开")
return {"error": "Server connection lost"}
# 将请求对象序列化为JSON字符串
request_str = json.dumps(request, ensure_ascii=False)
# 记录调试日志,请求方法和ID
logger.debug(
f"发送请求: {request.get('method', 'unknown')} (ID: {request.get('id', 'none')})"
)
try:
# 向服务器进程写入请求
self.process.stdin.write(request_str + "\n")
self.process.stdin.flush()
except Exception as e:
# 发送请求时出错,记录错误日志
logger.error(f"发送请求时出错: {e}")
return {"error": f"发送失败: {e}"}
# 检查是否为通知类请求(没有id字段)
if "id" not in request:
logger.debug("通知类请求,不等待响应")
return {"result": "notification_sent"}
# 读取服务器响应
logger.debug("等待响应...")
try:
# 读取一行响应
response_line = self.process.stdout.readline()
if response_line:
try:
# 解析JSON响应
response = json.loads(response_line.strip())
logger.debug(f"收到响应: {response.get('result', 'error')}")
return response
except json.JSONDecodeError as e:
# JSON解析错误
logger.error(f"JSON解析错误: {e}")
return {"error": f"JSON解析失败: {e}"}
else:
# 没有收到响应
logger.error("没有收到响应")
return {"error": "没有响应"}
except Exception as e:
# 读取响应时出错
logger.error(f"读取响应时出错: {e}")
return {"error": f"读取失败: {e}"}
# 加载模板列表的方法
+ def _load_templates(self):
+ """加载模板列表"""
# 记录日志:获取模板列表
+ logger.debug("获取模板列表...")
# 发送获取模板列表请求
+ response = self._send_request(
+ {
+ "jsonrpc": "2.0",
+ "id": self.request_id,
+ "method": "resources/templates/list",
+ "params": {},
+ }
+ )
# 请求ID递增
+ self.request_id += 1
# 检查响应并保存模板列表
+ if "result" in response and "resourceTemplates" in response["result"]:
+ self.available_templates = response["result"]["resourceTemplates"]
+ logger.info(
+ f"可用模板: {[tpl['name'] for tpl in self.available_templates]}"
+ )
+ else:
+ logger.error(f"获取模板列表失败: {response}")
# 加载提示列表的方法
+ def _load_prompts(self):
+ """加载提示列表"""
# 记录日志:获取提示列表
+ logger.debug("获取提示列表...")
# 发送获取提示列表请求
+ response = self._send_request(
+ {
+ "jsonrpc": "2.0",
+ "id": self.request_id,
+ "method": "prompts/list",
+ "params": {},
+ }
+ )
# 请求ID递增
+ self.request_id += 1
# 检查响应并保存提示列表
+ if "result" in response and "prompts" in response["result"]:
+ self.available_prompts = response["result"]["prompts"]
+ logger.info(
+ f"可用提示: {[prompt['name'] for prompt in self.available_prompts]}"
+ )
+ else:
+ logger.error(f"获取提示列表失败: {response}")
# 加载工具列表
def _load_tools(self):
"""加载工具列表"""
# 记录调试日志
logger.debug("获取工具列表...")
# 发送tools/list请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "tools/list",
"params": {},
}
)
# 请求ID自增
self.request_id += 1
# 检查响应中是否包含工具列表
if "result" in response and "tools" in response["result"]:
self.available_tools = response["result"]["tools"]
logger.info(f"可用工具: {[tool['name'] for tool in self.available_tools]}")
else:
logger.error(f"获取工具列表失败: {response}")
# 加载资源列表的方法
def _load_resources(self):
"""加载资源列表"""
# 记录日志:获取资源列表
logger.debug("获取资源列表...")
# 发送获取资源列表请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "resources/list",
"params": {},
}
)
# 请求ID递增
self.request_id += 1
# 检查响应并保存资源列表
if "result" in response and "resources" in response["result"]:
self.available_resources = response["result"]["resources"]
logger.info(
f"可用资源: {[res['name'] for res in self.available_resources]}"
)
else:
logger.error(f"获取资源列表失败: {response}")
# 加载所有可用功能
def _load_all_capabilities(self):
"""加载所有可用功能"""
# 记录日志,开始加载功能
logger.info("正在加载服务器功能...")
# 初始化服务器
if not self._initialize_server():
return
# 加载工具列表
self._load_tools()
# 加载资源列表
self._load_resources()
# 加载模板列表
+ self._load_templates()
# 加载提示列表
+ self._load_prompts()
# 记录日志,功能加载完成
logger.info("功能加载完成!")
# 连接到MCP服务器
def connect_to_server(self, server_script_path):
"""连接到MCP服务器"""
# 检查服务器脚本路径是否存在
if not os.path.exists(server_script_path):
raise FileNotFoundError(f"服务器脚本不存在: {server_script_path}")
# 检查脚本文件是否为.py结尾
if not server_script_path.endswith(".py"):
logger.warning(f"服务器脚本可能不是Python文件: {server_script_path}")
# 启动服务器进程
self.process = subprocess.Popen(
[sys.executable, server_script_path],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=0, # 改为无缓冲
)
# 加载所有可用功能
self._load_all_capabilities()
# 更新连接状态
self._connected = True
# 列出所有可用工具
def _list_tools(self):
# 工具列表字符串数组
tool_list = []
# 遍历所有可用工具
for tool in self.available_tools:
# 获取工具名称
name = tool.get("name", "Unknown")
# 获取工具描述
description = tool.get("description", "No description")
# 添加到工具列表
tool_list.append(f" - {name}: {description}")
# 返回格式化后的工具列表字符串
return f"可用工具:\n" + "\n".join(tool_list)
# 调用工具的方法
def call_tool(self, tool_name, arguments):
"""调用工具"""
# 发送调用工具请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "tools/call",
"params": {"name": tool_name, "arguments": arguments},
}
)
# 请求ID递增
self.request_id += 1
return response
# 调用工具的内部方法
def _call_tool(self, tool_name, args):
"""调用工具"""
arguments = {}
# 针对不同工具解析参数
if tool_name == "get_current_time":
if args and args[0] in ["iso", "timestamp", "readable"]:
arguments["format"] = args[0]
elif tool_name == "calculate":
if args:
# 对于calculate,直接使用第一个参数作为表达式
arguments["expression"] = args[0]
else:
return "calculate 工具需要表达式参数"
elif tool_name == "echo":
if args:
# 对于echo,直接使用第一个参数作为消息
arguments["message"] = args[0]
else:
return "echo 工具需要消息参数"
# 记录日志:调用工具
logger.debug(f"调用工具: {tool_name} 参数: {arguments}")
# 调用工具
result = self.call_tool(tool_name, arguments)
# 检查结果
if "result" in result and "content" in result["result"]:
logger.debug(f"工具 {tool_name} 调用成功")
return result["result"]["content"]
else:
logger.error(f"工具 {tool_name} 调用失败: {result}")
return f"工具调用失败: {result}"
# 列出所有资源的方法
def _list_resources(self):
resource_list = []
for res in self.available_resources:
name = res.get("name", "Unknown")
uri = res.get("uri", "No URI")
resource_list.append(f" - {name}: {uri}")
return f"可用资源:\n" + "\n".join(resource_list)
# 读取资源的方法
def read_resource(self, uri):
"""读取资源"""
# 发送读取资源请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "resources/read",
"params": {"uri": uri},
}
)
# 请求ID递增
self.request_id += 1
return response
# 列出所有模板的方法
+ def _list_templates(self):
+ template_list = []
+ for tpl in self.available_templates:
+ name = tpl.get("name", "Unknown")
+ uri = tpl.get("uri", "No URI")
+ template_list.append(f" - {name}: {uri}")
+ return f"可用模板:\n" + "\n".join(template_list)
# 列出所有提示的方法
+ def _list_prompts(self):
+ prompt_list = []
+ for prompt in self.available_prompts:
+ name = prompt.get("name", "Unknown")
+ description = prompt.get("description", "No description")
+ prompt_list.append(f" - {name}: {description}")
+ return f"可用提示:\n" + "\n".join(prompt_list)
# 读取模板的方法
+ def read_template(self, uri):
+ """读取模板"""
# 发送读取模板请求
+ response = self._send_request(
+ {
+ "jsonrpc": "2.0",
+ "id": self.request_id,
+ "method": "resources/templates/read",
+ "params": {"uri": uri},
+ }
+ )
# 请求ID递增
+ self.request_id += 1
+ return response
# 显示提示的方法
+ def show_prompt(self, name, arguments):
+ """显示提示"""
# 发送显示提示请求
+ response = self._send_request(
+ {
+ "jsonrpc": "2.0",
+ "id": self.request_id,
+ "method": "prompts/get",
+ "params": {"name": name, "arguments": arguments},
+ }
+ )
# 请求ID递增
+ self.request_id += 1
+ return response
# 获取提示的方法
+ def get_prompt(self, name, arguments):
+ """获取提示"""
# 发送获取提示请求
+ response = self._send_request(
+ {
+ "jsonrpc": "2.0",
+ "id": self.request_id,
+ "method": "prompts/get",
+ "params": {"name": name, "arguments": arguments},
+ }
+ )
# 请求ID递增
+ self.request_id += 1
+ return response
# 解析命令行参数为字典的方法
+ def _parse_arguments(self, args):
+ """解析命令行参数为字典"""
+ arguments = {}
+ for arg in args:
+ if "=" in arg:
+ key, value = arg.split("=", 1)
+ arguments[key] = value
+ return arguments
# 处理用户输入的命令
def process_query(self, query):
"""处理用户查询"""
# 去除首尾空白并按空格分割
parts = query.strip().split()
# 如果没有输入内容,提示输入有效命令
if not parts:
return "请输入有效的命令"
# 获取命令关键字并转为小写
command = parts[0].lower()
try:
# 判断命令类型并调用相应方法
if command == "help":
return self._show_help()
elif command == "tools":
return self._list_tools()
elif command == "ping":
result = self.ping()
return f"Ping结果: {result}"
elif command in ["get_current_time", "calculate", "echo"]:
return self._call_tool(command, parts[1:])
elif command == "resources":
return self._list_resources()
elif command == "read":
if len(parts) < 2:
return "用法: read <资源URI>"
uri = parts[1]
result = self.read_resource(uri)
return f"资源内容: {json.dumps(result, ensure_ascii=False, indent=2)}"
+ elif command == "templates":
+ return self._list_templates()
+ elif command == "template":
+ if len(parts) < 2:
+ return "用法: template <模板URI>"
+ uri = parts[1]
+ result = self.read_template(uri)
+ return f"模板内容: {json.dumps(result, ensure_ascii=False, indent=2)}"
+ elif command == "prompts":
+ return self._list_prompts()
+ elif command == "prompt":
+ if len(parts) < 2:
+ return "用法: prompt <提示名称> [参数]"
+ prompt_name = parts[1]
+ arguments = self._parse_arguments(parts[2:])
+ result = self.show_prompt(prompt_name, arguments)
+ return f"提示结果: {json.dumps(result, ensure_ascii=False, indent=2)}"
else:
return f"未知命令: {command}\n输入 'help' 查看帮助"
except Exception as e:
# 执行命令时出错,记录日志并返回错误信息
logger.error(f"执行命令时出错: {str(e)}")
return f"执行命令时出错: {str(e)}"
# 显示帮助信息
def _show_help(self):
return """
=== MCP客户端帮助 ===
基础命令:
help - 显示此帮助
ping - 测试服务器连接
工具相关:
tools - 列出所有工具
get_current_time [format] - 获取当前时间 (format: iso/timestamp/readable)
calculate <expression> - 计算表达式
echo <message> - 回显消息
资源相关:
resources - 列出所有资源
read <uri> - 读取资源内容
+ 模板相关:
+ templates - 列出所有模板
+ template <uri> - 读取模板内容
+ 提示相关:
+ prompts - 列出所有提示
+ prompt <name> [args] - 使用提示
示例:
help
ping
tools
+ get_current_time iso
+ calculate 2+3*4
+ echo hello world
+ read mcp://server-info
+ template mcp://templates/project-plan
+ prompt greeting name=Alice time_of_day=morning
"""
# 运行交互式聊天循环
def chat_loop(self):
"""运行交互式聊天循环"""
# 记录客户端启动日志
logger.info("=== MCP客户端已启动! ===")
# 提示用户输入help查看命令
print("输入 'help' 查看所有可用命令")
# 提示用户输入quit退出
print("输入 'quit' 退出")
# 循环读取用户输入
while True:
try:
# 读取用户输入命令
query = input("\n命令: ").strip()
# 如果输入quit则退出循环
if query.lower() == "quit":
logger.info("用户退出客户端")
break
# 处理用户命令并输出结果
response = self.process_query(query)
print(f"\n{response}")
except Exception as e:
# 处理命令时出错,记录日志并输出错误
logger.error(f"处理命令时出错: {str(e)}")
print(f"\n错误: {str(e)}")
# 清理资源
def cleanup(self):
"""清理资源"""
# 设置关闭和断开标志
self._shutdown = True
self._connected = False
try:
# 如果服务器进程存在
if self.process:
logger.debug("终止服务器进程")
# 终止服务器进程
self.process.terminate()
# 等待进程结束,设置超时
try:
self.process.wait(timeout=5)
logger.info("服务器进程已正常终止")
except subprocess.TimeoutExpired:
logger.warning("服务器进程未在5秒内终止,强制杀死")
self.process.kill()
self.process.wait()
logger.info("服务器进程已强制终止")
except Exception as e:
# 清理资源时出错,记录日志
logger.error(f"清理资源时出错: {e}")
finally:
# 释放进程对象
self.process = None
# 主函数,程序入口
def main():
# 检查命令行参数数量
if len(sys.argv) < 2:
logger.error("用法: python client.py <服务器脚本路径>")
print("用法: python client.py <服务器脚本路径>")
sys.exit(1)
# 记录启动日志
logger.info(f"启动MCP客户端,连接服务器: {sys.argv[1]}")
# 创建客户端对象
client = MCPClient()
try:
# 连接服务器并进入交互循环
client.connect_to_server(sys.argv[1])
client.chat_loop()
except Exception as e:
# 客户端运行出错,记录日志并输出错误
logger.error(f"客户端运行出错: {e}")
print(f"错误: {e}")
finally:
# 清理客户端资源
logger.info("清理客户端资源")
client.cleanup()
# 判断是否为主程序入口
if __name__ == "__main__":
main()
9.3. mcp_server.py #
mcp_server.py
# 导入os模块,用于获取环境变量
+from ast import Dict
from datetime import datetime
import os
# 导入sys模块,用于标准输入输出
import sys
# 导入logging模块,用于日志记录
import logging
# 导入json模块,用于处理JSON数据
import json
from resources import (
meeting_notes_template,
project_plan_template,
weekly_report_template,
resources,
server_info,
sample_data,
templates,
)
from tools import tools
+from prompts import prompts
# 设置日志配置
def setup_logging():
"""设置日志配置"""
# 获取日志级别,默认为INFO,并转换为大写
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# 创建日志格式化器
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 创建控制台日志处理器,输出到标准错误
console_handler = logging.StreamHandler(sys.stderr)
# 设置处理器的格式
console_handler.setFormatter(formatter)
# 配置根日志记录器
logging.basicConfig(
level=getattr(logging, log_level),
handlers=[console_handler],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回当前模块的日志记录器
return logging.getLogger(__name__)
# 定义MCPServer类
class MCPServer:
# 初始化方法
def __init__(self):
# 服务器信息,包含名称和版本
self.server_info = {"name": "MCPServer", "version": "1.0.0"}
self.initialized = False
self.tools = tools
# 定义可用资源
self.resources = resources
# 定义可用资源模板
self.templates = templates
# 定义可用提示(prompts)
+ self.prompts = prompts
# 处理initialize请求的方法
def initialize(self, params):
"""处理initialize请求"""
# 返回协议版本、能力和服务器信息
return {
"protocolVersion": "2025-06-18",
"capabilities": {
"tools": {"listChanged": True},
"resources": {"listChanged": True},
+ "prompts": {"listChanged": True},
},
"serverInfo": self.server_info,
"resourceTemplates": list(self.templates.values()),
}
def handle_notification(self, method):
"""处理通知类请求"""
if method == "notifications/initialized":
self.initialized = True
logger.info("Client initialized successfully")
return None # 通知不需要响应
return None
def _handle_initialize(self, request_id):
"""处理initialize请求"""
return {"jsonrpc": "2.0", "id": request_id, "result": self.initialize({})}
def _handle_ping(self, request_id):
"""处理ping请求"""
return {"jsonrpc": "2.0", "id": request_id, "result": {}}
def _handle_list_tools(self, request_id):
"""Handle tools list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"tools": list(self.tools.values())},
}
# 处理获取时间工具
def _handle_get_time(self, arguments):
"""Handle get time tool"""
# 获取格式类型,默认为readable
format_type = arguments.get("format", "readable")
# 获取当前时间
now = datetime.now()
# 根据格式类型返回不同格式
if format_type == "iso":
result = now.isoformat()
elif format_type == "timestamp":
result = str(now.timestamp())
else: # readable
result = now.strftime("%Y-%m-%d %H:%M:%S")
# 返回时间文本
return [{"type": "text", "text": f"Current time: {result}"}]
# 处理计算工具
def _handle_calculate(self, arguments):
"""Handle calculate tool"""
# 获取表达式
expression = arguments.get("expression", "")
try:
# 安全检查表达式字符
allowed_chars = set("0123456789+-*/.() ")
if not all(c in allowed_chars for c in expression):
raise ValueError("Expression contains disallowed characters")
# 计算表达式
result = eval(expression)
# 返回计算结果
return [
{"type": "text", "text": f"Calculation result: {expression} = {result}"}
]
except Exception as e:
# 计算失败
return [{"type": "text", "text": f"Calculation failed: {str(e)}"}]
# 处理回显工具
def _handle_echo(self, arguments):
"""Handle echo tool"""
# 获取消息
message = arguments.get("message", "")
# 返回回显内容
return [{"type": "text", "text": f"Echo: {message}"}]
# 处理工具调用请求
def _handle_call_tool(self, request_id, params):
"""Handle tool call request"""
# 获取工具名称
name = params.get("name")
# 获取参数
arguments = params.get("arguments", {})
try:
# 记录工具调用
logger.debug(f"Tool call: {name} with arguments: {arguments}")
# 根据工具名称分发
if name == "get_current_time":
result = self._handle_get_time(arguments)
elif name == "calculate":
result = self._handle_calculate(arguments)
elif name == "echo":
result = self._handle_echo(arguments)
else:
# 未知工具
logger.warning(f"Unknown tool requested: {name}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32601, "message": f"Unknown tool: {name}"},
}
# 记录工具调用成功
logger.debug(f"Tool {name} executed successfully")
# 返回工具调用结果
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"content": result},
}
except Exception as e:
# 工具调用失败
logger.error(f"Tool {name} call failed: {str(e)}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Tool call failed: {str(e)}"},
}
# 处理资源列表请求
def _handle_list_resources(self, request_id):
"""Handle resources list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"resources": list(self.resources.values())},
}
# 处理读取资源请求
def _handle_read_resource(self, request_id, params):
"""Handle resource read request"""
# 获取资源URI
uri = params.get("uri")
try:
# 判断资源类型
if uri == "mcp://server-info":
# 返回内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(server_info, indent=2),
}
]
},
}
elif uri == "mcp://sample-data":
# 返回内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(sample_data, indent=2),
}
]
},
}
else:
# 未找到资源
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32601, "message": f"Resource not found: {uri}"},
}
except Exception as e:
# 读取资源失败
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Read resource failed: {str(e)}"},
}
# 处理模板列表请求
def _handle_list_templates(self, request_id):
"""Handle template list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"resourceTemplates": list(self.templates.values())},
}
# 处理读取模板请求
def _handle_read_template(self, request_id, params):
# 获取模板URI
uri = params.get("uri")
try:
# 判断模板类型
if uri == "mcp://templates/project-plan":
# 返回模板内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(project_plan_template, indent=2),
}
]
},
}
elif uri == "mcp://templates/meeting-notes":
# 返回模板内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(meeting_notes_template, indent=2),
}
]
},
}
elif uri == "mcp://templates/weekly-report":
# 返回模板内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(weekly_report_template, indent=2),
}
]
},
}
else:
# 未找到模板
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32601, "message": f"Template not found: {uri}"},
}
except Exception as e:
# 读取模板失败
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Read template failed: {str(e)}"},
}
# 处理提示列表请求
+ def _handle_list_prompts(self, request_id):
+ """Handle prompt list request"""
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "result": {"prompts": list(self.prompts.values())},
+ }
# 处理问候提示
+ def _handle_greeting_prompt(self, arguments):
+ """Handle greeting prompt"""
# 获取姓名,默认为User
+ name = arguments.get("name", "User")
# 获取时间段,默认为day
+ time_of_day = arguments.get("time_of_day", "day")
# 问候语映射
+ greetings = {
+ "morning": "Good morning",
+ "afternoon": "Good afternoon",
+ "evening": "Good evening",
+ "night": "Good night",
+ }
# 获取对应问候语
+ greeting = greetings.get(time_of_day, "Hello")
# 生成内容
+ content = f"{greeting}, {name}! Welcome to the MCP server."
# 返回问候消息
+ return [{"role": "assistant", "content": {"type": "text", "text": content}}]
# 处理状态报告提示
+ def _handle_status_report_prompt(self, arguments):
+ """Handle status report prompt"""
# 获取项目名,默认为Unknown Project
+ project = arguments.get("project", "Unknown Project")
# 获取状态,默认为unknown
+ status = arguments.get("status", "unknown")
# 状态消息映射
+ status_messages = {
+ "on_track": "Project is on track",
+ "at_risk": "Project is at risk",
+ "behind_schedule": "Project is behind schedule",
+ }
# 获取对应状态消息
+ message = status_messages.get(status, "Project status unknown")
# 生成内容
+ content = f"Project '{project}' status report: {message}"
# 返回状态报告消息
+ return [{"role": "assistant", "content": {"type": "text", "text": content}}]
# 处理获取提示请求
+ def _handle_get_prompt(self, request_id, params):
+ """Handle prompt get request"""
# 获取提示名称
+ name = params.get("name")
# 获取参数
+ arguments = params.get("arguments", {})
+ try:
# 根据提示名称分发
+ if name == "greeting":
+ result = self._handle_greeting_prompt(arguments)
+ elif name == "status_report":
+ result = self._handle_status_report_prompt(arguments)
+ else:
# 未知提示
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "error": {"code": -32601, "message": f"Unknown prompt: {name}"},
+ }
# 返回提示结果
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "result": {"messages": result},
+ }
+ except Exception as e:
# 获取提示失败
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "error": {"code": -32603, "message": f"Get prompt failed: {str(e)}"},
+ }
# 处理JSON-RPC请求的方法
def handle_request(self, request):
"""处理JSON-RPC请求"""
# 获取请求的方法名
method = request.get("method")
# 获取请求ID,默认为0
request_id = request.get("id", 0)
# 获取请求参数,默认为空字典
params = request.get("params", {})
try:
# 处理通知类请求(没有id字段)
if "id" not in request:
return self.handle_notification(method)
# 处理有id的请求
if method == "initialize":
return self._handle_initialize(request_id)
elif method == "ping":
return self._handle_ping(request_id)
elif method == "tools/list":
return self._handle_list_tools(request_id)
elif method == "tools/call":
return self._handle_call_tool(request_id, params)
elif method == "resources/list":
return self._handle_list_resources(request_id)
elif method == "resources/read":
return self._handle_read_resource(request_id, params)
elif method == "resources/templates/list":
return self._handle_list_templates(request_id)
elif method == "resources/templates/read":
return self._handle_read_template(request_id, params)
+ elif method == "prompts/list":
+ return self._handle_list_prompts(request_id)
+ elif method == "prompts/get":
+ return self._handle_get_prompt(request_id, params)
else:
# 方法未找到,返回错误
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32601, "message": f"Method {method} not found"},
}
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Request handling error: {e}")
# 返回内部错误响应
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Internal error: {str(e)}"},
}
# 主函数
def main():
"""主函数"""
# 记录服务器启动信息
logger.info("Starting server...")
# 创建MCPServer实例
mcp_server = MCPServer()
try:
# 循环读取标准输入的每一行
for line in sys.stdin:
# 如果行为空,跳过
if not line.strip():
continue
try:
# 解析JSON请求
request = json.loads(line.strip())
# 处理请求,获取响应
response = mcp_server.handle_request(request)
# 如果有响应,输出到标准输出
if response is not None:
print(json.dumps(response, ensure_ascii=False))
# 刷新标准输出缓冲区
sys.stdout.flush()
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Unexpected error: {e}")
except KeyboardInterrupt:
# 捕获键盘中断,记录服务器停止信息
logger.info("Server stopped")
except Exception as e:
# 服务器发生其他异常,记录错误并退出
logger.error(f"Server error: {e}")
sys.exit(1)
# 初始化日志记录器
logger = setup_logging()
# 如果作为主程序运行,则调用main函数
if __name__ == "__main__":
main()
10.采样 #
10.1. mcp_client.py #
mcp_client.py
# 导入subprocess模块,用于创建和管理子进程
import subprocess
# 导入sys模块,提供对Python解释器相关的变量和函数的访问
import sys
# 导入json模块,用于处理JSON数据
import json
# 导入os模块,提供与操作系统交互的功能
import os
# 导入logging模块,用于日志记录
import logging
# 从dotenv包导入load_dotenv函数,用于加载环境变量
from dotenv import load_dotenv
# 加载.env文件中的环境变量
load_dotenv() # 从.env加载环境变量
# 配置日志记录的函数
def setup_logging():
"""设置日志配置"""
# 获取日志级别,默认为INFO,并转换为大写
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# 创建日志格式化器,指定日志输出格式
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 创建控制台日志处理器,输出到标准错误
console_handler = logging.StreamHandler(sys.stderr)
# 设置控制台处理器的日志格式
console_handler.setFormatter(formatter)
# 配置根日志记录器,设置日志级别和处理器
logging.basicConfig(
level=getattr(logging, log_level),
handlers=[console_handler],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回当前模块的日志记录器
return logging.getLogger(__name__)
# 设置日志记录器
logger = setup_logging()
# 定义MCPClient类
class MCPClient:
# 构造函数,初始化客户端对象
def __init__(self):
# 服务器进程对象
self.process = None
# 请求ID,每次请求自增
self.request_id = 1
# 可用工具列表
self.available_tools = []
# 可用资源列表
self.available_resources = []
# 可用模板列表
self.available_templates = []
# 可用提示列表
self.available_prompts = []
# 连接状态标志
self._connected = False
# 关闭状态标志
self._shutdown = False
# 初始化服务器连接
def _initialize_server(self):
"""初始化服务器连接"""
# 记录调试日志,发送初始化请求
logger.debug("发送初始化请求...")
# 发送初始化请求
init_response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "initialize",
"params": {
"protocolVersion": "2025-06-18",
"capabilities": {},
"clientInfo": {"name": "mcp-client", "version": "1.0.0"},
},
}
)
# 请求ID自增
self.request_id += 1
# 检查响应中是否有result字段
if "result" not in init_response:
logger.error(f"初始化失败: {init_response}")
return False
# 发送initialized通知
logger.debug("发送initialized通知...")
self._send_request(
{"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}}
)
# 记录服务器初始化成功
logger.info("服务器初始化成功")
return True
# ping服务器,测试连通性
def ping(self):
"""Ping服务器"""
# 发送ping请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "ping",
"params": {},
}
)
# 请求ID自增
self.request_id += 1
# 返回响应
return response
# 发送JSON-RPC请求到服务器
def _send_request(self, request):
"""发送JSON-RPC请求到服务器"""
# 检查服务器进程是否存在且未退出
if not self.process or self.process.poll() is not None:
logger.error("服务器连接已断开")
return {"error": "Server connection lost"}
# 将请求对象序列化为JSON字符串
request_str = json.dumps(request, ensure_ascii=False)
# 记录调试日志,请求方法和ID
logger.debug(
f"发送请求: {request.get('method', 'unknown')} (ID: {request.get('id', 'none')})"
)
try:
# 向服务器进程写入请求
self.process.stdin.write(request_str + "\n")
self.process.stdin.flush()
except Exception as e:
# 发送请求时出错,记录错误日志
logger.error(f"发送请求时出错: {e}")
return {"error": f"发送失败: {e}"}
# 检查是否为通知类请求(没有id字段)
if "id" not in request:
logger.debug("通知类请求,不等待响应")
return {"result": "notification_sent"}
# 读取服务器响应
logger.debug("等待响应...")
try:
# 读取一行响应
response_line = self.process.stdout.readline()
if response_line:
try:
# 解析JSON响应
response = json.loads(response_line.strip())
logger.debug(f"收到响应: {response.get('result', 'error')}")
return response
except json.JSONDecodeError as e:
# JSON解析错误
logger.error(f"JSON解析错误: {e}")
return {"error": f"JSON解析失败: {e}"}
else:
# 没有收到响应
logger.error("没有收到响应")
return {"error": "没有响应"}
except Exception as e:
# 读取响应时出错
logger.error(f"读取响应时出错: {e}")
return {"error": f"读取失败: {e}"}
# 加载模板列表的方法
def _load_templates(self):
"""加载模板列表"""
# 记录日志:获取模板列表
logger.debug("获取模板列表...")
# 发送获取模板列表请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "resources/templates/list",
"params": {},
}
)
# 请求ID递增
self.request_id += 1
# 检查响应并保存模板列表
if "result" in response and "resourceTemplates" in response["result"]:
self.available_templates = response["result"]["resourceTemplates"]
logger.info(
f"可用模板: {[tpl['name'] for tpl in self.available_templates]}"
)
else:
logger.error(f"获取模板列表失败: {response}")
# 加载提示列表的方法
def _load_prompts(self):
"""加载提示列表"""
# 记录日志:获取提示列表
logger.debug("获取提示列表...")
# 发送获取提示列表请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "prompts/list",
"params": {},
}
)
# 请求ID递增
self.request_id += 1
# 检查响应并保存提示列表
if "result" in response and "prompts" in response["result"]:
self.available_prompts = response["result"]["prompts"]
logger.info(
f"可用提示: {[prompt['name'] for prompt in self.available_prompts]}"
)
else:
logger.error(f"获取提示列表失败: {response}")
# 加载工具列表
def _load_tools(self):
"""加载工具列表"""
# 记录调试日志
logger.debug("获取工具列表...")
# 发送tools/list请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "tools/list",
"params": {},
}
)
# 请求ID自增
self.request_id += 1
# 检查响应中是否包含工具列表
if "result" in response and "tools" in response["result"]:
self.available_tools = response["result"]["tools"]
logger.info(f"可用工具: {[tool['name'] for tool in self.available_tools]}")
else:
logger.error(f"获取工具列表失败: {response}")
# 加载资源列表的方法
def _load_resources(self):
"""加载资源列表"""
# 记录日志:获取资源列表
logger.debug("获取资源列表...")
# 发送获取资源列表请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "resources/list",
"params": {},
}
)
# 请求ID递增
self.request_id += 1
# 检查响应并保存资源列表
if "result" in response and "resources" in response["result"]:
self.available_resources = response["result"]["resources"]
logger.info(
f"可用资源: {[res['name'] for res in self.available_resources]}"
)
else:
logger.error(f"获取资源列表失败: {response}")
# 加载所有可用功能
def _load_all_capabilities(self):
"""加载所有可用功能"""
# 记录日志,开始加载功能
logger.info("正在加载服务器功能...")
# 初始化服务器
if not self._initialize_server():
return
# 加载工具列表
self._load_tools()
# 加载资源列表
self._load_resources()
# 加载模板列表
self._load_templates()
# 加载提示列表
self._load_prompts()
# 记录日志,功能加载完成
logger.info("功能加载完成!")
# 处理采样请求的方法
+ def _handle_sampling_request(self, request_id, params):
+ """处理服务器发起的采样请求"""
+ logger.info("收到服务器采样请求")
# 检查request_id是否存在
+ if not request_id:
+ logger.error("采样请求缺少request_id")
+ return
# 检查参数是否存在
+ if not params:
+ logger.error("采样请求缺少参数")
+ return
# 打印服务器请求信息
+ print(f"\n🤖 服务器请求AI生成内容:")
+ print(f"消息: {params.get('messages', [])}")
+ print(f"系统提示: {params.get('systemPrompt', '')}")
+ print(f"最大Access Token数: {params.get('maxTokens', 100)}")
# 模拟用户审核过程
+ print("\n请选择操作:")
+ print("1. 批准并生成内容")
+ print("2. 拒绝请求")
+ try:
# 获取用户选择
+ choice = input("请输入选择 (1/2): ").strip()
+ except (EOFError, KeyboardInterrupt):
+ logger.info("用户中断了采样请求")
+ return
+ if choice == "1":
# 模拟生成内容
+ sample_text = "这是模拟生成的AI内容,基于服务器的请求。"
+ result = {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "result": {
+ "role": "assistant",
+ "content": {"type": "text", "text": sample_text},
+ "model": "gpt-4",
+ "stopReason": "endTurn",
+ },
+ }
+ print(f"\n 已生成内容: {sample_text}")
# 发送响应给服务器
# self._send_response_to_server(result)
+ elif choice == "2":
# 拒绝请求
+ error_result = {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "error": {"code": -1, "message": "User rejected sampling request"},
+ }
+ print("\n 已拒绝采样请求")
# self._send_response_to_server(error_result)
+ else:
+ print("\n 无效选择,请求被拒绝")
+ error_result = {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "error": {"code": -1, "message": "Invalid user choice"},
+ }
+ self._send_response_to_server(error_result)
# 发送响应给服务器的方法
+ def _send_response_to_server(self, response):
+ """发送响应给服务器"""
+ try:
# 将响应对象转为JSON字符串
+ response_str = json.dumps(response, ensure_ascii=False)
# 写入响应到服务器进程的标准输入
+ self.process.stdin.write(response_str + "\n")
+ self.process.stdin.flush()
+ logger.debug("已发送响应给服务器")
+ except Exception as e:
+ logger.error(f"发送响应给服务器时出错: {e}")
# 连接到MCP服务器
def connect_to_server(self, server_script_path):
"""连接到MCP服务器"""
# 检查服务器脚本路径是否存在
if not os.path.exists(server_script_path):
raise FileNotFoundError(f"服务器脚本不存在: {server_script_path}")
# 检查脚本文件是否为.py结尾
if not server_script_path.endswith(".py"):
logger.warning(f"服务器脚本可能不是Python文件: {server_script_path}")
# 启动服务器进程
self.process = subprocess.Popen(
[sys.executable, server_script_path],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=0, # 改为无缓冲
)
# 加载所有可用功能
self._load_all_capabilities()
# 更新连接状态
self._connected = True
# 列出所有可用工具
def _list_tools(self):
# 工具列表字符串数组
tool_list = []
# 遍历所有可用工具
for tool in self.available_tools:
# 获取工具名称
name = tool.get("name", "Unknown")
# 获取工具描述
description = tool.get("description", "No description")
# 添加到工具列表
tool_list.append(f" - {name}: {description}")
# 返回格式化后的工具列表字符串
return f"可用工具:\n" + "\n".join(tool_list)
# 调用工具的方法
def call_tool(self, tool_name, arguments):
"""调用工具"""
# 发送调用工具请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "tools/call",
"params": {"name": tool_name, "arguments": arguments},
}
)
# 请求ID递增
self.request_id += 1
return response
# 调用工具的内部方法
def _call_tool(self, tool_name, args):
"""调用工具"""
arguments = {}
# 针对不同工具解析参数
if tool_name == "get_current_time":
if args and args[0] in ["iso", "timestamp", "readable"]:
arguments["format"] = args[0]
elif tool_name == "calculate":
if args:
# 对于calculate,直接使用第一个参数作为表达式
arguments["expression"] = args[0]
else:
return "calculate 工具需要表达式参数"
elif tool_name == "echo":
if args:
# 对于echo,直接使用第一个参数作为消息
arguments["message"] = args[0]
else:
return "echo 工具需要消息参数"
# 记录日志:调用工具
logger.debug(f"调用工具: {tool_name} 参数: {arguments}")
# 调用工具
result = self.call_tool(tool_name, arguments)
# 检查结果
if "result" in result and "content" in result["result"]:
logger.debug(f"工具 {tool_name} 调用成功")
return result["result"]["content"]
else:
logger.error(f"工具 {tool_name} 调用失败: {result}")
return f"工具调用失败: {result}"
# 列出所有资源的方法
def _list_resources(self):
resource_list = []
for res in self.available_resources:
name = res.get("name", "Unknown")
uri = res.get("uri", "No URI")
resource_list.append(f" - {name}: {uri}")
return f"可用资源:\n" + "\n".join(resource_list)
# 读取资源的方法
def read_resource(self, uri):
"""读取资源"""
# 发送读取资源请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "resources/read",
"params": {"uri": uri},
}
)
# 请求ID递增
self.request_id += 1
return response
# 列出所有模板的方法
def _list_templates(self):
template_list = []
for tpl in self.available_templates:
name = tpl.get("name", "Unknown")
uri = tpl.get("uri", "No URI")
template_list.append(f" - {name}: {uri}")
return f"可用模板:\n" + "\n".join(template_list)
# 列出所有提示的方法
def _list_prompts(self):
prompt_list = []
for prompt in self.available_prompts:
name = prompt.get("name", "Unknown")
description = prompt.get("description", "No description")
prompt_list.append(f" - {name}: {description}")
return f"可用提示:\n" + "\n".join(prompt_list)
# 读取模板的方法
def read_template(self, uri):
"""读取模板"""
# 发送读取模板请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "resources/templates/read",
"params": {"uri": uri},
}
)
# 请求ID递增
self.request_id += 1
return response
# 显示提示的方法
def show_prompt(self, name, arguments):
"""显示提示"""
# 发送显示提示请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "prompts/get",
"params": {"name": name, "arguments": arguments},
}
)
# 请求ID递增
self.request_id += 1
return response
# 获取提示的方法
def get_prompt(self, name, arguments):
"""获取提示"""
# 发送获取提示请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "prompts/get",
"params": {"name": name, "arguments": arguments},
}
)
# 请求ID递增
self.request_id += 1
return response
# 解析命令行参数为字典的方法
def _parse_arguments(self, args):
"""解析命令行参数为字典"""
arguments = {}
for arg in args:
if "=" in arg:
key, value = arg.split("=", 1)
arguments[key] = value
return arguments
# 演示采样命令的方法
+ def _mcp_sampling_command(self):
+ """演示采样命令"""
+ logger.info("执行演示采样命令")
# 发送演示采样请求给服务器
+ response = self._send_request(
+ {
+ "jsonrpc": "2.0",
+ "id": self.request_id,
+ "method": "mcp/sampling",
+ "params": {},
+ }
+ )
+ print(f"response: {response}")
# 请求ID递增
+ self.request_id += 1
+ method = response.get("method")
+ print(f"method: {method}")
+ params = response.get("params", {})
+ print(f"params: {params}")
+ self._handle_sampling_request(response.get("id"), params)
# 处理用户输入的命令
def process_query(self, query):
"""处理用户查询"""
# 去除首尾空白并按空格分割
parts = query.strip().split()
# 如果没有输入内容,提示输入有效命令
if not parts:
return "请输入有效的命令"
# 获取命令关键字并转为小写
command = parts[0].lower()
try:
# 判断命令类型并调用相应方法
if command == "help":
return self._show_help()
elif command == "tools":
return self._list_tools()
elif command == "ping":
result = self.ping()
return f"Ping结果: {result}"
elif command in ["get_current_time", "calculate", "echo"]:
return self._call_tool(command, parts[1:])
elif command == "resources":
return self._list_resources()
elif command == "read":
if len(parts) < 2:
return "用法: read <资源URI>"
uri = parts[1]
result = self.read_resource(uri)
return f"资源内容: {json.dumps(result, ensure_ascii=False, indent=2)}"
elif command == "templates":
return self._list_templates()
elif command == "template":
if len(parts) < 2:
return "用法: template <模板URI>"
uri = parts[1]
result = self.read_template(uri)
return f"模板内容: {json.dumps(result, ensure_ascii=False, indent=2)}"
elif command == "prompts":
return self._list_prompts()
elif command == "prompt":
if len(parts) < 2:
return "用法: prompt <提示名称> [参数]"
prompt_name = parts[1]
arguments = self._parse_arguments(parts[2:])
result = self.show_prompt(prompt_name, arguments)
return f"提示结果: {json.dumps(result, ensure_ascii=False, indent=2)}"
+ elif command == "mcp-sampling":
+ return self._mcp_sampling_command()
else:
return f"未知命令: {command}\n输入 'help' 查看帮助"
except Exception as e:
# 执行命令时出错,记录日志并返回错误信息
logger.error(f"执行命令时出错: {str(e)}")
return f"执行命令时出错: {str(e)}"
# 显示帮助信息
def _show_help(self):
return """
=== MCP客户端帮助 ===
基础命令:
help - 显示此帮助
ping - 测试服务器连接
工具相关:
tools - 列出所有工具
get_current_time [format] - 获取当前时间 (format: iso/timestamp/readable)
calculate <expression> - 计算表达式
echo <message> - 回显消息
资源相关:
resources - 列出所有资源
read <uri> - 读取资源内容
模板相关:
templates - 列出所有模板
template <uri> - 读取模板内容
提示相关:
prompts - 列出所有提示
prompt <name> [args] - 使用提示
+ 演示功能:
+ mcp-sampling - 演示AI内容生成功能
示例:
help
ping
tools
get_current_time iso
calculate 2+3*4
echo hello world
read mcp://server-info
template mcp://templates/project-plan
prompt greeting name=Alice time_of_day=morning
+ mcp-sampling
"""
# 运行交互式聊天循环
def chat_loop(self):
"""运行交互式聊天循环"""
# 记录客户端启动日志
logger.info("=== MCP客户端已启动! ===")
# 提示用户输入help查看命令
print("输入 'help' 查看所有可用命令")
# 提示用户输入quit退出
print("输入 'quit' 退出")
# 循环读取用户输入
while True:
try:
# 读取用户输入命令
query = input("\n命令: ").strip()
# 如果输入quit则退出循环
if query.lower() == "quit":
logger.info("用户退出客户端")
break
# 处理用户命令并输出结果
response = self.process_query(query)
print(f"\n{response}")
except Exception as e:
# 处理命令时出错,记录日志并输出错误
logger.error(f"处理命令时出错: {str(e)}")
print(f"\n错误: {str(e)}")
# 清理资源
def cleanup(self):
"""清理资源"""
# 设置关闭和断开标志
self._shutdown = True
self._connected = False
try:
# 如果服务器进程存在
if self.process:
logger.debug("终止服务器进程")
# 终止服务器进程
self.process.terminate()
# 等待进程结束,设置超时
try:
self.process.wait(timeout=5)
logger.info("服务器进程已正常终止")
except subprocess.TimeoutExpired:
logger.warning("服务器进程未在5秒内终止,强制杀死")
self.process.kill()
self.process.wait()
logger.info("服务器进程已强制终止")
except Exception as e:
# 清理资源时出错,记录日志
logger.error(f"清理资源时出错: {e}")
finally:
# 释放进程对象
self.process = None
# 主函数,程序入口
def main():
# 检查命令行参数数量
if len(sys.argv) < 2:
logger.error("用法: python client.py <服务器脚本路径>")
print("用法: python client.py <服务器脚本路径>")
sys.exit(1)
# 记录启动日志
logger.info(f"启动MCP客户端,连接服务器: {sys.argv[1]}")
# 创建客户端对象
client = MCPClient()
try:
# 连接服务器并进入交互循环
client.connect_to_server(sys.argv[1])
client.chat_loop()
except Exception as e:
# 客户端运行出错,记录日志并输出错误
logger.error(f"客户端运行出错: {e}")
print(f"错误: {e}")
finally:
# 清理客户端资源
logger.info("清理客户端资源")
client.cleanup()
# 判断是否为主程序入口
if __name__ == "__main__":
main()
10.2. mcp_server.py #
mcp_server.py
# 导入os模块,用于获取环境变量
from ast import Dict
from datetime import datetime
import os
# 导入sys模块,用于标准输入输出
import sys
# 导入logging模块,用于日志记录
import logging
# 导入json模块,用于处理JSON数据
import json
from resources import (
meeting_notes_template,
project_plan_template,
weekly_report_template,
resources,
server_info,
sample_data,
templates,
)
from tools import tools
from prompts import prompts
# 设置日志配置
def setup_logging():
"""设置日志配置"""
# 获取日志级别,默认为INFO,并转换为大写
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# 创建日志格式化器
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 创建控制台日志处理器,输出到标准错误
console_handler = logging.StreamHandler(sys.stderr)
# 设置处理器的格式
console_handler.setFormatter(formatter)
# 配置根日志记录器
logging.basicConfig(
level=getattr(logging, log_level),
handlers=[console_handler],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回当前模块的日志记录器
return logging.getLogger(__name__)
# 定义MCPServer类
class MCPServer:
# 初始化方法
def __init__(self):
# 服务器信息,包含名称和版本
self.server_info = {"name": "MCPServer", "version": "1.0.0"}
self.initialized = False
self.tools = tools
# 定义可用资源
self.resources = resources
# 定义可用资源模板
self.templates = templates
# 定义可用提示(prompts)
self.prompts = prompts
# 处理initialize请求的方法
def initialize(self, params):
"""处理initialize请求"""
# 返回协议版本、能力和服务器信息
return {
"protocolVersion": "2025-06-18",
"capabilities": {
"tools": {"listChanged": True},
"resources": {"listChanged": True},
"prompts": {"listChanged": True},
},
"serverInfo": self.server_info,
"resourceTemplates": list(self.templates.values()),
}
def handle_notification(self, method):
"""处理通知类请求"""
if method == "notifications/initialized":
self.initialized = True
logger.info("Client initialized successfully")
return None # 通知不需要响应
return None
def _handle_initialize(self, request_id):
"""处理initialize请求"""
return {"jsonrpc": "2.0", "id": request_id, "result": self.initialize({})}
def _handle_ping(self, request_id):
"""处理ping请求"""
return {"jsonrpc": "2.0", "id": request_id, "result": {}}
def _handle_list_tools(self, request_id):
"""Handle tools list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"tools": list(self.tools.values())},
}
# 处理获取时间工具
def _handle_get_time(self, arguments):
"""Handle get time tool"""
# 获取格式类型,默认为readable
format_type = arguments.get("format", "readable")
# 获取当前时间
now = datetime.now()
# 根据格式类型返回不同格式
if format_type == "iso":
result = now.isoformat()
elif format_type == "timestamp":
result = str(now.timestamp())
else: # readable
result = now.strftime("%Y-%m-%d %H:%M:%S")
# 返回时间文本
return [{"type": "text", "text": f"Current time: {result}"}]
# 处理计算工具
def _handle_calculate(self, arguments):
"""Handle calculate tool"""
# 获取表达式
expression = arguments.get("expression", "")
try:
# 安全检查表达式字符
allowed_chars = set("0123456789+-*/.() ")
if not all(c in allowed_chars for c in expression):
raise ValueError("Expression contains disallowed characters")
# 计算表达式
result = eval(expression)
# 返回计算结果
return [
{"type": "text", "text": f"Calculation result: {expression} = {result}"}
]
except Exception as e:
# 计算失败
return [{"type": "text", "text": f"Calculation failed: {str(e)}"}]
# 处理回显工具
def _handle_echo(self, arguments):
"""Handle echo tool"""
# 获取消息
message = arguments.get("message", "")
# 返回回显内容
return [{"type": "text", "text": f"Echo: {message}"}]
# 处理工具调用请求
def _handle_call_tool(self, request_id, params):
"""Handle tool call request"""
# 获取工具名称
name = params.get("name")
# 获取参数
arguments = params.get("arguments", {})
try:
# 记录工具调用
logger.debug(f"Tool call: {name} with arguments: {arguments}")
# 根据工具名称分发
if name == "get_current_time":
result = self._handle_get_time(arguments)
elif name == "calculate":
result = self._handle_calculate(arguments)
elif name == "echo":
result = self._handle_echo(arguments)
else:
# 未知工具
logger.warning(f"Unknown tool requested: {name}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32601, "message": f"Unknown tool: {name}"},
}
# 记录工具调用成功
logger.debug(f"Tool {name} executed successfully")
# 返回工具调用结果
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"content": result},
}
except Exception as e:
# 工具调用失败
logger.error(f"Tool {name} call failed: {str(e)}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Tool call failed: {str(e)}"},
}
# 处理资源列表请求
def _handle_list_resources(self, request_id):
"""Handle resources list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"resources": list(self.resources.values())},
}
# 处理读取资源请求
def _handle_read_resource(self, request_id, params):
"""Handle resource read request"""
# 获取资源URI
uri = params.get("uri")
try:
# 判断资源类型
if uri == "mcp://server-info":
# 返回内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(server_info, indent=2),
}
]
},
}
elif uri == "mcp://sample-data":
# 返回内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(sample_data, indent=2),
}
]
},
}
else:
# 未找到资源
return {
"jsonrpc": "2.0",
"id": request_id,
+ "error": {"code": -32602, "message": f"Resource not found: {uri}"},
}
except Exception as e:
# 读取资源失败
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Read resource failed: {str(e)}"},
}
# 处理模板列表请求
def _handle_list_templates(self, request_id):
"""Handle template list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"resourceTemplates": list(self.templates.values())},
}
# 处理读取模板请求
def _handle_read_template(self, request_id, params):
# 获取模板URI
uri = params.get("uri")
try:
# 判断模板类型
if uri == "mcp://templates/project-plan":
# 返回模板内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(project_plan_template, indent=2),
}
]
},
}
elif uri == "mcp://templates/meeting-notes":
# 返回模板内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(meeting_notes_template, indent=2),
}
]
},
}
elif uri == "mcp://templates/weekly-report":
# 返回模板内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(weekly_report_template, indent=2),
}
]
},
}
else:
# 未找到模板
return {
"jsonrpc": "2.0",
"id": request_id,
+ "error": {"code": -32603, "message": f"Template not found: {uri}"},
}
except Exception as e:
# 读取模板失败
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Read template failed: {str(e)}"},
}
# 处理提示列表请求
def _handle_list_prompts(self, request_id):
"""Handle prompt list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"prompts": list(self.prompts.values())},
}
# 处理问候提示
def _handle_greeting_prompt(self, arguments):
"""Handle greeting prompt"""
# 获取姓名,默认为User
name = arguments.get("name", "User")
# 获取时间段,默认为day
time_of_day = arguments.get("time_of_day", "day")
# 问候语映射
greetings = {
"morning": "Good morning",
"afternoon": "Good afternoon",
"evening": "Good evening",
"night": "Good night",
}
# 获取对应问候语
greeting = greetings.get(time_of_day, "Hello")
# 生成内容
content = f"{greeting}, {name}! Welcome to the MCP server."
# 返回问候消息
return [{"role": "assistant", "content": {"type": "text", "text": content}}]
# 处理状态报告提示
def _handle_status_report_prompt(self, arguments):
"""Handle status report prompt"""
# 获取项目名,默认为Unknown Project
project = arguments.get("project", "Unknown Project")
# 获取状态,默认为unknown
status = arguments.get("status", "unknown")
# 状态消息映射
status_messages = {
"on_track": "Project is on track",
"at_risk": "Project is at risk",
"behind_schedule": "Project is behind schedule",
}
# 获取对应状态消息
message = status_messages.get(status, "Project status unknown")
# 生成内容
content = f"Project '{project}' status report: {message}"
# 返回状态报告消息
return [{"role": "assistant", "content": {"type": "text", "text": content}}]
# 处理获取提示请求
def _handle_get_prompt(self, request_id, params):
"""Handle prompt get request"""
# 获取提示名称
name = params.get("name")
# 获取参数
arguments = params.get("arguments", {})
try:
# 根据提示名称分发
if name == "greeting":
result = self._handle_greeting_prompt(arguments)
elif name == "status_report":
result = self._handle_status_report_prompt(arguments)
else:
# 未知提示
return {
"jsonrpc": "2.0",
"id": request_id,
+ "error": {"code": -32604, "message": f"Unknown prompt: {name}"},
}
# 返回提示结果
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"messages": result},
}
except Exception as e:
# 获取提示失败
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Get prompt failed: {str(e)}"},
}
# 处理演示采样请求
+ def _handle_mcp_sampling(self, request_id, params):
+ try:
# 日志记录收到演示采样请求
+ logger.info("收到客户端演示采样请求")
# 构造采样请求
+ sampling_request = {
+ "jsonrpc": "2.0",
+ "id": f"mcp_sampling_{request_id}",
+ "method": "sampling/createMessage",
+ "params": {
+ "messages": [
+ {
+ "role": "user",
+ "content": {
+ "type": "text",
+ "text": "Write a creative story about a robot learning to paint in a digital art studio",
+ },
+ }
+ ],
+ "modelPreferences": {
+ "hints": [{"name": "gpt-4"}],
+ "intelligencePriority": 0.8,
+ "speedPriority": 0.5,
+ "costPriority": 0.3,
+ },
+ "systemPrompt": "You are a creative storyteller. Write engaging and imaginative stories.",
+ "maxTokens": 200,
+ },
+ }
# 日志记录已发送
+ logger.info("已发送采样请求到客户端")
# 返回成功响应
+ return sampling_request
+ except Exception as e:
# 采样失败日志
+ logger.error(f"Mcp sampling failed: {str(e)}")
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "error": {"code": -32603, "message": f"Mcp sampling failed: {str(e)}"},
+ }
# 处理JSON-RPC请求的方法
def handle_request(self, request):
"""处理JSON-RPC请求"""
# 获取请求的方法名
method = request.get("method")
# 获取请求ID,默认为0
request_id = request.get("id", 0)
# 获取请求参数,默认为空字典
params = request.get("params", {})
try:
# 处理通知类请求(没有id字段)
if "id" not in request:
return self.handle_notification(method)
# 处理有id的请求
if method == "initialize":
return self._handle_initialize(request_id)
elif method == "ping":
return self._handle_ping(request_id)
elif method == "tools/list":
return self._handle_list_tools(request_id)
elif method == "tools/call":
return self._handle_call_tool(request_id, params)
elif method == "resources/list":
return self._handle_list_resources(request_id)
elif method == "resources/read":
return self._handle_read_resource(request_id, params)
elif method == "resources/templates/list":
return self._handle_list_templates(request_id)
elif method == "resources/templates/read":
return self._handle_read_template(request_id, params)
elif method == "prompts/list":
return self._handle_list_prompts(request_id)
elif method == "prompts/get":
return self._handle_get_prompt(request_id, params)
+ elif method == "mcp/sampling":
+ return self._handle_mcp_sampling(request_id, params)
else:
# 方法未找到,返回错误
return {
"jsonrpc": "2.0",
"id": request_id,
+ "error": {"code": -32605, "message": f"Method {request} not found"},
}
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Request handling error: {e}")
# 返回内部错误响应
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Internal error: {str(e)}"},
}
# 主函数
def main():
"""主函数"""
# 记录服务器启动信息
logger.info("Starting server...")
# 创建MCPServer实例
mcp_server = MCPServer()
try:
# 循环读取标准输入的每一行
for line in sys.stdin:
# 如果行为空,跳过
if not line.strip():
continue
try:
# 解析JSON请求
request = json.loads(line.strip())
# 处理请求,获取响应
response = mcp_server.handle_request(request)
# 如果有响应,输出到标准输出
if response is not None:
print(json.dumps(response, ensure_ascii=False))
# 刷新标准输出缓冲区
sys.stdout.flush()
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Unexpected error: {e}")
except KeyboardInterrupt:
# 捕获键盘中断,记录服务器停止信息
logger.info("Server stopped")
except Exception as e:
# 服务器发生其他异常,记录错误并退出
logger.error(f"Server error: {e}")
sys.exit(1)
# 初始化日志记录器
logger = setup_logging()
# 如果作为主程序运行,则调用main函数
if __name__ == "__main__":
main()
11.启发 #
11.1. mcp_client.py #
mcp_client.py
# 导入subprocess模块,用于创建和管理子进程
import subprocess
# 导入sys模块,提供对Python解释器相关的变量和函数的访问
import sys
# 导入json模块,用于处理JSON数据
import json
# 导入os模块,提供与操作系统交互的功能
import os
# 导入logging模块,用于日志记录
import logging
# 从dotenv包导入load_dotenv函数,用于加载环境变量
from dotenv import load_dotenv
# 加载.env文件中的环境变量
load_dotenv() # 从.env加载环境变量
# 配置日志记录的函数
def setup_logging():
"""设置日志配置"""
# 获取日志级别,默认为INFO,并转换为大写
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# 创建日志格式化器,指定日志输出格式
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 创建控制台日志处理器,输出到标准错误
console_handler = logging.StreamHandler(sys.stderr)
# 设置控制台处理器的日志格式
console_handler.setFormatter(formatter)
# 配置根日志记录器,设置日志级别和处理器
logging.basicConfig(
level=getattr(logging, log_level),
handlers=[console_handler],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回当前模块的日志记录器
return logging.getLogger(__name__)
# 设置日志记录器
logger = setup_logging()
# 定义MCPClient类
class MCPClient:
# 构造函数,初始化客户端对象
def __init__(self):
# 服务器进程对象
self.process = None
# 请求ID,每次请求自增
self.request_id = 1
# 可用工具列表
self.available_tools = []
# 可用资源列表
self.available_resources = []
# 可用模板列表
self.available_templates = []
# 可用提示列表
self.available_prompts = []
# 连接状态标志
self._connected = False
# 关闭状态标志
self._shutdown = False
# 初始化服务器连接
def _initialize_server(self):
"""初始化服务器连接"""
# 记录调试日志,发送初始化请求
logger.debug("发送初始化请求...")
# 发送初始化请求
init_response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "initialize",
"params": {
"protocolVersion": "2025-06-18",
"capabilities": {},
"clientInfo": {"name": "mcp-client", "version": "1.0.0"},
},
}
)
# 请求ID自增
self.request_id += 1
# 检查响应中是否有result字段
if "result" not in init_response:
logger.error(f"初始化失败: {init_response}")
return False
# 发送initialized通知
logger.debug("发送initialized通知...")
self._send_request(
{"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}}
)
# 记录服务器初始化成功
logger.info("服务器初始化成功")
return True
# ping服务器,测试连通性
def ping(self):
"""Ping服务器"""
# 发送ping请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "ping",
"params": {},
}
)
# 请求ID自增
self.request_id += 1
# 返回响应
return response
# 发送JSON-RPC请求到服务器
def _send_request(self, request):
"""发送JSON-RPC请求到服务器"""
# 检查服务器进程是否存在且未退出
if not self.process or self.process.poll() is not None:
logger.error("服务器连接已断开")
return {"error": "Server connection lost"}
# 将请求对象序列化为JSON字符串
request_str = json.dumps(request, ensure_ascii=False)
# 记录调试日志,请求方法和ID
logger.debug(
f"发送请求: {request.get('method', 'unknown')} (ID: {request.get('id', 'none')})"
)
try:
# 向服务器进程写入请求
self.process.stdin.write(request_str + "\n")
self.process.stdin.flush()
except Exception as e:
# 发送请求时出错,记录错误日志
logger.error(f"发送请求时出错: {e}")
return {"error": f"发送失败: {e}"}
# 检查是否为通知类请求(没有id字段)
if "id" not in request:
logger.debug("通知类请求,不等待响应")
return {"result": "notification_sent"}
# 读取服务器响应
logger.debug("等待响应...")
try:
# 读取一行响应
response_line = self.process.stdout.readline()
if response_line:
try:
# 解析JSON响应
response = json.loads(response_line.strip())
logger.debug(f"收到响应: {response.get('result', 'error')}")
return response
except json.JSONDecodeError as e:
# JSON解析错误
logger.error(f"JSON解析错误: {e}")
return {"error": f"JSON解析失败: {e}"}
else:
# 没有收到响应
logger.error("没有收到响应")
return {"error": "没有响应"}
except Exception as e:
# 读取响应时出错
logger.error(f"读取响应时出错: {e}")
return {"error": f"读取失败: {e}"}
# 加载模板列表的方法
def _load_templates(self):
"""加载模板列表"""
# 记录日志:获取模板列表
logger.debug("获取模板列表...")
# 发送获取模板列表请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "resources/templates/list",
"params": {},
}
)
# 请求ID递增
self.request_id += 1
# 检查响应并保存模板列表
if "result" in response and "resourceTemplates" in response["result"]:
self.available_templates = response["result"]["resourceTemplates"]
logger.info(
f"可用模板: {[tpl['name'] for tpl in self.available_templates]}"
)
else:
logger.error(f"获取模板列表失败: {response}")
# 加载提示列表的方法
def _load_prompts(self):
"""加载提示列表"""
# 记录日志:获取提示列表
logger.debug("获取提示列表...")
# 发送获取提示列表请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "prompts/list",
"params": {},
}
)
# 请求ID递增
self.request_id += 1
# 检查响应并保存提示列表
if "result" in response and "prompts" in response["result"]:
self.available_prompts = response["result"]["prompts"]
logger.info(
f"可用提示: {[prompt['name'] for prompt in self.available_prompts]}"
)
else:
logger.error(f"获取提示列表失败: {response}")
# 加载工具列表
def _load_tools(self):
"""加载工具列表"""
# 记录调试日志
logger.debug("获取工具列表...")
# 发送tools/list请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "tools/list",
"params": {},
}
)
# 请求ID自增
self.request_id += 1
# 检查响应中是否包含工具列表
if "result" in response and "tools" in response["result"]:
self.available_tools = response["result"]["tools"]
logger.info(f"可用工具: {[tool['name'] for tool in self.available_tools]}")
else:
logger.error(f"获取工具列表失败: {response}")
# 加载资源列表的方法
def _load_resources(self):
"""加载资源列表"""
# 记录日志:获取资源列表
logger.debug("获取资源列表...")
# 发送获取资源列表请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "resources/list",
"params": {},
}
)
# 请求ID递增
self.request_id += 1
# 检查响应并保存资源列表
if "result" in response and "resources" in response["result"]:
self.available_resources = response["result"]["resources"]
logger.info(
f"可用资源: {[res['name'] for res in self.available_resources]}"
)
else:
logger.error(f"获取资源列表失败: {response}")
# 加载所有可用功能
def _load_all_capabilities(self):
"""加载所有可用功能"""
# 记录日志,开始加载功能
logger.info("正在加载服务器功能...")
# 初始化服务器
if not self._initialize_server():
return
# 加载工具列表
self._load_tools()
# 加载资源列表
self._load_resources()
# 加载模板列表
self._load_templates()
# 加载提示列表
self._load_prompts()
# 记录日志,功能加载完成
logger.info("功能加载完成!")
# 处理采样请求的方法
def _handle_sampling_request(self, request_id, params):
"""处理服务器发起的采样请求"""
logger.info("收到服务器采样请求")
# 检查request_id是否存在
if not request_id:
logger.error("采样请求缺少request_id")
return
# 检查参数是否存在
if not params:
logger.error("采样请求缺少参数")
return
# 打印服务器请求信息
print(f"\n🤖 服务器请求AI生成内容:")
print(f"消息: {params.get('messages', [])}")
print(f"系统提示: {params.get('systemPrompt', '')}")
print(f"最大Access Token数: {params.get('maxTokens', 100)}")
# 模拟用户审核过程
print("\n请选择操作:")
print("1. 批准并生成内容")
print("2. 拒绝请求")
try:
# 获取用户选择
choice = input("请输入选择 (1/2): ").strip()
except (EOFError, KeyboardInterrupt):
logger.info("用户中断了采样请求")
return
if choice == "1":
# 模拟生成内容
sample_text = "这是模拟生成的AI内容,基于服务器的请求。"
result = {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"role": "assistant",
"content": {"type": "text", "text": sample_text},
"model": "gpt-4",
"stopReason": "endTurn",
},
}
print(f"\n 已生成内容: {sample_text}")
# 发送响应给服务器
# self._send_response_to_server(result)
elif choice == "2":
# 拒绝请求
error_result = {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -1, "message": "User rejected sampling request"},
}
print("\n 已拒绝采样请求")
# self._send_response_to_server(error_result)
else:
print("\n 无效选择,请求被拒绝")
error_result = {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -1, "message": "Invalid user choice"},
}
self._send_response_to_server(error_result)
# 发送响应给服务器的方法
def _send_response_to_server(self, response):
"""发送响应给服务器"""
try:
# 将响应对象转为JSON字符串
response_str = json.dumps(response, ensure_ascii=False)
# 写入响应到服务器进程的标准输入
self.process.stdin.write(response_str + "\n")
self.process.stdin.flush()
logger.debug("已发送响应给服务器")
except Exception as e:
logger.error(f"发送响应给服务器时出错: {e}")
# 连接到MCP服务器
def connect_to_server(self, server_script_path):
"""连接到MCP服务器"""
# 检查服务器脚本路径是否存在
if not os.path.exists(server_script_path):
raise FileNotFoundError(f"服务器脚本不存在: {server_script_path}")
# 检查脚本文件是否为.py结尾
if not server_script_path.endswith(".py"):
logger.warning(f"服务器脚本可能不是Python文件: {server_script_path}")
# 启动服务器进程
self.process = subprocess.Popen(
[sys.executable, server_script_path],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=0, # 改为无缓冲
)
# 加载所有可用功能
self._load_all_capabilities()
# 更新连接状态
self._connected = True
# 列出所有可用工具
def _list_tools(self):
# 工具列表字符串数组
tool_list = []
# 遍历所有可用工具
for tool in self.available_tools:
# 获取工具名称
name = tool.get("name", "Unknown")
# 获取工具描述
description = tool.get("description", "No description")
# 添加到工具列表
tool_list.append(f" - {name}: {description}")
# 返回格式化后的工具列表字符串
return f"可用工具:\n" + "\n".join(tool_list)
# 调用工具的方法
def call_tool(self, tool_name, arguments):
"""调用工具"""
# 发送调用工具请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "tools/call",
"params": {"name": tool_name, "arguments": arguments},
}
)
# 请求ID递增
self.request_id += 1
return response
# 调用工具的内部方法
def _call_tool(self, tool_name, args):
"""调用工具"""
arguments = {}
# 针对不同工具解析参数
if tool_name == "get_current_time":
if args and args[0] in ["iso", "timestamp", "readable"]:
arguments["format"] = args[0]
elif tool_name == "calculate":
if args:
# 对于calculate,直接使用第一个参数作为表达式
arguments["expression"] = args[0]
else:
return "calculate 工具需要表达式参数"
elif tool_name == "echo":
if args:
# 对于echo,直接使用第一个参数作为消息
arguments["message"] = args[0]
else:
return "echo 工具需要消息参数"
# 记录日志:调用工具
logger.debug(f"调用工具: {tool_name} 参数: {arguments}")
# 调用工具
result = self.call_tool(tool_name, arguments)
# 检查结果
if "result" in result and "content" in result["result"]:
logger.debug(f"工具 {tool_name} 调用成功")
return result["result"]["content"]
else:
logger.error(f"工具 {tool_name} 调用失败: {result}")
return f"工具调用失败: {result}"
# 列出所有资源的方法
def _list_resources(self):
resource_list = []
for res in self.available_resources:
name = res.get("name", "Unknown")
uri = res.get("uri", "No URI")
resource_list.append(f" - {name}: {uri}")
return f"可用资源:\n" + "\n".join(resource_list)
# 读取资源的方法
def read_resource(self, uri):
"""读取资源"""
# 发送读取资源请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "resources/read",
"params": {"uri": uri},
}
)
# 请求ID递增
self.request_id += 1
return response
# 列出所有模板的方法
def _list_templates(self):
template_list = []
for tpl in self.available_templates:
name = tpl.get("name", "Unknown")
uri = tpl.get("uri", "No URI")
template_list.append(f" - {name}: {uri}")
return f"可用模板:\n" + "\n".join(template_list)
# 列出所有提示的方法
def _list_prompts(self):
prompt_list = []
for prompt in self.available_prompts:
name = prompt.get("name", "Unknown")
description = prompt.get("description", "No description")
prompt_list.append(f" - {name}: {description}")
return f"可用提示:\n" + "\n".join(prompt_list)
# 读取模板的方法
def read_template(self, uri):
"""读取模板"""
# 发送读取模板请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "resources/templates/read",
"params": {"uri": uri},
}
)
# 请求ID递增
self.request_id += 1
return response
# 显示提示的方法
def show_prompt(self, name, arguments):
"""显示提示"""
# 发送显示提示请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "prompts/get",
"params": {"name": name, "arguments": arguments},
}
)
# 请求ID递增
self.request_id += 1
return response
# 获取提示的方法
def get_prompt(self, name, arguments):
"""获取提示"""
# 发送获取提示请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "prompts/get",
"params": {"name": name, "arguments": arguments},
}
)
# 请求ID递增
self.request_id += 1
return response
# 解析命令行参数为字典的方法
def _parse_arguments(self, args):
"""解析命令行参数为字典"""
arguments = {}
for arg in args:
if "=" in arg:
key, value = arg.split("=", 1)
arguments[key] = value
return arguments
# 演示采样命令的方法
def _mcp_sampling_command(self):
"""演示采样命令"""
logger.info("执行演示采样命令")
# 发送演示采样请求给服务器
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "mcp/sampling",
"params": {},
}
)
# 请求ID递增
self.request_id += 1
+ self._handle_sampling_request(response.get("id"), response.get("params", {}))
# 演示引导命令的方法
+ def _mcp_elicitation_command(self):
+ """演示引导命令"""
+ logger.info("执行演示引导命令")
# 发送演示引导请求给服务器
+ response = self._send_request(
+ {
+ "jsonrpc": "2.0",
+ "id": self.request_id,
+ "method": "mcp/elicitation",
+ "params": {},
+ }
+ )
# 请求ID递增
+ self.request_id += 1
+ self._handle_elicitation_request(response.get("id"), response.get("params", {}))
# 处理引导请求的方法
+ def _handle_elicitation_request(self, request_id, params):
+ """处理服务器发起的引导请求"""
+ logger.info("收到服务器引导请求")
# 检查request_id是否存在
+ if not request_id:
+ logger.error("引导请求缺少request_id")
+ return
# 检查参数是否存在
+ if not params:
+ logger.error("引导请求缺少参数")
+ return
# 打印服务器请求信息
+ print(f"\n服务器请求用户偏好收集:")
+ print(f"消息: {params.get('message', '')}")
+ print(f"请求ID: {request_id}")
+ print(f"参数: {params}")
# 处理用户输入的命令
def process_query(self, query):
"""处理用户查询"""
# 去除首尾空白并按空格分割
parts = query.strip().split()
# 如果没有输入内容,提示输入有效命令
if not parts:
return "请输入有效的命令"
# 获取命令关键字并转为小写
command = parts[0].lower()
try:
# 判断命令类型并调用相应方法
if command == "help":
return self._show_help()
elif command == "tools":
return self._list_tools()
elif command == "ping":
result = self.ping()
return f"Ping结果: {result}"
elif command in ["get_current_time", "calculate", "echo"]:
return self._call_tool(command, parts[1:])
elif command == "resources":
return self._list_resources()
elif command == "read":
if len(parts) < 2:
return "用法: read <资源URI>"
uri = parts[1]
result = self.read_resource(uri)
return f"资源内容: {json.dumps(result, ensure_ascii=False, indent=2)}"
elif command == "templates":
return self._list_templates()
elif command == "template":
if len(parts) < 2:
return "用法: template <模板URI>"
uri = parts[1]
result = self.read_template(uri)
return f"模板内容: {json.dumps(result, ensure_ascii=False, indent=2)}"
elif command == "prompts":
return self._list_prompts()
elif command == "prompt":
if len(parts) < 2:
return "用法: prompt <提示名称> [参数]"
prompt_name = parts[1]
arguments = self._parse_arguments(parts[2:])
result = self.show_prompt(prompt_name, arguments)
return f"提示结果: {json.dumps(result, ensure_ascii=False, indent=2)}"
elif command == "mcp-sampling":
return self._mcp_sampling_command()
+ elif command == "mcp-elicitation":
+ return self._mcp_elicitation_command()
else:
return f"未知命令: {command}\n输入 'help' 查看帮助"
except Exception as e:
# 执行命令时出错,记录日志并返回错误信息
logger.error(f"执行命令时出错: {str(e)}")
return f"执行命令时出错: {str(e)}"
# 显示帮助信息
def _show_help(self):
return """
=== MCP客户端帮助 ===
基础命令:
help - 显示此帮助
ping - 测试服务器连接
工具相关:
tools - 列出所有工具
get_current_time [format] - 获取当前时间 (format: iso/timestamp/readable)
calculate <expression> - 计算表达式
echo <message> - 回显消息
资源相关:
resources - 列出所有资源
read <uri> - 读取资源内容
模板相关:
templates - 列出所有模板
template <uri> - 读取模板内容
提示相关:
prompts - 列出所有提示
prompt <name> [args] - 使用提示
演示功能:
mcp-sampling - 演示AI内容生成功能
+ mcp-elicitation - 演示用户偏好收集功能
示例:
help
ping
tools
get_current_time iso
calculate 2+3*4
echo hello world
read mcp://server-info
template mcp://templates/project-plan
prompt greeting name=Alice time_of_day=morning
mcp-sampling
+ mcp-elicitation
"""
# 运行交互式聊天循环
def chat_loop(self):
"""运行交互式聊天循环"""
# 记录客户端启动日志
logger.info("=== MCP客户端已启动! ===")
# 提示用户输入help查看命令
print("输入 'help' 查看所有可用命令")
# 提示用户输入quit退出
print("输入 'quit' 退出")
# 循环读取用户输入
while True:
try:
# 读取用户输入命令
query = input("\n命令: ").strip()
# 如果输入quit则退出循环
if query.lower() == "quit":
logger.info("用户退出客户端")
break
# 处理用户命令并输出结果
response = self.process_query(query)
print(f"\n{response}")
except Exception as e:
# 处理命令时出错,记录日志并输出错误
logger.error(f"处理命令时出错: {str(e)}")
print(f"\n错误: {str(e)}")
# 清理资源
def cleanup(self):
"""清理资源"""
# 设置关闭和断开标志
self._shutdown = True
self._connected = False
try:
# 如果服务器进程存在
if self.process:
logger.debug("终止服务器进程")
# 终止服务器进程
self.process.terminate()
# 等待进程结束,设置超时
try:
self.process.wait(timeout=5)
logger.info("服务器进程已正常终止")
except subprocess.TimeoutExpired:
logger.warning("服务器进程未在5秒内终止,强制杀死")
self.process.kill()
self.process.wait()
logger.info("服务器进程已强制终止")
except Exception as e:
# 清理资源时出错,记录日志
logger.error(f"清理资源时出错: {e}")
finally:
# 释放进程对象
self.process = None
# 主函数,程序入口
def main():
# 检查命令行参数数量
if len(sys.argv) < 2:
logger.error("用法: python client.py <服务器脚本路径>")
print("用法: python client.py <服务器脚本路径>")
sys.exit(1)
# 记录启动日志
logger.info(f"启动MCP客户端,连接服务器: {sys.argv[1]}")
# 创建客户端对象
client = MCPClient()
try:
# 连接服务器并进入交互循环
client.connect_to_server(sys.argv[1])
client.chat_loop()
except Exception as e:
# 客户端运行出错,记录日志并输出错误
logger.error(f"客户端运行出错: {e}")
print(f"错误: {e}")
finally:
# 清理客户端资源
logger.info("清理客户端资源")
client.cleanup()
# 判断是否为主程序入口
if __name__ == "__main__":
main()
11.2. mcp_server.py #
mcp_server.py
# 导入os模块,用于获取环境变量
from ast import Dict
from datetime import datetime
import os
# 导入sys模块,用于标准输入输出
import sys
# 导入logging模块,用于日志记录
import logging
# 导入json模块,用于处理JSON数据
import json
from resources import (
meeting_notes_template,
project_plan_template,
weekly_report_template,
resources,
server_info,
sample_data,
templates,
)
from tools import tools
from prompts import prompts
# 设置日志配置
def setup_logging():
"""设置日志配置"""
# 获取日志级别,默认为INFO,并转换为大写
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# 创建日志格式化器
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 创建控制台日志处理器,输出到标准错误
console_handler = logging.StreamHandler(sys.stderr)
# 设置处理器的格式
console_handler.setFormatter(formatter)
# 配置根日志记录器
logging.basicConfig(
level=getattr(logging, log_level),
handlers=[console_handler],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回当前模块的日志记录器
return logging.getLogger(__name__)
# 定义MCPServer类
class MCPServer:
# 初始化方法
def __init__(self):
# 服务器信息,包含名称和版本
self.server_info = {"name": "MCPServer", "version": "1.0.0"}
self.initialized = False
self.tools = tools
# 定义可用资源
self.resources = resources
# 定义可用资源模板
self.templates = templates
# 定义可用提示(prompts)
self.prompts = prompts
# 处理initialize请求的方法
def initialize(self, params):
"""处理initialize请求"""
# 返回协议版本、能力和服务器信息
return {
"protocolVersion": "2025-06-18",
"capabilities": {
"tools": {"listChanged": True},
"resources": {"listChanged": True},
"prompts": {"listChanged": True},
},
"serverInfo": self.server_info,
"resourceTemplates": list(self.templates.values()),
}
def handle_notification(self, method):
"""处理通知类请求"""
if method == "notifications/initialized":
self.initialized = True
logger.info("Client initialized successfully")
return None # 通知不需要响应
return None
def _handle_initialize(self, request_id):
"""处理initialize请求"""
return {"jsonrpc": "2.0", "id": request_id, "result": self.initialize({})}
def _handle_ping(self, request_id):
"""处理ping请求"""
return {"jsonrpc": "2.0", "id": request_id, "result": {}}
def _handle_list_tools(self, request_id):
"""Handle tools list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"tools": list(self.tools.values())},
}
# 处理获取时间工具
def _handle_get_time(self, arguments):
"""Handle get time tool"""
# 获取格式类型,默认为readable
format_type = arguments.get("format", "readable")
# 获取当前时间
now = datetime.now()
# 根据格式类型返回不同格式
if format_type == "iso":
result = now.isoformat()
elif format_type == "timestamp":
result = str(now.timestamp())
else: # readable
result = now.strftime("%Y-%m-%d %H:%M:%S")
# 返回时间文本
return [{"type": "text", "text": f"Current time: {result}"}]
# 处理计算工具
def _handle_calculate(self, arguments):
"""Handle calculate tool"""
# 获取表达式
expression = arguments.get("expression", "")
try:
# 安全检查表达式字符
allowed_chars = set("0123456789+-*/.() ")
if not all(c in allowed_chars for c in expression):
raise ValueError("Expression contains disallowed characters")
# 计算表达式
result = eval(expression)
# 返回计算结果
return [
{"type": "text", "text": f"Calculation result: {expression} = {result}"}
]
except Exception as e:
# 计算失败
return [{"type": "text", "text": f"Calculation failed: {str(e)}"}]
# 处理回显工具
def _handle_echo(self, arguments):
"""Handle echo tool"""
# 获取消息
message = arguments.get("message", "")
# 返回回显内容
return [{"type": "text", "text": f"Echo: {message}"}]
# 处理工具调用请求
def _handle_call_tool(self, request_id, params):
"""Handle tool call request"""
# 获取工具名称
name = params.get("name")
# 获取参数
arguments = params.get("arguments", {})
try:
# 记录工具调用
logger.debug(f"Tool call: {name} with arguments: {arguments}")
# 根据工具名称分发
if name == "get_current_time":
result = self._handle_get_time(arguments)
elif name == "calculate":
result = self._handle_calculate(arguments)
elif name == "echo":
result = self._handle_echo(arguments)
else:
# 未知工具
logger.warning(f"Unknown tool requested: {name}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32601, "message": f"Unknown tool: {name}"},
}
# 记录工具调用成功
logger.debug(f"Tool {name} executed successfully")
# 返回工具调用结果
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"content": result},
}
except Exception as e:
# 工具调用失败
logger.error(f"Tool {name} call failed: {str(e)}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Tool call failed: {str(e)}"},
}
# 处理资源列表请求
def _handle_list_resources(self, request_id):
"""Handle resources list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"resources": list(self.resources.values())},
}
# 处理读取资源请求
def _handle_read_resource(self, request_id, params):
"""Handle resource read request"""
# 获取资源URI
uri = params.get("uri")
try:
# 判断资源类型
if uri == "mcp://server-info":
# 返回内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(server_info, indent=2),
}
]
},
}
elif uri == "mcp://sample-data":
# 返回内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(sample_data, indent=2),
}
]
},
}
else:
# 未找到资源
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32602, "message": f"Resource not found: {uri}"},
}
except Exception as e:
# 读取资源失败
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Read resource failed: {str(e)}"},
}
# 处理模板列表请求
def _handle_list_templates(self, request_id):
"""Handle template list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"resourceTemplates": list(self.templates.values())},
}
# 处理读取模板请求
def _handle_read_template(self, request_id, params):
# 获取模板URI
uri = params.get("uri")
try:
# 判断模板类型
if uri == "mcp://templates/project-plan":
# 返回模板内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(project_plan_template, indent=2),
}
]
},
}
elif uri == "mcp://templates/meeting-notes":
# 返回模板内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(meeting_notes_template, indent=2),
}
]
},
}
elif uri == "mcp://templates/weekly-report":
# 返回模板内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(weekly_report_template, indent=2),
}
]
},
}
else:
# 未找到模板
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Template not found: {uri}"},
}
except Exception as e:
# 读取模板失败
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Read template failed: {str(e)}"},
}
# 处理提示列表请求
def _handle_list_prompts(self, request_id):
"""Handle prompt list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"prompts": list(self.prompts.values())},
}
# 处理问候提示
def _handle_greeting_prompt(self, arguments):
"""Handle greeting prompt"""
# 获取姓名,默认为User
name = arguments.get("name", "User")
# 获取时间段,默认为day
time_of_day = arguments.get("time_of_day", "day")
# 问候语映射
greetings = {
"morning": "Good morning",
"afternoon": "Good afternoon",
"evening": "Good evening",
"night": "Good night",
}
# 获取对应问候语
greeting = greetings.get(time_of_day, "Hello")
# 生成内容
content = f"{greeting}, {name}! Welcome to the MCP server."
# 返回问候消息
return [{"role": "assistant", "content": {"type": "text", "text": content}}]
# 处理状态报告提示
def _handle_status_report_prompt(self, arguments):
"""Handle status report prompt"""
# 获取项目名,默认为Unknown Project
project = arguments.get("project", "Unknown Project")
# 获取状态,默认为unknown
status = arguments.get("status", "unknown")
# 状态消息映射
status_messages = {
"on_track": "Project is on track",
"at_risk": "Project is at risk",
"behind_schedule": "Project is behind schedule",
}
# 获取对应状态消息
message = status_messages.get(status, "Project status unknown")
# 生成内容
content = f"Project '{project}' status report: {message}"
# 返回状态报告消息
return [{"role": "assistant", "content": {"type": "text", "text": content}}]
# 处理获取提示请求
def _handle_get_prompt(self, request_id, params):
"""Handle prompt get request"""
# 获取提示名称
name = params.get("name")
# 获取参数
arguments = params.get("arguments", {})
try:
# 根据提示名称分发
if name == "greeting":
result = self._handle_greeting_prompt(arguments)
elif name == "status_report":
result = self._handle_status_report_prompt(arguments)
else:
# 未知提示
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32604, "message": f"Unknown prompt: {name}"},
}
# 返回提示结果
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"messages": result},
}
except Exception as e:
# 获取提示失败
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Get prompt failed: {str(e)}"},
}
# 处理演示采样请求
def _handle_mcp_sampling(self, request_id, params):
try:
# 日志记录收到演示采样请求
logger.info("收到客户端演示采样请求")
# 构造采样请求
sampling_request = {
"jsonrpc": "2.0",
"id": f"mcp_sampling_{request_id}",
"method": "sampling/createMessage",
"params": {
"messages": [
{
"role": "user",
"content": {
"type": "text",
"text": "Write a creative story about a robot learning to paint in a digital art studio",
},
}
],
"modelPreferences": {
"hints": [{"name": "gpt-4"}],
"intelligencePriority": 0.8,
"speedPriority": 0.5,
"costPriority": 0.3,
},
"systemPrompt": "You are a creative storyteller. Write engaging and imaginative stories.",
"maxTokens": 200,
},
}
# 日志记录已发送
logger.info("已发送采样请求到客户端")
# 返回成功响应
return sampling_request
except Exception as e:
# 采样失败日志
logger.error(f"Mcp sampling failed: {str(e)}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Mcp sampling failed: {str(e)}"},
}
# 处理演示引导请求
+ def _handle_mcp_elicitation(self, request_id, params):
+ try:
# 日志记录收到演示引导请求
+ logger.info("收到客户端演示引导请求")
# 构造引导请求
+ elicitation_request = {
+ "jsonrpc": "2.0",
+ "id": f"mcp_elicitation_{request_id}",
+ "method": "elicitation/create",
+ "params": {
+ "message": "Please provide your preferences for the project configuration",
+ "requestedSchema": {
+ "type": "object",
+ "properties": {
+ "user_name": {"type": "string", "description": "Your name"},
+ "preference": {
+ "type": "string",
+ "description": "Your preferred style",
+ "enum": ["creative", "analytical", "practical"],
+ },
+ "feedback": {
+ "type": "string",
+ "description": "Any additional feedback",
+ },
+ },
+ "required": ["user_name", "preference"],
+ },
+ },
+ }
# 发送引导请求到客户端
+ request_str = json.dumps(elicitation_request, ensure_ascii=False)
+ print(request_str)
+ sys.stdout.flush()
# 日志记录已发送
+ logger.info("已发送引导请求到客户端")
+ return elicitation_request
# 返回成功响应
# return {
# "jsonrpc": "2.0",
# "id": request_id,
# "result": {
# "message": "Elicitation request sent to client",
# "requestId": f"mcp_elicitation_{request_id}",
# },
# }
+ except Exception as e:
# 引导失败日志
+ logger.error(f"Mcp elicitation failed: {str(e)}")
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "error": {
+ "code": -32603,
+ "message": f"Mcp elicitation failed: {str(e)}",
+ },
+ }
# 处理JSON-RPC请求的方法
def handle_request(self, request):
"""处理JSON-RPC请求"""
# 获取请求的方法名
method = request.get("method")
# 获取请求ID,默认为0
request_id = request.get("id", 0)
# 获取请求参数,默认为空字典
params = request.get("params", {})
try:
# 处理通知类请求(没有id字段)
if "id" not in request:
return self.handle_notification(method)
# 处理有id的请求
if method == "initialize":
return self._handle_initialize(request_id)
elif method == "ping":
return self._handle_ping(request_id)
elif method == "tools/list":
return self._handle_list_tools(request_id)
elif method == "tools/call":
return self._handle_call_tool(request_id, params)
elif method == "resources/list":
return self._handle_list_resources(request_id)
elif method == "resources/read":
return self._handle_read_resource(request_id, params)
elif method == "resources/templates/list":
return self._handle_list_templates(request_id)
elif method == "resources/templates/read":
return self._handle_read_template(request_id, params)
elif method == "prompts/list":
return self._handle_list_prompts(request_id)
elif method == "prompts/get":
return self._handle_get_prompt(request_id, params)
elif method == "mcp/sampling":
return self._handle_mcp_sampling(request_id, params)
+ elif method == "mcp/elicitation":
+ return self._handle_mcp_elicitation(request_id, params)
else:
# 方法未找到,返回错误
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32605, "message": f"Method {request} not found"},
}
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Request handling error: {e}")
# 返回内部错误响应
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Internal error: {str(e)}"},
}
# 主函数
def main():
"""主函数"""
# 记录服务器启动信息
logger.info("Starting server...")
# 创建MCPServer实例
mcp_server = MCPServer()
try:
# 循环读取标准输入的每一行
for line in sys.stdin:
# 如果行为空,跳过
if not line.strip():
continue
try:
# 解析JSON请求
request = json.loads(line.strip())
# 处理请求,获取响应
response = mcp_server.handle_request(request)
# 如果有响应,输出到标准输出
if response is not None:
print(json.dumps(response, ensure_ascii=False))
# 刷新标准输出缓冲区
sys.stdout.flush()
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Unexpected error: {e}")
except KeyboardInterrupt:
# 捕获键盘中断,记录服务器停止信息
logger.info("Server stopped")
except Exception as e:
# 服务器发生其他异常,记录错误并退出
logger.error(f"Server error: {e}")
sys.exit(1)
# 初始化日志记录器
logger = setup_logging()
# 如果作为主程序运行,则调用main函数
if __name__ == "__main__":
main()
12.自动采样 #
12.1. mcp_server.py #
mcp_server.py
# 导入os模块,用于获取环境变量
from ast import Dict
from datetime import datetime
import os
# 导入sys模块,用于标准输入输出
import sys
# 导入logging模块,用于日志记录
import logging
# 导入json模块,用于处理JSON数据
import json
+import threading
+import time
from resources import (
meeting_notes_template,
project_plan_template,
weekly_report_template,
resources,
server_info,
sample_data,
templates,
)
from tools import tools
from prompts import prompts
# 设置日志配置
def setup_logging():
"""设置日志配置"""
# 获取日志级别,默认为INFO,并转换为大写
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# 创建日志格式化器
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 创建控制台日志处理器,输出到标准错误
console_handler = logging.StreamHandler(sys.stderr)
# 设置处理器的格式
console_handler.setFormatter(formatter)
# 配置根日志记录器
logging.basicConfig(
level=getattr(logging, log_level),
handlers=[console_handler],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回当前模块的日志记录器
return logging.getLogger(__name__)
# 定义MCPServer类
class MCPServer:
# 初始化方法
def __init__(self):
# 服务器信息,包含名称和版本
self.server_info = {"name": "MCPServer", "version": "1.0.0"}
self.initialized = False
self.tools = tools
# 定义可用资源
self.resources = resources
# 定义可用资源模板
self.templates = templates
# 定义可用提示(prompts)
self.prompts = prompts
# 处理initialize请求的方法
def initialize(self, params):
"""处理initialize请求"""
# 返回协议版本、能力和服务器信息
return {
"protocolVersion": "2025-06-18",
"capabilities": {
"tools": {"listChanged": True},
"resources": {"listChanged": True},
"prompts": {"listChanged": True},
},
"serverInfo": self.server_info,
"resourceTemplates": list(self.templates.values()),
}
def handle_notification(self, method):
"""处理通知类请求"""
if method == "notifications/initialized":
self.initialized = True
logger.info("Client initialized successfully")
return None # 通知不需要响应
return None
def _handle_initialize(self, request_id):
"""处理initialize请求"""
return {"jsonrpc": "2.0", "id": request_id, "result": self.initialize({})}
def _handle_ping(self, request_id):
"""处理ping请求"""
return {"jsonrpc": "2.0", "id": request_id, "result": {}}
def _handle_list_tools(self, request_id):
"""Handle tools list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"tools": list(self.tools.values())},
}
# 处理获取时间工具
def _handle_get_time(self, arguments):
"""Handle get time tool"""
# 获取格式类型,默认为readable
format_type = arguments.get("format", "readable")
# 获取当前时间
now = datetime.now()
# 根据格式类型返回不同格式
if format_type == "iso":
result = now.isoformat()
elif format_type == "timestamp":
result = str(now.timestamp())
else: # readable
result = now.strftime("%Y-%m-%d %H:%M:%S")
# 返回时间文本
return [{"type": "text", "text": f"Current time: {result}"}]
# 处理计算工具
def _handle_calculate(self, arguments):
"""Handle calculate tool"""
# 获取表达式
expression = arguments.get("expression", "")
try:
# 安全检查表达式字符
allowed_chars = set("0123456789+-*/.() ")
if not all(c in allowed_chars for c in expression):
raise ValueError("Expression contains disallowed characters")
# 计算表达式
result = eval(expression)
# 返回计算结果
return [
{"type": "text", "text": f"Calculation result: {expression} = {result}"}
]
except Exception as e:
# 计算失败
return [{"type": "text", "text": f"Calculation failed: {str(e)}"}]
# 处理回显工具
def _handle_echo(self, arguments):
"""Handle echo tool"""
# 获取消息
message = arguments.get("message", "")
# 返回回显内容
return [{"type": "text", "text": f"Echo: {message}"}]
# 处理工具调用请求
def _handle_call_tool(self, request_id, params):
"""Handle tool call request"""
# 获取工具名称
name = params.get("name")
# 获取参数
arguments = params.get("arguments", {})
try:
# 记录工具调用
logger.debug(f"Tool call: {name} with arguments: {arguments}")
# 根据工具名称分发
if name == "get_current_time":
result = self._handle_get_time(arguments)
elif name == "calculate":
result = self._handle_calculate(arguments)
elif name == "echo":
result = self._handle_echo(arguments)
else:
# 未知工具
logger.warning(f"Unknown tool requested: {name}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32601, "message": f"Unknown tool: {name}"},
}
# 记录工具调用成功
logger.debug(f"Tool {name} executed successfully")
# 返回工具调用结果
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"content": result},
}
except Exception as e:
# 工具调用失败
logger.error(f"Tool {name} call failed: {str(e)}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Tool call failed: {str(e)}"},
}
# 处理资源列表请求
def _handle_list_resources(self, request_id):
"""Handle resources list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"resources": list(self.resources.values())},
}
# 处理读取资源请求
def _handle_read_resource(self, request_id, params):
"""Handle resource read request"""
# 获取资源URI
uri = params.get("uri")
try:
# 判断资源类型
if uri == "mcp://server-info":
# 返回内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(server_info, indent=2),
}
]
},
}
elif uri == "mcp://sample-data":
# 返回内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(sample_data, indent=2),
}
]
},
}
else:
# 未找到资源
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32602, "message": f"Resource not found: {uri}"},
}
except Exception as e:
# 读取资源失败
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Read resource failed: {str(e)}"},
}
# 处理模板列表请求
def _handle_list_templates(self, request_id):
"""Handle template list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"resourceTemplates": list(self.templates.values())},
}
# 处理读取模板请求
def _handle_read_template(self, request_id, params):
# 获取模板URI
uri = params.get("uri")
try:
# 判断模板类型
if uri == "mcp://templates/project-plan":
# 返回模板内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(project_plan_template, indent=2),
}
]
},
}
elif uri == "mcp://templates/meeting-notes":
# 返回模板内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(meeting_notes_template, indent=2),
}
]
},
}
elif uri == "mcp://templates/weekly-report":
# 返回模板内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(weekly_report_template, indent=2),
}
]
},
}
else:
# 未找到模板
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Template not found: {uri}"},
}
except Exception as e:
# 读取模板失败
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Read template failed: {str(e)}"},
}
# 处理提示列表请求
def _handle_list_prompts(self, request_id):
"""Handle prompt list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"prompts": list(self.prompts.values())},
}
# 处理问候提示
def _handle_greeting_prompt(self, arguments):
"""Handle greeting prompt"""
# 获取姓名,默认为User
name = arguments.get("name", "User")
# 获取时间段,默认为day
time_of_day = arguments.get("time_of_day", "day")
# 问候语映射
greetings = {
"morning": "Good morning",
"afternoon": "Good afternoon",
"evening": "Good evening",
"night": "Good night",
}
# 获取对应问候语
greeting = greetings.get(time_of_day, "Hello")
# 生成内容
content = f"{greeting}, {name}! Welcome to the MCP server."
# 返回问候消息
return [{"role": "assistant", "content": {"type": "text", "text": content}}]
# 处理状态报告提示
def _handle_status_report_prompt(self, arguments):
"""Handle status report prompt"""
# 获取项目名,默认为Unknown Project
project = arguments.get("project", "Unknown Project")
# 获取状态,默认为unknown
status = arguments.get("status", "unknown")
# 状态消息映射
status_messages = {
"on_track": "Project is on track",
"at_risk": "Project is at risk",
"behind_schedule": "Project is behind schedule",
}
# 获取对应状态消息
message = status_messages.get(status, "Project status unknown")
# 生成内容
content = f"Project '{project}' status report: {message}"
# 返回状态报告消息
return [{"role": "assistant", "content": {"type": "text", "text": content}}]
# 处理获取提示请求
def _handle_get_prompt(self, request_id, params):
"""Handle prompt get request"""
# 获取提示名称
name = params.get("name")
# 获取参数
arguments = params.get("arguments", {})
try:
# 根据提示名称分发
if name == "greeting":
result = self._handle_greeting_prompt(arguments)
elif name == "status_report":
result = self._handle_status_report_prompt(arguments)
else:
# 未知提示
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32604, "message": f"Unknown prompt: {name}"},
}
# 返回提示结果
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"messages": result},
}
except Exception as e:
# 获取提示失败
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Get prompt failed: {str(e)}"},
}
# 处理演示采样请求
def _handle_mcp_sampling(self, request_id, params):
try:
# 日志记录收到演示采样请求
logger.info("收到客户端演示采样请求")
# 构造采样请求
sampling_request = {
"jsonrpc": "2.0",
"id": f"mcp_sampling_{request_id}",
"method": "sampling/createMessage",
"params": {
"messages": [
{
"role": "user",
"content": {
"type": "text",
"text": "Write a creative story about a robot learning to paint in a digital art studio",
},
}
],
"modelPreferences": {
"hints": [{"name": "gpt-4"}],
"intelligencePriority": 0.8,
"speedPriority": 0.5,
"costPriority": 0.3,
},
"systemPrompt": "You are a creative storyteller. Write engaging and imaginative stories.",
"maxTokens": 200,
},
}
# 日志记录已发送
logger.info("已发送采样请求到客户端")
# 返回成功响应
return sampling_request
except Exception as e:
# 采样失败日志
logger.error(f"Mcp sampling failed: {str(e)}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Mcp sampling failed: {str(e)}"},
}
# 处理演示引导请求
def _handle_mcp_elicitation(self, request_id, params):
try:
# 日志记录收到演示引导请求
logger.info("收到客户端演示引导请求")
# 构造引导请求
elicitation_request = {
"jsonrpc": "2.0",
"id": f"mcp_elicitation_{request_id}",
"method": "elicitation/create",
"params": {
"message": "Please provide your preferences for the project configuration",
"requestedSchema": {
"type": "object",
"properties": {
"user_name": {"type": "string", "description": "Your name"},
"preference": {
"type": "string",
"description": "Your preferred style",
"enum": ["creative", "analytical", "practical"],
},
"feedback": {
"type": "string",
"description": "Any additional feedback",
},
},
"required": ["user_name", "preference"],
},
},
}
# 发送引导请求到客户端
request_str = json.dumps(elicitation_request, ensure_ascii=False)
print(request_str)
sys.stdout.flush()
# 日志记录已发送
logger.info("已发送引导请求到客户端")
return elicitation_request
# 返回成功响应
# return {
# "jsonrpc": "2.0",
# "id": request_id,
# "result": {
# "message": "Elicitation request sent to client",
# "requestId": f"mcp_elicitation_{request_id}",
# },
# }
except Exception as e:
# 引导失败日志
logger.error(f"Mcp elicitation failed: {str(e)}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32603,
"message": f"Mcp elicitation failed: {str(e)}",
},
}
# 处理JSON-RPC请求的方法
def handle_request(self, request):
"""处理JSON-RPC请求"""
# 获取请求的方法名
method = request.get("method")
# 获取请求ID,默认为0
request_id = request.get("id", 0)
# 获取请求参数,默认为空字典
params = request.get("params", {})
try:
# 处理通知类请求(没有id字段)
if "id" not in request:
return self.handle_notification(method)
# 处理有id的请求
if method == "initialize":
return self._handle_initialize(request_id)
elif method == "ping":
return self._handle_ping(request_id)
elif method == "tools/list":
return self._handle_list_tools(request_id)
elif method == "tools/call":
return self._handle_call_tool(request_id, params)
elif method == "resources/list":
return self._handle_list_resources(request_id)
elif method == "resources/read":
return self._handle_read_resource(request_id, params)
elif method == "resources/templates/list":
return self._handle_list_templates(request_id)
elif method == "resources/templates/read":
return self._handle_read_template(request_id, params)
elif method == "prompts/list":
return self._handle_list_prompts(request_id)
elif method == "prompts/get":
return self._handle_get_prompt(request_id, params)
elif method == "mcp/sampling":
return self._handle_mcp_sampling(request_id, params)
elif method == "mcp/elicitation":
return self._handle_mcp_elicitation(request_id, params)
else:
# 方法未找到,返回错误
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32605, "message": f"Method {request} not found"},
}
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Request handling error: {e}")
# 返回内部错误响应
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Internal error: {str(e)}"},
}
# 自动发送采样请求(10秒后)
+def auto_sampling_request():
# 线程函数,延迟后发送采样请求
+ def send_sampling_request():
+ time.sleep(10)
# 构造采样请求
+ sampling_request = {
+ "jsonrpc": "2.0",
+ "id": "auto_sampling_001",
+ "method": "sampling/createMessage",
+ "params": {
+ "messages": [
+ {
+ "role": "user",
+ "content": {
+ "type": "text",
+ "text": "Write a creative story about a robot learning to paint in a digital art studio",
+ },
+ }
+ ],
+ "modelPreferences": {
+ "hints": [{"name": "gpt-4"}],
+ "intelligencePriority": 0.8,
+ "speedPriority": 0.5,
+ "costPriority": 0.3,
+ },
+ "systemPrompt": "You are a creative storyteller. Write engaging and imaginative stories.",
+ "maxTokens": 200,
+ },
+ }
# 发送请求到标准输出(MCP Inspector会读取)
+ request_str = json.dumps(sampling_request, ensure_ascii=False)
+ print(request_str)
+ sys.stdout.flush()
# 日志记录自动请求
+ logger.info("Auto sampling request sent to MCP Inspector")
+ print(f"Auto sampling request sent: {request_str}", file=sys.stderr)
# 启动自动采样线程
+ auto_thread = threading.Thread(target=send_sampling_request, daemon=True)
+ auto_thread.start()
+ logger.info("Auto sampling thread started - will send request in 10 seconds")
# 主函数
def main():
"""主函数"""
# 记录服务器启动信息
logger.info("Starting server...")
# 创建MCPServer实例
mcp_server = MCPServer()
+ auto_sampling_request()
try:
# 循环读取标准输入的每一行
for line in sys.stdin:
# 如果行为空,跳过
if not line.strip():
continue
try:
# 解析JSON请求
request = json.loads(line.strip())
# 处理请求,获取响应
response = mcp_server.handle_request(request)
# 如果有响应,输出到标准输出
if response is not None:
print(json.dumps(response, ensure_ascii=False))
# 刷新标准输出缓冲区
sys.stdout.flush()
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Unexpected error: {e}")
except KeyboardInterrupt:
# 捕获键盘中断,记录服务器停止信息
logger.info("Server stopped")
except Exception as e:
# 服务器发生其他异常,记录错误并退出
logger.error(f"Server error: {e}")
sys.exit(1)
# 初始化日志记录器
logger = setup_logging()
# 如果作为主程序运行,则调用main函数
if __name__ == "__main__":
main()
13.自动引导 #
13.1. mcp_server.py #
mcp_server.py
# 导入os模块,用于获取环境变量
from ast import Dict
from datetime import datetime
import os
# 导入sys模块,用于标准输入输出
import sys
# 导入logging模块,用于日志记录
import logging
# 导入json模块,用于处理JSON数据
import json
import threading
import time
from resources import (
meeting_notes_template,
project_plan_template,
weekly_report_template,
resources,
server_info,
sample_data,
templates,
)
from tools import tools
from prompts import prompts
# 设置日志配置
def setup_logging():
"""设置日志配置"""
# 获取日志级别,默认为INFO,并转换为大写
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# 创建日志格式化器
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 创建控制台日志处理器,输出到标准错误
console_handler = logging.StreamHandler(sys.stderr)
# 设置处理器的格式
console_handler.setFormatter(formatter)
# 配置根日志记录器
logging.basicConfig(
level=getattr(logging, log_level),
handlers=[console_handler],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回当前模块的日志记录器
return logging.getLogger(__name__)
# 定义MCPServer类
class MCPServer:
# 初始化方法
def __init__(self):
# 服务器信息,包含名称和版本
self.server_info = {"name": "MCPServer", "version": "1.0.0"}
self.initialized = False
self.tools = tools
# 定义可用资源
self.resources = resources
# 定义可用资源模板
self.templates = templates
# 定义可用提示(prompts)
self.prompts = prompts
# 处理initialize请求的方法
def initialize(self, params):
"""处理initialize请求"""
# 返回协议版本、能力和服务器信息
return {
"protocolVersion": "2025-06-18",
"capabilities": {
"tools": {"listChanged": True},
"resources": {"listChanged": True},
"prompts": {"listChanged": True},
},
"serverInfo": self.server_info,
"resourceTemplates": list(self.templates.values()),
}
def handle_notification(self, method):
"""处理通知类请求"""
if method == "notifications/initialized":
self.initialized = True
logger.info("Client initialized successfully")
return None # 通知不需要响应
return None
def _handle_initialize(self, request_id):
"""处理initialize请求"""
return {"jsonrpc": "2.0", "id": request_id, "result": self.initialize({})}
def _handle_ping(self, request_id):
"""处理ping请求"""
return {"jsonrpc": "2.0", "id": request_id, "result": {}}
def _handle_list_tools(self, request_id):
"""Handle tools list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"tools": list(self.tools.values())},
}
# 处理获取时间工具
def _handle_get_time(self, arguments):
"""Handle get time tool"""
# 获取格式类型,默认为readable
format_type = arguments.get("format", "readable")
# 获取当前时间
now = datetime.now()
# 根据格式类型返回不同格式
if format_type == "iso":
result = now.isoformat()
elif format_type == "timestamp":
result = str(now.timestamp())
else: # readable
result = now.strftime("%Y-%m-%d %H:%M:%S")
# 返回时间文本
return [{"type": "text", "text": f"Current time: {result}"}]
# 处理计算工具
def _handle_calculate(self, arguments):
"""Handle calculate tool"""
# 获取表达式
expression = arguments.get("expression", "")
try:
# 安全检查表达式字符
allowed_chars = set("0123456789+-*/.() ")
if not all(c in allowed_chars for c in expression):
raise ValueError("Expression contains disallowed characters")
# 计算表达式
result = eval(expression)
# 返回计算结果
return [
{"type": "text", "text": f"Calculation result: {expression} = {result}"}
]
except Exception as e:
# 计算失败
return [{"type": "text", "text": f"Calculation failed: {str(e)}"}]
# 处理回显工具
def _handle_echo(self, arguments):
"""Handle echo tool"""
# 获取消息
message = arguments.get("message", "")
# 返回回显内容
return [{"type": "text", "text": f"Echo: {message}"}]
# 处理工具调用请求
def _handle_call_tool(self, request_id, params):
"""Handle tool call request"""
# 获取工具名称
name = params.get("name")
# 获取参数
arguments = params.get("arguments", {})
try:
# 记录工具调用
logger.debug(f"Tool call: {name} with arguments: {arguments}")
# 根据工具名称分发
if name == "get_current_time":
result = self._handle_get_time(arguments)
elif name == "calculate":
result = self._handle_calculate(arguments)
elif name == "echo":
result = self._handle_echo(arguments)
else:
# 未知工具
logger.warning(f"Unknown tool requested: {name}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32601, "message": f"Unknown tool: {name}"},
}
# 记录工具调用成功
logger.debug(f"Tool {name} executed successfully")
# 返回工具调用结果
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"content": result},
}
except Exception as e:
# 工具调用失败
logger.error(f"Tool {name} call failed: {str(e)}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Tool call failed: {str(e)}"},
}
# 处理资源列表请求
def _handle_list_resources(self, request_id):
"""Handle resources list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"resources": list(self.resources.values())},
}
# 处理读取资源请求
def _handle_read_resource(self, request_id, params):
"""Handle resource read request"""
# 获取资源URI
uri = params.get("uri")
try:
# 判断资源类型
if uri == "mcp://server-info":
# 返回内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(server_info, indent=2),
}
]
},
}
elif uri == "mcp://sample-data":
# 返回内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(sample_data, indent=2),
}
]
},
}
else:
# 未找到资源
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32602, "message": f"Resource not found: {uri}"},
}
except Exception as e:
# 读取资源失败
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Read resource failed: {str(e)}"},
}
# 处理模板列表请求
def _handle_list_templates(self, request_id):
"""Handle template list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"resourceTemplates": list(self.templates.values())},
}
# 处理读取模板请求
def _handle_read_template(self, request_id, params):
# 获取模板URI
uri = params.get("uri")
try:
# 判断模板类型
if uri == "mcp://templates/project-plan":
# 返回模板内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(project_plan_template, indent=2),
}
]
},
}
elif uri == "mcp://templates/meeting-notes":
# 返回模板内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(meeting_notes_template, indent=2),
}
]
},
}
elif uri == "mcp://templates/weekly-report":
# 返回模板内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(weekly_report_template, indent=2),
}
]
},
}
else:
# 未找到模板
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Template not found: {uri}"},
}
except Exception as e:
# 读取模板失败
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Read template failed: {str(e)}"},
}
# 处理提示列表请求
def _handle_list_prompts(self, request_id):
"""Handle prompt list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"prompts": list(self.prompts.values())},
}
# 处理问候提示
def _handle_greeting_prompt(self, arguments):
"""Handle greeting prompt"""
# 获取姓名,默认为User
name = arguments.get("name", "User")
# 获取时间段,默认为day
time_of_day = arguments.get("time_of_day", "day")
# 问候语映射
greetings = {
"morning": "Good morning",
"afternoon": "Good afternoon",
"evening": "Good evening",
"night": "Good night",
}
# 获取对应问候语
greeting = greetings.get(time_of_day, "Hello")
# 生成内容
content = f"{greeting}, {name}! Welcome to the MCP server."
# 返回问候消息
return [{"role": "assistant", "content": {"type": "text", "text": content}}]
# 处理状态报告提示
def _handle_status_report_prompt(self, arguments):
"""Handle status report prompt"""
# 获取项目名,默认为Unknown Project
project = arguments.get("project", "Unknown Project")
# 获取状态,默认为unknown
status = arguments.get("status", "unknown")
# 状态消息映射
status_messages = {
"on_track": "Project is on track",
"at_risk": "Project is at risk",
"behind_schedule": "Project is behind schedule",
}
# 获取对应状态消息
message = status_messages.get(status, "Project status unknown")
# 生成内容
content = f"Project '{project}' status report: {message}"
# 返回状态报告消息
return [{"role": "assistant", "content": {"type": "text", "text": content}}]
# 处理获取提示请求
def _handle_get_prompt(self, request_id, params):
"""Handle prompt get request"""
# 获取提示名称
name = params.get("name")
# 获取参数
arguments = params.get("arguments", {})
try:
# 根据提示名称分发
if name == "greeting":
result = self._handle_greeting_prompt(arguments)
elif name == "status_report":
result = self._handle_status_report_prompt(arguments)
else:
# 未知提示
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32604, "message": f"Unknown prompt: {name}"},
}
# 返回提示结果
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"messages": result},
}
except Exception as e:
# 获取提示失败
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Get prompt failed: {str(e)}"},
}
# 处理演示采样请求
def _handle_mcp_sampling(self, request_id, params):
try:
# 日志记录收到演示采样请求
logger.info("收到客户端演示采样请求")
# 构造采样请求
sampling_request = {
"jsonrpc": "2.0",
"id": f"mcp_sampling_{request_id}",
"method": "sampling/createMessage",
"params": {
"messages": [
{
"role": "user",
"content": {
"type": "text",
"text": "Write a creative story about a robot learning to paint in a digital art studio",
},
}
],
"modelPreferences": {
"hints": [{"name": "gpt-4"}],
"intelligencePriority": 0.8,
"speedPriority": 0.5,
"costPriority": 0.3,
},
"systemPrompt": "You are a creative storyteller. Write engaging and imaginative stories.",
"maxTokens": 200,
},
}
# 日志记录已发送
logger.info("已发送采样请求到客户端")
# 返回成功响应
return sampling_request
except Exception as e:
# 采样失败日志
logger.error(f"Mcp sampling failed: {str(e)}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Mcp sampling failed: {str(e)}"},
}
# 处理演示引导请求
def _handle_mcp_elicitation(self, request_id, params):
try:
# 日志记录收到演示引导请求
logger.info("收到客户端演示引导请求")
# 构造引导请求
elicitation_request = {
"jsonrpc": "2.0",
"id": f"mcp_elicitation_{request_id}",
"method": "elicitation/create",
"params": {
"message": "Please provide your preferences for the project configuration",
"requestedSchema": {
"type": "object",
"properties": {
"user_name": {"type": "string", "description": "Your name"},
"preference": {
"type": "string",
"description": "Your preferred style",
"enum": ["creative", "analytical", "practical"],
},
"feedback": {
"type": "string",
"description": "Any additional feedback",
},
},
"required": ["user_name", "preference"],
},
},
}
# 发送引导请求到客户端
request_str = json.dumps(elicitation_request, ensure_ascii=False)
print(request_str)
sys.stdout.flush()
# 日志记录已发送
logger.info("已发送引导请求到客户端")
return elicitation_request
# 返回成功响应
# return {
# "jsonrpc": "2.0",
# "id": request_id,
# "result": {
# "message": "Elicitation request sent to client",
# "requestId": f"mcp_elicitation_{request_id}",
# },
# }
except Exception as e:
# 引导失败日志
logger.error(f"Mcp elicitation failed: {str(e)}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32603,
"message": f"Mcp elicitation failed: {str(e)}",
},
}
# 处理JSON-RPC请求的方法
def handle_request(self, request):
"""处理JSON-RPC请求"""
# 获取请求的方法名
method = request.get("method")
# 获取请求ID,默认为0
request_id = request.get("id", 0)
# 获取请求参数,默认为空字典
params = request.get("params", {})
try:
# 处理通知类请求(没有id字段)
if "id" not in request:
return self.handle_notification(method)
# 处理有id的请求
if method == "initialize":
return self._handle_initialize(request_id)
elif method == "ping":
return self._handle_ping(request_id)
elif method == "tools/list":
return self._handle_list_tools(request_id)
elif method == "tools/call":
return self._handle_call_tool(request_id, params)
elif method == "resources/list":
return self._handle_list_resources(request_id)
elif method == "resources/read":
return self._handle_read_resource(request_id, params)
elif method == "resources/templates/list":
return self._handle_list_templates(request_id)
elif method == "resources/templates/read":
return self._handle_read_template(request_id, params)
elif method == "prompts/list":
return self._handle_list_prompts(request_id)
elif method == "prompts/get":
return self._handle_get_prompt(request_id, params)
elif method == "mcp/sampling":
return self._handle_mcp_sampling(request_id, params)
elif method == "mcp/elicitation":
return self._handle_mcp_elicitation(request_id, params)
else:
# 方法未找到,返回错误
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32605, "message": f"Method {request} not found"},
}
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Request handling error: {e}")
# 返回内部错误响应
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Internal error: {str(e)}"},
}
# 自动发送采样请求(10秒后)
def auto_sampling_request():
# 线程函数,延迟后发送采样请求
def send_sampling_request():
time.sleep(10)
# 构造采样请求
sampling_request = {
"jsonrpc": "2.0",
"id": "auto_sampling_001",
"method": "sampling/createMessage",
"params": {
"messages": [
{
"role": "user",
"content": {
"type": "text",
"text": "Write a creative story about a robot learning to paint in a digital art studio",
},
}
],
"modelPreferences": {
"hints": [{"name": "gpt-4"}],
"intelligencePriority": 0.8,
"speedPriority": 0.5,
"costPriority": 0.3,
},
"systemPrompt": "You are a creative storyteller. Write engaging and imaginative stories.",
"maxTokens": 200,
},
}
# 发送请求到标准输出(MCP Inspector会读取)
request_str = json.dumps(sampling_request, ensure_ascii=False)
print(request_str)
sys.stdout.flush()
# 日志记录自动请求
logger.info("Auto sampling request sent to MCP Inspector")
print(f"Auto sampling request sent: {request_str}", file=sys.stderr)
# 启动自动采样线程
auto_thread = threading.Thread(target=send_sampling_request, daemon=True)
auto_thread.start()
logger.info("Auto sampling thread started - will send request in 10 seconds")
# 自动发送引导请求(10秒后)
+def auto_elicitation_request():
# 线程函数,延迟后发送引导请求
+ def send_elicitation_request():
+ time.sleep(10)
# 构造引导请求
+ elicitation_request = {
+ "jsonrpc": "2.0",
+ "id": "auto_elicitation_001",
+ "method": "elicitation/create",
+ "params": {
+ "message": "Please provide your preferences for the project configuration",
+ "requestedSchema": {
+ "type": "object",
+ "properties": {
+ "user_name": {"type": "string", "description": "Your name"},
+ "preference": {
+ "type": "string",
+ "description": "Your preferred style",
+ "enum": ["creative", "analytical", "practical"],
+ },
+ "feedback": {
+ "type": "string",
+ "description": "Any additional feedback",
+ },
+ },
+ "required": ["user_name", "preference"],
+ },
+ },
+ }
# 发送请求到标准输出(MCP Inspector会读取)
+ request_str = json.dumps(elicitation_request, ensure_ascii=False)
+ print(request_str)
+ sys.stdout.flush()
# 日志记录自动请求
+ logger.info("Auto elicitation request sent to MCP Inspector")
+ print(f"Auto elicitation request sent: {request_str}", file=sys.stderr)
# 启动自动引导线程
+ auto_thread = threading.Thread(target=send_elicitation_request, daemon=True)
+ auto_thread.start()
+ logger.info("Auto elicitation thread started - will send request in 10 seconds")
# 主函数
def main():
"""主函数"""
# 记录服务器启动信息
logger.info("Starting server...")
# 创建MCPServer实例
mcp_server = MCPServer()
# auto_sampling_request()
+ auto_elicitation_request()
try:
# 循环读取标准输入的每一行
for line in sys.stdin:
# 如果行为空,跳过
if not line.strip():
continue
try:
# 解析JSON请求
request = json.loads(line.strip())
# 处理请求,获取响应
response = mcp_server.handle_request(request)
# 如果有响应,输出到标准输出
if response is not None:
print(json.dumps(response, ensure_ascii=False))
# 刷新标准输出缓冲区
sys.stdout.flush()
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Unexpected error: {e}")
except KeyboardInterrupt:
# 捕获键盘中断,记录服务器停止信息
logger.info("Server stopped")
except Exception as e:
# 服务器发生其他异常,记录错误并退出
logger.error(f"Server error: {e}")
sys.exit(1)
# 初始化日志记录器
logger = setup_logging()
# 如果作为主程序运行,则调用main函数
if __name__ == "__main__":
main()
14.完成 #
14.1. mcp_client.py #
mcp_client.py
# 导入subprocess模块,用于创建和管理子进程
import subprocess
# 导入sys模块,提供对Python解释器相关的变量和函数的访问
import sys
# 导入json模块,用于处理JSON数据
import json
# 导入os模块,提供与操作系统交互的功能
import os
# 导入logging模块,用于日志记录
import logging
# 从dotenv包导入load_dotenv函数,用于加载环境变量
from dotenv import load_dotenv
# 加载.env文件中的环境变量
load_dotenv() # 从.env加载环境变量
# 配置日志记录的函数
def setup_logging():
"""设置日志配置"""
# 获取日志级别,默认为INFO,并转换为大写
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# 创建日志格式化器,指定日志输出格式
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 创建控制台日志处理器,输出到标准错误
console_handler = logging.StreamHandler(sys.stderr)
# 设置控制台处理器的日志格式
console_handler.setFormatter(formatter)
# 配置根日志记录器,设置日志级别和处理器
logging.basicConfig(
level=getattr(logging, log_level),
handlers=[console_handler],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回当前模块的日志记录器
return logging.getLogger(__name__)
# 设置日志记录器
logger = setup_logging()
# 定义MCPClient类
class MCPClient:
# 构造函数,初始化客户端对象
def __init__(self):
# 服务器进程对象
self.process = None
# 请求ID,每次请求自增
self.request_id = 1
# 可用工具列表
self.available_tools = []
# 可用资源列表
self.available_resources = []
# 可用模板列表
self.available_templates = []
# 可用提示列表
self.available_prompts = []
# 连接状态标志
self._connected = False
# 关闭状态标志
self._shutdown = False
# 初始化服务器连接
def _initialize_server(self):
"""初始化服务器连接"""
# 记录调试日志,发送初始化请求
logger.debug("发送初始化请求...")
# 发送初始化请求
init_response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "initialize",
"params": {
"protocolVersion": "2025-06-18",
"capabilities": {},
"clientInfo": {"name": "mcp-client", "version": "1.0.0"},
},
}
)
# 请求ID自增
self.request_id += 1
# 检查响应中是否有result字段
if "result" not in init_response:
logger.error(f"初始化失败: {init_response}")
return False
# 发送initialized通知
logger.debug("发送initialized通知...")
self._send_request(
{"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}}
)
# 记录服务器初始化成功
logger.info("服务器初始化成功")
return True
# ping服务器,测试连通性
def ping(self):
"""Ping服务器"""
# 发送ping请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "ping",
"params": {},
}
)
# 请求ID自增
self.request_id += 1
# 返回响应
return response
# 发送JSON-RPC请求到服务器
def _send_request(self, request):
"""发送JSON-RPC请求到服务器"""
# 检查服务器进程是否存在且未退出
if not self.process or self.process.poll() is not None:
logger.error("服务器连接已断开")
return {"error": "Server connection lost"}
# 将请求对象序列化为JSON字符串
request_str = json.dumps(request, ensure_ascii=False)
# 记录调试日志,请求方法和ID
logger.debug(
f"发送请求: {request.get('method', 'unknown')} (ID: {request.get('id', 'none')})"
)
try:
# 向服务器进程写入请求
self.process.stdin.write(request_str + "\n")
self.process.stdin.flush()
except Exception as e:
# 发送请求时出错,记录错误日志
logger.error(f"发送请求时出错: {e}")
return {"error": f"发送失败: {e}"}
# 检查是否为通知类请求(没有id字段)
if "id" not in request:
logger.debug("通知类请求,不等待响应")
return {"result": "notification_sent"}
# 读取服务器响应
logger.debug("等待响应...")
try:
# 读取一行响应
response_line = self.process.stdout.readline()
if response_line:
try:
# 解析JSON响应
response = json.loads(response_line.strip())
logger.debug(f"收到响应: {response.get('result', 'error')}")
return response
except json.JSONDecodeError as e:
# JSON解析错误
logger.error(f"JSON解析错误: {e}")
return {"error": f"JSON解析失败: {e}"}
else:
# 没有收到响应
logger.error("没有收到响应")
return {"error": "没有响应"}
except Exception as e:
# 读取响应时出错
logger.error(f"读取响应时出错: {e}")
return {"error": f"读取失败: {e}"}
# 加载模板列表的方法
def _load_templates(self):
"""加载模板列表"""
# 记录日志:获取模板列表
logger.debug("获取模板列表...")
# 发送获取模板列表请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "resources/templates/list",
"params": {},
}
)
# 请求ID递增
self.request_id += 1
# 检查响应并保存模板列表
if "result" in response and "resourceTemplates" in response["result"]:
self.available_templates = response["result"]["resourceTemplates"]
logger.info(
f"可用模板: {[tpl['name'] for tpl in self.available_templates]}"
)
else:
logger.error(f"获取模板列表失败: {response}")
# 加载提示列表的方法
def _load_prompts(self):
"""加载提示列表"""
# 记录日志:获取提示列表
logger.debug("获取提示列表...")
# 发送获取提示列表请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "prompts/list",
"params": {},
}
)
# 请求ID递增
self.request_id += 1
# 检查响应并保存提示列表
if "result" in response and "prompts" in response["result"]:
self.available_prompts = response["result"]["prompts"]
logger.info(
f"可用提示: {[prompt['name'] for prompt in self.available_prompts]}"
)
else:
logger.error(f"获取提示列表失败: {response}")
# 加载工具列表
def _load_tools(self):
"""加载工具列表"""
# 记录调试日志
logger.debug("获取工具列表...")
# 发送tools/list请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "tools/list",
"params": {},
}
)
# 请求ID自增
self.request_id += 1
# 检查响应中是否包含工具列表
if "result" in response and "tools" in response["result"]:
self.available_tools = response["result"]["tools"]
logger.info(f"可用工具: {[tool['name'] for tool in self.available_tools]}")
else:
logger.error(f"获取工具列表失败: {response}")
# 加载资源列表的方法
def _load_resources(self):
"""加载资源列表"""
# 记录日志:获取资源列表
logger.debug("获取资源列表...")
# 发送获取资源列表请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "resources/list",
"params": {},
}
)
# 请求ID递增
self.request_id += 1
# 检查响应并保存资源列表
if "result" in response and "resources" in response["result"]:
self.available_resources = response["result"]["resources"]
logger.info(
f"可用资源: {[res['name'] for res in self.available_resources]}"
)
else:
logger.error(f"获取资源列表失败: {response}")
# 加载所有可用功能
def _load_all_capabilities(self):
"""加载所有可用功能"""
# 记录日志,开始加载功能
logger.info("正在加载服务器功能...")
# 初始化服务器
if not self._initialize_server():
return
# 加载工具列表
self._load_tools()
# 加载资源列表
self._load_resources()
# 加载模板列表
self._load_templates()
# 加载提示列表
self._load_prompts()
# 记录日志,功能加载完成
logger.info("功能加载完成!")
# 处理采样请求的方法
def _handle_sampling_request(self, request_id, params):
"""处理服务器发起的采样请求"""
logger.info("收到服务器采样请求")
# 检查request_id是否存在
if not request_id:
logger.error("采样请求缺少request_id")
return
# 检查参数是否存在
if not params:
logger.error("采样请求缺少参数")
return
# 打印服务器请求信息
print(f"\n🤖 服务器请求AI生成内容:")
print(f"消息: {params.get('messages', [])}")
print(f"系统提示: {params.get('systemPrompt', '')}")
print(f"最大Access Token数: {params.get('maxTokens', 100)}")
# 模拟用户审核过程
print("\n请选择操作:")
print("1. 批准并生成内容")
print("2. 拒绝请求")
try:
# 获取用户选择
choice = input("请输入选择 (1/2): ").strip()
except (EOFError, KeyboardInterrupt):
logger.info("用户中断了采样请求")
return
if choice == "1":
# 模拟生成内容
sample_text = "这是模拟生成的AI内容,基于服务器的请求。"
result = {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"role": "assistant",
"content": {"type": "text", "text": sample_text},
"model": "gpt-4",
"stopReason": "endTurn",
},
}
print(f"\n 已生成内容: {sample_text}")
# 发送响应给服务器
# self._send_response_to_server(result)
elif choice == "2":
# 拒绝请求
error_result = {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -1, "message": "User rejected sampling request"},
}
print("\n 已拒绝采样请求")
# self._send_response_to_server(error_result)
else:
print("\n 无效选择,请求被拒绝")
error_result = {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -1, "message": "Invalid user choice"},
}
self._send_response_to_server(error_result)
# 发送响应给服务器的方法
def _send_response_to_server(self, response):
"""发送响应给服务器"""
try:
# 将响应对象转为JSON字符串
response_str = json.dumps(response, ensure_ascii=False)
# 写入响应到服务器进程的标准输入
self.process.stdin.write(response_str + "\n")
self.process.stdin.flush()
logger.debug("已发送响应给服务器")
except Exception as e:
logger.error(f"发送响应给服务器时出错: {e}")
# 连接到MCP服务器
def connect_to_server(self, server_script_path):
"""连接到MCP服务器"""
# 检查服务器脚本路径是否存在
if not os.path.exists(server_script_path):
raise FileNotFoundError(f"服务器脚本不存在: {server_script_path}")
# 检查脚本文件是否为.py结尾
if not server_script_path.endswith(".py"):
logger.warning(f"服务器脚本可能不是Python文件: {server_script_path}")
# 启动服务器进程
self.process = subprocess.Popen(
[sys.executable, server_script_path],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=0, # 改为无缓冲
)
# 加载所有可用功能
self._load_all_capabilities()
# 更新连接状态
self._connected = True
# 列出所有可用工具
def _list_tools(self):
# 工具列表字符串数组
tool_list = []
# 遍历所有可用工具
for tool in self.available_tools:
# 获取工具名称
name = tool.get("name", "Unknown")
# 获取工具描述
description = tool.get("description", "No description")
# 添加到工具列表
tool_list.append(f" - {name}: {description}")
# 返回格式化后的工具列表字符串
return f"可用工具:\n" + "\n".join(tool_list)
# 调用工具的方法
def call_tool(self, tool_name, arguments):
"""调用工具"""
# 发送调用工具请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "tools/call",
"params": {"name": tool_name, "arguments": arguments},
}
)
# 请求ID递增
self.request_id += 1
return response
# 调用工具的内部方法
def _call_tool(self, tool_name, args):
"""调用工具"""
arguments = {}
# 针对不同工具解析参数
if tool_name == "get_current_time":
if args and args[0] in ["iso", "timestamp", "readable"]:
arguments["format"] = args[0]
elif tool_name == "calculate":
if args:
# 对于calculate,直接使用第一个参数作为表达式
arguments["expression"] = args[0]
else:
return "calculate 工具需要表达式参数"
elif tool_name == "echo":
if args:
# 对于echo,直接使用第一个参数作为消息
arguments["message"] = args[0]
else:
return "echo 工具需要消息参数"
# 记录日志:调用工具
logger.debug(f"调用工具: {tool_name} 参数: {arguments}")
# 调用工具
result = self.call_tool(tool_name, arguments)
# 检查结果
if "result" in result and "content" in result["result"]:
logger.debug(f"工具 {tool_name} 调用成功")
return result["result"]["content"]
else:
logger.error(f"工具 {tool_name} 调用失败: {result}")
return f"工具调用失败: {result}"
# 列出所有资源的方法
def _list_resources(self):
resource_list = []
for res in self.available_resources:
name = res.get("name", "Unknown")
uri = res.get("uri", "No URI")
resource_list.append(f" - {name}: {uri}")
return f"可用资源:\n" + "\n".join(resource_list)
# 读取资源的方法
def read_resource(self, uri):
"""读取资源"""
# 发送读取资源请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "resources/read",
"params": {"uri": uri},
}
)
# 请求ID递增
self.request_id += 1
return response
# 列出所有模板的方法
def _list_templates(self):
template_list = []
for tpl in self.available_templates:
name = tpl.get("name", "Unknown")
uri = tpl.get("uri", "No URI")
template_list.append(f" - {name}: {uri}")
return f"可用模板:\n" + "\n".join(template_list)
# 列出所有提示的方法
def _list_prompts(self):
prompt_list = []
for prompt in self.available_prompts:
name = prompt.get("name", "Unknown")
description = prompt.get("description", "No description")
prompt_list.append(f" - {name}: {description}")
return f"可用提示:\n" + "\n".join(prompt_list)
# 读取模板的方法
def read_template(self, uri):
"""读取模板"""
# 发送读取模板请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "resources/templates/read",
"params": {"uri": uri},
}
)
# 请求ID递增
self.request_id += 1
return response
# 显示提示的方法
def show_prompt(self, name, arguments):
"""显示提示"""
# 发送显示提示请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "prompts/get",
"params": {"name": name, "arguments": arguments},
}
)
# 请求ID递增
self.request_id += 1
return response
# 获取提示的方法
def get_prompt(self, name, arguments):
"""获取提示"""
# 发送获取提示请求
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "prompts/get",
"params": {"name": name, "arguments": arguments},
}
)
# 请求ID递增
self.request_id += 1
return response
# 解析命令行参数为字典的方法
def _parse_arguments(self, args):
"""解析命令行参数为字典"""
arguments = {}
for arg in args:
if "=" in arg:
key, value = arg.split("=", 1)
arguments[key] = value
return arguments
# 发送completion请求的方法
+ def request_completion(
+ self, ref_type, ref_name, argument_name, argument_value, context=None
+ ):
+ """发送completion请求"""
# 构造引用对象
+ ref = {"type": ref_type}
+ if ref_type == "ref/prompt":
+ ref["name"] = ref_name
+ elif ref_type == "ref/resource":
+ ref["uri"] = ref_name
# 构造参数对象
+ argument = {"name": argument_name, "value": argument_value}
# 构造请求参数
+ params = {"ref": ref, "argument": argument}
# 如果有上下文,添加到参数中
+ if context:
+ params["context"] = context
# 发送completion请求
+ response = self._send_request(
+ {
+ "jsonrpc": "2.0",
+ "id": self.request_id,
+ "method": "completion/complete",
+ "params": params,
+ }
+ )
# 请求ID递增
+ self.request_id += 1
+ return response
# 演示completion命令的方法
+ def _completion_command(self, args):
+ """演示completion命令"""
+ if len(args) < 4:
+ return "用法: completion <ref_type> <ref_name> <argument_name> <argument_value> [context]"
+ ref_type = args[0] # ref/prompt 或 ref/resource
+ ref_name = args[1] # 提示名称或资源URI
+ argument_name = args[2] # 参数名
+ argument_value = args[3] # 参数值
+ context = args[4] if len(args) > 4 else None # 可选的上下文
+ logger.info(
+ f"发送completion请求: {ref_type}, {ref_name}, {argument_name}, {argument_value}"
+ )
# 发送completion请求
+ response = self.request_completion(
+ ref_type, ref_name, argument_name, argument_value, context
+ )
# 处理响应
+ if "result" in response and "completion" in response["result"]:
+ completion = response["result"]["completion"]
+ values = completion.get("values", [])
+ total = completion.get("total", 0)
+ has_more = completion.get("hasMore", False)
+ result = f"Completion建议 ({total}个结果):\n"
+ for i, value in enumerate(values, 1):
+ result += f" {i}. {value}\n"
+ if has_more:
+ result += " ... (还有更多结果)"
+ return result
+ elif "error" in response:
+ return f" Completion请求失败: {response['error']}"
+ else:
+ return f" 响应格式异常: {response}"
# 演示采样命令的方法
def _mcp_sampling_command(self):
"""演示采样命令"""
logger.info("执行演示采样命令")
# 发送演示采样请求给服务器
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "mcp/sampling",
"params": {},
}
)
# 请求ID递增
self.request_id += 1
self._handle_sampling_request(response.get("id"), response.get("params", {}))
# 演示引导命令的方法
def _mcp_elicitation_command(self):
"""演示引导命令"""
logger.info("执行演示引导命令")
# 发送演示引导请求给服务器
response = self._send_request(
{
"jsonrpc": "2.0",
"id": self.request_id,
"method": "mcp/elicitation",
"params": {},
}
)
# 请求ID递增
self.request_id += 1
self._handle_elicitation_request(response.get("id"), response.get("params", {}))
# 处理引导请求的方法
def _handle_elicitation_request(self, request_id, params):
"""处理服务器发起的引导请求"""
logger.info("收到服务器引导请求")
# 检查request_id是否存在
if not request_id:
logger.error("引导请求缺少request_id")
return
# 检查参数是否存在
if not params:
logger.error("引导请求缺少参数")
return
# 打印服务器请求信息
print(f"\n服务器请求用户偏好收集:")
print(f"消息: {params.get('message', '')}")
print(f"请求ID: {request_id}")
print(f"参数: {params}")
# 处理用户输入的命令
def process_query(self, query):
"""处理用户查询"""
# 去除首尾空白并按空格分割
parts = query.strip().split()
# 如果没有输入内容,提示输入有效命令
if not parts:
return "请输入有效的命令"
# 获取命令关键字并转为小写
command = parts[0].lower()
try:
# 判断命令类型并调用相应方法
if command == "help":
return self._show_help()
elif command == "tools":
return self._list_tools()
elif command == "ping":
result = self.ping()
return f"Ping结果: {result}"
elif command in ["get_current_time", "calculate", "echo"]:
return self._call_tool(command, parts[1:])
elif command == "resources":
return self._list_resources()
elif command == "read":
if len(parts) < 2:
return "用法: read <资源URI>"
uri = parts[1]
result = self.read_resource(uri)
return f"资源内容: {json.dumps(result, ensure_ascii=False, indent=2)}"
elif command == "templates":
return self._list_templates()
elif command == "template":
if len(parts) < 2:
return "用法: template <模板URI>"
uri = parts[1]
result = self.read_template(uri)
return f"模板内容: {json.dumps(result, ensure_ascii=False, indent=2)}"
elif command == "prompts":
return self._list_prompts()
elif command == "prompt":
if len(parts) < 2:
return "用法: prompt <提示名称> [参数]"
prompt_name = parts[1]
arguments = self._parse_arguments(parts[2:])
result = self.show_prompt(prompt_name, arguments)
return f"提示结果: {json.dumps(result, ensure_ascii=False, indent=2)}"
+ elif command == "completion":
+ return self._completion_command(parts[1:])
elif command == "mcp-sampling":
return self._mcp_sampling_command()
elif command == "mcp-elicitation":
return self._mcp_elicitation_command()
else:
return f"未知命令: {command}\n输入 'help' 查看帮助"
except Exception as e:
# 执行命令时出错,记录日志并返回错误信息
logger.error(f"执行命令时出错: {str(e)}")
return f"执行命令时出错: {str(e)}"
# 显示帮助信息
def _show_help(self):
return """
=== MCP客户端帮助 ===
基础命令:
help - 显示此帮助
ping - 测试服务器连接
工具相关:
tools - 列出所有工具
get_current_time [format] - 获取当前时间 (format: iso/timestamp/readable)
calculate <expression> - 计算表达式
echo <message> - 回显消息
资源相关:
resources - 列出所有资源
read <uri> - 读取资源内容
模板相关:
templates - 列出所有模板
template <uri> - 读取模板内容
提示相关:
prompts - 列出所有提示
prompt <name> [args] - 使用提示
+ Completion相关:
+ completion <ref_type> <ref_name> <argument_name> <argument_value> [context] - 获取自动完成建议
演示功能:
mcp-sampling - 演示AI内容生成功能
mcp-elicitation - 演示用户偏好收集功能
+ Completion示例:
+ completion ref/prompt greeting name A
+ completion ref/prompt greeting time_of_day m
+ completion ref/prompt status_report project W
+ completion ref/resource mcp://server-info uri mcp://
示例:
help
ping
tools
get_current_time iso
calculate 2+3*4
echo hello world
read mcp://server-info
template mcp://templates/project-plan
prompt greeting name=Alice time_of_day=morning
+ completion ref/prompt greeting name A
mcp-sampling
mcp-elicitation
"""
# 运行交互式聊天循环
def chat_loop(self):
"""运行交互式聊天循环"""
# 记录客户端启动日志
logger.info("=== MCP客户端已启动! ===")
# 提示用户输入help查看命令
print("输入 'help' 查看所有可用命令")
# 提示用户输入quit退出
print("输入 'quit' 退出")
# 循环读取用户输入
while True:
try:
# 读取用户输入命令
query = input("\n命令: ").strip()
# 如果输入quit则退出循环
if query.lower() == "quit":
logger.info("用户退出客户端")
break
# 处理用户命令并输出结果
response = self.process_query(query)
print(f"\n{response}")
except Exception as e:
# 处理命令时出错,记录日志并输出错误
logger.error(f"处理命令时出错: {str(e)}")
print(f"\n错误: {str(e)}")
# 清理资源
def cleanup(self):
"""清理资源"""
# 设置关闭和断开标志
self._shutdown = True
self._connected = False
try:
# 如果服务器进程存在
if self.process:
logger.debug("终止服务器进程")
# 终止服务器进程
self.process.terminate()
# 等待进程结束,设置超时
try:
self.process.wait(timeout=5)
logger.info("服务器进程已正常终止")
except subprocess.TimeoutExpired:
logger.warning("服务器进程未在5秒内终止,强制杀死")
self.process.kill()
self.process.wait()
logger.info("服务器进程已强制终止")
except Exception as e:
# 清理资源时出错,记录日志
logger.error(f"清理资源时出错: {e}")
finally:
# 释放进程对象
self.process = None
# 主函数,程序入口
def main():
# 检查命令行参数数量
if len(sys.argv) < 2:
logger.error("用法: python client.py <服务器脚本路径>")
print("用法: python client.py <服务器脚本路径>")
sys.exit(1)
# 记录启动日志
logger.info(f"启动MCP客户端,连接服务器: {sys.argv[1]}")
# 创建客户端对象
client = MCPClient()
try:
# 连接服务器并进入交互循环
client.connect_to_server(sys.argv[1])
client.chat_loop()
except Exception as e:
# 客户端运行出错,记录日志并输出错误
logger.error(f"客户端运行出错: {e}")
print(f"错误: {e}")
finally:
# 清理客户端资源
logger.info("清理客户端资源")
client.cleanup()
# 判断是否为主程序入口
if __name__ == "__main__":
main()
14.2. mcp_server.py #
mcp_server.py
# 导入os模块,用于获取环境变量
from ast import Dict
from datetime import datetime
import os
# 导入sys模块,用于标准输入输出
import sys
# 导入logging模块,用于日志记录
import logging
# 导入json模块,用于处理JSON数据
import json
import threading
import time
from resources import (
meeting_notes_template,
project_plan_template,
weekly_report_template,
resources,
server_info,
sample_data,
templates,
)
from tools import tools
from prompts import prompts
# 设置日志配置
def setup_logging():
"""设置日志配置"""
# 获取日志级别,默认为INFO,并转换为大写
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# 创建日志格式化器
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 创建控制台日志处理器,输出到标准错误
console_handler = logging.StreamHandler(sys.stderr)
# 设置处理器的格式
console_handler.setFormatter(formatter)
# 配置根日志记录器
logging.basicConfig(
level=getattr(logging, log_level),
handlers=[console_handler],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回当前模块的日志记录器
return logging.getLogger(__name__)
# 定义MCPServer类
class MCPServer:
# 初始化方法
def __init__(self):
# 服务器信息,包含名称和版本
self.server_info = {"name": "MCPServer", "version": "1.0.0"}
self.initialized = False
self.tools = tools
# 定义可用资源
self.resources = resources
# 定义可用资源模板
self.templates = templates
# 定义可用提示(prompts)
self.prompts = prompts
# 处理initialize请求的方法
def initialize(self, params):
"""处理initialize请求"""
# 返回协议版本、能力和服务器信息
return {
"protocolVersion": "2025-06-18",
"capabilities": {
"tools": {"listChanged": True},
"resources": {"listChanged": True},
"prompts": {"listChanged": True},
+ "completions": {}, # 添加completion能力
},
"serverInfo": self.server_info,
"resourceTemplates": list(self.templates.values()),
}
def handle_notification(self, method):
"""处理通知类请求"""
if method == "notifications/initialized":
self.initialized = True
logger.info("Client initialized successfully")
return None # 通知不需要响应
return None
def _handle_initialize(self, request_id):
"""处理initialize请求"""
return {"jsonrpc": "2.0", "id": request_id, "result": self.initialize({})}
def _handle_ping(self, request_id):
"""处理ping请求"""
return {"jsonrpc": "2.0", "id": request_id, "result": {}}
def _handle_list_tools(self, request_id):
"""Handle tools list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"tools": list(self.tools.values())},
}
# 处理获取时间工具
def _handle_get_time(self, arguments):
"""Handle get time tool"""
# 获取格式类型,默认为readable
format_type = arguments.get("format", "readable")
# 获取当前时间
now = datetime.now()
# 根据格式类型返回不同格式
if format_type == "iso":
result = now.isoformat()
elif format_type == "timestamp":
result = str(now.timestamp())
else: # readable
result = now.strftime("%Y-%m-%d %H:%M:%S")
# 返回时间文本
return [{"type": "text", "text": f"Current time: {result}"}]
# 处理计算工具
def _handle_calculate(self, arguments):
"""Handle calculate tool"""
# 获取表达式
expression = arguments.get("expression", "")
try:
# 安全检查表达式字符
allowed_chars = set("0123456789+-*/.() ")
if not all(c in allowed_chars for c in expression):
raise ValueError("Expression contains disallowed characters")
# 计算表达式
result = eval(expression)
# 返回计算结果
return [
{"type": "text", "text": f"Calculation result: {expression} = {result}"}
]
except Exception as e:
# 计算失败
return [{"type": "text", "text": f"Calculation failed: {str(e)}"}]
# 处理回显工具
def _handle_echo(self, arguments):
"""Handle echo tool"""
# 获取消息
message = arguments.get("message", "")
# 返回回显内容
return [{"type": "text", "text": f"Echo: {message}"}]
# 处理工具调用请求
def _handle_call_tool(self, request_id, params):
"""Handle tool call request"""
# 获取工具名称
name = params.get("name")
# 获取参数
arguments = params.get("arguments", {})
try:
# 记录工具调用
logger.debug(f"Tool call: {name} with arguments: {arguments}")
# 根据工具名称分发
if name == "get_current_time":
result = self._handle_get_time(arguments)
elif name == "calculate":
result = self._handle_calculate(arguments)
elif name == "echo":
result = self._handle_echo(arguments)
else:
# 未知工具
logger.warning(f"Unknown tool requested: {name}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32601, "message": f"Unknown tool: {name}"},
}
# 记录工具调用成功
logger.debug(f"Tool {name} executed successfully")
# 返回工具调用结果
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"content": result},
}
except Exception as e:
# 工具调用失败
logger.error(f"Tool {name} call failed: {str(e)}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Tool call failed: {str(e)}"},
}
# 处理资源列表请求
def _handle_list_resources(self, request_id):
"""Handle resources list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"resources": list(self.resources.values())},
}
# 处理读取资源请求
def _handle_read_resource(self, request_id, params):
"""Handle resource read request"""
# 获取资源URI
uri = params.get("uri")
try:
# 判断资源类型
if uri == "mcp://server-info":
# 返回内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(server_info, indent=2),
}
]
},
}
elif uri == "mcp://sample-data":
# 返回内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(sample_data, indent=2),
}
]
},
}
else:
# 未找到资源
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32602, "message": f"Resource not found: {uri}"},
}
except Exception as e:
# 读取资源失败
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Read resource failed: {str(e)}"},
}
# 处理模板列表请求
def _handle_list_templates(self, request_id):
"""Handle template list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"resourceTemplates": list(self.templates.values())},
}
# 处理读取模板请求
def _handle_read_template(self, request_id, params):
# 获取模板URI
uri = params.get("uri")
try:
# 判断模板类型
if uri == "mcp://templates/project-plan":
# 返回模板内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(project_plan_template, indent=2),
}
]
},
}
elif uri == "mcp://templates/meeting-notes":
# 返回模板内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(meeting_notes_template, indent=2),
}
]
},
}
elif uri == "mcp://templates/weekly-report":
# 返回模板内容
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(weekly_report_template, indent=2),
}
]
},
}
else:
# 未找到模板
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Template not found: {uri}"},
}
except Exception as e:
# 读取模板失败
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Read template failed: {str(e)}"},
}
# 处理提示列表请求
def _handle_list_prompts(self, request_id):
"""Handle prompt list request"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"prompts": list(self.prompts.values())},
}
# 处理问候提示
def _handle_greeting_prompt(self, arguments):
"""Handle greeting prompt"""
# 获取姓名,默认为User
name = arguments.get("name", "User")
# 获取时间段,默认为day
time_of_day = arguments.get("time_of_day", "day")
# 问候语映射
greetings = {
"morning": "Good morning",
"afternoon": "Good afternoon",
"evening": "Good evening",
"night": "Good night",
}
# 获取对应问候语
greeting = greetings.get(time_of_day, "Hello")
# 生成内容
content = f"{greeting}, {name}! Welcome to the MCP server."
# 返回问候消息
return [{"role": "assistant", "content": {"type": "text", "text": content}}]
# 处理状态报告提示
def _handle_status_report_prompt(self, arguments):
"""Handle status report prompt"""
# 获取项目名,默认为Unknown Project
project = arguments.get("project", "Unknown Project")
# 获取状态,默认为unknown
status = arguments.get("status", "unknown")
# 状态消息映射
status_messages = {
"on_track": "Project is on track",
"at_risk": "Project is at risk",
"behind_schedule": "Project is behind schedule",
}
# 获取对应状态消息
message = status_messages.get(status, "Project status unknown")
# 生成内容
content = f"Project '{project}' status report: {message}"
# 返回状态报告消息
return [{"role": "assistant", "content": {"type": "text", "text": content}}]
# 处理获取提示请求
def _handle_get_prompt(self, request_id, params):
"""Handle prompt get request"""
# 获取提示名称
name = params.get("name")
# 获取参数
arguments = params.get("arguments", {})
try:
# 根据提示名称分发
if name == "greeting":
result = self._handle_greeting_prompt(arguments)
elif name == "status_report":
result = self._handle_status_report_prompt(arguments)
else:
# 未知提示
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32604, "message": f"Unknown prompt: {name}"},
}
# 返回提示结果
return {
"jsonrpc": "2.0",
"id": request_id,
"result": {"messages": result},
}
except Exception as e:
# 获取提示失败
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Get prompt failed: {str(e)}"},
}
# 处理completion请求
+ def _handle_completion(self, request_id, params):
+ """Handle completion request"""
+ try:
# 获取引用类型和参数
+ ref = params.get("ref", {})
+ argument = params.get("argument", {})
+ context = params.get("context", {})
+ ref_type = ref.get("type")
+ argument_name = argument.get("name")
+ argument_value = argument.get("value", "")
+ logger.info(
+ f"Completion request: {ref_type}, argument: {argument_name}, value: {argument_value}"
+ )
# 根据引用类型和参数名提供建议
+ if ref_type == "ref/prompt":
+ prompt_name = ref.get("name")
+ return self._handle_prompt_completion(
+ request_id, prompt_name, argument_name, argument_value, context
+ )
+ elif ref_type == "ref/resource":
+ resource_uri = ref.get("uri")
+ return self._handle_resource_completion(
+ request_id, resource_uri, argument_name, argument_value, context
+ )
+ else:
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "error": {
+ "code": -32602,
+ "message": f"Unsupported reference type: {ref_type}",
+ },
+ }
+ except Exception as e:
+ logger.error(f"Completion failed: {str(e)}")
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "error": {"code": -32603, "message": f"Completion failed: {str(e)}"},
+ }
# 处理提示completion
+ def _handle_prompt_completion(
+ self, request_id, prompt_name, argument_name, argument_value, context
+ ):
+ """Handle prompt completion"""
# 根据提示名称和参数名提供建议
+ if prompt_name == "greeting":
+ if argument_name == "name":
# 提供名字建议
+ suggestions = ["Alice", "Bob", "Charlie", "David", "Eve"]
+ filtered_suggestions = [
+ s
+ for s in suggestions
+ if s.lower().startswith(argument_value.lower())
+ ]
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "result": {
+ "completion": {
+ "values": filtered_suggestions[:10],
+ "total": len(filtered_suggestions),
+ "hasMore": len(filtered_suggestions) > 10,
+ }
+ },
+ }
+ elif argument_name == "time_of_day":
# 提供时间段建议
+ suggestions = ["morning", "afternoon", "evening", "night"]
+ filtered_suggestions = [
+ s
+ for s in suggestions
+ if s.lower().startswith(argument_value.lower())
+ ]
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "result": {
+ "completion": {
+ "values": filtered_suggestions,
+ "total": len(filtered_suggestions),
+ "hasMore": False,
+ }
+ },
+ }
+ elif prompt_name == "status_report":
+ if argument_name == "project":
# 提供项目名建议
+ suggestions = [
+ "Web App",
+ "Mobile App",
+ "API Service",
+ "Database",
+ "Frontend",
+ ]
+ filtered_suggestions = [
+ s
+ for s in suggestions
+ if s.lower().startswith(argument_value.lower())
+ ]
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "result": {
+ "completion": {
+ "values": filtered_suggestions,
+ "total": len(filtered_suggestions),
+ "hasMore": False,
+ }
+ },
+ }
+ elif argument_name == "status":
# 提供状态建议
+ suggestions = ["on_track", "at_risk", "behind_schedule"]
+ filtered_suggestions = [
+ s
+ for s in suggestions
+ if s.lower().startswith(argument_value.lower())
+ ]
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "result": {
+ "completion": {
+ "values": filtered_suggestions,
+ "total": len(filtered_suggestions),
+ "hasMore": False,
+ }
+ },
+ }
# 默认返回空建议
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "result": {"completion": {"values": [], "total": 0, "hasMore": False}},
+ }
# 处理资源completion
+ def _handle_resource_completion(
+ self, request_id, resource_uri, argument_name, argument_value, context
+ ):
+ """Handle resource completion"""
# 提供资源URI建议
+ suggestions = [
+ "mcp://server-info",
+ "mcp://sample-data",
+ "mcp://templates/project-plan",
+ "mcp://templates/meeting-notes",
+ "mcp://templates/weekly-report",
+ ]
+ filtered_suggestions = [
+ s for s in suggestions if s.lower().startswith(argument_value.lower())
+ ]
+ return {
+ "jsonrpc": "2.0",
+ "id": request_id,
+ "result": {
+ "completion": {
+ "values": filtered_suggestions,
+ "total": len(filtered_suggestions),
+ "hasMore": False,
+ }
+ },
+ }
# 处理演示采样请求
def _handle_mcp_sampling(self, request_id, params):
try:
# 日志记录收到演示采样请求
logger.info("收到客户端演示采样请求")
# 构造采样请求
sampling_request = {
"jsonrpc": "2.0",
"id": f"mcp_sampling_{request_id}",
"method": "sampling/createMessage",
"params": {
"messages": [
{
"role": "user",
"content": {
"type": "text",
"text": "Write a creative story about a robot learning to paint in a digital art studio",
},
}
],
"modelPreferences": {
"hints": [{"name": "gpt-4"}],
"intelligencePriority": 0.8,
"speedPriority": 0.5,
"costPriority": 0.3,
},
"systemPrompt": "You are a creative storyteller. Write engaging and imaginative stories.",
"maxTokens": 200,
},
}
# 日志记录已发送
logger.info("已发送采样请求到客户端")
# 返回成功响应
return sampling_request
except Exception as e:
# 采样失败日志
logger.error(f"Mcp sampling failed: {str(e)}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Mcp sampling failed: {str(e)}"},
}
# 处理演示引导请求
def _handle_mcp_elicitation(self, request_id, params):
try:
# 日志记录收到演示引导请求
logger.info("收到客户端演示引导请求")
# 构造引导请求
elicitation_request = {
"jsonrpc": "2.0",
"id": f"mcp_elicitation_{request_id}",
"method": "elicitation/create",
"params": {
"message": "Please provide your preferences for the project configuration",
"requestedSchema": {
"type": "object",
"properties": {
"user_name": {"type": "string", "description": "Your name"},
"preference": {
"type": "string",
"description": "Your preferred style",
"enum": ["creative", "analytical", "practical"],
},
"feedback": {
"type": "string",
"description": "Any additional feedback",
},
},
"required": ["user_name", "preference"],
},
},
}
# 发送引导请求到客户端
request_str = json.dumps(elicitation_request, ensure_ascii=False)
print(request_str)
sys.stdout.flush()
# 日志记录已发送
logger.info("已发送引导请求到客户端")
return elicitation_request
# 返回成功响应
# return {
# "jsonrpc": "2.0",
# "id": request_id,
# "result": {
# "message": "Elicitation request sent to client",
# "requestId": f"mcp_elicitation_{request_id}",
# },
# }
except Exception as e:
# 引导失败日志
logger.error(f"Mcp elicitation failed: {str(e)}")
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32603,
"message": f"Mcp elicitation failed: {str(e)}",
},
}
# 处理JSON-RPC请求的方法
def handle_request(self, request):
"""处理JSON-RPC请求"""
# 获取请求的方法名
method = request.get("method")
# 获取请求ID,默认为0
request_id = request.get("id", 0)
# 获取请求参数,默认为空字典
params = request.get("params", {})
try:
# 处理通知类请求(没有id字段)
if "id" not in request:
return self.handle_notification(method)
# 处理有id的请求
if method == "initialize":
return self._handle_initialize(request_id)
elif method == "ping":
return self._handle_ping(request_id)
elif method == "tools/list":
return self._handle_list_tools(request_id)
elif method == "tools/call":
return self._handle_call_tool(request_id, params)
elif method == "resources/list":
return self._handle_list_resources(request_id)
elif method == "resources/read":
return self._handle_read_resource(request_id, params)
elif method == "resources/templates/list":
return self._handle_list_templates(request_id)
elif method == "resources/templates/read":
return self._handle_read_template(request_id, params)
elif method == "prompts/list":
return self._handle_list_prompts(request_id)
elif method == "prompts/get":
return self._handle_get_prompt(request_id, params)
+ elif method == "completion/complete":
+ return self._handle_completion(request_id, params)
elif method == "mcp/sampling":
return self._handle_mcp_sampling(request_id, params)
elif method == "mcp/elicitation":
return self._handle_mcp_elicitation(request_id, params)
else:
# 方法未找到,返回错误
return {
"jsonrpc": "2.0",
"id": request_id,
+ "error": {"code": -32605, "message": f"Method {method} not found"},
}
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Request handling error: {e}")
# 返回内部错误响应
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": -32603, "message": f"Internal error: {str(e)}"},
}
# 自动发送采样请求(10秒后)
def auto_sampling_request():
# 线程函数,延迟后发送采样请求
def send_sampling_request():
time.sleep(10)
# 构造采样请求
sampling_request = {
"jsonrpc": "2.0",
"id": "auto_sampling_001",
"method": "sampling/createMessage",
"params": {
"messages": [
{
"role": "user",
"content": {
"type": "text",
"text": "Write a creative story about a robot learning to paint in a digital art studio",
},
}
],
"modelPreferences": {
"hints": [{"name": "gpt-4"}],
"intelligencePriority": 0.8,
"speedPriority": 0.5,
"costPriority": 0.3,
},
"systemPrompt": "You are a creative storyteller. Write engaging and imaginative stories.",
"maxTokens": 200,
},
}
# 发送请求到标准输出(MCP Inspector会读取)
request_str = json.dumps(sampling_request, ensure_ascii=False)
print(request_str)
sys.stdout.flush()
# 日志记录自动请求
logger.info("Auto sampling request sent to MCP Inspector")
print(f"Auto sampling request sent: {request_str}", file=sys.stderr)
# 启动自动采样线程
auto_thread = threading.Thread(target=send_sampling_request, daemon=True)
auto_thread.start()
logger.info("Auto sampling thread started - will send request in 10 seconds")
# 自动发送引导请求(10秒后)
def auto_elicitation_request():
# 线程函数,延迟后发送引导请求
def send_elicitation_request():
time.sleep(10)
# 构造引导请求
elicitation_request = {
"jsonrpc": "2.0",
"id": "auto_elicitation_001",
"method": "elicitation/create",
"params": {
"message": "Please provide your preferences for the project configuration",
"requestedSchema": {
"type": "object",
"properties": {
"user_name": {"type": "string", "description": "Your name"},
"preference": {
"type": "string",
"description": "Your preferred style",
"enum": ["creative", "analytical", "practical"],
},
"feedback": {
"type": "string",
"description": "Any additional feedback",
},
},
"required": ["user_name", "preference"],
},
},
}
# 发送请求到标准输出(MCP Inspector会读取)
request_str = json.dumps(elicitation_request, ensure_ascii=False)
print(request_str)
sys.stdout.flush()
# 日志记录自动请求
logger.info("Auto elicitation request sent to MCP Inspector")
print(f"Auto elicitation request sent: {request_str}", file=sys.stderr)
# 启动自动引导线程
auto_thread = threading.Thread(target=send_elicitation_request, daemon=True)
auto_thread.start()
logger.info("Auto elicitation thread started - will send request in 10 seconds")
# 主函数
def main():
"""主函数"""
# 记录服务器启动信息
logger.info("Starting server...")
# 创建MCPServer实例
mcp_server = MCPServer()
# auto_sampling_request()
auto_elicitation_request()
try:
# 循环读取标准输入的每一行
for line in sys.stdin:
# 如果行为空,跳过
if not line.strip():
continue
try:
# 解析JSON请求
request = json.loads(line.strip())
# 处理请求,获取响应
response = mcp_server.handle_request(request)
# 如果有响应,输出到标准输出
if response is not None:
print(json.dumps(response, ensure_ascii=False))
# 刷新标准输出缓冲区
sys.stdout.flush()
except Exception as e:
# 处理请求时发生异常,记录错误日志
logger.error(f"Unexpected error: {e}")
except KeyboardInterrupt:
# 捕获键盘中断,记录服务器停止信息
logger.info("Server stopped")
except Exception as e:
# 服务器发生其他异常,记录错误并退出
logger.error(f"Server error: {e}")
sys.exit(1)
# 初始化日志记录器
logger = setup_logging()
# 如果作为主程序运行,则调用main函数
if __name__ == "__main__":
main()