导航菜单

  • 1.什么是MCP
  • 2.MCP架构
  • 3.MCP服务器
  • 4.MCP客户端
  • 5.版本控制
  • 6.连接MCP服务器
  • 7.SDKs
  • 8.Inspector
  • 9.规范
  • 10.架构
  • 11.协议
  • 12.生命周期
  • 13.工具
  • 14.资源
  • 15.提示
  • 16.日志
  • 17.进度
  • 18.传输
  • 19.补全
  • 20.引导
  • 21.采样
  • 22.任务
  • 23.取消
  • 24.Ping
  • 25.根
  • 26.分页
  • 27.授权
  • 28.初始化
  • 29.工具
  • 30.资源
  • 31.结构化输出
  • 32.提示词
  • 33.上下文
  • 34.StreamableHTTP
  • 35.参数补全
  • 36.引导
  • 37.采样
  • 38.LowLevel
  • 39.任务
  • 40.取消
  • 41.ping
  • 42.根
  • 43.分页
  • 44.授权
  • 45.授权
  • Keycloak
  • asyncio
  • contextlib
  • httpx
  • pathlib
  • pydantic
  • queue
  • starlette
  • subprocess
  • threading
  • uvicorn
  • JSON-RPC
  • z
  • 1. 采样
  • 2. sampling_client.py
  • 3. sampling_server.py
  • 4. session.py
  • 5. init.py
  • 6. types.py
  • 7.工作流程
    • 7.1. 整体架构
    • 7.2. 数据流概览
    • 7.3 时序图
    • 7.4. 各模块修改
      • 7.4.1 sampling_client.py(客户端)
      • 7.4.2 sampling_server.py(服务端)
      • 7.4.3 mcp_lite/client/session.py(客户端会话)
      • 7.4.4 mcp_lite/server/fastmcp/__init__.py(服务端 FastMCP)
      • 7.4.5 mcp_lite/types.py(类型定义)
    • 7.5 要点总结

1. 采样 #

  • 什么是采样?

采样(sampling)是指模型根据已有对话内容(如用户输入和上下文),自动生成新的回复内容的能力。MCP 协议定义了标准的采样 RPC 接口,便于客户端与服务器对接不同大模型或采样后端,如 OpenAI、DeepSeek 或本地模型。

采样接口主要包括:

  • sampling/createMessage:服务端向客户端发起请求,由客户端按照指定历史消息、温度、最大 token 数等参数,调用 LLM 采样生成“助手”类回复(例如 chat)。
  • 消息格式严格区分角色(role, 如: user, assistant)及内容结构(如富文本类型 TextContent),支持 stop_sequences 等采样参数,接口易于扩展。
  • 客户端(持有 LLM/API 访问权限)在收到采样请求后,将消息和采样参数转换为目标模型 API(如 OpenAI/DeepSeek)所需格式,发起实际采样调用,并适配响应内容,统一 MCP 消息结构返回给服务端。
  • 客户端和服务端均通过 JSONRPC 协议传递消息,并支持流式/异步实现。
  • 相关结构体如 CreateMessageRequestParams(请求)、CreateMessageResult(回复)等在 types 模块中定义,方便类型校验和代码提示。

实际采样工作流通常如下:

  1. 服务端(MCP 服务端,如带有 generate_poem 等工具)在工具执行时需要 LLM 能力时,向客户端发起 sampling/createMessage 请求;
  2. 客户端(MCP 客户端,持有 LLM/API 访问权限)收到请求后,将消息和采样参数转换为目标模型 API 所需格式,发起实际采样调用;
  3. 模型后端生成回复内容(assistant),客户端封装为 MCP 协议定义的 CreateMessageResult 格式返回给服务端;
  4. 服务端收到回复,将结果返回给工具执行流程。

此设计实现了消息结构和模型能力的解耦,便于后续快速适配和扩展不同的采样后端。

2. sampling_client.py #

sampling_client.py

# 导入异步编程相关模块
import asyncio
# 导入操作系统相关模块
import os
# 导入系统相关模块
import sys

# 导入httpx用于异步HTTP请求
import httpx
# 从mcp_lite包中导入相关类型和类
from mcp_lite import ClientSession, StdioServerParameters, types
# 导入stdio_client用于进程间标准输入输出通信
from mcp_lite.client.stdio import stdio_client

# 定义DeepSeek API的URL
DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions"
# 定义DeepSeek所用的模型名称
DEEPSEEK_MODEL = "deepseek-chat"

# 定义一个工具函数,用于将MCP消息格式转换为DeepSeek API所需格式
def _messages_to_deepseek(messages: list) -> list[dict]:
    # """将 MCP SamplingMessage 转为 DeepSeek API 格式。"""
    out = []
    # 遍历每条消息
    for msg in messages:
        # 检查消息对象是否有role和content属性
        if not hasattr(msg, "role") or not hasattr(msg, "content"):
            continue
        # 取出角色role属性,默认为user
        role = getattr(msg, "role", "user")
        # 取出内容content属性,若无则为空字符串
        c = getattr(msg, "content", "")
        # 如果内容是文本类型,则直接取文本
        if isinstance(c, types.TextContent) and c.type == "text":
            out.append({"role": role, "content": c.text})
        # 如果内容是一个内容块的列表
        elif isinstance(c, list):
            parts = []
            # 遍历每一个内容块
            for block in c:
                # 如果每个内容块还是文本,则加入parts
                if isinstance(block, types.TextContent) and block.type == "text":
                    parts.append(block.text)
            # 多段文本拼接
            if parts:
                out.append({"role": role, "content": "\n".join(parts)})
        # 如果内容本身就是字符串
        elif isinstance(c, str):
            out.append({"role": role, "content": c})
    # 返回整理后的DeepSeek消息列表
    return out


# 定义异步函数,用于调用DeepSeek API获取回复
async def _call_deepseek(messages: list, max_tokens: int = 200) -> str:
    # """调用 DeepSeek API 获取回复。"""
    # 从环境变量获取DeepSeek的API Key,若无则用缺省值
    api_key = os.environ.get("DEEPSEEK_API_KEY", "sk-2c101a720e054053aeabffda603e755b").strip()
    # 如果没有有效的API Key,则抛出异常
    if not api_key:
        raise ValueError(
            "未设置 DEEPSEEK_API_KEY 环境变量,请从 https://platform.deepseek.com 获取 API Key"
        )

    # 构造请求载荷
    payload = {
        "model": DEEPSEEK_MODEL,
        "messages": messages,
        "max_tokens": max_tokens,
        "temperature": 0.7,
    }

    # 使用httpx异步客户端发送POST请求
    async with httpx.AsyncClient(timeout=60.0) as client:
        resp = await client.post(
            DEEPSEEK_API_URL,
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            json=payload,
        )
        # 若请求失败则抛异常
        resp.raise_for_status()
        # 解析JSON响应体
        data = resp.json()
        # 获取第一条返回的choice
        choice = data.get("choices", [{}])[0]
        # 提取消息内容
        msg = choice.get("message", {})
        # 返回内容,如果为空则返回空字符串
        return msg.get("content", "") or ""


# 定义异步回调函数,处理采样请求
async def handle_sampling(
    context,
    params: types.CreateMessageRequestParams,
) -> types.CreateMessageResult:
    # """处理服务端发起的 sampling/createMessage 请求,调用 DeepSeek API。"""
    # 获取请求输入的消息列表
    messages = params.messages or []
    # 转换为deepseek可识别的消息格式
    deepseek_messages = _messages_to_deepseek(messages)
    # 若消息为空则回复错误
    if not deepseek_messages:
        reply = "未收到有效消息内容。"
    else:
        # 获取最大token数,兼容旧新参数名
        max_tokens = getattr(params, "max_tokens", None) or getattr(
            params, "maxTokens", 200
        )
        try:
            # 调用DeepSeek获取回复内容
            reply = await _call_deepseek(deepseek_messages, max_tokens=max_tokens)
        except Exception as e:
            # 错误处理
            reply = f"DeepSeek API 调用失败:{e}"
            print(f"[Sampling] {reply}", file=sys.stderr)

    # 在标准错误输出打印reply的前80个字符预览
    print(f"[Sampling] DeepSeek 回复: {reply[:80]}...", file=sys.stderr)

    # 返回CreateMessageResult对象,内容作为TextContent封装
    return types.CreateMessageResult(
        role="assistant",
        content=types.TextContent(type="text", text=reply),
        model=DEEPSEEK_MODEL,
        stop_reason="endTurn",
    )


# 主函数,负责与采样服务交互和演示
def main():
    # """主函数(mcp_lite 使用同步 API)。"""
    # 获取当前文件的目录
    base_dir = os.path.dirname(os.path.abspath(__file__))
    # 拼接得到采样服务端脚本路径
    server_path = os.path.join(base_dir, "sampling_server.py")

    # 设置采样服务端的启动参数,使用Python解释器
    server_params = StdioServerParameters(
        command="python",
        args=[server_path],
        env={},
    )

    # 启动stdio客户端,与采样服务端通信
    with stdio_client(server_params) as (read, write):
        # 初始化客户端会话,注册采样回调
        session = ClientSession(read, write, sampling_callback=handle_sampling)
        # 执行初始化命令,获取服务信息
        result = session.initialize()
        # 获取服务端名称(兼容旧新字段)
        info = getattr(result, "server_info", None) or getattr(result, "serverInfo", None)
        server_name = info.name if info else "unknown"
        # 打印服务端名称到标准错误
        print(f"[Server] {server_name}", file=sys.stderr)

        # 获取所有可用工具
        tools_result = session.list_tools()
        # 提取工具名称列表
        tool_names = [t.name for t in tools_result.tools]
        # 打印工具名称到标准错误
        print(f"[Tools] {tool_names}", file=sys.stderr)

        # 演示 1:调用generate_poem工具
        print("\n--- 演示 generate_poem(生成诗歌)---", file=sys.stderr)
        r1 = session.call_tool("generate_poem", arguments={"topic": "自然"})
        # 若工具返回内容则遍历打印每个内容块
        if r1.content:
            for block in r1.content:
                if hasattr(block, "text"):
                    print(f"[Result] {block.text}")

        # 演示 2:调用summarize工具
        print("\n--- 演示 summarize(文本摘要)---", file=sys.stderr)
        r2 = session.call_tool(
            "summarize",
            arguments={"text": "人工智能正在改变我们的生活方式,从智能助手到自动驾驶,技术无处不在。"},
        )
        # 打印返回内容到标准输出
        if r2.content:
            for block in r2.content:
                if hasattr(block, "text"):
                    print(f"[Result] {block.text}")


# 判断是否作为主程序入口执行
if __name__ == "__main__":
    # 若支持reconfigure,则设置标准输入输出为utf-8编码
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
        sys.stdin.reconfigure(encoding="utf-8")
    # 执行主函数
    main()

官方代码

# 导入异步编程相关模块
import asyncio
# 导入操作系统相关模块
import os
# 导入系统相关模块
import sys

# 导入httpx用于异步HTTP请求
import httpx
# 从mcp_lite包中导入相关类型和类
from mcp import ClientSession, StdioServerParameters, types
# 导入stdio_client用于进程间标准输入输出通信
from mcp.client.stdio import stdio_client

# 定义DeepSeek API的URL
DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions"
# 定义DeepSeek所用的模型名称
DEEPSEEK_MODEL = "deepseek-chat"

# 定义一个工具函数,用于将MCP消息格式转换为DeepSeek API所需格式
def _messages_to_deepseek(messages: list) -> list[dict]:
    # """将 MCP SamplingMessage 转为 DeepSeek API 格式。"""
    out = []
    # 遍历每条消息
    for msg in messages:
        # 检查消息对象是否有role和content属性
        if not hasattr(msg, "role") or not hasattr(msg, "content"):
            continue
        # 取出角色role属性,默认为user
        role = getattr(msg, "role", "user")
        # 取出内容content属性,若无则为空字符串
        c = getattr(msg, "content", "")
        # 如果内容是文本类型,则直接取文本
        if isinstance(c, types.TextContent) and c.type == "text":
            out.append({"role": role, "content": c.text})
        # 如果内容是一个内容块的列表
        elif isinstance(c, list):
            parts = []
            # 遍历每一个内容块
            for block in c:
                # 如果每个内容块还是文本,则加入parts
                if isinstance(block, types.TextContent) and block.type == "text":
                    parts.append(block.text)
            # 多段文本拼接
            if parts:
                out.append({"role": role, "content": "\n".join(parts)})
        # 如果内容本身就是字符串
        elif isinstance(c, str):
            out.append({"role": role, "content": c})
    # 返回整理后的DeepSeek消息列表
    return out


# 定义异步函数,用于调用DeepSeek API获取回复
async def _call_deepseek(messages: list, max_tokens: int = 200) -> str:
    # """调用 DeepSeek API 获取回复。"""
    # 从环境变量获取DeepSeek的API Key,若无则用缺省值
    api_key = os.environ.get("DEEPSEEK_API_KEY", "sk-2c101a720e054053aeabffda603e755b").strip()
    # 如果没有有效的API Key,则抛出异常
    if not api_key:
        raise ValueError(
            "未设置 DEEPSEEK_API_KEY 环境变量,请从 https://platform.deepseek.com 获取 API Key"
        )

    # 构造请求载荷
    payload = {
        "model": DEEPSEEK_MODEL,
        "messages": messages,
        "max_tokens": max_tokens,
        "temperature": 0.7,
    }

    # 使用httpx异步客户端发送POST请求
    async with httpx.AsyncClient(timeout=60.0) as client:
        resp = await client.post(
            DEEPSEEK_API_URL,
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            json=payload,
        )
        # 若请求失败则抛异常
        resp.raise_for_status()
        # 解析JSON响应体
        data = resp.json()
        # 获取第一条返回的choice
        choice = data.get("choices", [{}])[0]
        # 提取消息内容
        msg = choice.get("message", {})
        # 返回内容,如果为空则返回空字符串
        return msg.get("content", "") or ""


# 定义异步回调函数,处理采样请求
async def handle_sampling(
    context,
    params: types.CreateMessageRequestParams,
) -> types.CreateMessageResult:
    # """处理服务端发起的 sampling/createMessage 请求,调用 DeepSeek API。"""
    # 获取请求输入的消息列表
    messages = params.messages or []
    # 转换为deepseek可识别的消息格式
    deepseek_messages = _messages_to_deepseek(messages)
    # 若消息为空则回复错误
    if not deepseek_messages:
        reply = "未收到有效消息内容。"
    else:
        # 获取最大token数,兼容旧新参数名
        max_tokens = getattr(params, "max_tokens", None) or getattr(
            params, "maxTokens", 200
        )
        try:
            # 调用DeepSeek获取回复内容
            reply = await _call_deepseek(deepseek_messages, max_tokens=max_tokens)
        except Exception as e:
            # 错误处理
            reply = f"DeepSeek API 调用失败:{e}"
            print(f"[Sampling] {reply}", file=sys.stderr)

    # 在标准错误输出打印reply的前80个字符预览
    print(f"[Sampling] DeepSeek 回复: {reply[:80]}...", file=sys.stderr)

    # 返回CreateMessageResult对象,内容作为TextContent封装
    return types.CreateMessageResult(
        role="assistant",
        content=types.TextContent(type="text", text=reply),
        model=DEEPSEEK_MODEL,
        stop_reason="endTurn",
    )


# 主函数,负责与采样服务交互和演示
async def main() -> None:
    # """主函数(mcp_lite 使用同步 API)。"""
    # 获取当前文件的目录
    base_dir = os.path.dirname(os.path.abspath(__file__))
    # 拼接得到采样服务端脚本路径
    server_path = os.path.join(base_dir, "sampling_server.py")

    # 设置采样服务端的启动参数,使用Python解释器
    server_params = StdioServerParameters(
        command="python",
        args=[server_path],
        env={},
    )

    # 启动stdio客户端,与采样服务端通信
    async with stdio_client(server_params) as (read, write):
        # 初始化客户端会话,注册采样回调
        async with ClientSession(read, write, sampling_callback=handle_sampling) as session:
            # 执行初始化命令,获取服务信息
            result = await session.initialize()
            # 获取服务端名称(兼容旧新字段)
            info = getattr(result, "server_info", None) or getattr(result, "serverInfo", None)
            server_name = info.name if info else "unknown"
            # 打印服务端名称到标准错误
            print(f"[Server] {server_name}", file=sys.stderr)

            # 获取所有可用工具
            tools_result = await session.list_tools()
            # 提取工具名称列表
            tool_names = [t.name for t in tools_result.tools]
            # 打印工具名称到标准错误
            print(f"[Tools] {tool_names}", file=sys.stderr)

            # 演示 1:调用generate_poem工具
            print("\n--- 演示 generate_poem(生成诗歌)---", file=sys.stderr)
            r1 = await session.call_tool("generate_poem", arguments={"topic": "自然"})
            # 若工具返回内容则遍历打印每个内容块
            if r1.content:
                for block in r1.content:
                    if hasattr(block, "text"):
                        print(f"[Result] {block.text}")

            # 演示 2:调用summarize工具
            print("\n--- 演示 summarize(文本摘要)---", file=sys.stderr)
            r2 = await session.call_tool(
                "summarize",
                arguments={"text": "人工智能正在改变我们的生活方式,从智能助手到自动驾驶,技术无处不在。"},
            )
            # 打印返回内容到标准输出
            if r2.content:
                for block in r2.content:
                    if hasattr(block, "text"):
                        print(f"[Result] {block.text}")


# 判断是否作为主程序入口执行
if __name__ == "__main__":
    # 若支持reconfigure,则设置标准输入输出为utf-8编码
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
        sys.stdin.reconfigure(encoding="utf-8")
    # 执行主函数
    asyncio.run(main())

3. sampling_server.py #

sampling_server.py

# 导入sys模块,用于系统输入输出
import sys

# 从mcp_lite.server.fastmcp导入Context和FastMCP类
from mcp_lite.server.fastmcp import Context, FastMCP
# 从mcp_lite.types导入SamplingMessage和TextContent类型
from mcp_lite.types import SamplingMessage, TextContent

# 创建FastMCP实例,服务名为“采样服务”
mcp = FastMCP(name="采样服务")


# 注册工具generate_poem,生成诗歌
@mcp.tool()
async def generate_poem(topic: str, ctx) -> str:
    # """根据主题生成一首短诗(通过客户端 LLM 采样)。"""
    # 构造诗歌生成的提示语
    prompt = f"请用中文写一首关于「{topic}」的短诗"

    # 调用客户端会话采样接口生成回复
    result = await ctx.session.create_message(
        messages=[
            SamplingMessage(
                role="user",  # 角色为用户
                content=TextContent(type="text", text=prompt),  # 构造文本内容
            )
        ],
        max_tokens=100,  # 限制最大输出token数量
    )

    # 如果回复内容是文本类型,则返回文本;否则返回内容的字符串表达
    if hasattr(result.content, "type") and result.content.type == "text":
        return result.content.text
    return str(result.content)


# 注册工具summarize,进行摘要
@mcp.tool()
async def summarize(text: str, ctx) -> str:
    # """对给定文本进行摘要(通过客户端 LLM 采样)。"""
    # 构造摘要的提示语
    prompt = f"请用一句话概括以下文本:\n\n{text}"

    # 调用客户端会话采样接口生成摘要
    result = await ctx.session.create_message(
        messages=[
            SamplingMessage(
                role="user",  # 角色为用户
                content=TextContent(type="text", text=prompt),  # 构造文本内容
            )
        ],
        max_tokens=80,  # 限制最大输出token数量
    )

    # 如果回复内容是文本类型,则返回文本;否则返回内容的字符串表达
    if hasattr(result.content, "type") and result.content.type == "text":
        return result.content.text
    return str(result.content)


# 定义主函数,启动服务
def main():
    # 打印服务启动信息到标准错误输出
    print("启动采样服务...", file=sys.stderr)
    # 打印可用工具列表到标准错误输出
    print("可用工具:generate_poem(生成诗歌), summarize(文本摘要)", file=sys.stderr)
    # 以stdio为传输方式运行mcp服务
    mcp.run(transport="stdio")


# 判断是否作为主程序运行
if __name__ == "__main__":
    try:
        # 检查和设置标准输入输出的编码为utf-8(兼容Windows等环境)
        if hasattr(sys.stdout, "reconfigure"):
            sys.stdout.reconfigure(encoding="utf-8")
            sys.stdin.reconfigure(encoding="utf-8")
        # 调用主函数
        main()
    # 捕获键盘中断异常,优雅退出
    except KeyboardInterrupt:
        print("\n服务器已停止", file=sys.stderr)
    # 捕获其它异常并打印错误信息
    except Exception as e:
        print(f"服务器启动失败:{e}", file=sys.stderr)

官方代码

# 导入sys模块,用于系统输入输出
import sys

# 从mcp_lite.server.fastmcp导入Context和FastMCP类
from mcp.server.fastmcp import Context, FastMCP
# 从mcp_lite.types导入SamplingMessage和TextContent类型
from mcp.types import SamplingMessage, TextContent

# 创建FastMCP实例,服务名为“采样服务”
mcp = FastMCP(name="采样服务")


# 注册工具generate_poem,生成诗歌
@mcp.tool()
async def generate_poem(topic: str, ctx: Context) -> str:
    # """根据主题生成一首短诗(通过客户端 LLM 采样)。"""
    # 构造诗歌生成的提示语
    prompt = f"请用中文写一首关于「{topic}」的短诗"

    # 调用客户端会话采样接口生成回复
    result = await ctx.session.create_message(
        messages=[
            SamplingMessage(
                role="user",  # 角色为用户
                content=TextContent(type="text", text=prompt),  # 构造文本内容
            )
        ],
        max_tokens=100,  # 限制最大输出token数量
    )

    # 如果回复内容是文本类型,则返回文本;否则返回内容的字符串表达
    if hasattr(result.content, "type") and result.content.type == "text":
        return result.content.text
    return str(result.content)


# 注册工具summarize,进行摘要
@mcp.tool()
async def summarize(text: str, ctx: Context) -> str:
    # """对给定文本进行摘要(通过客户端 LLM 采样)。"""
    # 构造摘要的提示语
    prompt = f"请用一句话概括以下文本:\n\n{text}"

    # 调用客户端会话采样接口生成摘要
    result = await ctx.session.create_message(
        messages=[
            SamplingMessage(
                role="user",  # 角色为用户
                content=TextContent(type="text", text=prompt),  # 构造文本内容
            )
        ],
        max_tokens=80,  # 限制最大输出token数量
    )

    # 如果回复内容是文本类型,则返回文本;否则返回内容的字符串表达
    if hasattr(result.content, "type") and result.content.type == "text":
        return result.content.text
    return str(result.content)


# 定义主函数,启动服务
def main():
    # 打印服务启动信息到标准错误输出
    print("启动采样服务...", file=sys.stderr)
    # 打印可用工具列表到标准错误输出
    print("可用工具:generate_poem(生成诗歌), summarize(文本摘要)", file=sys.stderr)
    # 以stdio为传输方式运行mcp服务
    mcp.run(transport="stdio")


# 判断是否作为主程序运行
if __name__ == "__main__":
    try:
        # 检查和设置标准输入输出的编码为utf-8(兼容Windows等环境)
        if hasattr(sys.stdout, "reconfigure"):
            sys.stdout.reconfigure(encoding="utf-8")
            sys.stdin.reconfigure(encoding="utf-8")
        # 调用主函数
        main()
    # 捕获键盘中断异常,优雅退出
    except KeyboardInterrupt:
        print("\n服务器已停止", file=sys.stderr)
    # 捕获其它异常并打印错误信息
    except Exception as e:
        print(f"服务器启动失败:{e}", file=sys.stderr)

4. session.py #

mcp_lite/client/session.py

import sys
# 导入 SessionMessage 类
from mcp_lite.message import SessionMessage
# 导入 asyncio 用于运行异步 elicitation 回调
import asyncio
# 从 mcp_lite.types 模块导入相关类型和常量
from mcp_lite.types import (                                            # 从 mcp_lite.types 模块导入以下类型
    InitializeRequestParams,                                            #   初始化请求参数类型
    InitializeRequest,                                                  #   初始化请求类型
    LATEST_PROTOCOL_VERSION,                                            #   最新协议版本常量
    ClientCapabilities,                                                 #   客户端能力类型
    ElicitationCapability,                                              #   Elicitation 能力类型
    FormElicitationCapability,                                          #   Form elicitation 能力
    UrlElicitationCapability,                                           #   URL elicitation 能力
+   SamplingCapability,                                                 #   Sampling 能力类型
+   CreateMessageRequestParams,                                         #   Sampling 请求参数类型
+   CreateMessageResult,                                                #   Sampling 响应结果类型
    Implementation,                                                     #   实现信息类型
    InitializedNotification,                                            #   初始化完成通知类型
    ElicitRequestFormParams,                                            #   Elicitation 请求参数(form)
    ElicitResult,                                                       #   Elicitation 结果类型
    InitializeResult,                                                   #   初始化结果类型
    JSONRPCRequest,                                                     #   JSONRPC请求类型
    JSONRPCResponse,                                                    #   JSONRPC响应类型
    JSONRPCError,                                                       #   JSONRPC错误类型
+   ErrorData,                                                          #   JSONRPC 错误数据
    JSONRPCNotification,                                                #   JSONRPC通知类型
    LoggingMessageNotificationParams,                                   #   日志消息通知参数
    ListToolsRequest,                                                   #   工具列表请求类型
    ListToolsResult,                                                    #   工具列表结果类型
    CallToolResult,                                                     #   调用工具响应类型
    CallToolRequest,                                                    #   调用工具请求类型
    CallToolRequestParams,                                              #   调用工具请求参数类型
    ListResourcesRequest,                                               #   资源列表请求类型
    ListResourcesResult,                                                #   资源列表响应类型
    ListResourceTemplatesRequest,                                       #   资源模板列表请求类型
    ListResourceTemplatesResult,                                        #   资源模板列表响应类型
    ReadResourceRequest,                                                #   读取资源请求类型
    ReadResourceRequestParams,                                          #   读取资源请求参数类型
    ReadResourceResult,                                                 #   读取资源响应类型
    ListPromptsRequest,                                                 #   Prompt 列表请求类型
    ListPromptsResult,                                                  #   Prompt 列表响应类型
    GetPromptRequest,                                                   #   获取 Prompt 请求类型
    GetPromptRequestParams,                                             #   获取 Prompt 请求参数类型
    GetPromptResult,                                                    #   获取 Prompt 响应类型
    CompleteRequest,                                                    #   补全请求类型
    CompleteRequestParams,                                              #   补全请求参数类型
    CompleteResult,                                                     #   补全响应类型
    CompletionArgument,                                                 #   补全参数类型
    CompletionContext,                                                  #   补全上下文类型
)
# 定义 MCPError 异常类,用于表示协议相关错误
class MCPError(Exception):
    # 初始化方法,接收错误码、错误信息和可选的数据
    def __init__(self, code, message, data=None):
        # 将错误码、错误信息、附加数据赋值为实例属性
        self.code, self.message, self.data = code, message, data
        # 调用父类 Exception 的初始化,只传递错误信息
        super().__init__(message)

    # 类方法,利用 JSONRPCError 对象创建 MCPError 实例
    @classmethod
    def from_jsonrpc(cls, err):
        # 从 JSONRPCError.error 对象中取出 code/message/data 创建 MCPError
        return cls(err.error.code, err.error.message, err.error.data)
# 定义 ClientSession 类,用于描述客户端会话
class ClientSession:
    # 构造方法,接收读取流和写入流,可选 elicitation_callback、sampling_callback
+   def __init__(self, read_stream, write_stream, elicitation_callback=None, sampling_callback=None):
        # 赋值读取流和写入流
        self._read, self._write = read_stream, write_stream
        # 初始化请求 id 计数器为 0
        self._req_id = 0
        # elicitation 回调:当服务端发送 elicitation/create 时调用,返回 ElicitResult
        self._elicitation_callback = elicitation_callback
        # sampling 回调:当服务端发送 sampling/createMessage 时调用,返回 CreateMessageResult
+       self._sampling_callback = sampling_callback
    # 内部方法,发送请求并同步等待响应
    def _request(self, req, progress_callback=None, message_handler=None, logging_callback=None):
        # 当前请求 id
        rid = self._req_id
        # 自增请求 id,确保下次唯一
        self._req_id += 1
        # 对请求对象进行序列化,转为 dict
        d = req.model_dump(by_alias=True, mode="json", exclude_none=True)
        # 构建 JSONRPCRequest,再封装入 SessionMessage,通过写入流发送
        jreq = JSONRPCRequest(jsonrpc="2.0", id=rid, method=d["method"], params=d.get("params"))
        # 将请求对象封装成 SessionMessage 后发送
        self._write.send(SessionMessage(message=jreq))
        # 循环等待响应
        while True:
            # 从读取流取出一条消息
            msg = self._read.get()
            # 若取到 None,说明连接断开,抛出异常
            if msg is None:
                raise MCPError(-32000, "Connection closed")
            # 若消息类型不是 SessionMessage,忽略继续等
            if not isinstance(msg, SessionMessage):
                continue
            # 取消息内容
            m = msg.message
            # 服务端发起的 elicitation/create 请求(有 id 和 method)
            # 判断是否有 elicitation 回调,且消息 m 有 id 且 method 为 "elicitation/create"
            if (
                self._elicitation_callback
                and getattr(m, "id", None) is not None
                and getattr(m, "method", "") == "elicitation/create"
            ):
                # 获取消息中的 params 字段,若没有则为 {}
                params_raw = getattr(m, "params", None) or {}
                try:
                    # 尝试用 ElicitRequestFormParams 校验并解析参数
                    params = ElicitRequestFormParams.model_validate(params_raw, by_name=False)
                except Exception:
                    # 校验失败则降级为原始 dict,不做结构检查
                    params = params_raw
                try:
                    # 调用回调处理表单请求,异步支持
                    result = self._elicitation_callback(None, params)
                    # 检查回调是否为协程,如果是则用 asyncio.run 执行
                    if asyncio.iscoroutine(result):
                        result = asyncio.run(result)
                    # 若返回类型正确,构造正常响应
                    if isinstance(result, ElicitResult):
                        resp = JSONRPCResponse(
                            jsonrpc="2.0",
                            id=m.id,
                            result=result.model_dump(by_alias=True, exclude_none=True)
                        )
                    else:
                        # 若返回结果不是 ElicitResult,则以 cancel 响应
                        resp = JSONRPCResponse(
                            jsonrpc="2.0",
                            id=m.id,
                            result=ElicitResult(action="cancel").model_dump(by_alias=True, exclude_none=True)
                        )
                    # 发送封装后的响应 SessionMessage
                    self._write.send(SessionMessage(message=resp))
                except Exception:
                    # 回调过程异常,反馈 cancel,保证流程不挂死
                    resp = JSONRPCResponse(
                        jsonrpc="2.0",
                        id=m.id,
                        result=ElicitResult(action="cancel").model_dump(by_alias=True, exclude_none=True)
                    )
                    self._write.send(SessionMessage(message=resp))
                # 处理完表单请求,进入下一个消息处理循环
                continue
            # 服务端发起 sampling/createMessage 请求
+           if (
+               self._sampling_callback
+               and getattr(m, "id", None) is not None
+               and getattr(m, "method", "") == "sampling/createMessage"
+           ):
                # 获取消息中的 params 字段,若无则为空字典
+               params_raw = getattr(m, "params", None) or {}
+               try:
                    # 使用 CreateMessageRequestParams 校验&解析参数(尽量保证类型正确)
+                   params = CreateMessageRequestParams.model_validate(params_raw, by_name=False)
+               except Exception:
                    # 如果解析失败,则降级为原始入参
+                   params = params_raw
+               try:
                    # 调用采样回调函数,传入参数(支持协程和普通函数)
+                   result = self._sampling_callback(None, params)
                    # 若回调为异步协程,则用 asyncio.run 执行
+                   if asyncio.iscoroutine(result):
+                       result = asyncio.run(result)
                    # 如果回调结果是 CreateMessageResult,则封装为正常响应
+                   if isinstance(result, CreateMessageResult):
+                       resp = JSONRPCResponse(
+                           jsonrpc="2.0",
+                           id=m.id,
+                           result=result.model_dump(by_alias=True, exclude_none=True),
+                       )
+                   else:
                        # 若返回类型不符,则回包内部错误
+                       resp = JSONRPCError(
+                           jsonrpc="2.0",
+                           id=m.id,
+                           error=ErrorData(code=-32603, message="Sampling callback returned invalid result"),
+                       )
                    # 发送封装后的 SessionMessage 给服务端
+                   self._write.send(SessionMessage(message=resp))
+               except Exception as ex:
                    # 回调发生异常,返回 error 数据包
+                   resp = JSONRPCError(
+                       jsonrpc="2.0",
+                       id=m.id,
+                       error=ErrorData(code=-32603, message=str(ex)),
+                   )
+                   self._write.send(SessionMessage(message=resp))
                # 处理完采样请求,进入下个消息循环
+               continue
            # 判断当前消息是否为通知(即没有 id 字段且具有 method 字段)
            if getattr(m, "id", None) is None and hasattr(m, "method"):
                # 获取 method 名称
                method = getattr(m, "method", "")
                # 获取 params 内容(默认为空字典)
                params = getattr(m, "params", None) or {}
                # 若为进度通知,且传入了进度回调 progress_callback
                if method == "notifications/progress" and progress_callback:
                    # 调用进度回调,传递 progress/total/message 参数
                    progress_callback(
                        params.get("progress", 0),
                        params.get("total"),
                        params.get("message"),
                    )
                # 若为普通日志/信息通知
                elif method == "notifications/message":
                    # 如果提供日志回调 logging_callback,则尝试解析为日志参数类型并调用
                    if logging_callback:
                        try:
                            # 使用 LoggingMessageNotificationParams 类型校验参数
                            p = LoggingMessageNotificationParams.model_validate(params, by_name=False)
                            # 调用日志回调
                            logging_callback(p)
                        except Exception:
                            # 校验失败则直接将原始 params 传递给日志回调
                            logging_callback(params)
                    # 如果指定了通用消息处理回调,则调用
                    if message_handler:
                        message_handler(m)
                # 处理完通知类型后,继续等待下一个消息
                continue
            # 若为 Response 或 Error,且 id 匹配
            if isinstance(m, (JSONRPCResponse, JSONRPCError)) and getattr(m, "id", None) == rid:
                # 若为错误,抛出 MCPError 异常
                if isinstance(m, JSONRPCError):
                    raise MCPError.from_jsonrpc(m)
                # 为正常响应则返回结果数据
                return m.result
    # 内部方法,发送通知消息
    def _notify(self, n):
        # 通知对象序列化为 dict
        d = n.model_dump(by_alias=True, mode="json", exclude_none=True)
        # 封装为 JSONRPCNotification,再封装成 SessionMessage 后发出
        self._write.send(SessionMessage(
            message=JSONRPCNotification(
                jsonrpc="2.0",
                method=d["method"],
                params=d.get("params"),
            )
        ))            
    # 初始化过程,发送初始化请求,收到响应后再发初始化完成通知
    def initialize(self):
        # 若提供 elicitation_callback 则声明 ElicitationCapability
        caps = ClientCapabilities()
        # 如果存在 elicitation_callback 回调,则配置客户端能力中的 elicitation 字段
        if self._elicitation_callback:
            # 为 elicitation 能力赋值,包括 form 和 url 两种能力
            caps.elicitation = ElicitationCapability(
                form=FormElicitationCapability(), 
                url=UrlElicitationCapability()
            )
        # 如果存在 sampling_callback 回调,则声明 SamplingCapability
+       if self._sampling_callback:
+           caps.sampling = SamplingCapability()
        # 构造初始化请求,并发送等待返回
        r = self._request(InitializeRequest(
            params=InitializeRequestParams(
                protocol_version=LATEST_PROTOCOL_VERSION,         # 协议版本号
                capabilities=caps,                                # 客户端能力
                client_info=Implementation(name="mcp_lite", version="0.1.0"),  # 客户端信息
            )
        ))
        # 发送"初始化完成"通知
        self._notify(InitializedNotification())
        # 用 InitializeResult 验证响应并返回
        return InitializeResult.model_validate(r, by_name=False)
    def list_tools(self):
        # 发送 ListToolsRequest 请求,使用 ListToolsResult 校验并返回
        return ListToolsResult.model_validate(self._request(ListToolsRequest()), by_name=False)   
    # 调用指定工具方法,传入工具名和参数
    # 定义 call_tool 方法,用于调用指定名称的工具
    def call_tool(self, name, arguments=None, progress_callback=None, message_handler=None, logging_callback=None):
        # 如果提供了 progress_callback,则将当前请求 id 作为 progressToken 加入 meta,便于服务端推送进度通知
        rid = self._req_id
        meta = {"progressToken": rid} if progress_callback else None
        # 构建工具调用请求参数,包含名称、参数和 meta 信息
        params = CallToolRequestParams(name=name, arguments=arguments or {}, meta=meta)
        # 调用 _request 方法发送工具调用请求,并用 CallToolResult 校验和结构化响应
        return CallToolResult.model_validate(
            self._request(
                CallToolRequest(params=params),
                progress_callback=progress_callback,       # 进度回调函数
                message_handler=message_handler,           # 通用消息处理回调
                logging_callback=logging_callback,         # 日志回调函数
            ),
            by_name=False,                                # 按字段名校验
        )

    # 列出已注册的静态资源
    def list_resources(self):
        # 调用 _request 方法发送 ListResourcesRequest 请求
        # 使用 ListResourcesResult 的 model_validate 进行结果校验和结构化
        return ListResourcesResult.model_validate(
            self._request(ListResourcesRequest()),
            by_name=False,
        )

    # 列出资源模板(带占位符的资源)
    def list_resource_templates(self):
        # 调用 _request 方法发送 ListResourceTemplatesRequest 请求
        # 使用 ListResourceTemplatesResult 的 model_validate 进行结果校验和结构化
        return ListResourceTemplatesResult.model_validate(
            self._request(ListResourceTemplatesRequest()),
            by_name=False,
        )

    # 读取指定 URI 的资源内容
    def read_resource(self, uri):
        # uri 可为 str 或 AnyUrl 类型
        # 构造 ReadResourceRequestParams 并发送 ReadResourceRequest 请求
        # 使用 ReadResourceResult 的 model_validate 进行结果校验和结构化
        return ReadResourceResult.model_validate(
            self._request(ReadResourceRequest(params=ReadResourceRequestParams(uri=str(uri)))),
            by_name=False,
        )

    # 列出已注册的 Prompt
    # 定义 list_prompts 方法,用于获取所有已注册的 Prompt
    def list_prompts(self):
        # 发送 ListPromptsRequest 请求,校验响应并结构化为 ListPromptsResult
        return ListPromptsResult.model_validate(
            self._request(ListPromptsRequest()),  # 请求所有 Prompt
            by_name=False,                        # 按字段名校验
        )

    # 请求 Prompt 或资源模板参数的补全建议
    def complete(self, ref, argument, context_arguments=None):
        # 构建补全上下文,若有 context_arguments 则封装为 CompletionContext
        context = CompletionContext(arguments=context_arguments) if context_arguments else None
        # 若 argument 为字典则转为 CompletionArgument
        arg = argument if isinstance(argument, CompletionArgument) else CompletionArgument(**argument)
        # 发送 CompleteRequest,校验并返回 CompleteResult
        return CompleteResult.model_validate(
            self._request(CompleteRequest(
                params=CompleteRequestParams(ref=ref, argument=arg, context=context)
            )),
            by_name=False,
        )

    # 获取指定 Prompt 的内容
    # 定义 get_prompt 方法,根据名称和参数获取指定 Prompt 的详情
    def get_prompt(self, name, arguments=None):
        # 发送 GetPromptRequest,请求参数包含指定名称和参数,响应结构化为 GetPromptResult
        return GetPromptResult.model_validate(
            self._request(GetPromptRequest(                       # 发送请求
                params=GetPromptRequestParams(                    # 构造请求参数
                    name=name,                                    # Prompt 名称
                    arguments=arguments or {}                     # Prompt 参数(默认为空字典)
                )
            )),
            by_name=False,                                        # 按字段名校验
        )

5. init.py #

mcp_lite/server/fastmcp/init.py

# FastMCP 包:从原 fastmcp 模块导入所有内容以保持向后兼容
# 原 fastmcp.py 的内容已移入此包,通过 __init__ 导出

# 导入系统模块
import sys
# 导入 base64 编码库
import base64
# 导入 json 序列化
import json
# 导入正则表达式模块
import re
# 导入 url 解码工具
from urllib.parse import unquote
# 导入类型相关辅助函数
from typing import get_args, get_origin, get_type_hints
# 导入 SessionMessage 类型
from mcp_lite.message import SessionMessage
# 导入 stdio 通信模块
from mcp_lite.server import stdio
# 导入函数签名和异步工具
import inspect
import asyncio
from mcp_lite.types import (                                       # 导入 mcp_lite.types 模块中的多个类型
    JSONRPCRequest,                                                # JSONRPC 请求类型
    JSONRPCNotification,                                           # JSONRPC 通知类型
    JSONRPCError,                                                  # JSONRPC 错误响应类型
    ErrorData,                                                     # 错误数据类型
    InitializeResult,                                              # 初始化结果类型
    LATEST_PROTOCOL_VERSION,                                       # 最新协议版本号
    ToolsCapability,                                               # 工具相关能力描述类型
    ResourcesCapability,                                           # 资源相关能力描述类型
    PromptsCapability,                                             # Prompt 相关能力描述类型
    CompletionsCapability,                                         # 补全能力描述类型
    ElicitationCapability,                                         # Elicitation 能力类型
    FormElicitationCapability,                                     # Form elicitation 能力
    UrlElicitationCapability,                                       # URL elicitation 能力
+   SamplingCapability,                                            # Sampling 能力类型
    ServerCapabilities,                                            # 服务器能力类型
+   CreateMessageRequestParams,                                     # Sampling 请求参数类型
+   CreateMessageResult,                                           # Sampling 响应结果类型
+   SamplingMessage,                                               # Sampling 消息类型
    Implementation,                                                # 实现信息类型
    JSONRPCResponse,                                               # JSONRPC 响应类型
    ListToolsResult,                                               # 列出工具结果类型
    Tool,                                                          # 单个工具类型
    TextContent,                                                   # 文本内容类型
    CallToolRequestParams,                                         # 调用工具请求参数类型
    CallToolResult,                                                # 调用工具响应结果类型
    ElicitResult,                                                 # Elicitation 结果类型
    Resource,                                                      # 静态资源类型
    ResourceTemplate,                                              # 资源模板类型
    ListResourcesResult,                                           # 资源列表返回结果类型
    ListResourceTemplatesResult,                                   # 资源模板列表返回结果类型
    ReadResourceRequestParams,                                     # 读取资源请求参数类型
    ReadResourceResult,                                            # 读取资源请求响应类型
    TextResourceContents,                                          # 资源文本内容类型
    BlobResourceContents,                                          # 资源二进制内容类型
    Prompt,                                                        # Prompt 类型
    PromptArgument,                                                # Prompt 参数类型
    ListPromptsResult,                                             # Prompt 列表结果类型
    GetPromptRequestParams,                                        # 获取 Prompt 请求参数类型
    GetPromptResult,                                               # 获取 Prompt 响应结果类型
    PromptMessage,                                                 # Prompt 消息类型
    Completion,                                                    # 补全结果内容类型
    CompleteRequestParams,                                         # 补全请求参数类型
    CompleteResult,                                                # 补全响应类型
)
# 导入 pydantic 的基础模型作为类型判定
from pydantic import BaseModel as PydanticBaseModel
# 导入 Typeddict 测试
from typing_extensions import is_typeddict as _is_typeddict

# 导入本包下 prompts 子模块
from . import prompts

# 辅助函数:提取函数输出 schema 及包裹需求
def _output_schema_and_wrap(fn, structured_output: bool):
    # 若不是结构化输出,直接返回
    if not structured_output:
        return None, False
    try:
        # 获取函数签名
        sig = inspect.signature(fn)
        # 获取返回类型注解
        ann = sig.return_annotation
        # 如果没有注解,返回
        if ann is inspect.Parameter.empty:
            return None, False
    except Exception:
        return None, False
    # 生成 schema
    return _schema_from_annotation(ann, fn.__name__)

# 辅助函数:根据注解和函数名生成 schema
def _schema_from_annotation(ann, func_name: str):
    # 没有注解直接返回
    if ann is inspect.Parameter.empty:
        return None, False
    # 获取注解的原类型
    origin = get_origin(ann)
    wrap = False
    schema = None
    # 如果是 Pydantic 的模型子类
    if PydanticBaseModel and isinstance(ann, type) and issubclass(ann, PydanticBaseModel):
        schema = ann.model_json_schema()
        return schema, False
    # 如果是 Typeddict
    if hasattr(ann, "__annotations__") and not (origin is dict or origin is list):
        if _is_typeddict(ann):
            hints = get_type_hints(ann) if hasattr(ann, "__annotations__") else {}
            props = {}
            for k, v in hints.items():
                t = v
                if t is int or t is type(None) and int in get_args(v):
                    props[k] = {"type": "integer"}
                elif t is float:
                    props[k] = {"type": "number"}
                elif t is str:
                    props[k] = {"type": "string"}
                elif t is bool:
                    props[k] = {"type": "boolean"}
                else:
                    props[k] = {"type": "string"}
            schema = {"type": "object", "properties": props, "required": list(props)}
            return schema, False
        try:
            hints = get_type_hints(ann)
            if hints:
                props = {}
                for k, v in hints.items():
                    if v is int or v is type(None):
                        props[k] = {"type": "integer"}
                    elif v is float:
                        props[k] = {"type": "number"}
                    elif v is str:
                        props[k] = {"type": "string"}
                    elif v is bool:
                        props[k] = {"type": "boolean"}
                    elif get_origin(v) is list:
                        props[k] = {"type": "array", "items": {"type": "string"}}
                    else:
                        props[k] = {"type": "string"}
                schema = {"type": "object", "properties": props}
                return schema, False
        except Exception:
            pass
    # 如果是 dict 类型
    if origin is dict:
        args = get_args(ann)
        if len(args) == 2 and args[0] is str:
            vt = args[1]
            # 判断 value 类型
            if vt is float:
                schema = {"type": "object", "additionalProperties": {"type": "number"}}
            elif vt is int:
                schema = {"type": "object", "additionalProperties": {"type": "integer"}}
            elif vt is str:
                schema = {"type": "object", "additionalProperties": {"type": "string"}}
            else:
                schema = {"type": "object", "additionalProperties": {}}
            return schema, False
        wrap = True
    # 如果是 list、基础类型等需要包裹
    if origin is list or ann in (int, float, str, bool, type(None)):
        wrap = True
    # 包裹输出情况
    if wrap:
        result_schema = {"type": "string"}
        if ann is int:
            result_schema = {"type": "integer"}
        elif ann is float:
            result_schema = {"type": "number"}
        elif ann is bool:
            result_schema = {"type": "boolean"}
        elif origin is list:
            result_schema = {"type": "array", "items": {"type": "string"}}
        schema = {"type": "object", "properties": {"result": result_schema}, "required": ["result"]}
        return schema, True
    return None, False

# 辅助:对象输出转换为结构化内容
def _to_structured(out, output_schema, wrap_output):
    if output_schema is None:
        return None
    try:
        # 如果是 Pydantic 模型
        if PydanticBaseModel and isinstance(out, PydanticBaseModel):
            return out.model_dump(mode="json")
        # 如果需要包裹
        if wrap_output:
            return {"result": out}
        # 如果是字典
        if isinstance(out, dict):
            return dict(out)
        # 带有 __annotations__ 的对象,提取属性
        if hasattr(out, "__annotations__"):
            hints = get_type_hints(type(out)) if hasattr(type(out), "__annotations__") else getattr(out, "__annotations__", {})
            return {k: getattr(out, k) for k in hints if hasattr(out, k)}
        # 其它直接包裹
        return {"result": out}
    except Exception:
        # 发生异常返回字符串
        return {"result": str(out)}

# 根据函数参数生成 schema
def _schema(fn):
    sig = inspect.signature(fn)
    props = {}
    req = []
    for n, p in sig.parameters.items():
        # 跳过以下划线开头的参数
        if n.startswith("_"):
            continue
        # 跳过 Context 类型参数(由框架注入)
        if _find_context_parameter(fn) == n:
            continue
        # 参数类型为字符串,标题为参数名
        props[n] = {"type": "string", "title": n}
        # 必填参数加入 required
        if p.default is inspect.Parameter.empty:
            req.append(n)
    return {"type": "object", "properties": props, "required": req}


# 检测工具函数是否包含 Context 参数
def _find_context_parameter(fn):
    """若函数有 ctx: Context 参数,或参数名为 ctx,返回参数名;否则返回 None。"""
    try:
        sig = inspect.signature(fn)
        hints = get_type_hints(fn) if hasattr(fn, "__annotations__") else {}
        for name, param in sig.parameters.items():
            ann = hints.get(name, param.annotation)
            if ann is not inspect.Parameter.empty:
                n = getattr(ann, "__name__", None) or str(ann)
                if n == "Context" or (isinstance(n, str) and n.endswith(".Context")):
                    return name
            # 约定:参数名为 ctx 时也视为 Context 注入
            if name == "ctx":
                return name
    except Exception:
        pass
    return None

# 根据 prompt 函数生成 prompt schema
def _prompt_schema(fn):
    """根据函数签名生成 Prompt 参数 schema。"""
    sig = inspect.signature(fn)
    props = {}
    req = []
    for n, p in sig.parameters.items():
        if n.startswith("_"):
            continue
        props[n] = {"type": "string", "title": n}
        if p.default is inspect.Parameter.empty:
            req.append(n)
    return {"type": "object", "properties": props, "required": req}

# Sampling 会话适配器:通过 elicitation_session 向客户端发送 sampling/createMessage
+class _SessionAdapter:
    # 为工具中的 ctx.session 提供 create_message 方法
+   """提供 create_message,供工具中 ctx.session 使用。"""

    # 构造方法,接收 elicitation_session 对象
+   def __init__(self, elicitation_session):
        # 保存 elicitation_session 对象
+       self._session = elicitation_session

    # 异步方法,用于向客户端发起采样请求,获取 LLM 回复
+   async def create_message(self, messages, max_tokens=256, system_prompt=None, temperature=None, stop_sequences=None):
        # 向客户端发起 LLM 采样请求,返回 CreateMessageResult。
+       """向客户端发起 LLM 采样请求,返回 CreateMessageResult。"""
        # 如果 session 对象为 None,抛出运行时异常,说明不支持采样
+       if self._session is None:
+           raise RuntimeError("Sampling not supported: no session (stdio transport required)")
        # 初始化一个空列表,用于存放转换后的消息
+       msgs = []
        # 遍历传入的消息,每条消息都需要序列化处理
+       for m in messages or []:
            # 获取消息的角色,默认为 user
+           role = getattr(m, "role", "user")
            # 获取消息的内容属性,若没有则为 None
+           c = getattr(m, "content", None)
            # 若内容为 None,不处理这条消息
+           if c is None:
+               continue
            # 如果内容有 model_dump 方法,说明是 Pydantic 模型,直接序列化
+           if hasattr(c, "model_dump"):
+               content = c.model_dump(mode="json")
            # 如果内容本身是个列表(可能为内容块列表),逐项序列化
+           elif isinstance(c, list):
+               content = [x.model_dump(mode="json") if hasattr(x, "model_dump") else {"type": "text", "text": str(x)} for x in c]
            # 其它情况,全部按文本类型打包
+           else:
+               content = {"type": "text", "text": str(c)}
            # 添加序列化后的消息(包含角色和内容)到 msgs
+           msgs.append({"role": role, "content": content})
        # 用 CreateMessageRequestParams 构造参数,保证类型正确并兼容官方 MCP 客户端
+       params_obj = CreateMessageRequestParams(
+           messages=list(messages) if messages else [],
+           max_tokens=int(max_tokens),
+           system_prompt=system_prompt,
+           temperature=temperature,
+           stop_sequences=stop_sequences,
+       )
        # 将参数对象序列化为 dict,并应用别名和去除 None 字段
+       params = params_obj.model_dump(by_alias=True, exclude_none=True, mode="json")
        # 调用 session 的 send_request,RPC 远程调用采样接口
+       raw = self._session.send_request("sampling/createMessage", params)
        # 将返回结果校验并转为 CreateMessageResult 类型实例,返回
+       return CreateMessageResult.model_validate(raw, by_name=False)


# 简化的 Elicitation 结果类型(与 ctx.elicit 返回的 result.data 兼容)
class _ElicitationResult:
    """elicit() 返回的结果,支持 result.action 和 result.data。"""

    def __init__(self, action: str, data=None):
        self.action = action
        self.data = data


# Context 类:工具执行时的上下文,支持 read_resource、report_progress、debug、info、elicit
class Context:
    # 工具执行上下文,提供资源读取、进度报告、日志发送、elicitation 等能力。
+   def __init__(
+       self,
+       mcp_server,
+       request_id,
+       send_notification=None,
+       progress_token=None,
+       elicitation_session=None
+   ):
        # 保存 mcp_server 实例
        self._mcp = mcp_server
        # 保存请求 ID
        self.request_id = request_id
        # 发送通知的方法,如未提供则为无操作 lambda
        self._send = send_notification or (lambda _: None)
        # 进度 token,通知进度时用
        self._progress_token = progress_token
        # elicitation 会话(用于与客户端交互获取表单数据)
        self._elicitation_session = elicitation_session
        # Elicitation session 的适配器,为 None 时不支持 sampling
+       self._session_adapter = (
+           _SessionAdapter(elicitation_session) if elicitation_session else None
+       )

    # session 属性,用于会话采样功能
+   @property
+   def session(self):
        # Sampling 会话,用于 ctx.session.create_message(),需要 stdio 传输支持
+       if self._session_adapter is None:
            # 若无适配器则抛异常:不支持采样
+           raise RuntimeError(
+               "Sampling not supported: no session (stdio transport required)"
+           )
        # 返回适配器以支持采样功能
+       return self._session_adapter

    # 异步 elicit 方法,发送 elicitation/create 请求并自动解析表单输入
    async def elicit(self, message: str, schema: type):
        # 若没有 elicitation session,则抛出异常
        if self._elicitation_session is None:
+           raise RuntimeError(
+               "Elicitation not supported: no elicitation session (stdio transport required)"
+           )
        # 获取 JSON Schema 格式描述(若可能)
        json_schema = schema.model_json_schema() if hasattr(schema, "model_json_schema") else {}
        # 组装参数:模式、消息、请求的 schema 信息
+       params = {
+           "mode": "form",
+           "message": message,
+           "requestedSchema": json_schema
+       }
        # 通过 session 同步发送请求(阻塞式)
        raw = self._elicitation_session.send_request("elicitation/create", params)
        # 解析 elicitation 返回结果为对象
        result = ElicitResult.model_validate(raw, by_name=False)
        # 如果用户已提交且有内容
        if result.action == "accept" and result.content:
            # 内容通过 schema 校验
            validated = schema.model_validate(result.content)
            # 返回带 data 的 ElicitationResult
            return _ElicitationResult(action="accept", data=validated)
        # 否则返回 action,并 data=None
        return _ElicitationResult(action=result.action, data=None)

    # 异步读取资源方法
    async def read_resource(self, uri: str):
        # 将 uri 转为字符串
        uri_str = str(uri)
        # 检查是否为静态资源
        if res := self._mcp._resources.get(uri_str):
            # 调用资源的 run 方法获取内容
            out = res.run()
            # 导入内容类型
            from mcp_lite.types import TextResourceContents
            # 返回 TextResourceContents(文本)
+           return [
+               TextResourceContents(
+                   uri=uri_str,
+                   text=str(out),
+                   mime_type=res.mime_type
+               )
+           ]
        # 检查是否匹配到资源模板
        for template in self._mcp._resource_templates.values():
            # 匹配模板,返回参数 args
            if args := template.matches(uri_str):
                # 运行模板获得输出内容
                out = template.run(args)
                # 导入文本与二进制内容类型
                from mcp_lite.types import TextResourceContents, BlobResourceContents
                # 若是二进制,转为 base64
                if isinstance(out, bytes):
+                   import base64
+                   return [
+                       BlobResourceContents(
+                           uri=uri_str,
+                           blob=base64.b64encode(out).decode(),
+                           mime_type=template.mime_type or "application/octet-stream"
+                       )
+                   ]
                # 否则返回文本内容
+               return [
+                   TextResourceContents(
+                       uri=uri_str,
+                       text=str(out),
+                       mime_type=template.mime_type or "text/plain"
+                   )
+               ]
        # 未找到资源则返回空列表
        return []

    # 异步发送进度通知
+   async def report_progress(
+       self,
+       progress: float,
+       total: float | None = None,
+       message: str | None = None
+   ):
        # 仅当有 progress token 时才发送通知
        if self._progress_token is not None:
+           self._send(
+               {
+                   "method": "notifications/progress",
+                   "params": {
+                       "progressToken": self._progress_token,
+                       "progress": progress,
+                       "total": total,
+                       "message": message,
+                   },
+               }
+           )

    # 异步发送 debug 级别日志
    async def debug(self, data):
+       self._send(
+           {
+               "method": "notifications/message",
+               "params": {"level": "debug", "data": data},
+           }
+       )

    # 异步发送 info 级别日志
    async def info(self, data):
+       self._send(
+           {
+               "method": "notifications/message",
+               "params": {"level": "info", "data": data},
+           }
+       )


# 工具函数封装类
class _Tool:
    def __init__(self, fn, name=None, desc=None, structured_output=True):
        # 函数本体
        self.fn = fn
        # 名称
        self.name = name or fn.__name__
        # 描述
        self.desc = desc or (fn.__doc__ or "").strip()
        # 入参 schema
        self.schema = _schema(fn)
        # 是否是异步函数
        self.async_fn = asyncio.iscoroutinefunction(fn)
        # 是否结构化输出
        self.structured_output = structured_output
        # 输出 schema 及需否包裹
        self.output_schema, self.wrap_output = _output_schema_and_wrap(fn, structured_output) if structured_output else (None, False)
        # Context 参数名(若有)
        self._ctx_param = _find_context_parameter(fn)

    # 转换为 Tool 对象
    def to_tool(self):
        return Tool(name=self.name, description=self.desc or None, input_schema=self.schema, output_schema=self.output_schema)

    # 执行工具(自动支持异步),支持注入 Context
    def run(self, args, mcp_server=None, request_id=None, progress_token=None, send_notification=None, elicitation_session=None):
        if self._ctx_param is not None and mcp_server is not None:
            ctx = Context(mcp_server, request_id, send_notification, progress_token, elicitation_session)
            args = dict(args) if args else {}
            args[self._ctx_param] = ctx
        if self.async_fn:
            return asyncio.run(self.fn(**args))
        return self.fn(**args)

# 静态资源封装类
class _Resource:
    def __init__(self, uri, fn, mime_type="text/plain"):
        # 资源 URI
        self.uri = uri
        # 回调函数
        self.fn = fn
        # MIME 类型
        self.mime_type = mime_type
        # 是否异步
        self.async_fn = asyncio.iscoroutinefunction(fn)

    # 资源运行,自动支持异步
    def run(self):
        if self.async_fn:
            return asyncio.run(self.fn())
        return self.fn()

# 资源模板封装类(支持 URI 参数模板)
class _ResourceTemplate:
    def __init__(self, uri_template, fn, mime_type="text/plain"):
        # URI 模板
        self.uri_template = uri_template
        # 生成资源内容的函数
        self.fn = fn
        # MIME 类型
        self.mime_type = mime_type
        # 是否异步
        self.async_fn = asyncio.iscoroutinefunction(fn)
        # 提取函数里除了 _ 开头之外的参数名
        sig = inspect.signature(fn)
        self.param_names = [n for n in sig.parameters if not n.startswith("_")]

    # 检查 URI 是否与模板匹配,并解析参数
    def matches(self, uri):
        pattern = self.uri_template.replace("{", "(?P<").replace("}", ">[^/]+)")
        m = re.match(f"^{pattern}$", uri)
        if m:
            return {k: unquote(v) for k, v in m.groupdict().items()}
        return None

    # 执行模板内容生成函数
    def run(self, args):
        if self.async_fn:
            return asyncio.run(self.fn(**args))
        return self.fn(**args)

# Prompt 封装
class _Prompt:
    """内部 Prompt 封装类。"""

    def __init__(self, fn, name=None, title=None, description=None):
        # prompt 生成函数
        self.fn = fn
        # prompt 名称
        self.name = name or fn.__name__
        # prompt 标题
        self.title = title
        # prompt 描述
        self.description = description or (fn.__doc__ or "").strip()
        # 通用 schema
        self.schema = _prompt_schema(fn)
        # 是否异步
        self.async_fn = asyncio.iscoroutinefunction(fn)
        # 从 schema 生成 PromptArgument 列表
        args_list = []
        if "properties" in self.schema:
            required = set(self.schema.get("required", []))
            for pname, pinfo in self.schema["properties"].items():
                args_list.append(
                    PromptArgument(
                        name=pname,
                        description=pinfo.get("title"),
                        required=pname in required,
                    )
                )
        self.arguments = args_list

    # 转成 Prompt 对象
    def to_prompt(self):
        return Prompt(
            name=self.name,
            title=self.title,
            description=self.description or None,
            arguments=self.arguments if self.arguments else None,
        )

    # prompt 执行
    def run(self, args):
        if self.async_fn:
            return asyncio.run(self.fn(**args))
        return self.fn(**args)

# FastMCP 主类
class FastMCP:
    def __init__(self, name="mcp-server"):
        # 服务器名称
        self.name = name
        # 工具注册表
        self._tools = {}
        # 静态资源注册表
        self._resources = {}
        # 资源模板注册表
        self._resource_templates = {}
        # prompt 注册表
        self._prompts = {}
        # 补全处理器(注册后用于 completion/complete 请求)
        self._completion_handler = None

    # 注册工具的装饰器
    def tool(self, name=None, description=None, structured_output=True):
        def deco(fn):
            t = _Tool(fn, name, description, structured_output=structured_output)
            self._tools[t.name] = t
            return fn
        return deco

    # 注册资源或模板资源的装饰器
    def resource(self, uri, mime_type="text/plain"):
        def deco(fn):
            # 判断带模板参数还是静态资源
            if "{" in uri and "}" in uri:
                template = _ResourceTemplate(uri, fn, mime_type)
                self._resource_templates[uri] = template
            else:
                res = _Resource(uri, fn, mime_type)
                self._resources[uri] = res
            return fn
        return deco

    # 注册补全处理器的装饰器
    def completion(self):
        """注册参数补全处理器,用于 Prompt 或资源模板参数的自动补全。"""

        def deco(fn):
            self._completion_handler = fn
            return fn

        return deco

    # 注册 Prompt 的装饰器
    def prompt(self, name=None, title=None, description=None):
        """注册 Prompt 的装饰器。"""

        def deco(fn):
            p = _Prompt(fn, name=name, title=title, description=description)
            self._prompts[p.name] = p
            return fn
        return deco

    # 核心 RPC 方法分发
    def _handle(self, req, elicitation_session=None):
        # 获取方法名、参数和请求 id
        method, params, rid = req.method, req.params or {}, req.id
        # 初始化请求
        if method == "initialize":
            caps = ServerCapabilities(
                tools=ToolsCapability(),
                resources=ResourcesCapability(),
                prompts=PromptsCapability() if self._prompts else None,
                completions=CompletionsCapability() if self._completion_handler else None,
                elicitation=ElicitationCapability(form=FormElicitationCapability(), url=UrlElicitationCapability()),
+               sampling=SamplingCapability(),
            )
            r = InitializeResult(
                protocol_version=LATEST_PROTOCOL_VERSION,
                capabilities=caps,
                server_info=Implementation(name=self.name, version="0.1.0"),
            )
            return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
        # 工具列表请求
        if method == "tools/list":
            r = ListToolsResult(tools=[t.to_tool() for t in self._tools.values()])
            return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
        # 工具调用请求
        if method == "tools/call":
            p = CallToolRequestParams.model_validate(params, by_name=False)
            t = self._tools.get(p.name)
            if not t:
                return JSONRPCError(
                    jsonrpc="2.0",
                    id=rid,
                    error=ErrorData(code=-32602, message=f"Unknown tool: {p.name}")
                )
            # 从 meta/_meta 提取 progressToken,用于发送进度/日志通知
            meta = p.meta or {}
            progress_token = meta.get("progressToken") or meta.get("progress_token")
            notifications = []

            def _send_notification(payload):
                method_name = payload.get("method", "")
                params = payload.get("params")
                notifications.append(JSONRPCNotification(jsonrpc="2.0", method=method_name, params=params))

            try:
                out = t.run(
                    p.arguments or {},
                    mcp_server=self,
                    request_id=rid,
                    progress_token=progress_token,
                    send_notification=_send_notification,
                    elicitation_session=elicitation_session,
                )
                # 如果直接返回的是 CallToolResult
                if isinstance(out, CallToolResult):
                    r = out
                else:
                    struct = None
                    # 结构化输出
                    if t.output_schema is not None:
                        struct = _to_structured(out, t.output_schema, t.wrap_output)
                    # 字符串直接用文本包装
                    if isinstance(out, str):
                        c = [TextContent(text=out)]
                    elif out is None:
                        c = []
                    elif struct is not None:
                        # 结构化结果按 JSON 格式化后返回
                        c = [TextContent(text=json.dumps(struct, ensure_ascii=False, indent=2))]
                    else:
                        # 普通对象尝试转 json,否则转字符串
                        try:
                            if PydanticBaseModel and isinstance(out, PydanticBaseModel):
                                text = json.dumps(out.model_dump(mode="json"), ensure_ascii=False, indent=2)
                            elif isinstance(out, dict):
                                text = json.dumps(out, ensure_ascii=False, indent=2)
                            else:
                                text = str(out)
                        except Exception:
                            text = str(out)
                        c = [TextContent(text=text)]
                    r = CallToolResult(content=c, structured_content=struct)
            except Exception as e:
                r = CallToolResult(content=[TextContent(text=str(e))], is_error=True)
            resp = JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
            if notifications:
                return (notifications, resp)
            return resp

        # prompts/list 请求
        if method == "prompts/list":
            prompts_list = [p.to_prompt() for p in self._prompts.values()]
            r = ListPromptsResult(prompts=prompts_list)
            return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))

        # prompts/get 请求
        if method == "prompts/get":
            p = GetPromptRequestParams.model_validate(params, by_name=False)
            prompt_obj = self._prompts.get(p.name)
            if not prompt_obj:
                return JSONRPCError(
                    jsonrpc="2.0",
                    id=rid,
                    error=ErrorData(code=-32602, message=f"Unknown prompt: {p.name}")
                )
            try:
                args = p.arguments or {}
                result = prompt_obj.run(args)
                messages = []
                # 单条消息包装为列表
                if not isinstance(result, (list, tuple)):
                    result = [result]
                for msg in result:
                    # 若为内置的 Message 对象
                    if isinstance(msg, prompts.base.Message):
                        content = msg.content
                        if isinstance(content, str):
                            content = TextContent(type="text", text=content)
                        messages.append(PromptMessage(role=msg.role, content=content))
                    # 字符串消息按 user 角色组装
                    elif isinstance(msg, str):
                        messages.append(PromptMessage(role="user", content=TextContent(type="text", text=msg)))
                    # dict 消息,读 role 和 content
                    elif isinstance(msg, dict):
                        role = msg.get("role", "user")
                        cnt = msg.get("content", "")
                        if isinstance(cnt, dict) and cnt.get("type") == "text":
                            content = TextContent(**cnt)
                        else:
                            content = TextContent(type="text", text=str(cnt) if not isinstance(cnt, str) else cnt)
                        messages.append(PromptMessage(role=role, content=content))
                    # 其它均归为 user 文本
                    else:
                        messages.append(PromptMessage(role="user", content=TextContent(type="text", text=str(msg))))
                r = GetPromptResult(description=prompt_obj.description, messages=messages)
            except Exception as e:
                return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32603, message=str(e)))
            return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))

        # 资源静态列表
        if method == "resources/list":
            resources = [
                Resource(uri=u, name=u, mime_type=r.mime_type)
                for u, r in self._resources.items()
            ]
            r = ListResourcesResult(resources=resources)
            return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))

        # 资源模板列表
        if method == "resources/templates/list":
            templates = [
                ResourceTemplate(uri_template=u, name=u, mime_type=t.mime_type)
                for u, t in self._resource_templates.items()
            ]
            r = ListResourceTemplatesResult(resource_templates=templates)
            return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))

        # 处理补全请求 completion/complete
        if method == "completion/complete":
            # 如果没有补全处理器,返回方法未找到错误
            if not self._completion_handler:
                return JSONRPCError(
                    jsonrpc="2.0",
                    id=rid,
                    error=ErrorData(code=-32601, message="Method not found: completion/complete"),
                )
            # 校验并解析补全请求参数
            p = CompleteRequestParams.model_validate(params, by_name=False)
            try:
                # 获取补全处理函数
                fn = self._completion_handler
                # 如果处理函数是协程,则用 asyncio.run 执行,否则直接调用
                result = (
                    asyncio.run(fn(p.ref, p.argument, p.context))
                    if asyncio.iscoroutinefunction(fn)
                    else fn(p.ref, p.argument, p.context)
                )
                # 若处理函数无结果则返回空补全
                if result is None:
                    result = Completion(values=[], total=None, has_more=None)
                # 封装补全响应对象
                r = CompleteResult(completion=result)
                # 构造并返回 JSONRPCResponse 响应
                return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
            except Exception as e:
                # 捕获异常并返回内部错误
                return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32603, message=str(e)))

        # 读取资源接口
        if method == "resources/read":
            p = ReadResourceRequestParams.model_validate(params, by_name=False)
            uri_str = str(p.uri)
            try:
                # 静态资源优先
                if res := self._resources.get(uri_str):
                    out = res.run()
                    content = TextResourceContents(uri=uri_str, text=str(out), mime_type=res.mime_type)
                    r = ReadResourceResult(contents=[content])
                    return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
                # 匹配模板资源
                for template in self._resource_templates.values():
                    if args := template.matches(uri_str):
                        out = template.run(args)
                        if isinstance(out, bytes):
                            content = BlobResourceContents(
                                uri=uri_str,
                                blob=base64.b64encode(out).decode(),
                                mime_type=template.mime_type or "application/octet-stream",
                            )
                        else:
                            content = TextResourceContents(
                                uri=uri_str,
                                text=str(out),
                                mime_type=template.mime_type or "text/plain",
                            )
                        r = ReadResourceResult(contents=[content])
                        return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
                # 找不到资源
                return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32602, message=f"Unknown resource: {uri_str}"))
            except Exception as e:
                return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32603, message=str(e)))

        # 未知方法
        return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32601, message=f"Method not found: {method}"))

    # 消息包装与请求
    def _handle_msg(self, msg, elicitation_session=None):
        # 非 SessionMessage 类型忽略
        if not isinstance(msg, SessionMessage):
            return None
        m = msg.message
        # 必须是带 id 的 jsonrpc 请求
        if not isinstance(m, JSONRPCRequest) or getattr(m, "id", None) is None:
            return None
        #print("[Server] Request:", m.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
        try:
            resp = self._handle(m, elicitation_session)
            #print("[Server] Response:", resp.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
            return resp
        except Exception as e:
            err = JSONRPCError(jsonrpc="2.0", id=m.id, error=ErrorData(code=-32603, message=str(e)))
            #print("[Server] Response (error):", err.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
            return err

    # 运行服务器主逻辑
    def run(self, transport="stdio", host: str = "127.0.0.1", port: int = 8000):
        # 如果传输方式为 stdio,则启动标准输入输出服务器
        if transport == "stdio":
            stdio.stdio_server(self._handle_msg)
        # 如果传输方式为 streamable-http,则启动基于 HTTP 的服务器
        elif transport == "streamable-http":
            # 导入 Starlette 框架相关模块
            from starlette.applications import Starlette
            from starlette.routing import Mount
            # 导入 streamable_http_app 工厂方法
            from mcp_lite.server.streamable_http import streamable_http_app
            # 导入 uvicorn 用于启动 ASGI 服务
            import uvicorn
            # 创建 Streamable HTTP ASGI 应用
            app = streamable_http_app(self._handle_msg, path="/")
            # 将 /mcp 路径挂载到应用
            full_app = Starlette(routes=[Mount("/mcp", app=app)])
            # 启动 uvicorn 服务,监听指定 host 和 port
            uvicorn.run(full_app, host=host, port=port)
        # 如果传输方式不被支持,则抛出异常
        else:
            raise ValueError(f"unsupported transport: {transport}")

# 明确导出接口
__all__ = ["FastMCP", "Context", "prompts"]

6. types.py #

mcp_lite/types.py

# 从 pydantic 导入 BaseModel、ConfigDict、Field、TypeAdapter、field_validator
from pydantic import BaseModel, ConfigDict, Field, TypeAdapter, field_validator
# 导入 pydantic 的 to_camel 驼峰命名生成器
from pydantic.alias_generators import to_camel
# 导入 Any、Literal、Annotated 类型注解
from typing import Any, Literal, Annotated
# 定义 RequestId 类型,既可以是 int 也可以是 str
RequestId = int | str
# 定义 MCP 的基础模型类,支持驼峰命名和按名称填充
class MCPModel(BaseModel):
    # 指定 Pydantic 模型配置:使用驼峰形式命名和按名称填充
   model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)
# Elicitation 能力(用于 initialize 能力协商,需在 ClientCapabilities/ServerCapabilities 前定义)
class FormElicitationCapability(MCPModel):
    pass
class UrlElicitationCapability(MCPModel):
    pass
class ElicitationCapability(MCPModel):
    form: FormElicitationCapability | None = None
    url: UrlElicitationCapability | None = None
# Sampling 能力(客户端支持 LLM 采样时声明)
+class SamplingCapability(MCPModel):
+   pass
# 定义客户端能力结构体
class ClientCapabilities(MCPModel):
    # 可选字段:客户端实验性能力扩展,键为 str,值为字典
   experimental: dict[str, dict[str, Any]] | None = None
    # 可选字段:客户端 Elicitation 能力(提供 elicitation_callback 时声明)
   elicitation: ElicitationCapability | None = None
    # 可选字段:客户端 Sampling 能力(提供 sampling_callback 时声明)
+  sampling: SamplingCapability | None = None
# 定义服务器或客户端的实现信息结构体
class Implementation(MCPModel):
    # 实现的名称
   name: str = ""
    # 实现的版本号
   version: str = ""
    # 可选字段:人类可读的标题
   title: str | None = None
    # 可选字段:实现的描述
   description: str | None = None    
# 定义初始化请求参数结构体
class InitializeRequestParams(MCPModel):
    # 协议版本号
   protocol_version: str = ""
    # 可选字段:客户端能力描述
   capabilities: ClientCapabilities = None
    # 可选字段:客户端实现信息
   client_info: Implementation = None
# 定义初始化请求结构体
class InitializeRequest(MCPModel):
    # 方法名,固定为 "initialize"
   method: Literal["initialize"] = "initialize"
    # 可选字段:参数,类型为 InitializeRequestParams
   params: InitializeRequestParams = None    
# 当前协议的最新版本号
LATEST_PROTOCOL_VERSION = "2024-11-05"
# 定义工具相关能力结构体
class ToolsCapability(MCPModel):
    # 工具列表是否发生变化,可选布尔类型
   list_changed: bool | None = None
# 定义资源相关能力结构体
class ResourcesCapability(MCPModel):
    # 是否支持资源订阅
   subscribe: bool | None = None
    # 资源列表是否变化通知
   list_changed: bool | None = None
# 定义 Prompt 相关能力结构体
class PromptsCapability(MCPModel):
    list_changed: bool | None = None
# 定义补全能力描述(服务端声明支持补全时使用)
class CompletionsCapability(MCPModel):
    pass
# 定义服务端能力描述结构体
class ServerCapabilities(MCPModel):
    # 可选字段:实验性能力扩展
   experimental: dict[str, dict[str, Any]] | None = None
    # 可选字段:工具能力
   tools: ToolsCapability | None = None
    # 可选字段:资源能力
   resources: ResourcesCapability | None = None
    # 可选字段:Prompt 能力
   prompts: PromptsCapability | None = None
    # 可选字段:补全能力(注册了 @completion 时声明)
   completions: CompletionsCapability | None = None
    # 可选字段:Elicitation 能力(服务端支持 elicitation 时声明)
   elicitation: ElicitationCapability | None = None
    # 可选字段:Sampling 能力(服务端支持 sampling 时声明)
+  sampling: SamplingCapability | None = None
# 定义初始化响应结构体
class InitializeResult(MCPModel):
    # 协议版本号
   protocol_version: str = ""
    # 可选字段:服务端能力描述
   capabilities: ServerCapabilities = None
    # 可选字段:服务端实现信息
   server_info: Implementation = None
    # 可选字段:初始化说明信息
   instructions: str | None = None
# 定义客户端初始化完成通知结构体
class InitializedNotification(MCPModel):
    # 方法名,固定为 "notifications/initialized"
   method: Literal["notifications/initialized"] = "notifications/initialized"
    # 可选字段:通知参数,可以为字典或 None
   params: dict[str, Any] | None = None    
# 定义 JSONRPC 请求的数据结构
class JSONRPCRequest(BaseModel):
    # jsonrpc 协议版本,固定为 "2.0"
    jsonrpc: Literal["2.0"] = "2.0"
    # 请求 ID,可以为 int 或 str 类型
    id: RequestId = None
    # 方法名称,字符串类型
    method: str = ""
    # 方法参数,为一个字典或 None
    params: dict[str, Any] | None = None

# 定义 JSONRPC 通知的数据结构(没有 id 字段)
class JSONRPCNotification(BaseModel):
    # jsonrpc 协议版本,固定为 "2.0"
    jsonrpc: Literal["2.0"] = "2.0"
    # 通知的方法名称
    method: str = ""
    # 通知参数,可以为字典或者 None
    params: dict[str, Any] | None = None

# 定义 JSONRPC 响应的数据结构
class JSONRPCResponse(BaseModel):
    # jsonrpc 协议版本,固定为 "2.0"
    jsonrpc: Literal["2.0"] = "2.0"
    # 响应的 ID,需要与请求 ID 匹配
    id: RequestId = None
    # 响应结果,可以为字典或 None
    result: dict[str, Any] = None

# 定义错误的数据结构
class ErrorData(BaseModel):
    # 错误码,默认值为 0
    code: int = 0
    # 错误信息,默认值为空字符串
    message: str = ""
    # 附加的错误数据,可以为任意类型或 None
    data: Any = None

# 定义 JSONRPC 错误消息的数据结构
class JSONRPCError(BaseModel):
    # jsonrpc 协议版本,固定为 "2.0"
    jsonrpc: Literal["2.0"] = "2.0"
    # 错误对应的请求 ID,可以为 None
    id: RequestId | None = None
    # 错误的详细信息,类型为 ErrorData
    error: ErrorData = None
# 定义所有 JSONRPC 消息的联合类型
JSONRPCMessage = JSONRPCRequest | JSONRPCNotification | JSONRPCResponse | JSONRPCError

# 进度通知参数(notifications/progress)
class ProgressNotificationParams(MCPModel):
    progress_token: str | int = ""
    progress: float = 0.0
    total: float | None = None
    message: str | None = None

# 日志消息通知参数(notifications/message)
class LoggingMessageNotificationParams(MCPModel):
    level: str = "info"
    data: Any = None
    logger: str | None = None
# 定义 JSONRPC 消息适配器,用于类型自动推断和校验
jsonrpc_message_adapter = TypeAdapter(JSONRPCMessage)
# 定义工具结果中的文本内容块
class TextContent(MCPModel):
    type: Literal["text"] = "text"
    text: str = ""

# 定义采样消息类(用于 sampling/createMessage)
+class SamplingMessage(MCPModel):
    # 消息角色字段,默认值为 "user"
+   role: str = "user"
    # 消息内容,可以是单个 TextContent 或 TextContent 列表,默认为空文本
+   content: TextContent | list[TextContent] = Field(default_factory=lambda: TextContent(type="text", text=""))

# 定义 sampling/createMessage 请求参数的数据结构
+class CreateMessageRequestParams(MCPModel):
    # 消息列表参数,元素为 SamplingMessage,默认为空列表
+   messages: list[SamplingMessage] = Field(default_factory=list)
    # 最大生成 token 数,默认值为 256
+   max_tokens: int = 256
    # 系统提示词,可以为 None
+   system_prompt: str | None = None
    # 采样温度参数,可为 None
+   temperature: float | None = None
    # 停止序列列表,可为 None
+   stop_sequences: list[str] | None = None

# 定义 sampling/createMessage 响应结果的数据结构
+class CreateMessageResult(MCPModel):
    # 回复消息的角色,默认值为 "assistant"
+   role: str = "assistant"
    # 回复内容字段,类型为 TextContent,默认为空文本
+   content: TextContent = Field(default_factory=lambda: TextContent(type="text", text=""))
    # 所用的模型名称,默认为空字符串
+   model: str = ""
    # 停止原因,可为 None
+   stop_reason: str | None = None

# 定义工具描述数据结构体
class Tool(MCPModel):
    # 工具名称
   name: str = ""
    # 可选字段:工具描述
   description: str | None = None
    # 工具输入参数的 schema,默认为空字典
   input_schema: dict[str, Any] = Field(default_factory=dict)
    # 可选字段:工具输出 schema
   output_schema: dict[str, Any] | None = None
# 定义获取工具列表请求结构体
class ListToolsRequest(MCPModel):
    # 方法名,固定为 "tools/list"
   method: Literal["tools/list"] = "tools/list"
    # 可选字段:参数,可以为字典或 None
   params: dict[str, Any] | None = None

# 定义获取工具列表响应结构体
class ListToolsResult(MCPModel):
    # 工具列表,默认为空列表
   tools: list[Tool] = []
    # 可选字段:分页游标,可为 None
   next_cursor: str | None = None

# 定义资源元数据结构体
class Resource(MCPModel):
    # 资源 URI
   uri: str = ""
    # 可选:资源名称
   name: str | None = None
    # 可选:人类可读标题
   title: str | None = None
    # 可选:资源描述
   description: str | None = None
    # 可选:MIME 类型
   mime_type: str | None = None

# 定义资源模板结构体
class ResourceTemplate(MCPModel):
    # URI 模板,如 greeting://{name}
   uri_template: str = ""
    # 可选:模板名称
   name: str | None = None
    # 可选:人类可读标题
   title: str | None = None
    # 可选:模板描述
   description: str | None = None
    # 可选:MIME 类型
   mime_type: str | None = None

# 定义 resources/list 请求结构体
class ListResourcesRequest(MCPModel):
   method: Literal["resources/list"] = "resources/list"
   params: dict[str, Any] | None = None

# 定义 resources/list 响应结构体
class ListResourcesResult(MCPModel):
   resources: list[Resource] = []
   next_cursor: str | None = None

# 定义 resources/templates/list 请求结构体
class ListResourceTemplatesRequest(MCPModel):
   method: Literal["resources/templates/list"] = "resources/templates/list"
   params: dict[str, Any] | None = None

# 定义 resources/templates/list 响应结构体
class ListResourceTemplatesResult(MCPModel):
   resource_templates: list[ResourceTemplate] = []
   next_cursor: str | None = None

# 定义 resources/read 请求参数结构体
class ReadResourceRequestParams(MCPModel):
   uri: str = ""

# 定义 resources/read 请求结构体
class ReadResourceRequest(MCPModel):
   method: Literal["resources/read"] = "resources/read"
   params: ReadResourceRequestParams = None

# 定义资源内容基类(文本)
class TextResourceContents(MCPModel):
   uri: str = ""
   mime_type: str | None = None
   text: str = ""

# 定义资源内容基类(二进制)
class BlobResourceContents(MCPModel):
   uri: str = ""
   mime_type: str | None = None
   blob: str = ""  # base64 编码

# 定义 resources/read 响应结构体
class ReadResourceResult(MCPModel):
   contents: list[TextResourceContents | BlobResourceContents] = []

# 定义 Prompt 参数结构体
# 定义代表 Prompt 参数的模型
class PromptArgument(MCPModel):
    # 参数名称
    name: str = ""
    # 参数描述,可为 None
    description: str | None = None
    # 是否为必填参数,可为 None
    required: bool | None = None

# 定义 Prompt 元数据结构体
# 定义代表 Prompt 元数据的模型
class Prompt(MCPModel):
    # Prompt 名称
    name: str = ""
    # Prompt 描述,可为 None
    description: str | None = None
    # Prompt 参数列表,可为 None
    arguments: list[PromptArgument] | None = None
    # Prompt 标题,可为 None
    title: str | None = None

# 定义 prompts/list 请求结构体
# 定义 prompts/list 请求的数据模型
class ListPromptsRequest(MCPModel):
    # 固定方法字段
    method: Literal["prompts/list"] = "prompts/list"
    # 请求参数,可为 None
    params: dict[str, Any] | None = None

# 定义 prompts/list 响应结构体
# 定义 prompts/list 响应的数据模型
class ListPromptsResult(MCPModel):
    # 返回的 Prompt 列表
    prompts: list[Prompt] = []
    # 下一页游标,可为 None
    next_cursor: str | None = None

# 定义 prompts/get 请求参数结构体
# 定义 prompts/get 的参数模型
class GetPromptRequestParams(MCPModel):
    # Prompt 名称
    name: str = ""
    # 传递给 Prompt 的参数,可为 None
    arguments: dict[str, str] | None = None

# 定义 prompts/get 请求结构体
# 定义 prompts/get 请求的数据模型
class GetPromptRequest(MCPModel):
    # 固定方法字段
    method: Literal["prompts/get"] = "prompts/get"
    # 请求参数,可为 None
    params: GetPromptRequestParams | None = None

# 定义 Prompt 消息结构体(role + content)
# 定义表示 Prompt 里的单条消息的数据模型
class PromptMessage(MCPModel):
    # 消息角色(user 或 assistant)
    role: Literal["user", "assistant"] = "user"
    # 消息内容,可以是 TextContent 或字典,默认为空文本
    content: TextContent | dict[str, Any] = Field(default_factory=lambda: TextContent(text=""))

    # 字段校验器:在模型初始化前解析 content 字段
    @field_validator("content", mode="before")
    @classmethod
    def _parse_content(cls, v):
        # 如果是字典且 type 为 "text",转换为 TextContent 实例
        if isinstance(v, dict) and v.get("type") == "text":
            return TextContent(text=v.get("text", ""))
        # 否则原样返回
        return v

# 定义 prompts/get 响应结构体
# 定义 prompts/get 的响应数据模型
class GetPromptResult(MCPModel):
    # Prompt 的描述,可为 None
    description: str | None = None
    # Prompt 返回的消息列表
    messages: list[PromptMessage] = []

# 定义 Prompt 引用类型(用于补全请求)
class PromptReference(MCPModel):
    # 类型字段,固定为 "ref/prompt"
    type: Literal["ref/prompt"] = "ref/prompt"
    # Prompt 名称
    name: str = ""

# 定义资源模板引用类型(用于补全请求)
class ResourceTemplateReference(MCPModel):
    # 类型字段,固定为 "ref/resource"
    type: Literal["ref/resource"] = "ref/resource"
    # 资源模板的 URI
    uri: str = ""

# 定义补全参数类型
class CompletionArgument(MCPModel):
    # 参数名称
    name: str = ""
    # 参数值
    value: str = ""

# 定义补全上下文类型(已解析的其它参数)
class CompletionContext(MCPModel):
    # 补全上下文参数,字典类型,可能为 None
    arguments: dict[str, str] | None = None

# 定义补全结果内容类型
class Completion(MCPModel):
    # 候选内容列表,默认为空列表
    values: list[str] = Field(default_factory=list)
    # 总数,可为 None
    total: int | None = None
    # 是否有更多结果,可为 None
    has_more: bool | None = None

# 定义 completion/complete 请求参数
class CompleteRequestParams(MCPModel):
    # 引用字段,用 type 字段区分 PromptReference 与 ResourceTemplateReference
    ref: Annotated[
        PromptReference | ResourceTemplateReference,
        Field(discriminator="type"),
    ] = None
    # 需要补全的参数
    argument: CompletionArgument = None
    # 辅助的上下文字段,可为 None
    context: CompletionContext | None = None

# 定义 completion/complete 请求
class CompleteRequest(MCPModel):
    # 方法名,固定为 "completion/complete"
    method: Literal["completion/complete"] = "completion/complete"
    # 请求参数
    params: CompleteRequestParams = None

# 定义 completion/complete 响应
class CompleteResult(MCPModel):
    # 补全结果,类型为 Completion
    completion: Completion = None


# 定义调用工具请求参数结构体
class CallToolRequestParams(MCPModel):
    # 工具名称
   name: str = ""
    # 可选字段:输入参数,为字典类型或 None
   arguments: dict[str, Any] | None = None
    # 可选:元数据,含 progressToken 时服务端可发送进度通知
   meta: dict[str, Any] | None = Field(default=None, alias="_meta")        
# 定义调用工具请求结构体
class CallToolRequest(MCPModel):
    # 方法名,固定为 "tools/call"
   method: Literal["tools/call"] = "tools/call"
    # 可选字段:参数,类型为 CallToolRequestParams
   params: CallToolRequestParams = None
# === Elicitation 相关类型 ===

# 定义 Elicitation 请求的 schema 类型,为一个包含 str 键和任意值的字典,用于描述请求的 JSON Schema 子集
ElicitRequestedSchema: type = dict[str, Any]

# 定义 Form 模式的 elicitation 请求参数结构体
class ElicitRequestFormParams(MCPModel):
    # mode 字段,固定为 "form"
    mode: Literal["form"] = "form"
    # message 字段,请求给用户展示的消息,默认为空字符串
    message: str = ""
    # requested_schema 字段,表示所请求的 schema,默认为空字典
    requested_schema: dict[str, Any] = Field(default_factory=dict)

# 定义 URL 模式的 elicitation 请求参数结构体(暂时保留,未实现)
class ElicitRequestURLParams(MCPModel):
    # mode 字段,固定为 "url"
    mode: Literal["url"] = "url"
    # message 字段,请求给用户展示的消息,默认为空字符串
    message: str = ""
    # url 字段,请求跳转的 URL,默认为空字符串
    url: str = ""
    # elicitation_id 字段,当前 elicitation 的唯一标识,默认为空字符串
    elicitation_id: str = ""

# 定义 Elicitation 请求参数的联合类型,可以为表单模式参数或 URL 模式参数
ElicitRequestParams = ElicitRequestFormParams | ElicitRequestURLParams

# 定义 Elicitation 请求结构体,表示服务端向客户端发送的请求
class ElicitRequest(MCPModel):
    # method 字段,请求的方法名,固定为 "elicitation/create"
    method: Literal["elicitation/create"] = "elicitation/create"
    # params 字段,请求所需参数,可为 Form 或 URL 模式参数,默认为 None
    params: ElicitRequestFormParams | ElicitRequestURLParams = None

# 定义 Elicitation 结果结构体,表示客户端返回给服务端的结果
class ElicitResult(MCPModel):
    # action 字段,客户端选择的操作,可为 "accept"、"decline" 或 "cancel",默认为 "cancel"
    action: Literal["accept", "decline", "cancel"] = "cancel"
    # content 字段,内容的数据体,可为包含多种基本类型的字典,也可以为 None
    content: dict[str, str | int | float | bool | list[str] | None] | None = None


# 定义调用工具响应结构体
class CallToolResult(MCPModel):
    # 内容字段,由 TextContent 或字典组成的列表,默认为空列表
    content: list[TextContent | dict[str, Any]] = Field(default_factory=list)
    # 结构化内容字段,可为字典或 None
    structured_content: dict[str, Any] | None = None
    # 错误标志字段,标识是否为错误结果,默认为 False
    is_error: bool = False

    # 针对 content 字段的字段校验器,模型初始化前调用
    @field_validator("content", mode="before")
    @classmethod
    def _parse_content(cls, v):
        # 如果传入的值不是列表,直接返回
        if not isinstance(v, list):
            return v
        # 初始化输出列表
        out = []
        # 遍历每一项
        for item in v:
            # 如果项是字典且类型为 "text",转换为 TextContent 实例
            if isinstance(item, dict) and item.get("type") == "text":
                out.append(TextContent(text=item.get("text", "")))
            # 否则,原样加入输出列表
            else:
                out.append(item)
        # 返回处理后的列表
        return out       

7.工作流程 #

7.1. 整体架构 #

MCP 采样的核心是:服务端在工具执行时向客户端发起 sampling/createMessage 请求,由客户端调用 LLM(如 DeepSeek)并返回结果。

7.2. 数据流概览 #

工具执行 (generate_poem)
    → ctx.session.create_message(messages, max_tokens)
    → _SessionAdapter 用 CreateMessageRequestParams 构建 params
    → ElicitationSession.send_request("sampling/createMessage", params)
    → 写入 stdout(客户端从 subprocess.stdout 读取)
    → 客户端 _request 循环收到请求
    → 调用 sampling_callback(params)
    → handle_sampling 调用 DeepSeek API
    → 返回 CreateMessageResult
    → 客户端发送 JSON-RPC 响应
    → 服务端 ElicitationSession 从 stdin 读取
    → create_message 返回 CreateMessageResult
    → 工具拿到 result.content.text 并返回

7.3 时序图 #

sequenceDiagram participant User as 用户/调用方 participant Client as MCP 客户端<br/>(sampling_client) participant Server as MCP 服务端<br/>(sampling_server) participant Tool as 工具<br/>(generate_poem) participant LLM as DeepSeek API User->>Client: 启动 sampling_client.py Client->>Server: stdio_client 启动子进程 Client->>Server: initialize (声明 sampling 能力) Server-->>Client: InitializeResult (声明 sampling 能力) User->>Client: session.call_tool("generate_poem", {topic: "自然"}) Client->>Server: tools/call (generate_poem) Server->>Tool: 执行 generate_poem(topic, ctx) Tool->>Tool: ctx.session.create_message(messages, max_tokens) Note over Tool,Client: _SessionAdapter 构建 CreateMessageRequestParams Tool->>Client: sampling/createMessage (JSON-RPC 请求) Client->>Client: handle_sampling(params) Client->>Client: _messages_to_deepseek(messages) Client->>LLM: POST /v1/chat/completions (DeepSeek API) LLM-->>Client: 模型回复文本 Client->>Client: CreateMessageResult(role, content, model, stop_reason) Client->>Server: JSON-RPC 响应 (CreateMessageResult) Server->>Tool: create_message 返回 result Tool->>Tool: return result.content.text Tool->>Server: CallToolResult(content=[诗歌文本]) Server->>Client: tools/call 响应 Client->>User: 打印 [Result] 诗歌内容

7.4. 各模块修改 #

7.4.1 sampling_client.py(客户端) #

  • handle_sampling:处理服务端发来的 sampling/createMessage,将 CreateMessageRequestParams 转为 DeepSeek API 格式,调用 _call_deepseek,返回 CreateMessageResult。
  • main():同步流程:with stdio_client(...) as (read, write) → ClientSession(read, write, sampling_callback=handle_sampling) → session.initialize() → session.call_tool(...)。
  • _messages_to_deepseek:把 MCP 的 SamplingMessage 转为 DeepSeek 的 {role, content} 列表。

7.4.2 sampling_server.py(服务端) #

  • generate_poem / summarize:工具中通过 ctx.session.create_message(messages=[...], max_tokens=...) 向客户端发起采样。
  • ctx.session:由 FastMCP 注入,内部通过 ElicitationSession.send_request("sampling/createMessage", params) 与客户端通信。

7.4.3 mcp_lite/client/session.py(客户端会话) #

  • __init__:新增 sampling_callback 参数。
  • _request 循环:在等待响应时,若收到 sampling/createMessage:
    • 用 CreateMessageRequestParams 解析 params
    • 调用 sampling_callback(None, params)(支持 async,用 asyncio.run 执行)
    • 若返回 CreateMessageResult,则构造 JSON-RPC 响应并发送;否则返回错误。
  • initialize():若提供 sampling_callback,则在 ClientCapabilities 中声明 sampling: SamplingCapability()。

7.4.4 mcp_lite/server/fastmcp/__init__.py(服务端 FastMCP) #

  • _SessionAdapter:
    • create_message(messages, max_tokens, system_prompt, ...) 用 CreateMessageRequestParams 构建参数,model_dump(by_alias=True) 得到 camelCase 的 JSON。
    • 调用 elicitation_session.send_request("sampling/createMessage", params) 发送请求并等待响应。
    • 将响应解析为 CreateMessageResult 返回。
  • Context.session:属性,返回 _SessionAdapter,供工具调用 ctx.session.create_message()。
  • initialize:在 ServerCapabilities 中声明 sampling=SamplingCapability()。

7.4.5 mcp_lite/types.py(类型定义) #

  • SamplingCapability:采样能力声明。
  • SamplingMessage:role + content(TextContent 或列表)。
  • CreateMessageRequestParams:messages、max_tokens、system_prompt、temperature、stop_sequences。
  • CreateMessageResult:role、content、model、stop_reason。
  • ClientCapabilities / ServerCapabilities:新增 sampling 字段。

7.5 要点总结 #

角色 职责
MCP 服务端 提供工具;工具执行时通过 ctx.session.create_message() 向客户端发起采样请求
MCP 客户端 持有 LLM/API;收到 sampling/createMessage 后调用 sampling_callback,转调 DeepSeek 等 API,返回 CreateMessageResult
ElicitationSession 复用 stdio 双向通道,用于服务端向客户端发送 sampling/createMessage 并等待响应
_SessionAdapter 封装 create_message,负责构建参数、发送请求、解析响应

整体上,采样请求由服务端发起,客户端负责实际 LLM 调用,通过 stdio 上的 JSON-RPC 完成往返通信。

← 上一节 36.引导 下一节 38.LowLevel →

访问验证

请输入访问令牌

Token不正确,请重新输入