1. Streamable HTTP #
Model Context Protocol (MCP) 的 Streamable HTTP 传输协议是对传统 HTTP+SSE 传输的重大改进,旨在解决现有传输方式的局限性,同时保持其优势。
2. 传统 HTTP+SSE 传输的问题 #
2.1. 连接不可恢复性 #
- 一旦 SSE 连接断开,无法自动恢复
- 需要重新建立连接,丢失所有状态
2.2. 服务器高可用性要求 #
- 服务器必须维护长连接
- 对服务器基础设施要求较高
2.3. 消息传递限制 #
- 只能通过 SSE 传递服务器消息
- 客户端到服务器的消息传递方式有限
3. Streamable HTTP 的优势 #
3.1. 无状态服务器支持 #
- 服务器可以选择完全无状态
- 消除了对高可用性长连接的要求
3.2. 纯 HTTP 实现 #
- 可以在纯 HTTP 服务器上实现 MCP
- 不需要特殊的 SSE 支持
3.3. 基础设施兼容性 #
- "就是 HTTP",确保与中间件和基础设施的兼容性
- 向后兼容现有实现
3.4. 灵活的升级路径 #
- 服务器可以选择在需要时使用 SSE 进行流式响应
- 支持渐进式升级
4. 核心设计原则 #
4.1. 端点简化 #
- 移除
/sse端点 - 所有客户端到服务器的消息通过
/message端点
4.2. 可升级的请求 #
- 服务器可以将任何客户端请求升级为 SSE
- 用于发送通知/请求
4.3. 会话管理 #
- 服务器可以选择建立会话 ID 来维护状态
- 客户端在每次请求中传递会话 ID
4.4. 流式响应 #
- 客户端可以通过向
/message发送空 GET 请求来启动 SSE 流
5. 实现场景 #
5.1. 无状态服务器 #
完全无状态的服务器,不支持长连接:
# 示例:无状态 MCP 服务器
1. 始终确认初始化(但不需要持久化任何状态)
2. 用单个 JSON-RPC 响应响应任何传入的 ToolListRequest
3. 通过执行工具、等待完成,然后发送单个 CallToolResponse 作为 HTTP 响应体来处理任何 CallToolRequest5.2. 无状态服务器 + 流式传输 #
完全无状态但支持流式传输的服务器:
# 示例:无状态服务器 + 流式传输
1. 当传入的 POST 请求是 CallToolRequest 时,服务器指示响应将是 SSE
2. 服务器开始执行工具
3. 服务器在工具执行期间通过 SSE 发送任意数量的 ProgressNotification
4. 当工具执行完成时,服务器通过 SSE 发送 CallToolResponse
5. 服务器关闭 SSE 流5.3. 有状态服务器 #
有状态服务器的实现与今天非常相似:
# 主要区别:
1. 服务器需要生成会话 ID
2. 客户端需要在每个请求中传递该会话 ID
3. 服务器可以使用会话 ID 进行粘性路由或在消息总线上路由消息6. 为什么不选择 WebSocket? #
核心团队深入讨论了将 WebSocket 作为主要远程传输(而不是 SSE),并对其应用类似的工作以使其可断开和可恢复。最终决定不追求 WS 的原因:
- RPC 式使用:如果只是想要以 "RPC 式" 方式使用 MCP(例如,一个只暴露基本工具的无状态 MCP 服务器),WebSocket 会产生很多不必要的操作和网络开销
- 基础设施兼容性:HTTP 在基础设施层面有更好的兼容性
- 渐进式升级:Streamable HTTP 提供了从简单到复杂的渐进式升级路径
7. 向后兼容性 #
这个提案可以向后兼容地实现,允许服务器根据需要选择完全无状态。现有的 HTTP+SSE 实现可以逐步迁移到新的协议。
8. 总结 #
Streamable HTTP 传输协议为 MCP 带来了重要的改进:
- 灵活性:支持从完全无状态到完全有状态的服务器实现
- 兼容性:与现有 HTTP 基础设施完全兼容
- 可扩展性:支持渐进式功能升级
- 性能:消除了不必要的连接开销
这个新协议为 MCP 在更广泛的基础设施环境中的部署铺平了道路,同时保持了其核心优势。
9.参考资料 #
10. HTTP+SSE 实现 #
10.1. 服务器 #
# 导入异步IO库
import asyncio
# 导入json库用于数据序列化
import json
# 导入uuid库用于生成唯一会话ID
import uuid
# 导入Starlette应用相关模块
from starlette.applications import Starlette
# 导入路由模块
from starlette.routing import Route
# 导入响应类型
from starlette.responses import JSONResponse, StreamingResponse
# 导入请求对象
from starlette.requests import Request
# 导入uvicorn服务器
import uvicorn
# 定义传统HTTP+SSE MCP服务器类
class TraditionalMCPServer:
"""传统的 HTTP+SSE MCP 服务器实现"""
# 构造函数,初始化连接和会话字典
def __init__(self):
self.active_connections = {}
self.sessions = {}
# 处理初始化请求
async def handle_initialize(self, request: Request):
"""处理初始化请求"""
# 解析请求体为json
data = await request.json()
# 创建唯一会话ID
session_id = str(uuid.uuid4())
# 保存会话信息
self.sessions[session_id] = {
"protocol_version": data.get("protocolVersion", "2024-11-05"),
"capabilities": data.get("capabilities", {}),
"client_info": data.get("clientInfo", {}),
"created_at": asyncio.get_event_loop().time(),
}
# 为该会话创建消息队列
self.active_connections[session_id] = asyncio.Queue()
# 返回初始化响应,包含会话ID
return JSONResponse(
{
"jsonrpc": "2.0",
"id": data.get("id"),
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}, "resources": {}, "prompts": {}},
"serverInfo": {
"name": "Traditional MCP Server",
"version": "1.0.0",
"session_id": session_id, # 返回会话ID
},
},
}
)
# 处理工具列表请求
async def handle_tool_list(self, request: Request):
"""处理工具列表请求"""
# 解析请求体
data = await request.json()
# 获取请求头中的会话ID
session_id = request.headers.get("X-Session-ID")
# 校验会话ID有效性
if not session_id or session_id not in self.sessions:
return JSONResponse({"error": "Invalid session"}, status_code=400)
# 返回可用工具列表
return JSONResponse(
{
"jsonrpc": "2.0",
"id": data.get("id"),
"result": {
"tools": [
{
"name": "echo",
"description": "Echo back the input",
"inputSchema": {
"type": "object",
"properties": {"message": {"type": "string"}},
"required": ["message"],
},
},
{
"name": "long_running_task",
"description": "A long running task that sends progress updates",
"inputSchema": {
"type": "object",
"properties": {
"duration": {"type": "integer", "default": 10}
},
},
},
]
},
}
)
# 处理工具调用请求
async def handle_tool_call(self, request: Request):
"""处理工具调用请求"""
# 解析请求体
data = await request.json()
# 获取会话ID
session_id = request.headers.get("X-Session-ID")
# 校验会话ID
if not session_id or session_id not in self.sessions:
return JSONResponse({"error": "Invalid session"}, status_code=400)
# 获取工具名称
tool_name = data["params"]["name"]
# 如果是echo工具,直接返回结果
if tool_name == "echo":
# 简单工具,直接返回结果
return JSONResponse(
{
"jsonrpc": "2.0",
"id": data.get("id"),
"result": {
"content": [
{
"type": "text",
"text": f"Echo: {data['params']['arguments']['message']}",
}
]
},
}
)
# 如果是长运行任务,走SSE流式返回
elif tool_name == "long_running_task":
# 长运行任务,通过 SSE 发送进度更新
duration = data["params"]["arguments"].get("duration", 10)
return await self.handle_long_running_task(data, session_id, duration)
# 未知工具,返回错误
return JSONResponse({"error": "Unknown tool"}, status_code=400)
# 处理长运行任务,通过SSE发送进度
async def handle_long_running_task(self, data, session_id, duration):
"""处理长运行任务,通过 SSE 发送进度更新"""
# 定义异步生成器,逐步推送进度
async def progress_stream():
# 发送开始通知
yield f"data: {json.dumps({
'jsonrpc': '2.0',
'method': 'notifications/progress',
'params': {
'type': 'begin',
'title': 'Long Running Task',
'message': 'Starting task...'
}
})}\n\n"
# 模拟任务执行,逐步发送进度
for i in range(duration):
await asyncio.sleep(1)
progress = (i + 1) / duration * 100
yield f"data: {json.dumps({
'jsonrpc': '2.0',
'method': 'notifications/progress',
'params': {
'type': 'update',
'title': 'Long Running Task',
'message': f'Progress: {progress:.1f}%',
'percentage': progress
}
})}\n\n"
# 发送完成通知
yield f"data: {json.dumps({
'jsonrpc': '2.0',
'method': 'notifications/progress',
'params': {
'type': 'end',
'title': 'Long Running Task',
'message': 'Task completed!'
}
})}\n\n"
# 发送最终结果
yield f"data: {json.dumps({
'jsonrpc': '2.0',
'id': data.get('id'),
'result': {
'content': [
{
'type': 'text',
'text': f'Task completed after {duration} seconds!'
}
]
}
})}\n\n"
# 返回流式响应
return StreamingResponse(
progress_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Session-ID": session_id,
},
)
# 处理SSE连接请求
async def handle_sse_connection(self, request: Request):
"""处理 SSE 连接"""
# 获取会话ID
session_id = request.headers.get("X-Session-ID")
# 校验会话ID
if not session_id or session_id not in self.sessions:
return JSONResponse({"error": "Invalid session"}, status_code=400)
# 定义SSE流生成器
async def sse_stream():
queue = self.active_connections[session_id]
# 发送连接确认消息
yield f"data: {json.dumps({
'jsonrpc': '2.0',
'method': 'notifications/connection',
'params': {
'status': 'connected',
'session_id': session_id
}
})}\n\n"
try:
while True:
# 等待消息队列中的消息
message = await asyncio.wait_for(queue.get(), timeout=30.0)
yield f"data: {json.dumps(message)}\n\n"
except asyncio.TimeoutError:
# 超时则发送心跳包保持连接
yield f"data: {json.dumps({
'jsonrpc': '2.0',
'method': 'notifications/heartbeat',
'params': {'timestamp': asyncio.get_event_loop().time()}
})}\n\n"
# 返回SSE流响应
return StreamingResponse(
sse_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Session-ID": session_id,
},
)
# 处理关闭请求
async def handle_shutdown(self, request: Request):
"""处理关闭请求"""
# 解析请求体
data = await request.json()
# 获取会话ID
session_id = request.headers.get("X-Session-ID")
# 删除会话信息
if session_id in self.sessions:
del self.sessions[session_id]
# 删除消息队列
if session_id in self.active_connections:
del self.active_connections[session_id]
# 返回关闭响应
return JSONResponse({"jsonrpc": "2.0", "id": data.get("id"), "result": {}})
# 创建Starlette应用
def create_app():
"""创建 Starlette 应用"""
# 实例化服务器对象
server = TraditionalMCPServer()
# 定义各个路由的处理函数
async def initialize(request: Request):
return await server.handle_initialize(request)
async def tool_list(request: Request):
return await server.handle_tool_list(request)
async def tool_call(request: Request):
return await server.handle_tool_call(request)
async def sse_connection(request: Request):
return await server.handle_sse_connection(request)
async def shutdown(request: Request):
return await server.handle_shutdown(request)
async def homepage(request: Request):
# 返回主页信息
return JSONResponse(
{
"message": "Traditional HTTP+SSE MCP Server",
"endpoints": {
"/initialize": "POST - Initialize MCP connection",
"/tools/list": "POST - List available tools",
"/tools/call": "POST - Call a tool",
"/sse": "GET - Establish SSE connection",
"/shutdown": "POST - Shutdown connection",
},
"note": "This server demonstrates the limitations of traditional HTTP+SSE transport",
}
)
# 返回Starlette应用对象,注册路由
return Starlette(
routes=[
Route("/", homepage),
Route("/initialize", initialize, methods=["POST"]),
Route("/tools/list", tool_list, methods=["POST"]),
Route("/tools/call", tool_call, methods=["POST"]),
Route("/sse", sse_connection, methods=["GET"]),
Route("/shutdown", shutdown, methods=["POST"]),
]
)
# 主程序入口
if __name__ == "__main__":
# 创建应用
app = create_app()
# 打印服务器启动信息
print("启动传统 HTTP+SSE MCP 服务器...")
print("特点:")
print(" - 需要维护长连接")
print(" - 连接断开后无法恢复")
print(" - 服务器必须保持高可用性")
print(" - 只能通过 SSE 发送服务器消息")
print("\n服务器地址: http://127.0.0.1:8000")
# 启动uvicorn服务器
uvicorn.run(app, host="127.0.0.1", port=8000)
10.2. 客户端 #
# 导入异步IO库
import asyncio
# 导入json库用于数据序列化和反序列化
import json
# 导入aiohttp库用于异步HTTP请求
import aiohttp
# 定义传统的 HTTP+SSE MCP 客户端类
class TraditionalMCPClient:
"""传统的 HTTP+SSE MCP 客户端实现"""
# 构造函数,初始化服务器地址和相关属性
def __init__(self, server_url: str = "http://127.0.0.1:8000"):
self.server_url = server_url # 服务器地址
self.session_id = None # 会话ID
self.sse_task = None # SSE监听任务
self.session = None # aiohttp会话对象
# 异步上下文管理器入口
async def __aenter__(self):
"""异步上下文管理器入口"""
self.session = aiohttp.ClientSession() # 创建aiohttp会话
return self
# 异步上下文管理器出口
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""异步上下文管理器出口"""
if self.sse_task:
self.sse_task.cancel() # 取消SSE监听任务
if self.session:
await self.session.close() # 关闭aiohttp会话
# 初始化MCP连接
async def initialize(self) -> bool:
"""初始化 MCP 连接"""
try:
# 构造初始化请求的payload
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}, "resources": {}, "prompts": {}},
"clientInfo": {
"name": "Traditional MCP Client",
"version": "1.0.0",
},
},
}
# 发送POST请求到/initialize接口
async with self.session.post(
f"{self.server_url}/initialize", json=payload
) as response:
# 判断响应状态码
if response.status == 200:
result = await response.json()
# 从服务器响应中获取会话ID
self.session_id = result["result"]["serverInfo"].get("session_id")
print("连接初始化成功")
print(f" 协议版本: {result['result']['protocolVersion']}")
print(f" 服务器: {result['result']['serverInfo']['name']}")
print(f" 会话ID: {self.session_id}")
return True
else:
print(f" 连接初始化失败: {response.status}")
return False
except Exception as e:
print(f" 连接初始化异常: {e}")
return False
# 获取可用工具列表
async def list_tools(self) -> list:
"""获取可用工具列表"""
if not self.session_id:
print(" 请先初始化连接")
return []
try:
# 构造请求payload
payload = {"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}}
# 设置请求头,包含会话ID
headers = {"X-Session-ID": self.session_id}
# 发送POST请求到/tools/list接口
async with self.session.post(
f"{self.server_url}/tools/list", json=payload, headers=headers
) as response:
# 判断响应状态码
if response.status == 200:
result = await response.json()
tools = result["result"]["tools"]
print(f"获取到 {len(tools)} 个工具:")
for tool in tools:
print(f" - {tool['name']}: {tool['description']}")
return tools
else:
print(f" 获取工具列表失败: {response.status}")
return []
except Exception as e:
print(f" 获取工具列表异常: {e}")
return []
# 调用指定工具
async def call_tool(self, tool_name: str, arguments):
"""调用工具"""
if not self.session_id:
print(" 请先初始化连接")
return None
try:
# 构造请求payload
payload = {
"jsonrpc": "2.0",
"id": 3,
"method": "tools/call",
"params": {"name": tool_name, "arguments": arguments},
}
# 设置请求头,包含会话ID
headers = {"X-Session-ID": self.session_id}
# 发送POST请求到/tools/call接口
async with self.session.post(
f"{self.server_url}/tools/call", json=payload, headers=headers
) as response:
# 判断响应状态码
if response.status == 200:
content_type = response.headers.get("content-type", "")
# 判断是否为SSE流式响应
if "text/event-stream" in content_type:
# SSE 流式响应
print(f" 工具 {tool_name} 返回流式响应...")
return await self._handle_sse_response(response)
else:
# 普通 JSON 响应
try:
result = await response.json()
print(f" 工具 {tool_name} 调用成功")
return result
except Exception as json_error:
print(f"⚠️ JSON 解析失败: {json_error}")
# 尝试读取原始响应内容
text_content = await response.text()
print(f" 原始响应: {text_content[:200]}...")
return {
"error": "JSON parsing failed",
"raw_content": text_content,
}
else:
print(f" 工具调用失败: {response.status}")
return None
except Exception as e:
print(f" 工具调用异常: {e}")
return None
# 处理SSE响应
async def _handle_sse_response(self, response):
"""处理 SSE 响应"""
print(" 接收到 SSE 流式响应...")
result = None
try:
# 异步遍历SSE流中的每一行
async for line in response.content:
line = line.decode("utf-8").strip()
# 判断是否为SSE数据行
if line.startswith("data: "):
try:
# 解析JSON数据,去除前缀
data = json.loads(line[6:]) # 移除 'data: ' 前缀
# 判断消息类型
if "method" in data:
# 处理进度通知
if "notifications/progress" in data["method"]:
progress_data = data["params"]
if progress_data["type"] == "begin":
print(f" 任务开始: {progress_data['title']}")
elif progress_data["type"] == "update":
print(f" 进度更新: {progress_data['message']}")
elif progress_data["type"] == "end":
print(f" 任务完成: {progress_data['title']}")
# 处理心跳通知
elif "notifications/heartbeat" in data["method"]:
print(
f" 心跳: {data['params'].get('timestamp', 'N/A')}"
)
# 处理最终结果
elif "result" in data:
# 结果消息
result = data
print(
f"📋 最终结果: {data['result']['content'][0]['text']}"
)
break
except json.JSONDecodeError as e:
print(f"⚠️ SSE 数据解析失败: {e}, 原始数据: {line}")
continue
except Exception as e:
print(f"⚠️ SSE 流处理异常: {e}")
# 返回结果或错误信息
return result or {"error": "No result received from SSE stream"}
# 建立SSE连接
async def establish_sse_connection(self) -> bool:
"""建立 SSE 连接"""
if not self.session_id:
print(" 请先初始化连接")
return False
try:
# 设置请求头,包含会话ID
headers = {"X-Session-ID": self.session_id}
# 发送GET请求到/sse接口
async with self.session.get(
f"{self.server_url}/sse", headers=headers
) as response:
# 判断响应状态码
if response.status == 200:
print(" SSE 连接建立成功")
# 启动 SSE 监听任务
self.sse_task = asyncio.create_task(self._listen_sse(response))
return True
else:
print(f" SSE 连接建立失败: {response.status}")
return False
except Exception as e:
print(f" SSE 连接异常: {e}")
return False
# 监听SSE消息
async def _listen_sse(self, response):
"""监听 SSE 消息"""
try:
# 异步遍历SSE流中的每一行
async for line in response.content:
line = line.decode("utf-8").strip()
# 判断是否为SSE数据行
if line.startswith("data: "):
try:
data = json.loads(line[6:])
if "method" in data:
print(f"📨 SSE 消息: {data['method']}")
except json.JSONDecodeError:
continue
except asyncio.CancelledError:
print(" SSE 连接已取消")
except Exception as e:
# 检查是否是正常的连接关闭
if "Connection closed" in str(e) or "Connection reset" in str(e):
print(" SSE 连接正常关闭")
else:
print(f" SSE 监听异常: {e}")
# 关闭连接
async def shutdown(self) -> bool:
"""关闭连接"""
if not self.session_id:
return True
try:
# 构造关闭请求的payload
payload = {"jsonrpc": "2.0", "id": 4, "method": "shutdown", "params": {}}
# 设置请求头,包含会话ID
headers = {"X-Session-ID": self.session_id}
# 发送POST请求到/shutdown接口
async with self.session.post(
f"{self.server_url}/shutdown", json=payload, headers=headers
) as response:
# 判断响应状态码
if response.status == 200:
print("连接关闭成功")
self.session_id = None
return True
else:
print(f" 连接关闭失败: {response.status}")
return False
except Exception as e:
print(f" 连接关闭异常: {e}")
return False
# 演示传统 HTTP+SSE MCP 的工作流程
async def demo_traditional_mcp():
"""演示传统 HTTP+SSE MCP 的工作流程"""
print(" 传统 HTTP+SSE MCP 客户端演示")
print("=" * 50)
# 使用异步上下文管理器创建客户端
async with TraditionalMCPClient() as client:
# 1. 初始化连接
print("\n 1. 初始化连接...")
if not await client.initialize():
print(" 无法继续演示")
return
# 2. 获取工具列表
print("\n 2. 获取工具列表...")
tools = await client.list_tools()
if not tools:
print(" 无法获取工具列表")
return
# 3. 调用简单工具
print("\n 3. 调用简单工具 (echo)...")
result = await client.call_tool("echo", {"message": "Hello, Traditional MCP!"})
if result:
print(f" 结果: {result}")
# 4. 建立 SSE 连接
print("\n 4. 建立 SSE 连接...")
if await client.establish_sse_connection():
# 等待一下让连接稳定
await asyncio.sleep(1)
# 5. 调用长运行工具
print("\n 5. 调用长运行工具...")
result = await client.call_tool("long_running_task", {"duration": 5})
if result:
print(f" 最终结果: {result}")
# 6. 关闭连接
print("\n 6. 关闭连接...")
await client.shutdown()
print("\n" + "=" * 50)
print(" 传统 HTTP+SSE 传输的特点总结:")
print(" 连接断开后无法恢复")
print(" 服务器必须维护长连接")
print(" 对服务器基础设施要求高")
print(" 只能通过 SSE 发送服务器消息")
# 主程序入口
if __name__ == "__main__":
print("启动传统 HTTP+SSE MCP 客户端演示...")
print(" 请确保服务器已启动: python http_sse_server.py")
print()
try:
asyncio.run(demo_traditional_mcp())
except KeyboardInterrupt:
print("\n 演示被用户中断")
except Exception as e:
print(f"\n 演示异常: {e}")
11. Streamable HTTP+SSE 实现 #
11.1. 服务器 #
# 导入异步IO库
import asyncio
# 导入JSON处理库
import json
# 导入UUID库用于生成唯一会话ID
import uuid
# 从starlette导入应用、路由、响应等相关模块
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import JSONResponse, StreamingResponse
from starlette.requests import Request
# 导入uvicorn用于运行ASGI服务器
import uvicorn
# 定义StreamableHTTPMCPServer类,实现MCP服务器
class StreamableHTTPMCPServer:
"""Streamable HTTP MCP 服务器实现"""
# 构造函数,初始化服务器状态
def __init__(self):
# 是否为无状态模式,默认为有状态
self.stateless_mode = False # 可以设置为 False 来启用有状态模式
# 会话信息存储字典
self.sessions = {} # 会话列表
# 消息队列存储字典
self.message_queues = {} # 消息队列
# 生成会话ID的方法
def _generate_session_id(self) -> str:
"""生成会话 ID(仅在非无状态模式下使用)"""
# 如果是有状态模式则生成UUID,否则返回None
return str(uuid.uuid4()) if not self.stateless_mode else None
# 处理初始化请求
async def handle_initialize(self, request: Request):
"""处理初始化请求"""
# 解析请求体中的JSON数据
data = await request.json()
# 判断是否为无状态模式
if self.stateless_mode:
# 无状态模式:不创建持久会话
print(" 无状态模式:初始化请求已确认,但不创建持久会话")
# 返回初始化确认响应
return JSONResponse(
{
"jsonrpc": "2.0",
"id": data.get("id"),
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}, "resources": {}, "prompts": {}},
"serverInfo": {
"name": "Streamable HTTP MCP Server",
"version": "1.0.0",
"mode": "stateless",
},
},
}
)
else:
# 有状态模式:创建会话
session_id = self._generate_session_id()
# 存储会话信息
self.sessions[session_id] = {
"protocol_version": data.get("protocolVersion", "2024-11-05"),
"capabilities": data.get("capabilities", {}),
"client_info": data.get("clientInfo", {}),
"created_at": asyncio.get_event_loop().time(),
}
# 为该会话创建消息队列
self.message_queues[session_id] = asyncio.Queue()
# 返回初始化响应,包含会话ID
return JSONResponse(
{
"jsonrpc": "2.0",
"id": data.get("id"),
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}, "resources": {}, "prompts": {}},
"serverInfo": {
"name": "Streamable HTTP MCP Server",
"version": "1.0.0",
"mode": "stateful",
"session_id": session_id,
},
},
}
)
# 处理工具列表请求
async def handle_tool_list(self, request: Request):
"""处理工具列表请求"""
# 解析请求体中的JSON数据
data = await request.json()
# 如果是有状态模式,校验会话ID
if not self.stateless_mode:
session_id = request.headers.get("X-Session-ID")
if not session_id or session_id not in self.sessions:
# 会话无效,返回错误
return JSONResponse({"error": "Invalid session"}, status_code=400)
# 工具列表对于无状态和有状态服务器都是一样的
return JSONResponse(
{
"jsonrpc": "2.0",
"id": data.get("id"),
"result": {
"tools": [
{
"name": "echo",
"description": "Echo back the input (stateless)",
"inputSchema": {
"type": "object",
"properties": {"message": {"type": "string"}},
"required": ["message"],
},
},
{
"name": "streaming_task",
"description": "A task that can stream progress (upgradable to SSE)",
"inputSchema": {
"type": "object",
"properties": {
"duration": {"type": "integer", "default": 5},
"stream": {"type": "boolean", "default": False},
},
},
},
{
"name": "stateful_counter",
"description": "A stateful counter (only works in stateful mode)",
"inputSchema": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["increment", "decrement", "get"],
}
},
},
},
]
},
}
)
# 处理工具调用请求
async def handle_tool_call(self, request: Request):
"""处理工具调用请求"""
# 解析请求体中的JSON数据
data = await request.json()
# 获取要调用的工具名称
tool_name = data["params"]["name"]
# 如果是有状态模式,校验会话ID
if not self.stateless_mode:
session_id = request.headers.get("X-Session-ID")
if not session_id or session_id not in self.sessions:
# 会话无效,返回错误
return JSONResponse({"error": "Invalid session"}, status_code=400)
# 处理echo工具
if tool_name == "echo":
# 简单工具,直接返回结果(无状态)
return JSONResponse(
{
"jsonrpc": "2.0",
"id": data.get("id"),
"result": {
"content": [
{
"type": "text",
"text": f"Echo: {data['params']['arguments']['message']}",
}
]
},
}
)
# 处理streaming_task工具
elif tool_name == "streaming_task":
# 流式任务,可以选择是否升级为 SSE
arguments = data["params"]["arguments"]
# 获取任务持续时间
duration = arguments.get("duration", 5)
# 是否使用流式
use_streaming = arguments.get("stream", False)
if use_streaming:
# 升级为 SSE 流式响应
return await self._handle_streaming_task(data, duration)
else:
# 普通响应,等待任务完成
result = await self._execute_task_sync(duration)
return JSONResponse(
{
"jsonrpc": "2.0",
"id": data.get("id"),
"result": {
"content": [
{
"type": "text",
"text": f"Task completed after {duration} seconds: {result}",
}
]
},
}
)
# 处理stateful_counter工具
elif tool_name == "stateful_counter":
# 如果是无状态模式,返回错误
if self.stateless_mode:
return JSONResponse(
{
"jsonrpc": "2.0",
"id": data.get("id"),
"error": {
"code": -32601,
"message": "Stateful counter not available in stateless mode",
},
}
)
# 有状态计数器
session_id = request.headers.get("X-Session-ID")
# 获取操作类型
action = data["params"]["arguments"]["action"]
# 如果会话中没有计数器,初始化为0
if "counter" not in self.sessions[session_id]:
self.sessions[session_id]["counter"] = 0
# 根据操作类型修改计数器
if action == "increment":
self.sessions[session_id]["counter"] += 1
elif action == "decrement":
self.sessions[session_id]["counter"] -= 1
# 获取当前计数器值
counter_value = self.sessions[session_id]["counter"]
# 返回计数器操作结果
return JSONResponse(
{
"jsonrpc": "2.0",
"id": data.get("id"),
"result": {
"content": [
{
"type": "text",
"text": f"Counter {action}ed. Current value: {counter_value}",
}
]
},
}
)
# 未知工具,返回错误
return JSONResponse({"error": "Unknown tool"}, status_code=400)
# 同步执行任务(异步等待一段时间)
async def _execute_task_sync(self, duration: int) -> str:
"""同步执行任务"""
# 异步等待指定秒数
await asyncio.sleep(duration)
# 返回任务结果字符串
return f"Task result at {asyncio.get_event_loop().time()}"
# 处理流式任务(SSE)
async def _handle_streaming_task(self, data, duration):
"""处理流式任务"""
# 定义异步生成器,逐步推送进度
async def stream_progress():
# 发送开始通知
yield f"data: {json.dumps({
'jsonrpc': '2.0',
'method': 'notifications/progress',
'params': {
'type': 'begin',
'title': 'Streaming Task',
'message': 'Starting streaming task...'
}
})}\n\n"
# 模拟任务执行,发送进度更新
for i in range(duration):
await asyncio.sleep(1)
# 计算进度百分比
progress = (i + 1) / duration * 100
# 发送进度更新通知
yield f"data: {json.dumps({
'jsonrpc': '2.0',
'method': 'notifications/progress',
'params': {
'type': 'update',
'title': 'Streaming Task',
'message': f'Progress: {progress:.1f}%',
'percentage': progress
}
})}\n\n"
# 发送完成通知
yield f"data: {json.dumps({
'jsonrpc': '2.0',
'method': 'notifications/progress',
'params': {
'type': 'end',
'title': 'Streaming Task',
'message': 'Streaming task completed!'
}
})}\n\n"
# 发送最终结果
yield f"data: {json.dumps({
'jsonrpc': '2.0',
'id': data.get('id'),
'result': {
'content': [
{
'type': 'text',
'text': f'Streaming task completed after {duration} seconds!'
}
]
}
})}\n\n"
# 返回StreamingResponse,SSE流式响应
return StreamingResponse(
stream_progress(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
)
# 处理统一的消息端点
async def handle_message_endpoint(self, request: Request):
"""处理统一的消息端点"""
# 如果是GET请求,建立SSE流
if request.method == "GET":
# GET 请求:建立 SSE 流(可选)
return await self._handle_sse_connection(request)
# 如果是POST请求,处理MCP消息
elif request.method == "POST":
# POST 请求:处理 MCP 消息
return await self._handle_mcp_message(request)
# 其他方法不允许
else:
return JSONResponse({"error": "Method not allowed"}, status_code=405)
# 处理SSE连接
async def _handle_sse_connection(self, request: Request):
"""处理 SSE 连接(可选功能)"""
# 无状态模式不支持SSE
if self.stateless_mode:
return JSONResponse(
{"error": "SSE connections not supported in stateless mode"},
status_code=400,
)
# 获取会话ID
session_id = request.headers.get("X-Session-ID")
# 校验会话ID
if not session_id or session_id not in self.sessions:
return JSONResponse({"error": "Invalid session"}, status_code=400)
# 定义SSE流生成器
async def sse_stream():
# 获取该会话的消息队列
queue = self.message_queues[session_id]
# 发送连接确认
yield f"data: {json.dumps({
'jsonrpc': '2.0',
'method': 'notifications/connection',
'params': {
'status': 'connected',
'session_id': session_id,
'mode': 'stateful'
}
})}\n\n"
try:
while True:
# 等待消息,超时30秒
message = await asyncio.wait_for(queue.get(), timeout=30.0)
# 推送消息
yield f"data: {json.dumps(message)}\n\n"
except asyncio.TimeoutError:
# 超时则发送心跳保持连接
yield f"data: {json.dumps({
'jsonrpc': '2.0',
'method': 'notifications/heartbeat',
'params': {'timestamp': asyncio.get_event_loop().time()}
})}\n\n"
# 返回StreamingResponse,SSE流式响应
return StreamingResponse(
sse_stream(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
)
# 处理MCP消息
async def _handle_mcp_message(self, request: Request):
"""处理 MCP 消息"""
try:
# 解析请求体中的JSON数据
data = await request.json()
# 获取方法名
method = data.get("method", "")
# 根据方法名分发处理
if method == "initialize":
return await self.handle_initialize(request)
elif method == "tools/list":
return await self.handle_tool_list(request)
elif method == "tools/call":
return await self.handle_tool_call(request)
else:
# 未知方法,返回错误
return JSONResponse(
{
"jsonrpc": "2.0",
"id": data.get("id"),
"error": {
"code": -32601,
"message": f"Method not found: {method}",
},
}
)
except Exception as e:
# 解析异常,返回错误
return JSONResponse(
{
"jsonrpc": "2.0",
"id": request.query_params.get("id"),
"error": {"code": -32700, "message": f"Parse error: {str(e)}"},
},
status_code=400,
)
# 处理关闭请求
async def handle_shutdown(self, request: Request):
"""处理关闭请求"""
# 无状态模式无需清理
if self.stateless_mode:
return JSONResponse(
{
"jsonrpc": "2.0",
"id": 1,
"result": {"message": "Stateless server - no cleanup needed"},
}
)
# 解析请求体中的JSON数据
data = await request.json()
# 获取会话ID
session_id = request.headers.get("X-Session-ID")
# 删除会话信息
if session_id in self.sessions:
del self.sessions[session_id]
# 删除消息队列
if session_id in self.message_queues:
del self.message_queues[session_id]
# 返回清理完成响应
return JSONResponse(
{
"jsonrpc": "2.0",
"id": data.get("id"),
"result": {"message": "Session cleaned up"},
}
)
# 创建Starlette应用
def create_app():
"""创建 Starlette 应用"""
# 实例化服务器对象
server = StreamableHTTPMCPServer()
# 定义初始化端点处理函数
async def initialize(request: Request):
return await server.handle_initialize(request)
# 定义工具列表端点处理函数
async def tool_list(request: Request):
return await server.handle_tool_list(request)
# 定义工具调用端点处理函数
async def tool_call(request: Request):
return await server.handle_tool_call(request)
# 定义统一消息端点处理函数
async def message_endpoint(request: Request):
return await server.handle_message_endpoint(request)
# 定义关闭端点处理函数
async def shutdown(request: Request):
return await server.handle_shutdown(request)
# 定义主页端点处理函数
async def homepage(request: Request):
return JSONResponse(
{
"message": "Streamable HTTP MCP Server",
"endpoints": {
"/initialize": "POST - Initialize MCP connection",
"/tools/list": "POST - List available tools",
"/tools/call": "POST - Call a tool",
"/message": "GET/POST - Unified message endpoint (SSE + MCP)",
"/shutdown": "POST - Shutdown connection (stateful mode only)",
},
"features": {
"stateless_mode": server.stateless_mode,
"unified_endpoint": True,
"upgradable_requests": True,
"flexible_session_management": True,
},
"note": "This server demonstrates the advantages of Streamable HTTP transport",
}
)
# 返回Starlette应用对象,注册各路由
return Starlette(
routes=[
Route("/", homepage),
Route("/initialize", initialize, methods=["POST"]),
Route("/tools/list", tool_list, methods=["POST"]),
Route("/tools/call", tool_call, methods=["POST"]),
Route("/message", message_endpoint, methods=["GET", "POST"]),
Route("/shutdown", shutdown, methods=["POST"]),
]
)
# 主程序入口
if __name__ == "__main__":
# 创建应用
app = create_app()
# 打印服务器启动信息
print("启动 Streamable HTTP MCP 服务器...")
print(" 特点:")
print(" 支持无状态和有状态模式")
print(" 统一的 /message 端点")
print(" 可升级的请求(支持 SSE)")
print(" 灵活的会话管理")
print(" 与现有 HTTP 基础设施完全兼容")
print(" 渐进式功能升级")
print("\n🌐 服务器地址: http://127.0.0.1:8000")
print("📖 查看 API 文档: http://127.0.0.1:8000")
# 启动uvicorn服务器
uvicorn.run(app, host="127.0.0.1", port=8000)
11.2 客户端 #
import asyncio
import json
import aiohttp
class StreamableHTTPMCPClient:
"""Streamable HTTP MCP 客户端实现"""
def __init__(self, server_url: str = "http://127.0.0.1:8000"):
self.server_url = server_url
self.session_id = None
self.sse_task = None
self.session = None
self.server_mode = None
async def __aenter__(self):
"""异步上下文管理器入口"""
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""异步上下文管理器出口"""
if self.sse_task:
self.sse_task.cancel()
if self.session:
await self.session.close()
async def initialize(self) -> bool:
"""初始化 MCP 连接"""
try:
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}, "resources": {}, "prompts": {}},
"clientInfo": {
"name": "Streamable HTTP MCP Client",
"version": "1.0.0",
},
},
}
async with self.session.post(
f"{self.server_url}/initialize", json=payload
) as response:
if response.status == 200:
result = await response.json()
self.server_mode = result["result"]["serverInfo"]["mode"]
if self.server_mode == "stateful":
# 有状态模式:从服务器信息中获取会话 ID
self.session_id = result["result"]["serverInfo"].get(
"session_id"
)
print(" 连接初始化成功")
print(f" 协议版本: {result['result']['protocolVersion']}")
print(f" 服务器: {result['result']['serverInfo']['name']}")
print(f" 服务器模式: {self.server_mode}")
if self.session_id:
print(f" 会话 ID: {self.session_id}")
return True
else:
print(f"❌ 连接初始化失败: {response.status}")
return False
except Exception as e:
print(f"❌ 连接初始化异常: {e}")
return False
async def list_tools(self) -> list:
"""获取可用工具列表"""
try:
payload = {"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}}
# 使用统一的 /message 端点
url = f"{self.server_url}/message"
headers = {}
if self.session_id:
headers["X-Session-ID"] = self.session_id
async with self.session.post(
url, json=payload, headers=headers
) as response:
if response.status == 200:
result = await response.json()
tools = result["result"]["tools"]
print(f"获取到 {len(tools)} 个工具:")
for tool in tools:
print(f" - {tool['name']}: {tool['description']}")
return tools
else:
print(f"❌ 获取工具列表失败: {response.status}")
return []
except Exception as e:
print(f"❌ 获取工具列表异常: {e}")
return []
async def call_tool(self, tool_name: str, arguments):
"""调用工具"""
try:
payload = {
"jsonrpc": "2.0",
"id": 3,
"method": "tools/call",
"params": {"name": tool_name, "arguments": arguments},
}
# 使用统一的 /message 端点
url = f"{self.server_url}/message"
headers = {}
if self.session_id:
headers["X-Session-ID"] = self.session_id
async with self.session.post(
url, json=payload, headers=headers
) as response:
if response.status == 200:
content_type = response.headers.get("content-type", "")
if "text/event-stream" in content_type:
# SSE 流式响应
print(f"📡 工具 {tool_name} 返回流式响应...")
return await self._handle_sse_response(response)
else:
# 普通 JSON 响应
try:
result = await response.json()
# 检查是否有错误
if "error" in result:
print(
f"⚠️ 工具 {tool_name} 调用警告: {result['error']['message']}"
)
else:
print(f" 工具 {tool_name} 调用成功")
return result
except Exception as json_error:
print(f"⚠️ JSON 解析失败: {json_error}")
# 尝试读取原始响应内容
text_content = await response.text()
print(f" 原始响应: {text_content[:200]}...")
return {
"error": "JSON parsing failed",
"raw_content": text_content,
}
else:
print(f"❌ 工具调用失败: {response.status}")
return None
except Exception as e:
print(f"❌ 工具调用异常: {e}")
return None
async def _handle_sse_response(self, response):
"""处理 SSE 响应"""
print("📡 接收到 SSE 流式响应...")
result = None
try:
async for line in response.content:
line = line.decode("utf-8").strip()
if line.startswith("data: "):
try:
data = json.loads(line[6:]) # 移除 'data: ' 前缀
if "method" in data:
# 通知消息
if "notifications/progress" in data["method"]:
progress_data = data["params"]
if progress_data["type"] == "begin":
print(f" 任务开始: {progress_data['title']}")
elif progress_data["type"] == "update":
print(f" 进度更新: {progress_data['message']}")
elif progress_data["type"] == "end":
print(f" 任务完成: {progress_data['title']}")
elif "notifications/heartbeat" in data["method"]:
print(
f" 心跳: {data['params'].get('timestamp', 'N/A')}"
)
elif "notifications/connection" in data["method"]:
print(f" 连接状态: {data['params']['status']}")
elif "result" in data:
# 结果消息
result = data
print(
f"📋 最终结果: {data['result']['content'][0]['text']}"
)
break
except json.JSONDecodeError as e:
print(f"⚠️ SSE 数据解析失败: {e}, 原始数据: {line}")
continue
except Exception as e:
print(f"⚠️ SSE 流处理异常: {e}")
return result or {"error": "No result received from SSE stream"}
async def establish_sse_connection(self) -> bool:
"""建立 SSE 连接(仅在有状态模式下可用)"""
if self.server_mode == "stateless":
print(" SSE 连接在无状态模式下不可用")
return False
if not self.session_id:
print(" 请先初始化连接")
return False
try:
# 使用统一的 /message 端点
url = f"{self.server_url}/message"
headers = {}
if self.session_id:
headers["X-Session-ID"] = self.session_id
async with self.session.get(url, headers=headers) as response:
if response.status == 200:
print(" SSE 连接建立成功")
# 启动 SSE 监听任务
self.sse_task = asyncio.create_task(self._listen_sse(response))
return True
else:
print(f" SSE 连接建立失败: {response.status}")
return False
except Exception as e:
print(f" SSE 连接异常: {e}")
return False
async def _listen_sse(self, response):
"""监听 SSE 消息"""
try:
async for line in response.content:
line = line.decode("utf-8").strip()
if line.startswith("data: "):
try:
data = json.loads(line[6:])
if "method" in data:
print(f"📨 SSE 消息: {data['method']}")
except json.JSONDecodeError:
continue
except asyncio.CancelledError:
print("📡 SSE 连接已取消")
except Exception as e:
# 检查是否是正常的连接关闭
error_msg = str(e).lower()
if any(
keyword in error_msg
for keyword in [
"connection closed",
"connection reset",
"broken pipe",
"end of stream",
"stream closed",
]
):
print("📡 SSE 连接正常关闭")
else:
print(f"❌ SSE 监听异常: {e}")
async def shutdown(self) -> bool:
"""关闭连接"""
if self.server_mode == "stateless":
print(" 无状态服务器无需关闭连接")
return True
if not self.session_id:
return True
try:
payload = {"jsonrpc": "2.0", "id": 4, "method": "shutdown", "params": {}}
headers = {"X-Session-ID": self.session_id}
async with self.session.post(
f"{self.server_url}/shutdown", json=payload, headers=headers
) as response:
if response.status == 200:
print("连接关闭成功")
self.session_id = None
return True
else:
print(f" 连接关闭失败: {response.status}")
return False
except Exception as e:
print(f" 连接关闭异常: {e}")
return False
async def demo_streamable_http_mcp():
"""演示 Streamable HTTP MCP 的工作流程"""
print(" Streamable HTTP MCP 客户端演示")
print("=" * 50)
async with StreamableHTTPMCPClient() as client:
# 1. 初始化连接
print("\n 1. 初始化连接...")
if not await client.initialize():
print(" 无法继续演示")
return
# 2. 获取工具列表
print("\n 2. 获取工具列表...")
tools = await client.list_tools()
if not tools:
print(" 无法获取工具列表")
return
# 3. 调用简单工具
print("\n 3. 调用简单工具 (echo)...")
result = await client.call_tool(
"echo", {"message": "Hello, Streamable HTTP MCP!"}
)
if result:
print(f" 结果: {result}")
# 4. 调用长运行工具(非流式)
print("\n 4. 调用长运行工具(非流式)...")
result = await client.call_tool(
"streaming_task", {"duration": 3, "stream": False}
)
if result:
print(f" 结果: {result}")
# 5. 调用长运行工具(流式)
print("\n 5. 调用长运行工具(流式)...")
result = await client.call_tool(
"streaming_task", {"duration": 3, "stream": True}
)
if result:
print(f" 结果: {result}")
# 6. 尝试调用有状态工具
print("\n 6. 尝试调用有状态工具...")
result = await client.call_tool("stateful_counter", {"action": "increment"})
if result:
if "error" in result:
print(f" 预期结果: {result['error']['message']}")
else:
print(f" 结果: {result}")
# 7. 建立 SSE 连接(如果可用)
print("\n7️⃣ 建立 SSE 连接...")
if await client.establish_sse_connection():
# 等待一下让连接稳定并接收一些消息
print(" ⏳ 等待 SSE 消息...")
await asyncio.sleep(3) # 增加等待时间
# 手动取消 SSE 任务,避免在 shutdown 时出现异常
if client.sse_task:
client.sse_task.cancel()
try:
await client.sse_task
except asyncio.CancelledError:
pass
# 8. 关闭连接
print("\n 8. 关闭连接...")
await client.shutdown()
print("\n" + "=" * 50)
print(" Streamable HTTP 传输的特点总结:")
print(" 支持无状态和有状态模式")
print(" 统一的 /message 端点")
print(" 可升级的请求(支持 SSE)")
print(" 灵活的会话管理")
print(" 与现有 HTTP 基础设施完全兼容")
print(" 渐进式功能升级")
print(" 支持请求-响应模式")
print(" 可选的流式传输")
async def compare_transports():
"""比较两种传输方式的差异"""
print("\n" + "=" * 60)
print(" 传输方式对比分析")
print("=" * 60)
print("\n 传统 HTTP+SSE vs Streamable HTTP")
print("-" * 40)
print("\n 传统 HTTP+SSE 的局限性:")
print(" 连接断开后无法恢复")
print(" 服务器必须维护长连接")
print(" 对服务器基础设施要求高")
print(" 只能通过 SSE 发送服务器消息")
print(" 需要单独的 /sse 端点")
print("\n Streamable HTTP 的优势:")
print(" 支持无状态服务器")
print(" 统一的 /message 端点")
print(" 可升级的请求(支持 SSE)")
print(" 灵活的会话管理")
print(" 与现有 HTTP 基础设施完全兼容")
print(" 渐进式功能升级")
print(" 支持请求-响应模式")
print("\n 适用场景:")
print(" 传统 HTTP+SSE:")
print(" - 需要实时双向通信")
print(" - 服务器基础设施完善")
print(" - 对连接稳定性要求高")
print("\n Streamable HTTP:")
print(" - 需要灵活部署选项")
print(" - 基础设施兼容性要求高")
print(" - 需要渐进式功能升级")
print(" - 支持无状态部署")
if __name__ == "__main__":
print("启动 Streamable HTTP MCP 客户端演示...")
print(" 请确保服务器已启动: python streamable_http_server.py")
print()
try:
asyncio.run(demo_streamable_http_mcp())
asyncio.run(compare_transports())
except KeyboardInterrupt:
print("\n 演示被用户中断")
except Exception as e:
print(f"\n 演示异常: {e}")
12.对比 #
12.1 1. 架构设计理念 #
12.1.1 传统 HTTP+SSE 方式 #
- 分离式设计:HTTP请求和SSE连接完全分离
- 多端点架构:需要多个独立的API端点
# SSE方式需要多个端点 /initialize # 初始化 /tools/list # 工具列表 /tools/call # 工具调用 /sse # SSE连接(独立) /shutdown # 关闭连接
12.1.2 Streamable HTTP 方式 #
- 统一式设计:所有通信通过统一端点
- 可升级架构:请求可以动态升级为SSE
# Streamable方式统一端点 /message # 统一消息端点(GET/POST) /initialize # 兼容性端点 /tools/list # 兼容性端点 /tools/call # 兼容性端点
12.2 2. 会话管理模式 #
12.2.1 SSE方式 - 强制有状态 #
# SSE服务器必须维护会话状态
def __init__(self):
self.active_connections = {} # 必须维护连接
self.sessions = {} # 必须维护会话12.2.2 Streamable方式 - 灵活状态管理 #
# Streamable服务器支持两种模式
def __init__(self):
self.stateless_mode = False # 可选择无状态模式
self.sessions = {} # 可选会话管理
self.message_queues = {} # 可选消息队列12.3 3. 连接建立方式 #
12.3.1 SSE方式 - 分离连接 #
# 需要单独建立SSE连接
async def establish_sse_connection(self) -> bool:
async with self.session.get(
f"{self.server_url}/sse", headers=headers
) as response:
# 独立的SSE连接12.3.2 Streamable方式 - 统一连接 #
# 通过统一端点建立连接
async def establish_sse_connection(self) -> bool:
async with self.session.get(
f"{self.server_url}/message", headers=headers
) as response:
# 可升级的统一连接12.4 4. 工具调用机制 #
12.4.1 SSE方式 - 固定响应类型 #
# 工具调用只能返回固定类型
if "text/event-stream" in content_type:
return await self._handle_sse_response(response)
else:
result = await response.json() # 普通JSON响应12.4.2 Streamable方式 - 动态响应类型 #
# 工具调用可以动态选择响应类型
if use_streaming:
return await self._handle_streaming_task(data, duration) # SSE流
else:
result = await self._execute_task_sync(duration) # 普通响应
return JSONResponse(...)12.5 5. 服务器部署灵活性 #
12.5.1 SSE方式 - 高要求部署 #
# 必须维护长连接和会话状态
print("特点:")
print(" - 需要维护长连接")
print(" - 连接断开后无法恢复")
print(" - 服务器必须保持高可用性")
print(" - 只能通过 SSE 发送服务器消息")12.5.2 Streamable方式 - 灵活部署 #
# 支持多种部署模式
print("特点:")
print(" 支持无状态和有状态模式")
print(" 统一的 /message 端点")
print(" 可升级的请求(支持 SSE)")
print(" 灵活的会话管理")
print(" 与现有 HTTP 基础设施完全兼容")12.6 6. 错误处理和恢复能力 #
12.6.1 SSE方式 - 脆弱连接 #
- 连接断开后无法自动恢复
- 必须重新建立整个连接
- 服务器必须保持高可用性
12.6.2 Streamable方式 - 健壮设计 #
- 支持无状态模式,无需恢复
- 每个请求都是独立的
- 可以优雅降级
12.7 7. 实际应用场景对比 #
12.7.1 SSE方式适用场景 #
# 适合需要实时双向通信的场景
print("传统 HTTP+SSE:")
print(" - 需要实时双向通信")
print(" - 服务器基础设施完善")
print(" - 对连接稳定性要求高")12.7.2 Streamable方式适用场景 #
# 适合需要灵活部署的场景
print("Streamable HTTP:")
print(" - 需要灵活部署选项")
print(" - 基础设施兼容性要求高")
print(" - 需要渐进式功能升级")
print(" - 支持无状态部署")12.8 8. 代码复杂度对比 #
12.8.1 SSE方式 - 复杂实现 #
- 需要管理多个端点
- 需要维护连接状态
- 需要处理连接断开
- 代码分散在多个方法中
12.8.2 Streamable方式 - 简化实现 #
- 统一的消息处理
- 可选的会话管理
- 自动的错误处理
- 集中的逻辑处理
12.9 📊 总结对比表 #
| 特性 | SSE方式 | Streamable方式 |
|---|---|---|
| 端点数量 | 5个独立端点 | 1个统一端点 |
| 会话管理 | 强制有状态 | 可选有状态/无状态 |
| 连接恢复 | 不支持 | 支持(无状态模式) |
| 部署复杂度 | 高 | 低 |
| 基础设施要求 | 高 | 低 |
| 向后兼容性 | 差 | 好 |
| 渐进式升级 | 不支持 | 支持 |
| 错误处理 | 复杂 | 简单 |
Streamable HTTP方式代表了MCP传输协议的未来发展方向,它通过统一端点和灵活的状态管理,解决了传统SSE方式的诸多限制,为MCP的广泛应用提供了更好的基础。