1. 语言补全 #
1.1. client.py #
client.py
# 导入json模块,用于处理JSON数据
import json
# 导入logging模块,用于日志记录
import logging
# 导入requests模块,用于发送HTTP请求
import requests
# 从urllib.parse模块导入urljoin,用于拼接URL
from urllib.parse import urljoin
# 配置日志级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
# 定义MCPClient类,表示MCP HTTP客户端
class MCPClient:
"""MCP HTTP客户端"""
# 初始化方法,设置服务器URL和相关属性
def __init__(self, server_url: str):
# 协议版本号
self.protocol_version = "2025-06-18"
# 客户端信息
self.client_info = {"name": "MCP客户端", "version": "1.0.0"}
# 服务器信息,初始化为None
self.server_info = None
# 服务器能力,初始化为None
self.capabilities = None
# 会话ID,初始化为None
self.session_id = None
# 消息ID,初始值为1
self.message_id = 1
# 去除服务器URL末尾的斜杠
self.server_url = server_url.rstrip("/")
# 拼接MCP接口的完整URL
self.mcp_endpoint = urljoin(self.server_url, "/mcp")
# 创建requests的Session对象
self.session = requests.Session()
# 设置Session的默认请求头
self.session.headers.update(
{
"Accept": "application/json",
"MCP-Protocol-Version": self.protocol_version,
}
)
# 请求补全建议方法
+ def request_completion(
+ self, # 当前实例
+ ref_type, # 引用类型:ref/prompt
+ ref_name, # 引用名称:code_review
+ argument_name, # 参数名:language
+ argument_value, # 参数值:py
+ ):
# 方法文档字符串,说明用途
+ """请求补全建议"""
# 构造补全请求的消息体
+ request = {
+ "jsonrpc": "2.0", # 指定JSON-RPC协议版本
+ "id": self._generate_id(), # 生成唯一消息ID
+ "method": "completion/complete", # 指定调用的方法
+ "params": {
+ "ref": {"type": ref_type, "name": ref_name}, # 引用类型和名称
+ "argument": {
+ "name": argument_name,
+ "value": argument_value,
+ }, # 参数名和参数值
+ },
+ }
# 通过私有方法发送请求消息,并获取响应
+ response = self._send_message(request)
# 判断响应是否存在且包含"result"字段
+ if response and "result" in response:
# 从响应中获取补全内容
+ completion = response["result"].get("completion", {})
# 记录获取到的补全建议数量
+ logger.info(f"获取到 {len(completion.get('values', []))} 个补全建议")
# 返回补全内容
+ return completion
+ else:
# 如果请求失败,记录错误日志
+ logger.error(f"请求补全失败")
# 返回None表示失败
+ return None
# 重置会话方法
def reset_session(self):
"""重置会话"""
# 保存旧的会话ID
old_session_id = self.session_id
# 清空当前会话ID
self.session_id = None
# 记录会话重置信息
logger.info(f"会话已重置,原会话ID: {old_session_id}")
# 获取当前会话信息方法
def get_session_info(self):
"""获取当前会话信息"""
# 返回会话相关信息字典
return {
"session_id": self.session_id,
"server_url": self.server_url,
"protocol_version": self.protocol_version,
}
# 获取可用工具列表方法
def list_tools(self):
"""获取可用工具列表"""
# 构造请求消息
request = {
"jsonrpc": "2.0",
"id": self._generate_id(),
"method": "tools/list",
"params": {},
}
# 发送请求并获取响应
response = self._send_message(request)
# 如果响应存在且包含result字段
if response and "result" in response:
# 获取工具列表
tools = response["result"].get("tools", [])
logger.info(f"可用工具数量: {len(tools)}")
return tools
else:
# 获取失败,记录错误
logger.error("获取工具列表失败")
return []
# 通过HTTP发送消息的私有方法
def _send_message(self, message):
"""通过HTTP发送消息"""
try:
# 初始化请求头字典
headers = {}
# 如果有会话ID,则添加到请求头
if self.session_id:
headers["Mcp-Session-Id"] = self.session_id
logger.debug(f"使用会话ID: {self.session_id}")
# 发送POST请求
response = self.session.post(
self.mcp_endpoint, json=message, headers=headers, timeout=30
)
# 检查响应状态码
if response.status_code == 200:
# 检查响应头中是否有新的会话ID
new_session_id = response.headers.get("Mcp-Session-Id")
# 如果有新的会话ID且与当前不同,则更新
if new_session_id and new_session_id != self.session_id:
self.session_id = new_session_id
logger.info(f"更新会话ID: {self.session_id}")
# 返回响应的JSON内容
return response.json()
# 如果状态码为202,表示已接受但无内容
elif response.status_code == 202:
# Accepted - 通知或响应
return None
# 如果状态码为400,表示请求错误
elif response.status_code == 400:
logger.error(f"请求错误: {response.text}")
return None
# 如果状态码为401,表示会话ID无效
elif response.status_code == 401:
logger.error("会话ID无效,需要重新初始化")
self.session_id = None
return None
# 其他HTTP错误
else:
logger.error(f"HTTP错误: {response.status_code} - {response.text}")
return None
# 捕获请求异常
except requests.exceptions.RequestException as e:
logger.error(f"HTTP请求错误: {e}")
return None
# 生成消息ID的方法
def _generate_id(self) -> str:
"""生成消息ID"""
# 消息ID自增
self.message_id += 1
# 返回字符串类型的消息ID
return str(self.message_id)
# 发送消息到服务器的方法(重复定义,实际只会用后面这个)
def _send_message(self, message):
"""通过HTTP发送消息"""
try:
# 初始化请求头字典
headers = {}
# 如果有会话ID,则添加到请求头
if self.session_id:
headers["Mcp-Session-Id"] = self.session_id
logger.debug(f"使用会话ID: {self.session_id}")
# 发送POST请求到MCP端点
response = self.session.post(
self.mcp_endpoint, json=message, headers=headers, timeout=30
)
# 检查响应状态码
if response.status_code == 200:
# 从响应头获取新的会话ID
new_session_id = response.headers.get("Mcp-Session-Id")
# 如果有新的会话ID且与当前不同,则更新
if new_session_id and new_session_id != self.session_id:
self.session_id = new_session_id
logger.info(f"更新会话ID: {self.session_id}")
# 返回响应的JSON内容
return response.json()
# 如果状态码为202,表示已接受但无内容
elif response.status_code == 202:
# Accepted - 通知或响应
return None
# 如果状态码为400,表示请求错误
elif response.status_code == 400:
logger.error(f"请求错误: {response.text}")
return None
# 如果状态码为401,表示会话ID无效
elif response.status_code == 401:
logger.error("会话ID无效,需要重新初始化")
self.session_id = None
return None
# 其他HTTP错误
else:
logger.error(f"HTTP错误: {response.status_code} - {response.text}")
return None
# 捕获请求异常
except requests.exceptions.RequestException as e:
logger.error(f"HTTP请求错误: {e}")
return None
# 调用工具方法
def call_tool(self, tool_name, arguments):
"""调用工具"""
# 构造调用工具的请求消息
request = {
"jsonrpc": "2.0",
"id": self._generate_id(),
"method": "tools/call",
"params": {"calls": [{"name": tool_name, "arguments": arguments}]},
}
# 发送请求并获取响应
response = self._send_message(request)
# 如果响应存在且包含result字段
if response and "result" in response:
# 获取调用结果
calls = response["result"].get("calls", [])
if calls:
return calls[0]
else:
# 调用失败,记录错误
logger.error(f"调用工具 {tool_name} 失败")
return None
# 初始化与服务器连接的方法
def initialize(self) -> bool:
"""初始化与服务器的连接"""
# 构造初始化请求消息
request = {
"jsonrpc": "2.0",
"id": self._generate_id(),
"method": "initialize",
"params": {
"protocolVersion": self.protocol_version,
"capabilities": {},
"clientInfo": self.client_info,
},
}
# 发送初始化请求
response = self._send_message(request)
# 如果响应存在且包含result字段
if response and "result" in response:
result = response["result"]
# 获取服务器信息
self.server_info = result.get("serverInfo")
# 获取服务器能力
self.capabilities = result.get("capabilities")
# 提取并存储会话ID
if "sessionId" in result:
self.session_id = result["sessionId"]
logger.info(f"获取到会话ID: {self.session_id}")
logger.info(f"服务器初始化成功: {self.server_info}")
return True
else:
# 初始化失败,记录错误
logger.error("服务器初始化失败")
return False
# HTTP传输的函数
def http_transport():
"""HTTP传输"""
print("=== HTTP传输 ===")
client = MCPClient("http://localhost:8000")
try:
if not client.initialize():
print("初始化失败")
return
session_info = client.get_session_info()
print(f" 会话信息: {session_info}")
tools = client.list_tools()
print(f"可用工具: {[tool['name'] for tool in tools]}")
result = client.call_tool("calculate", {"expression": "10 / 2 + 5"})
if result:
content = result.get("content", [])
if content:
print(f"计算结果: {content[0].get('text', '')}")
# 打印分隔符,表示开始演示Completion功能
+ print("\n=== Completion功能 ===")
# 打印分隔符,表示开始演示语言补全
+ print("\n🔤 语言补全演示:")
# 调用request_completion方法,请求针对'language'字段输入'py'的补全建议
+ language_completion = client.request_completion(
+ "ref/prompt", "code_review", "language", "py"
+ )
# 如果成功获取到补全结果
+ if language_completion:
# 从返回结果中获取'values'字段,默认为空列表
+ values = language_completion.get("values", [])
# 打印前5个补全建议
+ print(f"输入 'py' 的补全建议: {values[:5]}...") # 显示前5个
except Exception as e:
print(f"HTTP错误: {e}")
# 主函数
def main():
"""主函数"""
# 调用HTTP传输函数
http_transport()
# 判断是否为主程序入口
if __name__ == "__main__":
main()
1.2. server.py #
server.py
# 导入logging模块,用于日志记录
import logging
# 导入json模块,用于处理JSON数据
import json
# 导入uuid模块,用于生成唯一会话ID
import uuid
# 导入time模块,用于时间戳
import time
# 从tools模块导入tools对象和completion_data
+from tools import tools, completion_data
# 导入datetime模块,用于处理日期和时间
# 从http.server模块导入ThreadingHTTPServer和BaseHTTPRequestHandler类
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
# 配置日志记录级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
# 定义MCPServer类,作为MCP服务器的核心类
class MCPServer:
"""MCP服务器核心类"""
# 初始化方法
def __init__(self):
# 协议版本号
self.protocol_version = "2025-06-18"
# 服务器信息,包括名称和版本
self.server_info = {"name": "MCP服务器", "version": "1.0.0"}
# 服务器能力,包括工具、资源和提示
self.capabilities = {"tools": {}, "resources": {}, "prompts": {}}
# 会话管理字典
self.sessions = {}
# 获取所有活跃会话信息
def get_all_sessions(self):
"""获取所有活跃会话信息"""
return self.sessions.copy()
def _cleanup_expired_sessions(self, max_age: int = 3600):
"""清理过期会话(默认1小时)"""
current_time = time.time()
expired_sessions = []
for session_id, session_data in self.sessions.items():
if current_time - session_data["last_activity"] > max_age:
expired_sessions.append(session_id)
for session_id in expired_sessions:
del self.sessions[session_id]
logger.info(f"清理过期会话: {session_id}")
return len(expired_sessions)
# 处理工具列表请求
def handle_tools_list(self, params=None, session_id: str = None):
"""处理工具列表请求"""
# 如果有会话ID,则获取或创建会话
if session_id:
self._get_or_create_session(session_id)
# 返回工具列表
return {"tools": tools}
# 处理初始化请求(旧方法,已被覆盖)
def handle_initialize(self, params, session_id: str = None):
"""处理初始化请求"""
# 返回服务器能力
return {"capabilities": self.capabilities}
# 生成唯一的会话ID
def _generate_session_id(self) -> str:
"""生成唯一的会话ID"""
# 使用uuid4生成唯一ID
return str(uuid.uuid4())
# 获取会话信息
def get_session_info(self, session_id: str):
"""获取会话信息"""
# 如果会话ID存在于sessions字典中,返回会话信息
if session_id in self.sessions:
return self.sessions[session_id]
# 否则返回None
return None
# 获取或创建会话
def _get_or_create_session(self, session_id: str = None):
"""获取或创建会话"""
# 如果没有传入会话ID,则生成一个新的
if not session_id:
session_id = self._generate_session_id()
# 如果会话ID不存在于sessions字典中,则新建一个会话
if session_id not in self.sessions:
self.sessions[session_id] = {
"created_at": time.time(),
"last_activity": time.time(),
"message_count": 0,
"client_info": None,
}
# 记录新会话的创建
logger.info(f"创建新会话: {session_id}")
# 更新会话的最后活动时间
self.sessions[session_id]["last_activity"] = time.time()
# 增加消息计数
self.sessions[session_id]["message_count"] += 1
# 返回会话ID
return session_id
# 处理工具调用请求
def handle_tools_call(self, params, session_id: str = None):
"""处理工具调用请求"""
# 如果有会话ID,则获取或创建会话
if session_id:
self._get_or_create_session(session_id)
# 获取调用列表
calls = params.get("calls", [])
# 初始化结果列表
results = []
# 遍历每个调用
for call in calls:
# 获取工具名称
tool_name = call.get("name")
# 获取参数
arguments = call.get("arguments", {})
# 如果工具名称为"calculate"
if tool_name == "calculate":
# 获取表达式
expression = arguments.get("expression", "")
try:
# 计算表达式结果
result = eval(expression)
# 添加成功结果到结果列表
results.append(
{
"name": tool_name,
"content": [
{
"type": "text",
"text": f"计算结果: {expression} = {result}",
}
],
}
)
except Exception as e:
# 计算出错,添加错误信息
results.append(
{
"name": tool_name,
"isError": True,
"error": {"message": f"计算错误: {str(e)}"},
}
)
else:
# 未知工具,添加错误信息
results.append(
{
"name": tool_name,
"isError": True,
"error": {"message": f"未知工具: {tool_name}"},
}
)
# 返回所有调用的结果
return {"calls": results}
# 处理补全请求
+ def handle_completion_complete(self, params, session_id: str = None):
+ """处理补全请求"""
# 如果有会话ID,则获取或创建会话
+ if session_id:
+ self._get_or_create_session(session_id)
# 获取引用类型和参数信息
+ ref = params.get("ref", {})
+ ref_type = ref.get("type")
+ argument = params.get("argument", {})
+ argument_name = argument.get("name")
+ argument_value = argument.get("value", "")
# 根据引用类型和参数名提供相应的补全建议
+ completion_values = []
+ if ref_type == "ref/prompt":
# 处理提示补全
+ prompt_name = ref.get("name")
+ if prompt_name == "code_review":
+ if argument_name == "language":
# 语言补全
+ completion_values = self._get_language_completions(argument_value)
# 限制返回结果数量(规范要求最多100个)
+ if len(completion_values) > 100:
+ completion_values = completion_values[:100]
+ return {
+ "completion": {
+ "values": completion_values,
+ "total": len(completion_values),
+ "hasMore": False,
+ }
+ }
# 获取语言补全建议
+ def _get_language_completions(self, partial_value: str):
+ """获取语言补全建议"""
+ if not partial_value:
+ return completion_data["languages"]
# 模糊匹配
+ return [
+ lang
+ for lang in completion_data["languages"]
+ if partial_value.lower() in lang.lower()
+ ]
# 处理初始化请求(实际生效的方法,覆盖上面的方法)
def handle_initialize(self, params, session_id: str = None):
"""处理初始化请求"""
# 获取客户端信息
client_info = params.get("clientInfo", {})
# 获取或创建会话ID
session_id = self._get_or_create_session(session_id)
# 存储客户端信息到会话
self.sessions[session_id]["client_info"] = client_info
# 记录客户端初始化信息
logger.info(f"客户端初始化: {client_info}, 会话ID: {session_id}")
# 返回初始化结果,包括协议版本、能力、服务器信息和会话ID
return {
"protocolVersion": self.protocol_version,
"capabilities": self.capabilities,
"serverInfo": self.server_info,
"sessionId": session_id, # 返回会话ID给客户端
}
# 定义MCPHTTPHandler类,继承自BaseHTTPRequestHandler
class MCPHTTPHandler(BaseHTTPRequestHandler):
"""HTTP处理器,支持Streamable HTTP传输"""
# 初始化方法,接收mcp_server参数
def __init__(self, *args, mcp_server=None, **kwargs):
# 保存MCP服务器实例
self.mcp_server = mcp_server
# 调用父类的初始化方法
super().__init__(*args, **kwargs)
# 处理MCP消息
def _handle_mcp_message(self, message, session_id: str = None):
"""处理MCP消息"""
# 获取方法名
method = message.get("method")
# 获取参数
params = message.get("params", {})
# 获取消息ID
msg_id = message.get("id")
# 判断方法类型
if method == "initialize":
# 调用服务器的handle_initialize方法
result = self.mcp_server.handle_initialize(params, session_id)
# 如果有消息ID,返回带ID的结果
return {"id": msg_id, "result": result} if msg_id else None
elif method == "tools/list":
# 处理工具列表请求
result = self.mcp_server.handle_tools_list(params, session_id)
return {"id": msg_id, "result": result} if msg_id else None
elif method == "tools/call":
# 处理工具调用请求
result = self.mcp_server.handle_tools_call(params, session_id)
return {"id": msg_id, "result": result} if msg_id else None
+ elif method == "completion/complete":
# 处理补全请求
+ result = self.mcp_server.handle_completion_complete(params, session_id)
+ return {"id": msg_id, "result": result} if msg_id else None
else:
# 如果方法不存在,返回错误
if msg_id:
return {
"id": msg_id,
"error": {"code": -32601, "message": f"Method not found: {method}"},
}
# 没有ID则返回None
return None
# 处理POST请求(发送消息到服务器)
def do_POST(self):
"""处理POST请求(发送消息到服务器)"""
try:
# 检查请求路径是否为/mcp
if self.path != "/mcp":
self.send_error(404, "MCP endpoint not found")
return
# 检查Accept头是否包含application/json
accept_header = self.headers.get("Accept", "")
if "application/json" not in accept_header:
self.send_error(400, "Missing required Accept header")
return
# 检查MCP-Protocol-Version头是否匹配
protocol_version = self.headers.get("MCP-Protocol-Version")
if (
protocol_version
and protocol_version != self.mcp_server.protocol_version
):
self.send_error(
400, f"Unsupported protocol version: {protocol_version}"
)
return
# 获取会话ID
session_id = self.headers.get("Mcp-Session-Id")
# 读取请求体长度
content_length = int(self.headers.get("Content-Length", 0))
# 读取请求体内容
body = self.rfile.read(content_length)
try:
# 尝试解析JSON消息
message = json.loads(body.decode("utf-8"))
except json.JSONDecodeError:
# JSON解析失败
self.send_error(400, "Invalid JSON")
return
# 处理MCP消息
response = self._handle_mcp_message(message, session_id)
# 发送响应
if response:
# 发送200响应码
self.send_response(200)
# 设置响应头Content-Type为application/json
self.send_header("Content-Type", "application/json")
# 设置协议版本头
self.send_header(
"MCP-Protocol-Version", self.mcp_server.protocol_version
)
# 如果响应中包含会话ID,则添加到响应头
if "result" in response and "sessionId" in response["result"]:
self.send_header("Mcp-Session-Id", response["result"]["sessionId"])
# 结束响应头
self.end_headers()
# 写入JSON响应体
self.wfile.write(json.dumps(response).encode("utf-8"))
else:
# 没有响应内容,返回202 Accepted
self.send_response(202) # Accepted
self.end_headers()
except Exception as e:
# 记录错误日志
logger.error(f"POST处理错误: {e}")
# 返回500内部服务器错误
self.send_error(500, "Internal Server Error")
# 定义运行HTTP服务器的函数
def run_http_server(mcp_server: MCPServer, host: str = "localhost", port: int = 8000):
"""运行HTTP服务器"""
# 定义处理器工厂函数,用于传递mcp_server实例
def handler_factory(*args, **kwargs):
return MCPHTTPHandler(*args, mcp_server=mcp_server, **kwargs)
# 创建ThreadingHTTPServer实例,绑定主机和端口
server = ThreadingHTTPServer((host, port), handler_factory)
# 记录服务器启动信息
logger.info(f"HTTP服务器运行在 http://{host}:{port}/mcp")
# 记录会话管理功能启用信息
logger.info("会话管理功能已启用,支持会话ID跟踪和自动清理")
# 启动会话清理线程
import threading
# 定义会话清理函数
def cleanup_sessions():
while True:
try:
# 每5分钟清理一次
time.sleep(300)
# 调用会话清理方法
cleaned_count = mcp_server._cleanup_expired_sessions()
# 如果有清理的会话,记录日志
if cleaned_count > 0:
logger.info(f"清理了 {cleaned_count} 个过期会话")
# 记录当前活跃会话数
active_sessions = len(mcp_server.get_all_sessions())
logger.debug(f"当前活跃会话数: {active_sessions}")
except Exception as e:
# 清理出错,记录日志
logger.error(f"会话清理错误: {e}")
# 启动清理线程
cleanup_thread = threading.Thread(target=cleanup_sessions, daemon=True)
cleanup_thread.start()
logger.info("会话清理线程已启动")
try:
# 启动服务器,进入循环监听请求
server.serve_forever()
except KeyboardInterrupt:
# 捕获Ctrl+C,记录服务器停止
logger.info("HTTP服务器已停止")
logger.info("正在清理所有会话...")
# 清理所有会话
all_sessions = mcp_server.get_all_sessions()
for session_id in list(all_sessions.keys()):
del mcp_server.sessions[session_id]
logger.info(f"已清理 {len(all_sessions)} 个会话")
finally:
# 关闭服务器
server.server_close()
# 定义主函数
def main():
"""主函数"""
# 导入argparse模块,用于解析命令行参数
import argparse
# 创建ArgumentParser对象,设置描述信息
parser = argparse.ArgumentParser(description="MCP HTTP服务器")
# 添加--host参数,指定服务器主机,默认localhost
parser.add_argument(
"--host", default="localhost", help="HTTP服务器主机 (默认: localhost)"
)
# 添加--port参数,指定服务器端口,默认8000
parser.add_argument(
"--port", type=int, default=8000, help="HTTP服务器端口 (默认: 8000)"
)
# 解析命令行参数
args = parser.parse_args()
# 创建MCP服务器实例
mcp_server = MCPServer()
# 运行HTTP服务器
run_http_server(mcp_server, args.host, args.port)
# 判断是否为主程序入口
if __name__ == "__main__":
# 调用主函数
main()
1.3. tools.py #
tools.py
# MCP工具定义
tools = [
{
"name": "calculate",
"description": "执行数学计算",
"inputSchema": {
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "数学表达式",
}
},
"required": ["expression"],
},
},
]
# 定义补全数据字典
+completion_data = {
# 支持的编程语言列表
+ "languages": [
+ "python",
+ "javascript",
+ "typescript",
+ "java",
+ "c++",
+ "c#",
+ "go",
+ "rust",
+ "php",
+ "ruby",
+ "swift",
+ "kotlin",
+ "scala",
+ "dart",
+ "elixir",
+ "clojure",
+ ],
# 各编程语言对应的主流框架
+ "frameworks": {
# Python常用框架
+ "python": [
+ "django",
+ "flask",
+ "fastapi",
+ "pytorch",
+ "tensorflow",
+ "pandas",
+ "numpy",
+ ],
# JavaScript常用框架
+ "javascript": ["react", "vue", "angular", "express", "next.js", "nuxt.js"],
# TypeScript常用框架
+ "typescript": ["react", "vue", "angular", "express", "next.js", "nuxt.js"],
# Java常用框架
+ "java": ["spring", "spring boot", "hibernate", "maven", "gradle"],
# C++常用框架
+ "c++": ["boost", "qt", "wxwidgets", "stl"],
# C#常用框架
+ "c#": [".net", "asp.net", "entity framework", "xamarin"],
# Go常用框架
+ "go": ["gin", "echo", "fiber", "gorilla", "cobra"],
# Rust常用框架
+ "rust": ["tokio", "actix", "rocket", "serde", "clap"],
+ },
# 支持的分析类型
+ "analysis_types": [
+ "syntax", # 语法分析
+ "complexity", # 复杂度分析
+ "security", # 安全性分析
+ "performance", # 性能分析
+ "style", # 代码风格分析
+ "documentation", # 文档分析
+ ],
# 支持的文件扩展名
+ "file_extensions": [
+ ".py", # Python文件
+ ".js", # JavaScript文件
+ ".ts", # TypeScript文件
+ ".java", # Java文件
+ ".cpp", # C++文件
+ ".cs", # C#文件
+ ".go", # Go文件
+ ".rs", # Rust文件
+ ".php", # PHP文件
+ ".rb", # Ruby文件
+ ],
+}
2. 框架补全 #
2.1. client.py #
client.py
# 导入json模块,用于处理JSON数据
import json
# 导入logging模块,用于日志记录
import logging
# 导入requests模块,用于发送HTTP请求
import requests
# 从urllib.parse模块导入urljoin,用于拼接URL
from urllib.parse import urljoin
# 配置日志级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
# 定义MCPClient类,表示MCP HTTP客户端
class MCPClient:
"""MCP HTTP客户端"""
# 初始化方法,设置服务器URL和相关属性
def __init__(self, server_url: str):
# 协议版本号
self.protocol_version = "2025-06-18"
# 客户端信息
self.client_info = {"name": "MCP客户端", "version": "1.0.0"}
# 服务器信息,初始化为None
self.server_info = None
# 服务器能力,初始化为None
self.capabilities = None
# 会话ID,初始化为None
self.session_id = None
# 消息ID,初始值为1
self.message_id = 1
# 去除服务器URL末尾的斜杠
self.server_url = server_url.rstrip("/")
# 拼接MCP接口的完整URL
self.mcp_endpoint = urljoin(self.server_url, "/mcp")
# 创建requests的Session对象
self.session = requests.Session()
# 设置Session的默认请求头
self.session.headers.update(
{
"Accept": "application/json",
"MCP-Protocol-Version": self.protocol_version,
}
)
# 请求补全建议方法
def request_completion(
self, # 当前实例
ref_type, # 引用类型:ref/prompt
ref_name, # 引用名称:code_review
argument_name, # 参数名:language
argument_value, # 参数值:py
+ context_arguments=None, # 上下文参数
):
# 方法文档字符串,说明用途
"""请求补全建议"""
# 构造补全请求的消息体
request = {
"jsonrpc": "2.0", # 指定JSON-RPC协议版本
"id": self._generate_id(), # 生成唯一消息ID
"method": "completion/complete", # 指定调用的方法
"params": {
"ref": {"type": ref_type, "name": ref_name}, # 引用类型和名称
"argument": {
"name": argument_name,
"value": argument_value,
}, # 参数名和参数值
},
}
# 如果有上下文参数,添加到请求中
+ if context_arguments:
+ request["params"]["context"] = {"arguments": context_arguments}
# 通过私有方法发送请求消息,并获取响应
response = self._send_message(request)
# 判断响应是否存在且包含"result"字段
if response and "result" in response:
# 从响应中获取补全内容
completion = response["result"].get("completion", {})
# 记录获取到的补全建议数量
logger.info(f"获取到 {len(completion.get('values', []))} 个补全建议")
# 返回补全内容
return completion
else:
# 如果请求失败,记录错误日志
logger.error(f"请求补全失败")
# 返回None表示失败
return None
# 重置会话方法
def reset_session(self):
"""重置会话"""
# 保存旧的会话ID
old_session_id = self.session_id
# 清空当前会话ID
self.session_id = None
# 记录会话重置信息
logger.info(f"会话已重置,原会话ID: {old_session_id}")
# 获取当前会话信息方法
def get_session_info(self):
"""获取当前会话信息"""
# 返回会话相关信息字典
return {
"session_id": self.session_id,
"server_url": self.server_url,
"protocol_version": self.protocol_version,
}
# 获取可用工具列表方法
def list_tools(self):
"""获取可用工具列表"""
# 构造请求消息
request = {
"jsonrpc": "2.0",
"id": self._generate_id(),
"method": "tools/list",
"params": {},
}
# 发送请求并获取响应
response = self._send_message(request)
# 如果响应存在且包含result字段
if response and "result" in response:
# 获取工具列表
tools = response["result"].get("tools", [])
logger.info(f"可用工具数量: {len(tools)}")
return tools
else:
# 获取失败,记录错误
logger.error("获取工具列表失败")
return []
# 通过HTTP发送消息的私有方法
def _send_message(self, message):
"""通过HTTP发送消息"""
try:
# 初始化请求头字典
headers = {}
# 如果有会话ID,则添加到请求头
if self.session_id:
headers["Mcp-Session-Id"] = self.session_id
logger.debug(f"使用会话ID: {self.session_id}")
# 发送POST请求
response = self.session.post(
self.mcp_endpoint, json=message, headers=headers, timeout=30
)
# 检查响应状态码
if response.status_code == 200:
# 检查响应头中是否有新的会话ID
new_session_id = response.headers.get("Mcp-Session-Id")
# 如果有新的会话ID且与当前不同,则更新
if new_session_id and new_session_id != self.session_id:
self.session_id = new_session_id
logger.info(f"更新会话ID: {self.session_id}")
# 返回响应的JSON内容
return response.json()
# 如果状态码为202,表示已接受但无内容
elif response.status_code == 202:
# Accepted - 通知或响应
return None
# 如果状态码为400,表示请求错误
elif response.status_code == 400:
logger.error(f"请求错误: {response.text}")
return None
# 如果状态码为401,表示会话ID无效
elif response.status_code == 401:
logger.error("会话ID无效,需要重新初始化")
self.session_id = None
return None
# 其他HTTP错误
else:
logger.error(f"HTTP错误: {response.status_code} - {response.text}")
return None
# 捕获请求异常
except requests.exceptions.RequestException as e:
logger.error(f"HTTP请求错误: {e}")
return None
# 生成消息ID的方法
def _generate_id(self) -> str:
"""生成消息ID"""
# 消息ID自增
self.message_id += 1
# 返回字符串类型的消息ID
return str(self.message_id)
# 发送消息到服务器的方法(重复定义,实际只会用后面这个)
def _send_message(self, message):
"""通过HTTP发送消息"""
try:
# 初始化请求头字典
headers = {}
# 如果有会话ID,则添加到请求头
if self.session_id:
headers["Mcp-Session-Id"] = self.session_id
logger.debug(f"使用会话ID: {self.session_id}")
# 发送POST请求到MCP端点
response = self.session.post(
self.mcp_endpoint, json=message, headers=headers, timeout=30
)
# 检查响应状态码
if response.status_code == 200:
# 从响应头获取新的会话ID
new_session_id = response.headers.get("Mcp-Session-Id")
# 如果有新的会话ID且与当前不同,则更新
if new_session_id and new_session_id != self.session_id:
self.session_id = new_session_id
logger.info(f"更新会话ID: {self.session_id}")
# 返回响应的JSON内容
return response.json()
# 如果状态码为202,表示已接受但无内容
elif response.status_code == 202:
# Accepted - 通知或响应
return None
# 如果状态码为400,表示请求错误
elif response.status_code == 400:
logger.error(f"请求错误: {response.text}")
return None
# 如果状态码为401,表示会话ID无效
elif response.status_code == 401:
logger.error("会话ID无效,需要重新初始化")
self.session_id = None
return None
# 其他HTTP错误
else:
logger.error(f"HTTP错误: {response.status_code} - {response.text}")
return None
# 捕获请求异常
except requests.exceptions.RequestException as e:
logger.error(f"HTTP请求错误: {e}")
return None
# 调用工具方法
def call_tool(self, tool_name, arguments):
"""调用工具"""
# 构造调用工具的请求消息
request = {
"jsonrpc": "2.0",
"id": self._generate_id(),
"method": "tools/call",
"params": {"calls": [{"name": tool_name, "arguments": arguments}]},
}
# 发送请求并获取响应
response = self._send_message(request)
# 如果响应存在且包含result字段
if response and "result" in response:
# 获取调用结果
calls = response["result"].get("calls", [])
if calls:
return calls[0]
else:
# 调用失败,记录错误
logger.error(f"调用工具 {tool_name} 失败")
return None
# 初始化与服务器连接的方法
def initialize(self) -> bool:
"""初始化与服务器的连接"""
# 构造初始化请求消息
request = {
"jsonrpc": "2.0",
"id": self._generate_id(),
"method": "initialize",
"params": {
"protocolVersion": self.protocol_version,
"capabilities": {},
"clientInfo": self.client_info,
},
}
# 发送初始化请求
response = self._send_message(request)
# 如果响应存在且包含result字段
if response and "result" in response:
result = response["result"]
# 获取服务器信息
self.server_info = result.get("serverInfo")
# 获取服务器能力
self.capabilities = result.get("capabilities")
# 提取并存储会话ID
if "sessionId" in result:
self.session_id = result["sessionId"]
logger.info(f"获取到会话ID: {self.session_id}")
logger.info(f"服务器初始化成功: {self.server_info}")
return True
else:
# 初始化失败,记录错误
logger.error("服务器初始化失败")
return False
# HTTP传输的函数
def http_transport():
"""HTTP传输"""
print("=== HTTP传输 ===")
client = MCPClient("http://localhost:8000")
try:
if not client.initialize():
print("初始化失败")
return
session_info = client.get_session_info()
print(f" 会话信息: {session_info}")
tools = client.list_tools()
print(f"可用工具: {[tool['name'] for tool in tools]}")
result = client.call_tool("calculate", {"expression": "10 / 2 + 5"})
if result:
content = result.get("content", [])
if content:
print(f"计算结果: {content[0].get('text', '')}")
# 打印分隔符,表示开始演示Completion功能
print("\n=== Completion功能 ===")
# 打印分隔符,表示开始演示语言补全
print("\n🔤 语言补全演示:")
# 调用request_completion方法,请求针对'language'字段输入'py'的补全建议
language_completion = client.request_completion(
"ref/prompt", "code_review", "language", "py"
)
# 如果成功获取到补全结果
if language_completion:
# 从返回结果中获取'values'字段,默认为空列表
values = language_completion.get("values", [])
# 打印前5个补全建议
print(f"输入 'py' 的补全建议: {values[:5]}...") # 显示前5个
# 2. 演示框架补全(无上下文)
+ print("\n🏗️ 框架补全演示(无上下文):")
+ framework_completion = client.request_completion(
+ "ref/prompt", "code_review", "framework", "fla"
+ )
+ if framework_completion:
+ values = framework_completion.get("values", [])
+ print(f"输入 'fla' 的补全建议: {values[:5]}...")
# 3. 演示框架补全(有上下文 - 已选择Python语言)
+ print("\n🐍 框架补全演示(已选择Python语言):")
+ framework_completion_with_context = client.request_completion(
+ "ref/prompt",
+ "code_review",
+ "framework",
+ "fla",
+ context_arguments={"language": "python"},
+ )
+ if framework_completion_with_context:
+ values = framework_completion_with_context.get("values", [])
+ print(f"在Python语言下,输入 'fla' 的补全建议: {values}")
except Exception as e:
print(f"HTTP错误: {e}")
# 主函数
def main():
"""主函数"""
# 调用HTTP传输函数
http_transport()
# 判断是否为主程序入口
if __name__ == "__main__":
main()
2.2. server.py #
server.py
# 导入logging模块,用于日志记录
import logging
# 导入json模块,用于处理JSON数据
import json
# 导入uuid模块,用于生成唯一会话ID
import uuid
# 导入time模块,用于时间戳
import time
# 从tools模块导入tools对象和completion_data
from tools import tools, completion_data
# 导入datetime模块,用于处理日期和时间
# 从http.server模块导入ThreadingHTTPServer和BaseHTTPRequestHandler类
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
# 配置日志记录级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
# 定义MCPServer类,作为MCP服务器的核心类
class MCPServer:
"""MCP服务器核心类"""
# 初始化方法
def __init__(self):
# 协议版本号
self.protocol_version = "2025-06-18"
# 服务器信息,包括名称和版本
self.server_info = {"name": "MCP服务器", "version": "1.0.0"}
# 服务器能力,包括工具、资源和提示
self.capabilities = {"tools": {}, "resources": {}, "prompts": {}}
# 会话管理字典
self.sessions = {}
# 获取所有活跃会话信息
def get_all_sessions(self):
"""获取所有活跃会话信息"""
return self.sessions.copy()
def _cleanup_expired_sessions(self, max_age: int = 3600):
"""清理过期会话(默认1小时)"""
current_time = time.time()
expired_sessions = []
for session_id, session_data in self.sessions.items():
if current_time - session_data["last_activity"] > max_age:
expired_sessions.append(session_id)
for session_id in expired_sessions:
del self.sessions[session_id]
logger.info(f"清理过期会话: {session_id}")
return len(expired_sessions)
# 处理工具列表请求
def handle_tools_list(self, params=None, session_id: str = None):
"""处理工具列表请求"""
# 如果有会话ID,则获取或创建会话
if session_id:
self._get_or_create_session(session_id)
# 返回工具列表
return {"tools": tools}
# 处理初始化请求(旧方法,已被覆盖)
def handle_initialize(self, params, session_id: str = None):
"""处理初始化请求"""
# 返回服务器能力
return {"capabilities": self.capabilities}
# 生成唯一的会话ID
def _generate_session_id(self) -> str:
"""生成唯一的会话ID"""
# 使用uuid4生成唯一ID
return str(uuid.uuid4())
# 获取会话信息
def get_session_info(self, session_id: str):
"""获取会话信息"""
# 如果会话ID存在于sessions字典中,返回会话信息
if session_id in self.sessions:
return self.sessions[session_id]
# 否则返回None
return None
# 获取或创建会话
def _get_or_create_session(self, session_id: str = None):
"""获取或创建会话"""
# 如果没有传入会话ID,则生成一个新的
if not session_id:
session_id = self._generate_session_id()
# 如果会话ID不存在于sessions字典中,则新建一个会话
if session_id not in self.sessions:
self.sessions[session_id] = {
"created_at": time.time(),
"last_activity": time.time(),
"message_count": 0,
"client_info": None,
}
# 记录新会话的创建
logger.info(f"创建新会话: {session_id}")
# 更新会话的最后活动时间
self.sessions[session_id]["last_activity"] = time.time()
# 增加消息计数
self.sessions[session_id]["message_count"] += 1
# 返回会话ID
return session_id
# 处理工具调用请求
def handle_tools_call(self, params, session_id: str = None):
"""处理工具调用请求"""
# 如果有会话ID,则获取或创建会话
if session_id:
self._get_or_create_session(session_id)
# 获取调用列表
calls = params.get("calls", [])
# 初始化结果列表
results = []
# 遍历每个调用
for call in calls:
# 获取工具名称
tool_name = call.get("name")
# 获取参数
arguments = call.get("arguments", {})
# 如果工具名称为"calculate"
if tool_name == "calculate":
# 获取表达式
expression = arguments.get("expression", "")
try:
# 计算表达式结果
result = eval(expression)
# 添加成功结果到结果列表
results.append(
{
"name": tool_name,
"content": [
{
"type": "text",
"text": f"计算结果: {expression} = {result}",
}
],
}
)
except Exception as e:
# 计算出错,添加错误信息
results.append(
{
"name": tool_name,
"isError": True,
"error": {"message": f"计算错误: {str(e)}"},
}
)
else:
# 未知工具,添加错误信息
results.append(
{
"name": tool_name,
"isError": True,
"error": {"message": f"未知工具: {tool_name}"},
}
)
# 返回所有调用的结果
return {"calls": results}
# 处理补全请求
def handle_completion_complete(self, params, session_id: str = None):
"""处理补全请求"""
# 如果有会话ID,则获取或创建会话
if session_id:
self._get_or_create_session(session_id)
# 获取引用类型和参数信息
ref = params.get("ref", {})
+ context = params.get("context", {})
ref_type = ref.get("type")
argument = params.get("argument", {})
argument_name = argument.get("name")
argument_value = argument.get("value", "")
# 根据引用类型和参数名提供相应的补全建议
completion_values = []
if ref_type == "ref/prompt":
# 处理提示补全
prompt_name = ref.get("name")
if prompt_name == "code_review":
if argument_name == "language":
# 语言补全
completion_values = self._get_language_completions(argument_value)
+ elif argument_name == "framework":
# 框架补全,需要根据已选择的语言
+ selected_language = context.get("arguments", {}).get("language")
+ if selected_language:
+ completion_values = self._get_framework_completions(
+ selected_language, argument_value
+ )
+ else:
+ completion_values = self._get_all_framework_completions(
+ argument_value
+ )
# 限制返回结果数量(规范要求最多100个)
if len(completion_values) > 100:
completion_values = completion_values[:100]
return {
"completion": {
"values": completion_values,
"total": len(completion_values),
"hasMore": False,
}
}
# 获取框架补全建议
+ def _get_framework_completions(self, language: str, partial_value: str):
+ """根据语言获取框架补全建议"""
+ frameworks = completion_data["frameworks"].get(language, [])
+ if not partial_value:
+ return frameworks
# 模糊匹配
+ return [fw for fw in frameworks if partial_value.lower() in fw.lower()]
# 获取所有框架补全建议
+ def _get_all_framework_completions(self, partial_value: str):
+ """获取所有框架补全建议"""
+ all_frameworks = []
+ for frameworks in completion_data["frameworks"].values():
+ all_frameworks.extend(frameworks)
+ if not partial_value:
+ return all_frameworks
# 模糊匹配
+ return [fw for fw in all_frameworks if partial_value.lower() in fw.lower()]
# 获取语言补全建议
def _get_language_completions(self, partial_value: str):
"""获取语言补全建议"""
if not partial_value:
return completion_data["languages"]
# 模糊匹配
return [
lang
for lang in completion_data["languages"]
if partial_value.lower() in lang.lower()
]
# 处理初始化请求(实际生效的方法,覆盖上面的方法)
def handle_initialize(self, params, session_id: str = None):
"""处理初始化请求"""
# 获取客户端信息
client_info = params.get("clientInfo", {})
# 获取或创建会话ID
session_id = self._get_or_create_session(session_id)
# 存储客户端信息到会话
self.sessions[session_id]["client_info"] = client_info
# 记录客户端初始化信息
logger.info(f"客户端初始化: {client_info}, 会话ID: {session_id}")
# 返回初始化结果,包括协议版本、能力、服务器信息和会话ID
return {
"protocolVersion": self.protocol_version,
"capabilities": self.capabilities,
"serverInfo": self.server_info,
"sessionId": session_id, # 返回会话ID给客户端
}
# 定义MCPHTTPHandler类,继承自BaseHTTPRequestHandler
class MCPHTTPHandler(BaseHTTPRequestHandler):
"""HTTP处理器,支持Streamable HTTP传输"""
# 初始化方法,接收mcp_server参数
def __init__(self, *args, mcp_server=None, **kwargs):
# 保存MCP服务器实例
self.mcp_server = mcp_server
# 调用父类的初始化方法
super().__init__(*args, **kwargs)
# 处理MCP消息
def _handle_mcp_message(self, message, session_id: str = None):
"""处理MCP消息"""
# 获取方法名
method = message.get("method")
# 获取参数
params = message.get("params", {})
# 获取消息ID
msg_id = message.get("id")
# 判断方法类型
if method == "initialize":
# 调用服务器的handle_initialize方法
result = self.mcp_server.handle_initialize(params, session_id)
# 如果有消息ID,返回带ID的结果
return {"id": msg_id, "result": result} if msg_id else None
elif method == "tools/list":
# 处理工具列表请求
result = self.mcp_server.handle_tools_list(params, session_id)
return {"id": msg_id, "result": result} if msg_id else None
elif method == "tools/call":
# 处理工具调用请求
result = self.mcp_server.handle_tools_call(params, session_id)
return {"id": msg_id, "result": result} if msg_id else None
elif method == "completion/complete":
# 处理补全请求
result = self.mcp_server.handle_completion_complete(params, session_id)
return {"id": msg_id, "result": result} if msg_id else None
else:
# 如果方法不存在,返回错误
if msg_id:
return {
"id": msg_id,
"error": {"code": -32601, "message": f"Method not found: {method}"},
}
# 没有ID则返回None
return None
# 处理POST请求(发送消息到服务器)
def do_POST(self):
"""处理POST请求(发送消息到服务器)"""
try:
# 检查请求路径是否为/mcp
if self.path != "/mcp":
self.send_error(404, "MCP endpoint not found")
return
# 检查Accept头是否包含application/json
accept_header = self.headers.get("Accept", "")
if "application/json" not in accept_header:
self.send_error(400, "Missing required Accept header")
return
# 检查MCP-Protocol-Version头是否匹配
protocol_version = self.headers.get("MCP-Protocol-Version")
if (
protocol_version
and protocol_version != self.mcp_server.protocol_version
):
self.send_error(
400, f"Unsupported protocol version: {protocol_version}"
)
return
# 获取会话ID
session_id = self.headers.get("Mcp-Session-Id")
# 读取请求体长度
content_length = int(self.headers.get("Content-Length", 0))
# 读取请求体内容
body = self.rfile.read(content_length)
try:
# 尝试解析JSON消息
message = json.loads(body.decode("utf-8"))
except json.JSONDecodeError:
# JSON解析失败
self.send_error(400, "Invalid JSON")
return
# 处理MCP消息
response = self._handle_mcp_message(message, session_id)
# 发送响应
if response:
# 发送200响应码
self.send_response(200)
# 设置响应头Content-Type为application/json
self.send_header("Content-Type", "application/json")
# 设置协议版本头
self.send_header(
"MCP-Protocol-Version", self.mcp_server.protocol_version
)
# 如果响应中包含会话ID,则添加到响应头
if "result" in response and "sessionId" in response["result"]:
self.send_header("Mcp-Session-Id", response["result"]["sessionId"])
# 结束响应头
self.end_headers()
# 写入JSON响应体
self.wfile.write(json.dumps(response).encode("utf-8"))
else:
# 没有响应内容,返回202 Accepted
self.send_response(202) # Accepted
self.end_headers()
except Exception as e:
# 记录错误日志
logger.error(f"POST处理错误: {e}")
# 返回500内部服务器错误
self.send_error(500, "Internal Server Error")
# 定义运行HTTP服务器的函数
def run_http_server(mcp_server: MCPServer, host: str = "localhost", port: int = 8000):
"""运行HTTP服务器"""
# 定义处理器工厂函数,用于传递mcp_server实例
def handler_factory(*args, **kwargs):
return MCPHTTPHandler(*args, mcp_server=mcp_server, **kwargs)
# 创建ThreadingHTTPServer实例,绑定主机和端口
server = ThreadingHTTPServer((host, port), handler_factory)
# 记录服务器启动信息
logger.info(f"HTTP服务器运行在 http://{host}:{port}/mcp")
# 记录会话管理功能启用信息
logger.info("会话管理功能已启用,支持会话ID跟踪和自动清理")
# 启动会话清理线程
import threading
# 定义会话清理函数
def cleanup_sessions():
while True:
try:
# 每5分钟清理一次
time.sleep(300)
# 调用会话清理方法
cleaned_count = mcp_server._cleanup_expired_sessions()
# 如果有清理的会话,记录日志
if cleaned_count > 0:
logger.info(f"清理了 {cleaned_count} 个过期会话")
# 记录当前活跃会话数
active_sessions = len(mcp_server.get_all_sessions())
logger.debug(f"当前活跃会话数: {active_sessions}")
except Exception as e:
# 清理出错,记录日志
logger.error(f"会话清理错误: {e}")
# 启动清理线程
cleanup_thread = threading.Thread(target=cleanup_sessions, daemon=True)
cleanup_thread.start()
logger.info("会话清理线程已启动")
try:
# 启动服务器,进入循环监听请求
server.serve_forever()
except KeyboardInterrupt:
# 捕获Ctrl+C,记录服务器停止
logger.info("HTTP服务器已停止")
logger.info("正在清理所有会话...")
# 清理所有会话
all_sessions = mcp_server.get_all_sessions()
for session_id in list(all_sessions.keys()):
del mcp_server.sessions[session_id]
logger.info(f"已清理 {len(all_sessions)} 个会话")
finally:
# 关闭服务器
server.server_close()
# 定义主函数
def main():
"""主函数"""
# 导入argparse模块,用于解析命令行参数
import argparse
# 创建ArgumentParser对象,设置描述信息
parser = argparse.ArgumentParser(description="MCP HTTP服务器")
# 添加--host参数,指定服务器主机,默认localhost
parser.add_argument(
"--host", default="localhost", help="HTTP服务器主机 (默认: localhost)"
)
# 添加--port参数,指定服务器端口,默认8000
parser.add_argument(
"--port", type=int, default=8000, help="HTTP服务器端口 (默认: 8000)"
)
# 解析命令行参数
args = parser.parse_args()
# 创建MCP服务器实例
mcp_server = MCPServer()
# 运行HTTP服务器
run_http_server(mcp_server, args.host, args.port)
# 判断是否为主程序入口
if __name__ == "__main__":
# 调用主函数
main()
3. 分析类型补全 #
3.1. client.py #
client.py
# 导入json模块,用于处理JSON数据
import json
# 导入logging模块,用于日志记录
import logging
# 导入requests模块,用于发送HTTP请求
import requests
# 从urllib.parse模块导入urljoin,用于拼接URL
from urllib.parse import urljoin
# 配置日志级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
# 定义MCPClient类,表示MCP HTTP客户端
class MCPClient:
"""MCP HTTP客户端"""
# 初始化方法,设置服务器URL和相关属性
def __init__(self, server_url: str):
# 协议版本号
self.protocol_version = "2025-06-18"
# 客户端信息
self.client_info = {"name": "MCP客户端", "version": "1.0.0"}
# 服务器信息,初始化为None
self.server_info = None
# 服务器能力,初始化为None
self.capabilities = None
# 会话ID,初始化为None
self.session_id = None
# 消息ID,初始值为1
self.message_id = 1
# 去除服务器URL末尾的斜杠
self.server_url = server_url.rstrip("/")
# 拼接MCP接口的完整URL
self.mcp_endpoint = urljoin(self.server_url, "/mcp")
# 创建requests的Session对象
self.session = requests.Session()
# 设置Session的默认请求头
self.session.headers.update(
{
"Accept": "application/json",
"MCP-Protocol-Version": self.protocol_version,
}
)
# 请求补全建议方法
def request_completion(
self, # 当前实例
+ ref_type, # 引用类型:ref/prompt or ref/resource
+ ref_name, # 引用名称:code_review or file:///path
+ argument_name, # 参数名:language or path or analysis_type
+ argument_value, # 参数值:py or /path/to/file.js or sec
context_arguments=None, # 上下文参数
):
# 方法文档字符串,说明用途
"""请求补全建议"""
+ ref_obj = {"type": ref_type}
+ if ref_type == "ref/resource":
+ ref_obj["uri"] = ref_name
+ else:
+ ref_obj["name"] = ref_name
# 构造补全请求的消息体
request = {
"jsonrpc": "2.0", # 指定JSON-RPC协议版本
"id": self._generate_id(), # 生成唯一消息ID
"method": "completion/complete", # 指定调用的方法
"params": {
+ "ref": ref_obj, # 引用类型和名称
"argument": {
"name": argument_name,
"value": argument_value,
}, # 参数名和参数值
},
}
# 如果有上下文参数,添加到请求中
if context_arguments:
request["params"]["context"] = {"arguments": context_arguments}
# 通过私有方法发送请求消息,并获取响应
response = self._send_message(request)
# 判断响应是否存在且包含"result"字段
if response and "result" in response:
# 从响应中获取补全内容
completion = response["result"].get("completion", {})
# 记录获取到的补全建议数量
logger.info(f"获取到 {len(completion.get('values', []))} 个补全建议")
# 返回补全内容
return completion
else:
# 如果请求失败,记录错误日志
logger.error(f"请求补全失败")
# 返回None表示失败
return None
# 重置会话方法
def reset_session(self):
"""重置会话"""
# 保存旧的会话ID
old_session_id = self.session_id
# 清空当前会话ID
self.session_id = None
# 记录会话重置信息
logger.info(f"会话已重置,原会话ID: {old_session_id}")
# 获取当前会话信息方法
def get_session_info(self):
"""获取当前会话信息"""
# 返回会话相关信息字典
return {
"session_id": self.session_id,
"server_url": self.server_url,
"protocol_version": self.protocol_version,
}
# 获取可用工具列表方法
def list_tools(self):
"""获取可用工具列表"""
# 构造请求消息
request = {
"jsonrpc": "2.0",
"id": self._generate_id(),
"method": "tools/list",
"params": {},
}
# 发送请求并获取响应
response = self._send_message(request)
# 如果响应存在且包含result字段
if response and "result" in response:
# 获取工具列表
tools = response["result"].get("tools", [])
logger.info(f"可用工具数量: {len(tools)}")
return tools
else:
# 获取失败,记录错误
logger.error("获取工具列表失败")
return []
# 通过HTTP发送消息的私有方法
def _send_message(self, message):
"""通过HTTP发送消息"""
try:
# 初始化请求头字典
headers = {}
# 如果有会话ID,则添加到请求头
if self.session_id:
headers["Mcp-Session-Id"] = self.session_id
logger.debug(f"使用会话ID: {self.session_id}")
# 发送POST请求
response = self.session.post(
self.mcp_endpoint, json=message, headers=headers, timeout=30
)
# 检查响应状态码
if response.status_code == 200:
# 检查响应头中是否有新的会话ID
new_session_id = response.headers.get("Mcp-Session-Id")
# 如果有新的会话ID且与当前不同,则更新
if new_session_id and new_session_id != self.session_id:
self.session_id = new_session_id
logger.info(f"更新会话ID: {self.session_id}")
# 返回响应的JSON内容
return response.json()
# 如果状态码为202,表示已接受但无内容
elif response.status_code == 202:
# Accepted - 通知或响应
return None
# 如果状态码为400,表示请求错误
elif response.status_code == 400:
logger.error(f"请求错误: {response.text}")
return None
# 如果状态码为401,表示会话ID无效
elif response.status_code == 401:
logger.error("会话ID无效,需要重新初始化")
self.session_id = None
return None
# 其他HTTP错误
else:
logger.error(f"HTTP错误: {response.status_code} - {response.text}")
return None
# 捕获请求异常
except requests.exceptions.RequestException as e:
logger.error(f"HTTP请求错误: {e}")
return None
# 生成消息ID的方法
def _generate_id(self) -> str:
"""生成消息ID"""
# 消息ID自增
self.message_id += 1
# 返回字符串类型的消息ID
return str(self.message_id)
# 发送消息到服务器的方法(重复定义,实际只会用后面这个)
def _send_message(self, message):
"""通过HTTP发送消息"""
try:
# 初始化请求头字典
headers = {}
# 如果有会话ID,则添加到请求头
if self.session_id:
headers["Mcp-Session-Id"] = self.session_id
logger.debug(f"使用会话ID: {self.session_id}")
# 发送POST请求到MCP端点
response = self.session.post(
self.mcp_endpoint, json=message, headers=headers, timeout=30
)
# 检查响应状态码
if response.status_code == 200:
# 从响应头获取新的会话ID
new_session_id = response.headers.get("Mcp-Session-Id")
# 如果有新的会话ID且与当前不同,则更新
if new_session_id and new_session_id != self.session_id:
self.session_id = new_session_id
logger.info(f"更新会话ID: {self.session_id}")
# 返回响应的JSON内容
return response.json()
# 如果状态码为202,表示已接受但无内容
elif response.status_code == 202:
# Accepted - 通知或响应
return None
# 如果状态码为400,表示请求错误
elif response.status_code == 400:
logger.error(f"请求错误: {response.text}")
return None
# 如果状态码为401,表示会话ID无效
elif response.status_code == 401:
logger.error("会话ID无效,需要重新初始化")
self.session_id = None
return None
# 其他HTTP错误
else:
logger.error(f"HTTP错误: {response.status_code} - {response.text}")
return None
# 捕获请求异常
except requests.exceptions.RequestException as e:
logger.error(f"HTTP请求错误: {e}")
return None
# 调用工具方法
def call_tool(self, tool_name, arguments):
"""调用工具"""
# 构造调用工具的请求消息
request = {
"jsonrpc": "2.0",
"id": self._generate_id(),
"method": "tools/call",
"params": {"calls": [{"name": tool_name, "arguments": arguments}]},
}
# 发送请求并获取响应
response = self._send_message(request)
# 如果响应存在且包含result字段
if response and "result" in response:
# 获取调用结果
calls = response["result"].get("calls", [])
if calls:
return calls[0]
else:
# 调用失败,记录错误
logger.error(f"调用工具 {tool_name} 失败")
return None
# 初始化与服务器连接的方法
def initialize(self) -> bool:
"""初始化与服务器的连接"""
# 构造初始化请求消息
request = {
"jsonrpc": "2.0",
"id": self._generate_id(),
"method": "initialize",
"params": {
"protocolVersion": self.protocol_version,
"capabilities": {},
"clientInfo": self.client_info,
},
}
# 发送初始化请求
response = self._send_message(request)
# 如果响应存在且包含result字段
if response and "result" in response:
result = response["result"]
# 获取服务器信息
self.server_info = result.get("serverInfo")
# 获取服务器能力
self.capabilities = result.get("capabilities")
# 提取并存储会话ID
if "sessionId" in result:
self.session_id = result["sessionId"]
logger.info(f"获取到会话ID: {self.session_id}")
logger.info(f"服务器初始化成功: {self.server_info}")
return True
else:
# 初始化失败,记录错误
logger.error("服务器初始化失败")
return False
# HTTP传输的函数
def http_transport():
"""HTTP传输"""
print("=== HTTP传输 ===")
client = MCPClient("http://localhost:8000")
try:
if not client.initialize():
print("初始化失败")
return
session_info = client.get_session_info()
print(f" 会话信息: {session_info}")
tools = client.list_tools()
print(f"可用工具: {[tool['name'] for tool in tools]}")
result = client.call_tool("calculate", {"expression": "10 / 2 + 5"})
if result:
content = result.get("content", [])
if content:
print(f"计算结果: {content[0].get('text', '')}")
# 打印分隔符,表示开始演示Completion功能
print("\n=== Completion功能 ===")
# 打印分隔符,表示开始演示语言补全
print("\n🔤 语言补全演示:")
# 调用request_completion方法,请求针对'language'字段输入'py'的补全建议
# language_completion = client.request_completion(
# "ref/prompt", "code_review", "language", "py"
# )
## 如果成功获取到补全结果
# if language_completion:
# # 从返回结果中获取'values'字段,默认为空列表
# values = language_completion.get("values", [])
# # 打印前5个补全建议
# print(f"输入 'py' 的补全建议: {values[:5]}...") # 显示前5个
#
## 2. 演示框架补全(无上下文)
# print("\n🏗️ 框架补全演示(无上下文):")
# framework_completion = client.request_completion(
# "ref/prompt", "code_review", "framework", "fla"
# )
# if framework_completion:
# values = framework_completion.get("values", [])
# print(f"输入 'fla' 的补全建议: {values[:5]}...")
## 3. 演示框架补全(有上下文 - 已选择Python语言)
# print("\n🐍 框架补全演示(已选择Python语言):")
# framework_completion_with_context = client.request_completion(
# "ref/prompt",
# "code_review",
# "framework",
# "fla",
# context_arguments={"language": "python"},
# )
# if framework_completion_with_context:
# values = framework_completion_with_context.get("values", [])
# print(f"在Python语言下,输入 'fla' 的补全建议: {values}")
# 4. 演示分析类型补全
+ print("\n📊 分析类型补全演示:")
+ analysis_completion = client.request_completion(
+ "ref/resource", "file:///path", "analysis_type", "sec"
)
+ if analysis_completion:
+ values = analysis_completion.get("values", [])
+ print(f"输入 'sec' 的分析类型补全建议: {values}")
except Exception as e:
print(f"HTTP错误: {e}")
# 主函数
def main():
"""主函数"""
# 调用HTTP传输函数
http_transport()
# 判断是否为主程序入口
if __name__ == "__main__":
main()
3.2. server.py #
server.py
# 导入logging模块,用于日志记录
import logging
# 导入json模块,用于处理JSON数据
import json
# 导入uuid模块,用于生成唯一会话ID
import uuid
# 导入time模块,用于时间戳
import time
# 从tools模块导入tools对象和completion_data
from tools import tools, completion_data
# 导入datetime模块,用于处理日期和时间
# 从http.server模块导入ThreadingHTTPServer和BaseHTTPRequestHandler类
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
# 配置日志记录级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
# 定义MCPServer类,作为MCP服务器的核心类
class MCPServer:
"""MCP服务器核心类"""
# 初始化方法
def __init__(self):
# 协议版本号
self.protocol_version = "2025-06-18"
# 服务器信息,包括名称和版本
self.server_info = {"name": "MCP服务器", "version": "1.0.0"}
# 服务器能力,包括工具、资源和提示
self.capabilities = {"tools": {}, "resources": {}, "prompts": {}}
# 会话管理字典
self.sessions = {}
# 获取所有活跃会话信息
def get_all_sessions(self):
"""获取所有活跃会话信息"""
return self.sessions.copy()
def _cleanup_expired_sessions(self, max_age: int = 3600):
"""清理过期会话(默认1小时)"""
current_time = time.time()
expired_sessions = []
for session_id, session_data in self.sessions.items():
if current_time - session_data["last_activity"] > max_age:
expired_sessions.append(session_id)
for session_id in expired_sessions:
del self.sessions[session_id]
logger.info(f"清理过期会话: {session_id}")
return len(expired_sessions)
# 处理工具列表请求
def handle_tools_list(self, params=None, session_id: str = None):
"""处理工具列表请求"""
# 如果有会话ID,则获取或创建会话
if session_id:
self._get_or_create_session(session_id)
# 返回工具列表
return {"tools": tools}
# 处理初始化请求(旧方法,已被覆盖)
def handle_initialize(self, params, session_id: str = None):
"""处理初始化请求"""
# 返回服务器能力
return {"capabilities": self.capabilities}
# 生成唯一的会话ID
def _generate_session_id(self) -> str:
"""生成唯一的会话ID"""
# 使用uuid4生成唯一ID
return str(uuid.uuid4())
# 获取会话信息
def get_session_info(self, session_id: str):
"""获取会话信息"""
# 如果会话ID存在于sessions字典中,返回会话信息
if session_id in self.sessions:
return self.sessions[session_id]
# 否则返回None
return None
# 获取或创建会话
def _get_or_create_session(self, session_id: str = None):
"""获取或创建会话"""
# 如果没有传入会话ID,则生成一个新的
if not session_id:
session_id = self._generate_session_id()
# 如果会话ID不存在于sessions字典中,则新建一个会话
if session_id not in self.sessions:
self.sessions[session_id] = {
"created_at": time.time(),
"last_activity": time.time(),
"message_count": 0,
"client_info": None,
}
# 记录新会话的创建
logger.info(f"创建新会话: {session_id}")
# 更新会话的最后活动时间
self.sessions[session_id]["last_activity"] = time.time()
# 增加消息计数
self.sessions[session_id]["message_count"] += 1
# 返回会话ID
return session_id
# 处理工具调用请求
def handle_tools_call(self, params, session_id: str = None):
"""处理工具调用请求"""
# 如果有会话ID,则获取或创建会话
if session_id:
self._get_or_create_session(session_id)
# 获取调用列表
calls = params.get("calls", [])
# 初始化结果列表
results = []
# 遍历每个调用
for call in calls:
# 获取工具名称
tool_name = call.get("name")
# 获取参数
arguments = call.get("arguments", {})
# 如果工具名称为"calculate"
if tool_name == "calculate":
# 获取表达式
expression = arguments.get("expression", "")
try:
# 计算表达式结果
result = eval(expression)
# 添加成功结果到结果列表
results.append(
{
"name": tool_name,
"content": [
{
"type": "text",
"text": f"计算结果: {expression} = {result}",
}
],
}
)
except Exception as e:
# 计算出错,添加错误信息
results.append(
{
"name": tool_name,
"isError": True,
"error": {"message": f"计算错误: {str(e)}"},
}
)
else:
# 未知工具,添加错误信息
results.append(
{
"name": tool_name,
"isError": True,
"error": {"message": f"未知工具: {tool_name}"},
}
)
# 返回所有调用的结果
return {"calls": results}
# 处理补全请求
def handle_completion_complete(self, params, session_id: str = None):
"""处理补全请求"""
# 如果有会话ID,则获取或创建会话
if session_id:
self._get_or_create_session(session_id)
# 获取引用类型和参数信息
ref = params.get("ref", {})
context = params.get("context", {})
ref_type = ref.get("type")
argument = params.get("argument", {})
argument_name = argument.get("name")
argument_value = argument.get("value", "")
# 根据引用类型和参数名提供相应的补全建议
completion_values = []
if ref_type == "ref/prompt":
# 处理提示补全
prompt_name = ref.get("name")
if prompt_name == "code_review":
if argument_name == "language":
# 语言补全
completion_values = self._get_language_completions(argument_value)
elif argument_name == "framework":
# 框架补全,需要根据已选择的语言
selected_language = context.get("arguments", {}).get("language")
if selected_language:
completion_values = self._get_framework_completions(
selected_language, argument_value
)
else:
completion_values = self._get_all_framework_completions(
argument_value
)
+ elif ref_type == "ref/resource":
# 处理资源URI补全
+ uri = ref.get("uri", "")
+ if "file:///" in uri:
+ if argument_name == "path":
+ completion_values = self._get_file_path_completions(argument_value)
+ elif argument_name == "analysis_type":
+ completion_values = self._get_analysis_type_completions(
+ argument_value
+ )
# 限制返回结果数量(规范要求最多100个)
if len(completion_values) > 100:
completion_values = completion_values[:100]
return {
"completion": {
"values": completion_values,
"total": len(completion_values),
"hasMore": False,
}
}
# 获取分析类型补全建议
+ def _get_analysis_type_completions(self, partial_value: str):
+ """获取分析类型补全建议"""
+ if not partial_value:
+ return completion_data["analysis_types"]
# 模糊匹配
+ return [
+ atype
+ for atype in completion_data["analysis_types"]
+ if partial_value.lower() in atype.lower()
+ ]
# 获取文件路径补全建议
+ def _get_file_path_completions(self, partial_value: str):
+ """获取文件路径补全建议"""
# 这里可以根据实际文件系统提供补全
# 为了演示,返回一些示例文件扩展名
+ if not partial_value:
+ return completion_data["file_extensions"]
+ return [
+ ext
+ for ext in completion_data["file_extensions"]
+ if partial_value.lower() in ext.lower()
+ ]
# 获取框架补全建议
def _get_framework_completions(self, language: str, partial_value: str):
"""根据语言获取框架补全建议"""
frameworks = completion_data["frameworks"].get(language, [])
if not partial_value:
return frameworks
# 模糊匹配
return [fw for fw in frameworks if partial_value.lower() in fw.lower()]
# 获取所有框架补全建议
def _get_all_framework_completions(self, partial_value: str):
"""获取所有框架补全建议"""
all_frameworks = []
for frameworks in completion_data["frameworks"].values():
all_frameworks.extend(frameworks)
if not partial_value:
return all_frameworks
# 模糊匹配
return [fw for fw in all_frameworks if partial_value.lower() in fw.lower()]
# 获取语言补全建议
def _get_language_completions(self, partial_value: str):
"""获取语言补全建议"""
if not partial_value:
return completion_data["languages"]
# 模糊匹配
return [
lang
for lang in completion_data["languages"]
if partial_value.lower() in lang.lower()
]
# 处理初始化请求(实际生效的方法,覆盖上面的方法)
def handle_initialize(self, params, session_id: str = None):
"""处理初始化请求"""
# 获取客户端信息
client_info = params.get("clientInfo", {})
# 获取或创建会话ID
session_id = self._get_or_create_session(session_id)
# 存储客户端信息到会话
self.sessions[session_id]["client_info"] = client_info
# 记录客户端初始化信息
logger.info(f"客户端初始化: {client_info}, 会话ID: {session_id}")
# 返回初始化结果,包括协议版本、能力、服务器信息和会话ID
return {
"protocolVersion": self.protocol_version,
"capabilities": self.capabilities,
"serverInfo": self.server_info,
"sessionId": session_id, # 返回会话ID给客户端
}
# 定义MCPHTTPHandler类,继承自BaseHTTPRequestHandler
class MCPHTTPHandler(BaseHTTPRequestHandler):
"""HTTP处理器,支持Streamable HTTP传输"""
# 初始化方法,接收mcp_server参数
def __init__(self, *args, mcp_server=None, **kwargs):
# 保存MCP服务器实例
self.mcp_server = mcp_server
# 调用父类的初始化方法
super().__init__(*args, **kwargs)
# 处理MCP消息
def _handle_mcp_message(self, message, session_id: str = None):
"""处理MCP消息"""
# 获取方法名
method = message.get("method")
# 获取参数
params = message.get("params", {})
# 获取消息ID
msg_id = message.get("id")
# 判断方法类型
if method == "initialize":
# 调用服务器的handle_initialize方法
result = self.mcp_server.handle_initialize(params, session_id)
# 如果有消息ID,返回带ID的结果
return {"id": msg_id, "result": result} if msg_id else None
elif method == "tools/list":
# 处理工具列表请求
result = self.mcp_server.handle_tools_list(params, session_id)
return {"id": msg_id, "result": result} if msg_id else None
elif method == "tools/call":
# 处理工具调用请求
result = self.mcp_server.handle_tools_call(params, session_id)
return {"id": msg_id, "result": result} if msg_id else None
elif method == "completion/complete":
# 处理补全请求
result = self.mcp_server.handle_completion_complete(params, session_id)
return {"id": msg_id, "result": result} if msg_id else None
else:
# 如果方法不存在,返回错误
if msg_id:
return {
"id": msg_id,
"error": {"code": -32601, "message": f"Method not found: {method}"},
}
# 没有ID则返回None
return None
# 处理POST请求(发送消息到服务器)
def do_POST(self):
"""处理POST请求(发送消息到服务器)"""
try:
# 检查请求路径是否为/mcp
if self.path != "/mcp":
self.send_error(404, "MCP endpoint not found")
return
# 检查Accept头是否包含application/json
accept_header = self.headers.get("Accept", "")
if "application/json" not in accept_header:
self.send_error(400, "Missing required Accept header")
return
# 检查MCP-Protocol-Version头是否匹配
protocol_version = self.headers.get("MCP-Protocol-Version")
if (
protocol_version
and protocol_version != self.mcp_server.protocol_version
):
self.send_error(
400, f"Unsupported protocol version: {protocol_version}"
)
return
# 获取会话ID
session_id = self.headers.get("Mcp-Session-Id")
# 读取请求体长度
content_length = int(self.headers.get("Content-Length", 0))
# 读取请求体内容
body = self.rfile.read(content_length)
try:
# 尝试解析JSON消息
message = json.loads(body.decode("utf-8"))
except json.JSONDecodeError:
# JSON解析失败
self.send_error(400, "Invalid JSON")
return
# 处理MCP消息
response = self._handle_mcp_message(message, session_id)
# 发送响应
if response:
# 发送200响应码
self.send_response(200)
# 设置响应头Content-Type为application/json
self.send_header("Content-Type", "application/json")
# 设置协议版本头
self.send_header(
"MCP-Protocol-Version", self.mcp_server.protocol_version
)
# 如果响应中包含会话ID,则添加到响应头
if "result" in response and "sessionId" in response["result"]:
self.send_header("Mcp-Session-Id", response["result"]["sessionId"])
# 结束响应头
self.end_headers()
# 写入JSON响应体
self.wfile.write(json.dumps(response).encode("utf-8"))
else:
# 没有响应内容,返回202 Accepted
self.send_response(202) # Accepted
self.end_headers()
except Exception as e:
# 记录错误日志
logger.error(f"POST处理错误: {e}")
# 返回500内部服务器错误
self.send_error(500, "Internal Server Error")
# 定义运行HTTP服务器的函数
def run_http_server(mcp_server: MCPServer, host: str = "localhost", port: int = 8000):
"""运行HTTP服务器"""
# 定义处理器工厂函数,用于传递mcp_server实例
def handler_factory(*args, **kwargs):
return MCPHTTPHandler(*args, mcp_server=mcp_server, **kwargs)
# 创建ThreadingHTTPServer实例,绑定主机和端口
server = ThreadingHTTPServer((host, port), handler_factory)
# 记录服务器启动信息
logger.info(f"HTTP服务器运行在 http://{host}:{port}/mcp")
# 记录会话管理功能启用信息
logger.info("会话管理功能已启用,支持会话ID跟踪和自动清理")
# 启动会话清理线程
import threading
# 定义会话清理函数
def cleanup_sessions():
while True:
try:
# 每5分钟清理一次
time.sleep(300)
# 调用会话清理方法
cleaned_count = mcp_server._cleanup_expired_sessions()
# 如果有清理的会话,记录日志
if cleaned_count > 0:
logger.info(f"清理了 {cleaned_count} 个过期会话")
# 记录当前活跃会话数
active_sessions = len(mcp_server.get_all_sessions())
logger.debug(f"当前活跃会话数: {active_sessions}")
except Exception as e:
# 清理出错,记录日志
logger.error(f"会话清理错误: {e}")
# 启动清理线程
cleanup_thread = threading.Thread(target=cleanup_sessions, daemon=True)
cleanup_thread.start()
logger.info("会话清理线程已启动")
try:
# 启动服务器,进入循环监听请求
server.serve_forever()
except KeyboardInterrupt:
# 捕获Ctrl+C,记录服务器停止
logger.info("HTTP服务器已停止")
logger.info("正在清理所有会话...")
# 清理所有会话
all_sessions = mcp_server.get_all_sessions()
for session_id in list(all_sessions.keys()):
del mcp_server.sessions[session_id]
logger.info(f"已清理 {len(all_sessions)} 个会话")
finally:
# 关闭服务器
server.server_close()
# 定义主函数
def main():
"""主函数"""
# 导入argparse模块,用于解析命令行参数
import argparse
# 创建ArgumentParser对象,设置描述信息
parser = argparse.ArgumentParser(description="MCP HTTP服务器")
# 添加--host参数,指定服务器主机,默认localhost
parser.add_argument(
"--host", default="localhost", help="HTTP服务器主机 (默认: localhost)"
)
# 添加--port参数,指定服务器端口,默认8000
parser.add_argument(
"--port", type=int, default=8000, help="HTTP服务器端口 (默认: 8000)"
)
# 解析命令行参数
args = parser.parse_args()
# 创建MCP服务器实例
mcp_server = MCPServer()
# 运行HTTP服务器
run_http_server(mcp_server, args.host, args.port)
# 判断是否为主程序入口
if __name__ == "__main__":
# 调用主函数
main()
4. 代码检查 #
4.1. client.py #
client.py
# 导入json模块,用于处理JSON数据
import json
# 导入logging模块,用于日志记录
import logging
# 导入requests模块,用于发送HTTP请求
import requests
# 从urllib.parse模块导入urljoin,用于拼接URL
from urllib.parse import urljoin
# 配置日志级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
# 定义MCPClient类,表示MCP HTTP客户端
class MCPClient:
"""MCP HTTP客户端"""
# 初始化方法,设置服务器URL和相关属性
def __init__(self, server_url: str):
# 协议版本号
self.protocol_version = "2025-06-18"
# 客户端信息
self.client_info = {"name": "MCP客户端", "version": "1.0.0"}
# 服务器信息,初始化为None
self.server_info = None
# 服务器能力,初始化为None
self.capabilities = None
# 会话ID,初始化为None
self.session_id = None
# 消息ID,初始值为1
self.message_id = 1
# 去除服务器URL末尾的斜杠
self.server_url = server_url.rstrip("/")
# 拼接MCP接口的完整URL
self.mcp_endpoint = urljoin(self.server_url, "/mcp")
# 创建requests的Session对象
self.session = requests.Session()
# 设置Session的默认请求头
self.session.headers.update(
{
"Accept": "application/json",
"MCP-Protocol-Version": self.protocol_version,
}
)
# 请求补全建议方法
def request_completion(
self, # 当前实例
ref_type, # 引用类型:ref/prompt or ref/resource
ref_name, # 引用名称:code_review or file:///path
argument_name, # 参数名:language or path or analysis_type
argument_value, # 参数值:py or /path/to/file.js or sec
context_arguments=None, # 上下文参数
):
# 方法文档字符串,说明用途
"""请求补全建议"""
ref_obj = {"type": ref_type}
if ref_type == "ref/resource":
ref_obj["uri"] = ref_name
else:
ref_obj["name"] = ref_name
# 构造补全请求的消息体
request = {
"jsonrpc": "2.0", # 指定JSON-RPC协议版本
"id": self._generate_id(), # 生成唯一消息ID
"method": "completion/complete", # 指定调用的方法
"params": {
"ref": ref_obj, # 引用类型和名称
"argument": {
"name": argument_name,
"value": argument_value,
}, # 参数名和参数值
},
}
# 如果有上下文参数,添加到请求中
if context_arguments:
request["params"]["context"] = {"arguments": context_arguments}
# 通过私有方法发送请求消息,并获取响应
response = self._send_message(request)
# 判断响应是否存在且包含"result"字段
if response and "result" in response:
# 从响应中获取补全内容
completion = response["result"].get("completion", {})
# 记录获取到的补全建议数量
logger.info(f"获取到 {len(completion.get('values', []))} 个补全建议")
# 返回补全内容
return completion
else:
# 如果请求失败,记录错误日志
logger.error(f"请求补全失败")
# 返回None表示失败
return None
# 重置会话方法
def reset_session(self):
"""重置会话"""
# 保存旧的会话ID
old_session_id = self.session_id
# 清空当前会话ID
self.session_id = None
# 记录会话重置信息
logger.info(f"会话已重置,原会话ID: {old_session_id}")
# 获取当前会话信息方法
def get_session_info(self):
"""获取当前会话信息"""
# 返回会话相关信息字典
return {
"session_id": self.session_id,
"server_url": self.server_url,
"protocol_version": self.protocol_version,
}
# 获取可用工具列表方法
def list_tools(self):
"""获取可用工具列表"""
# 构造请求消息
request = {
"jsonrpc": "2.0",
"id": self._generate_id(),
"method": "tools/list",
"params": {},
}
# 发送请求并获取响应
response = self._send_message(request)
# 如果响应存在且包含result字段
if response and "result" in response:
# 获取工具列表
tools = response["result"].get("tools", [])
logger.info(f"可用工具数量: {len(tools)}")
return tools
else:
# 获取失败,记录错误
logger.error("获取工具列表失败")
return []
# 通过HTTP发送消息的私有方法
def _send_message(self, message):
"""通过HTTP发送消息"""
try:
# 初始化请求头字典
headers = {}
# 如果有会话ID,则添加到请求头
if self.session_id:
headers["Mcp-Session-Id"] = self.session_id
logger.debug(f"使用会话ID: {self.session_id}")
# 发送POST请求
response = self.session.post(
self.mcp_endpoint, json=message, headers=headers, timeout=30
)
# 检查响应状态码
if response.status_code == 200:
# 检查响应头中是否有新的会话ID
new_session_id = response.headers.get("Mcp-Session-Id")
# 如果有新的会话ID且与当前不同,则更新
if new_session_id and new_session_id != self.session_id:
self.session_id = new_session_id
logger.info(f"更新会话ID: {self.session_id}")
# 返回响应的JSON内容
return response.json()
# 如果状态码为202,表示已接受但无内容
elif response.status_code == 202:
# Accepted - 通知或响应
return None
# 如果状态码为400,表示请求错误
elif response.status_code == 400:
logger.error(f"请求错误: {response.text}")
return None
# 如果状态码为401,表示会话ID无效
elif response.status_code == 401:
logger.error("会话ID无效,需要重新初始化")
self.session_id = None
return None
# 其他HTTP错误
else:
logger.error(f"HTTP错误: {response.status_code} - {response.text}")
return None
# 捕获请求异常
except requests.exceptions.RequestException as e:
logger.error(f"HTTP请求错误: {e}")
return None
# 生成消息ID的方法
def _generate_id(self) -> str:
"""生成消息ID"""
# 消息ID自增
self.message_id += 1
# 返回字符串类型的消息ID
return str(self.message_id)
# 发送消息到服务器的方法(重复定义,实际只会用后面这个)
def _send_message(self, message):
"""通过HTTP发送消息"""
try:
# 初始化请求头字典
headers = {}
# 如果有会话ID,则添加到请求头
if self.session_id:
headers["Mcp-Session-Id"] = self.session_id
logger.debug(f"使用会话ID: {self.session_id}")
# 发送POST请求到MCP端点
response = self.session.post(
self.mcp_endpoint, json=message, headers=headers, timeout=30
)
# 检查响应状态码
if response.status_code == 200:
# 从响应头获取新的会话ID
new_session_id = response.headers.get("Mcp-Session-Id")
# 如果有新的会话ID且与当前不同,则更新
if new_session_id and new_session_id != self.session_id:
self.session_id = new_session_id
logger.info(f"更新会话ID: {self.session_id}")
# 返回响应的JSON内容
return response.json()
# 如果状态码为202,表示已接受但无内容
elif response.status_code == 202:
# Accepted - 通知或响应
return None
# 如果状态码为400,表示请求错误
elif response.status_code == 400:
logger.error(f"请求错误: {response.text}")
return None
# 如果状态码为401,表示会话ID无效
elif response.status_code == 401:
logger.error("会话ID无效,需要重新初始化")
self.session_id = None
return None
# 其他HTTP错误
else:
logger.error(f"HTTP错误: {response.status_code} - {response.text}")
return None
# 捕获请求异常
except requests.exceptions.RequestException as e:
logger.error(f"HTTP请求错误: {e}")
return None
# 调用工具方法
def call_tool(self, tool_name, arguments):
"""调用工具"""
# 构造调用工具的请求消息
request = {
"jsonrpc": "2.0",
"id": self._generate_id(),
"method": "tools/call",
"params": {"calls": [{"name": tool_name, "arguments": arguments}]},
}
# 发送请求并获取响应
response = self._send_message(request)
# 如果响应存在且包含result字段
if response and "result" in response:
# 获取调用结果
calls = response["result"].get("calls", [])
if calls:
return calls[0]
else:
# 调用失败,记录错误
logger.error(f"调用工具 {tool_name} 失败")
return None
# 初始化与服务器连接的方法
def initialize(self) -> bool:
"""初始化与服务器的连接"""
# 构造初始化请求消息
request = {
"jsonrpc": "2.0",
"id": self._generate_id(),
"method": "initialize",
"params": {
"protocolVersion": self.protocol_version,
"capabilities": {},
"clientInfo": self.client_info,
},
}
# 发送初始化请求
response = self._send_message(request)
# 如果响应存在且包含result字段
if response and "result" in response:
result = response["result"]
# 获取服务器信息
self.server_info = result.get("serverInfo")
# 获取服务器能力
self.capabilities = result.get("capabilities")
# 提取并存储会话ID
if "sessionId" in result:
self.session_id = result["sessionId"]
logger.info(f"获取到会话ID: {self.session_id}")
logger.info(f"服务器初始化成功: {self.server_info}")
return True
else:
# 初始化失败,记录错误
logger.error("服务器初始化失败")
return False
# HTTP传输的函数
def http_transport():
"""HTTP传输"""
print("=== HTTP传输 ===")
client = MCPClient("http://localhost:8000")
try:
if not client.initialize():
print("初始化失败")
return
session_info = client.get_session_info()
print(f" 会话信息: {session_info}")
tools = client.list_tools()
print(f"可用工具: {[tool['name'] for tool in tools]}")
result = client.call_tool("calculate", {"expression": "10 / 2 + 5"})
if result:
content = result.get("content", [])
if content:
print(f"计算结果: {content[0].get('text', '')}")
# 打印分隔符,表示开始演示Completion功能
print("\n=== Completion功能 ===")
# 打印分隔符,表示开始演示语言补全
print("\n🔤 语言补全演示:")
# 调用request_completion方法,请求针对'language'字段输入'py'的补全建议
# language_completion = client.request_completion(
# "ref/prompt", "code_review", "language", "py"
# )
## 如果成功获取到补全结果
# if language_completion:
# # 从返回结果中获取'values'字段,默认为空列表
# values = language_completion.get("values", [])
# # 打印前5个补全建议
# print(f"输入 'py' 的补全建议: {values[:5]}...") # 显示前5个
#
## 2. 演示框架补全(无上下文)
# print("\n🏗️ 框架补全演示(无上下文):")
# framework_completion = client.request_completion(
# "ref/prompt", "code_review", "framework", "fla"
# )
# if framework_completion:
# values = framework_completion.get("values", [])
# print(f"输入 'fla' 的补全建议: {values[:5]}...")
## 3. 演示框架补全(有上下文 - 已选择Python语言)
# print("\n🐍 框架补全演示(已选择Python语言):")
# framework_completion_with_context = client.request_completion(
# "ref/prompt",
# "code_review",
# "framework",
# "fla",
# context_arguments={"language": "python"},
# )
# if framework_completion_with_context:
# values = framework_completion_with_context.get("values", [])
# print(f"在Python语言下,输入 'fla' 的补全建议: {values}")
# 4. 演示分析类型补全
# print("\n📊 分析类型补全演示:")
# analysis_completion = client.request_completion(
# "ref/resource", "file:///path", "analysis_type", "sec"
# )
# if analysis_completion:
# values = analysis_completion.get("values", [])
# print(f"输入 'sec' 的分析类型补全建议: {values}")
# 5. 演示文件扩展名补全
# print("\n📁 文件扩展名补全演示:")
# file_completion = client.request_completion(
# "ref/resource", "file:///path", "path", ".py"
# )
# if file_completion:
# values = file_completion.get("values", [])
# print(f"输入 '.py' 的文件扩展名补全建议: {values[:5]}...")
# 测试code_review工具
+ review_result = client.call_tool(
+ "code_review",
+ {
+ "language": "python",
+ "framework": "flask",
+ "code": "print('Hello, World!')",
+ },
)
+ if review_result:
+ content = review_result.get("content", [])
+ if content:
+ print(f"代码审查结果: {content[0].get('text', '')}")
# 测试file_analyzer工具
+ analysis_result = client.call_tool(
+ "file_analyzer", {"file_path": "example.py", "analysis_type": "security"}
+ )
+ if analysis_result:
+ content = analysis_result.get("content", [])
+ if content:
+ print(f"文件分析结果: {content[0].get('text', '')}")
except Exception as e:
print(f"HTTP错误: {e}")
# 主函数
def main():
"""主函数"""
# 调用HTTP传输函数
http_transport()
# 判断是否为主程序入口
if __name__ == "__main__":
main()
4.2. server.py #
server.py
# 导入logging模块,用于日志记录
import logging
# 导入json模块,用于处理JSON数据
import json
# 导入uuid模块,用于生成唯一会话ID
import uuid
# 导入time模块,用于时间戳
import time
# 从tools模块导入tools对象和completion_data
from tools import tools, completion_data
# 导入datetime模块,用于处理日期和时间
# 从http.server模块导入ThreadingHTTPServer和BaseHTTPRequestHandler类
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
# 配置日志记录级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
# 定义MCPServer类,作为MCP服务器的核心类
class MCPServer:
"""MCP服务器核心类"""
# 初始化方法
def __init__(self):
# 协议版本号
self.protocol_version = "2025-06-18"
# 服务器信息,包括名称和版本
self.server_info = {"name": "MCP服务器", "version": "1.0.0"}
# 服务器能力,包括工具、资源和提示
self.capabilities = {"tools": {}, "resources": {}, "prompts": {}}
# 会话管理字典
self.sessions = {}
# 获取所有活跃会话信息
def get_all_sessions(self):
"""获取所有活跃会话信息"""
return self.sessions.copy()
def _cleanup_expired_sessions(self, max_age: int = 3600):
"""清理过期会话(默认1小时)"""
current_time = time.time()
expired_sessions = []
for session_id, session_data in self.sessions.items():
if current_time - session_data["last_activity"] > max_age:
expired_sessions.append(session_id)
for session_id in expired_sessions:
del self.sessions[session_id]
logger.info(f"清理过期会话: {session_id}")
return len(expired_sessions)
# 处理工具列表请求
def handle_tools_list(self, params=None, session_id: str = None):
"""处理工具列表请求"""
# 如果有会话ID,则获取或创建会话
if session_id:
self._get_or_create_session(session_id)
# 返回工具列表
return {"tools": tools}
# 处理初始化请求(旧方法,已被覆盖)
def handle_initialize(self, params, session_id: str = None):
"""处理初始化请求"""
# 返回服务器能力
return {"capabilities": self.capabilities}
# 生成唯一的会话ID
def _generate_session_id(self) -> str:
"""生成唯一的会话ID"""
# 使用uuid4生成唯一ID
return str(uuid.uuid4())
# 获取会话信息
def get_session_info(self, session_id: str):
"""获取会话信息"""
# 如果会话ID存在于sessions字典中,返回会话信息
if session_id in self.sessions:
return self.sessions[session_id]
# 否则返回None
return None
# 获取或创建会话
def _get_or_create_session(self, session_id: str = None):
"""获取或创建会话"""
# 如果没有传入会话ID,则生成一个新的
if not session_id:
session_id = self._generate_session_id()
# 如果会话ID不存在于sessions字典中,则新建一个会话
if session_id not in self.sessions:
self.sessions[session_id] = {
"created_at": time.time(),
"last_activity": time.time(),
"message_count": 0,
"client_info": None,
}
# 记录新会话的创建
logger.info(f"创建新会话: {session_id}")
# 更新会话的最后活动时间
self.sessions[session_id]["last_activity"] = time.time()
# 增加消息计数
self.sessions[session_id]["message_count"] += 1
# 返回会话ID
return session_id
# 处理工具调用请求
def handle_tools_call(self, params, session_id: str = None):
"""处理工具调用请求"""
# 如果有会话ID,则获取或创建会话
if session_id:
self._get_or_create_session(session_id)
# 获取调用列表
calls = params.get("calls", [])
# 初始化结果列表
results = []
# 遍历每个调用
for call in calls:
# 获取工具名称
tool_name = call.get("name")
# 获取参数
arguments = call.get("arguments", {})
# 如果工具名称为"calculate"
if tool_name == "calculate":
# 获取表达式
expression = arguments.get("expression", "")
try:
# 计算表达式结果
result = eval(expression)
# 添加成功结果到结果列表
results.append(
{
"name": tool_name,
"content": [
{
"type": "text",
"text": f"计算结果: {expression} = {result}",
}
],
}
)
except Exception as e:
# 计算出错,添加错误信息
results.append(
{
"name": tool_name,
"isError": True,
"error": {"message": f"计算错误: {str(e)}"},
}
)
+ elif tool_name == "code_review":
# 处理代码审查工具
+ language = arguments.get("language", "")
+ framework = arguments.get("framework", "")
+ code = arguments.get("code", "")
+ review_result = f"代码审查结果:\n语言: {language}"
+ if framework:
+ review_result += f"\n框架: {framework}"
+ review_result += f"\n代码长度: {len(code)} 字符"
+ results.append(
+ {
+ "name": tool_name,
+ "content": [{"type": "text", "text": review_result}],
+ }
+ )
+ elif tool_name == "file_analyzer":
# 处理文件分析工具
+ file_path = arguments.get("file_path", "")
+ analysis_type = arguments.get("analysis_type", "syntax")
+ analysis_result = f"文件分析结果:\n文件路径: {file_path}\n分析类型: {analysis_type}\n状态: 分析完成"
+ results.append(
+ {
+ "name": tool_name,
+ "content": [{"type": "text", "text": analysis_result}],
+ }
+ )
else:
# 未知工具,添加错误信息
results.append(
{
"name": tool_name,
"isError": True,
"error": {"message": f"未知工具: {tool_name}"},
}
)
# 返回所有调用的结果
return {"calls": results}
# 处理补全请求
def handle_completion_complete(self, params, session_id: str = None):
"""处理补全请求"""
# 如果有会话ID,则获取或创建会话
if session_id:
self._get_or_create_session(session_id)
# 获取引用类型和参数信息
ref = params.get("ref", {})
context = params.get("context", {})
ref_type = ref.get("type")
argument = params.get("argument", {})
argument_name = argument.get("name")
argument_value = argument.get("value", "")
# 根据引用类型和参数名提供相应的补全建议
completion_values = []
if ref_type == "ref/prompt":
# 处理提示补全
prompt_name = ref.get("name")
if prompt_name == "code_review":
if argument_name == "language":
# 语言补全
completion_values = self._get_language_completions(argument_value)
elif argument_name == "framework":
# 框架补全,需要根据已选择的语言
selected_language = context.get("arguments", {}).get("language")
if selected_language:
completion_values = self._get_framework_completions(
selected_language, argument_value
)
else:
completion_values = self._get_all_framework_completions(
argument_value
)
elif ref_type == "ref/resource":
# 处理资源URI补全
uri = ref.get("uri", "")
if "file:///" in uri:
if argument_name == "path":
completion_values = self._get_file_path_completions(argument_value)
elif argument_name == "analysis_type":
completion_values = self._get_analysis_type_completions(
argument_value
)
# 限制返回结果数量(规范要求最多100个)
if len(completion_values) > 100:
completion_values = completion_values[:100]
return {
"completion": {
"values": completion_values,
"total": len(completion_values),
"hasMore": False,
}
}
# 获取分析类型补全建议
def _get_analysis_type_completions(self, partial_value: str):
"""获取分析类型补全建议"""
if not partial_value:
return completion_data["analysis_types"]
# 模糊匹配
return [
atype
for atype in completion_data["analysis_types"]
if partial_value.lower() in atype.lower()
]
# 获取文件路径补全建议
def _get_file_path_completions(self, partial_value: str):
"""获取文件路径补全建议"""
# 这里可以根据实际文件系统提供补全
# 为了演示,返回一些示例文件扩展名
if not partial_value:
return completion_data["file_extensions"]
return [
ext
for ext in completion_data["file_extensions"]
if partial_value.lower() in ext.lower()
]
# 获取框架补全建议
def _get_framework_completions(self, language: str, partial_value: str):
"""根据语言获取框架补全建议"""
frameworks = completion_data["frameworks"].get(language, [])
if not partial_value:
return frameworks
# 模糊匹配
return [fw for fw in frameworks if partial_value.lower() in fw.lower()]
# 获取所有框架补全建议
def _get_all_framework_completions(self, partial_value: str):
"""获取所有框架补全建议"""
all_frameworks = []
for frameworks in completion_data["frameworks"].values():
all_frameworks.extend(frameworks)
if not partial_value:
return all_frameworks
# 模糊匹配
return [fw for fw in all_frameworks if partial_value.lower() in fw.lower()]
# 获取语言补全建议
def _get_language_completions(self, partial_value: str):
"""获取语言补全建议"""
if not partial_value:
return completion_data["languages"]
# 模糊匹配
return [
lang
for lang in completion_data["languages"]
if partial_value.lower() in lang.lower()
]
# 处理初始化请求(实际生效的方法,覆盖上面的方法)
def handle_initialize(self, params, session_id: str = None):
"""处理初始化请求"""
# 获取客户端信息
client_info = params.get("clientInfo", {})
# 获取或创建会话ID
session_id = self._get_or_create_session(session_id)
# 存储客户端信息到会话
self.sessions[session_id]["client_info"] = client_info
# 记录客户端初始化信息
logger.info(f"客户端初始化: {client_info}, 会话ID: {session_id}")
# 返回初始化结果,包括协议版本、能力、服务器信息和会话ID
return {
"protocolVersion": self.protocol_version,
"capabilities": self.capabilities,
"serverInfo": self.server_info,
"sessionId": session_id, # 返回会话ID给客户端
}
# 定义MCPHTTPHandler类,继承自BaseHTTPRequestHandler
class MCPHTTPHandler(BaseHTTPRequestHandler):
"""HTTP处理器,支持Streamable HTTP传输"""
# 初始化方法,接收mcp_server参数
def __init__(self, *args, mcp_server=None, **kwargs):
# 保存MCP服务器实例
self.mcp_server = mcp_server
# 调用父类的初始化方法
super().__init__(*args, **kwargs)
# 处理MCP消息
def _handle_mcp_message(self, message, session_id: str = None):
"""处理MCP消息"""
# 获取方法名
method = message.get("method")
# 获取参数
params = message.get("params", {})
# 获取消息ID
msg_id = message.get("id")
# 判断方法类型
if method == "initialize":
# 调用服务器的handle_initialize方法
result = self.mcp_server.handle_initialize(params, session_id)
# 如果有消息ID,返回带ID的结果
return {"id": msg_id, "result": result} if msg_id else None
elif method == "tools/list":
# 处理工具列表请求
result = self.mcp_server.handle_tools_list(params, session_id)
return {"id": msg_id, "result": result} if msg_id else None
elif method == "tools/call":
# 处理工具调用请求
result = self.mcp_server.handle_tools_call(params, session_id)
return {"id": msg_id, "result": result} if msg_id else None
elif method == "completion/complete":
# 处理补全请求
result = self.mcp_server.handle_completion_complete(params, session_id)
return {"id": msg_id, "result": result} if msg_id else None
else:
# 如果方法不存在,返回错误
if msg_id:
return {
"id": msg_id,
"error": {"code": -32601, "message": f"Method not found: {method}"},
}
# 没有ID则返回None
return None
# 处理POST请求(发送消息到服务器)
def do_POST(self):
"""处理POST请求(发送消息到服务器)"""
try:
# 检查请求路径是否为/mcp
if self.path != "/mcp":
self.send_error(404, "MCP endpoint not found")
return
# 检查Accept头是否包含application/json
accept_header = self.headers.get("Accept", "")
if "application/json" not in accept_header:
self.send_error(400, "Missing required Accept header")
return
# 检查MCP-Protocol-Version头是否匹配
protocol_version = self.headers.get("MCP-Protocol-Version")
if (
protocol_version
and protocol_version != self.mcp_server.protocol_version
):
self.send_error(
400, f"Unsupported protocol version: {protocol_version}"
)
return
# 获取会话ID
session_id = self.headers.get("Mcp-Session-Id")
# 读取请求体长度
content_length = int(self.headers.get("Content-Length", 0))
# 读取请求体内容
body = self.rfile.read(content_length)
try:
# 尝试解析JSON消息
message = json.loads(body.decode("utf-8"))
except json.JSONDecodeError:
# JSON解析失败
self.send_error(400, "Invalid JSON")
return
# 处理MCP消息
response = self._handle_mcp_message(message, session_id)
# 发送响应
if response:
# 发送200响应码
self.send_response(200)
# 设置响应头Content-Type为application/json
self.send_header("Content-Type", "application/json")
# 设置协议版本头
self.send_header(
"MCP-Protocol-Version", self.mcp_server.protocol_version
)
# 如果响应中包含会话ID,则添加到响应头
if "result" in response and "sessionId" in response["result"]:
self.send_header("Mcp-Session-Id", response["result"]["sessionId"])
# 结束响应头
self.end_headers()
# 写入JSON响应体
self.wfile.write(json.dumps(response).encode("utf-8"))
else:
# 没有响应内容,返回202 Accepted
self.send_response(202) # Accepted
self.end_headers()
except Exception as e:
# 记录错误日志
logger.error(f"POST处理错误: {e}")
# 返回500内部服务器错误
self.send_error(500, "Internal Server Error")
# 定义运行HTTP服务器的函数
def run_http_server(mcp_server: MCPServer, host: str = "localhost", port: int = 8000):
"""运行HTTP服务器"""
# 定义处理器工厂函数,用于传递mcp_server实例
def handler_factory(*args, **kwargs):
return MCPHTTPHandler(*args, mcp_server=mcp_server, **kwargs)
# 创建ThreadingHTTPServer实例,绑定主机和端口
server = ThreadingHTTPServer((host, port), handler_factory)
# 记录服务器启动信息
logger.info(f"HTTP服务器运行在 http://{host}:{port}/mcp")
# 记录会话管理功能启用信息
logger.info("会话管理功能已启用,支持会话ID跟踪和自动清理")
# 启动会话清理线程
import threading
# 定义会话清理函数
def cleanup_sessions():
while True:
try:
# 每5分钟清理一次
time.sleep(300)
# 调用会话清理方法
cleaned_count = mcp_server._cleanup_expired_sessions()
# 如果有清理的会话,记录日志
if cleaned_count > 0:
logger.info(f"清理了 {cleaned_count} 个过期会话")
# 记录当前活跃会话数
active_sessions = len(mcp_server.get_all_sessions())
logger.debug(f"当前活跃会话数: {active_sessions}")
except Exception as e:
# 清理出错,记录日志
logger.error(f"会话清理错误: {e}")
# 启动清理线程
cleanup_thread = threading.Thread(target=cleanup_sessions, daemon=True)
cleanup_thread.start()
logger.info("会话清理线程已启动")
try:
# 启动服务器,进入循环监听请求
server.serve_forever()
except KeyboardInterrupt:
# 捕获Ctrl+C,记录服务器停止
logger.info("HTTP服务器已停止")
logger.info("正在清理所有会话...")
# 清理所有会话
all_sessions = mcp_server.get_all_sessions()
for session_id in list(all_sessions.keys()):
del mcp_server.sessions[session_id]
logger.info(f"已清理 {len(all_sessions)} 个会话")
finally:
# 关闭服务器
server.server_close()
# 定义主函数
def main():
"""主函数"""
# 导入argparse模块,用于解析命令行参数
import argparse
# 创建ArgumentParser对象,设置描述信息
parser = argparse.ArgumentParser(description="MCP HTTP服务器")
# 添加--host参数,指定服务器主机,默认localhost
parser.add_argument(
"--host", default="localhost", help="HTTP服务器主机 (默认: localhost)"
)
# 添加--port参数,指定服务器端口,默认8000
parser.add_argument(
"--port", type=int, default=8000, help="HTTP服务器端口 (默认: 8000)"
)
# 解析命令行参数
args = parser.parse_args()
# 创建MCP服务器实例
mcp_server = MCPServer()
# 运行HTTP服务器
run_http_server(mcp_server, args.host, args.port)
# 判断是否为主程序入口
if __name__ == "__main__":
# 调用主函数
main()
4.3. tools.py #
tools.py
# MCP工具定义
tools = [
{
"name": "calculate",
"description": "执行数学计算",
"inputSchema": {
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "数学表达式",
}
},
"required": ["expression"],
},
},
+ {
+ "name": "code_review",
+ "description": "代码审查工具",
+ "inputSchema": {
+ "type": "object",
+ "properties": {
+ "language": {
+ "type": "string",
+ "description": "编程语言",
+ },
+ "framework": {
+ "type": "string",
+ "description": "框架名称",
+ },
+ "code": {
+ "type": "string",
+ "description": "要审查的代码",
+ },
+ },
+ "required": ["language", "code"],
+ },
+ },
+ {
+ "name": "file_analyzer",
+ "description": "文件分析工具",
+ "inputSchema": {
+ "type": "object",
+ "properties": {
+ "file_path": {
+ "type": "string",
+ "description": "文件路径",
+ },
+ "analysis_type": {
+ "type": "string",
+ "description": "分析类型",
+ },
+ },
+ "required": ["file_path"],
+ },
+ },
]
# 定义补全数据字典
completion_data = {
# 支持的编程语言列表
"languages": [
"python",
"javascript",
"typescript",
"java",
"c++",
"c#",
"go",
"rust",
"php",
"ruby",
"swift",
"kotlin",
"scala",
"dart",
"elixir",
"clojure",
],
# 各编程语言对应的主流框架
"frameworks": {
# Python常用框架
"python": [
"django",
"flask",
"fastapi",
"pytorch",
"tensorflow",
"pandas",
"numpy",
],
# JavaScript常用框架
"javascript": ["react", "vue", "angular", "express", "next.js", "nuxt.js"],
# TypeScript常用框架
"typescript": ["react", "vue", "angular", "express", "next.js", "nuxt.js"],
# Java常用框架
"java": ["spring", "spring boot", "hibernate", "maven", "gradle"],
# C++常用框架
"c++": ["boost", "qt", "wxwidgets", "stl"],
# C#常用框架
"c#": [".net", "asp.net", "entity framework", "xamarin"],
# Go常用框架
"go": ["gin", "echo", "fiber", "gorilla", "cobra"],
# Rust常用框架
"rust": ["tokio", "actix", "rocket", "serde", "clap"],
},
# 支持的分析类型
"analysis_types": [
"syntax", # 语法分析
"complexity", # 复杂度分析
"security", # 安全性分析
"performance", # 性能分析
"style", # 代码风格分析
"documentation", # 文档分析
],
# 支持的文件扩展名
"file_extensions": [
".py", # Python文件
".js", # JavaScript文件
".ts", # TypeScript文件
".java", # Java文件
".cpp", # C++文件
".cs", # C#文件
".go", # Go文件
".rs", # Rust文件
".php", # PHP文件
".rb", # Ruby文件
],
}