ai
  • index
  • 1.首页
  • 2.介绍
  • 3.架构概览
  • 4.服务器概念
  • 5.客户端概念
  • 6.版本控制
  • 7.连接到远程MCP服务器
  • 8.连接到本地MCP服务器
  • json_rpc
  • 9.构建一个MCP服务器
  • 10.检查员
  • 11.构建一个MCP客户端
  • 14.架构
  • 15.基础协议概述
  • 16.生命周期
  • 17.传输
  • 18.授权
  • 19.安全最佳实践
  • 20.取消
  • 21.Ping
  • 22.进展
  • 23.Roots
  • 24.采样
  • 25.启发
  • 26.服务器特性
  • 27.提示词
  • 28.资源
  • 29.工具
  • 30.完成
  • 31.日志记录
  • 32.分页
  • 33.架构参考
  • URI模板
  • 12.实现
  • http.server
  • 动态客户端注册协议
  • 受保护资源元数据
  • 授权服务器元数据
  • JWKS
  • PKCE
  • PyJWT
  • secrets
  • watchfiles
  • 实现authorization
  • 实现cancel
  • 实现completion
  • 实现logging
  • 实现pagination
  • 实现process
  • 实现transport
  • psutil
  • pytz
  • zoneinfo
  • contextlib
  • Starlette
  • mcp.1.starter
  • mcp.2.Resource
  • mcp.3.structured_output
  • mcp.4.prompts
  • mcp.5.context
  • mcp.6.streamable
  • mcp.7.lowlevel
  • mcp.8.Completion
  • mcp.9.Elicitation
  • mcp.10.oauth
  • mcp.11.integration
  • mcp.12.best
  • mysql-mcp
  • databases
  • uvicorn
  • asynccontextmanager
  • AsyncExitStack
  • streamable
  • aiohttp
  • publish
  • email
  • schedule
  • twine
  • 1.教学文档总览
  • 2.教师使用指南
  • 3.教学系统快速参考
  • 4.新生入门指南
  • 5.学生使用指南
  • 1. Streamable HTTP
  • 2. 传统 HTTP+SSE 传输的问题
    • 2.1. 连接不可恢复性
    • 2.2. 服务器高可用性要求
    • 2.3. 消息传递限制
  • 3. Streamable HTTP 的优势
    • 3.1. 无状态服务器支持
    • 3.2. 纯 HTTP 实现
    • 3.3. 基础设施兼容性
    • 3.4. 灵活的升级路径
  • 4. 核心设计原则
    • 4.1. 端点简化
    • 4.2. 可升级的请求
    • 4.3. 会话管理
    • 4.4. 流式响应
  • 5. 实现场景
    • 5.1. 无状态服务器
    • 5.2. 无状态服务器 + 流式传输
    • 5.3. 有状态服务器
  • 6. 为什么不选择 WebSocket?
  • 7. 向后兼容性
  • 8. 总结
  • 9.参考资料
  • 10. HTTP+SSE 实现
    • 10.1. 服务器
    • 10.2. 客户端
  • 11. Streamable HTTP+SSE 实现
    • 11.1. 服务器
    • 11.2 客户端
  • 12.对比
    • 12.1 1. 架构设计理念
      • 12.1.1 传统 HTTP+SSE 方式
      • 12.1.2 Streamable HTTP 方式
    • 12.2 2. 会话管理模式
      • 12.2.1 SSE方式 - 强制有状态
      • 12.2.2 Streamable方式 - 灵活状态管理
    • 12.3 3. 连接建立方式
      • 12.3.1 SSE方式 - 分离连接
      • 12.3.2 Streamable方式 - 统一连接
    • 12.4 4. 工具调用机制
      • 12.4.1 SSE方式 - 固定响应类型
      • 12.4.2 Streamable方式 - 动态响应类型
    • 12.5 5. 服务器部署灵活性
      • 12.5.1 SSE方式 - 高要求部署
      • 12.5.2 Streamable方式 - 灵活部署
    • 12.6 6. 错误处理和恢复能力
      • 12.6.1 SSE方式 - 脆弱连接
      • 12.6.2 Streamable方式 - 健壮设计
    • 12.7 7. 实际应用场景对比
      • 12.7.1 SSE方式适用场景
      • 12.7.2 Streamable方式适用场景
    • 12.8 8. 代码复杂度对比
      • 12.8.1 SSE方式 - 复杂实现
      • 12.8.2 Streamable方式 - 简化实现
  • 12.9 📊 总结对比表

1. Streamable HTTP #

Model Context Protocol (MCP) 的 Streamable HTTP 传输协议是对传统 HTTP+SSE 传输的重大改进,旨在解决现有传输方式的局限性,同时保持其优势。

2. 传统 HTTP+SSE 传输的问题 #

2.1. 连接不可恢复性 #

  • 一旦 SSE 连接断开,无法自动恢复
  • 需要重新建立连接,丢失所有状态

2.2. 服务器高可用性要求 #

  • 服务器必须维护长连接
  • 对服务器基础设施要求较高

2.3. 消息传递限制 #

  • 只能通过 SSE 传递服务器消息
  • 客户端到服务器的消息传递方式有限

3. Streamable HTTP 的优势 #

3.1. 无状态服务器支持 #

  • 服务器可以选择完全无状态
  • 消除了对高可用性长连接的要求

3.2. 纯 HTTP 实现 #

  • 可以在纯 HTTP 服务器上实现 MCP
  • 不需要特殊的 SSE 支持

3.3. 基础设施兼容性 #

  • "就是 HTTP",确保与中间件和基础设施的兼容性
  • 向后兼容现有实现

3.4. 灵活的升级路径 #

  • 服务器可以选择在需要时使用 SSE 进行流式响应
  • 支持渐进式升级

4. 核心设计原则 #

4.1. 端点简化 #

  • 移除 /sse 端点
  • 所有客户端到服务器的消息通过 /message 端点

4.2. 可升级的请求 #

  • 服务器可以将任何客户端请求升级为 SSE
  • 用于发送通知/请求

4.3. 会话管理 #

  • 服务器可以选择建立会话 ID 来维护状态
  • 客户端在每次请求中传递会话 ID

4.4. 流式响应 #

  • 客户端可以通过向 /message 发送空 GET 请求来启动 SSE 流

5. 实现场景 #

5.1. 无状态服务器 #

完全无状态的服务器,不支持长连接:

# 示例:无状态 MCP 服务器
1. 始终确认初始化(但不需要持久化任何状态)
2. 用单个 JSON-RPC 响应响应任何传入的 ToolListRequest
3. 通过执行工具、等待完成,然后发送单个 CallToolResponse 作为 HTTP 响应体来处理任何 CallToolRequest

5.2. 无状态服务器 + 流式传输 #

完全无状态但支持流式传输的服务器:

# 示例:无状态服务器 + 流式传输
1. 当传入的 POST 请求是 CallToolRequest 时,服务器指示响应将是 SSE
2. 服务器开始执行工具
3. 服务器在工具执行期间通过 SSE 发送任意数量的 ProgressNotification
4. 当工具执行完成时,服务器通过 SSE 发送 CallToolResponse
5. 服务器关闭 SSE 流

5.3. 有状态服务器 #

有状态服务器的实现与今天非常相似:

# 主要区别:
1. 服务器需要生成会话 ID
2. 客户端需要在每个请求中传递该会话 ID
3. 服务器可以使用会话 ID 进行粘性路由或在消息总线上路由消息

6. 为什么不选择 WebSocket? #

核心团队深入讨论了将 WebSocket 作为主要远程传输(而不是 SSE),并对其应用类似的工作以使其可断开和可恢复。最终决定不追求 WS 的原因:

  1. RPC 式使用:如果只是想要以 "RPC 式" 方式使用 MCP(例如,一个只暴露基本工具的无状态 MCP 服务器),WebSocket 会产生很多不必要的操作和网络开销
  2. 基础设施兼容性:HTTP 在基础设施层面有更好的兼容性
  3. 渐进式升级:Streamable HTTP 提供了从简单到复杂的渐进式升级路径

7. 向后兼容性 #

这个提案可以向后兼容地实现,允许服务器根据需要选择完全无状态。现有的 HTTP+SSE 实现可以逐步迁移到新的协议。

8. 总结 #

Streamable HTTP 传输协议为 MCP 带来了重要的改进:

  • 灵活性:支持从完全无状态到完全有状态的服务器实现
  • 兼容性:与现有 HTTP 基础设施完全兼容
  • 可扩展性:支持渐进式功能升级
  • 性能:消除了不必要的连接开销

这个新协议为 MCP 在更广泛的基础设施环境中的部署铺平了道路,同时保持了其核心优势。

9.参考资料 #

  • RFC PR #206
  • MCP 官方文档
  • GitHub 讨论

10. HTTP+SSE 实现 #

10.1. 服务器 #

# 导入异步IO库
import asyncio

# 导入json库用于数据序列化
import json

# 导入uuid库用于生成唯一会话ID
import uuid

# 导入Starlette应用相关模块
from starlette.applications import Starlette

# 导入路由模块
from starlette.routing import Route

# 导入响应类型
from starlette.responses import JSONResponse, StreamingResponse

# 导入请求对象
from starlette.requests import Request

# 导入uvicorn服务器
import uvicorn


# 定义传统HTTP+SSE MCP服务器类
class TraditionalMCPServer:
    """传统的 HTTP+SSE MCP 服务器实现"""

    # 构造函数,初始化连接和会话字典
    def __init__(self):
        self.active_connections = {}
        self.sessions = {}

    # 处理初始化请求
    async def handle_initialize(self, request: Request):
        """处理初始化请求"""
        # 解析请求体为json
        data = await request.json()

        # 创建唯一会话ID
        session_id = str(uuid.uuid4())
        # 保存会话信息
        self.sessions[session_id] = {
            "protocol_version": data.get("protocolVersion", "2024-11-05"),
            "capabilities": data.get("capabilities", {}),
            "client_info": data.get("clientInfo", {}),
            "created_at": asyncio.get_event_loop().time(),
        }

        # 为该会话创建消息队列
        self.active_connections[session_id] = asyncio.Queue()

        # 返回初始化响应,包含会话ID
        return JSONResponse(
            {
                "jsonrpc": "2.0",
                "id": data.get("id"),
                "result": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {"tools": {}, "resources": {}, "prompts": {}},
                    "serverInfo": {
                        "name": "Traditional MCP Server",
                        "version": "1.0.0",
                        "session_id": session_id,  # 返回会话ID
                    },
                },
            }
        )

    # 处理工具列表请求
    async def handle_tool_list(self, request: Request):
        """处理工具列表请求"""
        # 解析请求体
        data = await request.json()
        # 获取请求头中的会话ID
        session_id = request.headers.get("X-Session-ID")

        # 校验会话ID有效性
        if not session_id or session_id not in self.sessions:
            return JSONResponse({"error": "Invalid session"}, status_code=400)

        # 返回可用工具列表
        return JSONResponse(
            {
                "jsonrpc": "2.0",
                "id": data.get("id"),
                "result": {
                    "tools": [
                        {
                            "name": "echo",
                            "description": "Echo back the input",
                            "inputSchema": {
                                "type": "object",
                                "properties": {"message": {"type": "string"}},
                                "required": ["message"],
                            },
                        },
                        {
                            "name": "long_running_task",
                            "description": "A long running task that sends progress updates",
                            "inputSchema": {
                                "type": "object",
                                "properties": {
                                    "duration": {"type": "integer", "default": 10}
                                },
                            },
                        },
                    ]
                },
            }
        )

    # 处理工具调用请求
    async def handle_tool_call(self, request: Request):
        """处理工具调用请求"""
        # 解析请求体
        data = await request.json()
        # 获取会话ID
        session_id = request.headers.get("X-Session-ID")

        # 校验会话ID
        if not session_id or session_id not in self.sessions:
            return JSONResponse({"error": "Invalid session"}, status_code=400)

        # 获取工具名称
        tool_name = data["params"]["name"]

        # 如果是echo工具,直接返回结果
        if tool_name == "echo":
            # 简单工具,直接返回结果
            return JSONResponse(
                {
                    "jsonrpc": "2.0",
                    "id": data.get("id"),
                    "result": {
                        "content": [
                            {
                                "type": "text",
                                "text": f"Echo: {data['params']['arguments']['message']}",
                            }
                        ]
                    },
                }
            )

        # 如果是长运行任务,走SSE流式返回
        elif tool_name == "long_running_task":
            # 长运行任务,通过 SSE 发送进度更新
            duration = data["params"]["arguments"].get("duration", 10)
            return await self.handle_long_running_task(data, session_id, duration)

        # 未知工具,返回错误
        return JSONResponse({"error": "Unknown tool"}, status_code=400)

    # 处理长运行任务,通过SSE发送进度
    async def handle_long_running_task(self, data, session_id, duration):
        """处理长运行任务,通过 SSE 发送进度更新"""

        # 定义异步生成器,逐步推送进度
        async def progress_stream():
            # 发送开始通知
            yield f"data: {json.dumps({
                'jsonrpc': '2.0',
                'method': 'notifications/progress',
                'params': {
                    'type': 'begin',
                    'title': 'Long Running Task',
                    'message': 'Starting task...'
                }
            })}\n\n"

            # 模拟任务执行,逐步发送进度
            for i in range(duration):
                await asyncio.sleep(1)
                progress = (i + 1) / duration * 100

                yield f"data: {json.dumps({
                    'jsonrpc': '2.0',
                    'method': 'notifications/progress',
                    'params': {
                        'type': 'update',
                        'title': 'Long Running Task',
                        'message': f'Progress: {progress:.1f}%',
                        'percentage': progress
                    }
                })}\n\n"

            # 发送完成通知
            yield f"data: {json.dumps({
                'jsonrpc': '2.0',
                'method': 'notifications/progress',
                'params': {
                    'type': 'end',
                    'title': 'Long Running Task',
                    'message': 'Task completed!'
                }
            })}\n\n"

            # 发送最终结果
            yield f"data: {json.dumps({
                'jsonrpc': '2.0',
                'id': data.get('id'),
                'result': {
                    'content': [
                        {
                            'type': 'text',
                            'text': f'Task completed after {duration} seconds!'
                        }
                    ]
                }
            })}\n\n"

        # 返回流式响应
        return StreamingResponse(
            progress_stream(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Session-ID": session_id,
            },
        )

    # 处理SSE连接请求
    async def handle_sse_connection(self, request: Request):
        """处理 SSE 连接"""
        # 获取会话ID
        session_id = request.headers.get("X-Session-ID")

        # 校验会话ID
        if not session_id or session_id not in self.sessions:
            return JSONResponse({"error": "Invalid session"}, status_code=400)

        # 定义SSE流生成器
        async def sse_stream():
            queue = self.active_connections[session_id]

            # 发送连接确认消息
            yield f"data: {json.dumps({
                'jsonrpc': '2.0',
                'method': 'notifications/connection',
                'params': {
                    'status': 'connected',
                    'session_id': session_id
                }
            })}\n\n"

            try:
                while True:
                    # 等待消息队列中的消息
                    message = await asyncio.wait_for(queue.get(), timeout=30.0)
                    yield f"data: {json.dumps(message)}\n\n"
            except asyncio.TimeoutError:
                # 超时则发送心跳包保持连接
                yield f"data: {json.dumps({
                    'jsonrpc': '2.0',
                    'method': 'notifications/heartbeat',
                    'params': {'timestamp': asyncio.get_event_loop().time()}
                })}\n\n"

        # 返回SSE流响应
        return StreamingResponse(
            sse_stream(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Session-ID": session_id,
            },
        )

    # 处理关闭请求
    async def handle_shutdown(self, request: Request):
        """处理关闭请求"""
        # 解析请求体
        data = await request.json()
        # 获取会话ID
        session_id = request.headers.get("X-Session-ID")

        # 删除会话信息
        if session_id in self.sessions:
            del self.sessions[session_id]

        # 删除消息队列
        if session_id in self.active_connections:
            del self.active_connections[session_id]

        # 返回关闭响应
        return JSONResponse({"jsonrpc": "2.0", "id": data.get("id"), "result": {}})


# 创建Starlette应用
def create_app():
    """创建 Starlette 应用"""
    # 实例化服务器对象
    server = TraditionalMCPServer()

    # 定义各个路由的处理函数
    async def initialize(request: Request):
        return await server.handle_initialize(request)

    async def tool_list(request: Request):
        return await server.handle_tool_list(request)

    async def tool_call(request: Request):
        return await server.handle_tool_call(request)

    async def sse_connection(request: Request):
        return await server.handle_sse_connection(request)

    async def shutdown(request: Request):
        return await server.handle_shutdown(request)

    async def homepage(request: Request):
        # 返回主页信息
        return JSONResponse(
            {
                "message": "Traditional HTTP+SSE MCP Server",
                "endpoints": {
                    "/initialize": "POST - Initialize MCP connection",
                    "/tools/list": "POST - List available tools",
                    "/tools/call": "POST - Call a tool",
                    "/sse": "GET - Establish SSE connection",
                    "/shutdown": "POST - Shutdown connection",
                },
                "note": "This server demonstrates the limitations of traditional HTTP+SSE transport",
            }
        )

    # 返回Starlette应用对象,注册路由
    return Starlette(
        routes=[
            Route("/", homepage),
            Route("/initialize", initialize, methods=["POST"]),
            Route("/tools/list", tool_list, methods=["POST"]),
            Route("/tools/call", tool_call, methods=["POST"]),
            Route("/sse", sse_connection, methods=["GET"]),
            Route("/shutdown", shutdown, methods=["POST"]),
        ]
    )


# 主程序入口
if __name__ == "__main__":
    # 创建应用
    app = create_app()
    # 打印服务器启动信息
    print("启动传统 HTTP+SSE MCP 服务器...")
    print("特点:")
    print("   - 需要维护长连接")
    print("   - 连接断开后无法恢复")
    print("   - 服务器必须保持高可用性")
    print("   - 只能通过 SSE 发送服务器消息")
    print("\n服务器地址: http://127.0.0.1:8000")

    # 启动uvicorn服务器
    uvicorn.run(app, host="127.0.0.1", port=8000)

10.2. 客户端 #

# 导入异步IO库
import asyncio

# 导入json库用于数据序列化和反序列化
import json

# 导入aiohttp库用于异步HTTP请求
import aiohttp


# 定义传统的 HTTP+SSE MCP 客户端类
class TraditionalMCPClient:
    """传统的 HTTP+SSE MCP 客户端实现"""

    # 构造函数,初始化服务器地址和相关属性
    def __init__(self, server_url: str = "http://127.0.0.1:8000"):
        self.server_url = server_url  # 服务器地址
        self.session_id = None  # 会话ID
        self.sse_task = None  # SSE监听任务
        self.session = None  # aiohttp会话对象

    # 异步上下文管理器入口
    async def __aenter__(self):
        """异步上下文管理器入口"""
        self.session = aiohttp.ClientSession()  # 创建aiohttp会话
        return self

    # 异步上下文管理器出口
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """异步上下文管理器出口"""
        if self.sse_task:
            self.sse_task.cancel()  # 取消SSE监听任务
        if self.session:
            await self.session.close()  # 关闭aiohttp会话

    # 初始化MCP连接
    async def initialize(self) -> bool:
        """初始化 MCP 连接"""
        try:
            # 构造初始化请求的payload
            payload = {
                "jsonrpc": "2.0",
                "id": 1,
                "method": "initialize",
                "params": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {"tools": {}, "resources": {}, "prompts": {}},
                    "clientInfo": {
                        "name": "Traditional MCP Client",
                        "version": "1.0.0",
                    },
                },
            }

            # 发送POST请求到/initialize接口
            async with self.session.post(
                f"{self.server_url}/initialize", json=payload
            ) as response:
                # 判断响应状态码
                if response.status == 200:
                    result = await response.json()
                    # 从服务器响应中获取会话ID
                    self.session_id = result["result"]["serverInfo"].get("session_id")

                    print("连接初始化成功")
                    print(f"   协议版本: {result['result']['protocolVersion']}")
                    print(f"   服务器: {result['result']['serverInfo']['name']}")
                    print(f"   会话ID: {self.session_id}")
                    return True
                else:
                    print(f" 连接初始化失败: {response.status}")
                    return False

        except Exception as e:
            print(f" 连接初始化异常: {e}")
            return False

    # 获取可用工具列表
    async def list_tools(self) -> list:
        """获取可用工具列表"""
        if not self.session_id:
            print(" 请先初始化连接")
            return []

        try:
            # 构造请求payload
            payload = {"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}}

            # 设置请求头,包含会话ID
            headers = {"X-Session-ID": self.session_id}
            # 发送POST请求到/tools/list接口
            async with self.session.post(
                f"{self.server_url}/tools/list", json=payload, headers=headers
            ) as response:
                # 判断响应状态码
                if response.status == 200:
                    result = await response.json()
                    tools = result["result"]["tools"]
                    print(f"获取到 {len(tools)} 个工具:")
                    for tool in tools:
                        print(f"   - {tool['name']}: {tool['description']}")
                    return tools
                else:
                    print(f" 获取工具列表失败: {response.status}")
                    return []

        except Exception as e:
            print(f" 获取工具列表异常: {e}")
            return []

    # 调用指定工具
    async def call_tool(self, tool_name: str, arguments):
        """调用工具"""
        if not self.session_id:
            print(" 请先初始化连接")
            return None

        try:
            # 构造请求payload
            payload = {
                "jsonrpc": "2.0",
                "id": 3,
                "method": "tools/call",
                "params": {"name": tool_name, "arguments": arguments},
            }

            # 设置请求头,包含会话ID
            headers = {"X-Session-ID": self.session_id}
            # 发送POST请求到/tools/call接口
            async with self.session.post(
                f"{self.server_url}/tools/call", json=payload, headers=headers
            ) as response:
                # 判断响应状态码
                if response.status == 200:
                    content_type = response.headers.get("content-type", "")

                    # 判断是否为SSE流式响应
                    if "text/event-stream" in content_type:
                        # SSE 流式响应
                        print(f" 工具 {tool_name} 返回流式响应...")
                        return await self._handle_sse_response(response)
                    else:
                        # 普通 JSON 响应
                        try:
                            result = await response.json()
                            print(f" 工具 {tool_name} 调用成功")
                            return result
                        except Exception as json_error:
                            print(f"⚠️  JSON 解析失败: {json_error}")
                            # 尝试读取原始响应内容
                            text_content = await response.text()
                            print(f"   原始响应: {text_content[:200]}...")
                            return {
                                "error": "JSON parsing failed",
                                "raw_content": text_content,
                            }
                else:
                    print(f" 工具调用失败: {response.status}")
                    return None

        except Exception as e:
            print(f" 工具调用异常: {e}")
            return None

    # 处理SSE响应
    async def _handle_sse_response(self, response):
        """处理 SSE 响应"""
        print(" 接收到 SSE 流式响应...")
        result = None

        try:
            # 异步遍历SSE流中的每一行
            async for line in response.content:
                line = line.decode("utf-8").strip()
                # 判断是否为SSE数据行
                if line.startswith("data: "):
                    try:
                        # 解析JSON数据,去除前缀
                        data = json.loads(line[6:])  # 移除 'data: ' 前缀

                        # 判断消息类型
                        if "method" in data:
                            # 处理进度通知
                            if "notifications/progress" in data["method"]:
                                progress_data = data["params"]
                                if progress_data["type"] == "begin":
                                    print(f" 任务开始: {progress_data['title']}")
                                elif progress_data["type"] == "update":
                                    print(f" 进度更新: {progress_data['message']}")
                                elif progress_data["type"] == "end":
                                    print(f" 任务完成: {progress_data['title']}")
                            # 处理心跳通知
                            elif "notifications/heartbeat" in data["method"]:
                                print(
                                    f" 心跳: {data['params'].get('timestamp', 'N/A')}"
                                )

                        # 处理最终结果
                        elif "result" in data:
                            # 结果消息
                            result = data
                            print(
                                f"📋 最终结果: {data['result']['content'][0]['text']}"
                            )
                            break

                    except json.JSONDecodeError as e:
                        print(f"⚠️  SSE 数据解析失败: {e}, 原始数据: {line}")
                        continue

        except Exception as e:
            print(f"⚠️  SSE 流处理异常: {e}")

        # 返回结果或错误信息
        return result or {"error": "No result received from SSE stream"}

    # 建立SSE连接
    async def establish_sse_connection(self) -> bool:
        """建立 SSE 连接"""
        if not self.session_id:
            print(" 请先初始化连接")
            return False

        try:
            # 设置请求头,包含会话ID
            headers = {"X-Session-ID": self.session_id}
            # 发送GET请求到/sse接口
            async with self.session.get(
                f"{self.server_url}/sse", headers=headers
            ) as response:
                # 判断响应状态码
                if response.status == 200:
                    print(" SSE 连接建立成功")

                    # 启动 SSE 监听任务
                    self.sse_task = asyncio.create_task(self._listen_sse(response))
                    return True
                else:
                    print(f" SSE 连接建立失败: {response.status}")
                    return False

        except Exception as e:
            print(f" SSE 连接异常: {e}")
            return False

    # 监听SSE消息
    async def _listen_sse(self, response):
        """监听 SSE 消息"""
        try:
            # 异步遍历SSE流中的每一行
            async for line in response.content:
                line = line.decode("utf-8").strip()
                # 判断是否为SSE数据行
                if line.startswith("data: "):
                    try:
                        data = json.loads(line[6:])
                        if "method" in data:
                            print(f"📨 SSE 消息: {data['method']}")
                    except json.JSONDecodeError:
                        continue
        except asyncio.CancelledError:
            print(" SSE 连接已取消")
        except Exception as e:
            # 检查是否是正常的连接关闭
            if "Connection closed" in str(e) or "Connection reset" in str(e):
                print(" SSE 连接正常关闭")
            else:
                print(f" SSE 监听异常: {e}")

    # 关闭连接
    async def shutdown(self) -> bool:
        """关闭连接"""
        if not self.session_id:
            return True

        try:
            # 构造关闭请求的payload
            payload = {"jsonrpc": "2.0", "id": 4, "method": "shutdown", "params": {}}

            # 设置请求头,包含会话ID
            headers = {"X-Session-ID": self.session_id}
            # 发送POST请求到/shutdown接口
            async with self.session.post(
                f"{self.server_url}/shutdown", json=payload, headers=headers
            ) as response:
                # 判断响应状态码
                if response.status == 200:
                    print("连接关闭成功")
                    self.session_id = None
                    return True
                else:
                    print(f" 连接关闭失败: {response.status}")
                    return False

        except Exception as e:
            print(f" 连接关闭异常: {e}")
            return False


# 演示传统 HTTP+SSE MCP 的工作流程
async def demo_traditional_mcp():
    """演示传统 HTTP+SSE MCP 的工作流程"""
    print(" 传统 HTTP+SSE MCP 客户端演示")
    print("=" * 50)

    # 使用异步上下文管理器创建客户端
    async with TraditionalMCPClient() as client:
        # 1. 初始化连接
        print("\n 1. 初始化连接...")
        if not await client.initialize():
            print(" 无法继续演示")
            return

        # 2. 获取工具列表
        print("\n 2. 获取工具列表...")
        tools = await client.list_tools()
        if not tools:
            print(" 无法获取工具列表")
            return

        # 3. 调用简单工具
        print("\n 3. 调用简单工具 (echo)...")
        result = await client.call_tool("echo", {"message": "Hello, Traditional MCP!"})
        if result:
            print(f"   结果: {result}")

        # 4. 建立 SSE 连接
        print("\n 4. 建立 SSE 连接...")
        if await client.establish_sse_connection():
            # 等待一下让连接稳定
            await asyncio.sleep(1)

        # 5. 调用长运行工具
        print("\n 5. 调用长运行工具...")
        result = await client.call_tool("long_running_task", {"duration": 5})
        if result:
            print(f"   最终结果: {result}")

        # 6. 关闭连接
        print("\n 6. 关闭连接...")
        await client.shutdown()

    print("\n" + "=" * 50)
    print(" 传统 HTTP+SSE 传输的特点总结:")
    print("    连接断开后无法恢复")
    print("    服务器必须维护长连接")
    print("    对服务器基础设施要求高")
    print("    只能通过 SSE 发送服务器消息")


# 主程序入口
if __name__ == "__main__":
    print("启动传统 HTTP+SSE MCP 客户端演示...")
    print(" 请确保服务器已启动: python http_sse_server.py")
    print()

    try:
        asyncio.run(demo_traditional_mcp())
    except KeyboardInterrupt:
        print("\n 演示被用户中断")
    except Exception as e:
        print(f"\n 演示异常: {e}")

11. Streamable HTTP+SSE 实现 #

11.1. 服务器 #

# 导入异步IO库
import asyncio

# 导入JSON处理库
import json

# 导入UUID库用于生成唯一会话ID
import uuid

# 从starlette导入应用、路由、响应等相关模块
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import JSONResponse, StreamingResponse
from starlette.requests import Request

# 导入uvicorn用于运行ASGI服务器
import uvicorn


# 定义StreamableHTTPMCPServer类,实现MCP服务器
class StreamableHTTPMCPServer:
    """Streamable HTTP MCP 服务器实现"""

    # 构造函数,初始化服务器状态
    def __init__(self):
        # 是否为无状态模式,默认为有状态
        self.stateless_mode = False  # 可以设置为 False 来启用有状态模式
        # 会话信息存储字典
        self.sessions = {}  # 会话列表
        # 消息队列存储字典
        self.message_queues = {}  # 消息队列

    # 生成会话ID的方法
    def _generate_session_id(self) -> str:
        """生成会话 ID(仅在非无状态模式下使用)"""
        # 如果是有状态模式则生成UUID,否则返回None
        return str(uuid.uuid4()) if not self.stateless_mode else None

    # 处理初始化请求
    async def handle_initialize(self, request: Request):
        """处理初始化请求"""
        # 解析请求体中的JSON数据
        data = await request.json()

        # 判断是否为无状态模式
        if self.stateless_mode:
            # 无状态模式:不创建持久会话
            print(" 无状态模式:初始化请求已确认,但不创建持久会话")
            # 返回初始化确认响应
            return JSONResponse(
                {
                    "jsonrpc": "2.0",
                    "id": data.get("id"),
                    "result": {
                        "protocolVersion": "2024-11-05",
                        "capabilities": {"tools": {}, "resources": {}, "prompts": {}},
                        "serverInfo": {
                            "name": "Streamable HTTP MCP Server",
                            "version": "1.0.0",
                            "mode": "stateless",
                        },
                    },
                }
            )
        else:
            # 有状态模式:创建会话
            session_id = self._generate_session_id()
            # 存储会话信息
            self.sessions[session_id] = {
                "protocol_version": data.get("protocolVersion", "2024-11-05"),
                "capabilities": data.get("capabilities", {}),
                "client_info": data.get("clientInfo", {}),
                "created_at": asyncio.get_event_loop().time(),
            }
            # 为该会话创建消息队列
            self.message_queues[session_id] = asyncio.Queue()

            # 返回初始化响应,包含会话ID
            return JSONResponse(
                {
                    "jsonrpc": "2.0",
                    "id": data.get("id"),
                    "result": {
                        "protocolVersion": "2024-11-05",
                        "capabilities": {"tools": {}, "resources": {}, "prompts": {}},
                        "serverInfo": {
                            "name": "Streamable HTTP MCP Server",
                            "version": "1.0.0",
                            "mode": "stateful",
                            "session_id": session_id,
                        },
                    },
                }
            )

    # 处理工具列表请求
    async def handle_tool_list(self, request: Request):
        """处理工具列表请求"""
        # 解析请求体中的JSON数据
        data = await request.json()

        # 如果是有状态模式,校验会话ID
        if not self.stateless_mode:
            session_id = request.headers.get("X-Session-ID")
            if not session_id or session_id not in self.sessions:
                # 会话无效,返回错误
                return JSONResponse({"error": "Invalid session"}, status_code=400)

        # 工具列表对于无状态和有状态服务器都是一样的
        return JSONResponse(
            {
                "jsonrpc": "2.0",
                "id": data.get("id"),
                "result": {
                    "tools": [
                        {
                            "name": "echo",
                            "description": "Echo back the input (stateless)",
                            "inputSchema": {
                                "type": "object",
                                "properties": {"message": {"type": "string"}},
                                "required": ["message"],
                            },
                        },
                        {
                            "name": "streaming_task",
                            "description": "A task that can stream progress (upgradable to SSE)",
                            "inputSchema": {
                                "type": "object",
                                "properties": {
                                    "duration": {"type": "integer", "default": 5},
                                    "stream": {"type": "boolean", "default": False},
                                },
                            },
                        },
                        {
                            "name": "stateful_counter",
                            "description": "A stateful counter (only works in stateful mode)",
                            "inputSchema": {
                                "type": "object",
                                "properties": {
                                    "action": {
                                        "type": "string",
                                        "enum": ["increment", "decrement", "get"],
                                    }
                                },
                            },
                        },
                    ]
                },
            }
        )

    # 处理工具调用请求
    async def handle_tool_call(self, request: Request):
        """处理工具调用请求"""
        # 解析请求体中的JSON数据
        data = await request.json()
        # 获取要调用的工具名称
        tool_name = data["params"]["name"]

        # 如果是有状态模式,校验会话ID
        if not self.stateless_mode:
            session_id = request.headers.get("X-Session-ID")
            if not session_id or session_id not in self.sessions:
                # 会话无效,返回错误
                return JSONResponse({"error": "Invalid session"}, status_code=400)

        # 处理echo工具
        if tool_name == "echo":
            # 简单工具,直接返回结果(无状态)
            return JSONResponse(
                {
                    "jsonrpc": "2.0",
                    "id": data.get("id"),
                    "result": {
                        "content": [
                            {
                                "type": "text",
                                "text": f"Echo: {data['params']['arguments']['message']}",
                            }
                        ]
                    },
                }
            )

        # 处理streaming_task工具
        elif tool_name == "streaming_task":
            # 流式任务,可以选择是否升级为 SSE
            arguments = data["params"]["arguments"]
            # 获取任务持续时间
            duration = arguments.get("duration", 5)
            # 是否使用流式
            use_streaming = arguments.get("stream", False)

            if use_streaming:
                # 升级为 SSE 流式响应
                return await self._handle_streaming_task(data, duration)
            else:
                # 普通响应,等待任务完成
                result = await self._execute_task_sync(duration)
                return JSONResponse(
                    {
                        "jsonrpc": "2.0",
                        "id": data.get("id"),
                        "result": {
                            "content": [
                                {
                                    "type": "text",
                                    "text": f"Task completed after {duration} seconds: {result}",
                                }
                            ]
                        },
                    }
                )

        # 处理stateful_counter工具
        elif tool_name == "stateful_counter":
            # 如果是无状态模式,返回错误
            if self.stateless_mode:
                return JSONResponse(
                    {
                        "jsonrpc": "2.0",
                        "id": data.get("id"),
                        "error": {
                            "code": -32601,
                            "message": "Stateful counter not available in stateless mode",
                        },
                    }
                )

            # 有状态计数器
            session_id = request.headers.get("X-Session-ID")
            # 获取操作类型
            action = data["params"]["arguments"]["action"]

            # 如果会话中没有计数器,初始化为0
            if "counter" not in self.sessions[session_id]:
                self.sessions[session_id]["counter"] = 0

            # 根据操作类型修改计数器
            if action == "increment":
                self.sessions[session_id]["counter"] += 1
            elif action == "decrement":
                self.sessions[session_id]["counter"] -= 1

            # 获取当前计数器值
            counter_value = self.sessions[session_id]["counter"]

            # 返回计数器操作结果
            return JSONResponse(
                {
                    "jsonrpc": "2.0",
                    "id": data.get("id"),
                    "result": {
                        "content": [
                            {
                                "type": "text",
                                "text": f"Counter {action}ed. Current value: {counter_value}",
                            }
                        ]
                    },
                }
            )

        # 未知工具,返回错误
        return JSONResponse({"error": "Unknown tool"}, status_code=400)

    # 同步执行任务(异步等待一段时间)
    async def _execute_task_sync(self, duration: int) -> str:
        """同步执行任务"""
        # 异步等待指定秒数
        await asyncio.sleep(duration)
        # 返回任务结果字符串
        return f"Task result at {asyncio.get_event_loop().time()}"

    # 处理流式任务(SSE)
    async def _handle_streaming_task(self, data, duration):
        """处理流式任务"""

        # 定义异步生成器,逐步推送进度
        async def stream_progress():
            # 发送开始通知
            yield f"data: {json.dumps({
                'jsonrpc': '2.0',
                'method': 'notifications/progress',
                'params': {
                    'type': 'begin',
                    'title': 'Streaming Task',
                    'message': 'Starting streaming task...'
                }
            })}\n\n"

            # 模拟任务执行,发送进度更新
            for i in range(duration):
                await asyncio.sleep(1)
                # 计算进度百分比
                progress = (i + 1) / duration * 100

                # 发送进度更新通知
                yield f"data: {json.dumps({
                    'jsonrpc': '2.0',
                    'method': 'notifications/progress',
                    'params': {
                        'type': 'update',
                        'title': 'Streaming Task',
                        'message': f'Progress: {progress:.1f}%',
                        'percentage': progress
                    }
                })}\n\n"

            # 发送完成通知
            yield f"data: {json.dumps({
                'jsonrpc': '2.0',
                'method': 'notifications/progress',
                'params': {
                    'type': 'end',
                    'title': 'Streaming Task',
                    'message': 'Streaming task completed!'
                }
            })}\n\n"

            # 发送最终结果
            yield f"data: {json.dumps({
                'jsonrpc': '2.0',
                'id': data.get('id'),
                'result': {
                    'content': [
                        {
                            'type': 'text',
                            'text': f'Streaming task completed after {duration} seconds!'
                        }
                    ]
                }
            })}\n\n"

        # 返回StreamingResponse,SSE流式响应
        return StreamingResponse(
            stream_progress(),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
        )

    # 处理统一的消息端点
    async def handle_message_endpoint(self, request: Request):
        """处理统一的消息端点"""
        # 如果是GET请求,建立SSE流
        if request.method == "GET":
            # GET 请求:建立 SSE 流(可选)
            return await self._handle_sse_connection(request)
        # 如果是POST请求,处理MCP消息
        elif request.method == "POST":
            # POST 请求:处理 MCP 消息
            return await self._handle_mcp_message(request)
        # 其他方法不允许
        else:
            return JSONResponse({"error": "Method not allowed"}, status_code=405)

    # 处理SSE连接
    async def _handle_sse_connection(self, request: Request):
        """处理 SSE 连接(可选功能)"""
        # 无状态模式不支持SSE
        if self.stateless_mode:
            return JSONResponse(
                {"error": "SSE connections not supported in stateless mode"},
                status_code=400,
            )

        # 获取会话ID
        session_id = request.headers.get("X-Session-ID")
        # 校验会话ID
        if not session_id or session_id not in self.sessions:
            return JSONResponse({"error": "Invalid session"}, status_code=400)

        # 定义SSE流生成器
        async def sse_stream():
            # 获取该会话的消息队列
            queue = self.message_queues[session_id]

            # 发送连接确认
            yield f"data: {json.dumps({
                'jsonrpc': '2.0',
                'method': 'notifications/connection',
                'params': {
                    'status': 'connected',
                    'session_id': session_id,
                    'mode': 'stateful'
                }
            })}\n\n"

            try:
                while True:
                    # 等待消息,超时30秒
                    message = await asyncio.wait_for(queue.get(), timeout=30.0)
                    # 推送消息
                    yield f"data: {json.dumps(message)}\n\n"
            except asyncio.TimeoutError:
                # 超时则发送心跳保持连接
                yield f"data: {json.dumps({
                    'jsonrpc': '2.0',
                    'method': 'notifications/heartbeat',
                    'params': {'timestamp': asyncio.get_event_loop().time()}
                })}\n\n"

        # 返回StreamingResponse,SSE流式响应
        return StreamingResponse(
            sse_stream(),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
        )

    # 处理MCP消息
    async def _handle_mcp_message(self, request: Request):
        """处理 MCP 消息"""
        try:
            # 解析请求体中的JSON数据
            data = await request.json()
            # 获取方法名
            method = data.get("method", "")

            # 根据方法名分发处理
            if method == "initialize":
                return await self.handle_initialize(request)
            elif method == "tools/list":
                return await self.handle_tool_list(request)
            elif method == "tools/call":
                return await self.handle_tool_call(request)
            else:
                # 未知方法,返回错误
                return JSONResponse(
                    {
                        "jsonrpc": "2.0",
                        "id": data.get("id"),
                        "error": {
                            "code": -32601,
                            "message": f"Method not found: {method}",
                        },
                    }
                )
        except Exception as e:
            # 解析异常,返回错误
            return JSONResponse(
                {
                    "jsonrpc": "2.0",
                    "id": request.query_params.get("id"),
                    "error": {"code": -32700, "message": f"Parse error: {str(e)}"},
                },
                status_code=400,
            )

    # 处理关闭请求
    async def handle_shutdown(self, request: Request):
        """处理关闭请求"""
        # 无状态模式无需清理
        if self.stateless_mode:
            return JSONResponse(
                {
                    "jsonrpc": "2.0",
                    "id": 1,
                    "result": {"message": "Stateless server - no cleanup needed"},
                }
            )

        # 解析请求体中的JSON数据
        data = await request.json()
        # 获取会话ID
        session_id = request.headers.get("X-Session-ID")

        # 删除会话信息
        if session_id in self.sessions:
            del self.sessions[session_id]

        # 删除消息队列
        if session_id in self.message_queues:
            del self.message_queues[session_id]

        # 返回清理完成响应
        return JSONResponse(
            {
                "jsonrpc": "2.0",
                "id": data.get("id"),
                "result": {"message": "Session cleaned up"},
            }
        )


# 创建Starlette应用
def create_app():
    """创建 Starlette 应用"""
    # 实例化服务器对象
    server = StreamableHTTPMCPServer()

    # 定义初始化端点处理函数
    async def initialize(request: Request):
        return await server.handle_initialize(request)

    # 定义工具列表端点处理函数
    async def tool_list(request: Request):
        return await server.handle_tool_list(request)

    # 定义工具调用端点处理函数
    async def tool_call(request: Request):
        return await server.handle_tool_call(request)

    # 定义统一消息端点处理函数
    async def message_endpoint(request: Request):
        return await server.handle_message_endpoint(request)

    # 定义关闭端点处理函数
    async def shutdown(request: Request):
        return await server.handle_shutdown(request)

    # 定义主页端点处理函数
    async def homepage(request: Request):
        return JSONResponse(
            {
                "message": "Streamable HTTP MCP Server",
                "endpoints": {
                    "/initialize": "POST - Initialize MCP connection",
                    "/tools/list": "POST - List available tools",
                    "/tools/call": "POST - Call a tool",
                    "/message": "GET/POST - Unified message endpoint (SSE + MCP)",
                    "/shutdown": "POST - Shutdown connection (stateful mode only)",
                },
                "features": {
                    "stateless_mode": server.stateless_mode,
                    "unified_endpoint": True,
                    "upgradable_requests": True,
                    "flexible_session_management": True,
                },
                "note": "This server demonstrates the advantages of Streamable HTTP transport",
            }
        )

    # 返回Starlette应用对象,注册各路由
    return Starlette(
        routes=[
            Route("/", homepage),
            Route("/initialize", initialize, methods=["POST"]),
            Route("/tools/list", tool_list, methods=["POST"]),
            Route("/tools/call", tool_call, methods=["POST"]),
            Route("/message", message_endpoint, methods=["GET", "POST"]),
            Route("/shutdown", shutdown, methods=["POST"]),
        ]
    )


# 主程序入口
if __name__ == "__main__":
    # 创建应用
    app = create_app()
    # 打印服务器启动信息
    print("启动 Streamable HTTP MCP 服务器...")
    print(" 特点:")
    print("   支持无状态和有状态模式")
    print("   统一的 /message 端点")
    print("   可升级的请求(支持 SSE)")
    print("   灵活的会话管理")
    print("   与现有 HTTP 基础设施完全兼容")
    print("   渐进式功能升级")
    print("\n🌐 服务器地址: http://127.0.0.1:8000")
    print("📖 查看 API 文档: http://127.0.0.1:8000")

    # 启动uvicorn服务器
    uvicorn.run(app, host="127.0.0.1", port=8000)

11.2 客户端 #

import asyncio
import json
import aiohttp


class StreamableHTTPMCPClient:
    """Streamable HTTP MCP 客户端实现"""

    def __init__(self, server_url: str = "http://127.0.0.1:8000"):
        self.server_url = server_url
        self.session_id = None
        self.sse_task = None
        self.session = None
        self.server_mode = None

    async def __aenter__(self):
        """异步上下文管理器入口"""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """异步上下文管理器出口"""
        if self.sse_task:
            self.sse_task.cancel()
        if self.session:
            await self.session.close()

    async def initialize(self) -> bool:
        """初始化 MCP 连接"""
        try:
            payload = {
                "jsonrpc": "2.0",
                "id": 1,
                "method": "initialize",
                "params": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {"tools": {}, "resources": {}, "prompts": {}},
                    "clientInfo": {
                        "name": "Streamable HTTP MCP Client",
                        "version": "1.0.0",
                    },
                },
            }

            async with self.session.post(
                f"{self.server_url}/initialize", json=payload
            ) as response:
                if response.status == 200:
                    result = await response.json()
                    self.server_mode = result["result"]["serverInfo"]["mode"]

                    if self.server_mode == "stateful":
                        # 有状态模式:从服务器信息中获取会话 ID
                        self.session_id = result["result"]["serverInfo"].get(
                            "session_id"
                        )

                    print(" 连接初始化成功")
                    print(f"   协议版本: {result['result']['protocolVersion']}")
                    print(f"   服务器: {result['result']['serverInfo']['name']}")
                    print(f"   服务器模式: {self.server_mode}")
                    if self.session_id:
                        print(f"   会话 ID: {self.session_id}")
                    return True
                else:
                    print(f"❌ 连接初始化失败: {response.status}")
                    return False

        except Exception as e:
            print(f"❌ 连接初始化异常: {e}")
            return False

    async def list_tools(self) -> list:
        """获取可用工具列表"""
        try:
            payload = {"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}}

            # 使用统一的 /message 端点
            url = f"{self.server_url}/message"
            headers = {}
            if self.session_id:
                headers["X-Session-ID"] = self.session_id

            async with self.session.post(
                url, json=payload, headers=headers
            ) as response:
                if response.status == 200:
                    result = await response.json()
                    tools = result["result"]["tools"]
                    print(f"获取到 {len(tools)} 个工具:")
                    for tool in tools:
                        print(f"   - {tool['name']}: {tool['description']}")
                    return tools
                else:
                    print(f"❌ 获取工具列表失败: {response.status}")
                    return []

        except Exception as e:
            print(f"❌ 获取工具列表异常: {e}")
            return []

    async def call_tool(self, tool_name: str, arguments):
        """调用工具"""
        try:
            payload = {
                "jsonrpc": "2.0",
                "id": 3,
                "method": "tools/call",
                "params": {"name": tool_name, "arguments": arguments},
            }

            # 使用统一的 /message 端点
            url = f"{self.server_url}/message"
            headers = {}
            if self.session_id:
                headers["X-Session-ID"] = self.session_id

            async with self.session.post(
                url, json=payload, headers=headers
            ) as response:
                if response.status == 200:
                    content_type = response.headers.get("content-type", "")

                    if "text/event-stream" in content_type:
                        # SSE 流式响应
                        print(f"📡 工具 {tool_name} 返回流式响应...")
                        return await self._handle_sse_response(response)
                    else:
                        # 普通 JSON 响应
                        try:
                            result = await response.json()

                            # 检查是否有错误
                            if "error" in result:
                                print(
                                    f"⚠️  工具 {tool_name} 调用警告: {result['error']['message']}"
                                )
                            else:
                                print(f" 工具 {tool_name} 调用成功")

                            return result
                        except Exception as json_error:
                            print(f"⚠️  JSON 解析失败: {json_error}")
                            # 尝试读取原始响应内容
                            text_content = await response.text()
                            print(f"   原始响应: {text_content[:200]}...")
                            return {
                                "error": "JSON parsing failed",
                                "raw_content": text_content,
                            }
                else:
                    print(f"❌ 工具调用失败: {response.status}")
                    return None

        except Exception as e:
            print(f"❌ 工具调用异常: {e}")
            return None

    async def _handle_sse_response(self, response):
        """处理 SSE 响应"""
        print("📡 接收到 SSE 流式响应...")
        result = None

        try:
            async for line in response.content:
                line = line.decode("utf-8").strip()
                if line.startswith("data: "):
                    try:
                        data = json.loads(line[6:])  # 移除 'data: ' 前缀

                        if "method" in data:
                            # 通知消息
                            if "notifications/progress" in data["method"]:
                                progress_data = data["params"]
                                if progress_data["type"] == "begin":
                                    print(f" 任务开始: {progress_data['title']}")
                                elif progress_data["type"] == "update":
                                    print(f" 进度更新: {progress_data['message']}")
                                elif progress_data["type"] == "end":
                                    print(f" 任务完成: {progress_data['title']}")
                            elif "notifications/heartbeat" in data["method"]:
                                print(
                                    f" 心跳: {data['params'].get('timestamp', 'N/A')}"
                                )
                            elif "notifications/connection" in data["method"]:
                                print(f" 连接状态: {data['params']['status']}")

                        elif "result" in data:
                            # 结果消息
                            result = data
                            print(
                                f"📋 最终结果: {data['result']['content'][0]['text']}"
                            )
                            break

                    except json.JSONDecodeError as e:
                        print(f"⚠️  SSE 数据解析失败: {e}, 原始数据: {line}")
                        continue

        except Exception as e:
            print(f"⚠️  SSE 流处理异常: {e}")

        return result or {"error": "No result received from SSE stream"}

    async def establish_sse_connection(self) -> bool:
        """建立 SSE 连接(仅在有状态模式下可用)"""
        if self.server_mode == "stateless":
            print(" SSE 连接在无状态模式下不可用")
            return False

        if not self.session_id:
            print(" 请先初始化连接")
            return False

        try:
            # 使用统一的 /message 端点
            url = f"{self.server_url}/message"
            headers = {}
            if self.session_id:
                headers["X-Session-ID"] = self.session_id

            async with self.session.get(url, headers=headers) as response:
                if response.status == 200:
                    print(" SSE 连接建立成功")

                    # 启动 SSE 监听任务
                    self.sse_task = asyncio.create_task(self._listen_sse(response))
                    return True
                else:
                    print(f" SSE 连接建立失败: {response.status}")
                    return False

        except Exception as e:
            print(f" SSE 连接异常: {e}")
            return False

    async def _listen_sse(self, response):
        """监听 SSE 消息"""
        try:
            async for line in response.content:
                line = line.decode("utf-8").strip()
                if line.startswith("data: "):
                    try:
                        data = json.loads(line[6:])
                        if "method" in data:
                            print(f"📨 SSE 消息: {data['method']}")
                    except json.JSONDecodeError:
                        continue
        except asyncio.CancelledError:
            print("📡 SSE 连接已取消")
        except Exception as e:
            # 检查是否是正常的连接关闭
            error_msg = str(e).lower()
            if any(
                keyword in error_msg
                for keyword in [
                    "connection closed",
                    "connection reset",
                    "broken pipe",
                    "end of stream",
                    "stream closed",
                ]
            ):
                print("📡 SSE 连接正常关闭")
            else:
                print(f"❌ SSE 监听异常: {e}")

    async def shutdown(self) -> bool:
        """关闭连接"""
        if self.server_mode == "stateless":
            print(" 无状态服务器无需关闭连接")
            return True

        if not self.session_id:
            return True

        try:
            payload = {"jsonrpc": "2.0", "id": 4, "method": "shutdown", "params": {}}

            headers = {"X-Session-ID": self.session_id}
            async with self.session.post(
                f"{self.server_url}/shutdown", json=payload, headers=headers
            ) as response:
                if response.status == 200:
                    print("连接关闭成功")
                    self.session_id = None
                    return True
                else:
                    print(f" 连接关闭失败: {response.status}")
                    return False

        except Exception as e:
            print(f" 连接关闭异常: {e}")
            return False


async def demo_streamable_http_mcp():
    """演示 Streamable HTTP MCP 的工作流程"""
    print(" Streamable HTTP MCP 客户端演示")
    print("=" * 50)

    async with StreamableHTTPMCPClient() as client:
        # 1. 初始化连接
        print("\n 1. 初始化连接...")
        if not await client.initialize():
            print(" 无法继续演示")
            return

        # 2. 获取工具列表
        print("\n 2. 获取工具列表...")
        tools = await client.list_tools()
        if not tools:
            print(" 无法获取工具列表")
            return

        # 3. 调用简单工具
        print("\n 3. 调用简单工具 (echo)...")
        result = await client.call_tool(
            "echo", {"message": "Hello, Streamable HTTP MCP!"}
        )
        if result:
            print(f"   结果: {result}")

        # 4. 调用长运行工具(非流式)
        print("\n 4. 调用长运行工具(非流式)...")
        result = await client.call_tool(
            "streaming_task", {"duration": 3, "stream": False}
        )
        if result:
            print(f"   结果: {result}")

        # 5. 调用长运行工具(流式)
        print("\n 5. 调用长运行工具(流式)...")
        result = await client.call_tool(
            "streaming_task", {"duration": 3, "stream": True}
        )
        if result:
            print(f"   结果: {result}")

        # 6. 尝试调用有状态工具
        print("\n 6. 尝试调用有状态工具...")
        result = await client.call_tool("stateful_counter", {"action": "increment"})
        if result:
            if "error" in result:
                print(f"   预期结果: {result['error']['message']}")
            else:
                print(f"   结果: {result}")

        # 7. 建立 SSE 连接(如果可用)
        print("\n7️⃣ 建立 SSE 连接...")
        if await client.establish_sse_connection():
            # 等待一下让连接稳定并接收一些消息
            print("   ⏳ 等待 SSE 消息...")
            await asyncio.sleep(3)  # 增加等待时间

            # 手动取消 SSE 任务,避免在 shutdown 时出现异常
            if client.sse_task:
                client.sse_task.cancel()
                try:
                    await client.sse_task
                except asyncio.CancelledError:
                    pass

        # 8. 关闭连接
        print("\n 8. 关闭连接...")
        await client.shutdown()

    print("\n" + "=" * 50)
    print(" Streamable HTTP 传输的特点总结:")
    print("   支持无状态和有状态模式")
    print("   统一的 /message 端点")
    print("   可升级的请求(支持 SSE)")
    print("   灵活的会话管理")
    print("   与现有 HTTP 基础设施完全兼容")
    print("   渐进式功能升级")
    print("   支持请求-响应模式")
    print("   可选的流式传输")


async def compare_transports():
    """比较两种传输方式的差异"""
    print("\n" + "=" * 60)
    print(" 传输方式对比分析")
    print("=" * 60)

    print("\n 传统 HTTP+SSE vs Streamable HTTP")
    print("-" * 40)

    print("\n 传统 HTTP+SSE 的局限性:")
    print("    连接断开后无法恢复")
    print("    服务器必须维护长连接")
    print("    对服务器基础设施要求高")
    print("    只能通过 SSE 发送服务器消息")
    print("    需要单独的 /sse 端点")

    print("\n Streamable HTTP 的优势:")
    print("   支持无状态服务器")
    print("   统一的 /message 端点")
    print("   可升级的请求(支持 SSE)")
    print("   灵活的会话管理")
    print("   与现有 HTTP 基础设施完全兼容")
    print("   渐进式功能升级")
    print("   支持请求-响应模式")

    print("\n 适用场景:")
    print("   传统 HTTP+SSE:")
    print("     - 需要实时双向通信")
    print("     - 服务器基础设施完善")
    print("     - 对连接稳定性要求高")

    print("\n   Streamable HTTP:")
    print("     - 需要灵活部署选项")
    print("     - 基础设施兼容性要求高")
    print("     - 需要渐进式功能升级")
    print("     - 支持无状态部署")


if __name__ == "__main__":
    print("启动 Streamable HTTP MCP 客户端演示...")
    print(" 请确保服务器已启动: python streamable_http_server.py")
    print()

    try:
        asyncio.run(demo_streamable_http_mcp())
        asyncio.run(compare_transports())
    except KeyboardInterrupt:
        print("\n 演示被用户中断")
    except Exception as e:
        print(f"\n 演示异常: {e}")

12.对比 #

12.1 1. 架构设计理念 #

12.1.1 传统 HTTP+SSE 方式 #

  • 分离式设计:HTTP请求和SSE连接完全分离
  • 多端点架构:需要多个独立的API端点
    # SSE方式需要多个端点
    /initialize     # 初始化
    /tools/list     # 工具列表  
    /tools/call     # 工具调用
    /sse           # SSE连接(独立)
    /shutdown      # 关闭连接

12.1.2 Streamable HTTP 方式 #

  • 统一式设计:所有通信通过统一端点
  • 可升级架构:请求可以动态升级为SSE
    # Streamable方式统一端点
    /message       # 统一消息端点(GET/POST)
    /initialize    # 兼容性端点
    /tools/list    # 兼容性端点
    /tools/call    # 兼容性端点

12.2 2. 会话管理模式 #

12.2.1 SSE方式 - 强制有状态 #

# SSE服务器必须维护会话状态
def __init__(self):
    self.active_connections = {}  # 必须维护连接
    self.sessions = {}           # 必须维护会话

12.2.2 Streamable方式 - 灵活状态管理 #

# Streamable服务器支持两种模式
def __init__(self):
    self.stateless_mode = False  # 可选择无状态模式
    self.sessions = {}          # 可选会话管理
    self.message_queues = {}    # 可选消息队列

12.3 3. 连接建立方式 #

12.3.1 SSE方式 - 分离连接 #

# 需要单独建立SSE连接
async def establish_sse_connection(self) -> bool:
    async with self.session.get(
        f"{self.server_url}/sse", headers=headers
    ) as response:
        # 独立的SSE连接

12.3.2 Streamable方式 - 统一连接 #

# 通过统一端点建立连接
async def establish_sse_connection(self) -> bool:
    async with self.session.get(
        f"{self.server_url}/message", headers=headers
    ) as response:
        # 可升级的统一连接

12.4 4. 工具调用机制 #

12.4.1 SSE方式 - 固定响应类型 #

# 工具调用只能返回固定类型
if "text/event-stream" in content_type:
    return await self._handle_sse_response(response)
else:
    result = await response.json()  # 普通JSON响应

12.4.2 Streamable方式 - 动态响应类型 #

# 工具调用可以动态选择响应类型
if use_streaming:
    return await self._handle_streaming_task(data, duration)  # SSE流
else:
    result = await self._execute_task_sync(duration)         # 普通响应
    return JSONResponse(...)

12.5 5. 服务器部署灵活性 #

12.5.1 SSE方式 - 高要求部署 #

# 必须维护长连接和会话状态
print("特点:")
print("   - 需要维护长连接")
print("   - 连接断开后无法恢复") 
print("   - 服务器必须保持高可用性")
print("   - 只能通过 SSE 发送服务器消息")

12.5.2 Streamable方式 - 灵活部署 #

# 支持多种部署模式
print("特点:")
print("   支持无状态和有状态模式")
print("   统一的 /message 端点")
print("   可升级的请求(支持 SSE)")
print("   灵活的会话管理")
print("   与现有 HTTP 基础设施完全兼容")

12.6 6. 错误处理和恢复能力 #

12.6.1 SSE方式 - 脆弱连接 #

  • 连接断开后无法自动恢复
  • 必须重新建立整个连接
  • 服务器必须保持高可用性

12.6.2 Streamable方式 - 健壮设计 #

  • 支持无状态模式,无需恢复
  • 每个请求都是独立的
  • 可以优雅降级

12.7 7. 实际应用场景对比 #

12.7.1 SSE方式适用场景 #

# 适合需要实时双向通信的场景
print("传统 HTTP+SSE:")
print("     - 需要实时双向通信")
print("     - 服务器基础设施完善") 
print("     - 对连接稳定性要求高")

12.7.2 Streamable方式适用场景 #

# 适合需要灵活部署的场景
print("Streamable HTTP:")
print("     - 需要灵活部署选项")
print("     - 基础设施兼容性要求高")
print("     - 需要渐进式功能升级")
print("     - 支持无状态部署")

12.8 8. 代码复杂度对比 #

12.8.1 SSE方式 - 复杂实现 #

  • 需要管理多个端点
  • 需要维护连接状态
  • 需要处理连接断开
  • 代码分散在多个方法中

12.8.2 Streamable方式 - 简化实现 #

  • 统一的消息处理
  • 可选的会话管理
  • 自动的错误处理
  • 集中的逻辑处理

12.9 📊 总结对比表 #

特性 SSE方式 Streamable方式
端点数量 5个独立端点 1个统一端点
会话管理 强制有状态 可选有状态/无状态
连接恢复 不支持 支持(无状态模式)
部署复杂度 高 低
基础设施要求 高 低
向后兼容性 差 好
渐进式升级 不支持 支持
错误处理 复杂 简单

Streamable HTTP方式代表了MCP传输协议的未来发展方向,它通过统一端点和灵活的状态管理,解决了传统SSE方式的诸多限制,为MCP的广泛应用提供了更好的基础。

访问验证

请输入访问令牌

Token不正确,请重新输入