1.StreamableHTTP #
uv add httpx "uvicorn[standard]" starlette本次为FastMCP项目引入了 Streamable HTTP 传输能力,包括客户端(mcp_lite/client/streamable_http.py)与服务端(mcp_lite/server/streamable_http.py)的实现,以及统一的 API 适配。Streamable HTTP 通过标准 HTTP POST 接口实现消息流式传递,为对话和任务处理带来更好的兼容性与扩展性。主要特性如下:
- 客户端实现:
streamable_http_client提供了线程安全的 get/send 同步接口,支持在上下文管理器中启动后台线程,通过 httpx 库与服务端进行流式消息交互。 - 服务端实现:
streamable_http_app基于 Starlette 构建 HTTP API,封装 JSON-RPC 消息协议,支持多会话和 Session ID 头自动管理,便于与多客户端对接。 - 集成与运行:开发者可以在 FastMCP 实例中通过
transport="streamable-http"选项,快速启动兼容标准 HTTP POST 的流式消息微服务,并可对接 Web 或跨语言客户端。
npx @modelcontextprotocol/inspector uv --directory D:/aprepare/mcp-starter run streamable_http_server.py2. streamable_http.py #
mcp_lite/client/streamable_http.py
# mcp_lite Streamable HTTP 客户端传输
# 导入 json 序列化模块
import json
# 导入线程模块
import threading
# 导入 contextmanager 上下文管理器装饰器
from contextlib import contextmanager
# 导入线程安全队列
from queue import Queue
# 导入 httpx HTTP 客户端库
import httpx
# 导入会话消息类型
from mcp_lite.message import SessionMessage
# 导入 JSONRPC 错误类型和消息适配器
from mcp_lite.types import JSONRPCError, ErrorData, jsonrpc_message_adapter
# 定义 Session ID 的 HTTP 头名称
MCP_SESSION_ID = "mcp-session-id"
# 定义 Streamable HTTP 客户端上下文管理器,返回 (read_stream, write_stream)
def streamable_http_client(url: str, *, timeout: float = 30.0):
"""
Streamable HTTP 客户端上下文管理器,返回 (read_stream, write_stream)。
提供同步的 get/send 接口,供 mcp_lite.ClientSession 使用。
内部通过 HTTP POST 发送消息,接收 JSON 响应。
"""
# 初始化用于接收读取消息的队列
read_q: Queue = Queue()
# 初始化用于发送写入消息的队列
write_q: Queue = Queue()
# 通过列表持有 session_id,实现闭包共享与可修改
session_id: list[str | None] = [None]
# 停止工作线程的标志位,用列表包装可变状态”的写法,用来在闭包或不同作用域之间共享可修改的值。
stop_flag: list[bool] = [False]
# HTTP 通讯的工作线程
def worker():
# 创建 httpx 客户端,设置超时和允许重定向
with httpx.Client(timeout=timeout, follow_redirects=True) as client:
# 准备基础请求头
headers = {
"accept": "application/json, text/event-stream",
"content-type": "application/json",
}
# 持续循环直到收到 stop 标志
while not stop_flag[0]:
try:
# 从写队列读取消息
msg = write_q.get()
# 收到 None 消息时退出循环
if msg is None:
break
# 如果消息类型不是 SessionMessage,则跳过
if not isinstance(msg, SessionMessage):
continue
# 序列化消息对象为 JSON 兼容 dict
body = msg.message.model_dump(by_alias=True, mode="json", exclude_unset=True)
# 如果已有会话 ID,则添加到请求头
if session_id[0]:
headers[MCP_SESSION_ID] = session_id[0]
# 发送 POST 请求到远端服务端
resp = client.post(url, json=body, headers=headers)
# 尝试从响应头提取 session id 并保存
sid = resp.headers.get(MCP_SESSION_ID)
if sid:
session_id[0] = sid
# 202 状态码,无需处理响应体,为通知类消息
if resp.status_code == 202:
continue
# 400 或更高的状态码视为 HTTP 出错,组装错误回应
if resp.status_code >= 400:
err = {
"jsonrpc": "2.0",
"id": body.get("id"),
"error": {
"code": -32603,
"message": f"HTTP {resp.status_code}"
}
}
# 校验并转换为 SessionMessage,放入读队列
read_q.put(SessionMessage(
message=jsonrpc_message_adapter.validate_json(json.dumps(err), by_name=False)
))
continue
# 200 状态且有响应内容
if resp.status_code == 200 and resp.content:
try:
# 尝试解析响应的 jsonrpc 消息
parsed = jsonrpc_message_adapter.validate_json(resp.text, by_name=False)
read_q.put(SessionMessage(message=parsed))
except Exception as e:
# 解析失败,组装解析错误,放入读队列
err = {
"jsonrpc": "2.0",
"id": body.get("id"),
"error": {
"code": -32700,
"message": str(e)
}
}
read_q.put(SessionMessage(
message=jsonrpc_message_adapter.validate_json(json.dumps(err), by_name=False)
))
except Exception as e:
# 捕获任何异常,组装为 JSONRPC 错误,放入读队列
err = JSONRPCError(
jsonrpc="2.0",
id=None,
error=ErrorData(code=-32603, message=str(e))
)
read_q.put(SessionMessage(message=err))
# 通知读取端终止
read_q.put(None)
# 读取流类,负责从读队列取消息
class ReadStream:
def get(self):
# 从队列获取响应消息
return read_q.get()
# 写入流类,负责将消息放入写队列
class WriteStream:
def send(self, msg):
# 向队列添加待发送的消息
write_q.put(msg)
# 定义上下文管理器,用于启动/回收线程与维护通道
@contextmanager
def _ctx():
# 创建并启动后台线程
t = threading.Thread(target=worker, daemon=True)
t.start()
try:
# 暴露 ReadStream 和 WriteStream 实例
yield ReadStream(), WriteStream()
finally:
# 请求线程安全退出并清理
stop_flag[0] = True
write_q.put(None)
t.join(timeout=5)
# 返回上下文管理器实例
return _ctx()
3. streamable_http.py #
mcp_lite/server/streamable_http.py
# mcp_lite Streamable HTTP 服务端
# 导入 json 用于序列化和反序列化 JSON 数据
import json
# 导入 uuid 用于生成唯一的会话 ID
import uuid
# 导入 Starlette 框架组件
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import Response
from starlette.routing import Route
# 导入 SessionMessage 用于封装会话消息
from mcp_lite.message import SessionMessage
# 导入 jsonrpc_message_adapter 用于校验与转换 JSON-RPC 消息
from mcp_lite.types import jsonrpc_message_adapter
# 定义会话 ID 的 HTTP 头名称
MCP_SESSION_ID = "mcp-session-id"
# 定义返回内容类型为 application/json
CONTENT_TYPE_JSON = "application/json"
# 定义 streamable_http_app 函数用于生成 Starlette ASGI 应用
def streamable_http_app(handler, path: str = "/"):
"""
创建 Streamable HTTP ASGI 应用。
Args:
handler: 消息处理函数,接收 SessionMessage,返回 JSONRPCResponse 或 JSONRPCError
path: 应用内路径,Mount 时前缀已剥离,通常为 "/"
Returns:
Starlette ASGI 应用
"""
# 定义 POST 请求处理函数,接收一个 Request 类型参数
async def handle_post(request: Request) -> Response:
# 如果请求方法不是 POST,返回 405 错误
if request.method != "POST":
return Response("Method Not Allowed", status_code=405)
try:
# 解析请求体中的 JSON 内容
body = await request.json()
except Exception as e:
# 解析失败返回 JSON-RPC 解析错误
return Response(
json.dumps({"jsonrpc": "2.0", "error": {"code": -32700, "message": str(e)}}),
status_code=400,
media_type=CONTENT_TYPE_JSON,
)
try:
# 校验并适配 JSON-RPC 消息格式
msg = jsonrpc_message_adapter.validate_json(json.dumps(body), by_name=False)
except Exception as e:
# 格式校验失败返回 JSON-RPC 解析错误
return Response(
json.dumps({"jsonrpc": "2.0", "error": {"code": -32700, "message": str(e)}}),
status_code=400,
media_type=CONTENT_TYPE_JSON,
)
# 读取请求头中的 session-id,如果没有则生成新的 uuid
session_id = request.headers.get(MCP_SESSION_ID) or str(uuid.uuid4())
# 封装 SessionMessage 消息对象
session_msg = SessionMessage(message=msg)
try:
# 调用 handler 进行业务处理,返回响应对象
resp = handler(session_msg)
# 如果 handler 返回 (notifications, response) 的元组,则只取响应部分
if isinstance(resp, tuple) and len(resp) == 2:
resp = resp[1] # (notifications, response) -> 取最终响应
except Exception as e:
# 处理过程中发生异常,构造返回标准 JSONRPC 错误消息
rid = getattr(msg, "id", None)
err_body = {"jsonrpc": "2.0", "id": rid, "error": {"code": -32603, "message": str(e)}}
return Response(json.dumps(err_body), media_type=CONTENT_TYPE_JSON, headers={MCP_SESSION_ID: session_id})
# 如果没有响应内容,返回 202 表示已接受但无响应体
if resp is None:
return Response(status_code=202, headers={MCP_SESSION_ID: session_id})
# 如果响应对象有 model_dump 方法,序列化为 dict,否则直接作为输出
out = resp.model_dump(by_alias=True, exclude_unset=True) if hasattr(resp, "model_dump") else resp
# 返回最终的 JSON 响应,设置 MIME 类型和 session ID
return Response(
json.dumps(out, ensure_ascii=False),
media_type=CONTENT_TYPE_JSON,
headers={MCP_SESSION_ID: session_id},
)
# 配置路由表,只支持 POST 方法
routes = [Route(path, endpoint=handle_post, methods=["POST"])]
# 返回 Starlette 应用实例
return Starlette(routes=routes)
4. streamable_http_client.py #
streamable_http_client.py
# 导入 ClientSession 和 types 类型定义
from mcp_lite import ClientSession, types
# 导入用于创建 Streamable HTTP 客户端的函数
from mcp_lite.client.streamable_http import streamable_http_client
# 主函数定义,程序入口点
def main() -> None:
# 设置服务端的地址(MCP HTTP 服务器地址)
server_url = "http://127.0.0.1:8000/mcp"
# 使用 with 语句连接远程服务器,获取读写流
with streamable_http_client(server_url) as (read, write):
# 创建 MCP 客户端会话,绑定读写通道
session = ClientSession(read, write)
# 执行初始化握手过程
session.initialize()
# 获取所有可用的工具列表
tools = session.list_tools()
# 打印工具列表名称
print("[Tools]", [t.name for t in tools.tools])
# 调用名为 'greet' 的工具,传入参数 name = "MCP"
result = session.call_tool("greet", {"name": "MCP"})
# 定义文本内容列表,用于收集返回内容
texts = []
# 遍历返回内容中的所有区块
for block in result.content:
# 如果区块是文本类型,则提取文本内容
if isinstance(block, types.TextContent):
texts.append(block.text)
# 合并所有文本内容并打印
print("[Call greet]", " | ".join(texts))
# 程序入口判断,如果当前脚本被直接执行,则调用 main()
if __name__ == "__main__":
main()
官方代码
# 导入 asyncio 模块
import asyncio
# 导入 ClientSession 和 types 类型定义
from mcp import ClientSession, types
# 导入用于创建 Streamable HTTP 客户端的函数
from mcp.client.streamable_http import streamable_http_client
# 主函数定义,程序入口点
async def main() -> None:
# 设置服务端的地址(MCP HTTP 服务器地址)
server_url = "http://127.0.0.1:8000/mcp"
# 使用 with 语句连接远程服务器,获取读写流
# streamable_http_client 返回 (read, write, get_session_id) 三元组
async with streamable_http_client(server_url) as (read, write, _):
# 创建 MCP 客户端会话,绑定读写通道
async with ClientSession(read, write) as session:
# 执行初始化握手过程
await session.initialize()
# 获取所有可用的工具列表
tools = await session.list_tools()
# 打印工具列表名称
print("[Tools]", [t.name for t in tools.tools])
# 调用名为 'greet' 的工具,传入参数 name = "MCP"
result = await session.call_tool("greet", arguments={"name": "MCP"})
# 定义文本内容列表,用于收集返回内容
texts = []
# 遍历返回内容中的所有区块
for block in result.content:
# 如果区块是文本类型,则提取文本内容
if isinstance(block, types.TextContent):
texts.append(block.text)
# 合并所有文本内容并打印
print("[Call greet]", " | ".join(texts))
# 程序入口判断,如果当前脚本被直接执行,则调用 main()
if __name__ == "__main__":
asyncio.run(main())5. streamable_http_server.py #
streamable_http_server.py
# 导入 FastMCP 类,用于搭建 MCP 服务器
from mcp_lite.server.fastmcp import FastMCP
# 创建 FastMCP 实例,指定服务器名称为 "HTTP Server"
mcp = FastMCP(name="HTTP Server")
# 使用 MCP 的 tool 装饰器注册工具函数
@mcp.tool()
# 定义 greet 工具,接收参数 name,默认值为 "World"
def greet(name: str = "World") -> str:
# 返回一个格式化的问候字符串
return f"Hello, {name}!"
# 判断当前文件是否作为主程序运行
if __name__ == "__main__":
# 启动 MCP 服务器,采用 streamable-http 协议进行传输
mcp.run(transport="streamable-http")
官方代码
# 导入 FastMCP 类,用于搭建 MCP 服务器
from mcp.server.fastmcp import FastMCP
# 创建 FastMCP 实例,指定服务器名称为 "HTTP Server"
mcp = FastMCP(name="HTTP Server")
# 使用 MCP 的 tool 装饰器注册工具函数
@mcp.tool()
# 定义 greet 工具,接收参数 name,默认值为 "World"
def greet(name: str = "World") -> str:
# 返回一个格式化的问候字符串
return f"Hello, {name}!"
# 判断当前文件是否作为主程序运行
if __name__ == "__main__":
# 启动 MCP 服务器,采用 streamable-http 协议进行传输
mcp.run(transport="streamable-http")6. init.py #
mcp_lite/server/fastmcp/init.py
# FastMCP 包:从原 fastmcp 模块导入所有内容以保持向后兼容
# 原 fastmcp.py 的内容已移入此包,通过 __init__ 导出
# 导入系统模块
import sys
# 导入 base64 编码库
import base64
# 导入 json 序列化
import json
# 导入正则表达式模块
import re
# 导入 url 解码工具
from urllib.parse import unquote
# 导入类型相关辅助函数
from typing import get_args, get_origin, get_type_hints
# 导入 SessionMessage 类型
from mcp_lite.message import SessionMessage
# 导入 stdio 通信模块
from mcp_lite.server import stdio
# 导入函数签名和异步工具
import inspect
import asyncio
from mcp_lite.types import ( # 导入 mcp_lite.types 模块中的多个类型
JSONRPCRequest, # JSONRPC 请求类型
JSONRPCNotification, # JSONRPC 通知类型
JSONRPCError, # JSONRPC 错误响应类型
ErrorData, # 错误数据类型
InitializeResult, # 初始化结果类型
LATEST_PROTOCOL_VERSION, # 最新协议版本号
ToolsCapability, # 工具相关能力描述类型
ResourcesCapability, # 资源相关能力描述类型
PromptsCapability, # Prompt 相关能力描述类型
ServerCapabilities, # 服务器能力类型
Implementation, # 实现信息类型
JSONRPCResponse, # JSONRPC 响应类型
ListToolsResult, # 列出工具结果类型
Tool, # 单个工具类型
TextContent, # 文本内容类型
CallToolRequestParams, # 调用工具请求参数类型
CallToolResult, # 调用工具响应结果类型
Resource, # 静态资源类型
ResourceTemplate, # 资源模板类型
ListResourcesResult, # 资源列表返回结果类型
ListResourceTemplatesResult, # 资源模板列表返回结果类型
ReadResourceRequestParams, # 读取资源请求参数类型
ReadResourceResult, # 读取资源请求响应类型
TextResourceContents, # 资源文本内容类型
BlobResourceContents, # 资源二进制内容类型
Prompt, # Prompt 类型
PromptArgument, # Prompt 参数类型
ListPromptsResult, # Prompt 列表结果类型
GetPromptRequestParams, # 获取 Prompt 请求参数类型
GetPromptResult, # 获取 Prompt 响应结果类型
PromptMessage, # Prompt 消息类型
)
# 导入 pydantic 的基础模型作为类型判定
from pydantic import BaseModel as PydanticBaseModel
# 导入 Typeddict 测试
from typing_extensions import is_typeddict as _is_typeddict
# 导入本包下 prompts 子模块
from . import prompts
# 辅助函数:提取函数输出 schema 及包裹需求
def _output_schema_and_wrap(fn, structured_output: bool):
# 若不是结构化输出,直接返回
if not structured_output:
return None, False
try:
# 获取函数签名
sig = inspect.signature(fn)
# 获取返回类型注解
ann = sig.return_annotation
# 如果没有注解,返回
if ann is inspect.Parameter.empty:
return None, False
except Exception:
return None, False
# 生成 schema
return _schema_from_annotation(ann, fn.__name__)
# 辅助函数:根据注解和函数名生成 schema
def _schema_from_annotation(ann, func_name: str):
# 没有注解直接返回
if ann is inspect.Parameter.empty:
return None, False
# 获取注解的原类型
origin = get_origin(ann)
wrap = False
schema = None
# 如果是 Pydantic 的模型子类
if PydanticBaseModel and isinstance(ann, type) and issubclass(ann, PydanticBaseModel):
schema = ann.model_json_schema()
return schema, False
# 如果是 Typeddict
if hasattr(ann, "__annotations__") and not (origin is dict or origin is list):
if _is_typeddict(ann):
hints = get_type_hints(ann) if hasattr(ann, "__annotations__") else {}
props = {}
for k, v in hints.items():
t = v
if t is int or t is type(None) and int in get_args(v):
props[k] = {"type": "integer"}
elif t is float:
props[k] = {"type": "number"}
elif t is str:
props[k] = {"type": "string"}
elif t is bool:
props[k] = {"type": "boolean"}
else:
props[k] = {"type": "string"}
schema = {"type": "object", "properties": props, "required": list(props)}
return schema, False
try:
hints = get_type_hints(ann)
if hints:
props = {}
for k, v in hints.items():
if v is int or v is type(None):
props[k] = {"type": "integer"}
elif v is float:
props[k] = {"type": "number"}
elif v is str:
props[k] = {"type": "string"}
elif v is bool:
props[k] = {"type": "boolean"}
elif get_origin(v) is list:
props[k] = {"type": "array", "items": {"type": "string"}}
else:
props[k] = {"type": "string"}
schema = {"type": "object", "properties": props}
return schema, False
except Exception:
pass
# 如果是 dict 类型
if origin is dict:
args = get_args(ann)
if len(args) == 2 and args[0] is str:
vt = args[1]
# 判断 value 类型
if vt is float:
schema = {"type": "object", "additionalProperties": {"type": "number"}}
elif vt is int:
schema = {"type": "object", "additionalProperties": {"type": "integer"}}
elif vt is str:
schema = {"type": "object", "additionalProperties": {"type": "string"}}
else:
schema = {"type": "object", "additionalProperties": {}}
return schema, False
wrap = True
# 如果是 list、基础类型等需要包裹
if origin is list or ann in (int, float, str, bool, type(None)):
wrap = True
# 包裹输出情况
if wrap:
result_schema = {"type": "string"}
if ann is int:
result_schema = {"type": "integer"}
elif ann is float:
result_schema = {"type": "number"}
elif ann is bool:
result_schema = {"type": "boolean"}
elif origin is list:
result_schema = {"type": "array", "items": {"type": "string"}}
schema = {"type": "object", "properties": {"result": result_schema}, "required": ["result"]}
return schema, True
return None, False
# 辅助:对象输出转换为结构化内容
def _to_structured(out, output_schema, wrap_output):
if output_schema is None:
return None
try:
# 如果是 Pydantic 模型
if PydanticBaseModel and isinstance(out, PydanticBaseModel):
return out.model_dump(mode="json")
# 如果需要包裹
if wrap_output:
return {"result": out}
# 如果是字典
if isinstance(out, dict):
return dict(out)
# 带有 __annotations__ 的对象,提取属性
if hasattr(out, "__annotations__"):
hints = get_type_hints(type(out)) if hasattr(type(out), "__annotations__") else getattr(out, "__annotations__", {})
return {k: getattr(out, k) for k in hints if hasattr(out, k)}
# 其它直接包裹
return {"result": out}
except Exception:
# 发生异常返回字符串
return {"result": str(out)}
# 根据函数参数生成 schema
def _schema(fn):
sig = inspect.signature(fn)
props = {}
req = []
for n, p in sig.parameters.items():
# 跳过以下划线开头的参数
if n.startswith("_"):
continue
# 跳过 Context 类型参数(由框架注入)
if _find_context_parameter(fn) == n:
continue
# 参数类型为字符串,标题为参数名
props[n] = {"type": "string", "title": n}
# 必填参数加入 required
if p.default is inspect.Parameter.empty:
req.append(n)
return {"type": "object", "properties": props, "required": req}
# 检测工具函数是否包含 Context 参数
def _find_context_parameter(fn):
"""若函数有 ctx: Context 参数,返回参数名;否则返回 None。"""
try:
sig = inspect.signature(fn)
hints = get_type_hints(fn) if hasattr(fn, "__annotations__") else {}
for name, param in sig.parameters.items():
ann = hints.get(name, param.annotation)
if ann is not inspect.Parameter.empty:
n = getattr(ann, "__name__", None) or str(ann)
if n == "Context" or (isinstance(n, str) and n.endswith(".Context")):
return name
except Exception:
pass
return None
# 根据 prompt 函数生成 prompt schema
def _prompt_schema(fn):
"""根据函数签名生成 Prompt 参数 schema。"""
sig = inspect.signature(fn)
props = {}
req = []
for n, p in sig.parameters.items():
if n.startswith("_"):
continue
props[n] = {"type": "string", "title": n}
if p.default is inspect.Parameter.empty:
req.append(n)
return {"type": "object", "properties": props, "required": req}
# Context 类:工具执行时的上下文,支持 read_resource、report_progress、debug、info
class Context:
"""工具执行上下文,提供资源读取、进度报告、日志发送等能力。"""
def __init__(self, mcp_server, request_id, send_notification=None, progress_token=None):
self._mcp = mcp_server
self.request_id = request_id
self._send = send_notification or (lambda _: None)
self._progress_token = progress_token
async def read_resource(self, uri: str):
"""异步读取资源,返回内容块列表(与 ReadResourceResult.contents 结构一致)。"""
uri_str = str(uri)
# 静态资源
if res := self._mcp._resources.get(uri_str):
out = res.run()
from mcp_lite.types import TextResourceContents
return [TextResourceContents(uri=uri_str, text=str(out), mime_type=res.mime_type)]
# 模板资源
for template in self._mcp._resource_templates.values():
if args := template.matches(uri_str):
out = template.run(args)
from mcp_lite.types import TextResourceContents, BlobResourceContents
if isinstance(out, bytes):
return [BlobResourceContents(uri=uri_str, blob=base64.b64encode(out).decode(), mime_type=template.mime_type or "application/octet-stream")]
return [TextResourceContents(uri=uri_str, text=str(out), mime_type=template.mime_type or "text/plain")]
return []
async def report_progress(self, progress: float, total: float | None = None, message: str | None = None):
"""发送进度通知(仅当客户端提供了 progressToken 时有效)。"""
if self._progress_token is not None:
self._send({"method": "notifications/progress", "params": {"progressToken": self._progress_token, "progress": progress, "total": total, "message": message}})
async def debug(self, data):
"""发送 debug 级别日志。"""
self._send({"method": "notifications/message", "params": {"level": "debug", "data": data}})
async def info(self, data):
"""发送 info 级别日志。"""
self._send({"method": "notifications/message", "params": {"level": "info", "data": data}})
# 工具函数封装类
class _Tool:
def __init__(self, fn, name=None, desc=None, structured_output=True):
# 函数本体
self.fn = fn
# 名称
self.name = name or fn.__name__
# 描述
self.desc = desc or (fn.__doc__ or "").strip()
# 入参 schema
self.schema = _schema(fn)
# 是否是异步函数
self.async_fn = asyncio.iscoroutinefunction(fn)
# 是否结构化输出
self.structured_output = structured_output
# 输出 schema 及需否包裹
self.output_schema, self.wrap_output = _output_schema_and_wrap(fn, structured_output) if structured_output else (None, False)
# Context 参数名(若有)
self._ctx_param = _find_context_parameter(fn)
# 转换为 Tool 对象
def to_tool(self):
return Tool(name=self.name, description=self.desc or None, input_schema=self.schema, output_schema=self.output_schema)
# 执行工具(自动支持异步),支持注入 Context
def run(self, args, mcp_server=None, request_id=None, progress_token=None, send_notification=None):
if self._ctx_param is not None and mcp_server is not None:
ctx = Context(mcp_server, request_id, send_notification, progress_token)
args = dict(args) if args else {}
args[self._ctx_param] = ctx
if self.async_fn:
return asyncio.run(self.fn(**args))
return self.fn(**args)
# 静态资源封装类
class _Resource:
def __init__(self, uri, fn, mime_type="text/plain"):
# 资源 URI
self.uri = uri
# 回调函数
self.fn = fn
# MIME 类型
self.mime_type = mime_type
# 是否异步
self.async_fn = asyncio.iscoroutinefunction(fn)
# 资源运行,自动支持异步
def run(self):
if self.async_fn:
return asyncio.run(self.fn())
return self.fn()
# 资源模板封装类(支持 URI 参数模板)
class _ResourceTemplate:
def __init__(self, uri_template, fn, mime_type="text/plain"):
# URI 模板
self.uri_template = uri_template
# 生成资源内容的函数
self.fn = fn
# MIME 类型
self.mime_type = mime_type
# 是否异步
self.async_fn = asyncio.iscoroutinefunction(fn)
# 提取函数里除了 _ 开头之外的参数名
sig = inspect.signature(fn)
self.param_names = [n for n in sig.parameters if not n.startswith("_")]
# 检查 URI 是否与模板匹配,并解析参数
def matches(self, uri):
pattern = self.uri_template.replace("{", "(?P<").replace("}", ">[^/]+)")
m = re.match(f"^{pattern}$", uri)
if m:
return {k: unquote(v) for k, v in m.groupdict().items()}
return None
# 执行模板内容生成函数
def run(self, args):
if self.async_fn:
return asyncio.run(self.fn(**args))
return self.fn(**args)
# Prompt 封装
class _Prompt:
"""内部 Prompt 封装类。"""
def __init__(self, fn, name=None, title=None, description=None):
# prompt 生成函数
self.fn = fn
# prompt 名称
self.name = name or fn.__name__
# prompt 标题
self.title = title
# prompt 描述
self.description = description or (fn.__doc__ or "").strip()
# 通用 schema
self.schema = _prompt_schema(fn)
# 是否异步
self.async_fn = asyncio.iscoroutinefunction(fn)
# 从 schema 生成 PromptArgument 列表
args_list = []
if "properties" in self.schema:
required = set(self.schema.get("required", []))
for pname, pinfo in self.schema["properties"].items():
args_list.append(
PromptArgument(
name=pname,
description=pinfo.get("title"),
required=pname in required,
)
)
self.arguments = args_list
# 转成 Prompt 对象
def to_prompt(self):
return Prompt(
name=self.name,
title=self.title,
description=self.description or None,
arguments=self.arguments if self.arguments else None,
)
# prompt 执行
def run(self, args):
if self.async_fn:
return asyncio.run(self.fn(**args))
return self.fn(**args)
# FastMCP 主类
class FastMCP:
def __init__(self, name="mcp-server"):
# 服务器名称
self.name = name
# 工具注册表
self._tools = {}
# 静态资源注册表
self._resources = {}
# 资源模板注册表
self._resource_templates = {}
# prompt 注册表
self._prompts = {}
# 注册工具的装饰器
def tool(self, name=None, description=None, structured_output=True):
def deco(fn):
t = _Tool(fn, name, description, structured_output=structured_output)
self._tools[t.name] = t
return fn
return deco
# 注册资源或模板资源的装饰器
def resource(self, uri, mime_type="text/plain"):
def deco(fn):
# 判断带模板参数还是静态资源
if "{" in uri and "}" in uri:
template = _ResourceTemplate(uri, fn, mime_type)
self._resource_templates[uri] = template
else:
res = _Resource(uri, fn, mime_type)
self._resources[uri] = res
return fn
return deco
# 注册 Prompt 的装饰器
def prompt(self, name=None, title=None, description=None):
"""注册 Prompt 的装饰器。"""
def deco(fn):
p = _Prompt(fn, name=name, title=title, description=description)
self._prompts[p.name] = p
return fn
return deco
# 核心 RPC 方法分发
def _handle(self, req):
# 获取方法名、参数和请求 id
method, params, rid = req.method, req.params or {}, req.id
# 初始化请求
if method == "initialize":
caps = ServerCapabilities(
tools=ToolsCapability(),
resources=ResourcesCapability(),
prompts=PromptsCapability() if self._prompts else None,
)
r = InitializeResult(
protocol_version=LATEST_PROTOCOL_VERSION,
capabilities=caps,
server_info=Implementation(name=self.name, version="0.1.0"),
)
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 工具列表请求
if method == "tools/list":
r = ListToolsResult(tools=[t.to_tool() for t in self._tools.values()])
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 工具调用请求
if method == "tools/call":
p = CallToolRequestParams.model_validate(params, by_name=False)
t = self._tools.get(p.name)
if not t:
return JSONRPCError(
jsonrpc="2.0",
id=rid,
error=ErrorData(code=-32602, message=f"Unknown tool: {p.name}")
)
# 从 meta/_meta 提取 progressToken,用于发送进度/日志通知
meta = p.meta or {}
progress_token = meta.get("progressToken") or meta.get("progress_token")
notifications = []
def _send_notification(payload):
method_name = payload.get("method", "")
params = payload.get("params")
notifications.append(JSONRPCNotification(jsonrpc="2.0", method=method_name, params=params))
try:
out = t.run(
p.arguments or {},
mcp_server=self,
request_id=rid,
progress_token=progress_token,
send_notification=_send_notification,
)
# 如果直接返回的是 CallToolResult
if isinstance(out, CallToolResult):
r = out
else:
struct = None
# 结构化输出
if t.output_schema is not None:
struct = _to_structured(out, t.output_schema, t.wrap_output)
# 字符串直接用文本包装
if isinstance(out, str):
c = [TextContent(text=out)]
elif out is None:
c = []
elif struct is not None:
# 结构化结果按 JSON 格式化后返回
c = [TextContent(text=json.dumps(struct, ensure_ascii=False, indent=2))]
else:
# 普通对象尝试转 json,否则转字符串
try:
if PydanticBaseModel and isinstance(out, PydanticBaseModel):
text = json.dumps(out.model_dump(mode="json"), ensure_ascii=False, indent=2)
elif isinstance(out, dict):
text = json.dumps(out, ensure_ascii=False, indent=2)
else:
text = str(out)
except Exception:
text = str(out)
c = [TextContent(text=text)]
r = CallToolResult(content=c, structured_content=struct)
except Exception as e:
r = CallToolResult(content=[TextContent(text=str(e))], is_error=True)
resp = JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
if notifications:
return (notifications, resp)
return resp
# prompts/list 请求
if method == "prompts/list":
prompts_list = [p.to_prompt() for p in self._prompts.values()]
r = ListPromptsResult(prompts=prompts_list)
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# prompts/get 请求
if method == "prompts/get":
p = GetPromptRequestParams.model_validate(params, by_name=False)
prompt_obj = self._prompts.get(p.name)
if not prompt_obj:
return JSONRPCError(
jsonrpc="2.0",
id=rid,
error=ErrorData(code=-32602, message=f"Unknown prompt: {p.name}")
)
try:
args = p.arguments or {}
result = prompt_obj.run(args)
messages = []
# 单条消息包装为列表
if not isinstance(result, (list, tuple)):
result = [result]
for msg in result:
# 若为内置的 Message 对象
if isinstance(msg, prompts.base.Message):
content = msg.content
if isinstance(content, str):
content = TextContent(type="text", text=content)
messages.append(PromptMessage(role=msg.role, content=content))
# 字符串消息按 user 角色组装
elif isinstance(msg, str):
messages.append(PromptMessage(role="user", content=TextContent(type="text", text=msg)))
# dict 消息,读 role 和 content
elif isinstance(msg, dict):
role = msg.get("role", "user")
cnt = msg.get("content", "")
if isinstance(cnt, dict) and cnt.get("type") == "text":
content = TextContent(**cnt)
else:
content = TextContent(type="text", text=str(cnt) if not isinstance(cnt, str) else cnt)
messages.append(PromptMessage(role=role, content=content))
# 其它均归为 user 文本
else:
messages.append(PromptMessage(role="user", content=TextContent(type="text", text=str(msg))))
r = GetPromptResult(description=prompt_obj.description, messages=messages)
except Exception as e:
return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32603, message=str(e)))
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 资源静态列表
if method == "resources/list":
resources = [
Resource(uri=u, name=u, mime_type=r.mime_type)
for u, r in self._resources.items()
]
r = ListResourcesResult(resources=resources)
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 资源模板列表
if method == "resources/templates/list":
templates = [
ResourceTemplate(uri_template=u, name=u, mime_type=t.mime_type)
for u, t in self._resource_templates.items()
]
r = ListResourceTemplatesResult(resource_templates=templates)
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 读取资源接口
if method == "resources/read":
p = ReadResourceRequestParams.model_validate(params, by_name=False)
uri_str = str(p.uri)
try:
# 静态资源优先
if res := self._resources.get(uri_str):
out = res.run()
content = TextResourceContents(uri=uri_str, text=str(out), mime_type=res.mime_type)
r = ReadResourceResult(contents=[content])
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 匹配模板资源
for template in self._resource_templates.values():
if args := template.matches(uri_str):
out = template.run(args)
if isinstance(out, bytes):
content = BlobResourceContents(
uri=uri_str,
blob=base64.b64encode(out).decode(),
mime_type=template.mime_type or "application/octet-stream",
)
else:
content = TextResourceContents(
uri=uri_str,
text=str(out),
mime_type=template.mime_type or "text/plain",
)
r = ReadResourceResult(contents=[content])
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 找不到资源
return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32602, message=f"Unknown resource: {uri_str}"))
except Exception as e:
return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32603, message=str(e)))
# 未知方法
return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32601, message=f"Method not found: {method}"))
# 消息包装与请求
def _handle_msg(self, msg):
# 非 SessionMessage 类型忽略
if not isinstance(msg, SessionMessage):
return None
m = msg.message
# 必须是带 id 的 jsonrpc 请求
if not isinstance(m, JSONRPCRequest) or getattr(m, "id", None) is None:
return None
#print("[Server] Request:", m.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
try:
resp = self._handle(m)
#print("[Server] Response:", resp.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
return resp
except Exception as e:
err = JSONRPCError(jsonrpc="2.0", id=m.id, error=ErrorData(code=-32603, message=str(e)))
#print("[Server] Response (error):", err.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
return err
# 运行服务器主逻辑
+ def run(self, transport="stdio", host: str = "127.0.0.1", port: int = 8000):
# 如果传输方式为 stdio,则启动标准输入输出服务器
+ if transport == "stdio":
+ stdio.stdio_server(self._handle_msg)
# 如果传输方式为 streamable-http,则启动基于 HTTP 的服务器
+ elif transport == "streamable-http":
# 导入 Starlette 框架相关模块
+ from starlette.applications import Starlette
+ from starlette.routing import Mount
# 导入 streamable_http_app 工厂方法
+ from mcp_lite.server.streamable_http import streamable_http_app
# 导入 uvicorn 用于启动 ASGI 服务
+ import uvicorn
# 创建 Streamable HTTP ASGI 应用
+ app = streamable_http_app(self._handle_msg, path="/")
# 将 /mcp 路径挂载到应用
+ full_app = Starlette(routes=[Mount("/mcp", app=app)])
# 启动 uvicorn 服务,监听指定 host 和 port
+ uvicorn.run(full_app, host=host, port=port)
# 如果传输方式不被支持,则抛出异常
+ else:
raise ValueError(f"unsupported transport: {transport}")
# 明确导出接口
__all__ = ["FastMCP", "Context", "prompts"]7. 工作流程 #
7.1 功能概述 #
这部分实现的是 MCP(Model Context Protocol)的 Streamable HTTP 传输:客户端通过 HTTP POST 发送 JSON-RPC 消息,服务端返回 JSON 响应,并用 mcp-session-id 头维护会话。
7.2 时序图 #
7.2.1 整体交互流程 #
7.2.2 单次请求的 HTTP 层细节 #
7.2.3 客户端退出流程 #
7.3 各模块说明 #
7.3.1 客户端传输层 mcp_lite/client/streamable_http.py #
作用:在 HTTP 上实现 MCP 的读写通道,供 ClientSession 使用。
结构:
- 队列:
read_q存响应,write_q存待发请求 - 工作线程:从
write_q取消息 → POST 到服务端 → 把响应放入read_q - 会话 ID:
session_id用列表包装,在多次请求间共享 - 停止标志:
stop_flag用列表包装,供worker和_ctx共享,用于优雅退出
流程:
streamable_http_client(url)返回上下文管理器- 进入
with时启动worker线程 - 主线程通过
WriteStream.send()把消息放入write_q worker取出消息,POST 到服务端,把响应放入read_q- 主线程通过
ReadStream.get()从read_q取响应 - 退出
with时:stop_flag[0]=True、write_q.put(None),worker退出
7.3.2 服务端传输层 mcp_lite/server/streamable_http.py #
作用:提供 Starlette ASGI 应用,处理 MCP 的 HTTP 请求。
流程:
- 只处理 POST
- 解析 JSON 请求体,校验 JSON-RPC 格式
- 从请求头取
mcp-session-id,没有则生成 UUID - 封装为
SessionMessage,调用handler(session_msg) - 根据 handler 返回:
None→ 202,无响应体- 有内容 → 200,JSON 响应
- 响应头中带上
mcp-session-id
7.3.3 FastMCP 扩展 mcp_lite/server/fastmcp/__init__.py #
改动:run() 支持 transport="streamable-http"。
def run(self, transport="stdio", host="127.0.0.1", port=8000):
if transport == "stdio":
stdio.stdio_server(self._handle_msg)
elif transport == "streamable-http":
# 创建 streamable_http_app,挂载到 /mcp,用 uvicorn 启动
app = streamable_http_app(self._handle_msg, path="/")
full_app = Starlette(routes=[Mount("/mcp", app=app)])
uvicorn.run(full_app, host=host, port=port)
else:
raise ValueError(...)7.3.4 示例客户端 streamable_http_client.py #
作用:演示如何用 Streamable HTTP 连接 MCP 服务。
流程:streamable_http_client(url) → ClientSession(read, write) → initialize() → list_tools() → call_tool("greet", {...})。
7.3.5 示例服务端 streamable_http_server.py #
作用:演示 FastMCP 以 streamable-http 方式启动。
mcp = FastMCP(name="HTTP Server")
@mcp.tool()
def greet(name: str = "World") -> str:
return f"Hello, {name}!"
if __name__ == "__main__":
mcp.run(transport="streamable-http") # 监听 8000,路径 /mcp7.4 要点总结 #
| 项目 | 说明 |
|---|---|
| 请求模型 | 每条 MCP 消息对应一次 HTTP POST,无长连接 |
| 会话 | 通过 mcp-session-id 在多次请求间保持会话 |
| 客户端线程 | Worker 负责 HTTP 收发,主线程通过队列收发消息 |
| 202 响应 | 通知类请求(无 id)返回 202,无响应体 |
| 共享状态 | session_id、stop_flag 用列表包装,在闭包间共享可变状态 |