1. LowLevel #
MCP 低层 server(lowlevel_server.py)的实现,展示了如何用最基础的方式注册工具、资源、以及处理对应的调用。
主要特性说明:
工具注册与调用
- 通过 `@server.list_tools()` 装饰器注册了工具的列举函数,返回工具的定义,包括名称、输入输出 schema 等。
- 通过 `@server.call_tool()
注册了工具调用处理函数,实现了对注册工具(如query_db`)的参数校验与 SQL 查询执行,并返回结构化的查询结果。
资源注册与访问
- 通过 `@server.list_resources()` 提供了已知资源列表,例如系统状态等。
- 通过 `@server.read_resource()` 提供了资源内容的读取,实现指定 URI 的内容获取和格式化(如 JSON)。
主运行逻辑
- 启动时通过标准输入输出(stdio)与客户端创建通信通道,实现了兼容 MCP 标准协议的服务器端主循环。
- 支持详情见异步主函数
run(),实践中适用于脚本或 CLI 启动方式。
该 server 示例适合作为理解工具/资源/协议基本模型与实现机制的参考样例,可以灵活扩展更多工具与资源场景。配合 lowlevel_client.py 客户端脚本,可便捷演示完整的调用流程与数据交互。
2. lowlevel_client.py #
lowlevel_client.py
# MCP 低层客户端:连接 lowlevel_server,演示工具、资源。
"""MCP 低层客户端:连接 lowlevel_server,演示工具、资源。"""
# 导入异步io相关模块
import asyncio
# 导入操作系统相关模块
import os
# 导入系统模块
import sys
# 从mcp包中导入相关类型和工具
from mcp import ClientSession, StdioServerParameters, types
# 从mcp.client.stdio导入stdio_client函数
from mcp.client.stdio import stdio_client
# 从pydantic导入AnyUrl类型
from pydantic import AnyUrl
# 获取当前文件所在目录
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 构建虚拟环境下python可执行文件路径
VENV_PYTHON = os.path.join(
BASE_DIR, ".venv", "Scripts" if os.name == "nt" else "bin", "python.exe" if os.name == "nt" else "python"
)
# 打印工具调用结果的辅助函数
def _print_tool_result(result) -> None:
# 尝试获取结构化内容
if structured := getattr(result, "structuredContent", None):
# 如果有结构化内容,则打印
print("[Structured Output]", structured)
else:
# 否则拼接文本内容列表
texts = [b.text for b in result.content if isinstance(b, types.TextContent)]
print("[Text Content]", " | ".join(texts))
# 打印资源内容的辅助函数
def _print_resource_content(result) -> None:
# 提取属于TextResourceContents类型的内容
texts = [b.text for b in result.contents if isinstance(b, types.TextResourceContents)]
print("[Resource Content]", " | ".join(texts))
# 主异步函数,负责客户端主流程
async def main() -> None:
# 创建与lowlevel_server的服务器参数
server_params = StdioServerParameters(
command=VENV_PYTHON,
args=[os.path.join(BASE_DIR, "lowlevel_server.py")],
env={},
cwd=BASE_DIR,
)
# 与服务端建立stdio客户端连接
async with stdio_client(server_params) as (read, write):
# 创建Session会话
async with ClientSession(read, write) as session:
# 初始化session
await session.initialize()
# 获取并打印支持的工具列表
tools = await session.list_tools()
print("[Tools]", [t.name for t in tools.tools])
# 获取并打印资源列表
resources = await session.list_resources()
print("[Resources]", [r.uri for r in resources.resources])
# 演示调用工具
print("\n=== 测试工具调用 ===")
tool_result = await session.call_tool("query_db", {"query": "SELECT * FROM users"})
_print_tool_result(tool_result)
# 演示读取资源
print("\n=== 测试资源读取 ===")
resource_result = await session.read_resource(AnyUrl("data://status"))
_print_resource_content(resource_result)
# 程序入口
if __name__ == "__main__":
# 检查标准输出输入是否支持reconfigure方法,并设置为UTF-8编码
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
sys.stdin.reconfigure(encoding="utf-8")
# 运行主异步函数
asyncio.run(main())
3. lowlevel_server.py #
lowlevel_server.py
# MCP 低层服务器:工具、资源、生命周期。
"""MCP 低层服务器:工具、资源、生命周期。"""
# 导入 asyncio 异步编程库
import asyncio
# 导入 json 用于序列化和反序列化
import json
# 导入异步迭代器类型
from collections.abc import AsyncIterator
# 导入异步上下文管理器装饰器
from contextlib import asynccontextmanager
# 导入 Any 类型,表示任意类型
from typing import Any
# 导入 MCP 的类型定义模块
import mcp.types as types
# 导入 MCP 服务器基类
from mcp.server import Server
# 导入底层通知选项
from mcp.server.lowlevel import NotificationOptions
# 导入 stdio_server 用于标准输入输出通信
from mcp.server.stdio import stdio_server
# 定义一个模拟数据库类
class MockDatabase:
# 类方法:模拟数据库连接,返回数据库实例
@classmethod
async def connect(cls) -> "MockDatabase":
return cls()
# 实例方法:模拟数据库断开连接
async def disconnect(self) -> None:
pass
# 实例方法:模拟执行数据库查询,返回查询结果列表
async def query(self, query_str: str) -> list[dict[str, str]]:
return [{"id": "1", "name": "示例数据", "query": query_str}]
# 定义服务器生命周期管理的异步上下文管理器
@asynccontextmanager
async def server_lifespan(_server: Server) -> AsyncIterator[dict[str, Any]]:
# 进行虚拟数据库连接
db = await MockDatabase.connect()
try:
# 生成数据库对象放入后续请求上下文
yield {"db": db}
finally:
# 无论是否异常都断开数据库连接
await db.disconnect()
# 创建 MCP 服务器实例,指定生命周期上下文管理器
server = Server("lowlevel-server", lifespan=server_lifespan)
# 注册工具列举处理器,返回工具列表
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
# 返回一个包含 query_db 工具的工具列表
return [
types.Tool(
# 工具名称
name="query_db",
# 工具描述
description="查询数据库",
# 输入 schema 定义
inputSchema={
"type": "object",
"properties": {"query": {"type": "string", "description": "要执行的 SQL 查询"}},
"required": ["query"],
},
# 输出 schema 定义
outputSchema={
"type": "object",
"properties": {
"results": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {"type": "string"},
"name": {"type": "string"},
"query": {"type": "string"}
},
},
},
"count": {"type": "integer"},
},
},
)
]
# 注册工具调用处理器,执行工具对应的功能
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any]:
# 如果工具名称不是 query_db,则抛出异常
if name != "query_db":
raise ValueError(f"未知工具:{name}")
# 获取当前请求上下文
ctx = server.request_context
# 从生命周期上下文中获取数据库对象
db = ctx.lifespan_context["db"]
# 执行数据库查询方法
results = await db.query(arguments["query"])
# 返回查询结果和条数
return {"results": results, "count": len(results)}
# 注册资源列举处理器,返回全部资源列表
@server.list_resources()
async def handle_list_resources() -> list[types.Resource]:
# 返回一个资源对象列表
return [
types.Resource(
# 资源 URI
uri="data://status",
# 资源名称
name="系统状态",
# 资源描述
description="获取系统运行状态",
# 资源 MIME 类型
mimeType="application/json",
)
]
# 注册资源读取处理器,根据资源 URI 返回内容
@server.read_resource()
async def handle_read_resource(uri: str) -> str:
# 如果资源 URI 等于 data://status
if str(uri) == "data://status":
# 返回系统状态的 JSON 字符串
return json.dumps({"status": "running", "uptime": "1h 30m", "version": "1.0.0"}, indent=2)
# 如果未知资源则抛出异常
raise ValueError(f"未知资源:{uri}")
# 服务器运行主异步函数
async def run() -> None:
# 使用 stdio_server 创建通信通道
async with stdio_server() as (read_stream, write_stream):
# 启动服务器,传入读写流和初始化参数
await server.run(
read_stream,
write_stream,
server.create_initialization_options(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
)
# 程序主入口
if __name__ == "__main__":
# 运行异步主函数
asyncio.run(run())
4. 执行过程 #
4.1 整体执行流程 #
4.1.1. 启动阶段 #
用户执行 uv run lowlevel_client.py
│
▼
main() 被 asyncio.run() 调用
│
▼
stdio_client(server_params) 启动子进程
│
├── 创建子进程:python lowlevel_server.py
├── 子进程 stdin ← 客户端 write
└── 子进程 stdout → 客户端 read
│
▼
服务端 main() 被 anyio.run() 调用
│
▼
stdio_server() 创建 stdin/stdout 读写流
│
▼
server.run() 进入消息循环,等待客户端请求4.1.2. 客户端主流程 #
ClientSession(read, write)
│
▼
session.initialize() ─── MCP 握手:initialize 请求/响应
│
▼
session.list_tools() ─── tools/list 请求 → 返回 ListToolsResult
│
▼
session.list_resources() ─── resources/list 请求 → 返回 ListResourcesResult
│
▼
session.call_tool("query_db", {...}) ─── tools/call 请求 → 返回 CallToolResult
│
▼
session.read_resource("data://status") ─── resources/read 请求 → 返回 ReadResourceResult4.2 时序图 #
4.2.1. 连接与初始化 #
sequenceDiagram
participant User as 用户
participant Client as lowlevel_client
participant Stdio as stdio_client
participant ServerProc as 服务端进程
participant Server as lowlevel_server
participant MCP as MCP Server
User->>Client: uv run lowlevel_client.py
Client->>Stdio: stdio_client(server_params)
Stdio->>ServerProc: 启动子进程 python lowlevel_server.py
ServerProc->>Server: anyio.run(run)
Server->>MCP: stdio_server() + server.run()
Server->>Server: 进入 lifespan 上下文
Server->>Server: MockDatabase.connect()
Server->>Server: yield {"db": db}
Client->>Client: ClientSession(read, write)
Client->>Client: session.initialize()
Note over Client,MCP: 发送 initialize 请求
Client->>MCP: InitializeRequest
MCP->>Client: InitializeResult (capabilities)
Client->>Client: 会话就绪
4.2.2. list_tools 流程 #
sequenceDiagram
participant Client as ClientSession
participant Stream as stdio 流
participant Server as lowlevel_server
participant Handler as handle_list_tools
Client->>Client: session.list_tools()
Client->>Stream: 发送 JSON-RPC: tools/list
Stream->>Server: 读取 ListToolsRequest
Server->>Server: 匹配 request_handlers[ListToolsRequest]
Server->>Handler: handle_list_tools()
Handler->>Handler: return [Tool(query_db, ...)]
Server->>Stream: 发送 ListToolsResult
Stream->>Client: 解析 ListToolsResult
Client->>Client: 返回 tools 对象
4.2.3. call_tool 流程 #
sequenceDiagram
participant Client as ClientSession
participant Stream as stdio 流
participant Server as lowlevel_server
participant Handler as handle_call_tool
participant DB as MockDatabase
Client->>Client: session.call_tool("query_db", {"query": "SELECT * FROM users"})
Client->>Stream: 发送 CallToolRequest
Stream->>Server: 读取 CallToolRequest
Server->>Server: 匹配 request_handlers[CallToolRequest]
Server->>Handler: handle_call_tool(name, arguments)
Handler->>Handler: ctx = server.request_context
Handler->>Handler: db = ctx.lifespan_context["db"]
Handler->>DB: db.query(arguments["query"])
DB->>Handler: [{"id":"1","name":"示例数据",...}]
Handler->>Handler: return {"results": ..., "count": 1}
Server->>Server: 包装为 CallToolResult
Server->>Stream: 发送 CallToolResult
Stream->>Client: 解析 CallToolResult
Client->>Client: 返回 tool_result
4.2.4. read_resource 流程 #
sequenceDiagram
participant Client as ClientSession
participant Stream as stdio 流
participant Server as lowlevel_server
participant Handler as handle_read_resource
Client->>Client: session.read_resource("data://status")
Client->>Stream: 发送 ReadResourceRequest
Stream->>Server: 读取 ReadResourceRequest
Server->>Server: 匹配 request_handlers[ReadResourceRequest]
Server->>Handler: handle_read_resource(uri)
Handler->>Handler: json.dumps({"status":"running",...})
Handler->>Server: return JSON 字符串
Server->>Server: 包装为 ReadResourceResult
Server->>Stream: 发送 ReadResourceResult
Stream->>Client: 解析 ReadResourceResult
Client->>Client: 返回 resource_result
4.3 请求上下文与生命周期 #
flowchart TB
subgraph 服务端启动
A[server.run] --> B[进入 lifespan]
B --> C[MockDatabase.connect]
C --> D["yield {'db': db}"]
D --> E[进入消息循环]
end
subgraph 每次请求
E --> F[收到 ClientRequest]
F --> G[设置 request_ctx]
G --> H["ctx.lifespan_context = {'db': db}"]
H --> I[调用 handler]
I --> J[handler 通过 server.request_context 取 db]
J --> K[返回响应]
K --> L[重置 request_ctx]
L --> E
end
subgraph 关闭
M[客户端断开] --> N[stdin 关闭]
N --> O[退出 lifespan finally]
O --> P[db.disconnect]
end
4.4 总结 #
| 组件 | 职责 |
|---|---|
| MockDatabase | 模拟数据库,在 lifespan 中创建和销毁 |
| server_lifespan | 管理 db 生命周期,通过 yield {"db": db} 注入到请求上下文 |
| Server | 注册工具、资源处理器,通过 request_context 暴露当前请求和 lifespan |
| handle_call_tool | 通过 ctx.lifespan_context["db"] 拿到 db |
| stdio_client | 启动服务端子进程,建立 stdin/stdout 管道 |
| ClientSession | 封装 JSON-RPC 请求/响应,提供 list_tools、call_tool 等 API |
| 通信 | 每行 JSON 一条 JSON-RPC 消息,stdin 写入、stdout 读取 |