1. 低层 Server #
本章目标:
- 理解低层 Server 与 FastMCP 的区别:直接控制协议端点与生命周期
- 使用
mcp.server.lowlevel.Server注册请求处理器与通知处理器 - 掌握生命周期(lifespan)管理与会话上下文访问
2. 服务器:低层 Server 与生命周期管理 #
新建 C:\mcp-project\lowlevel_server.py:
# 导入异步IO模块
import asyncio
# 从collections.abc导入异步迭代器类型
from collections.abc import AsyncIterator
# 从contextlib导入异步上下文管理器装饰器
from contextlib import asynccontextmanager
# 从typing导入任意类型
from typing import Any
# 导入mcp的stdio传输支持
import mcp.server.stdio
# 导入json模块用于序列化
import json
# 导入MCP类型定义
import mcp.types as types
# 从mcp.server.lowlevel导入通知选项和Server类
from mcp.server.lowlevel import (
NotificationOptions,
Server,
)
# 从mcp.server.models导入初始化选项
from mcp.server.models import InitializationOptions
# 1) 定义模拟数据库类(用于演示生命周期管理)
class MockDatabase:
# 类方法:异步连接数据库,返回数据库实例
@classmethod
async def connect(cls) -> "MockDatabase":
# 打印连接信息
print("数据库已连接")
# 返回数据库实例
return cls()
# 异步方法:断开数据库连接
async def disconnect(self) -> None:
# 打印断开信息
print("数据库已断开")
# 异步方法:执行数据库查询,返回模拟结果
async def query(self, query_str: str) -> list[dict[str, str]]:
# 返回模拟查询结果
return [{"id": "1", "name": "示例数据", "query": query_str}]
# 2) 定义生命周期管理函数,负责资源的初始化与清理
@asynccontextmanager
async def server_lifespan(_server: Server) -> AsyncIterator[dict[str, Any]]:
# 启动时初始化数据库资源
db = await MockDatabase.connect()
try:
# 将数据库资源传递给请求处理器
yield {"db": db}
finally:
# 关闭时清理数据库资源
await db.disconnect()
# 3) 创建低层Server实例,传入生命周期管理函数
server = Server("lowlevel-server", lifespan=server_lifespan)
# 4) 注册工具列表处理器,返回可用工具列表
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
# 返回可用工具列表
return [
types.Tool(
name="query_db", # 工具名称
description="查询数据库", # 工具描述
inputSchema={ # 输入参数Schema
"type": "object",
"properties": {
"query": {"type": "string", "description": "要执行的 SQL 查询"}
},
"required": ["query"],
},
outputSchema={ # 输出结果Schema(结构化输出)
"type": "object",
"properties": {
"results": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {"type": "string"},
"name": {"type": "string"},
"query": {"type": "string"},
},
},
},
"count": {"type": "integer"},
},
},
)
]
# 5) 注册工具调用处理器,处理工具调用请求
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any]:
# 判断工具名称是否为query_db
if name != "query_db":
# 抛出未知工具异常
raise ValueError(f"未知工具:{name}")
# 获取生命周期上下文(数据库连接)
ctx = server.request_context
db = ctx.lifespan_context["db"]
# 获取查询字符串
query_str = arguments["query"]
# 执行数据库查询
results = await db.query(query_str)
# 返回结构化结果(低层Server会自动验证outputSchema)
return {"results": results, "count": len(results)}
# 6) 注册资源列表处理器,返回可用资源列表
@server.list_resources()
async def handle_list_resources() -> list[types.Resource]:
# 返回可用资源列表
return [
types.Resource(
uri="data://status", # 资源URI
name="系统状态", # 资源名称
description="获取系统运行状态", # 资源描述
mimeType="application/json", # MIME类型
)
]
# 7) 注册资源读取处理器,处理资源读取请求
@server.read_resource()
async def handle_read_resource(uri: str) -> list[types.TextContent]:
# 判断资源URI是否为data://status
if str(uri) == "data://status":
# 构造状态数据
status_data = {"status": "running", "uptime": "1h 30m", "version": "1.0.0"}
# 返回格式化后的JSON字符串
return json.dumps(status_data, indent=2)
else:
# 抛出未知资源异常
raise ValueError(f"未知资源:{uri}")
# 8) 注册进度通知处理器,处理进度通知
@server.progress_notification()
async def handle_progress(
progress_token: str | int, progress: float, total: float | None, message: str | None
) -> None:
# 打印进度通知信息
print(f"进度通知:{progress_token} - {progress}/{total} - {message}")
# 9) 主运行函数,启动服务器
async def run():
# 使用stdio传输方式启动服务器
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="lowlevel-server", # 服务器名称
server_version="0.1.0", # 服务器版本
capabilities=server.get_capabilities( # 获取服务器能力
notification_options=NotificationOptions(), # 通知选项
experimental_capabilities={}, # 实验性能力
),
),
)
# 主程序入口
if __name__ == "__main__":
# 运行主异步函数
asyncio.run(run())
要点:
- 使用 `@server.list_tools()` 等装饰器直接注册协议端点处理器。
- 通过
server_lifespan管理资源生命周期。 - 在请求处理器中通过
server.request_context访问生命周期上下文。 - 定义
outputSchema支持结构化输出。
3. 客户端:连接低层 Server 并测试功能 #
新建 C:\mcp-project\test_client_lowlevel.py:
# 导入异步IO模块
import asyncio
# 导入操作系统相关模块
import os
# 从mcp包导入客户端会话、stdio服务器参数和类型定义
from mcp import ClientSession, StdioServerParameters, types
# 导入stdio客户端适配器
from mcp.client.stdio import stdio_client
# 导入 AnyUrl 类型用于资源 URI
from pydantic import AnyUrl
# 定义异步日志回调函数,当服务器发送日志消息时会被调用
async def on_logging(params: types.LoggingMessageNotificationParams) -> None:
# 获取日志级别,若不存在则默认为"info"
level = getattr(params, "level", "info")
# 获取日志数据内容,若不存在则为None
data = getattr(params, "data", None)
# 获取日志记录器名称,若不存在则为None
logger = getattr(params, "logger", None)
# 打印日志信息,若无数据则显示<no-data>
print(
f"[LOG][{level}]",
str(data) if data is not None else "<no-data>",
f"(logger={logger})",
)
# 定义主异步函数
async def main() -> None:
# 计算当前文件的绝对路径所在目录
base_dir = os.path.dirname(os.path.abspath(__file__))
# 拼接得到 lowlevel_server.py 的绝对路径
server_path = os.path.join(base_dir, "lowlevel_server.py")
# 配置以 stdio 方式启动服务器
server_params = StdioServerParameters(
command="python", # 使用 python 运行
args=[server_path], # 传入服务器脚本路径
env={}, # 环境变量为空
)
# 建立 stdio 连接并创建客户端会话
async with stdio_client(server_params) as (read, write):
# 基于读写流创建 MCP 客户端会话
async with ClientSession(read, write, logging_callback=on_logging) as session:
# 初始化 MCP 会话
await session.initialize()
# 列出所有可用工具
tools = await session.list_tools()
print("[Tools]", [t.name for t in tools.tools])
# 列出所有可用资源
resources = await session.list_resources()
print("[Resources]", [r.uri for r in resources.resources])
# 测试工具调用(结构化输出)
print("\n=== 测试工具调用 ===")
tool_result = await session.call_tool(
"query_db", {"query": "SELECT * FROM users"}
)
# 获取结构化输出内容
structured = getattr(tool_result, "structuredContent", None)
if structured:
# 如果有结构化输出,直接打印
print("[Structured Output]", structured)
else:
# 否则解析文本内容
texts = []
for block in tool_result.content:
if isinstance(block, types.TextContent):
texts.append(block.text)
print("[Text Content]", " | ".join(texts))
# 测试资源读取
print("\n=== 测试资源读取 ===")
# 读取指定资源内容
resource_result = await session.read_resource(AnyUrl("data://status"))
print(resource_result)
# 提取文本内容
texts = []
for block in resource_result.contents:
if isinstance(block, types.TextResourceContents):
texts.append(block.text)
print("[Resource Content]", " | ".join(texts))
# 判断是否为主程序入口
if __name__ == "__main__":
# 运行主异步函数
asyncio.run(main())
说明:
- 客户端连接方式与 FastMCP 相同,但服务器行为由低层处理器控制。
- 结构化输出会出现在
structuredContent字段中。
4. 运行与验证 #
cd C:\mcp-project
python test_client_lowlevel.py预期输出:
[Tools] ['query_db']
[Resources] [AnyUrl('data://status')]
=== 测试工具调用 ===
[Structured Output] {'results': [{'id': '1', 'name': '示例数据', 'query': 'SELECT * FROM users'}], 'count': 1}
=== 测试资源读取 ===
meta=None contents=[TextResourceContents(uri=AnyUrl('data://status'), mimeType='text/plain', meta=None, text='{\n "status": "running",\n "uptime": "1h 30m",\n "version": "1.0.0"\n}')]
[Resource Content] {
"status": "running",
"uptime": "1h 30m",
"version": "1.0.0"
}