ai
  • index
  • 1.首页
  • 2.介绍
  • 3.架构概览
  • 4.服务器概念
  • 5.客户端概念
  • 6.版本控制
  • 7.连接到远程MCP服务器
  • 8.连接到本地MCP服务器
  • json_rpc
  • 9.构建一个MCP服务器
  • 10.检查员
  • 11.构建一个MCP客户端
  • 14.架构
  • 15.基础协议概述
  • 16.生命周期
  • 17.传输
  • 18.授权
  • 19.安全最佳实践
  • 20.取消
  • 21.Ping
  • 22.进展
  • 23.Roots
  • 24.采样
  • 25.启发
  • 26.服务器特性
  • 27.提示词
  • 28.资源
  • 29.工具
  • 30.完成
  • 31.日志记录
  • 32.分页
  • 33.架构参考
  • URI模板
  • 12.实现
  • http.server
  • 动态客户端注册协议
  • 受保护资源元数据
  • 授权服务器元数据
  • JWKS
  • PKCE
  • PyJWT
  • secrets
  • watchfiles
  • 实现authorization
  • 实现cancel
  • 实现completion
  • 实现logging
  • 实现pagination
  • 实现process
  • 实现transport
  • psutil
  • pytz
  • zoneinfo
  • contextlib
  • Starlette
  • mcp.1.starter
  • mcp.2.Resource
  • mcp.3.structured_output
  • mcp.4.prompts
  • mcp.5.context
  • mcp.6.streamable
  • mcp.7.lowlevel
  • mcp.8.Completion
  • mcp.9.Elicitation
  • mcp.10.oauth
  • mcp.11.integration
  • mcp.12.best
  • mysql-mcp
  • databases
  • uvicorn
  • asynccontextmanager
  • AsyncExitStack
  • streamable
  • aiohttp
  • publish
  • email
  • schedule
  • twine
  • 1.教学文档总览
  • 2.教师使用指南
  • 3.教学系统快速参考
  • 4.新生入门指南
  • 5.学生使用指南
  • 1. 语言补全
    • 1.1. client.py
    • 1.2. server.py
    • 1.3. tools.py
  • 2. 框架补全
    • 2.1. client.py
    • 2.2. server.py
  • 3. 分析类型补全
    • 3.1. client.py
    • 3.2. server.py
  • 4. 代码检查
    • 4.1. client.py
    • 4.2. server.py
    • 4.3. tools.py

1. 语言补全 #

1.1. client.py #

client.py

# 导入json模块,用于处理JSON数据
import json

# 导入logging模块,用于日志记录
import logging

# 导入requests模块,用于发送HTTP请求
import requests

# 从urllib.parse模块导入urljoin,用于拼接URL
from urllib.parse import urljoin

# 配置日志级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)


# 定义MCPClient类,表示MCP HTTP客户端
class MCPClient:
    """MCP HTTP客户端"""

    # 初始化方法,设置服务器URL和相关属性
    def __init__(self, server_url: str):
        # 协议版本号
        self.protocol_version = "2025-06-18"
        # 客户端信息
        self.client_info = {"name": "MCP客户端", "version": "1.0.0"}
        # 服务器信息,初始化为None
        self.server_info = None
        # 服务器能力,初始化为None
        self.capabilities = None
        # 会话ID,初始化为None
        self.session_id = None
        # 消息ID,初始值为1
        self.message_id = 1

        # 去除服务器URL末尾的斜杠
        self.server_url = server_url.rstrip("/")
        # 拼接MCP接口的完整URL
        self.mcp_endpoint = urljoin(self.server_url, "/mcp")
        # 创建requests的Session对象
        self.session = requests.Session()
        # 设置Session的默认请求头
        self.session.headers.update(
            {
                "Accept": "application/json",
                "MCP-Protocol-Version": self.protocol_version,
            }
        )

    # 请求补全建议方法
+   def request_completion(
+       self,  # 当前实例
+       ref_type,  # 引用类型:ref/prompt
+       ref_name,  # 引用名称:code_review
+       argument_name,  # 参数名:language
+       argument_value,  # 参数值:py
+   ):
        # 方法文档字符串,说明用途
+       """请求补全建议"""
        # 构造补全请求的消息体
+       request = {
+           "jsonrpc": "2.0",  # 指定JSON-RPC协议版本
+           "id": self._generate_id(),  # 生成唯一消息ID
+           "method": "completion/complete",  # 指定调用的方法
+           "params": {
+               "ref": {"type": ref_type, "name": ref_name},  # 引用类型和名称
+               "argument": {
+                   "name": argument_name,
+                   "value": argument_value,
+               },  # 参数名和参数值
+           },
+       }
        # 通过私有方法发送请求消息,并获取响应
+       response = self._send_message(request)
        # 判断响应是否存在且包含"result"字段
+       if response and "result" in response:
            # 从响应中获取补全内容
+           completion = response["result"].get("completion", {})
            # 记录获取到的补全建议数量
+           logger.info(f"获取到 {len(completion.get('values', []))} 个补全建议")
            # 返回补全内容
+           return completion
+       else:
            # 如果请求失败,记录错误日志
+           logger.error(f"请求补全失败")
            # 返回None表示失败
+           return None

    # 重置会话方法
    def reset_session(self):
        """重置会话"""
        # 保存旧的会话ID
        old_session_id = self.session_id
        # 清空当前会话ID
        self.session_id = None
        # 记录会话重置信息
        logger.info(f"会话已重置,原会话ID: {old_session_id}")

    # 获取当前会话信息方法
    def get_session_info(self):
        """获取当前会话信息"""
        # 返回会话相关信息字典
        return {
            "session_id": self.session_id,
            "server_url": self.server_url,
            "protocol_version": self.protocol_version,
        }

    # 获取可用工具列表方法
    def list_tools(self):
        """获取可用工具列表"""
        # 构造请求消息
        request = {
            "jsonrpc": "2.0",
            "id": self._generate_id(),
            "method": "tools/list",
            "params": {},
        }

        # 发送请求并获取响应
        response = self._send_message(request)
        # 如果响应存在且包含result字段
        if response and "result" in response:
            # 获取工具列表
            tools = response["result"].get("tools", [])
            logger.info(f"可用工具数量: {len(tools)}")
            return tools
        else:
            # 获取失败,记录错误
            logger.error("获取工具列表失败")
            return []

    # 通过HTTP发送消息的私有方法
    def _send_message(self, message):
        """通过HTTP发送消息"""
        try:
            # 初始化请求头字典
            headers = {}
            # 如果有会话ID,则添加到请求头
            if self.session_id:
                headers["Mcp-Session-Id"] = self.session_id
                logger.debug(f"使用会话ID: {self.session_id}")

            # 发送POST请求
            response = self.session.post(
                self.mcp_endpoint, json=message, headers=headers, timeout=30
            )

            # 检查响应状态码
            if response.status_code == 200:
                # 检查响应头中是否有新的会话ID
                new_session_id = response.headers.get("Mcp-Session-Id")
                # 如果有新的会话ID且与当前不同,则更新
                if new_session_id and new_session_id != self.session_id:
                    self.session_id = new_session_id
                    logger.info(f"更新会话ID: {self.session_id}")

                # 返回响应的JSON内容
                return response.json()
            # 如果状态码为202,表示已接受但无内容
            elif response.status_code == 202:
                # Accepted - 通知或响应
                return None
            # 如果状态码为400,表示请求错误
            elif response.status_code == 400:
                logger.error(f"请求错误: {response.text}")
                return None
            # 如果状态码为401,表示会话ID无效
            elif response.status_code == 401:
                logger.error("会话ID无效,需要重新初始化")
                self.session_id = None
                return None
            # 其他HTTP错误
            else:
                logger.error(f"HTTP错误: {response.status_code} - {response.text}")
                return None

        # 捕获请求异常
        except requests.exceptions.RequestException as e:
            logger.error(f"HTTP请求错误: {e}")
            return None

    # 生成消息ID的方法
    def _generate_id(self) -> str:
        """生成消息ID"""
        # 消息ID自增
        self.message_id += 1
        # 返回字符串类型的消息ID
        return str(self.message_id)

    # 发送消息到服务器的方法(重复定义,实际只会用后面这个)
    def _send_message(self, message):
        """通过HTTP发送消息"""
        try:
            # 初始化请求头字典
            headers = {}
            # 如果有会话ID,则添加到请求头
            if self.session_id:
                headers["Mcp-Session-Id"] = self.session_id
                logger.debug(f"使用会话ID: {self.session_id}")

            # 发送POST请求到MCP端点
            response = self.session.post(
                self.mcp_endpoint, json=message, headers=headers, timeout=30
            )

            # 检查响应状态码
            if response.status_code == 200:
                # 从响应头获取新的会话ID
                new_session_id = response.headers.get("Mcp-Session-Id")
                # 如果有新的会话ID且与当前不同,则更新
                if new_session_id and new_session_id != self.session_id:
                    self.session_id = new_session_id
                    logger.info(f"更新会话ID: {self.session_id}")

                # 返回响应的JSON内容
                return response.json()
            # 如果状态码为202,表示已接受但无内容
            elif response.status_code == 202:
                # Accepted - 通知或响应
                return None
            # 如果状态码为400,表示请求错误
            elif response.status_code == 400:
                logger.error(f"请求错误: {response.text}")
                return None
            # 如果状态码为401,表示会话ID无效
            elif response.status_code == 401:
                logger.error("会话ID无效,需要重新初始化")
                self.session_id = None
                return None
            # 其他HTTP错误
            else:
                logger.error(f"HTTP错误: {response.status_code} - {response.text}")
                return None

        # 捕获请求异常
        except requests.exceptions.RequestException as e:
            logger.error(f"HTTP请求错误: {e}")
            return None

    # 调用工具方法
    def call_tool(self, tool_name, arguments):
        """调用工具"""
        # 构造调用工具的请求消息
        request = {
            "jsonrpc": "2.0",
            "id": self._generate_id(),
            "method": "tools/call",
            "params": {"calls": [{"name": tool_name, "arguments": arguments}]},
        }

        # 发送请求并获取响应
        response = self._send_message(request)
        # 如果响应存在且包含result字段
        if response and "result" in response:
            # 获取调用结果
            calls = response["result"].get("calls", [])
            if calls:
                return calls[0]
        else:
            # 调用失败,记录错误
            logger.error(f"调用工具 {tool_name} 失败")
        return None

    # 初始化与服务器连接的方法
    def initialize(self) -> bool:
        """初始化与服务器的连接"""
        # 构造初始化请求消息
        request = {
            "jsonrpc": "2.0",
            "id": self._generate_id(),
            "method": "initialize",
            "params": {
                "protocolVersion": self.protocol_version,
                "capabilities": {},
                "clientInfo": self.client_info,
            },
        }

        # 发送初始化请求
        response = self._send_message(request)
        # 如果响应存在且包含result字段
        if response and "result" in response:
            result = response["result"]
            # 获取服务器信息
            self.server_info = result.get("serverInfo")
            # 获取服务器能力
            self.capabilities = result.get("capabilities")

            # 提取并存储会话ID
            if "sessionId" in result:
                self.session_id = result["sessionId"]
                logger.info(f"获取到会话ID: {self.session_id}")

            logger.info(f"服务器初始化成功: {self.server_info}")
            return True
        else:
            # 初始化失败,记录错误
            logger.error("服务器初始化失败")
            return False


# HTTP传输的函数
def http_transport():
    """HTTP传输"""
    print("=== HTTP传输 ===")

    client = MCPClient("http://localhost:8000")

    try:
        if not client.initialize():
            print("初始化失败")
            return
        session_info = client.get_session_info()
        print(f" 会话信息: {session_info}")
        tools = client.list_tools()
        print(f"可用工具: {[tool['name'] for tool in tools]}")

        result = client.call_tool("calculate", {"expression": "10 / 2 + 5"})
        if result:
            content = result.get("content", [])
            if content:
                print(f"计算结果: {content[0].get('text', '')}")
        # 打印分隔符,表示开始演示Completion功能
+       print("\n=== Completion功能 ===")
        # 打印分隔符,表示开始演示语言补全
+       print("\n🔤 语言补全演示:")
        # 调用request_completion方法,请求针对'language'字段输入'py'的补全建议
+       language_completion = client.request_completion(
+           "ref/prompt", "code_review", "language", "py"
+       )
        # 如果成功获取到补全结果
+       if language_completion:
            # 从返回结果中获取'values'字段,默认为空列表
+           values = language_completion.get("values", [])
            # 打印前5个补全建议
+           print(f"输入 'py' 的补全建议: {values[:5]}...")  # 显示前5个
    except Exception as e:
        print(f"HTTP错误: {e}")


# 主函数
def main():
    """主函数"""
    # 调用HTTP传输函数
    http_transport()


# 判断是否为主程序入口
if __name__ == "__main__":
    main()

1.2. server.py #

server.py

# 导入logging模块,用于日志记录
import logging

# 导入json模块,用于处理JSON数据
import json

# 导入uuid模块,用于生成唯一会话ID
import uuid

# 导入time模块,用于时间戳
import time

# 从tools模块导入tools对象和completion_data
+from tools import tools, completion_data

# 导入datetime模块,用于处理日期和时间


# 从http.server模块导入ThreadingHTTPServer和BaseHTTPRequestHandler类
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler

# 配置日志记录级别为INFO
logging.basicConfig(level=logging.INFO)

# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)


# 定义MCPServer类,作为MCP服务器的核心类
class MCPServer:
    """MCP服务器核心类"""

    # 初始化方法
    def __init__(self):
        # 协议版本号
        self.protocol_version = "2025-06-18"
        # 服务器信息,包括名称和版本
        self.server_info = {"name": "MCP服务器", "version": "1.0.0"}
        # 服务器能力,包括工具、资源和提示
        self.capabilities = {"tools": {}, "resources": {}, "prompts": {}}
        # 会话管理字典
        self.sessions = {}

    # 获取所有活跃会话信息
    def get_all_sessions(self):
        """获取所有活跃会话信息"""
        return self.sessions.copy()

    def _cleanup_expired_sessions(self, max_age: int = 3600):
        """清理过期会话(默认1小时)"""
        current_time = time.time()
        expired_sessions = []

        for session_id, session_data in self.sessions.items():
            if current_time - session_data["last_activity"] > max_age:
                expired_sessions.append(session_id)

        for session_id in expired_sessions:
            del self.sessions[session_id]
            logger.info(f"清理过期会话: {session_id}")

        return len(expired_sessions)

    # 处理工具列表请求
    def handle_tools_list(self, params=None, session_id: str = None):
        """处理工具列表请求"""
        # 如果有会话ID,则获取或创建会话
        if session_id:
            self._get_or_create_session(session_id)
        # 返回工具列表
        return {"tools": tools}

    # 处理初始化请求(旧方法,已被覆盖)
    def handle_initialize(self, params, session_id: str = None):
        """处理初始化请求"""
        # 返回服务器能力
        return {"capabilities": self.capabilities}

    # 生成唯一的会话ID
    def _generate_session_id(self) -> str:
        """生成唯一的会话ID"""
        # 使用uuid4生成唯一ID
        return str(uuid.uuid4())

    # 获取会话信息
    def get_session_info(self, session_id: str):
        """获取会话信息"""
        # 如果会话ID存在于sessions字典中,返回会话信息
        if session_id in self.sessions:
            return self.sessions[session_id]
        # 否则返回None
        return None

    # 获取或创建会话
    def _get_or_create_session(self, session_id: str = None):
        """获取或创建会话"""
        # 如果没有传入会话ID,则生成一个新的
        if not session_id:
            session_id = self._generate_session_id()
        # 如果会话ID不存在于sessions字典中,则新建一个会话
        if session_id not in self.sessions:
            self.sessions[session_id] = {
                "created_at": time.time(),
                "last_activity": time.time(),
                "message_count": 0,
                "client_info": None,
            }
            # 记录新会话的创建
            logger.info(f"创建新会话: {session_id}")
        # 更新会话的最后活动时间
        self.sessions[session_id]["last_activity"] = time.time()
        # 增加消息计数
        self.sessions[session_id]["message_count"] += 1
        # 返回会话ID
        return session_id

    # 处理工具调用请求
    def handle_tools_call(self, params, session_id: str = None):
        """处理工具调用请求"""
        # 如果有会话ID,则获取或创建会话
        if session_id:
            self._get_or_create_session(session_id)

        # 获取调用列表
        calls = params.get("calls", [])
        # 初始化结果列表
        results = []

        # 遍历每个调用
        for call in calls:
            # 获取工具名称
            tool_name = call.get("name")
            # 获取参数
            arguments = call.get("arguments", {})

            # 如果工具名称为"calculate"
            if tool_name == "calculate":
                # 获取表达式
                expression = arguments.get("expression", "")
                try:
                    # 计算表达式结果
                    result = eval(expression)
                    # 添加成功结果到结果列表
                    results.append(
                        {
                            "name": tool_name,
                            "content": [
                                {
                                    "type": "text",
                                    "text": f"计算结果: {expression} = {result}",
                                }
                            ],
                        }
                    )
                except Exception as e:
                    # 计算出错,添加错误信息
                    results.append(
                        {
                            "name": tool_name,
                            "isError": True,
                            "error": {"message": f"计算错误: {str(e)}"},
                        }
                    )
            else:
                # 未知工具,添加错误信息
                results.append(
                    {
                        "name": tool_name,
                        "isError": True,
                        "error": {"message": f"未知工具: {tool_name}"},
                    }
                )

        # 返回所有调用的结果
        return {"calls": results}

    # 处理补全请求
+   def handle_completion_complete(self, params, session_id: str = None):
+       """处理补全请求"""
        # 如果有会话ID,则获取或创建会话
+       if session_id:
+           self._get_or_create_session(session_id)

        # 获取引用类型和参数信息
+       ref = params.get("ref", {})
+       ref_type = ref.get("type")

+       argument = params.get("argument", {})
+       argument_name = argument.get("name")
+       argument_value = argument.get("value", "")

        # 根据引用类型和参数名提供相应的补全建议
+       completion_values = []

+       if ref_type == "ref/prompt":
            # 处理提示补全
+           prompt_name = ref.get("name")
+           if prompt_name == "code_review":
+               if argument_name == "language":
                    # 语言补全
+                   completion_values = self._get_language_completions(argument_value)
        # 限制返回结果数量(规范要求最多100个)
+       if len(completion_values) > 100:
+           completion_values = completion_values[:100]

+       return {
+           "completion": {
+               "values": completion_values,
+               "total": len(completion_values),
+               "hasMore": False,
+           }
+       }

    # 获取语言补全建议
+   def _get_language_completions(self, partial_value: str):
+       """获取语言补全建议"""
+       if not partial_value:
+           return completion_data["languages"]

        # 模糊匹配
+       return [
+           lang
+           for lang in completion_data["languages"]
+           if partial_value.lower() in lang.lower()
+       ]

    # 处理初始化请求(实际生效的方法,覆盖上面的方法)
    def handle_initialize(self, params, session_id: str = None):
        """处理初始化请求"""
        # 获取客户端信息
        client_info = params.get("clientInfo", {})
        # 获取或创建会话ID
        session_id = self._get_or_create_session(session_id)
        # 存储客户端信息到会话
        self.sessions[session_id]["client_info"] = client_info
        # 记录客户端初始化信息
        logger.info(f"客户端初始化: {client_info}, 会话ID: {session_id}")
        # 返回初始化结果,包括协议版本、能力、服务器信息和会话ID
        return {
            "protocolVersion": self.protocol_version,
            "capabilities": self.capabilities,
            "serverInfo": self.server_info,
            "sessionId": session_id,  # 返回会话ID给客户端
        }


# 定义MCPHTTPHandler类,继承自BaseHTTPRequestHandler
class MCPHTTPHandler(BaseHTTPRequestHandler):
    """HTTP处理器,支持Streamable HTTP传输"""

    # 初始化方法,接收mcp_server参数
    def __init__(self, *args, mcp_server=None, **kwargs):
        # 保存MCP服务器实例
        self.mcp_server = mcp_server
        # 调用父类的初始化方法
        super().__init__(*args, **kwargs)

    # 处理MCP消息
    def _handle_mcp_message(self, message, session_id: str = None):
        """处理MCP消息"""
        # 获取方法名
        method = message.get("method")
        # 获取参数
        params = message.get("params", {})
        # 获取消息ID
        msg_id = message.get("id")
        # 判断方法类型
        if method == "initialize":
            # 调用服务器的handle_initialize方法
            result = self.mcp_server.handle_initialize(params, session_id)
            # 如果有消息ID,返回带ID的结果
            return {"id": msg_id, "result": result} if msg_id else None
        elif method == "tools/list":
            # 处理工具列表请求
            result = self.mcp_server.handle_tools_list(params, session_id)
            return {"id": msg_id, "result": result} if msg_id else None
        elif method == "tools/call":
            # 处理工具调用请求
            result = self.mcp_server.handle_tools_call(params, session_id)
            return {"id": msg_id, "result": result} if msg_id else None
+       elif method == "completion/complete":
            # 处理补全请求
+           result = self.mcp_server.handle_completion_complete(params, session_id)
+           return {"id": msg_id, "result": result} if msg_id else None
        else:
            # 如果方法不存在,返回错误
            if msg_id:
                return {
                    "id": msg_id,
                    "error": {"code": -32601, "message": f"Method not found: {method}"},
                }
            # 没有ID则返回None
            return None

    # 处理POST请求(发送消息到服务器)
    def do_POST(self):
        """处理POST请求(发送消息到服务器)"""
        try:
            # 检查请求路径是否为/mcp
            if self.path != "/mcp":
                self.send_error(404, "MCP endpoint not found")
                return
            # 检查Accept头是否包含application/json
            accept_header = self.headers.get("Accept", "")
            if "application/json" not in accept_header:
                self.send_error(400, "Missing required Accept header")
                return
            # 检查MCP-Protocol-Version头是否匹配
            protocol_version = self.headers.get("MCP-Protocol-Version")
            if (
                protocol_version
                and protocol_version != self.mcp_server.protocol_version
            ):
                self.send_error(
                    400, f"Unsupported protocol version: {protocol_version}"
                )
                return
            # 获取会话ID
            session_id = self.headers.get("Mcp-Session-Id")
            # 读取请求体长度
            content_length = int(self.headers.get("Content-Length", 0))
            # 读取请求体内容
            body = self.rfile.read(content_length)
            try:
                # 尝试解析JSON消息
                message = json.loads(body.decode("utf-8"))
            except json.JSONDecodeError:
                # JSON解析失败
                self.send_error(400, "Invalid JSON")
                return
            # 处理MCP消息
            response = self._handle_mcp_message(message, session_id)
            # 发送响应
            if response:
                # 发送200响应码
                self.send_response(200)
                # 设置响应头Content-Type为application/json
                self.send_header("Content-Type", "application/json")
                # 设置协议版本头
                self.send_header(
                    "MCP-Protocol-Version", self.mcp_server.protocol_version
                )
                # 如果响应中包含会话ID,则添加到响应头
                if "result" in response and "sessionId" in response["result"]:
                    self.send_header("Mcp-Session-Id", response["result"]["sessionId"])
                # 结束响应头
                self.end_headers()
                # 写入JSON响应体
                self.wfile.write(json.dumps(response).encode("utf-8"))
            else:
                # 没有响应内容,返回202 Accepted
                self.send_response(202)  # Accepted
                self.end_headers()
        except Exception as e:
            # 记录错误日志
            logger.error(f"POST处理错误: {e}")
            # 返回500内部服务器错误
            self.send_error(500, "Internal Server Error")


# 定义运行HTTP服务器的函数
def run_http_server(mcp_server: MCPServer, host: str = "localhost", port: int = 8000):
    """运行HTTP服务器"""

    # 定义处理器工厂函数,用于传递mcp_server实例
    def handler_factory(*args, **kwargs):
        return MCPHTTPHandler(*args, mcp_server=mcp_server, **kwargs)

    # 创建ThreadingHTTPServer实例,绑定主机和端口
    server = ThreadingHTTPServer((host, port), handler_factory)
    # 记录服务器启动信息
    logger.info(f"HTTP服务器运行在 http://{host}:{port}/mcp")
    # 记录会话管理功能启用信息
    logger.info("会话管理功能已启用,支持会话ID跟踪和自动清理")

    # 启动会话清理线程
    import threading

    # 定义会话清理函数
    def cleanup_sessions():
        while True:
            try:
                # 每5分钟清理一次
                time.sleep(300)
                # 调用会话清理方法
                cleaned_count = mcp_server._cleanup_expired_sessions()
                # 如果有清理的会话,记录日志
                if cleaned_count > 0:
                    logger.info(f"清理了 {cleaned_count} 个过期会话")
                # 记录当前活跃会话数
                active_sessions = len(mcp_server.get_all_sessions())
                logger.debug(f"当前活跃会话数: {active_sessions}")
            except Exception as e:
                # 清理出错,记录日志
                logger.error(f"会话清理错误: {e}")

    # 启动清理线程
    cleanup_thread = threading.Thread(target=cleanup_sessions, daemon=True)
    cleanup_thread.start()
    logger.info("会话清理线程已启动")
    try:
        # 启动服务器,进入循环监听请求
        server.serve_forever()
    except KeyboardInterrupt:
        # 捕获Ctrl+C,记录服务器停止
        logger.info("HTTP服务器已停止")
        logger.info("正在清理所有会话...")
        # 清理所有会话
        all_sessions = mcp_server.get_all_sessions()
        for session_id in list(all_sessions.keys()):
            del mcp_server.sessions[session_id]
        logger.info(f"已清理 {len(all_sessions)} 个会话")
    finally:
        # 关闭服务器
        server.server_close()


# 定义主函数
def main():
    """主函数"""
    # 导入argparse模块,用于解析命令行参数
    import argparse

    # 创建ArgumentParser对象,设置描述信息
    parser = argparse.ArgumentParser(description="MCP HTTP服务器")
    # 添加--host参数,指定服务器主机,默认localhost
    parser.add_argument(
        "--host", default="localhost", help="HTTP服务器主机 (默认: localhost)"
    )
    # 添加--port参数,指定服务器端口,默认8000
    parser.add_argument(
        "--port", type=int, default=8000, help="HTTP服务器端口 (默认: 8000)"
    )

    # 解析命令行参数
    args = parser.parse_args()

    # 创建MCP服务器实例
    mcp_server = MCPServer()

    # 运行HTTP服务器
    run_http_server(mcp_server, args.host, args.port)


# 判断是否为主程序入口
if __name__ == "__main__":
    # 调用主函数
    main()

1.3. tools.py #

tools.py

# MCP工具定义

tools = [
    {
        "name": "calculate",
        "description": "执行数学计算",
        "inputSchema": {
            "type": "object",
            "properties": {
                "expression": {
                    "type": "string",
                    "description": "数学表达式",
                }
            },
            "required": ["expression"],
        },
    },
]

# 定义补全数据字典
+completion_data = {
    # 支持的编程语言列表
+   "languages": [
+       "python",
+       "javascript",
+       "typescript",
+       "java",
+       "c++",
+       "c#",
+       "go",
+       "rust",
+       "php",
+       "ruby",
+       "swift",
+       "kotlin",
+       "scala",
+       "dart",
+       "elixir",
+       "clojure",
+   ],
    # 各编程语言对应的主流框架
+   "frameworks": {
        # Python常用框架
+       "python": [
+           "django",
+           "flask",
+           "fastapi",
+           "pytorch",
+           "tensorflow",
+           "pandas",
+           "numpy",
+       ],
        # JavaScript常用框架
+       "javascript": ["react", "vue", "angular", "express", "next.js", "nuxt.js"],
        # TypeScript常用框架
+       "typescript": ["react", "vue", "angular", "express", "next.js", "nuxt.js"],
        # Java常用框架
+       "java": ["spring", "spring boot", "hibernate", "maven", "gradle"],
        # C++常用框架
+       "c++": ["boost", "qt", "wxwidgets", "stl"],
        # C#常用框架
+       "c#": [".net", "asp.net", "entity framework", "xamarin"],
        # Go常用框架
+       "go": ["gin", "echo", "fiber", "gorilla", "cobra"],
        # Rust常用框架
+       "rust": ["tokio", "actix", "rocket", "serde", "clap"],
+   },
    # 支持的分析类型
+   "analysis_types": [
+       "syntax",  # 语法分析
+       "complexity",  # 复杂度分析
+       "security",  # 安全性分析
+       "performance",  # 性能分析
+       "style",  # 代码风格分析
+       "documentation",  # 文档分析
+   ],
    # 支持的文件扩展名
+   "file_extensions": [
+       ".py",  # Python文件
+       ".js",  # JavaScript文件
+       ".ts",  # TypeScript文件
+       ".java",  # Java文件
+       ".cpp",  # C++文件
+       ".cs",  # C#文件
+       ".go",  # Go文件
+       ".rs",  # Rust文件
+       ".php",  # PHP文件
+       ".rb",  # Ruby文件
+   ],
+}

2. 框架补全 #

2.1. client.py #

client.py

# 导入json模块,用于处理JSON数据
import json

# 导入logging模块,用于日志记录
import logging

# 导入requests模块,用于发送HTTP请求
import requests

# 从urllib.parse模块导入urljoin,用于拼接URL
from urllib.parse import urljoin

# 配置日志级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)


# 定义MCPClient类,表示MCP HTTP客户端
class MCPClient:
    """MCP HTTP客户端"""

    # 初始化方法,设置服务器URL和相关属性
    def __init__(self, server_url: str):
        # 协议版本号
        self.protocol_version = "2025-06-18"
        # 客户端信息
        self.client_info = {"name": "MCP客户端", "version": "1.0.0"}
        # 服务器信息,初始化为None
        self.server_info = None
        # 服务器能力,初始化为None
        self.capabilities = None
        # 会话ID,初始化为None
        self.session_id = None
        # 消息ID,初始值为1
        self.message_id = 1

        # 去除服务器URL末尾的斜杠
        self.server_url = server_url.rstrip("/")
        # 拼接MCP接口的完整URL
        self.mcp_endpoint = urljoin(self.server_url, "/mcp")
        # 创建requests的Session对象
        self.session = requests.Session()
        # 设置Session的默认请求头
        self.session.headers.update(
            {
                "Accept": "application/json",
                "MCP-Protocol-Version": self.protocol_version,
            }
        )

    # 请求补全建议方法
    def request_completion(
        self,  # 当前实例
        ref_type,  # 引用类型:ref/prompt
        ref_name,  # 引用名称:code_review
        argument_name,  # 参数名:language
        argument_value,  # 参数值:py
+       context_arguments=None,  # 上下文参数
    ):
        # 方法文档字符串,说明用途
        """请求补全建议"""
        # 构造补全请求的消息体
        request = {
            "jsonrpc": "2.0",  # 指定JSON-RPC协议版本
            "id": self._generate_id(),  # 生成唯一消息ID
            "method": "completion/complete",  # 指定调用的方法
            "params": {
                "ref": {"type": ref_type, "name": ref_name},  # 引用类型和名称
                "argument": {
                    "name": argument_name,
                    "value": argument_value,
                },  # 参数名和参数值
            },
        }
        # 如果有上下文参数,添加到请求中
+       if context_arguments:
+           request["params"]["context"] = {"arguments": context_arguments}
        # 通过私有方法发送请求消息,并获取响应
        response = self._send_message(request)
        # 判断响应是否存在且包含"result"字段
        if response and "result" in response:
            # 从响应中获取补全内容
            completion = response["result"].get("completion", {})
            # 记录获取到的补全建议数量
            logger.info(f"获取到 {len(completion.get('values', []))} 个补全建议")
            # 返回补全内容
            return completion
        else:
            # 如果请求失败,记录错误日志
            logger.error(f"请求补全失败")
            # 返回None表示失败
            return None

    # 重置会话方法
    def reset_session(self):
        """重置会话"""
        # 保存旧的会话ID
        old_session_id = self.session_id
        # 清空当前会话ID
        self.session_id = None
        # 记录会话重置信息
        logger.info(f"会话已重置,原会话ID: {old_session_id}")

    # 获取当前会话信息方法
    def get_session_info(self):
        """获取当前会话信息"""
        # 返回会话相关信息字典
        return {
            "session_id": self.session_id,
            "server_url": self.server_url,
            "protocol_version": self.protocol_version,
        }

    # 获取可用工具列表方法
    def list_tools(self):
        """获取可用工具列表"""
        # 构造请求消息
        request = {
            "jsonrpc": "2.0",
            "id": self._generate_id(),
            "method": "tools/list",
            "params": {},
        }

        # 发送请求并获取响应
        response = self._send_message(request)
        # 如果响应存在且包含result字段
        if response and "result" in response:
            # 获取工具列表
            tools = response["result"].get("tools", [])
            logger.info(f"可用工具数量: {len(tools)}")
            return tools
        else:
            # 获取失败,记录错误
            logger.error("获取工具列表失败")
            return []

    # 通过HTTP发送消息的私有方法
    def _send_message(self, message):
        """通过HTTP发送消息"""
        try:
            # 初始化请求头字典
            headers = {}
            # 如果有会话ID,则添加到请求头
            if self.session_id:
                headers["Mcp-Session-Id"] = self.session_id
                logger.debug(f"使用会话ID: {self.session_id}")

            # 发送POST请求
            response = self.session.post(
                self.mcp_endpoint, json=message, headers=headers, timeout=30
            )

            # 检查响应状态码
            if response.status_code == 200:
                # 检查响应头中是否有新的会话ID
                new_session_id = response.headers.get("Mcp-Session-Id")
                # 如果有新的会话ID且与当前不同,则更新
                if new_session_id and new_session_id != self.session_id:
                    self.session_id = new_session_id
                    logger.info(f"更新会话ID: {self.session_id}")

                # 返回响应的JSON内容
                return response.json()
            # 如果状态码为202,表示已接受但无内容
            elif response.status_code == 202:
                # Accepted - 通知或响应
                return None
            # 如果状态码为400,表示请求错误
            elif response.status_code == 400:
                logger.error(f"请求错误: {response.text}")
                return None
            # 如果状态码为401,表示会话ID无效
            elif response.status_code == 401:
                logger.error("会话ID无效,需要重新初始化")
                self.session_id = None
                return None
            # 其他HTTP错误
            else:
                logger.error(f"HTTP错误: {response.status_code} - {response.text}")
                return None

        # 捕获请求异常
        except requests.exceptions.RequestException as e:
            logger.error(f"HTTP请求错误: {e}")
            return None

    # 生成消息ID的方法
    def _generate_id(self) -> str:
        """生成消息ID"""
        # 消息ID自增
        self.message_id += 1
        # 返回字符串类型的消息ID
        return str(self.message_id)

    # 发送消息到服务器的方法(重复定义,实际只会用后面这个)
    def _send_message(self, message):
        """通过HTTP发送消息"""
        try:
            # 初始化请求头字典
            headers = {}
            # 如果有会话ID,则添加到请求头
            if self.session_id:
                headers["Mcp-Session-Id"] = self.session_id
                logger.debug(f"使用会话ID: {self.session_id}")

            # 发送POST请求到MCP端点
            response = self.session.post(
                self.mcp_endpoint, json=message, headers=headers, timeout=30
            )

            # 检查响应状态码
            if response.status_code == 200:
                # 从响应头获取新的会话ID
                new_session_id = response.headers.get("Mcp-Session-Id")
                # 如果有新的会话ID且与当前不同,则更新
                if new_session_id and new_session_id != self.session_id:
                    self.session_id = new_session_id
                    logger.info(f"更新会话ID: {self.session_id}")

                # 返回响应的JSON内容
                return response.json()
            # 如果状态码为202,表示已接受但无内容
            elif response.status_code == 202:
                # Accepted - 通知或响应
                return None
            # 如果状态码为400,表示请求错误
            elif response.status_code == 400:
                logger.error(f"请求错误: {response.text}")
                return None
            # 如果状态码为401,表示会话ID无效
            elif response.status_code == 401:
                logger.error("会话ID无效,需要重新初始化")
                self.session_id = None
                return None
            # 其他HTTP错误
            else:
                logger.error(f"HTTP错误: {response.status_code} - {response.text}")
                return None

        # 捕获请求异常
        except requests.exceptions.RequestException as e:
            logger.error(f"HTTP请求错误: {e}")
            return None

    # 调用工具方法
    def call_tool(self, tool_name, arguments):
        """调用工具"""
        # 构造调用工具的请求消息
        request = {
            "jsonrpc": "2.0",
            "id": self._generate_id(),
            "method": "tools/call",
            "params": {"calls": [{"name": tool_name, "arguments": arguments}]},
        }

        # 发送请求并获取响应
        response = self._send_message(request)
        # 如果响应存在且包含result字段
        if response and "result" in response:
            # 获取调用结果
            calls = response["result"].get("calls", [])
            if calls:
                return calls[0]
        else:
            # 调用失败,记录错误
            logger.error(f"调用工具 {tool_name} 失败")
        return None

    # 初始化与服务器连接的方法
    def initialize(self) -> bool:
        """初始化与服务器的连接"""
        # 构造初始化请求消息
        request = {
            "jsonrpc": "2.0",
            "id": self._generate_id(),
            "method": "initialize",
            "params": {
                "protocolVersion": self.protocol_version,
                "capabilities": {},
                "clientInfo": self.client_info,
            },
        }

        # 发送初始化请求
        response = self._send_message(request)
        # 如果响应存在且包含result字段
        if response and "result" in response:
            result = response["result"]
            # 获取服务器信息
            self.server_info = result.get("serverInfo")
            # 获取服务器能力
            self.capabilities = result.get("capabilities")

            # 提取并存储会话ID
            if "sessionId" in result:
                self.session_id = result["sessionId"]
                logger.info(f"获取到会话ID: {self.session_id}")

            logger.info(f"服务器初始化成功: {self.server_info}")
            return True
        else:
            # 初始化失败,记录错误
            logger.error("服务器初始化失败")
            return False


# HTTP传输的函数
def http_transport():
    """HTTP传输"""
    print("=== HTTP传输 ===")

    client = MCPClient("http://localhost:8000")

    try:
        if not client.initialize():
            print("初始化失败")
            return
        session_info = client.get_session_info()
        print(f" 会话信息: {session_info}")
        tools = client.list_tools()
        print(f"可用工具: {[tool['name'] for tool in tools]}")

        result = client.call_tool("calculate", {"expression": "10 / 2 + 5"})
        if result:
            content = result.get("content", [])
            if content:
                print(f"计算结果: {content[0].get('text', '')}")
        # 打印分隔符,表示开始演示Completion功能
        print("\n=== Completion功能 ===")
        # 打印分隔符,表示开始演示语言补全
        print("\n🔤 语言补全演示:")
        # 调用request_completion方法,请求针对'language'字段输入'py'的补全建议
        language_completion = client.request_completion(
            "ref/prompt", "code_review", "language", "py"
        )
        # 如果成功获取到补全结果
        if language_completion:
            # 从返回结果中获取'values'字段,默认为空列表
            values = language_completion.get("values", [])
            # 打印前5个补全建议
            print(f"输入 'py' 的补全建议: {values[:5]}...")  # 显示前5个

        # 2. 演示框架补全(无上下文)
+       print("\n🏗️  框架补全演示(无上下文):")
+       framework_completion = client.request_completion(
+           "ref/prompt", "code_review", "framework", "fla"
+       )
+       if framework_completion:
+           values = framework_completion.get("values", [])
+           print(f"输入 'fla' 的补全建议: {values[:5]}...")
        # 3. 演示框架补全(有上下文 - 已选择Python语言)
+       print("\n🐍 框架补全演示(已选择Python语言):")
+       framework_completion_with_context = client.request_completion(
+           "ref/prompt",
+           "code_review",
+           "framework",
+           "fla",
+           context_arguments={"language": "python"},
+       )
+       if framework_completion_with_context:
+           values = framework_completion_with_context.get("values", [])
+           print(f"在Python语言下,输入 'fla' 的补全建议: {values}")
    except Exception as e:
        print(f"HTTP错误: {e}")


# 主函数
def main():
    """主函数"""
    # 调用HTTP传输函数
    http_transport()


# 判断是否为主程序入口
if __name__ == "__main__":
    main()

2.2. server.py #

server.py

# 导入logging模块,用于日志记录
import logging

# 导入json模块,用于处理JSON数据
import json

# 导入uuid模块,用于生成唯一会话ID
import uuid

# 导入time模块,用于时间戳
import time

# 从tools模块导入tools对象和completion_data
from tools import tools, completion_data

# 导入datetime模块,用于处理日期和时间


# 从http.server模块导入ThreadingHTTPServer和BaseHTTPRequestHandler类
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler

# 配置日志记录级别为INFO
logging.basicConfig(level=logging.INFO)

# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)


# 定义MCPServer类,作为MCP服务器的核心类
class MCPServer:
    """MCP服务器核心类"""

    # 初始化方法
    def __init__(self):
        # 协议版本号
        self.protocol_version = "2025-06-18"
        # 服务器信息,包括名称和版本
        self.server_info = {"name": "MCP服务器", "version": "1.0.0"}
        # 服务器能力,包括工具、资源和提示
        self.capabilities = {"tools": {}, "resources": {}, "prompts": {}}
        # 会话管理字典
        self.sessions = {}

    # 获取所有活跃会话信息
    def get_all_sessions(self):
        """获取所有活跃会话信息"""
        return self.sessions.copy()

    def _cleanup_expired_sessions(self, max_age: int = 3600):
        """清理过期会话(默认1小时)"""
        current_time = time.time()
        expired_sessions = []

        for session_id, session_data in self.sessions.items():
            if current_time - session_data["last_activity"] > max_age:
                expired_sessions.append(session_id)

        for session_id in expired_sessions:
            del self.sessions[session_id]
            logger.info(f"清理过期会话: {session_id}")

        return len(expired_sessions)

    # 处理工具列表请求
    def handle_tools_list(self, params=None, session_id: str = None):
        """处理工具列表请求"""
        # 如果有会话ID,则获取或创建会话
        if session_id:
            self._get_or_create_session(session_id)
        # 返回工具列表
        return {"tools": tools}

    # 处理初始化请求(旧方法,已被覆盖)
    def handle_initialize(self, params, session_id: str = None):
        """处理初始化请求"""
        # 返回服务器能力
        return {"capabilities": self.capabilities}

    # 生成唯一的会话ID
    def _generate_session_id(self) -> str:
        """生成唯一的会话ID"""
        # 使用uuid4生成唯一ID
        return str(uuid.uuid4())

    # 获取会话信息
    def get_session_info(self, session_id: str):
        """获取会话信息"""
        # 如果会话ID存在于sessions字典中,返回会话信息
        if session_id in self.sessions:
            return self.sessions[session_id]
        # 否则返回None
        return None

    # 获取或创建会话
    def _get_or_create_session(self, session_id: str = None):
        """获取或创建会话"""
        # 如果没有传入会话ID,则生成一个新的
        if not session_id:
            session_id = self._generate_session_id()
        # 如果会话ID不存在于sessions字典中,则新建一个会话
        if session_id not in self.sessions:
            self.sessions[session_id] = {
                "created_at": time.time(),
                "last_activity": time.time(),
                "message_count": 0,
                "client_info": None,
            }
            # 记录新会话的创建
            logger.info(f"创建新会话: {session_id}")
        # 更新会话的最后活动时间
        self.sessions[session_id]["last_activity"] = time.time()
        # 增加消息计数
        self.sessions[session_id]["message_count"] += 1
        # 返回会话ID
        return session_id

    # 处理工具调用请求
    def handle_tools_call(self, params, session_id: str = None):
        """处理工具调用请求"""
        # 如果有会话ID,则获取或创建会话
        if session_id:
            self._get_or_create_session(session_id)

        # 获取调用列表
        calls = params.get("calls", [])
        # 初始化结果列表
        results = []

        # 遍历每个调用
        for call in calls:
            # 获取工具名称
            tool_name = call.get("name")
            # 获取参数
            arguments = call.get("arguments", {})

            # 如果工具名称为"calculate"
            if tool_name == "calculate":
                # 获取表达式
                expression = arguments.get("expression", "")
                try:
                    # 计算表达式结果
                    result = eval(expression)
                    # 添加成功结果到结果列表
                    results.append(
                        {
                            "name": tool_name,
                            "content": [
                                {
                                    "type": "text",
                                    "text": f"计算结果: {expression} = {result}",
                                }
                            ],
                        }
                    )
                except Exception as e:
                    # 计算出错,添加错误信息
                    results.append(
                        {
                            "name": tool_name,
                            "isError": True,
                            "error": {"message": f"计算错误: {str(e)}"},
                        }
                    )
            else:
                # 未知工具,添加错误信息
                results.append(
                    {
                        "name": tool_name,
                        "isError": True,
                        "error": {"message": f"未知工具: {tool_name}"},
                    }
                )

        # 返回所有调用的结果
        return {"calls": results}

    # 处理补全请求
    def handle_completion_complete(self, params, session_id: str = None):
        """处理补全请求"""
        # 如果有会话ID,则获取或创建会话
        if session_id:
            self._get_or_create_session(session_id)

        # 获取引用类型和参数信息
        ref = params.get("ref", {})
+       context = params.get("context", {})
        ref_type = ref.get("type")

        argument = params.get("argument", {})
        argument_name = argument.get("name")
        argument_value = argument.get("value", "")

        # 根据引用类型和参数名提供相应的补全建议
        completion_values = []

        if ref_type == "ref/prompt":
            # 处理提示补全
            prompt_name = ref.get("name")
            if prompt_name == "code_review":
                if argument_name == "language":
                    # 语言补全
                    completion_values = self._get_language_completions(argument_value)
+               elif argument_name == "framework":
                    # 框架补全,需要根据已选择的语言
+                   selected_language = context.get("arguments", {}).get("language")
+                   if selected_language:
+                       completion_values = self._get_framework_completions(
+                           selected_language, argument_value
+                       )
+                   else:
+                       completion_values = self._get_all_framework_completions(
+                           argument_value
+                       )

        # 限制返回结果数量(规范要求最多100个)
        if len(completion_values) > 100:
            completion_values = completion_values[:100]

        return {
            "completion": {
                "values": completion_values,
                "total": len(completion_values),
                "hasMore": False,
            }
        }

    # 获取框架补全建议
+   def _get_framework_completions(self, language: str, partial_value: str):
+       """根据语言获取框架补全建议"""
+       frameworks = completion_data["frameworks"].get(language, [])
+       if not partial_value:
+           return frameworks

        # 模糊匹配
+       return [fw for fw in frameworks if partial_value.lower() in fw.lower()]

    # 获取所有框架补全建议
+   def _get_all_framework_completions(self, partial_value: str):
+       """获取所有框架补全建议"""
+       all_frameworks = []
+       for frameworks in completion_data["frameworks"].values():
+           all_frameworks.extend(frameworks)

+       if not partial_value:
+           return all_frameworks

        # 模糊匹配
+       return [fw for fw in all_frameworks if partial_value.lower() in fw.lower()]

    # 获取语言补全建议
    def _get_language_completions(self, partial_value: str):
        """获取语言补全建议"""
        if not partial_value:
            return completion_data["languages"]

        # 模糊匹配
        return [
            lang
            for lang in completion_data["languages"]
            if partial_value.lower() in lang.lower()
        ]

    # 处理初始化请求(实际生效的方法,覆盖上面的方法)
    def handle_initialize(self, params, session_id: str = None):
        """处理初始化请求"""
        # 获取客户端信息
        client_info = params.get("clientInfo", {})
        # 获取或创建会话ID
        session_id = self._get_or_create_session(session_id)
        # 存储客户端信息到会话
        self.sessions[session_id]["client_info"] = client_info
        # 记录客户端初始化信息
        logger.info(f"客户端初始化: {client_info}, 会话ID: {session_id}")
        # 返回初始化结果,包括协议版本、能力、服务器信息和会话ID
        return {
            "protocolVersion": self.protocol_version,
            "capabilities": self.capabilities,
            "serverInfo": self.server_info,
            "sessionId": session_id,  # 返回会话ID给客户端
        }


# 定义MCPHTTPHandler类,继承自BaseHTTPRequestHandler
class MCPHTTPHandler(BaseHTTPRequestHandler):
    """HTTP处理器,支持Streamable HTTP传输"""

    # 初始化方法,接收mcp_server参数
    def __init__(self, *args, mcp_server=None, **kwargs):
        # 保存MCP服务器实例
        self.mcp_server = mcp_server
        # 调用父类的初始化方法
        super().__init__(*args, **kwargs)

    # 处理MCP消息
    def _handle_mcp_message(self, message, session_id: str = None):
        """处理MCP消息"""
        # 获取方法名
        method = message.get("method")
        # 获取参数
        params = message.get("params", {})
        # 获取消息ID
        msg_id = message.get("id")
        # 判断方法类型
        if method == "initialize":
            # 调用服务器的handle_initialize方法
            result = self.mcp_server.handle_initialize(params, session_id)
            # 如果有消息ID,返回带ID的结果
            return {"id": msg_id, "result": result} if msg_id else None
        elif method == "tools/list":
            # 处理工具列表请求
            result = self.mcp_server.handle_tools_list(params, session_id)
            return {"id": msg_id, "result": result} if msg_id else None
        elif method == "tools/call":
            # 处理工具调用请求
            result = self.mcp_server.handle_tools_call(params, session_id)
            return {"id": msg_id, "result": result} if msg_id else None
        elif method == "completion/complete":
            # 处理补全请求
            result = self.mcp_server.handle_completion_complete(params, session_id)
            return {"id": msg_id, "result": result} if msg_id else None
        else:
            # 如果方法不存在,返回错误
            if msg_id:
                return {
                    "id": msg_id,
                    "error": {"code": -32601, "message": f"Method not found: {method}"},
                }
            # 没有ID则返回None
            return None

    # 处理POST请求(发送消息到服务器)
    def do_POST(self):
        """处理POST请求(发送消息到服务器)"""
        try:
            # 检查请求路径是否为/mcp
            if self.path != "/mcp":
                self.send_error(404, "MCP endpoint not found")
                return
            # 检查Accept头是否包含application/json
            accept_header = self.headers.get("Accept", "")
            if "application/json" not in accept_header:
                self.send_error(400, "Missing required Accept header")
                return
            # 检查MCP-Protocol-Version头是否匹配
            protocol_version = self.headers.get("MCP-Protocol-Version")
            if (
                protocol_version
                and protocol_version != self.mcp_server.protocol_version
            ):
                self.send_error(
                    400, f"Unsupported protocol version: {protocol_version}"
                )
                return
            # 获取会话ID
            session_id = self.headers.get("Mcp-Session-Id")
            # 读取请求体长度
            content_length = int(self.headers.get("Content-Length", 0))
            # 读取请求体内容
            body = self.rfile.read(content_length)
            try:
                # 尝试解析JSON消息
                message = json.loads(body.decode("utf-8"))
            except json.JSONDecodeError:
                # JSON解析失败
                self.send_error(400, "Invalid JSON")
                return
            # 处理MCP消息
            response = self._handle_mcp_message(message, session_id)
            # 发送响应
            if response:
                # 发送200响应码
                self.send_response(200)
                # 设置响应头Content-Type为application/json
                self.send_header("Content-Type", "application/json")
                # 设置协议版本头
                self.send_header(
                    "MCP-Protocol-Version", self.mcp_server.protocol_version
                )
                # 如果响应中包含会话ID,则添加到响应头
                if "result" in response and "sessionId" in response["result"]:
                    self.send_header("Mcp-Session-Id", response["result"]["sessionId"])
                # 结束响应头
                self.end_headers()
                # 写入JSON响应体
                self.wfile.write(json.dumps(response).encode("utf-8"))
            else:
                # 没有响应内容,返回202 Accepted
                self.send_response(202)  # Accepted
                self.end_headers()
        except Exception as e:
            # 记录错误日志
            logger.error(f"POST处理错误: {e}")
            # 返回500内部服务器错误
            self.send_error(500, "Internal Server Error")


# 定义运行HTTP服务器的函数
def run_http_server(mcp_server: MCPServer, host: str = "localhost", port: int = 8000):
    """运行HTTP服务器"""

    # 定义处理器工厂函数,用于传递mcp_server实例
    def handler_factory(*args, **kwargs):
        return MCPHTTPHandler(*args, mcp_server=mcp_server, **kwargs)

    # 创建ThreadingHTTPServer实例,绑定主机和端口
    server = ThreadingHTTPServer((host, port), handler_factory)
    # 记录服务器启动信息
    logger.info(f"HTTP服务器运行在 http://{host}:{port}/mcp")
    # 记录会话管理功能启用信息
    logger.info("会话管理功能已启用,支持会话ID跟踪和自动清理")

    # 启动会话清理线程
    import threading

    # 定义会话清理函数
    def cleanup_sessions():
        while True:
            try:
                # 每5分钟清理一次
                time.sleep(300)
                # 调用会话清理方法
                cleaned_count = mcp_server._cleanup_expired_sessions()
                # 如果有清理的会话,记录日志
                if cleaned_count > 0:
                    logger.info(f"清理了 {cleaned_count} 个过期会话")
                # 记录当前活跃会话数
                active_sessions = len(mcp_server.get_all_sessions())
                logger.debug(f"当前活跃会话数: {active_sessions}")
            except Exception as e:
                # 清理出错,记录日志
                logger.error(f"会话清理错误: {e}")

    # 启动清理线程
    cleanup_thread = threading.Thread(target=cleanup_sessions, daemon=True)
    cleanup_thread.start()
    logger.info("会话清理线程已启动")
    try:
        # 启动服务器,进入循环监听请求
        server.serve_forever()
    except KeyboardInterrupt:
        # 捕获Ctrl+C,记录服务器停止
        logger.info("HTTP服务器已停止")
        logger.info("正在清理所有会话...")
        # 清理所有会话
        all_sessions = mcp_server.get_all_sessions()
        for session_id in list(all_sessions.keys()):
            del mcp_server.sessions[session_id]
        logger.info(f"已清理 {len(all_sessions)} 个会话")
    finally:
        # 关闭服务器
        server.server_close()


# 定义主函数
def main():
    """主函数"""
    # 导入argparse模块,用于解析命令行参数
    import argparse

    # 创建ArgumentParser对象,设置描述信息
    parser = argparse.ArgumentParser(description="MCP HTTP服务器")
    # 添加--host参数,指定服务器主机,默认localhost
    parser.add_argument(
        "--host", default="localhost", help="HTTP服务器主机 (默认: localhost)"
    )
    # 添加--port参数,指定服务器端口,默认8000
    parser.add_argument(
        "--port", type=int, default=8000, help="HTTP服务器端口 (默认: 8000)"
    )

    # 解析命令行参数
    args = parser.parse_args()

    # 创建MCP服务器实例
    mcp_server = MCPServer()

    # 运行HTTP服务器
    run_http_server(mcp_server, args.host, args.port)


# 判断是否为主程序入口
if __name__ == "__main__":
    # 调用主函数
    main()

3. 分析类型补全 #

3.1. client.py #

client.py

# 导入json模块,用于处理JSON数据
import json

# 导入logging模块,用于日志记录
import logging

# 导入requests模块,用于发送HTTP请求
import requests

# 从urllib.parse模块导入urljoin,用于拼接URL
from urllib.parse import urljoin

# 配置日志级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)


# 定义MCPClient类,表示MCP HTTP客户端
class MCPClient:
    """MCP HTTP客户端"""

    # 初始化方法,设置服务器URL和相关属性
    def __init__(self, server_url: str):
        # 协议版本号
        self.protocol_version = "2025-06-18"
        # 客户端信息
        self.client_info = {"name": "MCP客户端", "version": "1.0.0"}
        # 服务器信息,初始化为None
        self.server_info = None
        # 服务器能力,初始化为None
        self.capabilities = None
        # 会话ID,初始化为None
        self.session_id = None
        # 消息ID,初始值为1
        self.message_id = 1

        # 去除服务器URL末尾的斜杠
        self.server_url = server_url.rstrip("/")
        # 拼接MCP接口的完整URL
        self.mcp_endpoint = urljoin(self.server_url, "/mcp")
        # 创建requests的Session对象
        self.session = requests.Session()
        # 设置Session的默认请求头
        self.session.headers.update(
            {
                "Accept": "application/json",
                "MCP-Protocol-Version": self.protocol_version,
            }
        )

    # 请求补全建议方法
    def request_completion(
        self,  # 当前实例
+       ref_type,  # 引用类型:ref/prompt or ref/resource
+       ref_name,  # 引用名称:code_review or file:///path
+       argument_name,  # 参数名:language or path or analysis_type
+       argument_value,  # 参数值:py or /path/to/file.js or sec
        context_arguments=None,  # 上下文参数
    ):
        # 方法文档字符串,说明用途
        """请求补全建议"""
+       ref_obj = {"type": ref_type}
+       if ref_type == "ref/resource":
+           ref_obj["uri"] = ref_name
+       else:
+           ref_obj["name"] = ref_name
        # 构造补全请求的消息体
        request = {
            "jsonrpc": "2.0",  # 指定JSON-RPC协议版本
            "id": self._generate_id(),  # 生成唯一消息ID
            "method": "completion/complete",  # 指定调用的方法
            "params": {
+               "ref": ref_obj,  # 引用类型和名称
                "argument": {
                    "name": argument_name,
                    "value": argument_value,
                },  # 参数名和参数值
            },
        }
        # 如果有上下文参数,添加到请求中
        if context_arguments:
            request["params"]["context"] = {"arguments": context_arguments}
        # 通过私有方法发送请求消息,并获取响应
        response = self._send_message(request)
        # 判断响应是否存在且包含"result"字段
        if response and "result" in response:
            # 从响应中获取补全内容
            completion = response["result"].get("completion", {})
            # 记录获取到的补全建议数量
            logger.info(f"获取到 {len(completion.get('values', []))} 个补全建议")
            # 返回补全内容
            return completion
        else:
            # 如果请求失败,记录错误日志
            logger.error(f"请求补全失败")
            # 返回None表示失败
            return None

    # 重置会话方法
    def reset_session(self):
        """重置会话"""
        # 保存旧的会话ID
        old_session_id = self.session_id
        # 清空当前会话ID
        self.session_id = None
        # 记录会话重置信息
        logger.info(f"会话已重置,原会话ID: {old_session_id}")

    # 获取当前会话信息方法
    def get_session_info(self):
        """获取当前会话信息"""
        # 返回会话相关信息字典
        return {
            "session_id": self.session_id,
            "server_url": self.server_url,
            "protocol_version": self.protocol_version,
        }

    # 获取可用工具列表方法
    def list_tools(self):
        """获取可用工具列表"""
        # 构造请求消息
        request = {
            "jsonrpc": "2.0",
            "id": self._generate_id(),
            "method": "tools/list",
            "params": {},
        }

        # 发送请求并获取响应
        response = self._send_message(request)
        # 如果响应存在且包含result字段
        if response and "result" in response:
            # 获取工具列表
            tools = response["result"].get("tools", [])
            logger.info(f"可用工具数量: {len(tools)}")
            return tools
        else:
            # 获取失败,记录错误
            logger.error("获取工具列表失败")
            return []

    # 通过HTTP发送消息的私有方法
    def _send_message(self, message):
        """通过HTTP发送消息"""
        try:
            # 初始化请求头字典
            headers = {}
            # 如果有会话ID,则添加到请求头
            if self.session_id:
                headers["Mcp-Session-Id"] = self.session_id
                logger.debug(f"使用会话ID: {self.session_id}")

            # 发送POST请求
            response = self.session.post(
                self.mcp_endpoint, json=message, headers=headers, timeout=30
            )

            # 检查响应状态码
            if response.status_code == 200:
                # 检查响应头中是否有新的会话ID
                new_session_id = response.headers.get("Mcp-Session-Id")
                # 如果有新的会话ID且与当前不同,则更新
                if new_session_id and new_session_id != self.session_id:
                    self.session_id = new_session_id
                    logger.info(f"更新会话ID: {self.session_id}")

                # 返回响应的JSON内容
                return response.json()
            # 如果状态码为202,表示已接受但无内容
            elif response.status_code == 202:
                # Accepted - 通知或响应
                return None
            # 如果状态码为400,表示请求错误
            elif response.status_code == 400:
                logger.error(f"请求错误: {response.text}")
                return None
            # 如果状态码为401,表示会话ID无效
            elif response.status_code == 401:
                logger.error("会话ID无效,需要重新初始化")
                self.session_id = None
                return None
            # 其他HTTP错误
            else:
                logger.error(f"HTTP错误: {response.status_code} - {response.text}")
                return None

        # 捕获请求异常
        except requests.exceptions.RequestException as e:
            logger.error(f"HTTP请求错误: {e}")
            return None

    # 生成消息ID的方法
    def _generate_id(self) -> str:
        """生成消息ID"""
        # 消息ID自增
        self.message_id += 1
        # 返回字符串类型的消息ID
        return str(self.message_id)

    # 发送消息到服务器的方法(重复定义,实际只会用后面这个)
    def _send_message(self, message):
        """通过HTTP发送消息"""
        try:
            # 初始化请求头字典
            headers = {}
            # 如果有会话ID,则添加到请求头
            if self.session_id:
                headers["Mcp-Session-Id"] = self.session_id
                logger.debug(f"使用会话ID: {self.session_id}")

            # 发送POST请求到MCP端点
            response = self.session.post(
                self.mcp_endpoint, json=message, headers=headers, timeout=30
            )

            # 检查响应状态码
            if response.status_code == 200:
                # 从响应头获取新的会话ID
                new_session_id = response.headers.get("Mcp-Session-Id")
                # 如果有新的会话ID且与当前不同,则更新
                if new_session_id and new_session_id != self.session_id:
                    self.session_id = new_session_id
                    logger.info(f"更新会话ID: {self.session_id}")

                # 返回响应的JSON内容
                return response.json()
            # 如果状态码为202,表示已接受但无内容
            elif response.status_code == 202:
                # Accepted - 通知或响应
                return None
            # 如果状态码为400,表示请求错误
            elif response.status_code == 400:
                logger.error(f"请求错误: {response.text}")
                return None
            # 如果状态码为401,表示会话ID无效
            elif response.status_code == 401:
                logger.error("会话ID无效,需要重新初始化")
                self.session_id = None
                return None
            # 其他HTTP错误
            else:
                logger.error(f"HTTP错误: {response.status_code} - {response.text}")
                return None

        # 捕获请求异常
        except requests.exceptions.RequestException as e:
            logger.error(f"HTTP请求错误: {e}")
            return None

    # 调用工具方法
    def call_tool(self, tool_name, arguments):
        """调用工具"""
        # 构造调用工具的请求消息
        request = {
            "jsonrpc": "2.0",
            "id": self._generate_id(),
            "method": "tools/call",
            "params": {"calls": [{"name": tool_name, "arguments": arguments}]},
        }

        # 发送请求并获取响应
        response = self._send_message(request)
        # 如果响应存在且包含result字段
        if response and "result" in response:
            # 获取调用结果
            calls = response["result"].get("calls", [])
            if calls:
                return calls[0]
        else:
            # 调用失败,记录错误
            logger.error(f"调用工具 {tool_name} 失败")
        return None

    # 初始化与服务器连接的方法
    def initialize(self) -> bool:
        """初始化与服务器的连接"""
        # 构造初始化请求消息
        request = {
            "jsonrpc": "2.0",
            "id": self._generate_id(),
            "method": "initialize",
            "params": {
                "protocolVersion": self.protocol_version,
                "capabilities": {},
                "clientInfo": self.client_info,
            },
        }

        # 发送初始化请求
        response = self._send_message(request)
        # 如果响应存在且包含result字段
        if response and "result" in response:
            result = response["result"]
            # 获取服务器信息
            self.server_info = result.get("serverInfo")
            # 获取服务器能力
            self.capabilities = result.get("capabilities")

            # 提取并存储会话ID
            if "sessionId" in result:
                self.session_id = result["sessionId"]
                logger.info(f"获取到会话ID: {self.session_id}")

            logger.info(f"服务器初始化成功: {self.server_info}")
            return True
        else:
            # 初始化失败,记录错误
            logger.error("服务器初始化失败")
            return False


# HTTP传输的函数
def http_transport():
    """HTTP传输"""
    print("=== HTTP传输 ===")

    client = MCPClient("http://localhost:8000")

    try:
        if not client.initialize():
            print("初始化失败")
            return
        session_info = client.get_session_info()
        print(f" 会话信息: {session_info}")
        tools = client.list_tools()
        print(f"可用工具: {[tool['name'] for tool in tools]}")

        result = client.call_tool("calculate", {"expression": "10 / 2 + 5"})
        if result:
            content = result.get("content", [])
            if content:
                print(f"计算结果: {content[0].get('text', '')}")
        # 打印分隔符,表示开始演示Completion功能
        print("\n=== Completion功能 ===")
        # 打印分隔符,表示开始演示语言补全
        print("\n🔤 语言补全演示:")
        # 调用request_completion方法,请求针对'language'字段输入'py'的补全建议
        # language_completion = client.request_completion(
        #    "ref/prompt", "code_review", "language", "py"
        # )
        ## 如果成功获取到补全结果
        # if language_completion:
        #    # 从返回结果中获取'values'字段,默认为空列表
        #    values = language_completion.get("values", [])
        #    # 打印前5个补全建议
        #    print(f"输入 'py' 的补全建议: {values[:5]}...")  # 显示前5个
        #
        ## 2. 演示框架补全(无上下文)
        # print("\n🏗️  框架补全演示(无上下文):")
        # framework_completion = client.request_completion(
        #    "ref/prompt", "code_review", "framework", "fla"
        # )
        # if framework_completion:
        #    values = framework_completion.get("values", [])
        #    print(f"输入 'fla' 的补全建议: {values[:5]}...")
        ## 3. 演示框架补全(有上下文 - 已选择Python语言)
        # print("\n🐍 框架补全演示(已选择Python语言):")
        # framework_completion_with_context = client.request_completion(
        #    "ref/prompt",
        #    "code_review",
        #    "framework",
        #    "fla",
        #    context_arguments={"language": "python"},
        # )
        # if framework_completion_with_context:
        #    values = framework_completion_with_context.get("values", [])
        #    print(f"在Python语言下,输入 'fla' 的补全建议: {values}")
        # 4. 演示分析类型补全
+       print("\n📊 分析类型补全演示:")
+       analysis_completion = client.request_completion(
+           "ref/resource", "file:///path", "analysis_type", "sec"
        )
+       if analysis_completion:
+           values = analysis_completion.get("values", [])
+           print(f"输入 'sec' 的分析类型补全建议: {values}")
    except Exception as e:
        print(f"HTTP错误: {e}")


# 主函数
def main():
    """主函数"""
    # 调用HTTP传输函数
    http_transport()


# 判断是否为主程序入口
if __name__ == "__main__":
    main()

3.2. server.py #

server.py

# 导入logging模块,用于日志记录
import logging

# 导入json模块,用于处理JSON数据
import json

# 导入uuid模块,用于生成唯一会话ID
import uuid

# 导入time模块,用于时间戳
import time

# 从tools模块导入tools对象和completion_data
from tools import tools, completion_data

# 导入datetime模块,用于处理日期和时间


# 从http.server模块导入ThreadingHTTPServer和BaseHTTPRequestHandler类
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler

# 配置日志记录级别为INFO
logging.basicConfig(level=logging.INFO)

# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)


# 定义MCPServer类,作为MCP服务器的核心类
class MCPServer:
    """MCP服务器核心类"""

    # 初始化方法
    def __init__(self):
        # 协议版本号
        self.protocol_version = "2025-06-18"
        # 服务器信息,包括名称和版本
        self.server_info = {"name": "MCP服务器", "version": "1.0.0"}
        # 服务器能力,包括工具、资源和提示
        self.capabilities = {"tools": {}, "resources": {}, "prompts": {}}
        # 会话管理字典
        self.sessions = {}

    # 获取所有活跃会话信息
    def get_all_sessions(self):
        """获取所有活跃会话信息"""
        return self.sessions.copy()

    def _cleanup_expired_sessions(self, max_age: int = 3600):
        """清理过期会话(默认1小时)"""
        current_time = time.time()
        expired_sessions = []

        for session_id, session_data in self.sessions.items():
            if current_time - session_data["last_activity"] > max_age:
                expired_sessions.append(session_id)

        for session_id in expired_sessions:
            del self.sessions[session_id]
            logger.info(f"清理过期会话: {session_id}")

        return len(expired_sessions)

    # 处理工具列表请求
    def handle_tools_list(self, params=None, session_id: str = None):
        """处理工具列表请求"""
        # 如果有会话ID,则获取或创建会话
        if session_id:
            self._get_or_create_session(session_id)
        # 返回工具列表
        return {"tools": tools}

    # 处理初始化请求(旧方法,已被覆盖)
    def handle_initialize(self, params, session_id: str = None):
        """处理初始化请求"""
        # 返回服务器能力
        return {"capabilities": self.capabilities}

    # 生成唯一的会话ID
    def _generate_session_id(self) -> str:
        """生成唯一的会话ID"""
        # 使用uuid4生成唯一ID
        return str(uuid.uuid4())

    # 获取会话信息
    def get_session_info(self, session_id: str):
        """获取会话信息"""
        # 如果会话ID存在于sessions字典中,返回会话信息
        if session_id in self.sessions:
            return self.sessions[session_id]
        # 否则返回None
        return None

    # 获取或创建会话
    def _get_or_create_session(self, session_id: str = None):
        """获取或创建会话"""
        # 如果没有传入会话ID,则生成一个新的
        if not session_id:
            session_id = self._generate_session_id()
        # 如果会话ID不存在于sessions字典中,则新建一个会话
        if session_id not in self.sessions:
            self.sessions[session_id] = {
                "created_at": time.time(),
                "last_activity": time.time(),
                "message_count": 0,
                "client_info": None,
            }
            # 记录新会话的创建
            logger.info(f"创建新会话: {session_id}")
        # 更新会话的最后活动时间
        self.sessions[session_id]["last_activity"] = time.time()
        # 增加消息计数
        self.sessions[session_id]["message_count"] += 1
        # 返回会话ID
        return session_id

    # 处理工具调用请求
    def handle_tools_call(self, params, session_id: str = None):
        """处理工具调用请求"""
        # 如果有会话ID,则获取或创建会话
        if session_id:
            self._get_or_create_session(session_id)

        # 获取调用列表
        calls = params.get("calls", [])
        # 初始化结果列表
        results = []

        # 遍历每个调用
        for call in calls:
            # 获取工具名称
            tool_name = call.get("name")
            # 获取参数
            arguments = call.get("arguments", {})

            # 如果工具名称为"calculate"
            if tool_name == "calculate":
                # 获取表达式
                expression = arguments.get("expression", "")
                try:
                    # 计算表达式结果
                    result = eval(expression)
                    # 添加成功结果到结果列表
                    results.append(
                        {
                            "name": tool_name,
                            "content": [
                                {
                                    "type": "text",
                                    "text": f"计算结果: {expression} = {result}",
                                }
                            ],
                        }
                    )
                except Exception as e:
                    # 计算出错,添加错误信息
                    results.append(
                        {
                            "name": tool_name,
                            "isError": True,
                            "error": {"message": f"计算错误: {str(e)}"},
                        }
                    )
            else:
                # 未知工具,添加错误信息
                results.append(
                    {
                        "name": tool_name,
                        "isError": True,
                        "error": {"message": f"未知工具: {tool_name}"},
                    }
                )

        # 返回所有调用的结果
        return {"calls": results}

    # 处理补全请求
    def handle_completion_complete(self, params, session_id: str = None):
        """处理补全请求"""
        # 如果有会话ID,则获取或创建会话
        if session_id:
            self._get_or_create_session(session_id)

        # 获取引用类型和参数信息
        ref = params.get("ref", {})
        context = params.get("context", {})
        ref_type = ref.get("type")

        argument = params.get("argument", {})
        argument_name = argument.get("name")
        argument_value = argument.get("value", "")

        # 根据引用类型和参数名提供相应的补全建议
        completion_values = []

        if ref_type == "ref/prompt":
            # 处理提示补全
            prompt_name = ref.get("name")
            if prompt_name == "code_review":
                if argument_name == "language":
                    # 语言补全
                    completion_values = self._get_language_completions(argument_value)
                elif argument_name == "framework":
                    # 框架补全,需要根据已选择的语言
                    selected_language = context.get("arguments", {}).get("language")
                    if selected_language:
                        completion_values = self._get_framework_completions(
                            selected_language, argument_value
                        )
                    else:
                        completion_values = self._get_all_framework_completions(
                            argument_value
                        )
+       elif ref_type == "ref/resource":
            # 处理资源URI补全
+           uri = ref.get("uri", "")
+           if "file:///" in uri:
+               if argument_name == "path":
+                   completion_values = self._get_file_path_completions(argument_value)
+               elif argument_name == "analysis_type":
+                   completion_values = self._get_analysis_type_completions(
+                       argument_value
+                   )
        # 限制返回结果数量(规范要求最多100个)
        if len(completion_values) > 100:
            completion_values = completion_values[:100]

        return {
            "completion": {
                "values": completion_values,
                "total": len(completion_values),
                "hasMore": False,
            }
        }

    # 获取分析类型补全建议
+   def _get_analysis_type_completions(self, partial_value: str):
+       """获取分析类型补全建议"""
+       if not partial_value:
+           return completion_data["analysis_types"]

        # 模糊匹配
+       return [
+           atype
+           for atype in completion_data["analysis_types"]
+           if partial_value.lower() in atype.lower()
+       ]

    # 获取文件路径补全建议
+   def _get_file_path_completions(self, partial_value: str):
+       """获取文件路径补全建议"""
        # 这里可以根据实际文件系统提供补全
        # 为了演示,返回一些示例文件扩展名
+       if not partial_value:
+           return completion_data["file_extensions"]

+       return [
+           ext
+           for ext in completion_data["file_extensions"]
+           if partial_value.lower() in ext.lower()
+       ]

    # 获取框架补全建议
    def _get_framework_completions(self, language: str, partial_value: str):
        """根据语言获取框架补全建议"""
        frameworks = completion_data["frameworks"].get(language, [])
        if not partial_value:
            return frameworks

        # 模糊匹配
        return [fw for fw in frameworks if partial_value.lower() in fw.lower()]

    # 获取所有框架补全建议
    def _get_all_framework_completions(self, partial_value: str):
        """获取所有框架补全建议"""
        all_frameworks = []
        for frameworks in completion_data["frameworks"].values():
            all_frameworks.extend(frameworks)

        if not partial_value:
            return all_frameworks

        # 模糊匹配
        return [fw for fw in all_frameworks if partial_value.lower() in fw.lower()]

    # 获取语言补全建议
    def _get_language_completions(self, partial_value: str):
        """获取语言补全建议"""
        if not partial_value:
            return completion_data["languages"]

        # 模糊匹配
        return [
            lang
            for lang in completion_data["languages"]
            if partial_value.lower() in lang.lower()
        ]

    # 处理初始化请求(实际生效的方法,覆盖上面的方法)
    def handle_initialize(self, params, session_id: str = None):
        """处理初始化请求"""
        # 获取客户端信息
        client_info = params.get("clientInfo", {})
        # 获取或创建会话ID
        session_id = self._get_or_create_session(session_id)
        # 存储客户端信息到会话
        self.sessions[session_id]["client_info"] = client_info
        # 记录客户端初始化信息
        logger.info(f"客户端初始化: {client_info}, 会话ID: {session_id}")
        # 返回初始化结果,包括协议版本、能力、服务器信息和会话ID
        return {
            "protocolVersion": self.protocol_version,
            "capabilities": self.capabilities,
            "serverInfo": self.server_info,
            "sessionId": session_id,  # 返回会话ID给客户端
        }


# 定义MCPHTTPHandler类,继承自BaseHTTPRequestHandler
class MCPHTTPHandler(BaseHTTPRequestHandler):
    """HTTP处理器,支持Streamable HTTP传输"""

    # 初始化方法,接收mcp_server参数
    def __init__(self, *args, mcp_server=None, **kwargs):
        # 保存MCP服务器实例
        self.mcp_server = mcp_server
        # 调用父类的初始化方法
        super().__init__(*args, **kwargs)

    # 处理MCP消息
    def _handle_mcp_message(self, message, session_id: str = None):
        """处理MCP消息"""
        # 获取方法名
        method = message.get("method")
        # 获取参数
        params = message.get("params", {})
        # 获取消息ID
        msg_id = message.get("id")
        # 判断方法类型
        if method == "initialize":
            # 调用服务器的handle_initialize方法
            result = self.mcp_server.handle_initialize(params, session_id)
            # 如果有消息ID,返回带ID的结果
            return {"id": msg_id, "result": result} if msg_id else None
        elif method == "tools/list":
            # 处理工具列表请求
            result = self.mcp_server.handle_tools_list(params, session_id)
            return {"id": msg_id, "result": result} if msg_id else None
        elif method == "tools/call":
            # 处理工具调用请求
            result = self.mcp_server.handle_tools_call(params, session_id)
            return {"id": msg_id, "result": result} if msg_id else None
        elif method == "completion/complete":
            # 处理补全请求
            result = self.mcp_server.handle_completion_complete(params, session_id)
            return {"id": msg_id, "result": result} if msg_id else None
        else:
            # 如果方法不存在,返回错误
            if msg_id:
                return {
                    "id": msg_id,
                    "error": {"code": -32601, "message": f"Method not found: {method}"},
                }
            # 没有ID则返回None
            return None

    # 处理POST请求(发送消息到服务器)
    def do_POST(self):
        """处理POST请求(发送消息到服务器)"""
        try:
            # 检查请求路径是否为/mcp
            if self.path != "/mcp":
                self.send_error(404, "MCP endpoint not found")
                return
            # 检查Accept头是否包含application/json
            accept_header = self.headers.get("Accept", "")
            if "application/json" not in accept_header:
                self.send_error(400, "Missing required Accept header")
                return
            # 检查MCP-Protocol-Version头是否匹配
            protocol_version = self.headers.get("MCP-Protocol-Version")
            if (
                protocol_version
                and protocol_version != self.mcp_server.protocol_version
            ):
                self.send_error(
                    400, f"Unsupported protocol version: {protocol_version}"
                )
                return
            # 获取会话ID
            session_id = self.headers.get("Mcp-Session-Id")
            # 读取请求体长度
            content_length = int(self.headers.get("Content-Length", 0))
            # 读取请求体内容
            body = self.rfile.read(content_length)
            try:
                # 尝试解析JSON消息
                message = json.loads(body.decode("utf-8"))
            except json.JSONDecodeError:
                # JSON解析失败
                self.send_error(400, "Invalid JSON")
                return
            # 处理MCP消息
            response = self._handle_mcp_message(message, session_id)
            # 发送响应
            if response:
                # 发送200响应码
                self.send_response(200)
                # 设置响应头Content-Type为application/json
                self.send_header("Content-Type", "application/json")
                # 设置协议版本头
                self.send_header(
                    "MCP-Protocol-Version", self.mcp_server.protocol_version
                )
                # 如果响应中包含会话ID,则添加到响应头
                if "result" in response and "sessionId" in response["result"]:
                    self.send_header("Mcp-Session-Id", response["result"]["sessionId"])
                # 结束响应头
                self.end_headers()
                # 写入JSON响应体
                self.wfile.write(json.dumps(response).encode("utf-8"))
            else:
                # 没有响应内容,返回202 Accepted
                self.send_response(202)  # Accepted
                self.end_headers()
        except Exception as e:
            # 记录错误日志
            logger.error(f"POST处理错误: {e}")
            # 返回500内部服务器错误
            self.send_error(500, "Internal Server Error")


# 定义运行HTTP服务器的函数
def run_http_server(mcp_server: MCPServer, host: str = "localhost", port: int = 8000):
    """运行HTTP服务器"""

    # 定义处理器工厂函数,用于传递mcp_server实例
    def handler_factory(*args, **kwargs):
        return MCPHTTPHandler(*args, mcp_server=mcp_server, **kwargs)

    # 创建ThreadingHTTPServer实例,绑定主机和端口
    server = ThreadingHTTPServer((host, port), handler_factory)
    # 记录服务器启动信息
    logger.info(f"HTTP服务器运行在 http://{host}:{port}/mcp")
    # 记录会话管理功能启用信息
    logger.info("会话管理功能已启用,支持会话ID跟踪和自动清理")

    # 启动会话清理线程
    import threading

    # 定义会话清理函数
    def cleanup_sessions():
        while True:
            try:
                # 每5分钟清理一次
                time.sleep(300)
                # 调用会话清理方法
                cleaned_count = mcp_server._cleanup_expired_sessions()
                # 如果有清理的会话,记录日志
                if cleaned_count > 0:
                    logger.info(f"清理了 {cleaned_count} 个过期会话")
                # 记录当前活跃会话数
                active_sessions = len(mcp_server.get_all_sessions())
                logger.debug(f"当前活跃会话数: {active_sessions}")
            except Exception as e:
                # 清理出错,记录日志
                logger.error(f"会话清理错误: {e}")

    # 启动清理线程
    cleanup_thread = threading.Thread(target=cleanup_sessions, daemon=True)
    cleanup_thread.start()
    logger.info("会话清理线程已启动")
    try:
        # 启动服务器,进入循环监听请求
        server.serve_forever()
    except KeyboardInterrupt:
        # 捕获Ctrl+C,记录服务器停止
        logger.info("HTTP服务器已停止")
        logger.info("正在清理所有会话...")
        # 清理所有会话
        all_sessions = mcp_server.get_all_sessions()
        for session_id in list(all_sessions.keys()):
            del mcp_server.sessions[session_id]
        logger.info(f"已清理 {len(all_sessions)} 个会话")
    finally:
        # 关闭服务器
        server.server_close()


# 定义主函数
def main():
    """主函数"""
    # 导入argparse模块,用于解析命令行参数
    import argparse

    # 创建ArgumentParser对象,设置描述信息
    parser = argparse.ArgumentParser(description="MCP HTTP服务器")
    # 添加--host参数,指定服务器主机,默认localhost
    parser.add_argument(
        "--host", default="localhost", help="HTTP服务器主机 (默认: localhost)"
    )
    # 添加--port参数,指定服务器端口,默认8000
    parser.add_argument(
        "--port", type=int, default=8000, help="HTTP服务器端口 (默认: 8000)"
    )

    # 解析命令行参数
    args = parser.parse_args()

    # 创建MCP服务器实例
    mcp_server = MCPServer()

    # 运行HTTP服务器
    run_http_server(mcp_server, args.host, args.port)


# 判断是否为主程序入口
if __name__ == "__main__":
    # 调用主函数
    main()

4. 代码检查 #

4.1. client.py #

client.py

# 导入json模块,用于处理JSON数据
import json

# 导入logging模块,用于日志记录
import logging

# 导入requests模块,用于发送HTTP请求
import requests

# 从urllib.parse模块导入urljoin,用于拼接URL
from urllib.parse import urljoin

# 配置日志级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)


# 定义MCPClient类,表示MCP HTTP客户端
class MCPClient:
    """MCP HTTP客户端"""

    # 初始化方法,设置服务器URL和相关属性
    def __init__(self, server_url: str):
        # 协议版本号
        self.protocol_version = "2025-06-18"
        # 客户端信息
        self.client_info = {"name": "MCP客户端", "version": "1.0.0"}
        # 服务器信息,初始化为None
        self.server_info = None
        # 服务器能力,初始化为None
        self.capabilities = None
        # 会话ID,初始化为None
        self.session_id = None
        # 消息ID,初始值为1
        self.message_id = 1

        # 去除服务器URL末尾的斜杠
        self.server_url = server_url.rstrip("/")
        # 拼接MCP接口的完整URL
        self.mcp_endpoint = urljoin(self.server_url, "/mcp")
        # 创建requests的Session对象
        self.session = requests.Session()
        # 设置Session的默认请求头
        self.session.headers.update(
            {
                "Accept": "application/json",
                "MCP-Protocol-Version": self.protocol_version,
            }
        )

    # 请求补全建议方法
    def request_completion(
        self,  # 当前实例
        ref_type,  # 引用类型:ref/prompt or ref/resource
        ref_name,  # 引用名称:code_review or file:///path
        argument_name,  # 参数名:language or path or analysis_type
        argument_value,  # 参数值:py or /path/to/file.js or sec
        context_arguments=None,  # 上下文参数
    ):
        # 方法文档字符串,说明用途
        """请求补全建议"""
        ref_obj = {"type": ref_type}
        if ref_type == "ref/resource":
            ref_obj["uri"] = ref_name
        else:
            ref_obj["name"] = ref_name
        # 构造补全请求的消息体
        request = {
            "jsonrpc": "2.0",  # 指定JSON-RPC协议版本
            "id": self._generate_id(),  # 生成唯一消息ID
            "method": "completion/complete",  # 指定调用的方法
            "params": {
                "ref": ref_obj,  # 引用类型和名称
                "argument": {
                    "name": argument_name,
                    "value": argument_value,
                },  # 参数名和参数值
            },
        }
        # 如果有上下文参数,添加到请求中
        if context_arguments:
            request["params"]["context"] = {"arguments": context_arguments}
        # 通过私有方法发送请求消息,并获取响应
        response = self._send_message(request)
        # 判断响应是否存在且包含"result"字段
        if response and "result" in response:
            # 从响应中获取补全内容
            completion = response["result"].get("completion", {})
            # 记录获取到的补全建议数量
            logger.info(f"获取到 {len(completion.get('values', []))} 个补全建议")
            # 返回补全内容
            return completion
        else:
            # 如果请求失败,记录错误日志
            logger.error(f"请求补全失败")
            # 返回None表示失败
            return None

    # 重置会话方法
    def reset_session(self):
        """重置会话"""
        # 保存旧的会话ID
        old_session_id = self.session_id
        # 清空当前会话ID
        self.session_id = None
        # 记录会话重置信息
        logger.info(f"会话已重置,原会话ID: {old_session_id}")

    # 获取当前会话信息方法
    def get_session_info(self):
        """获取当前会话信息"""
        # 返回会话相关信息字典
        return {
            "session_id": self.session_id,
            "server_url": self.server_url,
            "protocol_version": self.protocol_version,
        }

    # 获取可用工具列表方法
    def list_tools(self):
        """获取可用工具列表"""
        # 构造请求消息
        request = {
            "jsonrpc": "2.0",
            "id": self._generate_id(),
            "method": "tools/list",
            "params": {},
        }

        # 发送请求并获取响应
        response = self._send_message(request)
        # 如果响应存在且包含result字段
        if response and "result" in response:
            # 获取工具列表
            tools = response["result"].get("tools", [])
            logger.info(f"可用工具数量: {len(tools)}")
            return tools
        else:
            # 获取失败,记录错误
            logger.error("获取工具列表失败")
            return []

    # 通过HTTP发送消息的私有方法
    def _send_message(self, message):
        """通过HTTP发送消息"""
        try:
            # 初始化请求头字典
            headers = {}
            # 如果有会话ID,则添加到请求头
            if self.session_id:
                headers["Mcp-Session-Id"] = self.session_id
                logger.debug(f"使用会话ID: {self.session_id}")

            # 发送POST请求
            response = self.session.post(
                self.mcp_endpoint, json=message, headers=headers, timeout=30
            )

            # 检查响应状态码
            if response.status_code == 200:
                # 检查响应头中是否有新的会话ID
                new_session_id = response.headers.get("Mcp-Session-Id")
                # 如果有新的会话ID且与当前不同,则更新
                if new_session_id and new_session_id != self.session_id:
                    self.session_id = new_session_id
                    logger.info(f"更新会话ID: {self.session_id}")

                # 返回响应的JSON内容
                return response.json()
            # 如果状态码为202,表示已接受但无内容
            elif response.status_code == 202:
                # Accepted - 通知或响应
                return None
            # 如果状态码为400,表示请求错误
            elif response.status_code == 400:
                logger.error(f"请求错误: {response.text}")
                return None
            # 如果状态码为401,表示会话ID无效
            elif response.status_code == 401:
                logger.error("会话ID无效,需要重新初始化")
                self.session_id = None
                return None
            # 其他HTTP错误
            else:
                logger.error(f"HTTP错误: {response.status_code} - {response.text}")
                return None

        # 捕获请求异常
        except requests.exceptions.RequestException as e:
            logger.error(f"HTTP请求错误: {e}")
            return None

    # 生成消息ID的方法
    def _generate_id(self) -> str:
        """生成消息ID"""
        # 消息ID自增
        self.message_id += 1
        # 返回字符串类型的消息ID
        return str(self.message_id)

    # 发送消息到服务器的方法(重复定义,实际只会用后面这个)
    def _send_message(self, message):
        """通过HTTP发送消息"""
        try:
            # 初始化请求头字典
            headers = {}
            # 如果有会话ID,则添加到请求头
            if self.session_id:
                headers["Mcp-Session-Id"] = self.session_id
                logger.debug(f"使用会话ID: {self.session_id}")

            # 发送POST请求到MCP端点
            response = self.session.post(
                self.mcp_endpoint, json=message, headers=headers, timeout=30
            )

            # 检查响应状态码
            if response.status_code == 200:
                # 从响应头获取新的会话ID
                new_session_id = response.headers.get("Mcp-Session-Id")
                # 如果有新的会话ID且与当前不同,则更新
                if new_session_id and new_session_id != self.session_id:
                    self.session_id = new_session_id
                    logger.info(f"更新会话ID: {self.session_id}")

                # 返回响应的JSON内容
                return response.json()
            # 如果状态码为202,表示已接受但无内容
            elif response.status_code == 202:
                # Accepted - 通知或响应
                return None
            # 如果状态码为400,表示请求错误
            elif response.status_code == 400:
                logger.error(f"请求错误: {response.text}")
                return None
            # 如果状态码为401,表示会话ID无效
            elif response.status_code == 401:
                logger.error("会话ID无效,需要重新初始化")
                self.session_id = None
                return None
            # 其他HTTP错误
            else:
                logger.error(f"HTTP错误: {response.status_code} - {response.text}")
                return None

        # 捕获请求异常
        except requests.exceptions.RequestException as e:
            logger.error(f"HTTP请求错误: {e}")
            return None

    # 调用工具方法
    def call_tool(self, tool_name, arguments):
        """调用工具"""
        # 构造调用工具的请求消息
        request = {
            "jsonrpc": "2.0",
            "id": self._generate_id(),
            "method": "tools/call",
            "params": {"calls": [{"name": tool_name, "arguments": arguments}]},
        }

        # 发送请求并获取响应
        response = self._send_message(request)
        # 如果响应存在且包含result字段
        if response and "result" in response:
            # 获取调用结果
            calls = response["result"].get("calls", [])
            if calls:
                return calls[0]
        else:
            # 调用失败,记录错误
            logger.error(f"调用工具 {tool_name} 失败")
        return None

    # 初始化与服务器连接的方法
    def initialize(self) -> bool:
        """初始化与服务器的连接"""
        # 构造初始化请求消息
        request = {
            "jsonrpc": "2.0",
            "id": self._generate_id(),
            "method": "initialize",
            "params": {
                "protocolVersion": self.protocol_version,
                "capabilities": {},
                "clientInfo": self.client_info,
            },
        }

        # 发送初始化请求
        response = self._send_message(request)
        # 如果响应存在且包含result字段
        if response and "result" in response:
            result = response["result"]
            # 获取服务器信息
            self.server_info = result.get("serverInfo")
            # 获取服务器能力
            self.capabilities = result.get("capabilities")

            # 提取并存储会话ID
            if "sessionId" in result:
                self.session_id = result["sessionId"]
                logger.info(f"获取到会话ID: {self.session_id}")

            logger.info(f"服务器初始化成功: {self.server_info}")
            return True
        else:
            # 初始化失败,记录错误
            logger.error("服务器初始化失败")
            return False


# HTTP传输的函数
def http_transport():
    """HTTP传输"""
    print("=== HTTP传输 ===")

    client = MCPClient("http://localhost:8000")

    try:
        if not client.initialize():
            print("初始化失败")
            return
        session_info = client.get_session_info()
        print(f" 会话信息: {session_info}")
        tools = client.list_tools()
        print(f"可用工具: {[tool['name'] for tool in tools]}")

        result = client.call_tool("calculate", {"expression": "10 / 2 + 5"})
        if result:
            content = result.get("content", [])
            if content:
                print(f"计算结果: {content[0].get('text', '')}")
        # 打印分隔符,表示开始演示Completion功能
        print("\n=== Completion功能 ===")
        # 打印分隔符,表示开始演示语言补全
        print("\n🔤 语言补全演示:")
        # 调用request_completion方法,请求针对'language'字段输入'py'的补全建议
        # language_completion = client.request_completion(
        #    "ref/prompt", "code_review", "language", "py"
        # )
        ## 如果成功获取到补全结果
        # if language_completion:
        #    # 从返回结果中获取'values'字段,默认为空列表
        #    values = language_completion.get("values", [])
        #    # 打印前5个补全建议
        #    print(f"输入 'py' 的补全建议: {values[:5]}...")  # 显示前5个
        #
        ## 2. 演示框架补全(无上下文)
        # print("\n🏗️  框架补全演示(无上下文):")
        # framework_completion = client.request_completion(
        #    "ref/prompt", "code_review", "framework", "fla"
        # )
        # if framework_completion:
        #    values = framework_completion.get("values", [])
        #    print(f"输入 'fla' 的补全建议: {values[:5]}...")
        ## 3. 演示框架补全(有上下文 - 已选择Python语言)
        # print("\n🐍 框架补全演示(已选择Python语言):")
        # framework_completion_with_context = client.request_completion(
        #    "ref/prompt",
        #    "code_review",
        #    "framework",
        #    "fla",
        #    context_arguments={"language": "python"},
        # )
        # if framework_completion_with_context:
        #    values = framework_completion_with_context.get("values", [])
        #    print(f"在Python语言下,输入 'fla' 的补全建议: {values}")
        # 4. 演示分析类型补全
        # print("\n📊 分析类型补全演示:")
        # analysis_completion = client.request_completion(
        #    "ref/resource", "file:///path", "analysis_type", "sec"
        # )
        # if analysis_completion:
        #    values = analysis_completion.get("values", [])
        #    print(f"输入 'sec' 的分析类型补全建议: {values}")

        # 5. 演示文件扩展名补全
        # print("\n📁 文件扩展名补全演示:")
        # file_completion = client.request_completion(
        #    "ref/resource", "file:///path", "path", ".py"
        # )
        # if file_completion:
        #    values = file_completion.get("values", [])
        #    print(f"输入 '.py' 的文件扩展名补全建议: {values[:5]}...")
        # 测试code_review工具
+       review_result = client.call_tool(
+           "code_review",
+           {
+               "language": "python",
+               "framework": "flask",
+               "code": "print('Hello, World!')",
+           },
        )
+       if review_result:
+           content = review_result.get("content", [])
+           if content:
+               print(f"代码审查结果: {content[0].get('text', '')}")
        # 测试file_analyzer工具
+       analysis_result = client.call_tool(
+           "file_analyzer", {"file_path": "example.py", "analysis_type": "security"}
+       )
+       if analysis_result:
+           content = analysis_result.get("content", [])
+           if content:
+               print(f"文件分析结果: {content[0].get('text', '')}")
    except Exception as e:
        print(f"HTTP错误: {e}")


# 主函数
def main():
    """主函数"""
    # 调用HTTP传输函数
    http_transport()


# 判断是否为主程序入口
if __name__ == "__main__":
    main()

4.2. server.py #

server.py

# 导入logging模块,用于日志记录
import logging

# 导入json模块,用于处理JSON数据
import json

# 导入uuid模块,用于生成唯一会话ID
import uuid

# 导入time模块,用于时间戳
import time

# 从tools模块导入tools对象和completion_data
from tools import tools, completion_data

# 导入datetime模块,用于处理日期和时间


# 从http.server模块导入ThreadingHTTPServer和BaseHTTPRequestHandler类
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler

# 配置日志记录级别为INFO
logging.basicConfig(level=logging.INFO)

# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)


# 定义MCPServer类,作为MCP服务器的核心类
class MCPServer:
    """MCP服务器核心类"""

    # 初始化方法
    def __init__(self):
        # 协议版本号
        self.protocol_version = "2025-06-18"
        # 服务器信息,包括名称和版本
        self.server_info = {"name": "MCP服务器", "version": "1.0.0"}
        # 服务器能力,包括工具、资源和提示
        self.capabilities = {"tools": {}, "resources": {}, "prompts": {}}
        # 会话管理字典
        self.sessions = {}

    # 获取所有活跃会话信息
    def get_all_sessions(self):
        """获取所有活跃会话信息"""
        return self.sessions.copy()

    def _cleanup_expired_sessions(self, max_age: int = 3600):
        """清理过期会话(默认1小时)"""
        current_time = time.time()
        expired_sessions = []

        for session_id, session_data in self.sessions.items():
            if current_time - session_data["last_activity"] > max_age:
                expired_sessions.append(session_id)

        for session_id in expired_sessions:
            del self.sessions[session_id]
            logger.info(f"清理过期会话: {session_id}")

        return len(expired_sessions)

    # 处理工具列表请求
    def handle_tools_list(self, params=None, session_id: str = None):
        """处理工具列表请求"""
        # 如果有会话ID,则获取或创建会话
        if session_id:
            self._get_or_create_session(session_id)
        # 返回工具列表
        return {"tools": tools}

    # 处理初始化请求(旧方法,已被覆盖)
    def handle_initialize(self, params, session_id: str = None):
        """处理初始化请求"""
        # 返回服务器能力
        return {"capabilities": self.capabilities}

    # 生成唯一的会话ID
    def _generate_session_id(self) -> str:
        """生成唯一的会话ID"""
        # 使用uuid4生成唯一ID
        return str(uuid.uuid4())

    # 获取会话信息
    def get_session_info(self, session_id: str):
        """获取会话信息"""
        # 如果会话ID存在于sessions字典中,返回会话信息
        if session_id in self.sessions:
            return self.sessions[session_id]
        # 否则返回None
        return None

    # 获取或创建会话
    def _get_or_create_session(self, session_id: str = None):
        """获取或创建会话"""
        # 如果没有传入会话ID,则生成一个新的
        if not session_id:
            session_id = self._generate_session_id()
        # 如果会话ID不存在于sessions字典中,则新建一个会话
        if session_id not in self.sessions:
            self.sessions[session_id] = {
                "created_at": time.time(),
                "last_activity": time.time(),
                "message_count": 0,
                "client_info": None,
            }
            # 记录新会话的创建
            logger.info(f"创建新会话: {session_id}")
        # 更新会话的最后活动时间
        self.sessions[session_id]["last_activity"] = time.time()
        # 增加消息计数
        self.sessions[session_id]["message_count"] += 1
        # 返回会话ID
        return session_id

    # 处理工具调用请求
    def handle_tools_call(self, params, session_id: str = None):
        """处理工具调用请求"""
        # 如果有会话ID,则获取或创建会话
        if session_id:
            self._get_or_create_session(session_id)

        # 获取调用列表
        calls = params.get("calls", [])
        # 初始化结果列表
        results = []

        # 遍历每个调用
        for call in calls:
            # 获取工具名称
            tool_name = call.get("name")
            # 获取参数
            arguments = call.get("arguments", {})

            # 如果工具名称为"calculate"
            if tool_name == "calculate":
                # 获取表达式
                expression = arguments.get("expression", "")
                try:
                    # 计算表达式结果
                    result = eval(expression)
                    # 添加成功结果到结果列表
                    results.append(
                        {
                            "name": tool_name,
                            "content": [
                                {
                                    "type": "text",
                                    "text": f"计算结果: {expression} = {result}",
                                }
                            ],
                        }
                    )
                except Exception as e:
                    # 计算出错,添加错误信息
                    results.append(
                        {
                            "name": tool_name,
                            "isError": True,
                            "error": {"message": f"计算错误: {str(e)}"},
                        }
                    )
+           elif tool_name == "code_review":
                # 处理代码审查工具
+               language = arguments.get("language", "")
+               framework = arguments.get("framework", "")
+               code = arguments.get("code", "")

+               review_result = f"代码审查结果:\n语言: {language}"
+               if framework:
+                   review_result += f"\n框架: {framework}"
+               review_result += f"\n代码长度: {len(code)} 字符"

+               results.append(
+                   {
+                       "name": tool_name,
+                       "content": [{"type": "text", "text": review_result}],
+                   }
+               )
+           elif tool_name == "file_analyzer":
                # 处理文件分析工具
+               file_path = arguments.get("file_path", "")
+               analysis_type = arguments.get("analysis_type", "syntax")

+               analysis_result = f"文件分析结果:\n文件路径: {file_path}\n分析类型: {analysis_type}\n状态: 分析完成"

+               results.append(
+                   {
+                       "name": tool_name,
+                       "content": [{"type": "text", "text": analysis_result}],
+                   }
+               )
            else:
                # 未知工具,添加错误信息
                results.append(
                    {
                        "name": tool_name,
                        "isError": True,
                        "error": {"message": f"未知工具: {tool_name}"},
                    }
                )

        # 返回所有调用的结果
        return {"calls": results}

    # 处理补全请求
    def handle_completion_complete(self, params, session_id: str = None):
        """处理补全请求"""
        # 如果有会话ID,则获取或创建会话
        if session_id:
            self._get_or_create_session(session_id)

        # 获取引用类型和参数信息
        ref = params.get("ref", {})
        context = params.get("context", {})
        ref_type = ref.get("type")

        argument = params.get("argument", {})
        argument_name = argument.get("name")
        argument_value = argument.get("value", "")

        # 根据引用类型和参数名提供相应的补全建议
        completion_values = []

        if ref_type == "ref/prompt":
            # 处理提示补全
            prompt_name = ref.get("name")
            if prompt_name == "code_review":
                if argument_name == "language":
                    # 语言补全
                    completion_values = self._get_language_completions(argument_value)
                elif argument_name == "framework":
                    # 框架补全,需要根据已选择的语言
                    selected_language = context.get("arguments", {}).get("language")
                    if selected_language:
                        completion_values = self._get_framework_completions(
                            selected_language, argument_value
                        )
                    else:
                        completion_values = self._get_all_framework_completions(
                            argument_value
                        )
        elif ref_type == "ref/resource":
            # 处理资源URI补全
            uri = ref.get("uri", "")
            if "file:///" in uri:
                if argument_name == "path":
                    completion_values = self._get_file_path_completions(argument_value)
                elif argument_name == "analysis_type":
                    completion_values = self._get_analysis_type_completions(
                        argument_value
                    )
        # 限制返回结果数量(规范要求最多100个)
        if len(completion_values) > 100:
            completion_values = completion_values[:100]

        return {
            "completion": {
                "values": completion_values,
                "total": len(completion_values),
                "hasMore": False,
            }
        }

    # 获取分析类型补全建议
    def _get_analysis_type_completions(self, partial_value: str):
        """获取分析类型补全建议"""
        if not partial_value:
            return completion_data["analysis_types"]

        # 模糊匹配
        return [
            atype
            for atype in completion_data["analysis_types"]
            if partial_value.lower() in atype.lower()
        ]

    # 获取文件路径补全建议
    def _get_file_path_completions(self, partial_value: str):
        """获取文件路径补全建议"""
        # 这里可以根据实际文件系统提供补全
        # 为了演示,返回一些示例文件扩展名
        if not partial_value:
            return completion_data["file_extensions"]

        return [
            ext
            for ext in completion_data["file_extensions"]
            if partial_value.lower() in ext.lower()
        ]

    # 获取框架补全建议
    def _get_framework_completions(self, language: str, partial_value: str):
        """根据语言获取框架补全建议"""
        frameworks = completion_data["frameworks"].get(language, [])
        if not partial_value:
            return frameworks

        # 模糊匹配
        return [fw for fw in frameworks if partial_value.lower() in fw.lower()]

    # 获取所有框架补全建议
    def _get_all_framework_completions(self, partial_value: str):
        """获取所有框架补全建议"""
        all_frameworks = []
        for frameworks in completion_data["frameworks"].values():
            all_frameworks.extend(frameworks)

        if not partial_value:
            return all_frameworks

        # 模糊匹配
        return [fw for fw in all_frameworks if partial_value.lower() in fw.lower()]

    # 获取语言补全建议
    def _get_language_completions(self, partial_value: str):
        """获取语言补全建议"""
        if not partial_value:
            return completion_data["languages"]

        # 模糊匹配
        return [
            lang
            for lang in completion_data["languages"]
            if partial_value.lower() in lang.lower()
        ]

    # 处理初始化请求(实际生效的方法,覆盖上面的方法)
    def handle_initialize(self, params, session_id: str = None):
        """处理初始化请求"""
        # 获取客户端信息
        client_info = params.get("clientInfo", {})
        # 获取或创建会话ID
        session_id = self._get_or_create_session(session_id)
        # 存储客户端信息到会话
        self.sessions[session_id]["client_info"] = client_info
        # 记录客户端初始化信息
        logger.info(f"客户端初始化: {client_info}, 会话ID: {session_id}")
        # 返回初始化结果,包括协议版本、能力、服务器信息和会话ID
        return {
            "protocolVersion": self.protocol_version,
            "capabilities": self.capabilities,
            "serverInfo": self.server_info,
            "sessionId": session_id,  # 返回会话ID给客户端
        }


# 定义MCPHTTPHandler类,继承自BaseHTTPRequestHandler
class MCPHTTPHandler(BaseHTTPRequestHandler):
    """HTTP处理器,支持Streamable HTTP传输"""

    # 初始化方法,接收mcp_server参数
    def __init__(self, *args, mcp_server=None, **kwargs):
        # 保存MCP服务器实例
        self.mcp_server = mcp_server
        # 调用父类的初始化方法
        super().__init__(*args, **kwargs)

    # 处理MCP消息
    def _handle_mcp_message(self, message, session_id: str = None):
        """处理MCP消息"""
        # 获取方法名
        method = message.get("method")
        # 获取参数
        params = message.get("params", {})
        # 获取消息ID
        msg_id = message.get("id")
        # 判断方法类型
        if method == "initialize":
            # 调用服务器的handle_initialize方法
            result = self.mcp_server.handle_initialize(params, session_id)
            # 如果有消息ID,返回带ID的结果
            return {"id": msg_id, "result": result} if msg_id else None
        elif method == "tools/list":
            # 处理工具列表请求
            result = self.mcp_server.handle_tools_list(params, session_id)
            return {"id": msg_id, "result": result} if msg_id else None
        elif method == "tools/call":
            # 处理工具调用请求
            result = self.mcp_server.handle_tools_call(params, session_id)
            return {"id": msg_id, "result": result} if msg_id else None
        elif method == "completion/complete":
            # 处理补全请求
            result = self.mcp_server.handle_completion_complete(params, session_id)
            return {"id": msg_id, "result": result} if msg_id else None
        else:
            # 如果方法不存在,返回错误
            if msg_id:
                return {
                    "id": msg_id,
                    "error": {"code": -32601, "message": f"Method not found: {method}"},
                }
            # 没有ID则返回None
            return None

    # 处理POST请求(发送消息到服务器)
    def do_POST(self):
        """处理POST请求(发送消息到服务器)"""
        try:
            # 检查请求路径是否为/mcp
            if self.path != "/mcp":
                self.send_error(404, "MCP endpoint not found")
                return
            # 检查Accept头是否包含application/json
            accept_header = self.headers.get("Accept", "")
            if "application/json" not in accept_header:
                self.send_error(400, "Missing required Accept header")
                return
            # 检查MCP-Protocol-Version头是否匹配
            protocol_version = self.headers.get("MCP-Protocol-Version")
            if (
                protocol_version
                and protocol_version != self.mcp_server.protocol_version
            ):
                self.send_error(
                    400, f"Unsupported protocol version: {protocol_version}"
                )
                return
            # 获取会话ID
            session_id = self.headers.get("Mcp-Session-Id")
            # 读取请求体长度
            content_length = int(self.headers.get("Content-Length", 0))
            # 读取请求体内容
            body = self.rfile.read(content_length)
            try:
                # 尝试解析JSON消息
                message = json.loads(body.decode("utf-8"))
            except json.JSONDecodeError:
                # JSON解析失败
                self.send_error(400, "Invalid JSON")
                return
            # 处理MCP消息
            response = self._handle_mcp_message(message, session_id)
            # 发送响应
            if response:
                # 发送200响应码
                self.send_response(200)
                # 设置响应头Content-Type为application/json
                self.send_header("Content-Type", "application/json")
                # 设置协议版本头
                self.send_header(
                    "MCP-Protocol-Version", self.mcp_server.protocol_version
                )
                # 如果响应中包含会话ID,则添加到响应头
                if "result" in response and "sessionId" in response["result"]:
                    self.send_header("Mcp-Session-Id", response["result"]["sessionId"])
                # 结束响应头
                self.end_headers()
                # 写入JSON响应体
                self.wfile.write(json.dumps(response).encode("utf-8"))
            else:
                # 没有响应内容,返回202 Accepted
                self.send_response(202)  # Accepted
                self.end_headers()
        except Exception as e:
            # 记录错误日志
            logger.error(f"POST处理错误: {e}")
            # 返回500内部服务器错误
            self.send_error(500, "Internal Server Error")


# 定义运行HTTP服务器的函数
def run_http_server(mcp_server: MCPServer, host: str = "localhost", port: int = 8000):
    """运行HTTP服务器"""

    # 定义处理器工厂函数,用于传递mcp_server实例
    def handler_factory(*args, **kwargs):
        return MCPHTTPHandler(*args, mcp_server=mcp_server, **kwargs)

    # 创建ThreadingHTTPServer实例,绑定主机和端口
    server = ThreadingHTTPServer((host, port), handler_factory)
    # 记录服务器启动信息
    logger.info(f"HTTP服务器运行在 http://{host}:{port}/mcp")
    # 记录会话管理功能启用信息
    logger.info("会话管理功能已启用,支持会话ID跟踪和自动清理")

    # 启动会话清理线程
    import threading

    # 定义会话清理函数
    def cleanup_sessions():
        while True:
            try:
                # 每5分钟清理一次
                time.sleep(300)
                # 调用会话清理方法
                cleaned_count = mcp_server._cleanup_expired_sessions()
                # 如果有清理的会话,记录日志
                if cleaned_count > 0:
                    logger.info(f"清理了 {cleaned_count} 个过期会话")
                # 记录当前活跃会话数
                active_sessions = len(mcp_server.get_all_sessions())
                logger.debug(f"当前活跃会话数: {active_sessions}")
            except Exception as e:
                # 清理出错,记录日志
                logger.error(f"会话清理错误: {e}")

    # 启动清理线程
    cleanup_thread = threading.Thread(target=cleanup_sessions, daemon=True)
    cleanup_thread.start()
    logger.info("会话清理线程已启动")
    try:
        # 启动服务器,进入循环监听请求
        server.serve_forever()
    except KeyboardInterrupt:
        # 捕获Ctrl+C,记录服务器停止
        logger.info("HTTP服务器已停止")
        logger.info("正在清理所有会话...")
        # 清理所有会话
        all_sessions = mcp_server.get_all_sessions()
        for session_id in list(all_sessions.keys()):
            del mcp_server.sessions[session_id]
        logger.info(f"已清理 {len(all_sessions)} 个会话")
    finally:
        # 关闭服务器
        server.server_close()


# 定义主函数
def main():
    """主函数"""
    # 导入argparse模块,用于解析命令行参数
    import argparse

    # 创建ArgumentParser对象,设置描述信息
    parser = argparse.ArgumentParser(description="MCP HTTP服务器")
    # 添加--host参数,指定服务器主机,默认localhost
    parser.add_argument(
        "--host", default="localhost", help="HTTP服务器主机 (默认: localhost)"
    )
    # 添加--port参数,指定服务器端口,默认8000
    parser.add_argument(
        "--port", type=int, default=8000, help="HTTP服务器端口 (默认: 8000)"
    )

    # 解析命令行参数
    args = parser.parse_args()

    # 创建MCP服务器实例
    mcp_server = MCPServer()

    # 运行HTTP服务器
    run_http_server(mcp_server, args.host, args.port)


# 判断是否为主程序入口
if __name__ == "__main__":
    # 调用主函数
    main()

4.3. tools.py #

tools.py

# MCP工具定义

tools = [
    {
        "name": "calculate",
        "description": "执行数学计算",
        "inputSchema": {
            "type": "object",
            "properties": {
                "expression": {
                    "type": "string",
                    "description": "数学表达式",
                }
            },
            "required": ["expression"],
        },
    },
+   {
+       "name": "code_review",
+       "description": "代码审查工具",
+       "inputSchema": {
+           "type": "object",
+           "properties": {
+               "language": {
+                   "type": "string",
+                   "description": "编程语言",
+               },
+               "framework": {
+                   "type": "string",
+                   "description": "框架名称",
+               },
+               "code": {
+                   "type": "string",
+                   "description": "要审查的代码",
+               },
+           },
+           "required": ["language", "code"],
+       },
+   },
+   {
+       "name": "file_analyzer",
+       "description": "文件分析工具",
+       "inputSchema": {
+           "type": "object",
+           "properties": {
+               "file_path": {
+                   "type": "string",
+                   "description": "文件路径",
+               },
+               "analysis_type": {
+                   "type": "string",
+                   "description": "分析类型",
+               },
+           },
+           "required": ["file_path"],
+       },
+   },
]

# 定义补全数据字典
completion_data = {
    # 支持的编程语言列表
    "languages": [
        "python",
        "javascript",
        "typescript",
        "java",
        "c++",
        "c#",
        "go",
        "rust",
        "php",
        "ruby",
        "swift",
        "kotlin",
        "scala",
        "dart",
        "elixir",
        "clojure",
    ],
    # 各编程语言对应的主流框架
    "frameworks": {
        # Python常用框架
        "python": [
            "django",
            "flask",
            "fastapi",
            "pytorch",
            "tensorflow",
            "pandas",
            "numpy",
        ],
        # JavaScript常用框架
        "javascript": ["react", "vue", "angular", "express", "next.js", "nuxt.js"],
        # TypeScript常用框架
        "typescript": ["react", "vue", "angular", "express", "next.js", "nuxt.js"],
        # Java常用框架
        "java": ["spring", "spring boot", "hibernate", "maven", "gradle"],
        # C++常用框架
        "c++": ["boost", "qt", "wxwidgets", "stl"],
        # C#常用框架
        "c#": [".net", "asp.net", "entity framework", "xamarin"],
        # Go常用框架
        "go": ["gin", "echo", "fiber", "gorilla", "cobra"],
        # Rust常用框架
        "rust": ["tokio", "actix", "rocket", "serde", "clap"],
    },
    # 支持的分析类型
    "analysis_types": [
        "syntax",  # 语法分析
        "complexity",  # 复杂度分析
        "security",  # 安全性分析
        "performance",  # 性能分析
        "style",  # 代码风格分析
        "documentation",  # 文档分析
    ],
    # 支持的文件扩展名
    "file_extensions": [
        ".py",  # Python文件
        ".js",  # JavaScript文件
        ".ts",  # TypeScript文件
        ".java",  # Java文件
        ".cpp",  # C++文件
        ".cs",  # C#文件
        ".go",  # Go文件
        ".rs",  # Rust文件
        ".php",  # PHP文件
        ".rb",  # Ruby文件
    ],
}

访问验证

请输入访问令牌

Token不正确,请重新输入