ai
  • index
  • 1.首页
  • 2.介绍
  • 3.架构概览
  • 4.服务器概念
  • 5.客户端概念
  • 6.版本控制
  • 7.连接到远程MCP服务器
  • 8.连接到本地MCP服务器
  • json_rpc
  • 9.构建一个MCP服务器
  • 10.检查员
  • 11.构建一个MCP客户端
  • 14.架构
  • 15.基础协议概述
  • 16.生命周期
  • 17.传输
  • 18.授权
  • 19.安全最佳实践
  • 20.取消
  • 21.Ping
  • 22.进展
  • 23.Roots
  • 24.采样
  • 25.启发
  • 26.服务器特性
  • 27.提示词
  • 28.资源
  • 29.工具
  • 30.完成
  • 31.日志记录
  • 32.分页
  • 33.架构参考
  • URI模板
  • 12.实现
  • http.server
  • 动态客户端注册协议
  • 受保护资源元数据
  • 授权服务器元数据
  • JWKS
  • PKCE
  • PyJWT
  • secrets
  • watchfiles
  • 实现authorization
  • 实现cancel
  • 实现completion
  • 实现logging
  • 实现pagination
  • 实现process
  • 实现transport
  • psutil
  • pytz
  • zoneinfo
  • contextlib
  • Starlette
  • mcp.1.starter
  • mcp.2.Resource
  • mcp.3.structured_output
  • mcp.4.prompts
  • mcp.5.context
  • mcp.6.streamable
  • mcp.7.lowlevel
  • mcp.8.Completion
  • mcp.9.Elicitation
  • mcp.10.oauth
  • mcp.11.integration
  • mcp.12.best
  • mysql-mcp
  • databases
  • uvicorn
  • asynccontextmanager
  • AsyncExitStack
  • streamable
  • aiohttp
  • publish
  • email
  • schedule
  • twine
  • 1.教学文档总览
  • 2.教师使用指南
  • 3.教学系统快速参考
  • 4.新生入门指南
  • 5.学生使用指南
  • 1. 低层 Server
  • 2. 服务器:低层 Server 与生命周期管理
  • 3. 客户端:连接低层 Server 并测试功能
  • 4. 运行与验证

1. 低层 Server #

本章目标:

  • 理解低层 Server 与 FastMCP 的区别:直接控制协议端点与生命周期
  • 使用 mcp.server.lowlevel.Server 注册请求处理器与通知处理器
  • 掌握生命周期(lifespan)管理与会话上下文访问

2. 服务器:低层 Server 与生命周期管理 #

新建 C:\mcp-project\lowlevel_server.py:

# 导入异步IO模块
import asyncio

# 从collections.abc导入异步迭代器类型
from collections.abc import AsyncIterator

# 从contextlib导入异步上下文管理器装饰器
from contextlib import asynccontextmanager

# 从typing导入任意类型
from typing import Any

# 导入mcp的stdio传输支持
import mcp.server.stdio

# 导入json模块用于序列化
import json

# 导入MCP类型定义
import mcp.types as types

# 从mcp.server.lowlevel导入通知选项和Server类
from mcp.server.lowlevel import (
    NotificationOptions,
    Server,
)

# 从mcp.server.models导入初始化选项
from mcp.server.models import InitializationOptions


# 1) 定义模拟数据库类(用于演示生命周期管理)
class MockDatabase:
    # 类方法:异步连接数据库,返回数据库实例
    @classmethod
    async def connect(cls) -> "MockDatabase":
        # 打印连接信息
        print("数据库已连接")
        # 返回数据库实例
        return cls()

    # 异步方法:断开数据库连接
    async def disconnect(self) -> None:
        # 打印断开信息
        print("数据库已断开")

    # 异步方法:执行数据库查询,返回模拟结果
    async def query(self, query_str: str) -> list[dict[str, str]]:
        # 返回模拟查询结果
        return [{"id": "1", "name": "示例数据", "query": query_str}]


# 2) 定义生命周期管理函数,负责资源的初始化与清理
@asynccontextmanager
async def server_lifespan(_server: Server) -> AsyncIterator[dict[str, Any]]:
    # 启动时初始化数据库资源
    db = await MockDatabase.connect()
    try:
        # 将数据库资源传递给请求处理器
        yield {"db": db}
    finally:
        # 关闭时清理数据库资源
        await db.disconnect()


# 3) 创建低层Server实例,传入生命周期管理函数
server = Server("lowlevel-server", lifespan=server_lifespan)


# 4) 注册工具列表处理器,返回可用工具列表
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    # 返回可用工具列表
    return [
        types.Tool(
            name="query_db",  # 工具名称
            description="查询数据库",  # 工具描述
            inputSchema={  # 输入参数Schema
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "要执行的 SQL 查询"}
                },
                "required": ["query"],
            },
            outputSchema={  # 输出结果Schema(结构化输出)
                "type": "object",
                "properties": {
                    "results": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "id": {"type": "string"},
                                "name": {"type": "string"},
                                "query": {"type": "string"},
                            },
                        },
                    },
                    "count": {"type": "integer"},
                },
            },
        )
    ]


# 5) 注册工具调用处理器,处理工具调用请求
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any]:
    # 判断工具名称是否为query_db
    if name != "query_db":
        # 抛出未知工具异常
        raise ValueError(f"未知工具:{name}")

    # 获取生命周期上下文(数据库连接)
    ctx = server.request_context
    db = ctx.lifespan_context["db"]

    # 获取查询字符串
    query_str = arguments["query"]
    # 执行数据库查询
    results = await db.query(query_str)

    # 返回结构化结果(低层Server会自动验证outputSchema)
    return {"results": results, "count": len(results)}


# 6) 注册资源列表处理器,返回可用资源列表
@server.list_resources()
async def handle_list_resources() -> list[types.Resource]:
    # 返回可用资源列表
    return [
        types.Resource(
            uri="data://status",  # 资源URI
            name="系统状态",  # 资源名称
            description="获取系统运行状态",  # 资源描述
            mimeType="application/json",  # MIME类型
        )
    ]


# 7) 注册资源读取处理器,处理资源读取请求
@server.read_resource()
async def handle_read_resource(uri: str) -> list[types.TextContent]:
    # 判断资源URI是否为data://status
    if str(uri) == "data://status":
        # 构造状态数据
        status_data = {"status": "running", "uptime": "1h 30m", "version": "1.0.0"}
        # 返回格式化后的JSON字符串
        return json.dumps(status_data, indent=2)
    else:
        # 抛出未知资源异常
        raise ValueError(f"未知资源:{uri}")


# 8) 注册进度通知处理器,处理进度通知
@server.progress_notification()
async def handle_progress(
    progress_token: str | int, progress: float, total: float | None, message: str | None
) -> None:
    # 打印进度通知信息
    print(f"进度通知:{progress_token} - {progress}/{total} - {message}")


# 9) 主运行函数,启动服务器
async def run():
    # 使用stdio传输方式启动服务器
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="lowlevel-server",  # 服务器名称
                server_version="0.1.0",  # 服务器版本
                capabilities=server.get_capabilities(  # 获取服务器能力
                    notification_options=NotificationOptions(),  # 通知选项
                    experimental_capabilities={},  # 实验性能力
                ),
            ),
        )


# 主程序入口
if __name__ == "__main__":
    # 运行主异步函数
    asyncio.run(run())

要点:

  • 使用 `@server.list_tools()` 等装饰器直接注册协议端点处理器。
  • 通过 server_lifespan 管理资源生命周期。
  • 在请求处理器中通过 server.request_context 访问生命周期上下文。
  • 定义 outputSchema 支持结构化输出。

3. 客户端:连接低层 Server 并测试功能 #

新建 C:\mcp-project\test_client_lowlevel.py:

# 导入异步IO模块
import asyncio

# 导入操作系统相关模块
import os

# 从mcp包导入客户端会话、stdio服务器参数和类型定义
from mcp import ClientSession, StdioServerParameters, types

# 导入stdio客户端适配器
from mcp.client.stdio import stdio_client

# 导入 AnyUrl 类型用于资源 URI
from pydantic import AnyUrl


# 定义异步日志回调函数,当服务器发送日志消息时会被调用
async def on_logging(params: types.LoggingMessageNotificationParams) -> None:
    # 获取日志级别,若不存在则默认为"info"
    level = getattr(params, "level", "info")
    # 获取日志数据内容,若不存在则为None
    data = getattr(params, "data", None)
    # 获取日志记录器名称,若不存在则为None
    logger = getattr(params, "logger", None)
    # 打印日志信息,若无数据则显示<no-data>
    print(
        f"[LOG][{level}]",
        str(data) if data is not None else "<no-data>",
        f"(logger={logger})",
    )


# 定义主异步函数
async def main() -> None:
    # 计算当前文件的绝对路径所在目录
    base_dir = os.path.dirname(os.path.abspath(__file__))
    # 拼接得到 lowlevel_server.py 的绝对路径
    server_path = os.path.join(base_dir, "lowlevel_server.py")

    # 配置以 stdio 方式启动服务器
    server_params = StdioServerParameters(
        command="python",  # 使用 python 运行
        args=[server_path],  # 传入服务器脚本路径
        env={},  # 环境变量为空
    )

    # 建立 stdio 连接并创建客户端会话
    async with stdio_client(server_params) as (read, write):
        # 基于读写流创建 MCP 客户端会话
        async with ClientSession(read, write, logging_callback=on_logging) as session:
            # 初始化 MCP 会话
            await session.initialize()

            # 列出所有可用工具
            tools = await session.list_tools()
            print("[Tools]", [t.name for t in tools.tools])

            # 列出所有可用资源
            resources = await session.list_resources()
            print("[Resources]", [r.uri for r in resources.resources])

            # 测试工具调用(结构化输出)
            print("\n=== 测试工具调用 ===")
            tool_result = await session.call_tool(
                "query_db", {"query": "SELECT * FROM users"}
            )

            # 获取结构化输出内容
            structured = getattr(tool_result, "structuredContent", None)
            if structured:
                # 如果有结构化输出,直接打印
                print("[Structured Output]", structured)
            else:
                # 否则解析文本内容
                texts = []
                for block in tool_result.content:
                    if isinstance(block, types.TextContent):
                        texts.append(block.text)
                print("[Text Content]", " | ".join(texts))

            # 测试资源读取
            print("\n=== 测试资源读取 ===")

            # 读取指定资源内容
            resource_result = await session.read_resource(AnyUrl("data://status"))
            print(resource_result)
            # 提取文本内容
            texts = []
            for block in resource_result.contents:
                if isinstance(block, types.TextResourceContents):
                    texts.append(block.text)
            print("[Resource Content]", " | ".join(texts))


# 判断是否为主程序入口
if __name__ == "__main__":
    # 运行主异步函数
    asyncio.run(main())

说明:

  • 客户端连接方式与 FastMCP 相同,但服务器行为由低层处理器控制。
  • 结构化输出会出现在 structuredContent 字段中。

4. 运行与验证 #

cd C:\mcp-project
python test_client_lowlevel.py

预期输出:

[Tools] ['query_db']
[Resources] [AnyUrl('data://status')]

=== 测试工具调用 ===
[Structured Output] {'results': [{'id': '1', 'name': '示例数据', 'query': 'SELECT * FROM users'}], 'count': 1}

=== 测试资源读取 ===
meta=None contents=[TextResourceContents(uri=AnyUrl('data://status'), mimeType='text/plain', meta=None, text='{\n  "status": "running",\n  "uptime": "1h 30m",\n  "version": "1.0.0"\n}')]
[Resource Content] {
  "status": "running",
  "uptime": "1h 30m",
  "version": "1.0.0"
}

访问验证

请输入访问令牌

Token不正确,请重新输入