  • 1. StreamableHTTP
  • 2. streamable_http.py
  • 3. streamable_http.py
  • 4. streamable_http_client.py
  • 5. streamable_http_server.py
  • 6. __init__.py
  • 7. Workflow
    • 7.1 Feature overview
    • 7.2 Sequence diagrams
      • 7.2.1 Overall interaction flow
      • 7.2.2 HTTP-level details of a single request
      • 7.2.3 Client shutdown flow
    • 7.3 Module descriptions
      • 7.3.1 Client transport layer mcp_lite/client/streamable_http.py
      • 7.3.2 Server transport layer mcp_lite/server/streamable_http.py
      • 7.3.3 FastMCP extension mcp_lite/server/fastmcp/__init__.py
      • 7.3.4 Example client streamable_http_client.py
      • 7.3.5 Example server streamable_http_server.py
    • 7.4 Key takeaways

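1. StreamableHTTP

Install the dependencies first:

uv add httpx "uvicorn[standard]" starlette

This section adds a Streamable HTTP transport to the FastMCP project: a client (mcp_lite/client/streamable_http.py), a server (mcp_lite/server/streamable_http.py), and a unified API adapter. Messages travel as JSON-RPC over standard HTTP POST, which makes the server easy to reach from web clients and from clients written in other languages. In this simplified implementation each POST returns a single JSON response, or 202 with no body for a notification. The main features are:

  1. Client: streamable_http_client is a context manager that starts a background thread and exposes synchronous get/send interfaces backed by thread-safe queues. The thread uses the httpx library to exchange messages with the server.
  2. Server: streamable_http_app builds an HTTP API on Starlette, validates incoming JSON-RPC messages, and reads or generates the mcp-session-id header on every request so that multiple clients can connect.
  3. Integration: passing transport="streamable-http" to the run() method of a FastMCP instance starts a message service that speaks standard HTTP POST and can be used by web or cross-language clients.

The example server from section 5 can then be inspected with the MCP Inspector: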
npx @modelcontextprotocol/inspector uv --directory D:/aprepare/mcp-starter run streamable_http_server.py
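
To make the wire format concrete, here is a minimal sketch of what one message looks like before it is POSTed. It uses only the standard library. The helper name build_post and the sample session value are assumptions made for illustration; only the header name mcp-session-id and the two fixed headers come from the client code in section 2.

```python
import json
import uuid

# Header name used by both the client and the server in this tutorial.
MCP_SESSION_ID = "mcp-session-id"


def build_post(method, params=None, request_id=None, session_id=None):
    """Hypothetical helper: return (headers, body_text) for one JSON-RPC message."""
    body = {"jsonrpc": "2.0", "method": method}
    if request_id is not None:
        # Requests carry an id; notifications omit it and expect no reply body.
        body["id"] = request_id
    if params is not None:
        body["params"] = params
    headers = {
        "accept": "application/json, text/event-stream",
        "content-type": "application/json",
    }
    if session_id:
        # Only sent once the server has assigned a session id.
        headers[MCP_SESSION_ID] = session_id
    return headers, json.dumps(body)


# First request: no session id is known yet.
headers, body = build_post("tools/list", request_id=1)

# Stand-in for the id a server would return in its response header.
assigned = str(uuid.uuid4())

# Later messages echo the assigned id back.
headers2, body2 = build_post("tools/list", request_id=2, session_id=assigned)

# A notification has no id, so the server answers 202 with an empty body.
headers3, body3 = build_post("notifications/initialized", session_id=assigned)
```

The first request goes out without a session header, and every later message carries the value the server handed back.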

2. streamable_http.py

mcp_lite/client/streamable_http.py

# mcp_lite Streamable HTTP client transport
# Import the json serialization module
import json
# Import the threading module
import threading
# Import the contextmanager decorator
from contextlib import contextmanager
# Import the thread-safe queue
from queue import Queue

# Import the httpx HTTP client library
import httpx

# Import the session message type
from mcp_lite.message import SessionMessage
# Import the JSON-RPC error types and the message adapter
from mcp_lite.types import JSONRPCError, ErrorData, jsonrpc_message_adapter

# HTTP header name that carries the session ID
MCP_SESSION_ID = "mcp-session-id"


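# Streamable HTTP client context manager; yields (read_stream, write_stream)
def streamable_http_client(url: str, *, timeout: float = 30.0):
    """
    Streamable HTTP client context manager; yields (read_stream, write_stream).

    Provides synchronous get/send interfaces for mcp_lite.ClientSession.
    Internally sends each message with HTTP POST and receives a JSON response.
    """

    # Queue holding messages received from the server
    read_q: Queue = Queue()
    # Queue holding messages waiting to be sent
    write_q: Queue = Queue()
    # session_id is kept in a one-element list so the closure can share and modify it
    session_id: list[str | None] = [None]
    # Stop flag for the worker thread, wrapped in a list for the same reason:
    # a mutable container lets nested scopes share a value that can be changed
    stop_flag: list[bool] = [False]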

    # Worker thread that performs the HTTP communication
    def worker():
        # Create an httpx client with the given timeout and redirects enabled
        with httpx.Client(timeout=timeout, follow_redirects=True) as client:
            # Prepare the base request headers
            headers = {
                "accept": "application/json, text/event-stream",
                "content-type": "application/json",
            }
            # Loop until the stop flag is set
            while not stop_flag[0]:
                try:
                    # Take the next message from the write queue
                    msg = write_q.get()
                    # A None message is the signal to exit the loop
                    if msg is None:
                        break
                    # Skip anything that is not a SessionMessage
                    if not isinstance(msg, SessionMessage):
                        continue
                    # Serialize the message into a JSON-compatible dict
                    body = msg.message.model_dump(by_alias=True, mode="json", exclude_unset=True)
                    # Attach the session ID header once one is known
                    if session_id[0]:
                        headers[MCP_SESSION_ID] = session_id[0]
                    # POST the message to the remote server
                    resp = client.post(url, json=body, headers=headers)
                    # Remember the session ID from the response headers, if present
                    sid = resp.headers.get(MCP_SESSION_ID)
                    if sid:
                        session_id[0] = sid
                    # 202: a notification was accepted; there is no response body to process
                    if resp.status_code == 202:
                        continue
                    # Status 400 or above is an HTTP error; build an error response
                    if resp.status_code >= 400:
                        err = {
                            "jsonrpc": "2.0",
                            "id": body.get("id"),
                            "error": {
                                "code": -32603,
                                "message": f"HTTP {resp.status_code}"
                            }
                        }
                        # Validate, wrap in a SessionMessage, and put it on the read queue
                        read_q.put(SessionMessage(
                            message=jsonrpc_message_adapter.validate_json(json.dumps(err), by_name=False)
                        ))
                        continue
                    # Status 200 with a response body
                    if resp.status_code == 200 and resp.content:
                        try:
                            # Try to parse the body as a JSON-RPC message
                            parsed = jsonrpc_message_adapter.validate_json(resp.text, by_name=False)
                            read_q.put(SessionMessage(message=parsed))
                        except Exception as e:
                            # Parsing failed: build a parse error and put it on the read queue
                            err = {
                                "jsonrpc": "2.0",
                                "id": body.get("id"),
                                "error": {
                                    "code": -32700,
                                    "message": str(e)
                                }
                            }
                            read_q.put(SessionMessage(
                                message=jsonrpc_message_adapter.validate_json(json.dumps(err), by_name=False)
                            ))
                except Exception as e:
                    # Any other exception becomes a JSON-RPC error on the read queue
                    err = JSONRPCError(
                        jsonrpc="2.0",
                        id=None,
                        error=ErrorData(code=-32603, message=str(e))
                    )
                    read_q.put(SessionMessage(message=err))
            # Tell the reader that the stream has ended
            read_q.put(None)

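    # Read stream: takes messages from the read queue
    class ReadStream:
        def get(self):
            # Block until a response message is available
            return read_q.get()

    # Write stream: puts messages on the write queue
    class WriteStream:
        def send(self, msg):
            # Enqueue a message to be sent
            write_q.put(msg)

    # Context manager that starts and joins the thread and owns the channels
    @contextmanager
    def _ctx():
        # Create and start the background thread
        t = threading.Thread(target=worker, daemon=True)
        t.start()
        try:
            # Expose the ReadStream and WriteStream instances
            yield ReadStream(), WriteStream()
        finally:
            # Ask the thread to exit cleanly, then wait for it
            stop_flag[0] = True
            write_q.put(None)
            t.join(timeout=5)

    # Return the context manager instance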
    return _ctx()
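
The queue-and-thread bridge used above can be hard to see among the HTTP details, so here is a stripped-down sketch of the same pattern. It is an illustration under stated assumptions, not part of mcp_lite: the name echo_transport is hypothetical, and the HTTP round trip is replaced by an echo so that the example runs without a server.

```python
import threading
from contextlib import contextmanager
from queue import Queue


@contextmanager
def echo_transport():
    """Hypothetical transport: yield (read_q, write_q) served by a worker thread."""
    read_q: Queue = Queue()
    write_q: Queue = Queue()

    def worker():
        while True:
            msg = write_q.get()      # blocks until the caller sends something
            if msg is None:          # sentinel value: time to shut down
                break
            # Stand-in for the HTTP POST performed by the real worker.
            read_q.put({"echo": msg})
        read_q.put(None)             # tell any reader that the stream is closed

    t = threading.Thread(target=worker, daemon=True)
    t.start()
    try:
        yield read_q, write_q
    finally:
        write_q.put(None)            # wake the blocked worker so it can exit
        t.join(timeout=5)


with echo_transport() as (read_q, write_q):
    write_q.put("ping")
    reply = read_q.get(timeout=5)
    print(reply)  # {'echo': 'ping'}
```

Putting None on the write queue in the finally block matters because the worker is usually blocked in write_q.get(), so a flag alone would not wake it.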

3. streamable_http.py

mcp_lite/server/streamable_http.py

# mcp_lite Streamable HTTP server
# Import json to serialize and deserialize JSON data
import json
# Import uuid to generate unique session IDs
import uuid

# Import Starlette framework components
from starlette.applications import Starlette
# run_in_threadpool runs a blocking callable in a worker thread
from starlette.concurrency import run_in_threadpool
from starlette.requests import Request
from starlette.responses import Response
from starlette.routing import Route

# Import SessionMessage, which wraps a session message
from mcp_lite.message import SessionMessage
# Import jsonrpc_message_adapter to validate and convert JSON-RPC messages
from mcp_lite.types import jsonrpc_message_adapter

# HTTP header name that carries the session ID
MCP_SESSION_ID = "mcp-session-id"
# Content type of the responses: application/json
CONTENT_TYPE_JSON = "application/json"

# streamable_http_app builds the Starlette ASGI application
def streamable_http_app(handler, path: str = "/"):
    """
    Create the Streamable HTTP ASGI application.

    Args:
        handler: message handler; takes a SessionMessage and returns a JSONRPCResponse or JSONRPCError
        path: path inside the app; the prefix is already stripped when mounted, so this is usually "/"

    Returns:
        A Starlette ASGI application
    """

    # POST request handler; takes a Request
    async def handle_post(request: Request) -> Response:
        # Reject any method other than POST with 405
        if request.method != "POST":
            return Response("Method Not Allowed", status_code=405)
        try:
            # Parse the JSON request body
            body = await request.json()
        except Exception as e:
            # Body is not valid JSON: return a JSON-RPC parse error (-32700)
            return Response(
                json.dumps({"jsonrpc": "2.0", "id": None, "error": {"code": -32700, "message": str(e)}}),
                status_code=400,
                media_type=CONTENT_TYPE_JSON,
            )
        try:
            # Validate and adapt the JSON-RPC message
            msg = jsonrpc_message_adapter.validate_json(json.dumps(body), by_name=False)
        except Exception as e:
            # Valid JSON but not a valid JSON-RPC message: return Invalid Request (-32600)
            return Response(
                json.dumps({"jsonrpc": "2.0", "id": None, "error": {"code": -32600, "message": str(e)}}),
                status_code=400,
                media_type=CONTENT_TYPE_JSON,
            )
        # Read the session ID from the request headers, or generate a new uuid
        session_id = request.headers.get(MCP_SESSION_ID) or str(uuid.uuid4())
        # Wrap the message in a SessionMessage
        session_msg = SessionMessage(message=msg)
        try:
            # Run the synchronous handler in a worker thread so it does not block the event loop
            resp = await run_in_threadpool(handler, session_msg)
            # If the handler returned a (notifications, response) tuple, keep only the response
            if isinstance(resp, tuple) and len(resp) == 2:
                resp = resp[1]  # (notifications, response) -> final response
        except Exception as e:
            # The handler raised: return a standard JSON-RPC internal error
            rid = getattr(msg, "id", None)
            err_body = {"jsonrpc": "2.0", "id": rid, "error": {"code": -32603, "message": str(e)}}
            return Response(json.dumps(err_body), media_type=CONTENT_TYPE_JSON, headers={MCP_SESSION_ID: session_id})
        # No response content: return 202 (accepted, no body)
        if resp is None:
            return Response(status_code=202, headers={MCP_SESSION_ID: session_id})
        # Serialize pydantic models to a JSON-compatible dict; pass anything else through unchanged
        out = resp.model_dump(by_alias=True, mode="json", exclude_unset=True) if hasattr(resp, "model_dump") else resp
        # Return the final JSON response with its MIME type and session ID
        return Response(
            json.dumps(out, ensure_ascii=False),
            media_type=CONTENT_TYPE_JSON,
            headers={MCP_SESSION_ID: session_id},
        )

    # Route table: POST only
    routes = [Route(path, endpoint=handle_post, methods=["POST"])]
    # Return the Starlette application instance
    return Starlette(routes=routes)
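
The last few lines of handle_post decide between a 202 and a 200 response and always attach a session ID. As a rough sketch of that decision in isolation, the snippet below uses only the standard library. The function name to_http and its tuple return value are assumptions for illustration; the real handler returns Starlette Response objects.

```python
import json
import uuid

# Header name shared with the client.
MCP_SESSION_ID = "mcp-session-id"


def to_http(result, incoming_session_id=None):
    """Hypothetical helper: map a handler result to (status, headers, body_text)."""
    # Reuse the caller's session id, or mint one for a first-time caller.
    session_id = incoming_session_id or str(uuid.uuid4())
    headers = {MCP_SESSION_ID: session_id}
    if result is None:
        # Notifications produce no JSON-RPC reply: 202 Accepted, empty body.
        return 202, headers, ""
    headers["content-type"] = "application/json"
    # ensure_ascii=False keeps non-ASCII text readable in the response body.
    return 200, headers, json.dumps(result, ensure_ascii=False)


# A notification from a new client: 202 plus a freshly generated session id.
status, headers, body = to_http(None)

# A request from a known client: 200 and the same session id echoed back.
status2, headers2, body2 = to_http({"jsonrpc": "2.0", "id": 1, "result": {}}, "abc")
```

Because the ID is generated whenever the header is missing, nothing here tracks sessions on the server side; the header is simply echoed or created per request, which matches what the listing above does.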

4. streamable_http_client.py

streamable_http_client.py

# Import ClientSession and the type definitions
from mcp_lite import ClientSession, types
# Import the function that creates the Streamable HTTP client
from mcp_lite.client.streamable_http import streamable_http_client


# Main function: program entry point
def main() -> None:
    # Address of the MCP HTTP server
    server_url = "http://127.0.0.1:8000/mcp"
    # Connect to the remote server with a with statement and get the read/write streams
    with streamable_http_client(server_url) as (read, write):
        # Create the MCP client session bound to the read/write channels
        session = ClientSession(read, write)
        # Perform the initialization handshake
        session.initialize()

        # Fetch the list of available tools
        tools = session.list_tools()
        # Print the tool names
        print("[Tools]", [t.name for t in tools.tools])

        # Call the tool named 'greet' with name = "MCP"
        result = session.call_tool("greet", {"name": "MCP"})
        # List that collects the returned text
        texts = []
        # Walk through every content block in the result
        for block in result.content:
            # Collect the text of text-type blocks
            if isinstance(block, types.TextContent):
                texts.append(block.text)
        # Join all text content and print it
        print("[Call greet]", " | ".join(texts))


# Run main() only when this script is executed directly
if __name__ == "__main__":
    main()

Equivalent code using the official SDK

# Import the asyncio module
import asyncio
# Import ClientSession and the type definitions
from mcp import ClientSession, types
# Import the function that creates the Streamable HTTP client
from mcp.client.streamable_http import streamable_http_client


# Main function: program entry point
async def main() -> None:
    # Address of the MCP HTTP server
    server_url = "http://127.0.0.1:8000/mcp"
    # Connect to the remote server with async with and get the read/write streams
    # streamable_http_client yields a (read, write, get_session_id) triple
    async with streamable_http_client(server_url) as (read, write, _):
        # Create the MCP client session bound to the read/write channels
        async with ClientSession(read, write) as session:
            # Perform the initialization handshake
            await session.initialize()

            # Fetch the list of available tools
            tools = await session.list_tools()
            # Print the tool names
            print("[Tools]", [t.name for t in tools.tools])

            # Call the tool named 'greet' with name = "MCP"
            result = await session.call_tool("greet", arguments={"name": "MCP"})
            # List that collects the returned text
            texts = []
            # Walk through every content block in the result
            for block in result.content:
                # Collect the text of text-type blocks
                if isinstance(block, types.TextContent):
                    texts.append(block.text)
            # Join all text content and print it
            print("[Call greet]", " | ".join(texts))


# Run main() only when this script is executed directly
if __name__ == "__main__":
    asyncio.run(main())

5. streamable_http_server.py

streamable_http_server.py

# Import the FastMCP class used to build an MCP server
from mcp_lite.server.fastmcp import FastMCP

# Create a FastMCP instance named "HTTP Server"
mcp = FastMCP(name="HTTP Server")

# Register the greet tool with the tool decorator;
# it takes a name parameter that defaults to "World"
@mcp.tool()
def greet(name: str = "World") -> str:
    # Return a formatted greeting
    return f"Hello, {name}!"

# Run only when this file is executed as the main program
if __name__ == "__main__":
    # Start the MCP server using the streamable-http transport
    mcp.run(transport="streamable-http")

Equivalent code using the official SDK

# Import the FastMCP class used to build an MCP server
from mcp.server.fastmcp import FastMCP

# Create a FastMCP instance named "HTTP Server"
mcp = FastMCP(name="HTTP Server")

# Register the greet tool with the tool decorator;
# it takes a name parameter that defaults to "World"
@mcp.tool()
def greet(name: str = "World") -> str:
    # Return a formatted greeting
    return f"Hello, {name}!"

# Run only when this file is executed as the main program
if __name__ == "__main__":
    # Start the MCP server using the streamable-http transport
    mcp.run(transport="streamable-http")
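
The @mcp.tool() decorator in both listings registers the function under its name and returns it unchanged. The sketch below illustrates that registration idea with the standard library only. The TinyRegistry class and its call method are hypothetical stand-ins, not the mcp_lite or official FastMCP API.

```python
import inspect


class TinyRegistry:
    """Hypothetical stand-in for a tool registry; not the mcp_lite class."""

    def __init__(self):
        self.tools = {}

    def tool(self, name=None):
        def deco(fn):
            sig = inspect.signature(fn)
            # Parameters without a default value are treated as required.
            required = [
                n for n, p in sig.parameters.items()
                if p.default is inspect.Parameter.empty
            ]
            self.tools[name or fn.__name__] = {"fn": fn, "required": required}
            return fn  # the original function stays directly callable
        return deco

    def call(self, name, arguments=None):
        # Look the tool up by name and pass the arguments as keywords.
        return self.tools[name]["fn"](**(arguments or {}))


registry = TinyRegistry()


@registry.tool()
def greet(name: str = "World") -> str:
    return f"Hello, {name}!"


print(registry.call("greet", {"name": "MCP"}))  # Hello, MCP!
print(registry.call("greet"))                   # Hello, World!
```

Returning fn from the decorator, as the FastMCP.tool method in section 6 also does, means the decorated function can still be imported and unit-tested like any other function.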

6. __init__.py

mcp_lite/server/fastmcp/__init__.py

# FastMCP package: re-exports everything from the former fastmcp module for backward compatibility
# The contents of the original fastmcp.py now live in this package and are exported via __init__

# Import the sys module
import sys
# Import the base64 encoding library
import base64
# Import json for serialization
import json
# Import the regular expression module
import re
# Import the URL decoding helper
from urllib.parse import unquote
# Import typing introspection helpers
from typing import get_args, get_origin, get_type_hints
# Import the SessionMessage type
from mcp_lite.message import SessionMessage
# Import the stdio transport module
from mcp_lite.server import stdio
# Import signature inspection and async utilities
import inspect
import asyncio
from mcp_lite.types import (                                       # Import several types from mcp_lite.types
    JSONRPCRequest,                                                # JSON-RPC request type
    JSONRPCNotification,                                           # JSON-RPC notification type
    JSONRPCError,                                                  # JSON-RPC error response type
    ErrorData,                                                     # Error data type
    InitializeResult,                                              # Initialization result type
    LATEST_PROTOCOL_VERSION,                                       # Latest protocol version
    ToolsCapability,                                               # Tools capability descriptor
    ResourcesCapability,                                           # Resources capability descriptor
    PromptsCapability,                                             # Prompts capability descriptor
    ServerCapabilities,                                            # Server capabilities type
    Implementation,                                                # Implementation info type
    JSONRPCResponse,                                               # JSON-RPC response type
    ListToolsResult,                                               # Tool list result type
    Tool,                                                          # Single tool type
    TextContent,                                                   # Text content type
    CallToolRequestParams,                                         # Tool call request params type
    CallToolResult,                                                # Tool call result type
    Resource,                                                      # Static resource type
    ResourceTemplate,                                              # Resource template type
    ListResourcesResult,                                           # Resource list result type
    ListResourceTemplatesResult,                                   # Resource template list result type
    ReadResourceRequestParams,                                     # Read-resource request params type
    ReadResourceResult,                                            # Read-resource result type
    TextResourceContents,                                          # Text resource contents type
    BlobResourceContents,                                          # Binary resource contents type
    Prompt,                                                        # Prompt type
    PromptArgument,                                                # Prompt argument type
    ListPromptsResult,                                             # Prompt list result type
    GetPromptRequestParams,                                        # Get-prompt request params type
    GetPromptResult,                                               # Get-prompt result type
    PromptMessage,                                                 # Prompt message type
)
# Import pydantic's BaseModel for type checks
from pydantic import BaseModel as PydanticBaseModel
# Import the TypedDict check helper
from typing_extensions import is_typeddict as _is_typeddict

# Import the prompts submodule of this package
from . import prompts

# Helper: derive a function's output schema and whether the result needs wrapping
def _output_schema_and_wrap(fn, structured_output: bool):
    # Structured output disabled: nothing to do
    if not structured_output:
        return None, False
    try:
        # Get the function signature
        sig = inspect.signature(fn)
        # Get the return type annotation
        ann = sig.return_annotation
        # No annotation: nothing to do
        if ann is inspect.Parameter.empty:
            return None, False
    except Exception:
        return None, False
    # Build the schema
    return _schema_from_annotation(ann, fn.__name__)

# Helper: build a schema from an annotation and the function name
def _schema_from_annotation(ann, func_name: str):
    # No annotation: return immediately
    if ann is inspect.Parameter.empty:
        return None, False
    # Get the annotation's origin type
    origin = get_origin(ann)
    wrap = False
    schema = None
    # Pydantic model subclass
    if PydanticBaseModel and isinstance(ann, type) and issubclass(ann, PydanticBaseModel):
        schema = ann.model_json_schema()
        return schema, False
    # TypedDict, or another class that carries annotations
    if hasattr(ann, "__annotations__") and not (origin is dict or origin is list):
        if _is_typeddict(ann):
            hints = get_type_hints(ann) if hasattr(ann, "__annotations__") else {}
            props = {}
            for k, v in hints.items():
                t = v
                # int, or an optional int (a union that contains both int and None)
                if t is int or (int in get_args(v) and type(None) in get_args(v)):
                    props[k] = {"type": "integer"}
                elif t is float:
                    props[k] = {"type": "number"}
                elif t is str:
                    props[k] = {"type": "string"}
                elif t is bool:
                    props[k] = {"type": "boolean"}
                else:
                    props[k] = {"type": "string"}
            schema = {"type": "object", "properties": props, "required": list(props)}
            return schema, False
        try:
            hints = get_type_hints(ann)
            if hints:
                props = {}
                for k, v in hints.items():
                    if v is int or v is type(None):
                        props[k] = {"type": "integer"}
                    elif v is float:
                        props[k] = {"type": "number"}
                    elif v is str:
                        props[k] = {"type": "string"}
                    elif v is bool:
                        props[k] = {"type": "boolean"}
                    elif get_origin(v) is list:
                        props[k] = {"type": "array", "items": {"type": "string"}}
                    else:
                        props[k] = {"type": "string"}
                schema = {"type": "object", "properties": props}
                return schema, False
        except Exception:
            pass
    # dict annotation
    if origin is dict:
        args = get_args(ann)
        if len(args) == 2 and args[0] is str:
            vt = args[1]
            # Choose the schema from the value type
            if vt is float:
                schema = {"type": "object", "additionalProperties": {"type": "number"}}
            elif vt is int:
                schema = {"type": "object", "additionalProperties": {"type": "integer"}}
            elif vt is str:
                schema = {"type": "object", "additionalProperties": {"type": "string"}}
            else:
                schema = {"type": "object", "additionalProperties": {}}
            return schema, False
        wrap = True
    # Lists and primitive types need to be wrapped
    if origin is list or ann in (int, float, str, bool, type(None)):
        wrap = True
    # Wrapped output: the value goes under a "result" key
    if wrap:
        result_schema = {"type": "string"}
        if ann is int:
            result_schema = {"type": "integer"}
        elif ann is float:
            result_schema = {"type": "number"}
        elif ann is bool:
            result_schema = {"type": "boolean"}
        elif origin is list:
            result_schema = {"type": "array", "items": {"type": "string"}}
        schema = {"type": "object", "properties": {"result": result_schema}, "required": ["result"]}
        return schema, True
    return None, False

# Helper: convert a tool's return value into structured content
def _to_structured(out, output_schema, wrap_output):
    if output_schema is None:
        return None
    try:
        # Pydantic model instance
        if PydanticBaseModel and isinstance(out, PydanticBaseModel):
            return out.model_dump(mode="json")
        # Value that must be wrapped
        if wrap_output:
            return {"result": out}
        # Plain dict
        if isinstance(out, dict):
            return dict(out)
        # Object with __annotations__: extract the annotated attributes
        if hasattr(out, "__annotations__"):
            hints = get_type_hints(type(out)) if hasattr(type(out), "__annotations__") else getattr(out, "__annotations__", {})
            return {k: getattr(out, k) for k in hints if hasattr(out, k)}
        # Anything else is wrapped as-is
        return {"result": out}
    except Exception:
        # On error, fall back to the string form
        return {"result": str(out)}

# Build the input schema from the function's parameters
def _schema(fn):
    sig = inspect.signature(fn)
    props = {}
    req = []
    # Name of the Context parameter, if any (it is injected by the framework)
    ctx_param = _find_context_parameter(fn)
    for n, p in sig.parameters.items():
        # Skip parameters whose names start with an underscore
        if n.startswith("_"):
            continue
        # Skip the Context parameter
        if n == ctx_param:
            continue
        # Every parameter is described as a string, titled with its own name
        props[n] = {"type": "string", "title": n}
        # Parameters without a default are required
        if p.default is inspect.Parameter.empty:
            req.append(n)
    return {"type": "object", "properties": props, "required": req}


# Detect whether a tool function has a Context parameter
def _find_context_parameter(fn):
    """Return the name of the ctx: Context parameter if the function has one, else None."""
    try:
        sig = inspect.signature(fn)
        hints = get_type_hints(fn) if hasattr(fn, "__annotations__") else {}
        for name, param in sig.parameters.items():
            ann = hints.get(name, param.annotation)
            if ann is not inspect.Parameter.empty:
                n = getattr(ann, "__name__", None) or str(ann)
                if n == "Context" or (isinstance(n, str) and n.endswith(".Context")):
                    return name
    except Exception:
        pass
    return None

# Build the prompt schema from a prompt function
def _prompt_schema(fn):
    """Build the Prompt argument schema from the function signature."""
    sig = inspect.signature(fn)
    props = {}
    req = []
    for n, p in sig.parameters.items():
        if n.startswith("_"):
            continue
        props[n] = {"type": "string", "title": n}
        if p.default is inspect.Parameter.empty:
            req.append(n)
    return {"type": "object", "properties": props, "required": req}

# Context class: the context available while a tool runs; supports read_resource, report_progress, debug, and info
class Context:
    """Tool execution context: resource reading, progress reporting, and log messages."""

    def __init__(self, mcp_server, request_id, send_notification=None, progress_token=None):
        self._mcp = mcp_server
        self.request_id = request_id
        self._send = send_notification or (lambda _: None)
        self._progress_token = progress_token

    async def read_resource(self, uri: str):
        """Read a resource and return a list of content blocks (same shape as ReadResourceResult.contents)."""
        uri_str = str(uri)
        # Static resource
        if res := self._mcp._resources.get(uri_str):
            out = res.run()
            return [TextResourceContents(uri=uri_str, text=str(out), mime_type=res.mime_type)]
        # Template resource
        for template in self._mcp._resource_templates.values():
            if args := template.matches(uri_str):
                out = template.run(args)
                if isinstance(out, bytes):
                    return [BlobResourceContents(uri=uri_str, blob=base64.b64encode(out).decode(), mime_type=template.mime_type or "application/octet-stream")]
                return [TextResourceContents(uri=uri_str, text=str(out), mime_type=template.mime_type or "text/plain")]
        return []

    async def report_progress(self, progress: float, total: float | None = None, message: str | None = None):
        """Send a progress notification (only when the client supplied a progressToken)."""
        if self._progress_token is not None:
            self._send({"method": "notifications/progress", "params": {"progressToken": self._progress_token, "progress": progress, "total": total, "message": message}})

    async def debug(self, data):
        """Send a debug-level log message."""
        self._send({"method": "notifications/message", "params": {"level": "debug", "data": data}})

    async def info(self, data):
        """Send an info-level log message."""
        self._send({"method": "notifications/message", "params": {"level": "info", "data": data}})


# Wrapper class for a tool function
class _Tool:
    def __init__(self, fn, name=None, desc=None, structured_output=True):
        # The function itself
        self.fn = fn
        # Name
        self.name = name or fn.__name__
        # Description
        self.desc = desc or (fn.__doc__ or "").strip()
        # Input schema
        self.schema = _schema(fn)
        # Whether the function is async
        self.async_fn = asyncio.iscoroutinefunction(fn)
        # Whether structured output is enabled
        self.structured_output = structured_output
        # Output schema and whether the result needs wrapping
        self.output_schema, self.wrap_output = _output_schema_and_wrap(fn, structured_output) if structured_output else (None, False)
        # Name of the Context parameter, if any
        self._ctx_param = _find_context_parameter(fn)

    # Convert to a Tool object
    def to_tool(self):
        return Tool(name=self.name, description=self.desc or None, input_schema=self.schema, output_schema=self.output_schema)

    # Run the tool (async functions are handled automatically) and inject a Context if requested
    def run(self, args, mcp_server=None, request_id=None, progress_token=None, send_notification=None):
        if self._ctx_param is not None and mcp_server is not None:
            ctx = Context(mcp_server, request_id, send_notification, progress_token)
            args = dict(args) if args else {}
            args[self._ctx_param] = ctx
        if self.async_fn:
            return asyncio.run(self.fn(**args))
        return self.fn(**args)

# Wrapper class for a static resource
class _Resource:
    def __init__(self, uri, fn, mime_type="text/plain"):
        # Resource URI
        self.uri = uri
        # Callback function
        self.fn = fn
        # MIME type
        self.mime_type = mime_type
        # Whether the function is async
        self.async_fn = asyncio.iscoroutinefunction(fn)

    # Run the resource function (async functions are handled automatically)
    def run(self):
        if self.async_fn:
            return asyncio.run(self.fn())
        return self.fn()

# Wrapper class for a resource template (URI templates with parameters)
class _ResourceTemplate:
    def __init__(self, uri_template, fn, mime_type="text/plain"):
        # URI template
        self.uri_template = uri_template
        # Function that produces the resource content
        self.fn = fn
        # MIME type
        self.mime_type = mime_type
        # Whether the function is async
        self.async_fn = asyncio.iscoroutinefunction(fn)
        # Collect the parameter names that do not start with an underscore
        sig = inspect.signature(fn)
        self.param_names = [n for n in sig.parameters if not n.startswith("_")]

    # Check whether a URI matches the template and extract its parameters
    def matches(self, uri):
        pattern = self.uri_template.replace("{", "(?P<").replace("}", ">[^/]+)")
        m = re.match(f"^{pattern}$", uri)
        if m:
            return {k: unquote(v) for k, v in m.groupdict().items()}
        return None

    # Run the template's content function
    def run(self, args):
        if self.async_fn:
            return asyncio.run(self.fn(**args))
        return self.fn(**args)

# Prompt wrapper
class _Prompt:
    """Internal wrapper class for a prompt."""

    def __init__(self, fn, name=None, title=None, description=None):
        # Function that produces the prompt
        self.fn = fn
        # Prompt name
        self.name = name or fn.__name__
        # Prompt title
        self.title = title
        # Prompt description
        self.description = description or (fn.__doc__ or "").strip()
        # Argument schema
        self.schema = _prompt_schema(fn)
        # Whether the function is async
        self.async_fn = asyncio.iscoroutinefunction(fn)
        # Build the PromptArgument list from the schema
        args_list = []
        if "properties" in self.schema:
            required = set(self.schema.get("required", []))
            for pname, pinfo in self.schema["properties"].items():
                args_list.append(
                    PromptArgument(
                        name=pname,
                        description=pinfo.get("title"),
                        required=pname in required,
                    )
                )
        self.arguments = args_list

    # Convert to a Prompt object
    def to_prompt(self):
        return Prompt(
            name=self.name,
            title=self.title,
            description=self.description or None,
            arguments=self.arguments if self.arguments else None,
        )

    # Run the prompt function
    def run(self, args):
        if self.async_fn:
            return asyncio.run(self.fn(**args))
        return self.fn(**args)

# FastMCP main class
class FastMCP:
    def __init__(self, name="mcp-server"):
        # Server name
        self.name = name
        # Tool registry
        self._tools = {}
        # Static resource registry
        self._resources = {}
        # Resource template registry
        self._resource_templates = {}
        # Prompt registry
        self._prompts = {}

    # Decorator that registers a tool
    def tool(self, name=None, description=None, structured_output=True):
        def deco(fn):
            t = _Tool(fn, name, description, structured_output=structured_output)
            self._tools[t.name] = t
            return fn
        return deco

    # Decorator that registers a static resource or a resource template
    def resource(self, uri, mime_type="text/plain"):
        def deco(fn):
            # A URI with template parameters is a template; otherwise it is a static resource
            if "{" in uri and "}" in uri:
                template = _ResourceTemplate(uri, fn, mime_type)
                self._resource_templates[uri] = template
            else:
                res = _Resource(uri, fn, mime_type)
                self._resources[uri] = res
            return fn
        return deco

    # Decorator that registers a prompt
    def prompt(self, name=None, title=None, description=None):
        """Decorator that registers a prompt."""

        def deco(fn):
            p = _Prompt(fn, name=name, title=title, description=description)
            self._prompts[p.name] = p
            return fn
        return deco

    # 核心 RPC 方法分发
    def _handle(self, req):
        # 获取方法名、参数和请求 id
        method, params, rid = req.method, req.params or {}, req.id
        # 初始化请求
        if method == "initialize":
            caps = ServerCapabilities(
                tools=ToolsCapability(),
                resources=ResourcesCapability(),
                prompts=PromptsCapability() if self._prompts else None,
            )
            r = InitializeResult(
                protocol_version=LATEST_PROTOCOL_VERSION,
                capabilities=caps,
                server_info=Implementation(name=self.name, version="0.1.0"),
            )
            return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
        # tools/list request
        if method == "tools/list":
            r = ListToolsResult(tools=[t.to_tool() for t in self._tools.values()])
            return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
        # tools/call request
        if method == "tools/call":
            p = CallToolRequestParams.model_validate(params, by_name=False)
            t = self._tools.get(p.name)
            if not t:
                return JSONRPCError(
                    jsonrpc="2.0",
                    id=rid,
                    error=ErrorData(code=-32602, message=f"Unknown tool: {p.name}")
                )
            # Pull progressToken out of meta/_meta; it is used for progress/log notifications
            meta = p.meta or {}
            progress_token = meta.get("progressToken") or meta.get("progress_token")
            notifications = []

            def _send_notification(payload):
                method_name = payload.get("method", "")
                params = payload.get("params")
                notifications.append(JSONRPCNotification(jsonrpc="2.0", method=method_name, params=params))

            try:
                out = t.run(
                    p.arguments or {},
                    mcp_server=self,
                    request_id=rid,
                    progress_token=progress_token,
                    send_notification=_send_notification,
                )
                # The tool returned a CallToolResult directly
                if isinstance(out, CallToolResult):
                    r = out
                else:
                    struct = None
                    # Structured output
                    if t.output_schema is not None:
                        struct = _to_structured(out, t.output_schema, t.wrap_output)
                    # Wrap a plain string as text content
                    if isinstance(out, str):
                        c = [TextContent(text=out)]
                    elif out is None:
                        c = []
                    elif struct is not None:
                        # Return the structured result as formatted JSON text
                        c = [TextContent(text=json.dumps(struct, ensure_ascii=False, indent=2))]
                    else:
                        # Try to serialize other objects as JSON; fall back to str()
                        try:
                            if PydanticBaseModel and isinstance(out, PydanticBaseModel):
                                text = json.dumps(out.model_dump(mode="json"), ensure_ascii=False, indent=2)
                            elif isinstance(out, dict):
                                text = json.dumps(out, ensure_ascii=False, indent=2)
                            else:
                                text = str(out)
                        except Exception:
                            text = str(out)
                        c = [TextContent(text=text)]
                    r = CallToolResult(content=c, structured_content=struct)
            except Exception as e:
                r = CallToolResult(content=[TextContent(text=str(e))], is_error=True)
            resp = JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
            if notifications:
                return (notifications, resp)
            return resp

        # prompts/list request
        if method == "prompts/list":
            prompts_list = [p.to_prompt() for p in self._prompts.values()]
            r = ListPromptsResult(prompts=prompts_list)
            return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))

        # prompts/get request
        if method == "prompts/get":
            p = GetPromptRequestParams.model_validate(params, by_name=False)
            prompt_obj = self._prompts.get(p.name)
            if not prompt_obj:
                return JSONRPCError(
                    jsonrpc="2.0",
                    id=rid,
                    error=ErrorData(code=-32602, message=f"Unknown prompt: {p.name}")
                )
            try:
                args = p.arguments or {}
                result = prompt_obj.run(args)
                messages = []
                # Wrap a single message in a list
                if not isinstance(result, (list, tuple)):
                    result = [result]
                for msg in result:
                    # Built-in Message object
                    if isinstance(msg, prompts.base.Message):
                        content = msg.content
                        if isinstance(content, str):
                            content = TextContent(type="text", text=content)
                        messages.append(PromptMessage(role=msg.role, content=content))
                    # A plain string becomes a user-role message
                    elif isinstance(msg, str):
                        messages.append(PromptMessage(role="user", content=TextContent(type="text", text=msg)))
                    # A dict message: read its role and content
                    elif isinstance(msg, dict):
                        role = msg.get("role", "user")
                        cnt = msg.get("content", "")
                        if isinstance(cnt, dict) and cnt.get("type") == "text":
                            content = TextContent(**cnt)
                        else:
                            content = TextContent(type="text", text=str(cnt) if not isinstance(cnt, str) else cnt)
                        messages.append(PromptMessage(role=role, content=content))
                    # Anything else is treated as user text
                    else:
                        messages.append(PromptMessage(role="user", content=TextContent(type="text", text=str(msg))))
                r = GetPromptResult(description=prompt_obj.description, messages=messages)
            except Exception as e:
                return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32603, message=str(e)))
            return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))

        # List static resources
        if method == "resources/list":
            resources = [
                Resource(uri=u, name=u, mime_type=r.mime_type)
                for u, r in self._resources.items()
            ]
            r = ListResourcesResult(resources=resources)
            return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))

        # List resource templates
        if method == "resources/templates/list":
            templates = [
                ResourceTemplate(uri_template=u, name=u, mime_type=t.mime_type)
                for u, t in self._resource_templates.items()
            ]
            r = ListResourceTemplatesResult(resource_templates=templates)
            return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))

        # Read a resource
        if method == "resources/read":
            p = ReadResourceRequestParams.model_validate(params, by_name=False)
            uri_str = str(p.uri)
            try:
                # Static resources take priority
                if res := self._resources.get(uri_str):
                    out = res.run()
                    content = TextResourceContents(uri=uri_str, text=str(out), mime_type=res.mime_type)
                    r = ReadResourceResult(contents=[content])
                    return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
                # Then try each resource template
                for template in self._resource_templates.values():
                    if args := template.matches(uri_str):
                        out = template.run(args)
                        if isinstance(out, bytes):
                            content = BlobResourceContents(
                                uri=uri_str,
                                blob=base64.b64encode(out).decode(),
                                mime_type=template.mime_type or "application/octet-stream",
                            )
                        else:
                            content = TextResourceContents(
                                uri=uri_str,
                                text=str(out),
                                mime_type=template.mime_type or "text/plain",
                            )
                        r = ReadResourceResult(contents=[content])
                        return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
                # No matching resource
                return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32602, message=f"Unknown resource: {uri_str}"))
            except Exception as e:
                return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32603, message=str(e)))

        # Unknown method
        return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32601, message=f"Method not found: {method}"))

    # Unwrap a session message and dispatch the request
    def _handle_msg(self, msg):
        # Ignore anything that is not a SessionMessage
        if not isinstance(msg, SessionMessage):
            return None
        m = msg.message
        # Only JSON-RPC requests that carry an id are handled
        if not isinstance(m, JSONRPCRequest) or getattr(m, "id", None) is None:
            return None
        #print("[Server] Request:", m.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
        try:
            resp = self._handle(m)
            #print("[Server] Response:", resp.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
            return resp
        except Exception as e:
            err = JSONRPCError(jsonrpc="2.0", id=m.id, error=ErrorData(code=-32603, message=str(e)))
            #print("[Server] Response (error):", err.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
            return err

    # Run the server's main loop
    def run(self, transport="stdio", host: str = "127.0.0.1", port: int = 8000):
        # stdio transport: serve over standard input/output
        if transport == "stdio":
            stdio.stdio_server(self._handle_msg)
        # streamable-http transport: serve over HTTP
        elif transport == "streamable-http":
            # Import the Starlette pieces
            from starlette.applications import Starlette
            from starlette.routing import Mount
            # Import the streamable_http_app factory
            from mcp_lite.server.streamable_http import streamable_http_app
            # Import uvicorn to run the ASGI app
            import uvicorn
            # Create the Streamable HTTP ASGI app
            app = streamable_http_app(self._handle_msg, path="/")
            # Mount the app under /mcp
            full_app = Starlette(routes=[Mount("/mcp", app=app)])
            # Start uvicorn on the given host and port
            uvicorn.run(full_app, host=host, port=port)
        # Any other transport is unsupported
        else:
            raise ValueError(f"unsupported transport: {transport}")

# Explicit public exports
__all__ = ["FastMCP", "Context", "prompts"]
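
The resource() decorator above treats any URI containing "{" and "}" as a template, and resources/read relies on template.matches(uri) returning the extracted arguments. The body of _ResourceTemplate.matches is not shown in this section, so the following is only a minimal sketch of how such matching could work, assuming each placeholder matches one path segment. The names compile_uri_template and match_uri are hypothetical.

```python
import re
from typing import Optional


def compile_uri_template(template: str) -> "re.Pattern[str]":
    """Turn 'greeting://{name}' into a regex with one named group per placeholder.

    Hypothetical helper; the real _ResourceTemplate may work differently.
    """
    # re.split with a capturing group alternates literal text and placeholder names.
    parts = re.split(r"\{(\w+)\}", template)
    pattern = ""
    for i, part in enumerate(parts):
        if i % 2 == 0:
            pattern += re.escape(part)          # literal text
        else:
            pattern += f"(?P<{part}>[^/]+)"     # assumption: one path segment
    return re.compile(f"^{pattern}$")


def match_uri(template: str, uri: str) -> Optional[dict]:
    """Return the extracted arguments, or None if the URI does not match."""
    m = compile_uri_template(template).match(uri)
    return m.groupdict() if m else None
```

With this sketch, match_uri("greeting://{name}", "greeting://Alice") yields a dict with a single "name" key, which is the shape template.run(args) would need.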

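7. Workflow #

7.1 Overview #

This part implements the Streamable HTTP transport for MCP (Model Context Protocol): the client sends each JSON-RPC message in an HTTP POST, the server replies with a JSON response, and the mcp-session-id header keeps track of the session.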
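
As a rough illustration of what one such POST carries, the sketch below builds the headers and JSON body for a single message. The function name build_post is hypothetical and nothing is sent over the network; only the JSON-RPC 2.0 envelope and the mcp-session-id header name come from the text above.

```python
import json


def build_post(method, params=None, request_id=None, session_id=None):
    """Build (headers, body) for one JSON-RPC message sent via HTTP POST.

    Hypothetical helper for illustration; the real client may differ.
    """
    body = {"jsonrpc": "2.0", "method": method}
    if params is not None:
        body["params"] = params
    # A message without an id is a notification and expects no result.
    if request_id is not None:
        body["id"] = request_id
    headers = {"Content-Type": "application/json"}
    # The session header is only sent once the server has issued an id.
    if session_id:
        headers["mcp-session-id"] = session_id
    return headers, json.dumps(body)


headers, payload = build_post("tools/list", request_id=1, session_id="abc")
```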

7.2 Sequence diagrams #

7.2.1 Overall interaction flow #

sequenceDiagram
    autonumber
    participant User as User / main program
    participant ClientSession as ClientSession
    participant ReadStream as ReadStream
    participant WriteStream as WriteStream
    participant Worker as Worker thread
    participant HTTP as HTTP client<br/>(httpx)
    participant Server as Starlette/uvicorn
    participant Handler as streamable_http_app
    participant FastMCP as FastMCP._handle_msg
    User->>ClientSession: create session(read, write)
    User->>ClientSession: initialize()
    ClientSession->>WriteStream: send(InitializeRequest)
    ClientSession->>ReadStream: get() blocks waiting
    Note over Worker,HTTP: Worker thread loop
    Worker->>WriteStream: take from write_q.get()
    Worker->>HTTP: POST /mcp (JSON body)
    HTTP->>Server: HTTP POST
    Server->>Handler: handle_post(request)
    Handler->>Handler: parse JSON, validate, get session_id
    Handler->>FastMCP: _handle_msg(session_msg)
    FastMCP->>FastMCP: handle initialize/tools/list etc.
    FastMCP-->>Handler: return response object
    Handler->>Server: Response(JSON, headers: session-id)
    Server->>HTTP: HTTP 200 + JSON
    HTTP->>Worker: response content
    Worker->>ReadStream: read_q.put(SessionMessage)
    Worker->>Worker: continue the loop with the next message
    ReadStream->>ClientSession: return SessionMessage
    ClientSession->>User: InitializeResult
    User->>ClientSession: list_tools()
    ClientSession->>WriteStream: send(ListToolsRequest)
    ClientSession->>ReadStream: get()
    Worker->>HTTP: POST /mcp (tools/list)
    HTTP->>Server: POST
    Server->>Handler: handle_post
    Handler->>FastMCP: _handle_msg
    FastMCP-->>Handler: ListToolsResult
    Handler->>HTTP: 200 + JSON
    Worker->>ReadStream: read_q.put(response)
    ReadStream->>ClientSession: ListToolsResult
    ClientSession->>User: tools
    User->>ClientSession: call_tool("greet", {...})
    ClientSession->>WriteStream: send(CallToolRequest)
    ClientSession->>ReadStream: get()
    Worker->>HTTP: POST /mcp (tools/call)
    HTTP->>Server: POST
    Server->>Handler: handle_post
    Handler->>FastMCP: _handle_msg
    FastMCP->>FastMCP: run the greet tool
    FastMCP-->>Handler: CallToolResult
    Handler->>HTTP: 200 + JSON
    Worker->>ReadStream: read_q.put(response)
    ReadStream->>ClientSession: CallToolResult
    ClientSession->>User: result
    User->>User: exit the with block
    User->>Worker: stop_flag[0]=True, write_q.put(None)
    Worker->>Worker: sees None, leaves the loop
    Worker->>ReadStream: read_q.put(None) to signal the end

7.2.2 HTTP-level details of a single request #

sequenceDiagram
    participant Worker as Worker thread
    participant write_q as write_q queue
    participant httpx as httpx.Client
    participant Server as uvicorn/Starlette
    participant Handler as streamable_http_app
    participant FastMCP as FastMCP
    participant read_q as read_q queue
    Worker->>write_q: get() blocks
    Note over Worker: main thread calls WriteStream.send(msg)
    write_q->>Worker: SessionMessage
    Worker->>Worker: body = msg.model_dump(...)
    Worker->>Worker: add session_id to headers if set
    Worker->>httpx: post(url, json=body, headers)
    httpx->>Server: POST /mcp<br/>Content-Type: application/json<br/>[optional] mcp-session-id: xxx
    Server->>Handler: Request
    Handler->>Handler: await request.json()
    Handler->>Handler: validate_json(body)
    Handler->>Handler: session_id = headers.get(...) or uuid4()
    Handler->>FastMCP: handler(SessionMessage)
    FastMCP->>FastMCP: _handle(m) dispatches on method
    FastMCP-->>Handler: JSONRPCResponse / None
    Handler->>Handler: serialize the response
    Handler->>Server: Response(200/202, JSON, mcp-session-id)
    Server->>httpx: HTTP response
    httpx->>Worker: resp
    Worker->>Worker: save session_id to session_id[0] if present
    Worker->>Worker: parse resp, build SessionMessage
    Worker->>read_q: put(SessionMessage)
    Note over Worker: main thread can now take it via ReadStream.get()

7.2.3 Client shutdown flow #

sequenceDiagram
    participant User as Main thread
    participant _ctx as Context manager
    participant Worker as Worker thread
    participant write_q as write_q
    participant read_q as read_q
    User->>_ctx: exit the with block
    _ctx->>_ctx: finally block runs
    _ctx->>Worker: stop_flag[0] = True
    _ctx->>write_q: put(None)
    Note over Worker: worker may be blocked in write_q.get()
    write_q->>Worker: returns None
    Worker->>Worker: if msg is None: break
    Worker->>Worker: leave the while loop
    Worker->>read_q: put(None) to tell the reader it is over
    Worker->>Worker: thread ends
    _ctx->>Worker: t.join(timeout=5)
    _ctx->>User: context cleanup done

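7.3 Module notes #

7.3.1 Client transport layer mcp_lite/client/streamable_http.py #

Purpose: implements MCP's read and write channels over HTTP for ClientSession to use.

Structure:

  • Queues: read_q holds responses; write_q holds requests waiting to be sent
  • Worker thread: takes a message from write_q → POSTs it to the server → puts the response into read_q
  • Session ID: session_id is wrapped in a list so that it is shared across requests
  • Stop flag: stop_flag is wrapped in a list and shared by the worker and _ctx for a graceful shutdown

Flow:

  1. streamable_http_client(url) returns a context manager
  2. Entering the with block starts the worker thread
  3. The main thread puts messages into write_q through WriteStream.send()
  4. The worker takes a message, POSTs it to the server, and puts the response into read_q
  5. The main thread takes responses from read_q through ReadStream.get()
  6. On leaving the with block: stop_flag[0]=True and write_q.put(None), so the worker exits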
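
The queue-and-worker pattern in these steps can be sketched with the standard library alone. This is a simplified model, not the module's actual code: queue_transport and fake_post are hypothetical names, the raw queues stand in for ReadStream and WriteStream, and the HTTP call is replaced by a callable so the example runs without a server.

```python
import queue
import threading
from contextlib import contextmanager


@contextmanager
def queue_transport(post):
    """post: hypothetical callable (message, session_id) -> (reply, session_id)."""
    read_q, write_q = queue.Queue(), queue.Queue()
    session_id = [None]   # list-wrapped so the worker closure can update it
    stop_flag = [False]   # list-wrapped so the finally block can set it

    def worker():
        while not stop_flag[0]:
            msg = write_q.get()          # blocks until a message or None arrives
            if msg is None:              # sentinel: shut down
                break
            reply, sid = post(msg, session_id[0])
            if sid:
                session_id[0] = sid      # remember the server-issued session id
            if reply is not None:
                read_q.put(reply)
        read_q.put(None)                 # tell the reader there is nothing more

    t = threading.Thread(target=worker, daemon=True)
    t.start()
    try:
        yield read_q, write_q
    finally:
        stop_flag[0] = True
        write_q.put(None)                # wake the worker if it is blocked in get()
        t.join(timeout=5)


def fake_post(msg, sid):
    # Stand-in for the HTTP POST: echoes the message and issues a session id.
    return {"echo": msg, "session": sid}, sid or "s-1"


with queue_transport(fake_post) as (read_q, write_q):
    write_q.put("initialize")
    first = read_q.get(timeout=2)
    write_q.put("tools/list")
    second = read_q.get(timeout=2)
```

The first reply is produced before any session id exists; the second call sees the id stored by the worker, which is the behaviour the list-wrapped session_id is there to provide.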

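7.3.2 Server transport layer mcp_lite/server/streamable_http.py #

Purpose: provides a Starlette ASGI application that handles MCP's HTTP requests.

Flow:

  1. Only POST is handled
  2. Parse the JSON request body and validate the JSON-RPC format
  3. Read mcp-session-id from the request headers; generate a UUID if it is missing
  4. Wrap the message in a SessionMessage and call handler(session_msg)
  5. Depending on what the handler returns:
    • None → 202, no response body
    • Content → 200, JSON response
  6. Include mcp-session-id in the response headers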
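
The decision logic in these steps can be modelled as a plain function, independent of Starlette. The sketch below is an assumption-laden simplification: handle_post here is a hypothetical stand-alone function returning (status, headers, body), the handler receives the parsed dict rather than a SessionMessage, and the 405 and 400 status codes for rejected requests are my own choices, since the text does not say what the real app returns in those cases.

```python
import json
import uuid


def _error(code, message):
    # JSON-RPC error envelope with no request id available.
    return json.dumps({"jsonrpc": "2.0", "id": None,
                       "error": {"code": code, "message": message}})


def handle_post(http_method, raw_body, headers, handler):
    """Return (status, response_headers, body) for one incoming request."""
    if http_method != "POST":
        return 405, {}, None                      # assumption: reject other verbs
    try:
        body = json.loads(raw_body)
    except json.JSONDecodeError:
        return 400, {}, _error(-32700, "Parse error")
    if not isinstance(body, dict) or body.get("jsonrpc") != "2.0" or "method" not in body:
        return 400, {}, _error(-32600, "Invalid Request")
    # Reuse the caller's session id, or issue a fresh one.
    # Note: a plain dict is case-sensitive, unlike real HTTP headers.
    session_id = headers.get("mcp-session-id") or uuid.uuid4().hex
    out_headers = {"mcp-session-id": session_id}
    result = handler(body)
    if result is None:
        return 202, out_headers, None             # nothing to send back
    return 200, out_headers, json.dumps(result)


def echo_handler(body):
    # Hypothetical handler: answers requests, ignores notifications.
    if "id" not in body:
        return None
    return {"jsonrpc": "2.0", "id": body["id"], "result": {}}
```

Keeping this logic in a pure function makes the 200 versus 202 branch easy to unit-test before wiring it into an ASGI route.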

7.3.3 FastMCP extension mcp_lite/server/fastmcp/__init__.py #

Change: run() now supports transport="streamable-http".

def run(self, transport="stdio", host="127.0.0.1", port=8000):
    if transport == "stdio":
        stdio.stdio_server(self._handle_msg)
    elif transport == "streamable-http":
        # Create streamable_http_app, mount it under /mcp, and start it with uvicorn
        app = streamable_http_app(self._handle_msg, path="/")
        full_app = Starlette(routes=[Mount("/mcp", app=app)])
        uvicorn.run(full_app, host=host, port=port)
    else:
        raise ValueError(...)

7.3.4 Example client streamable_http_client.py #

Purpose: shows how to connect to an MCP server over Streamable HTTP.

Flow: streamable_http_client(url) → ClientSession(read, write) → initialize() → list_tools() → call_tool("greet", {...}).

7.3.5 Example server streamable_http_server.py #

Purpose: shows FastMCP started with the streamable-http transport.

from mcp_lite.server.fastmcp import FastMCP

mcp = FastMCP(name="HTTP Server")

# Register a simple tool
@mcp.tool()
def greet(name: str = "World") -> str:
    return f"Hello, {name}!"

if __name__ == "__main__":
    mcp.run(transport="streamable-http")  # listens on port 8000 at path /mcp

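7.4 Key points #

| Item | Description |
| --- | --- |
| Request model | Each MCP message maps to one HTTP POST; there is no long-lived connection |
| Session | The mcp-session-id header keeps the session across requests |
| Client threading | The worker thread does the HTTP I/O; the main thread exchanges messages with it through queues |
| 202 response | Notifications (messages without an id) get a 202 with no response body |
| Shared state | session_id and stop_flag are wrapped in lists so that closures can share mutable state |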
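
The list wrapping matters because an inner function cannot rebind a variable of its enclosing function by plain assignment, but it can mutate an object that variable refers to. A minimal sketch of the idea, using the hypothetical name make_session_tracker:

```python
def make_session_tracker():
    """Return two closures that share one piece of mutable state."""
    session_id = [None]  # one-element list acts as a mutable cell

    def remember(value):
        # Mutating the list is visible to every closure holding it;
        # `session_id = value` would only create a new local name.
        if value:
            session_id[0] = value

    def current():
        return session_id[0]

    return remember, current


remember, current = make_session_tracker()
remember("abc")
```

Declaring the variable nonlocal inside the inner function would work as well; the list form has the advantage that the same object can also be handed to code outside the closure, such as a worker thread target.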