导航菜单

  • 1.什么是MCP
  • 2.MCP架构
  • 3.MCP服务器
  • 4.MCP客户端
  • 5.版本控制
  • 6.连接MCP服务器
  • 7.SDKs
  • 8.Inspector
  • 9.规范
  • 10.架构
  • 11.协议
  • 12.生命周期
  • 13.工具
  • 14.资源
  • 15.提示
  • 16.日志
  • 17.进度
  • 18.传输
  • 19.补全
  • 20.引导
  • 21.采样
  • 22.任务
  • 23.取消
  • 24.Ping
  • 25.根
  • 26.分页
  • 27.授权
  • 28.初始化
  • 29.工具
  • 30.资源
  • 31.结构化输出
  • 32.提示词
  • 33.上下文
  • 34.StreamableHTTP
  • 35.参数补全
  • 36.引导
  • 37.采样
  • 38.LowLevel
  • 39.任务
  • 40.取消
  • 41.ping
  • 42.根
  • 43.分页
  • 44.授权
  • 45.授权
  • Keycloak
  • asyncio
  • contextlib
  • httpx
  • pathlib
  • pydantic
  • queue
  • starlette
  • subprocess
  • threading
  • uvicorn
  • JSON-RPC
  • z
  • 1. LowLevel
  • 2. lowlevel_client.py
  • 3. lowlevel_server.py
  • 4. 执行过程
    • 4.1 整体执行流程
      • 4.1.1. 启动阶段
      • 4.1.2. 客户端主流程
    • 4.2 时序图
      • 4.2.1. 连接与初始化
      • 4.2.2. list_tools 流程
      • 4.2.3. call_tool 流程
      • 4.2.4. read_resource 流程
    • 4.3 请求上下文与生命周期
    • 4.4 总结

1. LowLevel #

MCP 低层 server(lowlevel_server.py)的实现,展示了如何用最基础的方式注册工具、资源、以及处理对应的调用。

主要特性说明:

  1. 工具注册与调用

    • 通过 `@server.list_tools()` 装饰器注册了工具的列举函数,返回工具的定义,包括名称、输入输出 schema 等。
    • 通过 `@server.call_tool()注册了工具调用处理函数,实现了对注册工具(如query_db`)的参数校验与 SQL 查询执行,并返回结构化的查询结果。
  2. 资源注册与访问

    • 通过 `@server.list_resources()` 提供了已知资源列表,例如系统状态等。
    • 通过 `@server.read_resource()` 提供了资源内容的读取,实现指定 URI 的内容获取和格式化(如 JSON)。
  3. 主运行逻辑

    • 启动时通过标准输入输出(stdio)与客户端创建通信通道,实现了兼容 MCP 标准协议的服务器端主循环。
    • 支持详情见异步主函数 run(),实践中适用于脚本或 CLI 启动方式。

该 server 示例适合作为理解工具/资源/协议基本模型与实现机制的参考样例,可以灵活扩展更多工具与资源场景。配合 lowlevel_client.py 客户端脚本,可便捷演示完整的调用流程与数据交互。

2. lowlevel_client.py #

lowlevel_client.py

# MCP 低层客户端:连接 lowlevel_server,演示工具、资源。
"""MCP 低层客户端:连接 lowlevel_server,演示工具、资源。"""

# 导入异步io相关模块
import asyncio
# 导入操作系统相关模块
import os
# 导入系统模块
import sys

# 从mcp包中导入相关类型和工具
from mcp import ClientSession, StdioServerParameters, types
# 从mcp.client.stdio导入stdio_client函数
from mcp.client.stdio import stdio_client
# 从pydantic导入AnyUrl类型
from pydantic import AnyUrl

# 获取当前文件所在目录
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 构建虚拟环境下python可执行文件路径
VENV_PYTHON = os.path.join(
    BASE_DIR, ".venv", "Scripts" if os.name == "nt" else "bin", "python.exe" if os.name == "nt" else "python"
)

# 打印工具调用结果的辅助函数
def _print_tool_result(result) -> None:
    # 尝试获取结构化内容
    if structured := getattr(result, "structuredContent", None):
        # 如果有结构化内容,则打印
        print("[Structured Output]", structured)
    else:
        # 否则拼接文本内容列表
        texts = [b.text for b in result.content if isinstance(b, types.TextContent)]
        print("[Text Content]", " | ".join(texts))

# 打印资源内容的辅助函数
def _print_resource_content(result) -> None:
    # 提取属于TextResourceContents类型的内容
    texts = [b.text for b in result.contents if isinstance(b, types.TextResourceContents)]
    print("[Resource Content]", " | ".join(texts))

# 主异步函数,负责客户端主流程
async def main() -> None:
    # 创建与lowlevel_server的服务器参数
    server_params = StdioServerParameters(
        command=VENV_PYTHON,
        args=[os.path.join(BASE_DIR, "lowlevel_server.py")],
        env={},
        cwd=BASE_DIR,
    )

    # 与服务端建立stdio客户端连接
    async with stdio_client(server_params) as (read, write):
        # 创建Session会话
        async with ClientSession(read, write) as session:
            # 初始化session
            await session.initialize()

            # 获取并打印支持的工具列表
            tools = await session.list_tools()
            print("[Tools]", [t.name for t in tools.tools])

            # 获取并打印资源列表
            resources = await session.list_resources()
            print("[Resources]", [r.uri for r in resources.resources])

            # 演示调用工具
            print("\n=== 测试工具调用 ===")
            tool_result = await session.call_tool("query_db", {"query": "SELECT * FROM users"})
            _print_tool_result(tool_result)

            # 演示读取资源
            print("\n=== 测试资源读取 ===")
            resource_result = await session.read_resource(AnyUrl("data://status"))
            _print_resource_content(resource_result)

# 程序入口
if __name__ == "__main__":
    # 检查标准输出输入是否支持reconfigure方法,并设置为UTF-8编码
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
        sys.stdin.reconfigure(encoding="utf-8")
    # 运行主异步函数
    asyncio.run(main())

3. lowlevel_server.py #

lowlevel_server.py

# MCP 低层服务器:工具、资源、生命周期。
"""MCP 低层服务器:工具、资源、生命周期。"""

# 导入 asyncio 异步编程库
import asyncio
# 导入 json 用于序列化和反序列化
import json
# 导入异步迭代器类型
from collections.abc import AsyncIterator
# 导入异步上下文管理器装饰器
from contextlib import asynccontextmanager
# 导入 Any 类型,表示任意类型
from typing import Any

# 导入 MCP 的类型定义模块
import mcp.types as types
# 导入 MCP 服务器基类
from mcp.server import Server
# 导入底层通知选项
from mcp.server.lowlevel import NotificationOptions
# 导入 stdio_server 用于标准输入输出通信
from mcp.server.stdio import stdio_server

# 定义一个模拟数据库类
class MockDatabase:
    # 类方法:模拟数据库连接,返回数据库实例
    @classmethod
    async def connect(cls) -> "MockDatabase":
        return cls()

    # 实例方法:模拟数据库断开连接
    async def disconnect(self) -> None:
        pass

    # 实例方法:模拟执行数据库查询,返回查询结果列表
    async def query(self, query_str: str) -> list[dict[str, str]]:
        return [{"id": "1", "name": "示例数据", "query": query_str}]

# 定义服务器生命周期管理的异步上下文管理器
@asynccontextmanager
async def server_lifespan(_server: Server) -> AsyncIterator[dict[str, Any]]:
    # 进行虚拟数据库连接
    db = await MockDatabase.connect()
    try:
        # 生成数据库对象放入后续请求上下文
        yield {"db": db}
    finally:
        # 无论是否异常都断开数据库连接
        await db.disconnect()

# 创建 MCP 服务器实例,指定生命周期上下文管理器
server = Server("lowlevel-server", lifespan=server_lifespan)

# 注册工具列举处理器,返回工具列表
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    # 返回一个包含 query_db 工具的工具列表
    return [
        types.Tool(
            # 工具名称
            name="query_db",
            # 工具描述
            description="查询数据库",
            # 输入 schema 定义
            inputSchema={
                "type": "object",
                "properties": {"query": {"type": "string", "description": "要执行的 SQL 查询"}},
                "required": ["query"],
            },
            # 输出 schema 定义
            outputSchema={
                "type": "object",
                "properties": {
                    "results": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "id": {"type": "string"},
                                "name": {"type": "string"},
                                "query": {"type": "string"}
                            },
                        },
                    },
                    "count": {"type": "integer"},
                },
            },
        )
    ]

# 注册工具调用处理器,执行工具对应的功能
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any]:
    # 如果工具名称不是 query_db,则抛出异常
    if name != "query_db":
        raise ValueError(f"未知工具:{name}")
    # 获取当前请求上下文
    ctx = server.request_context
    # 从生命周期上下文中获取数据库对象
    db = ctx.lifespan_context["db"]
    # 执行数据库查询方法
    results = await db.query(arguments["query"])
    # 返回查询结果和条数
    return {"results": results, "count": len(results)}

# 注册资源列举处理器,返回全部资源列表
@server.list_resources()
async def handle_list_resources() -> list[types.Resource]:
    # 返回一个资源对象列表
    return [
        types.Resource(
            # 资源 URI
            uri="data://status",
            # 资源名称
            name="系统状态",
            # 资源描述
            description="获取系统运行状态",
            # 资源 MIME 类型
            mimeType="application/json",
        )
    ]

# 注册资源读取处理器,根据资源 URI 返回内容
@server.read_resource()
async def handle_read_resource(uri: str) -> str:
    # 如果资源 URI 等于 data://status
    if str(uri) == "data://status":
        # 返回系统状态的 JSON 字符串
        return json.dumps({"status": "running", "uptime": "1h 30m", "version": "1.0.0"}, indent=2)
    # 如果未知资源则抛出异常
    raise ValueError(f"未知资源:{uri}")

# 服务器运行主异步函数
async def run() -> None:
    # 使用 stdio_server 创建通信通道
    async with stdio_server() as (read_stream, write_stream):
        # 启动服务器,传入读写流和初始化参数
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options(
                notification_options=NotificationOptions(),
                experimental_capabilities={},
            ),
        )

# 程序主入口
if __name__ == "__main__":
    # 运行异步主函数
    asyncio.run(run())

4. 执行过程 #

4.1 整体执行流程 #

4.1.1. 启动阶段 #

用户执行 uv run lowlevel_client.py
        │
        ▼
main() 被 asyncio.run() 调用
        │
        ▼
stdio_client(server_params) 启动子进程
        │
        ├── 创建子进程:python lowlevel_server.py
        ├── 子进程 stdin  ← 客户端 write
        └── 子进程 stdout → 客户端 read
        │
        ▼
服务端 main() 被 anyio.run() 调用
        │
        ▼
stdio_server() 创建 stdin/stdout 读写流
        │
        ▼
server.run() 进入消息循环,等待客户端请求

4.1.2. 客户端主流程 #

ClientSession(read, write)
        │
        ▼
session.initialize()  ─── MCP 握手:initialize 请求/响应
        │
        ▼
session.list_tools()   ─── tools/list 请求 → 返回 ListToolsResult
        │
        ▼
session.list_resources() ─── resources/list 请求 → 返回 ListResourcesResult
        │
        ▼
session.call_tool("query_db", {...}) ─── tools/call 请求 → 返回 CallToolResult
        │
        ▼
session.read_resource("data://status") ─── resources/read 请求 → 返回 ReadResourceResult

4.2 时序图 #

4.2.1. 连接与初始化 #

sequenceDiagram participant User as 用户 participant Client as lowlevel_client participant Stdio as stdio_client participant ServerProc as 服务端进程 participant Server as lowlevel_server participant MCP as MCP Server User->>Client: uv run lowlevel_client.py Client->>Stdio: stdio_client(server_params) Stdio->>ServerProc: 启动子进程 python lowlevel_server.py ServerProc->>Server: anyio.run(run) Server->>MCP: stdio_server() + server.run() Server->>Server: 进入 lifespan 上下文 Server->>Server: MockDatabase.connect() Server->>Server: yield {"db": db} Client->>Client: ClientSession(read, write) Client->>Client: session.initialize() Note over Client,MCP: 发送 initialize 请求 Client->>MCP: InitializeRequest MCP->>Client: InitializeResult (capabilities) Client->>Client: 会话就绪

4.2.2. list_tools 流程 #

sequenceDiagram participant Client as ClientSession participant Stream as stdio 流 participant Server as lowlevel_server participant Handler as handle_list_tools Client->>Client: session.list_tools() Client->>Stream: 发送 JSON-RPC: tools/list Stream->>Server: 读取 ListToolsRequest Server->>Server: 匹配 request_handlers[ListToolsRequest] Server->>Handler: handle_list_tools() Handler->>Handler: return [Tool(query_db, ...)] Server->>Stream: 发送 ListToolsResult Stream->>Client: 解析 ListToolsResult Client->>Client: 返回 tools 对象

4.2.3. call_tool 流程 #

sequenceDiagram participant Client as ClientSession participant Stream as stdio 流 participant Server as lowlevel_server participant Handler as handle_call_tool participant DB as MockDatabase Client->>Client: session.call_tool("query_db", {"query": "SELECT * FROM users"}) Client->>Stream: 发送 CallToolRequest Stream->>Server: 读取 CallToolRequest Server->>Server: 匹配 request_handlers[CallToolRequest] Server->>Handler: handle_call_tool(name, arguments) Handler->>Handler: ctx = server.request_context Handler->>Handler: db = ctx.lifespan_context["db"] Handler->>DB: db.query(arguments["query"]) DB->>Handler: [{"id":"1","name":"示例数据",...}] Handler->>Handler: return {"results": ..., "count": 1} Server->>Server: 包装为 CallToolResult Server->>Stream: 发送 CallToolResult Stream->>Client: 解析 CallToolResult Client->>Client: 返回 tool_result

4.2.4. read_resource 流程 #

sequenceDiagram participant Client as ClientSession participant Stream as stdio 流 participant Server as lowlevel_server participant Handler as handle_read_resource Client->>Client: session.read_resource("data://status") Client->>Stream: 发送 ReadResourceRequest Stream->>Server: 读取 ReadResourceRequest Server->>Server: 匹配 request_handlers[ReadResourceRequest] Server->>Handler: handle_read_resource(uri) Handler->>Handler: json.dumps({"status":"running",...}) Handler->>Server: return JSON 字符串 Server->>Server: 包装为 ReadResourceResult Server->>Stream: 发送 ReadResourceResult Stream->>Client: 解析 ReadResourceResult Client->>Client: 返回 resource_result

4.3 请求上下文与生命周期 #

flowchart TB subgraph 服务端启动 A[server.run] --> B[进入 lifespan] B --> C[MockDatabase.connect] C --> D["yield {'db': db}"] D --> E[进入消息循环] end subgraph 每次请求 E --> F[收到 ClientRequest] F --> G[设置 request_ctx] G --> H["ctx.lifespan_context = {'db': db}"] H --> I[调用 handler] I --> J[handler 通过 server.request_context 取 db] J --> K[返回响应] K --> L[重置 request_ctx] L --> E end subgraph 关闭 M[客户端断开] --> N[stdin 关闭] N --> O[退出 lifespan finally] O --> P[db.disconnect] end

4.4 总结 #

组件 职责
MockDatabase 模拟数据库,在 lifespan 中创建和销毁
server_lifespan 管理 db 生命周期,通过 yield {"db": db} 注入到请求上下文
Server 注册工具、资源处理器,通过 request_context 暴露当前请求和 lifespan
handle_call_tool 通过 ctx.lifespan_context["db"] 拿到 db
stdio_client 启动服务端子进程,建立 stdin/stdout 管道
ClientSession 封装 JSON-RPC 请求/响应,提供 list_tools、call_tool 等 API
通信 每行 JSON 一条 JSON-RPC 消息,stdin 写入、stdout 读取
← 上一节 37.采样 下一节 39.任务 →

访问验证

请输入访问令牌

Token不正确,请重新输入