1.上下文 #
本节(Context)主要用于讲解 FastMCP 协议体系下与“上下文”相关的结构和用法,包含了服务器端和客户端两部分内容:
服务器端(FastMCP/Context):
服务端部分定义了 FastMCP 的主类和 Context 类,负责处理各类 JSON-RPC 消息(如资源、Prompt、工具等请求)。其中 Context 类集成了底层的资源、Prompt 管理,并对资源、工具调用、Prompt 注册与获取等操作提供了统一接口。客户端(context_client.py):
客户端代码示例提供了基础的会话启动、标准输入输出交互、日志与进度通知处理逻辑。通过 ClientSession、StdioServerParameters 等,可以方便地与服务器进行通信、获取资源/Prompt、调用工具,并实时接收服务器端推送的进度与日志。协议数据结构和扩展点:
文件还包含了许多与上下文管理相关的数据结构定义,包括资源请求与响应、Prompt 结构、工具调用参数与结果等。这些类型遵循 MCP (ModelContextProtocol) 设计规范,并大多基于 Pydantic 进行结构校验,方便扩展和定制。
本章重点展示了在 FastMCP 架构下,如何通过“上下文”机制协作多轮会话、Prompt、工具与资源的注册、管理和调用,并通过标准化的数据结构与回调机制实现客户端与服务端的高效交互。这样开发者可以方便地实现丰富的 AI 服务编排与自动化上下文跟踪能力。
npx @modelcontextprotocol/inspector uv --directory D:/aprepare/mcp-starter run context_server.py2. context_client.py #
context_client.py
# 导入操作系统相关模块
import os
# 导入系统相关模块
import sys
# 从 mcp_lite 库中导入客户端会话、标准输入输出服务器参数、类型模块
from mcp_lite import ClientSession, StdioServerParameters, types
# 从 mcp_lite.client.stdio 模块导入 stdio_client
from mcp_lite.client.stdio import stdio_client
# 日志回调函数:当服务器发送日志消息时调用
def on_logging(params: types.LoggingMessageNotificationParams) -> None:
# 获取日志级别,默认为 info
level = getattr(params, "level", "info")
# 获取日志数据,默认无数据
data = getattr(params, "data", None)
# 获取日志记录器名
logger = getattr(params, "logger", None)
# 打印日志消息
print(f"[LOG][{level}]", str(data) if data is not None else "<no-data>", f"(logger={logger})")
# 通用消息处理器:用于处理服务器推送的消息(如进度通知等)
def on_message(message) -> None:
# 获取消息根对象(可能直接就是消息本身)
root = getattr(message, "root", message)
# 获取方法名
method = getattr(root, "method", None)
# 如果是进度通知
if method == "notifications/progress":
# 获取参数
params = getattr(root, "params", None)
# 如果参数存在
if params is not None:
# 获取进度数值
progress = getattr(params, "progress", None)
# 获取总进度数值
total = getattr(params, "total", None)
# 获取进度消息
msg = getattr(params, "message", None)
# 打印进度信息
print(f"[PROGRESS] {progress}/{total} - {msg}")
# 进度回调函数:显示进度信息,实际由 on_message 统一打印,此处直接输出,可留空
def on_progress(progress: float, total: float | None, message: str | None) -> None:
# 打印进度信息
print(f"[PROGRESS] {progress}/{total} - {message}")
# 主程序入口
def main() -> None:
# 获取当前脚本文件的绝对路径所在目录
base_dir = os.path.dirname(os.path.abspath(__file__))
# 拼接 context_server.py 的完整路径
server_path = os.path.join(base_dir, "context_server.py")
# 创建服务器参数对象,使用当前 Python 解释器启动 context_server.py
server_params = StdioServerParameters(
command=sys.executable,
args=[server_path],
env={},
)
# 通过 with 上下文启动 stdio_client(即与 python 服务器进程通讯)
with stdio_client(server_params) as (read, write):
# 使用读写流创建客户端会话
session = ClientSession(read, write)
# 向服务器发送初始化协议
session.initialize()
# 调用服务器上的工具 "long_running_task",并注册各种回调
result = session.call_tool(
"long_running_task",
{"task_name": "long_ask", "steps": 2},
progress_callback=on_progress,
message_handler=on_message,
logging_callback=on_logging,
)
# 用于收集文本内容块
texts = []
# 遍历任务结果返回的内容块
for block in result.content:
# 如果内容块是文本类型,则提取文本并加入列表
if isinstance(block, types.TextContent):
texts.append(block.text)
# 打印最终结果,多个文本用 | 分隔;如果没有文本块,则输出结构化内容
print("[RESULT]", " | ".join(texts) or str(getattr(result, "structuredContent", None)))
# 判断脚本是否作为主程序运行
if __name__ == "__main__":
# 若 sys.stdout 支持 reconfigure,则设置输出为 utf-8 编码
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
# 调用主函数
main()
官方代码
# 导入操作系统相关模块
import os
# 导入系统相关模块
import sys
# 导入 asyncio 模块
import asyncio
# 从 mcp_lite 库中导入客户端会话、标准输入输出服务器参数、类型模块
from mcp import ClientSession, StdioServerParameters, types
# 从 mcp_lite.client.stdio 模块导入 stdio_client
from mcp.client.stdio import stdio_client
# 日志回调函数:当服务器发送日志消息时调用(需为 async,ClientSession 要求)
async def on_logging(params: types.LoggingMessageNotificationParams) -> None:
# 获取日志级别,默认为 info
level = getattr(params, "level", "info")
# 获取日志数据,默认无数据
data = getattr(params, "data", None)
# 获取日志记录器名
logger = getattr(params, "logger", None)
# 打印日志消息
print(f"[LOG][{level}]", str(data) if data is not None else "<no-data>", f"(logger={logger})")
# 通用消息处理器:用于处理服务器推送的消息(如进度通知等)(需为 async,ClientSession 要求)
async def on_message(message) -> None:
# 获取消息根对象(可能直接就是消息本身)
root = getattr(message, "root", message)
# 获取方法名
method = getattr(root, "method", None)
# 如果是进度通知
if method == "notifications/progress":
# 获取参数
params = getattr(root, "params", None)
# 如果参数存在
if params is not None:
# 获取进度数值
progress = getattr(params, "progress", None)
# 获取总进度数值
total = getattr(params, "total", None)
# 获取进度消息
msg = getattr(params, "message", None)
# 打印进度信息
print(f"[PROGRESS] {progress}/{total} - {msg}")
# 进度回调函数:显示进度信息(需为 async,call_tool 的 progress_callback 要求)
async def on_progress(progress: float, total: float | None, message: str | None) -> None:
# 打印进度信息
print(f"[PROGRESS] {progress}/{total} - {message}")
# 主程序入口
async def main() -> None:
# 获取当前脚本文件的绝对路径所在目录
base_dir = os.path.dirname(os.path.abspath(__file__))
# 拼接 context_server.py 的完整路径
server_path = os.path.join(base_dir, "context_server.py")
# 创建服务器参数对象,使用当前 Python 解释器启动 context_server.py
server_params = StdioServerParameters(
command=sys.executable,
args=[server_path],
env={},
)
# 通过 with 上下文启动 stdio_client(即与 python 服务器进程通讯)
async with stdio_client(server_params) as (read, write):
# 使用读写流创建客户端会话,logging_callback 和 message_handler 需在构造时传入
async with ClientSession(
read, write,
logging_callback=on_logging,
message_handler=on_message,
) as session:
# 向服务器发送初始化协议
await session.initialize()
# 调用服务器上的工具 "long_running_task",progress_callback 在 call_tool 中传入
result = await session.call_tool(
"long_running_task",
arguments={"task_name": "long_ask", "steps": 2},
progress_callback=on_progress,
)
# 用于收集文本内容块
texts = []
# 遍历任务结果返回的内容块
for block in result.content:
# 如果内容块是文本类型,则提取文本并加入列表
if isinstance(block, types.TextContent):
texts.append(block.text)
# 打印最终结果,多个文本用 | 分隔;如果没有文本块,则输出结构化内容
print("[RESULT]", " | ".join(texts) or str(getattr(result, "structuredContent", None)))
# 判断脚本是否作为主程序运行
if __name__ == "__main__":
# 若 sys.stdout 支持 reconfigure,则设置输出为 utf-8 编码
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
# 调用主函数
asyncio.run(main())
3. context_server.py #
context_server.py
# 导入 sys 模块,用于访问标准输入输出流
import sys
# 从 mcp_lite.server.fastmcp 导入 FastMCP 和 Context 类型
from mcp_lite.server.fastmcp import FastMCP, Context
# 创建 FastMCP 服务器实例,指定 name,便于客户端识别
mcp = FastMCP(name="Context Server")
# 注册一个静态资源,URI 为 "data://message"
@mcp.resource("data://message")
# 定义静态资源的处理函数,返回字符串内容
def static_message() -> str:
"""示例静态资源:用于演示在工具中通过 Context 读取资源。"""
# 返回静态资源内容
return "这是一条来自 data://message 的资源内容。"
# 注册工具函数,工具名由装饰器自动生成
@mcp.tool()
# 定义异步工具函数,参数支持任务名称、执行步数和上下文对象
async def long_running_task(task_name: str, ctx: Context, steps: int = 5) -> str:
"""执行一个带进度的任务,期间输出日志、读取资源并返回总结。"""
# 保护性判断,若未注入 Context,直接提示无法继续演示
if ctx is None:
# 返回未注入 Context 的提示信息
return "未注入 Context,无法演示日志/进度/资源读取。"
# 确保 steps 是整数类型(防止客户端传递字符串类型)
steps = int(steps)
# 发送 debug 级别日志,通知开始执行任务
await ctx.debug(f"开始执行任务:{task_name}")
# 发送 info 级别日志,通知任务环境初始化
await ctx.info("初始化任务环境……")
# 通过 Context 的 read_resource 方法读取静态资源内容
resource_result = await ctx.read_resource("data://message")
# 初始化资源预览字符串为空
resource_preview = "<empty>"
# 遍历资源内容,获取第一个内容块的 text 或 content 字段
for block in resource_result:
resource_preview = getattr(block, "text", getattr(block, "content", "<no-text>"))
break
# 发送 info 级别日志,通知资源读取成功,并展示资源内容摘要
await ctx.info(f"读取资源成功:{resource_preview}")
# 循环执行步骤,每步上报进度
for i in range(steps):
# 计算当前进度(0~1 的小数)
progress = (i + 1) / steps
# 通过 Context 上报当前进度(progress),总进度 total=1.0,带上当前步骤信息
await ctx.report_progress(
progress=progress, total=1.0, message=f"Step {i + 1}/{steps}"
)
# 发送 debug 日志,记录当前进度
await ctx.debug(f"进度:{i + 1}/{steps}")
# 读取当前请求的 request_id 标识
req_id = ctx.request_id
# 发送 info 日志,通知任务完成并携带请求 ID
await ctx.info(f"任务完成:{task_name}(request_id={req_id})")
# 返回任务完成的总结字符串,包含刚刚读取的资源摘要
return f"任务 '{task_name}' 完成,读取到的资源摘要:{resource_preview}"
# 主程序入口
if __name__ == "__main__":
# 检查 sys.stdout 是否支持 reconfigure 方法(Python 3.7+ 提供)
if hasattr(sys.stdout, 'reconfigure'):
# 设置标准输出流编码为 utf-8,避免中文内容输出乱码
sys.stdout.reconfigure(encoding='utf-8')
# 设置标准输入流编码为 utf-8
sys.stdin.reconfigure(encoding='utf-8')
# 启动 FastMCP 服务器,采用 stdio(标准输入输出)作为通信方式
mcp.run(transport="stdio")
官方代码
# 导入 sys 模块,用于访问标准输入输出流
import sys
# 从 mcp_lite.server.fastmcp 导入 FastMCP 和 Context 类型
from mcp.server.fastmcp import FastMCP, Context
# 创建 FastMCP 服务器实例,指定 name,便于客户端识别
mcp = FastMCP(name="Context Server")
# 注册一个静态资源,URI 为 "data://message"
@mcp.resource("data://message")
# 定义静态资源的处理函数,返回字符串内容
def static_message() -> str:
"""示例静态资源:用于演示在工具中通过 Context 读取资源。"""
# 返回静态资源内容
return "这是一条来自 data://message 的资源内容。"
# 注册工具函数,工具名由装饰器自动生成
@mcp.tool()
# 定义异步工具函数,参数支持任务名称、执行步数和上下文对象
async def long_running_task(task_name: str, ctx: Context | None, steps: int = 5) -> str:
"""执行一个带进度的任务,期间输出日志、读取资源并返回总结。"""
# 保护性判断,若未注入 Context,直接提示无法继续演示
if ctx is None:
# 返回未注入 Context 的提示信息
return "未注入 Context,无法演示日志/进度/资源读取。"
# 确保 steps 是整数类型(防止客户端传递字符串类型)
steps = int(steps)
# 发送 debug 级别日志,通知开始执行任务
await ctx.debug(f"开始执行任务:{task_name}")
# 发送 info 级别日志,通知任务环境初始化
await ctx.info("初始化任务环境……")
# 通过 Context 的 read_resource 方法读取静态资源内容
resource_result = await ctx.read_resource("data://message")
# 初始化资源预览字符串为空
resource_preview = "<empty>"
# 遍历资源内容,获取第一个内容块的 text 或 content 字段
for block in resource_result:
resource_preview = getattr(block, "text", getattr(block, "content", "<no-text>"))
break
# 发送 info 级别日志,通知资源读取成功,并展示资源内容摘要
await ctx.info(f"读取资源成功:{resource_preview}")
# 循环执行步骤,每步上报进度
for i in range(steps):
# 计算当前进度(0~1 的小数)
progress = (i + 1) / steps
# 通过 Context 上报当前进度(progress),总进度 total=1.0,带上当前步骤信息
await ctx.report_progress(
progress=progress, total=1.0, message=f"Step {i + 1}/{steps}"
)
# 发送 debug 日志,记录当前进度
await ctx.debug(f"进度:{i + 1}/{steps}")
# 读取当前请求的 request_id 标识
req_id = ctx.request_id
# 发送 info 日志,通知任务完成并携带请求 ID
await ctx.info(f"任务完成:{task_name}(request_id={req_id})")
# 返回任务完成的总结字符串,包含刚刚读取的资源摘要
return f"任务 '{task_name}' 完成,读取到的资源摘要:{resource_preview}"
# 主程序入口
if __name__ == "__main__":
# 检查 sys.stdout 是否支持 reconfigure 方法(Python 3.7+ 提供)
if hasattr(sys.stdout, 'reconfigure'):
# 设置标准输出流编码为 utf-8,避免中文内容输出乱码
sys.stdout.reconfigure(encoding='utf-8')
# 设置标准输入流编码为 utf-8
sys.stdin.reconfigure(encoding='utf-8')
# 启动 FastMCP 服务器,采用 stdio(标准输入输出)作为通信方式
mcp.run(transport="stdio")
4. session.py #
mcp_lite/client/session.py
import sys
# 导入 SessionMessage 类
from mcp_lite.message import SessionMessage
# 从 mcp_lite.types 模块导入相关类型和常量
from mcp_lite.types import ( # 从 mcp_lite.types 模块导入以下类型
InitializeRequestParams, # 初始化请求参数类型
InitializeRequest, # 初始化请求类型
LATEST_PROTOCOL_VERSION, # 最新协议版本常量
ClientCapabilities, # 客户端能力类型
Implementation, # 实现信息类型
InitializedNotification, # 初始化完成通知类型
InitializeResult, # 初始化结果类型
JSONRPCRequest, # JSONRPC请求类型
JSONRPCResponse, # JSONRPC响应类型
JSONRPCError, # JSONRPC错误类型
JSONRPCNotification, # JSONRPC通知类型
+ LoggingMessageNotificationParams, # 日志消息通知参数
ListToolsRequest, # 工具列表请求类型
ListToolsResult, # 工具列表结果类型
CallToolResult, # 调用工具响应类型
CallToolRequest, # 调用工具请求类型
CallToolRequestParams, # 调用工具请求参数类型
ListResourcesRequest, # 资源列表请求类型
ListResourcesResult, # 资源列表响应类型
ListResourceTemplatesRequest, # 资源模板列表请求类型
ListResourceTemplatesResult, # 资源模板列表响应类型
ReadResourceRequest, # 读取资源请求类型
ReadResourceRequestParams, # 读取资源请求参数类型
ReadResourceResult, # 读取资源响应类型
ListPromptsRequest, # Prompt 列表请求类型
ListPromptsResult, # Prompt 列表响应类型
GetPromptRequest, # 获取 Prompt 请求类型
GetPromptRequestParams, # 获取 Prompt 请求参数类型
GetPromptResult, # 获取 Prompt 响应类型
)
# 定义 MCPError 异常类,用于表示协议相关错误
class MCPError(Exception):
# 初始化方法,接收错误码、错误信息和可选的数据
def __init__(self, code, message, data=None):
# 将错误码、错误信息、附加数据赋值为实例属性
self.code, self.message, self.data = code, message, data
# 调用父类 Exception 的初始化,只传递错误信息
super().__init__(message)
# 类方法,利用 JSONRPCError 对象创建 MCPError 实例
@classmethod
def from_jsonrpc(cls, err):
# 从 JSONRPCError.error 对象中取出 code/message/data 创建 MCPError
return cls(err.error.code, err.error.message, err.error.data)
# 定义 ClientSession 类,用于描述客户端会话
class ClientSession:
# 构造方法,接收读取流和写入流
def __init__(self, read_stream, write_stream):
# 赋值读取流和写入流
self._read, self._write = read_stream, write_stream
# 初始化请求 id 计数器为 0
self._req_id = 0
# 内部方法,发送请求并同步等待响应
+ def _request(self, req, progress_callback=None, message_handler=None, logging_callback=None):
# 当前请求 id
rid = self._req_id
# 自增请求 id,确保下次唯一
self._req_id += 1
# 对请求对象进行序列化,转为 dict
d = req.model_dump(by_alias=True, mode="json", exclude_none=True)
# 构建 JSONRPCRequest,再封装入 SessionMessage,通过写入流发送
jreq = JSONRPCRequest(jsonrpc="2.0", id=rid, method=d["method"], params=d.get("params"))
# 将请求对象封装成 SessionMessage 后发送
self._write.send(SessionMessage(message=jreq))
# 循环等待响应
while True:
# 从读取流取出一条消息
msg = self._read.get()
# 若取到 None,说明连接断开,抛出异常
if msg is None:
raise MCPError(-32000, "Connection closed")
# 若消息类型不是 SessionMessage,忽略继续等
if not isinstance(msg, SessionMessage):
continue
# 取消息内容
m = msg.message
# 判断当前消息是否为通知(即没有 id 字段且具有 method 字段)
+ if getattr(m, "id", None) is None and hasattr(m, "method"):
# 获取 method 名称
+ method = getattr(m, "method", "")
# 获取 params 内容(默认为空字典)
+ params = getattr(m, "params", None) or {}
# 若为进度通知,且传入了进度回调 progress_callback
+ if method == "notifications/progress" and progress_callback:
# 调用进度回调,传递 progress/total/message 参数
+ progress_callback(
+ params.get("progress", 0),
+ params.get("total"),
+ params.get("message"),
+ )
# 若为普通日志/信息通知
+ elif method == "notifications/message":
# 如果提供日志回调 logging_callback,则尝试解析为日志参数类型并调用
+ if logging_callback:
+ try:
# 使用 LoggingMessageNotificationParams 类型校验参数
+ p = LoggingMessageNotificationParams.model_validate(params, by_name=False)
# 调用日志回调
+ logging_callback(p)
+ except Exception:
# 校验失败则直接将原始 params 传递给日志回调
+ logging_callback(params)
# 如果指定了通用消息处理回调,则调用
+ if message_handler:
+ message_handler(m)
# 处理完通知类型后,继续等待下一个消息
+ continue
# 若为 Response 或 Error,且 id 匹配
if isinstance(m, (JSONRPCResponse, JSONRPCError)) and getattr(m, "id", None) == rid:
# 若为错误,抛出 MCPError 异常
if isinstance(m, JSONRPCError):
raise MCPError.from_jsonrpc(m)
# 为正常响应则返回结果数据
return m.result
# 内部方法,发送通知消息
def _notify(self, n):
# 通知对象序列化为 dict
d = n.model_dump(by_alias=True, mode="json", exclude_none=True)
# 封装为 JSONRPCNotification,再封装成 SessionMessage 后发出
self._write.send(SessionMessage(
message=JSONRPCNotification(
jsonrpc="2.0",
method=d["method"],
params=d.get("params"),
)
))
# 初始化过程,发送初始化请求,收到响应后再发初始化完成通知
def initialize(self):
# 构造初始化请求,并发送等待返回
r = self._request(InitializeRequest(
params=InitializeRequestParams(
protocol_version=LATEST_PROTOCOL_VERSION, # 协议版本号
capabilities=ClientCapabilities(), # 客户端能力
client_info=Implementation(name="mcp_lite", version="0.1.0"), # 客户端信息
)
))
# 发送"初始化完成"通知
self._notify(InitializedNotification())
# 用 InitializeResult 验证响应并返回
return InitializeResult.model_validate(r, by_name=False)
def list_tools(self):
# 发送 ListToolsRequest 请求,使用 ListToolsResult 校验并返回
return ListToolsResult.model_validate(self._request(ListToolsRequest()), by_name=False)
# 调用指定工具方法,传入工具名和参数
# 定义 call_tool 方法,用于调用指定名称的工具
+ def call_tool(self, name, arguments=None, progress_callback=None, message_handler=None, logging_callback=None):
# 如果提供了 progress_callback,则将当前请求 id 作为 progressToken 加入 meta,便于服务端推送进度通知
+ rid = self._req_id
+ meta = {"progressToken": rid} if progress_callback else None
# 构建工具调用请求参数,包含名称、参数和 meta 信息
+ params = CallToolRequestParams(name=name, arguments=arguments or {}, meta=meta)
# 调用 _request 方法发送工具调用请求,并用 CallToolResult 校验和结构化响应
return CallToolResult.model_validate(
+ self._request(
+ CallToolRequest(params=params),
+ progress_callback=progress_callback, # 进度回调函数
+ message_handler=message_handler, # 通用消息处理回调
+ logging_callback=logging_callback, # 日志回调函数
+ ),
+ by_name=False, # 按字段名校验
)
# 列出已注册的静态资源
def list_resources(self):
# 调用 _request 方法发送 ListResourcesRequest 请求
# 使用 ListResourcesResult 的 model_validate 进行结果校验和结构化
return ListResourcesResult.model_validate(
self._request(ListResourcesRequest()),
by_name=False,
)
# 列出资源模板(带占位符的资源)
def list_resource_templates(self):
# 调用 _request 方法发送 ListResourceTemplatesRequest 请求
# 使用 ListResourceTemplatesResult 的 model_validate 进行结果校验和结构化
return ListResourceTemplatesResult.model_validate(
self._request(ListResourceTemplatesRequest()),
by_name=False,
)
# 读取指定 URI 的资源内容
def read_resource(self, uri):
# uri 可为 str 或 AnyUrl 类型
# 构造 ReadResourceRequestParams 并发送 ReadResourceRequest 请求
# 使用 ReadResourceResult 的 model_validate 进行结果校验和结构化
return ReadResourceResult.model_validate(
self._request(ReadResourceRequest(params=ReadResourceRequestParams(uri=str(uri)))),
by_name=False,
)
# 列出已注册的 Prompt
# 定义 list_prompts 方法,用于获取所有已注册的 Prompt
def list_prompts(self):
# 发送 ListPromptsRequest 请求,校验响应并结构化为 ListPromptsResult
return ListPromptsResult.model_validate(
self._request(ListPromptsRequest()), # 请求所有 Prompt
by_name=False, # 按字段名校验
)
# 获取指定 Prompt 的内容
# 定义 get_prompt 方法,根据名称和参数获取指定 Prompt 的详情
def get_prompt(self, name, arguments=None):
# 发送 GetPromptRequest,请求参数包含指定名称和参数,响应结构化为 GetPromptResult
return GetPromptResult.model_validate(
self._request(GetPromptRequest( # 发送请求
params=GetPromptRequestParams( # 构造请求参数
name=name, # Prompt 名称
arguments=arguments or {} # Prompt 参数(默认为空字典)
)
)),
by_name=False, # 按字段名校验
)
5. init.py #
mcp_lite/server/fastmcp/init.py
# FastMCP 包:从原 fastmcp 模块导入所有内容以保持向后兼容
# 原 fastmcp.py 的内容已移入此包,通过 __init__ 导出
# 导入系统模块
import sys
# 导入 base64 编码库
import base64
# 导入 json 序列化
import json
# 导入正则表达式模块
import re
# 导入 url 解码工具
from urllib.parse import unquote
# 导入类型相关辅助函数
from typing import get_args, get_origin, get_type_hints
# 导入 SessionMessage 类型
from mcp_lite.message import SessionMessage
# 导入 stdio 通信模块
from mcp_lite.server import stdio
# 导入函数签名和异步工具
import inspect
import asyncio
from mcp_lite.types import ( # 导入 mcp_lite.types 模块中的多个类型
JSONRPCRequest, # JSONRPC 请求类型
+ JSONRPCNotification, # JSONRPC 通知类型
JSONRPCError, # JSONRPC 错误响应类型
ErrorData, # 错误数据类型
InitializeResult, # 初始化结果类型
LATEST_PROTOCOL_VERSION, # 最新协议版本号
ToolsCapability, # 工具相关能力描述类型
ResourcesCapability, # 资源相关能力描述类型
PromptsCapability, # Prompt 相关能力描述类型
ServerCapabilities, # 服务器能力类型
Implementation, # 实现信息类型
JSONRPCResponse, # JSONRPC 响应类型
ListToolsResult, # 列出工具结果类型
Tool, # 单个工具类型
TextContent, # 文本内容类型
CallToolRequestParams, # 调用工具请求参数类型
CallToolResult, # 调用工具响应结果类型
Resource, # 静态资源类型
ResourceTemplate, # 资源模板类型
ListResourcesResult, # 资源列表返回结果类型
ListResourceTemplatesResult, # 资源模板列表返回结果类型
ReadResourceRequestParams, # 读取资源请求参数类型
ReadResourceResult, # 读取资源请求响应类型
TextResourceContents, # 资源文本内容类型
BlobResourceContents, # 资源二进制内容类型
Prompt, # Prompt 类型
PromptArgument, # Prompt 参数类型
ListPromptsResult, # Prompt 列表结果类型
GetPromptRequestParams, # 获取 Prompt 请求参数类型
GetPromptResult, # 获取 Prompt 响应结果类型
PromptMessage, # Prompt 消息类型
)
# 导入 pydantic 的基础模型作为类型判定
from pydantic import BaseModel as PydanticBaseModel
# 导入 Typeddict 测试
from typing_extensions import is_typeddict as _is_typeddict
# 导入本包下 prompts 子模块
from . import prompts
# 辅助函数:提取函数输出 schema 及包裹需求
def _output_schema_and_wrap(fn, structured_output: bool):
# 若不是结构化输出,直接返回
if not structured_output:
return None, False
try:
# 获取函数签名
sig = inspect.signature(fn)
# 获取返回类型注解
ann = sig.return_annotation
# 如果没有注解,返回
if ann is inspect.Parameter.empty:
return None, False
except Exception:
return None, False
# 生成 schema
return _schema_from_annotation(ann, fn.__name__)
# 辅助函数:根据注解和函数名生成 schema
def _schema_from_annotation(ann, func_name: str):
# 没有注解直接返回
if ann is inspect.Parameter.empty:
return None, False
# 获取注解的原类型
origin = get_origin(ann)
wrap = False
schema = None
# 如果是 Pydantic 的模型子类
if PydanticBaseModel and isinstance(ann, type) and issubclass(ann, PydanticBaseModel):
schema = ann.model_json_schema()
return schema, False
# 如果是 Typeddict
if hasattr(ann, "__annotations__") and not (origin is dict or origin is list):
if _is_typeddict(ann):
hints = get_type_hints(ann) if hasattr(ann, "__annotations__") else {}
props = {}
for k, v in hints.items():
t = v
if t is int or t is type(None) and int in get_args(v):
props[k] = {"type": "integer"}
elif t is float:
props[k] = {"type": "number"}
elif t is str:
props[k] = {"type": "string"}
elif t is bool:
props[k] = {"type": "boolean"}
else:
props[k] = {"type": "string"}
schema = {"type": "object", "properties": props, "required": list(props)}
return schema, False
try:
hints = get_type_hints(ann)
if hints:
props = {}
for k, v in hints.items():
if v is int or v is type(None):
props[k] = {"type": "integer"}
elif v is float:
props[k] = {"type": "number"}
elif v is str:
props[k] = {"type": "string"}
elif v is bool:
props[k] = {"type": "boolean"}
elif get_origin(v) is list:
props[k] = {"type": "array", "items": {"type": "string"}}
else:
props[k] = {"type": "string"}
schema = {"type": "object", "properties": props}
return schema, False
except Exception:
pass
# 如果是 dict 类型
if origin is dict:
args = get_args(ann)
if len(args) == 2 and args[0] is str:
vt = args[1]
# 判断 value 类型
if vt is float:
schema = {"type": "object", "additionalProperties": {"type": "number"}}
elif vt is int:
schema = {"type": "object", "additionalProperties": {"type": "integer"}}
elif vt is str:
schema = {"type": "object", "additionalProperties": {"type": "string"}}
else:
schema = {"type": "object", "additionalProperties": {}}
return schema, False
wrap = True
# 如果是 list、基础类型等需要包裹
if origin is list or ann in (int, float, str, bool, type(None)):
wrap = True
# 包裹输出情况
if wrap:
result_schema = {"type": "string"}
if ann is int:
result_schema = {"type": "integer"}
elif ann is float:
result_schema = {"type": "number"}
elif ann is bool:
result_schema = {"type": "boolean"}
elif origin is list:
result_schema = {"type": "array", "items": {"type": "string"}}
schema = {"type": "object", "properties": {"result": result_schema}, "required": ["result"]}
return schema, True
return None, False
# 辅助:对象输出转换为结构化内容
def _to_structured(out, output_schema, wrap_output):
if output_schema is None:
return None
try:
# 如果是 Pydantic 模型
if PydanticBaseModel and isinstance(out, PydanticBaseModel):
return out.model_dump(mode="json")
# 如果需要包裹
if wrap_output:
return {"result": out}
# 如果是字典
if isinstance(out, dict):
return dict(out)
# 带有 __annotations__ 的对象,提取属性
if hasattr(out, "__annotations__"):
hints = get_type_hints(type(out)) if hasattr(type(out), "__annotations__") else getattr(out, "__annotations__", {})
return {k: getattr(out, k) for k in hints if hasattr(out, k)}
# 其它直接包裹
return {"result": out}
except Exception:
# 发生异常返回字符串
return {"result": str(out)}
# 根据函数参数生成 schema
def _schema(fn):
sig = inspect.signature(fn)
props = {}
req = []
for n, p in sig.parameters.items():
# 跳过以下划线开头的参数
if n.startswith("_"):
continue
# 跳过 Context 类型参数(由框架注入)
+ if _find_context_parameter(fn) == n:
+ continue
# 参数类型为字符串,标题为参数名
props[n] = {"type": "string", "title": n}
# 必填参数加入 required
if p.default is inspect.Parameter.empty:
req.append(n)
return {"type": "object", "properties": props, "required": req}
# 检测工具函数是否包含 Context 参数
+def _find_context_parameter(fn):
+ """若函数有 ctx: Context 参数,返回参数名;否则返回 None。"""
+ try:
+ sig = inspect.signature(fn)
+ hints = get_type_hints(fn) if hasattr(fn, "__annotations__") else {}
+ for name, param in sig.parameters.items():
+ ann = hints.get(name, param.annotation)
+ if ann is not inspect.Parameter.empty:
+ n = getattr(ann, "__name__", None) or str(ann)
+ if n == "Context" or (isinstance(n, str) and n.endswith(".Context")):
+ return name
+ except Exception:
+ pass
+ return None
# 根据 prompt 函数生成 prompt schema
def _prompt_schema(fn):
"""根据函数签名生成 Prompt 参数 schema。"""
sig = inspect.signature(fn)
props = {}
req = []
for n, p in sig.parameters.items():
if n.startswith("_"):
continue
props[n] = {"type": "string", "title": n}
if p.default is inspect.Parameter.empty:
req.append(n)
return {"type": "object", "properties": props, "required": req}
# Context 类:工具执行时的上下文,支持 read_resource、report_progress、debug、info
+class Context:
+ """工具执行上下文,提供资源读取、进度报告、日志发送等能力。"""
+ def __init__(self, mcp_server, request_id, send_notification=None, progress_token=None):
+ self._mcp = mcp_server
+ self.request_id = request_id
+ self._send = send_notification or (lambda _: None)
+ self._progress_token = progress_token
+ async def read_resource(self, uri: str):
+ """异步读取资源,返回内容块列表(与 ReadResourceResult.contents 结构一致)。"""
+ uri_str = str(uri)
# 静态资源
+ if res := self._mcp._resources.get(uri_str):
+ out = res.run()
+ from mcp_lite.types import TextResourceContents
+ return [TextResourceContents(uri=uri_str, text=str(out), mime_type=res.mime_type)]
# 模板资源
+ for template in self._mcp._resource_templates.values():
+ if args := template.matches(uri_str):
+ out = template.run(args)
+ from mcp_lite.types import TextResourceContents, BlobResourceContents
+ if isinstance(out, bytes):
+ return [BlobResourceContents(uri=uri_str, blob=base64.b64encode(out).decode(), mime_type=template.mime_type or "application/octet-stream")]
+ return [TextResourceContents(uri=uri_str, text=str(out), mime_type=template.mime_type or "text/plain")]
+ return []
+ async def report_progress(self, progress: float, total: float | None = None, message: str | None = None):
+ """发送进度通知(仅当客户端提供了 progressToken 时有效)。"""
+ if self._progress_token is not None:
+ self._send({"method": "notifications/progress", "params": {"progressToken": self._progress_token, "progress": progress, "total": total, "message": message}})
+ async def debug(self, data):
+ """发送 debug 级别日志。"""
+ self._send({"method": "notifications/message", "params": {"level": "debug", "data": data}})
+ async def info(self, data):
+ """发送 info 级别日志。"""
+ self._send({"method": "notifications/message", "params": {"level": "info", "data": data}})
# 工具函数封装类
class _Tool:
def __init__(self, fn, name=None, desc=None, structured_output=True):
# 函数本体
self.fn = fn
# 名称
self.name = name or fn.__name__
# 描述
self.desc = desc or (fn.__doc__ or "").strip()
# 入参 schema
self.schema = _schema(fn)
# 是否是异步函数
self.async_fn = asyncio.iscoroutinefunction(fn)
# 是否结构化输出
self.structured_output = structured_output
# 输出 schema 及需否包裹
self.output_schema, self.wrap_output = _output_schema_and_wrap(fn, structured_output) if structured_output else (None, False)
# Context 参数名(若有)
+ self._ctx_param = _find_context_parameter(fn)
# 转换为 Tool 对象
def to_tool(self):
return Tool(name=self.name, description=self.desc or None, input_schema=self.schema, output_schema=self.output_schema)
# 执行工具(自动支持异步),支持注入 Context
+ def run(self, args, mcp_server=None, request_id=None, progress_token=None, send_notification=None):
+ if self._ctx_param is not None and mcp_server is not None:
+ ctx = Context(mcp_server, request_id, send_notification, progress_token)
+ args = dict(args) if args else {}
+ args[self._ctx_param] = ctx
if self.async_fn:
return asyncio.run(self.fn(**args))
return self.fn(**args)
# 静态资源封装类
class _Resource:
def __init__(self, uri, fn, mime_type="text/plain"):
# 资源 URI
self.uri = uri
# 回调函数
self.fn = fn
# MIME 类型
self.mime_type = mime_type
# 是否异步
self.async_fn = asyncio.iscoroutinefunction(fn)
# 资源运行,自动支持异步
def run(self):
if self.async_fn:
return asyncio.run(self.fn())
return self.fn()
# 资源模板封装类(支持 URI 参数模板)
class _ResourceTemplate:
def __init__(self, uri_template, fn, mime_type="text/plain"):
# URI 模板
self.uri_template = uri_template
# 生成资源内容的函数
self.fn = fn
# MIME 类型
self.mime_type = mime_type
# 是否异步
self.async_fn = asyncio.iscoroutinefunction(fn)
# 提取函数里除了 _ 开头之外的参数名
sig = inspect.signature(fn)
self.param_names = [n for n in sig.parameters if not n.startswith("_")]
# 检查 URI 是否与模板匹配,并解析参数
def matches(self, uri):
pattern = self.uri_template.replace("{", "(?P<").replace("}", ">[^/]+)")
m = re.match(f"^{pattern}$", uri)
if m:
return {k: unquote(v) for k, v in m.groupdict().items()}
return None
# 执行模板内容生成函数
def run(self, args):
if self.async_fn:
return asyncio.run(self.fn(**args))
return self.fn(**args)
# Prompt 封装
class _Prompt:
"""内部 Prompt 封装类。"""
def __init__(self, fn, name=None, title=None, description=None):
# prompt 生成函数
self.fn = fn
# prompt 名称
self.name = name or fn.__name__
# prompt 标题
self.title = title
# prompt 描述
self.description = description or (fn.__doc__ or "").strip()
# 通用 schema
self.schema = _prompt_schema(fn)
# 是否异步
self.async_fn = asyncio.iscoroutinefunction(fn)
# 从 schema 生成 PromptArgument 列表
args_list = []
if "properties" in self.schema:
required = set(self.schema.get("required", []))
for pname, pinfo in self.schema["properties"].items():
args_list.append(
PromptArgument(
name=pname,
description=pinfo.get("title"),
required=pname in required,
)
)
self.arguments = args_list
# 转成 Prompt 对象
def to_prompt(self):
return Prompt(
name=self.name,
title=self.title,
description=self.description or None,
arguments=self.arguments if self.arguments else None,
)
# prompt 执行
def run(self, args):
if self.async_fn:
return asyncio.run(self.fn(**args))
return self.fn(**args)
# FastMCP 主类
class FastMCP:
def __init__(self, name="mcp-server"):
# 服务器名称
self.name = name
# 工具注册表
self._tools = {}
# 静态资源注册表
self._resources = {}
# 资源模板注册表
self._resource_templates = {}
# prompt 注册表
self._prompts = {}
# 注册工具的装饰器
def tool(self, name=None, description=None, structured_output=True):
def deco(fn):
t = _Tool(fn, name, description, structured_output=structured_output)
self._tools[t.name] = t
return fn
return deco
# 注册资源或模板资源的装饰器
def resource(self, uri, mime_type="text/plain"):
def deco(fn):
# 判断带模板参数还是静态资源
if "{" in uri and "}" in uri:
template = _ResourceTemplate(uri, fn, mime_type)
self._resource_templates[uri] = template
else:
res = _Resource(uri, fn, mime_type)
self._resources[uri] = res
return fn
return deco
# 注册 Prompt 的装饰器
def prompt(self, name=None, title=None, description=None):
"""注册 Prompt 的装饰器。"""
def deco(fn):
p = _Prompt(fn, name=name, title=title, description=description)
self._prompts[p.name] = p
return fn
return deco
# 核心 RPC 方法分发
def _handle(self, req):
# 获取方法名、参数和请求 id
method, params, rid = req.method, req.params or {}, req.id
# 初始化请求
if method == "initialize":
caps = ServerCapabilities(
tools=ToolsCapability(),
resources=ResourcesCapability(),
prompts=PromptsCapability() if self._prompts else None,
)
r = InitializeResult(
protocol_version=LATEST_PROTOCOL_VERSION,
capabilities=caps,
server_info=Implementation(name=self.name, version="0.1.0"),
)
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 工具列表请求
if method == "tools/list":
r = ListToolsResult(tools=[t.to_tool() for t in self._tools.values()])
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 工具调用请求
if method == "tools/call":
p = CallToolRequestParams.model_validate(params, by_name=False)
t = self._tools.get(p.name)
if not t:
return JSONRPCError(
jsonrpc="2.0",
id=rid,
error=ErrorData(code=-32602, message=f"Unknown tool: {p.name}")
)
# 从 meta/_meta 提取 progressToken,用于发送进度/日志通知
+ meta = p.meta or {}
+ progress_token = meta.get("progressToken") or meta.get("progress_token")
+ notifications = []
+ def _send_notification(payload):
+ method_name = payload.get("method", "")
+ params = payload.get("params")
+ notifications.append(JSONRPCNotification(jsonrpc="2.0", method=method_name, params=params))
try:
+ out = t.run(
+ p.arguments or {},
+ mcp_server=self,
+ request_id=rid,
+ progress_token=progress_token,
+ send_notification=_send_notification,
+ )
# 如果直接返回的是 CallToolResult
if isinstance(out, CallToolResult):
r = out
else:
struct = None
# 结构化输出
if t.output_schema is not None:
struct = _to_structured(out, t.output_schema, t.wrap_output)
# 字符串直接用文本包装
if isinstance(out, str):
c = [TextContent(text=out)]
elif out is None:
c = []
elif struct is not None:
# 结构化结果按 JSON 格式化后返回
c = [TextContent(text=json.dumps(struct, ensure_ascii=False, indent=2))]
else:
# 普通对象尝试转 json,否则转字符串
try:
if PydanticBaseModel and isinstance(out, PydanticBaseModel):
text = json.dumps(out.model_dump(mode="json"), ensure_ascii=False, indent=2)
elif isinstance(out, dict):
text = json.dumps(out, ensure_ascii=False, indent=2)
else:
text = str(out)
except Exception:
text = str(out)
c = [TextContent(text=text)]
r = CallToolResult(content=c, structured_content=struct)
except Exception as e:
r = CallToolResult(content=[TextContent(text=str(e))], is_error=True)
+ resp = JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
+ if notifications:
+ return (notifications, resp)
+ return resp
# prompts/list 请求
if method == "prompts/list":
prompts_list = [p.to_prompt() for p in self._prompts.values()]
r = ListPromptsResult(prompts=prompts_list)
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# prompts/get 请求
if method == "prompts/get":
p = GetPromptRequestParams.model_validate(params, by_name=False)
prompt_obj = self._prompts.get(p.name)
if not prompt_obj:
return JSONRPCError(
jsonrpc="2.0",
id=rid,
error=ErrorData(code=-32602, message=f"Unknown prompt: {p.name}")
)
try:
args = p.arguments or {}
result = prompt_obj.run(args)
messages = []
# 单条消息包装为列表
if not isinstance(result, (list, tuple)):
result = [result]
for msg in result:
# 若为内置的 Message 对象
if isinstance(msg, prompts.base.Message):
content = msg.content
if isinstance(content, str):
content = TextContent(type="text", text=content)
messages.append(PromptMessage(role=msg.role, content=content))
# 字符串消息按 user 角色组装
elif isinstance(msg, str):
messages.append(PromptMessage(role="user", content=TextContent(type="text", text=msg)))
# dict 消息,读 role 和 content
elif isinstance(msg, dict):
role = msg.get("role", "user")
cnt = msg.get("content", "")
if isinstance(cnt, dict) and cnt.get("type") == "text":
content = TextContent(**cnt)
else:
content = TextContent(type="text", text=str(cnt) if not isinstance(cnt, str) else cnt)
messages.append(PromptMessage(role=role, content=content))
# 其它均归为 user 文本
else:
messages.append(PromptMessage(role="user", content=TextContent(type="text", text=str(msg))))
r = GetPromptResult(description=prompt_obj.description, messages=messages)
except Exception as e:
return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32603, message=str(e)))
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 资源静态列表
if method == "resources/list":
resources = [
Resource(uri=u, name=u, mime_type=r.mime_type)
for u, r in self._resources.items()
]
r = ListResourcesResult(resources=resources)
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 资源模板列表
if method == "resources/templates/list":
templates = [
ResourceTemplate(uri_template=u, name=u, mime_type=t.mime_type)
for u, t in self._resource_templates.items()
]
r = ListResourceTemplatesResult(resource_templates=templates)
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 读取资源接口
if method == "resources/read":
p = ReadResourceRequestParams.model_validate(params, by_name=False)
uri_str = str(p.uri)
try:
# 静态资源优先
if res := self._resources.get(uri_str):
out = res.run()
content = TextResourceContents(uri=uri_str, text=str(out), mime_type=res.mime_type)
r = ReadResourceResult(contents=[content])
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 匹配模板资源
for template in self._resource_templates.values():
if args := template.matches(uri_str):
out = template.run(args)
if isinstance(out, bytes):
content = BlobResourceContents(
uri=uri_str,
blob=base64.b64encode(out).decode(),
mime_type=template.mime_type or "application/octet-stream",
)
else:
content = TextResourceContents(
uri=uri_str,
text=str(out),
mime_type=template.mime_type or "text/plain",
)
r = ReadResourceResult(contents=[content])
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 找不到资源
return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32602, message=f"Unknown resource: {uri_str}"))
except Exception as e:
return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32603, message=str(e)))
# 未知方法
return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32601, message=f"Method not found: {method}"))
# 消息包装与请求
def _handle_msg(self, msg):
# 非 SessionMessage 类型忽略
if not isinstance(msg, SessionMessage):
return None
m = msg.message
# 必须是带 id 的 jsonrpc 请求
if not isinstance(m, JSONRPCRequest) or getattr(m, "id", None) is None:
return None
#print("[Server] Request:", m.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
try:
resp = self._handle(m)
#print("[Server] Response:", resp.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
return resp
except Exception as e:
err = JSONRPCError(jsonrpc="2.0", id=m.id, error=ErrorData(code=-32603, message=str(e)))
#print("[Server] Response (error):", err.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
return err
# 运行服务器主逻辑
def run(self, transport="stdio"):
# 目前只支持 stdio
if transport != "stdio":
raise ValueError(f"unsupported transport: {transport}")
# 启动 stdio 服务器
stdio.stdio_server(self._handle_msg)
# 明确导出接口
+__all__ = ["FastMCP", "Context", "prompts"]
6. stdio.py #
mcp_lite/server/stdio.py
# 导入 sys 模块,用于标准输入输出操作
import sys
# 从 mcp_lite.message 模块导入 SessionMessage 类
from mcp_lite.message import SessionMessage
# 从 mcp_lite.types 模块导入 jsonrpc_message_adapter 适配器
from mcp_lite.types import jsonrpc_message_adapter
# 定义 stdio_server 函数,参数 handler 是处理消息的回调函数
def stdio_server(handler):
# 进入一个无限循环,持续处理标准输入的数据
while True:
# 从标准输入读取一行
line = sys.stdin.readline()
# 如果读取到空字符串,说明 EOF,跳出循环
if not line:
break
# 去除读取行首尾的空白字符
line = line.strip()
# 如果去除空白后字符串仍为空,继续读取下一行
if not line:
continue
try:
# 使用 jsonrpc_message_adapter 对输入字符串进行解析为消息对象
msg = jsonrpc_message_adapter.validate_json(line, by_name=False)
# 使用解析后的消息包装成 SessionMessage 再传递给 handler 处理,获取响应
response = handler(SessionMessage(message=msg))
# 判断 handler 是否返回了响应结果
if response is not None:
# 判断 response 是否为一个长度为 2 的元组(即 (notifications, response))
+ if isinstance(response, tuple) and len(response) == 2:
# 解包 notifications 和 resp
+ notifications, resp = response
# 遍历所有通知消息
+ for n in notifications:
# 将每个通知对象序列化为 JSON 字符串写入标准输出,并添加换行
+ sys.stdout.write(n.model_dump_json(by_alias=True, exclude_unset=True) + "\n")
# 立即刷新标准输出缓冲区,确保消息及时输出到客户端
+ sys.stdout.flush()
# 将 response 设置为实际的响应对象
+ response = resp
# 如果响应对象有 message 属性,则直接使用;否则把响应包装成 SessionMessage
r = response if hasattr(response, "message") else SessionMessage(message=response)
# 将响应的 message 属性序列化为 JSON 字符串,写入标准输出,并换行
sys.stdout.write(r.message.model_dump_json(by_alias=True, exclude_unset=True) + "\n")
# 立即刷新标准输出缓冲区,确保数据被及时输出
sys.stdout.flush()
except Exception:
print("[Server] Error:", sys.exc_info(), file=sys.stderr)
# 捕获所有异常,忽略错误继续处理下一行
pass7. types.py #
mcp_lite/types.py
# 从 pydantic 导入 BaseModel、ConfigDict、Field、TypeAdapter、field_validator
from pydantic import BaseModel, ConfigDict, Field, TypeAdapter, field_validator
# 导入 pydantic 的 to_camel 驼峰命名生成器
from pydantic.alias_generators import to_camel
# 导入 Any 和 Literal 类型注解
from typing import Any, Literal
# 定义 RequestId 类型,既可以是 int 也可以是 str
RequestId = int | str
# 定义 MCP 的基础模型类,支持驼峰命名和按名称填充
class MCPModel(BaseModel):
# 指定 Pydantic 模型配置:使用驼峰形式命名和按名称填充
model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)
# 定义客户端能力结构体
class ClientCapabilities(MCPModel):
# 可选字段:客户端实验性能力扩展,键为 str,值为字典
experimental: dict[str, dict[str, Any]] | None = None
# 定义服务器或客户端的实现信息结构体
class Implementation(MCPModel):
# 实现的名称
name: str = ""
# 实现的版本号
version: str = ""
# 可选字段:人类可读的标题
title: str | None = None
# 可选字段:实现的描述
description: str | None = None
# 定义初始化请求参数结构体
class InitializeRequestParams(MCPModel):
# 协议版本号
protocol_version: str = ""
# 可选字段:客户端能力描述
capabilities: ClientCapabilities = None
# 可选字段:客户端实现信息
client_info: Implementation = None
# 定义初始化请求结构体
class InitializeRequest(MCPModel):
# 方法名,固定为 "initialize"
method: Literal["initialize"] = "initialize"
# 可选字段:参数,类型为 InitializeRequestParams
params: InitializeRequestParams = None
# 当前协议的最新版本号
LATEST_PROTOCOL_VERSION = "2024-11-05"
# 定义工具相关能力结构体
class ToolsCapability(MCPModel):
# 工具列表是否发生变化,可选布尔类型
list_changed: bool | None = None
# 定义资源相关能力结构体
class ResourcesCapability(MCPModel):
# 是否支持资源订阅
subscribe: bool | None = None
# 资源列表是否变化通知
list_changed: bool | None = None
# 定义 Prompt 相关能力结构体
class PromptsCapability(MCPModel):
list_changed: bool | None = None
# 定义服务端能力描述结构体
class ServerCapabilities(MCPModel):
# 可选字段:实验性能力扩展
experimental: dict[str, dict[str, Any]] | None = None
# 可选字段:工具能力
tools: ToolsCapability | None = None
# 可选字段:资源能力
resources: ResourcesCapability | None = None
# 可选字段:Prompt 能力
prompts: PromptsCapability | None = None
# 定义初始化响应结构体
class InitializeResult(MCPModel):
# 协议版本号
protocol_version: str = ""
# 可选字段:服务端能力描述
capabilities: ServerCapabilities = None
# 可选字段:服务端实现信息
server_info: Implementation = None
# 可选字段:初始化说明信息
instructions: str | None = None
# 定义客户端初始化完成通知结构体
class InitializedNotification(MCPModel):
# 方法名,固定为 "notifications/initialized"
method: Literal["notifications/initialized"] = "notifications/initialized"
# 可选字段:通知参数,可以为字典或 None
params: dict[str, Any] | None = None
# 定义 JSONRPC 请求的数据结构
class JSONRPCRequest(BaseModel):
# jsonrpc 协议版本,固定为 "2.0"
jsonrpc: Literal["2.0"] = "2.0"
# 请求 ID,可以为 int 或 str 类型
id: RequestId = None
# 方法名称,字符串类型
method: str = ""
# 方法参数,为一个字典或 None
params: dict[str, Any] | None = None
# 定义 JSONRPC 通知的数据结构(没有 id 字段)
class JSONRPCNotification(BaseModel):
# jsonrpc 协议版本,固定为 "2.0"
jsonrpc: Literal["2.0"] = "2.0"
# 通知的方法名称
method: str = ""
# 通知参数,可以为字典或者 None
params: dict[str, Any] | None = None
# 定义 JSONRPC 响应的数据结构
class JSONRPCResponse(BaseModel):
# jsonrpc 协议版本,固定为 "2.0"
jsonrpc: Literal["2.0"] = "2.0"
# 响应的 ID,需要与请求 ID 匹配
id: RequestId = None
# 响应结果,可以为字典或 None
result: dict[str, Any] = None
# 定义错误的数据结构
class ErrorData(BaseModel):
# 错误码,默认值为 0
code: int = 0
# 错误信息,默认值为空字符串
message: str = ""
# 附加的错误数据,可以为任意类型或 None
data: Any = None
# 定义 JSONRPC 错误消息的数据结构
class JSONRPCError(BaseModel):
# jsonrpc 协议版本,固定为 "2.0"
jsonrpc: Literal["2.0"] = "2.0"
# 错误对应的请求 ID,可以为 None
id: RequestId | None = None
# 错误的详细信息,类型为 ErrorData
error: ErrorData = None
# 定义所有 JSONRPC 消息的联合类型
JSONRPCMessage = JSONRPCRequest | JSONRPCNotification | JSONRPCResponse | JSONRPCError
# 进度通知参数(notifications/progress)
+class ProgressNotificationParams(MCPModel):
+ progress_token: str | int = ""
+ progress: float = 0.0
+ total: float | None = None
+ message: str | None = None
# 日志消息通知参数(notifications/message)
+class LoggingMessageNotificationParams(MCPModel):
+ level: str = "info"
+ data: Any = None
+ logger: str | None = None
# 定义 JSONRPC 消息适配器,用于类型自动推断和校验
jsonrpc_message_adapter = TypeAdapter(JSONRPCMessage)
# 定义工具结果中的文本内容块
class TextContent(MCPModel):
type: Literal["text"] = "text"
text: str = ""
# 定义工具描述数据结构体
class Tool(MCPModel):
# 工具名称
name: str = ""
# 可选字段:工具描述
description: str | None = None
# 工具输入参数的 schema,默认为空字典
input_schema: dict[str, Any] = Field(default_factory=dict)
# 可选字段:工具输出 schema
output_schema: dict[str, Any] | None = None
# 定义获取工具列表请求结构体
class ListToolsRequest(MCPModel):
# 方法名,固定为 "tools/list"
method: Literal["tools/list"] = "tools/list"
# 可选字段:参数,可以为字典或 None
params: dict[str, Any] | None = None
# 定义获取工具列表响应结构体
class ListToolsResult(MCPModel):
# 工具列表,默认为空列表
tools: list[Tool] = []
# 可选字段:分页游标,可为 None
next_cursor: str | None = None
# 定义资源元数据结构体
class Resource(MCPModel):
# 资源 URI
uri: str = ""
# 可选:资源名称
name: str | None = None
# 可选:人类可读标题
title: str | None = None
# 可选:资源描述
description: str | None = None
# 可选:MIME 类型
mime_type: str | None = None
# 定义资源模板结构体
class ResourceTemplate(MCPModel):
# URI 模板,如 greeting://{name}
uri_template: str = ""
# 可选:模板名称
name: str | None = None
# 可选:人类可读标题
title: str | None = None
# 可选:模板描述
description: str | None = None
# 可选:MIME 类型
mime_type: str | None = None
# 定义 resources/list 请求结构体
class ListResourcesRequest(MCPModel):
method: Literal["resources/list"] = "resources/list"
params: dict[str, Any] | None = None
# 定义 resources/list 响应结构体
class ListResourcesResult(MCPModel):
resources: list[Resource] = []
next_cursor: str | None = None
# 定义 resources/templates/list 请求结构体
class ListResourceTemplatesRequest(MCPModel):
method: Literal["resources/templates/list"] = "resources/templates/list"
params: dict[str, Any] | None = None
# 定义 resources/templates/list 响应结构体
class ListResourceTemplatesResult(MCPModel):
resource_templates: list[ResourceTemplate] = []
next_cursor: str | None = None
# 定义 resources/read 请求参数结构体
class ReadResourceRequestParams(MCPModel):
uri: str = ""
# 定义 resources/read 请求结构体
class ReadResourceRequest(MCPModel):
method: Literal["resources/read"] = "resources/read"
params: ReadResourceRequestParams = None
# 定义资源内容基类(文本)
class TextResourceContents(MCPModel):
uri: str = ""
mime_type: str | None = None
text: str = ""
# 定义资源内容基类(二进制)
class BlobResourceContents(MCPModel):
uri: str = ""
mime_type: str | None = None
blob: str = "" # base64 编码
# 定义 resources/read 响应结构体
class ReadResourceResult(MCPModel):
contents: list[TextResourceContents | BlobResourceContents] = []
# 定义 Prompt 参数结构体
# 定义代表 Prompt 参数的模型
class PromptArgument(MCPModel):
# 参数名称
name: str = ""
# 参数描述,可为 None
description: str | None = None
# 是否为必填参数,可为 None
required: bool | None = None
# 定义 Prompt 元数据结构体
# 定义代表 Prompt 元数据的模型
class Prompt(MCPModel):
# Prompt 名称
name: str = ""
# Prompt 描述,可为 None
description: str | None = None
# Prompt 参数列表,可为 None
arguments: list[PromptArgument] | None = None
# Prompt 标题,可为 None
title: str | None = None
# 定义 prompts/list 请求结构体
# 定义 prompts/list 请求的数据模型
class ListPromptsRequest(MCPModel):
# 固定方法字段
method: Literal["prompts/list"] = "prompts/list"
# 请求参数,可为 None
params: dict[str, Any] | None = None
# 定义 prompts/list 响应结构体
# 定义 prompts/list 响应的数据模型
class ListPromptsResult(MCPModel):
# 返回的 Prompt 列表
prompts: list[Prompt] = []
# 下一页游标,可为 None
next_cursor: str | None = None
# 定义 prompts/get 请求参数结构体
# 定义 prompts/get 的参数模型
class GetPromptRequestParams(MCPModel):
# Prompt 名称
name: str = ""
# 传递给 Prompt 的参数,可为 None
arguments: dict[str, str] | None = None
# 定义 prompts/get 请求结构体
# 定义 prompts/get 请求的数据模型
class GetPromptRequest(MCPModel):
# 固定方法字段
method: Literal["prompts/get"] = "prompts/get"
# 请求参数,可为 None
params: GetPromptRequestParams | None = None
# 定义 Prompt 消息结构体(role + content)
# 定义表示 Prompt 里的单条消息的数据模型
class PromptMessage(MCPModel):
# 消息角色(user 或 assistant)
role: Literal["user", "assistant"] = "user"
# 消息内容,可以是 TextContent 或字典,默认为空文本
content: TextContent | dict[str, Any] = Field(default_factory=lambda: TextContent(text=""))
# 字段校验器:在模型初始化前解析 content 字段
@field_validator("content", mode="before")
@classmethod
def _parse_content(cls, v):
# 如果是字典且 type 为 "text",转换为 TextContent 实例
if isinstance(v, dict) and v.get("type") == "text":
return TextContent(text=v.get("text", ""))
# 否则原样返回
return v
# 定义 prompts/get 响应结构体
# 定义 prompts/get 的响应数据模型
class GetPromptResult(MCPModel):
# Prompt 的描述,可为 None
description: str | None = None
# Prompt 返回的消息列表
messages: list[PromptMessage] = []
# 定义调用工具请求参数结构体
class CallToolRequestParams(MCPModel):
# 工具名称
name: str = ""
# 可选字段:输入参数,为字典类型或 None
+ arguments: dict[str, Any] | None = None
# 可选:元数据,含 progressToken 时服务端可发送进度通知
+ meta: dict[str, Any] | None = Field(default=None, alias="_meta")
# 定义调用工具请求结构体
class CallToolRequest(MCPModel):
# 方法名,固定为 "tools/call"
method: Literal["tools/call"] = "tools/call"
# 可选字段:参数,类型为 CallToolRequestParams
params: CallToolRequestParams = None
# 定义调用工具响应结构体
class CallToolResult(MCPModel):
# 内容字段,由 TextContent 或字典组成的列表,默认为空列表
content: list[TextContent | dict[str, Any]] = Field(default_factory=list)
# 结构化内容字段,可为字典或 None
structured_content: dict[str, Any] | None = None
# 错误标志字段,标识是否为错误结果,默认为 False
is_error: bool = False
# 针对 content 字段的字段校验器,模型初始化前调用
@field_validator("content", mode="before")
@classmethod
def _parse_content(cls, v):
# 如果传入的值不是列表,直接返回
if not isinstance(v, list):
return v
# 初始化输出列表
out = []
# 遍历每一项
for item in v:
# 如果项是字典且类型为 "text",转换为 TextContent 实例
if isinstance(item, dict) and item.get("type") == "text":
out.append(TextContent(text=item.get("text", "")))
# 否则,原样加入输出列表
else:
out.append(item)
# 返回处理后的列表
return out 8.工作流程 #
8.1 功能概述 #
该部分实现了 MCP 的 Context 上下文机制:在工具执行时注入 Context,支持:
- read_resource:读取资源
- report_progress:发送进度通知
- debug / info:发送日志通知
- request_id:获取当前请求 ID
客户端通过 progress_callback、message_handler、logging_callback 接收这些通知。
8.2 各模块修改说明 #
8.2.1. context_client.py(客户端入口) #
- 使用同步 API:
with stdio_client、ClientSession - 定义三个回调:
on_logging:处理notifications/message日志on_message:处理notifications/progress等通用通知on_progress:专门处理进度
- 调用
call_tool时传入progress_callback、message_handler、logging_callback,以接收服务端推送的通知
8.2.2. context_server.py(服务端入口) #
- 注册工具
long_running_task(task_name, ctx: Context, steps=5) - 通过
ctx调用debug、info、read_resource、report_progress - 使用
steps = int(steps)防止客户端传入字符串导致range()报错
8.2.3. session.py(客户端会话) #
_request增加progress_callback、message_handler、logging_callback- 收到无
id的通知时,按method分发:notifications/progress→progress_callbacknotifications/message→logging_callback和message_handler
call_tool在提供progress_callback时,在meta.progressToken中附带当前请求 ID,供服务端发送进度通知
8.2.4. fastmcp/init.py(服务端核心) #
- Context 类:封装
read_resource、report_progress、debug、info,内部通过_send收集通知 - _find_context_parameter:识别工具函数中的
ctx: Context参数 - _Tool.run:在调用工具前注入
Context,并传入mcp_server、request_id、progress_token、send_notification - tools/call 处理:从
params.meta取progressToken,创建_send_notification收集通知,工具执行完后若有通知则返回(notifications, resp)
8.2.5. stdio.py(服务端传输) #
- 若 handler 返回
(notifications, response),先逐条写出通知,再写出最终响应
8.2.6. types.py(类型定义) #
ProgressNotificationParams:进度通知参数LoggingMessageNotificationParams:日志通知参数CallToolRequestParams.meta:携带progressToken的元数据
8.3 关键流程说明 #
- progressToken 传递:客户端在
call_tool时把_req_id作为progressToken放入params._meta,服务端据此发送进度通知。 - 通知与响应顺序:服务端在工具执行期间收集通知,先通过 stdout 发送所有通知,再发送最终 JSON-RPC 响应。
- 客户端消息循环:
_request在等待响应时持续读取消息;无id的视为通知并分发给回调,有id且与rid匹配的视为本次请求的响应并返回。 - Context 注入:
_find_context_parameter识别ctx: Context,_Tool.run在调用工具前构造Context并注入到参数中。