1. 采样 #
采样(sampling)是指模型根据已有对话内容(如用户输入和上下文),自动生成新的回复内容的能力。MCP 协议定义了标准的采样 RPC 接口,便于客户端与服务器对接不同大模型或采样后端,如 OpenAI、DeepSeek 或本地模型。
采样接口主要包括:
sampling/createMessage:服务端向客户端发起请求,由客户端按照指定历史消息、温度、最大 token 数等参数,调用 LLM 采样生成“助手”类回复(例如 chat)。- 消息格式严格区分角色(role, 如: user, assistant)及内容结构(如富文本类型 TextContent),支持 stop_sequences 等采样参数,接口易于扩展。
- 客户端(持有 LLM/API 访问权限)在收到采样请求后,将消息和采样参数转换为目标模型 API(如 OpenAI/DeepSeek)所需格式,发起实际采样调用,并适配响应内容,统一 MCP 消息结构返回给服务端。
- 客户端和服务端均通过 JSONRPC 协议传递消息,并支持流式/异步实现。
- 相关结构体如
CreateMessageRequestParams(请求)、CreateMessageResult(回复)等在 types 模块中定义,方便类型校验和代码提示。
实际采样工作流通常如下:
- 服务端(MCP 服务端,如带有 generate_poem 等工具)在工具执行时需要 LLM 能力时,向客户端发起
sampling/createMessage请求; - 客户端(MCP 客户端,持有 LLM/API 访问权限)收到请求后,将消息和采样参数转换为目标模型 API 所需格式,发起实际采样调用;
- 模型后端生成回复内容(assistant),客户端封装为 MCP 协议定义的
CreateMessageResult格式返回给服务端; - 服务端收到回复,将结果返回给工具执行流程。
此设计实现了消息结构和模型能力的解耦,便于后续快速适配和扩展不同的采样后端。
2. sampling_client.py #
sampling_client.py
# 导入异步编程相关模块
import asyncio
# 导入操作系统相关模块
import os
# 导入系统相关模块
import sys
# 导入httpx用于异步HTTP请求
import httpx
# 从mcp_lite包中导入相关类型和类
from mcp_lite import ClientSession, StdioServerParameters, types
# 导入stdio_client用于进程间标准输入输出通信
from mcp_lite.client.stdio import stdio_client
# 定义DeepSeek API的URL
DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions"
# 定义DeepSeek所用的模型名称
DEEPSEEK_MODEL = "deepseek-chat"
# 定义一个工具函数,用于将MCP消息格式转换为DeepSeek API所需格式
def _messages_to_deepseek(messages: list) -> list[dict]:
# """将 MCP SamplingMessage 转为 DeepSeek API 格式。"""
out = []
# 遍历每条消息
for msg in messages:
# 检查消息对象是否有role和content属性
if not hasattr(msg, "role") or not hasattr(msg, "content"):
continue
# 取出角色role属性,默认为user
role = getattr(msg, "role", "user")
# 取出内容content属性,若无则为空字符串
c = getattr(msg, "content", "")
# 如果内容是文本类型,则直接取文本
if isinstance(c, types.TextContent) and c.type == "text":
out.append({"role": role, "content": c.text})
# 如果内容是一个内容块的列表
elif isinstance(c, list):
parts = []
# 遍历每一个内容块
for block in c:
# 如果每个内容块还是文本,则加入parts
if isinstance(block, types.TextContent) and block.type == "text":
parts.append(block.text)
# 多段文本拼接
if parts:
out.append({"role": role, "content": "\n".join(parts)})
# 如果内容本身就是字符串
elif isinstance(c, str):
out.append({"role": role, "content": c})
# 返回整理后的DeepSeek消息列表
return out
# 定义异步函数,用于调用DeepSeek API获取回复
async def _call_deepseek(messages: list, max_tokens: int = 200) -> str:
# """调用 DeepSeek API 获取回复。"""
# 从环境变量获取DeepSeek的API Key,若无则用缺省值
api_key = os.environ.get("DEEPSEEK_API_KEY", "sk-2c101a720e054053aeabffda603e755b").strip()
# 如果没有有效的API Key,则抛出异常
if not api_key:
raise ValueError(
"未设置 DEEPSEEK_API_KEY 环境变量,请从 https://platform.deepseek.com 获取 API Key"
)
# 构造请求载荷
payload = {
"model": DEEPSEEK_MODEL,
"messages": messages,
"max_tokens": max_tokens,
"temperature": 0.7,
}
# 使用httpx异步客户端发送POST请求
async with httpx.AsyncClient(timeout=60.0) as client:
resp = await client.post(
DEEPSEEK_API_URL,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
json=payload,
)
# 若请求失败则抛异常
resp.raise_for_status()
# 解析JSON响应体
data = resp.json()
# 获取第一条返回的choice
choice = data.get("choices", [{}])[0]
# 提取消息内容
msg = choice.get("message", {})
# 返回内容,如果为空则返回空字符串
return msg.get("content", "") or ""
# 定义异步回调函数,处理采样请求
async def handle_sampling(
context,
params: types.CreateMessageRequestParams,
) -> types.CreateMessageResult:
# """处理服务端发起的 sampling/createMessage 请求,调用 DeepSeek API。"""
# 获取请求输入的消息列表
messages = params.messages or []
# 转换为deepseek可识别的消息格式
deepseek_messages = _messages_to_deepseek(messages)
# 若消息为空则回复错误
if not deepseek_messages:
reply = "未收到有效消息内容。"
else:
# 获取最大token数,兼容旧新参数名
max_tokens = getattr(params, "max_tokens", None) or getattr(
params, "maxTokens", 200
)
try:
# 调用DeepSeek获取回复内容
reply = await _call_deepseek(deepseek_messages, max_tokens=max_tokens)
except Exception as e:
# 错误处理
reply = f"DeepSeek API 调用失败:{e}"
print(f"[Sampling] {reply}", file=sys.stderr)
# 在标准错误输出打印reply的前80个字符预览
print(f"[Sampling] DeepSeek 回复: {reply[:80]}...", file=sys.stderr)
# 返回CreateMessageResult对象,内容作为TextContent封装
return types.CreateMessageResult(
role="assistant",
content=types.TextContent(type="text", text=reply),
model=DEEPSEEK_MODEL,
stop_reason="endTurn",
)
# 主函数,负责与采样服务交互和演示
def main():
# """主函数(mcp_lite 使用同步 API)。"""
# 获取当前文件的目录
base_dir = os.path.dirname(os.path.abspath(__file__))
# 拼接得到采样服务端脚本路径
server_path = os.path.join(base_dir, "sampling_server.py")
# 设置采样服务端的启动参数,使用Python解释器
server_params = StdioServerParameters(
command="python",
args=[server_path],
env={},
)
# 启动stdio客户端,与采样服务端通信
with stdio_client(server_params) as (read, write):
# 初始化客户端会话,注册采样回调
session = ClientSession(read, write, sampling_callback=handle_sampling)
# 执行初始化命令,获取服务信息
result = session.initialize()
# 获取服务端名称(兼容旧新字段)
info = getattr(result, "server_info", None) or getattr(result, "serverInfo", None)
server_name = info.name if info else "unknown"
# 打印服务端名称到标准错误
print(f"[Server] {server_name}", file=sys.stderr)
# 获取所有可用工具
tools_result = session.list_tools()
# 提取工具名称列表
tool_names = [t.name for t in tools_result.tools]
# 打印工具名称到标准错误
print(f"[Tools] {tool_names}", file=sys.stderr)
# 演示 1:调用generate_poem工具
print("\n--- 演示 generate_poem(生成诗歌)---", file=sys.stderr)
r1 = session.call_tool("generate_poem", arguments={"topic": "自然"})
# 若工具返回内容则遍历打印每个内容块
if r1.content:
for block in r1.content:
if hasattr(block, "text"):
print(f"[Result] {block.text}")
# 演示 2:调用summarize工具
print("\n--- 演示 summarize(文本摘要)---", file=sys.stderr)
r2 = session.call_tool(
"summarize",
arguments={"text": "人工智能正在改变我们的生活方式,从智能助手到自动驾驶,技术无处不在。"},
)
# 打印返回内容到标准输出
if r2.content:
for block in r2.content:
if hasattr(block, "text"):
print(f"[Result] {block.text}")
# 判断是否作为主程序入口执行
if __name__ == "__main__":
# 若支持reconfigure,则设置标准输入输出为utf-8编码
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
sys.stdin.reconfigure(encoding="utf-8")
# 执行主函数
main()
官方代码
# 导入异步编程相关模块
import asyncio
# 导入操作系统相关模块
import os
# 导入系统相关模块
import sys
# 导入httpx用于异步HTTP请求
import httpx
# 从mcp_lite包中导入相关类型和类
from mcp import ClientSession, StdioServerParameters, types
# 导入stdio_client用于进程间标准输入输出通信
from mcp.client.stdio import stdio_client
# 定义DeepSeek API的URL
DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions"
# 定义DeepSeek所用的模型名称
DEEPSEEK_MODEL = "deepseek-chat"
# 定义一个工具函数,用于将MCP消息格式转换为DeepSeek API所需格式
def _messages_to_deepseek(messages: list) -> list[dict]:
# """将 MCP SamplingMessage 转为 DeepSeek API 格式。"""
out = []
# 遍历每条消息
for msg in messages:
# 检查消息对象是否有role和content属性
if not hasattr(msg, "role") or not hasattr(msg, "content"):
continue
# 取出角色role属性,默认为user
role = getattr(msg, "role", "user")
# 取出内容content属性,若无则为空字符串
c = getattr(msg, "content", "")
# 如果内容是文本类型,则直接取文本
if isinstance(c, types.TextContent) and c.type == "text":
out.append({"role": role, "content": c.text})
# 如果内容是一个内容块的列表
elif isinstance(c, list):
parts = []
# 遍历每一个内容块
for block in c:
# 如果每个内容块还是文本,则加入parts
if isinstance(block, types.TextContent) and block.type == "text":
parts.append(block.text)
# 多段文本拼接
if parts:
out.append({"role": role, "content": "\n".join(parts)})
# 如果内容本身就是字符串
elif isinstance(c, str):
out.append({"role": role, "content": c})
# 返回整理后的DeepSeek消息列表
return out
# 定义异步函数,用于调用DeepSeek API获取回复
async def _call_deepseek(messages: list, max_tokens: int = 200) -> str:
# """调用 DeepSeek API 获取回复。"""
# 从环境变量获取DeepSeek的API Key,若无则用缺省值
api_key = os.environ.get("DEEPSEEK_API_KEY", "sk-2c101a720e054053aeabffda603e755b").strip()
# 如果没有有效的API Key,则抛出异常
if not api_key:
raise ValueError(
"未设置 DEEPSEEK_API_KEY 环境变量,请从 https://platform.deepseek.com 获取 API Key"
)
# 构造请求载荷
payload = {
"model": DEEPSEEK_MODEL,
"messages": messages,
"max_tokens": max_tokens,
"temperature": 0.7,
}
# 使用httpx异步客户端发送POST请求
async with httpx.AsyncClient(timeout=60.0) as client:
resp = await client.post(
DEEPSEEK_API_URL,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
json=payload,
)
# 若请求失败则抛异常
resp.raise_for_status()
# 解析JSON响应体
data = resp.json()
# 获取第一条返回的choice
choice = data.get("choices", [{}])[0]
# 提取消息内容
msg = choice.get("message", {})
# 返回内容,如果为空则返回空字符串
return msg.get("content", "") or ""
# 定义异步回调函数,处理采样请求
async def handle_sampling(
context,
params: types.CreateMessageRequestParams,
) -> types.CreateMessageResult:
# """处理服务端发起的 sampling/createMessage 请求,调用 DeepSeek API。"""
# 获取请求输入的消息列表
messages = params.messages or []
# 转换为deepseek可识别的消息格式
deepseek_messages = _messages_to_deepseek(messages)
# 若消息为空则回复错误
if not deepseek_messages:
reply = "未收到有效消息内容。"
else:
# 获取最大token数,兼容旧新参数名
max_tokens = getattr(params, "max_tokens", None) or getattr(
params, "maxTokens", 200
)
try:
# 调用DeepSeek获取回复内容
reply = await _call_deepseek(deepseek_messages, max_tokens=max_tokens)
except Exception as e:
# 错误处理
reply = f"DeepSeek API 调用失败:{e}"
print(f"[Sampling] {reply}", file=sys.stderr)
# 在标准错误输出打印reply的前80个字符预览
print(f"[Sampling] DeepSeek 回复: {reply[:80]}...", file=sys.stderr)
# 返回CreateMessageResult对象,内容作为TextContent封装
return types.CreateMessageResult(
role="assistant",
content=types.TextContent(type="text", text=reply),
model=DEEPSEEK_MODEL,
stop_reason="endTurn",
)
# 主函数,负责与采样服务交互和演示
async def main() -> None:
# """主函数(mcp_lite 使用同步 API)。"""
# 获取当前文件的目录
base_dir = os.path.dirname(os.path.abspath(__file__))
# 拼接得到采样服务端脚本路径
server_path = os.path.join(base_dir, "sampling_server.py")
# 设置采样服务端的启动参数,使用Python解释器
server_params = StdioServerParameters(
command="python",
args=[server_path],
env={},
)
# 启动stdio客户端,与采样服务端通信
async with stdio_client(server_params) as (read, write):
# 初始化客户端会话,注册采样回调
async with ClientSession(read, write, sampling_callback=handle_sampling) as session:
# 执行初始化命令,获取服务信息
result = await session.initialize()
# 获取服务端名称(兼容旧新字段)
info = getattr(result, "server_info", None) or getattr(result, "serverInfo", None)
server_name = info.name if info else "unknown"
# 打印服务端名称到标准错误
print(f"[Server] {server_name}", file=sys.stderr)
# 获取所有可用工具
tools_result = await session.list_tools()
# 提取工具名称列表
tool_names = [t.name for t in tools_result.tools]
# 打印工具名称到标准错误
print(f"[Tools] {tool_names}", file=sys.stderr)
# 演示 1:调用generate_poem工具
print("\n--- 演示 generate_poem(生成诗歌)---", file=sys.stderr)
r1 = await session.call_tool("generate_poem", arguments={"topic": "自然"})
# 若工具返回内容则遍历打印每个内容块
if r1.content:
for block in r1.content:
if hasattr(block, "text"):
print(f"[Result] {block.text}")
# 演示 2:调用summarize工具
print("\n--- 演示 summarize(文本摘要)---", file=sys.stderr)
r2 = await session.call_tool(
"summarize",
arguments={"text": "人工智能正在改变我们的生活方式,从智能助手到自动驾驶,技术无处不在。"},
)
# 打印返回内容到标准输出
if r2.content:
for block in r2.content:
if hasattr(block, "text"):
print(f"[Result] {block.text}")
# 判断是否作为主程序入口执行
if __name__ == "__main__":
# 若支持reconfigure,则设置标准输入输出为utf-8编码
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
sys.stdin.reconfigure(encoding="utf-8")
# 执行主函数
asyncio.run(main())
3. sampling_server.py #
sampling_server.py
# 导入sys模块,用于系统输入输出
import sys
# 从mcp_lite.server.fastmcp导入Context和FastMCP类
from mcp_lite.server.fastmcp import Context, FastMCP
# 从mcp_lite.types导入SamplingMessage和TextContent类型
from mcp_lite.types import SamplingMessage, TextContent
# 创建FastMCP实例,服务名为“采样服务”
mcp = FastMCP(name="采样服务")
# 注册工具generate_poem,生成诗歌
@mcp.tool()
async def generate_poem(topic: str, ctx) -> str:
# """根据主题生成一首短诗(通过客户端 LLM 采样)。"""
# 构造诗歌生成的提示语
prompt = f"请用中文写一首关于「{topic}」的短诗"
# 调用客户端会话采样接口生成回复
result = await ctx.session.create_message(
messages=[
SamplingMessage(
role="user", # 角色为用户
content=TextContent(type="text", text=prompt), # 构造文本内容
)
],
max_tokens=100, # 限制最大输出token数量
)
# 如果回复内容是文本类型,则返回文本;否则返回内容的字符串表达
if hasattr(result.content, "type") and result.content.type == "text":
return result.content.text
return str(result.content)
# 注册工具summarize,进行摘要
@mcp.tool()
async def summarize(text: str, ctx) -> str:
# """对给定文本进行摘要(通过客户端 LLM 采样)。"""
# 构造摘要的提示语
prompt = f"请用一句话概括以下文本:\n\n{text}"
# 调用客户端会话采样接口生成摘要
result = await ctx.session.create_message(
messages=[
SamplingMessage(
role="user", # 角色为用户
content=TextContent(type="text", text=prompt), # 构造文本内容
)
],
max_tokens=80, # 限制最大输出token数量
)
# 如果回复内容是文本类型,则返回文本;否则返回内容的字符串表达
if hasattr(result.content, "type") and result.content.type == "text":
return result.content.text
return str(result.content)
# 定义主函数,启动服务
def main():
# 打印服务启动信息到标准错误输出
print("启动采样服务...", file=sys.stderr)
# 打印可用工具列表到标准错误输出
print("可用工具:generate_poem(生成诗歌), summarize(文本摘要)", file=sys.stderr)
# 以stdio为传输方式运行mcp服务
mcp.run(transport="stdio")
# 判断是否作为主程序运行
if __name__ == "__main__":
try:
# 检查和设置标准输入输出的编码为utf-8(兼容Windows等环境)
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
sys.stdin.reconfigure(encoding="utf-8")
# 调用主函数
main()
# 捕获键盘中断异常,优雅退出
except KeyboardInterrupt:
print("\n服务器已停止", file=sys.stderr)
# 捕获其它异常并打印错误信息
except Exception as e:
print(f"服务器启动失败:{e}", file=sys.stderr)
官方代码
# 导入sys模块,用于系统输入输出
import sys
# 从mcp_lite.server.fastmcp导入Context和FastMCP类
from mcp.server.fastmcp import Context, FastMCP
# 从mcp_lite.types导入SamplingMessage和TextContent类型
from mcp.types import SamplingMessage, TextContent
# 创建FastMCP实例,服务名为“采样服务”
mcp = FastMCP(name="采样服务")
# 注册工具generate_poem,生成诗歌
@mcp.tool()
async def generate_poem(topic: str, ctx: Context) -> str:
# """根据主题生成一首短诗(通过客户端 LLM 采样)。"""
# 构造诗歌生成的提示语
prompt = f"请用中文写一首关于「{topic}」的短诗"
# 调用客户端会话采样接口生成回复
result = await ctx.session.create_message(
messages=[
SamplingMessage(
role="user", # 角色为用户
content=TextContent(type="text", text=prompt), # 构造文本内容
)
],
max_tokens=100, # 限制最大输出token数量
)
# 如果回复内容是文本类型,则返回文本;否则返回内容的字符串表达
if hasattr(result.content, "type") and result.content.type == "text":
return result.content.text
return str(result.content)
# 注册工具summarize,进行摘要
@mcp.tool()
async def summarize(text: str, ctx: Context) -> str:
# """对给定文本进行摘要(通过客户端 LLM 采样)。"""
# 构造摘要的提示语
prompt = f"请用一句话概括以下文本:\n\n{text}"
# 调用客户端会话采样接口生成摘要
result = await ctx.session.create_message(
messages=[
SamplingMessage(
role="user", # 角色为用户
content=TextContent(type="text", text=prompt), # 构造文本内容
)
],
max_tokens=80, # 限制最大输出token数量
)
# 如果回复内容是文本类型,则返回文本;否则返回内容的字符串表达
if hasattr(result.content, "type") and result.content.type == "text":
return result.content.text
return str(result.content)
# 定义主函数,启动服务
def main():
# 打印服务启动信息到标准错误输出
print("启动采样服务...", file=sys.stderr)
# 打印可用工具列表到标准错误输出
print("可用工具:generate_poem(生成诗歌), summarize(文本摘要)", file=sys.stderr)
# 以stdio为传输方式运行mcp服务
mcp.run(transport="stdio")
# 判断是否作为主程序运行
if __name__ == "__main__":
try:
# 检查和设置标准输入输出的编码为utf-8(兼容Windows等环境)
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
sys.stdin.reconfigure(encoding="utf-8")
# 调用主函数
main()
# 捕获键盘中断异常,优雅退出
except KeyboardInterrupt:
print("\n服务器已停止", file=sys.stderr)
# 捕获其它异常并打印错误信息
except Exception as e:
print(f"服务器启动失败:{e}", file=sys.stderr)
4. session.py #
mcp_lite/client/session.py
import sys
# 导入 SessionMessage 类
from mcp_lite.message import SessionMessage
# 导入 asyncio 用于运行异步 elicitation 回调
import asyncio
# 从 mcp_lite.types 模块导入相关类型和常量
from mcp_lite.types import ( # 从 mcp_lite.types 模块导入以下类型
InitializeRequestParams, # 初始化请求参数类型
InitializeRequest, # 初始化请求类型
LATEST_PROTOCOL_VERSION, # 最新协议版本常量
ClientCapabilities, # 客户端能力类型
ElicitationCapability, # Elicitation 能力类型
FormElicitationCapability, # Form elicitation 能力
UrlElicitationCapability, # URL elicitation 能力
+ SamplingCapability, # Sampling 能力类型
+ CreateMessageRequestParams, # Sampling 请求参数类型
+ CreateMessageResult, # Sampling 响应结果类型
Implementation, # 实现信息类型
InitializedNotification, # 初始化完成通知类型
ElicitRequestFormParams, # Elicitation 请求参数(form)
ElicitResult, # Elicitation 结果类型
InitializeResult, # 初始化结果类型
JSONRPCRequest, # JSONRPC请求类型
JSONRPCResponse, # JSONRPC响应类型
JSONRPCError, # JSONRPC错误类型
+ ErrorData, # JSONRPC 错误数据
JSONRPCNotification, # JSONRPC通知类型
LoggingMessageNotificationParams, # 日志消息通知参数
ListToolsRequest, # 工具列表请求类型
ListToolsResult, # 工具列表结果类型
CallToolResult, # 调用工具响应类型
CallToolRequest, # 调用工具请求类型
CallToolRequestParams, # 调用工具请求参数类型
ListResourcesRequest, # 资源列表请求类型
ListResourcesResult, # 资源列表响应类型
ListResourceTemplatesRequest, # 资源模板列表请求类型
ListResourceTemplatesResult, # 资源模板列表响应类型
ReadResourceRequest, # 读取资源请求类型
ReadResourceRequestParams, # 读取资源请求参数类型
ReadResourceResult, # 读取资源响应类型
ListPromptsRequest, # Prompt 列表请求类型
ListPromptsResult, # Prompt 列表响应类型
GetPromptRequest, # 获取 Prompt 请求类型
GetPromptRequestParams, # 获取 Prompt 请求参数类型
GetPromptResult, # 获取 Prompt 响应类型
CompleteRequest, # 补全请求类型
CompleteRequestParams, # 补全请求参数类型
CompleteResult, # 补全响应类型
CompletionArgument, # 补全参数类型
CompletionContext, # 补全上下文类型
)
# 定义 MCPError 异常类,用于表示协议相关错误
class MCPError(Exception):
# 初始化方法,接收错误码、错误信息和可选的数据
def __init__(self, code, message, data=None):
# 将错误码、错误信息、附加数据赋值为实例属性
self.code, self.message, self.data = code, message, data
# 调用父类 Exception 的初始化,只传递错误信息
super().__init__(message)
# 类方法,利用 JSONRPCError 对象创建 MCPError 实例
@classmethod
def from_jsonrpc(cls, err):
# 从 JSONRPCError.error 对象中取出 code/message/data 创建 MCPError
return cls(err.error.code, err.error.message, err.error.data)
# 定义 ClientSession 类,用于描述客户端会话
class ClientSession:
# 构造方法,接收读取流和写入流,可选 elicitation_callback、sampling_callback
+ def __init__(self, read_stream, write_stream, elicitation_callback=None, sampling_callback=None):
# 赋值读取流和写入流
self._read, self._write = read_stream, write_stream
# 初始化请求 id 计数器为 0
self._req_id = 0
# elicitation 回调:当服务端发送 elicitation/create 时调用,返回 ElicitResult
self._elicitation_callback = elicitation_callback
# sampling 回调:当服务端发送 sampling/createMessage 时调用,返回 CreateMessageResult
+ self._sampling_callback = sampling_callback
# 内部方法,发送请求并同步等待响应
def _request(self, req, progress_callback=None, message_handler=None, logging_callback=None):
# 当前请求 id
rid = self._req_id
# 自增请求 id,确保下次唯一
self._req_id += 1
# 对请求对象进行序列化,转为 dict
d = req.model_dump(by_alias=True, mode="json", exclude_none=True)
# 构建 JSONRPCRequest,再封装入 SessionMessage,通过写入流发送
jreq = JSONRPCRequest(jsonrpc="2.0", id=rid, method=d["method"], params=d.get("params"))
# 将请求对象封装成 SessionMessage 后发送
self._write.send(SessionMessage(message=jreq))
# 循环等待响应
while True:
# 从读取流取出一条消息
msg = self._read.get()
# 若取到 None,说明连接断开,抛出异常
if msg is None:
raise MCPError(-32000, "Connection closed")
# 若消息类型不是 SessionMessage,忽略继续等
if not isinstance(msg, SessionMessage):
continue
# 取消息内容
m = msg.message
# 服务端发起的 elicitation/create 请求(有 id 和 method)
# 判断是否有 elicitation 回调,且消息 m 有 id 且 method 为 "elicitation/create"
if (
self._elicitation_callback
and getattr(m, "id", None) is not None
and getattr(m, "method", "") == "elicitation/create"
):
# 获取消息中的 params 字段,若没有则为 {}
params_raw = getattr(m, "params", None) or {}
try:
# 尝试用 ElicitRequestFormParams 校验并解析参数
params = ElicitRequestFormParams.model_validate(params_raw, by_name=False)
except Exception:
# 校验失败则降级为原始 dict,不做结构检查
params = params_raw
try:
# 调用回调处理表单请求,异步支持
result = self._elicitation_callback(None, params)
# 检查回调是否为协程,如果是则用 asyncio.run 执行
if asyncio.iscoroutine(result):
result = asyncio.run(result)
# 若返回类型正确,构造正常响应
if isinstance(result, ElicitResult):
resp = JSONRPCResponse(
jsonrpc="2.0",
id=m.id,
result=result.model_dump(by_alias=True, exclude_none=True)
)
else:
# 若返回结果不是 ElicitResult,则以 cancel 响应
resp = JSONRPCResponse(
jsonrpc="2.0",
id=m.id,
result=ElicitResult(action="cancel").model_dump(by_alias=True, exclude_none=True)
)
# 发送封装后的响应 SessionMessage
self._write.send(SessionMessage(message=resp))
except Exception:
# 回调过程异常,反馈 cancel,保证流程不挂死
resp = JSONRPCResponse(
jsonrpc="2.0",
id=m.id,
result=ElicitResult(action="cancel").model_dump(by_alias=True, exclude_none=True)
)
self._write.send(SessionMessage(message=resp))
# 处理完表单请求,进入下一个消息处理循环
continue
# 服务端发起 sampling/createMessage 请求
+ if (
+ self._sampling_callback
+ and getattr(m, "id", None) is not None
+ and getattr(m, "method", "") == "sampling/createMessage"
+ ):
# 获取消息中的 params 字段,若无则为空字典
+ params_raw = getattr(m, "params", None) or {}
+ try:
# 使用 CreateMessageRequestParams 校验&解析参数(尽量保证类型正确)
+ params = CreateMessageRequestParams.model_validate(params_raw, by_name=False)
+ except Exception:
# 如果解析失败,则降级为原始入参
+ params = params_raw
+ try:
# 调用采样回调函数,传入参数(支持协程和普通函数)
+ result = self._sampling_callback(None, params)
# 若回调为异步协程,则用 asyncio.run 执行
+ if asyncio.iscoroutine(result):
+ result = asyncio.run(result)
# 如果回调结果是 CreateMessageResult,则封装为正常响应
+ if isinstance(result, CreateMessageResult):
+ resp = JSONRPCResponse(
+ jsonrpc="2.0",
+ id=m.id,
+ result=result.model_dump(by_alias=True, exclude_none=True),
+ )
+ else:
# 若返回类型不符,则回包内部错误
+ resp = JSONRPCError(
+ jsonrpc="2.0",
+ id=m.id,
+ error=ErrorData(code=-32603, message="Sampling callback returned invalid result"),
+ )
# 发送封装后的 SessionMessage 给服务端
+ self._write.send(SessionMessage(message=resp))
+ except Exception as ex:
# 回调发生异常,返回 error 数据包
+ resp = JSONRPCError(
+ jsonrpc="2.0",
+ id=m.id,
+ error=ErrorData(code=-32603, message=str(ex)),
+ )
+ self._write.send(SessionMessage(message=resp))
# 处理完采样请求,进入下个消息循环
+ continue
# 判断当前消息是否为通知(即没有 id 字段且具有 method 字段)
if getattr(m, "id", None) is None and hasattr(m, "method"):
# 获取 method 名称
method = getattr(m, "method", "")
# 获取 params 内容(默认为空字典)
params = getattr(m, "params", None) or {}
# 若为进度通知,且传入了进度回调 progress_callback
if method == "notifications/progress" and progress_callback:
# 调用进度回调,传递 progress/total/message 参数
progress_callback(
params.get("progress", 0),
params.get("total"),
params.get("message"),
)
# 若为普通日志/信息通知
elif method == "notifications/message":
# 如果提供日志回调 logging_callback,则尝试解析为日志参数类型并调用
if logging_callback:
try:
# 使用 LoggingMessageNotificationParams 类型校验参数
p = LoggingMessageNotificationParams.model_validate(params, by_name=False)
# 调用日志回调
logging_callback(p)
except Exception:
# 校验失败则直接将原始 params 传递给日志回调
logging_callback(params)
# 如果指定了通用消息处理回调,则调用
if message_handler:
message_handler(m)
# 处理完通知类型后,继续等待下一个消息
continue
# 若为 Response 或 Error,且 id 匹配
if isinstance(m, (JSONRPCResponse, JSONRPCError)) and getattr(m, "id", None) == rid:
# 若为错误,抛出 MCPError 异常
if isinstance(m, JSONRPCError):
raise MCPError.from_jsonrpc(m)
# 为正常响应则返回结果数据
return m.result
# 内部方法,发送通知消息
def _notify(self, n):
# 通知对象序列化为 dict
d = n.model_dump(by_alias=True, mode="json", exclude_none=True)
# 封装为 JSONRPCNotification,再封装成 SessionMessage 后发出
self._write.send(SessionMessage(
message=JSONRPCNotification(
jsonrpc="2.0",
method=d["method"],
params=d.get("params"),
)
))
# 初始化过程,发送初始化请求,收到响应后再发初始化完成通知
def initialize(self):
# 若提供 elicitation_callback 则声明 ElicitationCapability
caps = ClientCapabilities()
# 如果存在 elicitation_callback 回调,则配置客户端能力中的 elicitation 字段
if self._elicitation_callback:
# 为 elicitation 能力赋值,包括 form 和 url 两种能力
caps.elicitation = ElicitationCapability(
form=FormElicitationCapability(),
url=UrlElicitationCapability()
)
# 如果存在 sampling_callback 回调,则声明 SamplingCapability
+ if self._sampling_callback:
+ caps.sampling = SamplingCapability()
# 构造初始化请求,并发送等待返回
r = self._request(InitializeRequest(
params=InitializeRequestParams(
protocol_version=LATEST_PROTOCOL_VERSION, # 协议版本号
capabilities=caps, # 客户端能力
client_info=Implementation(name="mcp_lite", version="0.1.0"), # 客户端信息
)
))
# 发送"初始化完成"通知
self._notify(InitializedNotification())
# 用 InitializeResult 验证响应并返回
return InitializeResult.model_validate(r, by_name=False)
def list_tools(self):
# 发送 ListToolsRequest 请求,使用 ListToolsResult 校验并返回
return ListToolsResult.model_validate(self._request(ListToolsRequest()), by_name=False)
# 调用指定工具方法,传入工具名和参数
# 定义 call_tool 方法,用于调用指定名称的工具
def call_tool(self, name, arguments=None, progress_callback=None, message_handler=None, logging_callback=None):
# 如果提供了 progress_callback,则将当前请求 id 作为 progressToken 加入 meta,便于服务端推送进度通知
rid = self._req_id
meta = {"progressToken": rid} if progress_callback else None
# 构建工具调用请求参数,包含名称、参数和 meta 信息
params = CallToolRequestParams(name=name, arguments=arguments or {}, meta=meta)
# 调用 _request 方法发送工具调用请求,并用 CallToolResult 校验和结构化响应
return CallToolResult.model_validate(
self._request(
CallToolRequest(params=params),
progress_callback=progress_callback, # 进度回调函数
message_handler=message_handler, # 通用消息处理回调
logging_callback=logging_callback, # 日志回调函数
),
by_name=False, # 按字段名校验
)
# 列出已注册的静态资源
def list_resources(self):
# 调用 _request 方法发送 ListResourcesRequest 请求
# 使用 ListResourcesResult 的 model_validate 进行结果校验和结构化
return ListResourcesResult.model_validate(
self._request(ListResourcesRequest()),
by_name=False,
)
# 列出资源模板(带占位符的资源)
def list_resource_templates(self):
# 调用 _request 方法发送 ListResourceTemplatesRequest 请求
# 使用 ListResourceTemplatesResult 的 model_validate 进行结果校验和结构化
return ListResourceTemplatesResult.model_validate(
self._request(ListResourceTemplatesRequest()),
by_name=False,
)
# 读取指定 URI 的资源内容
def read_resource(self, uri):
# uri 可为 str 或 AnyUrl 类型
# 构造 ReadResourceRequestParams 并发送 ReadResourceRequest 请求
# 使用 ReadResourceResult 的 model_validate 进行结果校验和结构化
return ReadResourceResult.model_validate(
self._request(ReadResourceRequest(params=ReadResourceRequestParams(uri=str(uri)))),
by_name=False,
)
# 列出已注册的 Prompt
# 定义 list_prompts 方法,用于获取所有已注册的 Prompt
def list_prompts(self):
# 发送 ListPromptsRequest 请求,校验响应并结构化为 ListPromptsResult
return ListPromptsResult.model_validate(
self._request(ListPromptsRequest()), # 请求所有 Prompt
by_name=False, # 按字段名校验
)
# 请求 Prompt 或资源模板参数的补全建议
def complete(self, ref, argument, context_arguments=None):
# 构建补全上下文,若有 context_arguments 则封装为 CompletionContext
context = CompletionContext(arguments=context_arguments) if context_arguments else None
# 若 argument 为字典则转为 CompletionArgument
arg = argument if isinstance(argument, CompletionArgument) else CompletionArgument(**argument)
# 发送 CompleteRequest,校验并返回 CompleteResult
return CompleteResult.model_validate(
self._request(CompleteRequest(
params=CompleteRequestParams(ref=ref, argument=arg, context=context)
)),
by_name=False,
)
# 获取指定 Prompt 的内容
# 定义 get_prompt 方法,根据名称和参数获取指定 Prompt 的详情
def get_prompt(self, name, arguments=None):
# 发送 GetPromptRequest,请求参数包含指定名称和参数,响应结构化为 GetPromptResult
return GetPromptResult.model_validate(
self._request(GetPromptRequest( # 发送请求
params=GetPromptRequestParams( # 构造请求参数
name=name, # Prompt 名称
arguments=arguments or {} # Prompt 参数(默认为空字典)
)
)),
by_name=False, # 按字段名校验
)
5. init.py #
mcp_lite/server/fastmcp/init.py
# FastMCP 包:从原 fastmcp 模块导入所有内容以保持向后兼容
# 原 fastmcp.py 的内容已移入此包,通过 __init__ 导出
# 导入系统模块
import sys
# 导入 base64 编码库
import base64
# 导入 json 序列化
import json
# 导入正则表达式模块
import re
# 导入 url 解码工具
from urllib.parse import unquote
# 导入类型相关辅助函数
from typing import get_args, get_origin, get_type_hints
# 导入 SessionMessage 类型
from mcp_lite.message import SessionMessage
# 导入 stdio 通信模块
from mcp_lite.server import stdio
# 导入函数签名和异步工具
import inspect
import asyncio
from mcp_lite.types import ( # 导入 mcp_lite.types 模块中的多个类型
JSONRPCRequest, # JSONRPC 请求类型
JSONRPCNotification, # JSONRPC 通知类型
JSONRPCError, # JSONRPC 错误响应类型
ErrorData, # 错误数据类型
InitializeResult, # 初始化结果类型
LATEST_PROTOCOL_VERSION, # 最新协议版本号
ToolsCapability, # 工具相关能力描述类型
ResourcesCapability, # 资源相关能力描述类型
PromptsCapability, # Prompt 相关能力描述类型
CompletionsCapability, # 补全能力描述类型
ElicitationCapability, # Elicitation 能力类型
FormElicitationCapability, # Form elicitation 能力
UrlElicitationCapability, # URL elicitation 能力
+ SamplingCapability, # Sampling 能力类型
ServerCapabilities, # 服务器能力类型
+ CreateMessageRequestParams, # Sampling 请求参数类型
+ CreateMessageResult, # Sampling 响应结果类型
+ SamplingMessage, # Sampling 消息类型
Implementation, # 实现信息类型
JSONRPCResponse, # JSONRPC 响应类型
ListToolsResult, # 列出工具结果类型
Tool, # 单个工具类型
TextContent, # 文本内容类型
CallToolRequestParams, # 调用工具请求参数类型
CallToolResult, # 调用工具响应结果类型
ElicitResult, # Elicitation 结果类型
Resource, # 静态资源类型
ResourceTemplate, # 资源模板类型
ListResourcesResult, # 资源列表返回结果类型
ListResourceTemplatesResult, # 资源模板列表返回结果类型
ReadResourceRequestParams, # 读取资源请求参数类型
ReadResourceResult, # 读取资源请求响应类型
TextResourceContents, # 资源文本内容类型
BlobResourceContents, # 资源二进制内容类型
Prompt, # Prompt 类型
PromptArgument, # Prompt 参数类型
ListPromptsResult, # Prompt 列表结果类型
GetPromptRequestParams, # 获取 Prompt 请求参数类型
GetPromptResult, # 获取 Prompt 响应结果类型
PromptMessage, # Prompt 消息类型
Completion, # 补全结果内容类型
CompleteRequestParams, # 补全请求参数类型
CompleteResult, # 补全响应类型
)
# 导入 pydantic 的基础模型作为类型判定
from pydantic import BaseModel as PydanticBaseModel
# 导入 Typeddict 测试
from typing_extensions import is_typeddict as _is_typeddict
# 导入本包下 prompts 子模块
from . import prompts
# 辅助函数:提取函数输出 schema 及包裹需求
def _output_schema_and_wrap(fn, structured_output: bool):
# 若不是结构化输出,直接返回
if not structured_output:
return None, False
try:
# 获取函数签名
sig = inspect.signature(fn)
# 获取返回类型注解
ann = sig.return_annotation
# 如果没有注解,返回
if ann is inspect.Parameter.empty:
return None, False
except Exception:
return None, False
# 生成 schema
return _schema_from_annotation(ann, fn.__name__)
# 辅助函数:根据注解和函数名生成 schema
def _schema_from_annotation(ann, func_name: str):
# 没有注解直接返回
if ann is inspect.Parameter.empty:
return None, False
# 获取注解的原类型
origin = get_origin(ann)
wrap = False
schema = None
# 如果是 Pydantic 的模型子类
if PydanticBaseModel and isinstance(ann, type) and issubclass(ann, PydanticBaseModel):
schema = ann.model_json_schema()
return schema, False
# 如果是 Typeddict
if hasattr(ann, "__annotations__") and not (origin is dict or origin is list):
if _is_typeddict(ann):
hints = get_type_hints(ann) if hasattr(ann, "__annotations__") else {}
props = {}
for k, v in hints.items():
t = v
if t is int or t is type(None) and int in get_args(v):
props[k] = {"type": "integer"}
elif t is float:
props[k] = {"type": "number"}
elif t is str:
props[k] = {"type": "string"}
elif t is bool:
props[k] = {"type": "boolean"}
else:
props[k] = {"type": "string"}
schema = {"type": "object", "properties": props, "required": list(props)}
return schema, False
try:
hints = get_type_hints(ann)
if hints:
props = {}
for k, v in hints.items():
if v is int or v is type(None):
props[k] = {"type": "integer"}
elif v is float:
props[k] = {"type": "number"}
elif v is str:
props[k] = {"type": "string"}
elif v is bool:
props[k] = {"type": "boolean"}
elif get_origin(v) is list:
props[k] = {"type": "array", "items": {"type": "string"}}
else:
props[k] = {"type": "string"}
schema = {"type": "object", "properties": props}
return schema, False
except Exception:
pass
# 如果是 dict 类型
if origin is dict:
args = get_args(ann)
if len(args) == 2 and args[0] is str:
vt = args[1]
# 判断 value 类型
if vt is float:
schema = {"type": "object", "additionalProperties": {"type": "number"}}
elif vt is int:
schema = {"type": "object", "additionalProperties": {"type": "integer"}}
elif vt is str:
schema = {"type": "object", "additionalProperties": {"type": "string"}}
else:
schema = {"type": "object", "additionalProperties": {}}
return schema, False
wrap = True
# 如果是 list、基础类型等需要包裹
if origin is list or ann in (int, float, str, bool, type(None)):
wrap = True
# 包裹输出情况
if wrap:
result_schema = {"type": "string"}
if ann is int:
result_schema = {"type": "integer"}
elif ann is float:
result_schema = {"type": "number"}
elif ann is bool:
result_schema = {"type": "boolean"}
elif origin is list:
result_schema = {"type": "array", "items": {"type": "string"}}
schema = {"type": "object", "properties": {"result": result_schema}, "required": ["result"]}
return schema, True
return None, False
# 辅助:对象输出转换为结构化内容
def _to_structured(out, output_schema, wrap_output):
if output_schema is None:
return None
try:
# 如果是 Pydantic 模型
if PydanticBaseModel and isinstance(out, PydanticBaseModel):
return out.model_dump(mode="json")
# 如果需要包裹
if wrap_output:
return {"result": out}
# 如果是字典
if isinstance(out, dict):
return dict(out)
# 带有 __annotations__ 的对象,提取属性
if hasattr(out, "__annotations__"):
hints = get_type_hints(type(out)) if hasattr(type(out), "__annotations__") else getattr(out, "__annotations__", {})
return {k: getattr(out, k) for k in hints if hasattr(out, k)}
# 其它直接包裹
return {"result": out}
except Exception:
# 发生异常返回字符串
return {"result": str(out)}
# 根据函数参数生成 schema
def _schema(fn):
sig = inspect.signature(fn)
props = {}
req = []
for n, p in sig.parameters.items():
# 跳过以下划线开头的参数
if n.startswith("_"):
continue
# 跳过 Context 类型参数(由框架注入)
if _find_context_parameter(fn) == n:
continue
# 参数类型为字符串,标题为参数名
props[n] = {"type": "string", "title": n}
# 必填参数加入 required
if p.default is inspect.Parameter.empty:
req.append(n)
return {"type": "object", "properties": props, "required": req}
# 检测工具函数是否包含 Context 参数
def _find_context_parameter(fn):
"""若函数有 ctx: Context 参数,或参数名为 ctx,返回参数名;否则返回 None。"""
try:
sig = inspect.signature(fn)
hints = get_type_hints(fn) if hasattr(fn, "__annotations__") else {}
for name, param in sig.parameters.items():
ann = hints.get(name, param.annotation)
if ann is not inspect.Parameter.empty:
n = getattr(ann, "__name__", None) or str(ann)
if n == "Context" or (isinstance(n, str) and n.endswith(".Context")):
return name
# 约定:参数名为 ctx 时也视为 Context 注入
if name == "ctx":
return name
except Exception:
pass
return None
# 根据 prompt 函数生成 prompt schema
def _prompt_schema(fn):
"""根据函数签名生成 Prompt 参数 schema。"""
sig = inspect.signature(fn)
props = {}
req = []
for n, p in sig.parameters.items():
if n.startswith("_"):
continue
props[n] = {"type": "string", "title": n}
if p.default is inspect.Parameter.empty:
req.append(n)
return {"type": "object", "properties": props, "required": req}
# Sampling 会话适配器:通过 elicitation_session 向客户端发送 sampling/createMessage
+class _SessionAdapter:
# 为工具中的 ctx.session 提供 create_message 方法
+ """提供 create_message,供工具中 ctx.session 使用。"""
# 构造方法,接收 elicitation_session 对象
+ def __init__(self, elicitation_session):
# 保存 elicitation_session 对象
+ self._session = elicitation_session
# 异步方法,用于向客户端发起采样请求,获取 LLM 回复
+ async def create_message(self, messages, max_tokens=256, system_prompt=None, temperature=None, stop_sequences=None):
# 向客户端发起 LLM 采样请求,返回 CreateMessageResult。
+ """向客户端发起 LLM 采样请求,返回 CreateMessageResult。"""
# 如果 session 对象为 None,抛出运行时异常,说明不支持采样
+ if self._session is None:
+ raise RuntimeError("Sampling not supported: no session (stdio transport required)")
# 初始化一个空列表,用于存放转换后的消息
+ msgs = []
# 遍历传入的消息,每条消息都需要序列化处理
+ for m in messages or []:
# 获取消息的角色,默认为 user
+ role = getattr(m, "role", "user")
# 获取消息的内容属性,若没有则为 None
+ c = getattr(m, "content", None)
# 若内容为 None,不处理这条消息
+ if c is None:
+ continue
# 如果内容有 model_dump 方法,说明是 Pydantic 模型,直接序列化
+ if hasattr(c, "model_dump"):
+ content = c.model_dump(mode="json")
# 如果内容本身是个列表(可能为内容块列表),逐项序列化
+ elif isinstance(c, list):
+ content = [x.model_dump(mode="json") if hasattr(x, "model_dump") else {"type": "text", "text": str(x)} for x in c]
# 其它情况,全部按文本类型打包
+ else:
+ content = {"type": "text", "text": str(c)}
# 添加序列化后的消息(包含角色和内容)到 msgs
+ msgs.append({"role": role, "content": content})
# 用 CreateMessageRequestParams 构造参数,保证类型正确并兼容官方 MCP 客户端
+ params_obj = CreateMessageRequestParams(
+ messages=list(messages) if messages else [],
+ max_tokens=int(max_tokens),
+ system_prompt=system_prompt,
+ temperature=temperature,
+ stop_sequences=stop_sequences,
+ )
# 将参数对象序列化为 dict,并应用别名和去除 None 字段
+ params = params_obj.model_dump(by_alias=True, exclude_none=True, mode="json")
# 调用 session 的 send_request,RPC 远程调用采样接口
+ raw = self._session.send_request("sampling/createMessage", params)
# 将返回结果校验并转为 CreateMessageResult 类型实例,返回
+ return CreateMessageResult.model_validate(raw, by_name=False)
# 简化的 Elicitation 结果类型(与 ctx.elicit 返回的 result.data 兼容)
class _ElicitationResult:
"""elicit() 返回的结果,支持 result.action 和 result.data。"""
def __init__(self, action: str, data=None):
self.action = action
self.data = data
# Context 类:工具执行时的上下文,支持 read_resource、report_progress、debug、info、elicit
class Context:
# 工具执行上下文,提供资源读取、进度报告、日志发送、elicitation 等能力。
+ def __init__(
+ self,
+ mcp_server,
+ request_id,
+ send_notification=None,
+ progress_token=None,
+ elicitation_session=None
+ ):
# 保存 mcp_server 实例
self._mcp = mcp_server
# 保存请求 ID
self.request_id = request_id
# 发送通知的方法,如未提供则为无操作 lambda
self._send = send_notification or (lambda _: None)
# 进度 token,通知进度时用
self._progress_token = progress_token
# elicitation 会话(用于与客户端交互获取表单数据)
self._elicitation_session = elicitation_session
# Elicitation session 的适配器,为 None 时不支持 sampling
+ self._session_adapter = (
+ _SessionAdapter(elicitation_session) if elicitation_session else None
+ )
# session 属性,用于会话采样功能
+ @property
+ def session(self):
# Sampling 会话,用于 ctx.session.create_message(),需要 stdio 传输支持
+ if self._session_adapter is None:
# 若无适配器则抛异常:不支持采样
+ raise RuntimeError(
+ "Sampling not supported: no session (stdio transport required)"
+ )
# 返回适配器以支持采样功能
+ return self._session_adapter
# 异步 elicit 方法,发送 elicitation/create 请求并自动解析表单输入
async def elicit(self, message: str, schema: type):
# 若没有 elicitation session,则抛出异常
if self._elicitation_session is None:
+ raise RuntimeError(
+ "Elicitation not supported: no elicitation session (stdio transport required)"
+ )
# 获取 JSON Schema 格式描述(若可能)
json_schema = schema.model_json_schema() if hasattr(schema, "model_json_schema") else {}
# 组装参数:模式、消息、请求的 schema 信息
+ params = {
+ "mode": "form",
+ "message": message,
+ "requestedSchema": json_schema
+ }
# 通过 session 同步发送请求(阻塞式)
raw = self._elicitation_session.send_request("elicitation/create", params)
# 解析 elicitation 返回结果为对象
result = ElicitResult.model_validate(raw, by_name=False)
# 如果用户已提交且有内容
if result.action == "accept" and result.content:
# 内容通过 schema 校验
validated = schema.model_validate(result.content)
# 返回带 data 的 ElicitationResult
return _ElicitationResult(action="accept", data=validated)
# 否则返回 action,并 data=None
return _ElicitationResult(action=result.action, data=None)
# 异步读取资源方法
async def read_resource(self, uri: str):
# 将 uri 转为字符串
uri_str = str(uri)
# 检查是否为静态资源
if res := self._mcp._resources.get(uri_str):
# 调用资源的 run 方法获取内容
out = res.run()
# 导入内容类型
from mcp_lite.types import TextResourceContents
# 返回 TextResourceContents(文本)
+ return [
+ TextResourceContents(
+ uri=uri_str,
+ text=str(out),
+ mime_type=res.mime_type
+ )
+ ]
# 检查是否匹配到资源模板
for template in self._mcp._resource_templates.values():
# 匹配模板,返回参数 args
if args := template.matches(uri_str):
# 运行模板获得输出内容
out = template.run(args)
# 导入文本与二进制内容类型
from mcp_lite.types import TextResourceContents, BlobResourceContents
# 若是二进制,转为 base64
if isinstance(out, bytes):
+ import base64
+ return [
+ BlobResourceContents(
+ uri=uri_str,
+ blob=base64.b64encode(out).decode(),
+ mime_type=template.mime_type or "application/octet-stream"
+ )
+ ]
# 否则返回文本内容
+ return [
+ TextResourceContents(
+ uri=uri_str,
+ text=str(out),
+ mime_type=template.mime_type or "text/plain"
+ )
+ ]
# 未找到资源则返回空列表
return []
# 异步发送进度通知
+ async def report_progress(
+ self,
+ progress: float,
+ total: float | None = None,
+ message: str | None = None
+ ):
# 仅当有 progress token 时才发送通知
if self._progress_token is not None:
+ self._send(
+ {
+ "method": "notifications/progress",
+ "params": {
+ "progressToken": self._progress_token,
+ "progress": progress,
+ "total": total,
+ "message": message,
+ },
+ }
+ )
# 异步发送 debug 级别日志
async def debug(self, data):
+ self._send(
+ {
+ "method": "notifications/message",
+ "params": {"level": "debug", "data": data},
+ }
+ )
# 异步发送 info 级别日志
async def info(self, data):
+ self._send(
+ {
+ "method": "notifications/message",
+ "params": {"level": "info", "data": data},
+ }
+ )
# 工具函数封装类
class _Tool:
def __init__(self, fn, name=None, desc=None, structured_output=True):
# 函数本体
self.fn = fn
# 名称
self.name = name or fn.__name__
# 描述
self.desc = desc or (fn.__doc__ or "").strip()
# 入参 schema
self.schema = _schema(fn)
# 是否是异步函数
self.async_fn = asyncio.iscoroutinefunction(fn)
# 是否结构化输出
self.structured_output = structured_output
# 输出 schema 及需否包裹
self.output_schema, self.wrap_output = _output_schema_and_wrap(fn, structured_output) if structured_output else (None, False)
# Context 参数名(若有)
self._ctx_param = _find_context_parameter(fn)
# 转换为 Tool 对象
def to_tool(self):
return Tool(name=self.name, description=self.desc or None, input_schema=self.schema, output_schema=self.output_schema)
# 执行工具(自动支持异步),支持注入 Context
def run(self, args, mcp_server=None, request_id=None, progress_token=None, send_notification=None, elicitation_session=None):
if self._ctx_param is not None and mcp_server is not None:
ctx = Context(mcp_server, request_id, send_notification, progress_token, elicitation_session)
args = dict(args) if args else {}
args[self._ctx_param] = ctx
if self.async_fn:
return asyncio.run(self.fn(**args))
return self.fn(**args)
# 静态资源封装类
class _Resource:
def __init__(self, uri, fn, mime_type="text/plain"):
# 资源 URI
self.uri = uri
# 回调函数
self.fn = fn
# MIME 类型
self.mime_type = mime_type
# 是否异步
self.async_fn = asyncio.iscoroutinefunction(fn)
# 资源运行,自动支持异步
def run(self):
if self.async_fn:
return asyncio.run(self.fn())
return self.fn()
# 资源模板封装类(支持 URI 参数模板)
class _ResourceTemplate:
def __init__(self, uri_template, fn, mime_type="text/plain"):
# URI 模板
self.uri_template = uri_template
# 生成资源内容的函数
self.fn = fn
# MIME 类型
self.mime_type = mime_type
# 是否异步
self.async_fn = asyncio.iscoroutinefunction(fn)
# 提取函数里除了 _ 开头之外的参数名
sig = inspect.signature(fn)
self.param_names = [n for n in sig.parameters if not n.startswith("_")]
# 检查 URI 是否与模板匹配,并解析参数
def matches(self, uri):
pattern = self.uri_template.replace("{", "(?P<").replace("}", ">[^/]+)")
m = re.match(f"^{pattern}$", uri)
if m:
return {k: unquote(v) for k, v in m.groupdict().items()}
return None
# 执行模板内容生成函数
def run(self, args):
if self.async_fn:
return asyncio.run(self.fn(**args))
return self.fn(**args)
# Prompt 封装
class _Prompt:
"""内部 Prompt 封装类。"""
def __init__(self, fn, name=None, title=None, description=None):
# prompt 生成函数
self.fn = fn
# prompt 名称
self.name = name or fn.__name__
# prompt 标题
self.title = title
# prompt 描述
self.description = description or (fn.__doc__ or "").strip()
# 通用 schema
self.schema = _prompt_schema(fn)
# 是否异步
self.async_fn = asyncio.iscoroutinefunction(fn)
# 从 schema 生成 PromptArgument 列表
args_list = []
if "properties" in self.schema:
required = set(self.schema.get("required", []))
for pname, pinfo in self.schema["properties"].items():
args_list.append(
PromptArgument(
name=pname,
description=pinfo.get("title"),
required=pname in required,
)
)
self.arguments = args_list
# 转成 Prompt 对象
def to_prompt(self):
return Prompt(
name=self.name,
title=self.title,
description=self.description or None,
arguments=self.arguments if self.arguments else None,
)
# prompt 执行
def run(self, args):
if self.async_fn:
return asyncio.run(self.fn(**args))
return self.fn(**args)
# FastMCP 主类
class FastMCP:
def __init__(self, name="mcp-server"):
# 服务器名称
self.name = name
# 工具注册表
self._tools = {}
# 静态资源注册表
self._resources = {}
# 资源模板注册表
self._resource_templates = {}
# prompt 注册表
self._prompts = {}
# 补全处理器(注册后用于 completion/complete 请求)
self._completion_handler = None
# 注册工具的装饰器
def tool(self, name=None, description=None, structured_output=True):
def deco(fn):
t = _Tool(fn, name, description, structured_output=structured_output)
self._tools[t.name] = t
return fn
return deco
# 注册资源或模板资源的装饰器
def resource(self, uri, mime_type="text/plain"):
def deco(fn):
# 判断带模板参数还是静态资源
if "{" in uri and "}" in uri:
template = _ResourceTemplate(uri, fn, mime_type)
self._resource_templates[uri] = template
else:
res = _Resource(uri, fn, mime_type)
self._resources[uri] = res
return fn
return deco
# 注册补全处理器的装饰器
def completion(self):
"""注册参数补全处理器,用于 Prompt 或资源模板参数的自动补全。"""
def deco(fn):
self._completion_handler = fn
return fn
return deco
# 注册 Prompt 的装饰器
def prompt(self, name=None, title=None, description=None):
"""注册 Prompt 的装饰器。"""
def deco(fn):
p = _Prompt(fn, name=name, title=title, description=description)
self._prompts[p.name] = p
return fn
return deco
# 核心 RPC 方法分发
def _handle(self, req, elicitation_session=None):
# 获取方法名、参数和请求 id
method, params, rid = req.method, req.params or {}, req.id
# 初始化请求
if method == "initialize":
caps = ServerCapabilities(
tools=ToolsCapability(),
resources=ResourcesCapability(),
prompts=PromptsCapability() if self._prompts else None,
completions=CompletionsCapability() if self._completion_handler else None,
elicitation=ElicitationCapability(form=FormElicitationCapability(), url=UrlElicitationCapability()),
+ sampling=SamplingCapability(),
)
r = InitializeResult(
protocol_version=LATEST_PROTOCOL_VERSION,
capabilities=caps,
server_info=Implementation(name=self.name, version="0.1.0"),
)
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 工具列表请求
if method == "tools/list":
r = ListToolsResult(tools=[t.to_tool() for t in self._tools.values()])
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 工具调用请求
if method == "tools/call":
p = CallToolRequestParams.model_validate(params, by_name=False)
t = self._tools.get(p.name)
if not t:
return JSONRPCError(
jsonrpc="2.0",
id=rid,
error=ErrorData(code=-32602, message=f"Unknown tool: {p.name}")
)
# 从 meta/_meta 提取 progressToken,用于发送进度/日志通知
meta = p.meta or {}
progress_token = meta.get("progressToken") or meta.get("progress_token")
notifications = []
def _send_notification(payload):
method_name = payload.get("method", "")
params = payload.get("params")
notifications.append(JSONRPCNotification(jsonrpc="2.0", method=method_name, params=params))
try:
out = t.run(
p.arguments or {},
mcp_server=self,
request_id=rid,
progress_token=progress_token,
send_notification=_send_notification,
elicitation_session=elicitation_session,
)
# 如果直接返回的是 CallToolResult
if isinstance(out, CallToolResult):
r = out
else:
struct = None
# 结构化输出
if t.output_schema is not None:
struct = _to_structured(out, t.output_schema, t.wrap_output)
# 字符串直接用文本包装
if isinstance(out, str):
c = [TextContent(text=out)]
elif out is None:
c = []
elif struct is not None:
# 结构化结果按 JSON 格式化后返回
c = [TextContent(text=json.dumps(struct, ensure_ascii=False, indent=2))]
else:
# 普通对象尝试转 json,否则转字符串
try:
if PydanticBaseModel and isinstance(out, PydanticBaseModel):
text = json.dumps(out.model_dump(mode="json"), ensure_ascii=False, indent=2)
elif isinstance(out, dict):
text = json.dumps(out, ensure_ascii=False, indent=2)
else:
text = str(out)
except Exception:
text = str(out)
c = [TextContent(text=text)]
r = CallToolResult(content=c, structured_content=struct)
except Exception as e:
r = CallToolResult(content=[TextContent(text=str(e))], is_error=True)
resp = JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
if notifications:
return (notifications, resp)
return resp
# prompts/list 请求
if method == "prompts/list":
prompts_list = [p.to_prompt() for p in self._prompts.values()]
r = ListPromptsResult(prompts=prompts_list)
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# prompts/get 请求
if method == "prompts/get":
p = GetPromptRequestParams.model_validate(params, by_name=False)
prompt_obj = self._prompts.get(p.name)
if not prompt_obj:
return JSONRPCError(
jsonrpc="2.0",
id=rid,
error=ErrorData(code=-32602, message=f"Unknown prompt: {p.name}")
)
try:
args = p.arguments or {}
result = prompt_obj.run(args)
messages = []
# 单条消息包装为列表
if not isinstance(result, (list, tuple)):
result = [result]
for msg in result:
# 若为内置的 Message 对象
if isinstance(msg, prompts.base.Message):
content = msg.content
if isinstance(content, str):
content = TextContent(type="text", text=content)
messages.append(PromptMessage(role=msg.role, content=content))
# 字符串消息按 user 角色组装
elif isinstance(msg, str):
messages.append(PromptMessage(role="user", content=TextContent(type="text", text=msg)))
# dict 消息,读 role 和 content
elif isinstance(msg, dict):
role = msg.get("role", "user")
cnt = msg.get("content", "")
if isinstance(cnt, dict) and cnt.get("type") == "text":
content = TextContent(**cnt)
else:
content = TextContent(type="text", text=str(cnt) if not isinstance(cnt, str) else cnt)
messages.append(PromptMessage(role=role, content=content))
# 其它均归为 user 文本
else:
messages.append(PromptMessage(role="user", content=TextContent(type="text", text=str(msg))))
r = GetPromptResult(description=prompt_obj.description, messages=messages)
except Exception as e:
return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32603, message=str(e)))
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 资源静态列表
if method == "resources/list":
resources = [
Resource(uri=u, name=u, mime_type=r.mime_type)
for u, r in self._resources.items()
]
r = ListResourcesResult(resources=resources)
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 资源模板列表
if method == "resources/templates/list":
templates = [
ResourceTemplate(uri_template=u, name=u, mime_type=t.mime_type)
for u, t in self._resource_templates.items()
]
r = ListResourceTemplatesResult(resource_templates=templates)
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 处理补全请求 completion/complete
if method == "completion/complete":
# 如果没有补全处理器,返回方法未找到错误
if not self._completion_handler:
return JSONRPCError(
jsonrpc="2.0",
id=rid,
error=ErrorData(code=-32601, message="Method not found: completion/complete"),
)
# 校验并解析补全请求参数
p = CompleteRequestParams.model_validate(params, by_name=False)
try:
# 获取补全处理函数
fn = self._completion_handler
# 如果处理函数是协程,则用 asyncio.run 执行,否则直接调用
result = (
asyncio.run(fn(p.ref, p.argument, p.context))
if asyncio.iscoroutinefunction(fn)
else fn(p.ref, p.argument, p.context)
)
# 若处理函数无结果则返回空补全
if result is None:
result = Completion(values=[], total=None, has_more=None)
# 封装补全响应对象
r = CompleteResult(completion=result)
# 构造并返回 JSONRPCResponse 响应
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
except Exception as e:
# 捕获异常并返回内部错误
return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32603, message=str(e)))
# 读取资源接口
if method == "resources/read":
p = ReadResourceRequestParams.model_validate(params, by_name=False)
uri_str = str(p.uri)
try:
# 静态资源优先
if res := self._resources.get(uri_str):
out = res.run()
content = TextResourceContents(uri=uri_str, text=str(out), mime_type=res.mime_type)
r = ReadResourceResult(contents=[content])
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 匹配模板资源
for template in self._resource_templates.values():
if args := template.matches(uri_str):
out = template.run(args)
if isinstance(out, bytes):
content = BlobResourceContents(
uri=uri_str,
blob=base64.b64encode(out).decode(),
mime_type=template.mime_type or "application/octet-stream",
)
else:
content = TextResourceContents(
uri=uri_str,
text=str(out),
mime_type=template.mime_type or "text/plain",
)
r = ReadResourceResult(contents=[content])
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 找不到资源
return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32602, message=f"Unknown resource: {uri_str}"))
except Exception as e:
return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32603, message=str(e)))
# 未知方法
return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32601, message=f"Method not found: {method}"))
# 消息包装与请求
def _handle_msg(self, msg, elicitation_session=None):
# 非 SessionMessage 类型忽略
if not isinstance(msg, SessionMessage):
return None
m = msg.message
# 必须是带 id 的 jsonrpc 请求
if not isinstance(m, JSONRPCRequest) or getattr(m, "id", None) is None:
return None
#print("[Server] Request:", m.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
try:
resp = self._handle(m, elicitation_session)
#print("[Server] Response:", resp.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
return resp
except Exception as e:
err = JSONRPCError(jsonrpc="2.0", id=m.id, error=ErrorData(code=-32603, message=str(e)))
#print("[Server] Response (error):", err.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
return err
# 运行服务器主逻辑
def run(self, transport="stdio", host: str = "127.0.0.1", port: int = 8000):
# 如果传输方式为 stdio,则启动标准输入输出服务器
if transport == "stdio":
stdio.stdio_server(self._handle_msg)
# 如果传输方式为 streamable-http,则启动基于 HTTP 的服务器
elif transport == "streamable-http":
# 导入 Starlette 框架相关模块
from starlette.applications import Starlette
from starlette.routing import Mount
# 导入 streamable_http_app 工厂方法
from mcp_lite.server.streamable_http import streamable_http_app
# 导入 uvicorn 用于启动 ASGI 服务
import uvicorn
# 创建 Streamable HTTP ASGI 应用
app = streamable_http_app(self._handle_msg, path="/")
# 将 /mcp 路径挂载到应用
full_app = Starlette(routes=[Mount("/mcp", app=app)])
# 启动 uvicorn 服务,监听指定 host 和 port
uvicorn.run(full_app, host=host, port=port)
# 如果传输方式不被支持,则抛出异常
else:
raise ValueError(f"unsupported transport: {transport}")
# 明确导出接口
__all__ = ["FastMCP", "Context", "prompts"]
6. types.py #
mcp_lite/types.py
# 从 pydantic 导入 BaseModel、ConfigDict、Field、TypeAdapter、field_validator
from pydantic import BaseModel, ConfigDict, Field, TypeAdapter, field_validator
# 导入 pydantic 的 to_camel 驼峰命名生成器
from pydantic.alias_generators import to_camel
# 导入 Any、Literal、Annotated 类型注解
from typing import Any, Literal, Annotated
# 定义 RequestId 类型,既可以是 int 也可以是 str
RequestId = int | str
# 定义 MCP 的基础模型类,支持驼峰命名和按名称填充
class MCPModel(BaseModel):
# 指定 Pydantic 模型配置:使用驼峰形式命名和按名称填充
model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)
# Elicitation 能力(用于 initialize 能力协商,需在 ClientCapabilities/ServerCapabilities 前定义)
class FormElicitationCapability(MCPModel):
pass
class UrlElicitationCapability(MCPModel):
pass
class ElicitationCapability(MCPModel):
form: FormElicitationCapability | None = None
url: UrlElicitationCapability | None = None
# Sampling 能力(客户端支持 LLM 采样时声明)
+class SamplingCapability(MCPModel):
+ pass
# 定义客户端能力结构体
class ClientCapabilities(MCPModel):
# 可选字段:客户端实验性能力扩展,键为 str,值为字典
experimental: dict[str, dict[str, Any]] | None = None
# 可选字段:客户端 Elicitation 能力(提供 elicitation_callback 时声明)
elicitation: ElicitationCapability | None = None
# 可选字段:客户端 Sampling 能力(提供 sampling_callback 时声明)
+ sampling: SamplingCapability | None = None
# 定义服务器或客户端的实现信息结构体
class Implementation(MCPModel):
# 实现的名称
name: str = ""
# 实现的版本号
version: str = ""
# 可选字段:人类可读的标题
title: str | None = None
# 可选字段:实现的描述
description: str | None = None
# 定义初始化请求参数结构体
class InitializeRequestParams(MCPModel):
# 协议版本号
protocol_version: str = ""
# 可选字段:客户端能力描述
capabilities: ClientCapabilities = None
# 可选字段:客户端实现信息
client_info: Implementation = None
# 定义初始化请求结构体
class InitializeRequest(MCPModel):
# 方法名,固定为 "initialize"
method: Literal["initialize"] = "initialize"
# 可选字段:参数,类型为 InitializeRequestParams
params: InitializeRequestParams = None
# 当前协议的最新版本号
LATEST_PROTOCOL_VERSION = "2024-11-05"
# 定义工具相关能力结构体
class ToolsCapability(MCPModel):
# 工具列表是否发生变化,可选布尔类型
list_changed: bool | None = None
# 定义资源相关能力结构体
class ResourcesCapability(MCPModel):
# 是否支持资源订阅
subscribe: bool | None = None
# 资源列表是否变化通知
list_changed: bool | None = None
# 定义 Prompt 相关能力结构体
class PromptsCapability(MCPModel):
list_changed: bool | None = None
# 定义补全能力描述(服务端声明支持补全时使用)
class CompletionsCapability(MCPModel):
pass
# 定义服务端能力描述结构体
class ServerCapabilities(MCPModel):
# 可选字段:实验性能力扩展
experimental: dict[str, dict[str, Any]] | None = None
# 可选字段:工具能力
tools: ToolsCapability | None = None
# 可选字段:资源能力
resources: ResourcesCapability | None = None
# 可选字段:Prompt 能力
prompts: PromptsCapability | None = None
# 可选字段:补全能力(注册了 @completion 时声明)
completions: CompletionsCapability | None = None
# 可选字段:Elicitation 能力(服务端支持 elicitation 时声明)
elicitation: ElicitationCapability | None = None
# 可选字段:Sampling 能力(服务端支持 sampling 时声明)
+ sampling: SamplingCapability | None = None
# 定义初始化响应结构体
class InitializeResult(MCPModel):
# 协议版本号
protocol_version: str = ""
# 可选字段:服务端能力描述
capabilities: ServerCapabilities = None
# 可选字段:服务端实现信息
server_info: Implementation = None
# 可选字段:初始化说明信息
instructions: str | None = None
# 定义客户端初始化完成通知结构体
class InitializedNotification(MCPModel):
# 方法名,固定为 "notifications/initialized"
method: Literal["notifications/initialized"] = "notifications/initialized"
# 可选字段:通知参数,可以为字典或 None
params: dict[str, Any] | None = None
# 定义 JSONRPC 请求的数据结构
class JSONRPCRequest(BaseModel):
# jsonrpc 协议版本,固定为 "2.0"
jsonrpc: Literal["2.0"] = "2.0"
# 请求 ID,可以为 int 或 str 类型
id: RequestId = None
# 方法名称,字符串类型
method: str = ""
# 方法参数,为一个字典或 None
params: dict[str, Any] | None = None
# 定义 JSONRPC 通知的数据结构(没有 id 字段)
class JSONRPCNotification(BaseModel):
# jsonrpc 协议版本,固定为 "2.0"
jsonrpc: Literal["2.0"] = "2.0"
# 通知的方法名称
method: str = ""
# 通知参数,可以为字典或者 None
params: dict[str, Any] | None = None
# 定义 JSONRPC 响应的数据结构
class JSONRPCResponse(BaseModel):
# jsonrpc 协议版本,固定为 "2.0"
jsonrpc: Literal["2.0"] = "2.0"
# 响应的 ID,需要与请求 ID 匹配
id: RequestId = None
# 响应结果,可以为字典或 None
result: dict[str, Any] = None
# 定义错误的数据结构
class ErrorData(BaseModel):
# 错误码,默认值为 0
code: int = 0
# 错误信息,默认值为空字符串
message: str = ""
# 附加的错误数据,可以为任意类型或 None
data: Any = None
# 定义 JSONRPC 错误消息的数据结构
class JSONRPCError(BaseModel):
# jsonrpc 协议版本,固定为 "2.0"
jsonrpc: Literal["2.0"] = "2.0"
# 错误对应的请求 ID,可以为 None
id: RequestId | None = None
# 错误的详细信息,类型为 ErrorData
error: ErrorData = None
# 定义所有 JSONRPC 消息的联合类型
JSONRPCMessage = JSONRPCRequest | JSONRPCNotification | JSONRPCResponse | JSONRPCError
# 进度通知参数(notifications/progress)
class ProgressNotificationParams(MCPModel):
progress_token: str | int = ""
progress: float = 0.0
total: float | None = None
message: str | None = None
# 日志消息通知参数(notifications/message)
class LoggingMessageNotificationParams(MCPModel):
level: str = "info"
data: Any = None
logger: str | None = None
# 定义 JSONRPC 消息适配器,用于类型自动推断和校验
jsonrpc_message_adapter = TypeAdapter(JSONRPCMessage)
# 定义工具结果中的文本内容块
class TextContent(MCPModel):
type: Literal["text"] = "text"
text: str = ""
# 定义采样消息类(用于 sampling/createMessage)
+class SamplingMessage(MCPModel):
# 消息角色字段,默认值为 "user"
+ role: str = "user"
# 消息内容,可以是单个 TextContent 或 TextContent 列表,默认为空文本
+ content: TextContent | list[TextContent] = Field(default_factory=lambda: TextContent(type="text", text=""))
# 定义 sampling/createMessage 请求参数的数据结构
+class CreateMessageRequestParams(MCPModel):
# 消息列表参数,元素为 SamplingMessage,默认为空列表
+ messages: list[SamplingMessage] = Field(default_factory=list)
# 最大生成 token 数,默认值为 256
+ max_tokens: int = 256
# 系统提示词,可以为 None
+ system_prompt: str | None = None
# 采样温度参数,可为 None
+ temperature: float | None = None
# 停止序列列表,可为 None
+ stop_sequences: list[str] | None = None
# 定义 sampling/createMessage 响应结果的数据结构
+class CreateMessageResult(MCPModel):
# 回复消息的角色,默认值为 "assistant"
+ role: str = "assistant"
# 回复内容字段,类型为 TextContent,默认为空文本
+ content: TextContent = Field(default_factory=lambda: TextContent(type="text", text=""))
# 所用的模型名称,默认为空字符串
+ model: str = ""
# 停止原因,可为 None
+ stop_reason: str | None = None
# 定义工具描述数据结构体
class Tool(MCPModel):
# 工具名称
name: str = ""
# 可选字段:工具描述
description: str | None = None
# 工具输入参数的 schema,默认为空字典
input_schema: dict[str, Any] = Field(default_factory=dict)
# 可选字段:工具输出 schema
output_schema: dict[str, Any] | None = None
# 定义获取工具列表请求结构体
class ListToolsRequest(MCPModel):
# 方法名,固定为 "tools/list"
method: Literal["tools/list"] = "tools/list"
# 可选字段:参数,可以为字典或 None
params: dict[str, Any] | None = None
# 定义获取工具列表响应结构体
class ListToolsResult(MCPModel):
# 工具列表,默认为空列表
tools: list[Tool] = []
# 可选字段:分页游标,可为 None
next_cursor: str | None = None
# 定义资源元数据结构体
class Resource(MCPModel):
# 资源 URI
uri: str = ""
# 可选:资源名称
name: str | None = None
# 可选:人类可读标题
title: str | None = None
# 可选:资源描述
description: str | None = None
# 可选:MIME 类型
mime_type: str | None = None
# 定义资源模板结构体
class ResourceTemplate(MCPModel):
# URI 模板,如 greeting://{name}
uri_template: str = ""
# 可选:模板名称
name: str | None = None
# 可选:人类可读标题
title: str | None = None
# 可选:模板描述
description: str | None = None
# 可选:MIME 类型
mime_type: str | None = None
# 定义 resources/list 请求结构体
class ListResourcesRequest(MCPModel):
method: Literal["resources/list"] = "resources/list"
params: dict[str, Any] | None = None
# 定义 resources/list 响应结构体
class ListResourcesResult(MCPModel):
resources: list[Resource] = []
next_cursor: str | None = None
# 定义 resources/templates/list 请求结构体
class ListResourceTemplatesRequest(MCPModel):
method: Literal["resources/templates/list"] = "resources/templates/list"
params: dict[str, Any] | None = None
# 定义 resources/templates/list 响应结构体
class ListResourceTemplatesResult(MCPModel):
resource_templates: list[ResourceTemplate] = []
next_cursor: str | None = None
# 定义 resources/read 请求参数结构体
class ReadResourceRequestParams(MCPModel):
uri: str = ""
# 定义 resources/read 请求结构体
class ReadResourceRequest(MCPModel):
method: Literal["resources/read"] = "resources/read"
params: ReadResourceRequestParams = None
# 定义资源内容基类(文本)
class TextResourceContents(MCPModel):
uri: str = ""
mime_type: str | None = None
text: str = ""
# 定义资源内容基类(二进制)
class BlobResourceContents(MCPModel):
uri: str = ""
mime_type: str | None = None
blob: str = "" # base64 编码
# 定义 resources/read 响应结构体
class ReadResourceResult(MCPModel):
contents: list[TextResourceContents | BlobResourceContents] = []
# 定义 Prompt 参数结构体
# 定义代表 Prompt 参数的模型
class PromptArgument(MCPModel):
# 参数名称
name: str = ""
# 参数描述,可为 None
description: str | None = None
# 是否为必填参数,可为 None
required: bool | None = None
# 定义 Prompt 元数据结构体
# 定义代表 Prompt 元数据的模型
class Prompt(MCPModel):
# Prompt 名称
name: str = ""
# Prompt 描述,可为 None
description: str | None = None
# Prompt 参数列表,可为 None
arguments: list[PromptArgument] | None = None
# Prompt 标题,可为 None
title: str | None = None
# 定义 prompts/list 请求结构体
# 定义 prompts/list 请求的数据模型
class ListPromptsRequest(MCPModel):
# 固定方法字段
method: Literal["prompts/list"] = "prompts/list"
# 请求参数,可为 None
params: dict[str, Any] | None = None
# 定义 prompts/list 响应结构体
# 定义 prompts/list 响应的数据模型
class ListPromptsResult(MCPModel):
# 返回的 Prompt 列表
prompts: list[Prompt] = []
# 下一页游标,可为 None
next_cursor: str | None = None
# 定义 prompts/get 请求参数结构体
# 定义 prompts/get 的参数模型
class GetPromptRequestParams(MCPModel):
# Prompt 名称
name: str = ""
# 传递给 Prompt 的参数,可为 None
arguments: dict[str, str] | None = None
# 定义 prompts/get 请求结构体
# 定义 prompts/get 请求的数据模型
class GetPromptRequest(MCPModel):
# 固定方法字段
method: Literal["prompts/get"] = "prompts/get"
# 请求参数,可为 None
params: GetPromptRequestParams | None = None
# 定义 Prompt 消息结构体(role + content)
# 定义表示 Prompt 里的单条消息的数据模型
class PromptMessage(MCPModel):
# 消息角色(user 或 assistant)
role: Literal["user", "assistant"] = "user"
# 消息内容,可以是 TextContent 或字典,默认为空文本
content: TextContent | dict[str, Any] = Field(default_factory=lambda: TextContent(text=""))
# 字段校验器:在模型初始化前解析 content 字段
@field_validator("content", mode="before")
@classmethod
def _parse_content(cls, v):
# 如果是字典且 type 为 "text",转换为 TextContent 实例
if isinstance(v, dict) and v.get("type") == "text":
return TextContent(text=v.get("text", ""))
# 否则原样返回
return v
# 定义 prompts/get 响应结构体
# 定义 prompts/get 的响应数据模型
class GetPromptResult(MCPModel):
# Prompt 的描述,可为 None
description: str | None = None
# Prompt 返回的消息列表
messages: list[PromptMessage] = []
# 定义 Prompt 引用类型(用于补全请求)
class PromptReference(MCPModel):
# 类型字段,固定为 "ref/prompt"
type: Literal["ref/prompt"] = "ref/prompt"
# Prompt 名称
name: str = ""
# 定义资源模板引用类型(用于补全请求)
class ResourceTemplateReference(MCPModel):
# 类型字段,固定为 "ref/resource"
type: Literal["ref/resource"] = "ref/resource"
# 资源模板的 URI
uri: str = ""
# 定义补全参数类型
class CompletionArgument(MCPModel):
# 参数名称
name: str = ""
# 参数值
value: str = ""
# 定义补全上下文类型(已解析的其它参数)
class CompletionContext(MCPModel):
# 补全上下文参数,字典类型,可能为 None
arguments: dict[str, str] | None = None
# 定义补全结果内容类型
class Completion(MCPModel):
# 候选内容列表,默认为空列表
values: list[str] = Field(default_factory=list)
# 总数,可为 None
total: int | None = None
# 是否有更多结果,可为 None
has_more: bool | None = None
# 定义 completion/complete 请求参数
class CompleteRequestParams(MCPModel):
# 引用字段,用 type 字段区分 PromptReference 与 ResourceTemplateReference
ref: Annotated[
PromptReference | ResourceTemplateReference,
Field(discriminator="type"),
] = None
# 需要补全的参数
argument: CompletionArgument = None
# 辅助的上下文字段,可为 None
context: CompletionContext | None = None
# 定义 completion/complete 请求
class CompleteRequest(MCPModel):
# 方法名,固定为 "completion/complete"
method: Literal["completion/complete"] = "completion/complete"
# 请求参数
params: CompleteRequestParams = None
# 定义 completion/complete 响应
class CompleteResult(MCPModel):
# 补全结果,类型为 Completion
completion: Completion = None
# 定义调用工具请求参数结构体
class CallToolRequestParams(MCPModel):
# 工具名称
name: str = ""
# 可选字段:输入参数,为字典类型或 None
arguments: dict[str, Any] | None = None
# 可选:元数据,含 progressToken 时服务端可发送进度通知
meta: dict[str, Any] | None = Field(default=None, alias="_meta")
# 定义调用工具请求结构体
class CallToolRequest(MCPModel):
# 方法名,固定为 "tools/call"
method: Literal["tools/call"] = "tools/call"
# 可选字段:参数,类型为 CallToolRequestParams
params: CallToolRequestParams = None
# === Elicitation 相关类型 ===
# 定义 Elicitation 请求的 schema 类型,为一个包含 str 键和任意值的字典,用于描述请求的 JSON Schema 子集
ElicitRequestedSchema: type = dict[str, Any]
# 定义 Form 模式的 elicitation 请求参数结构体
class ElicitRequestFormParams(MCPModel):
# mode 字段,固定为 "form"
mode: Literal["form"] = "form"
# message 字段,请求给用户展示的消息,默认为空字符串
message: str = ""
# requested_schema 字段,表示所请求的 schema,默认为空字典
requested_schema: dict[str, Any] = Field(default_factory=dict)
# 定义 URL 模式的 elicitation 请求参数结构体(暂时保留,未实现)
class ElicitRequestURLParams(MCPModel):
# mode 字段,固定为 "url"
mode: Literal["url"] = "url"
# message 字段,请求给用户展示的消息,默认为空字符串
message: str = ""
# url 字段,请求跳转的 URL,默认为空字符串
url: str = ""
# elicitation_id 字段,当前 elicitation 的唯一标识,默认为空字符串
elicitation_id: str = ""
# 定义 Elicitation 请求参数的联合类型,可以为表单模式参数或 URL 模式参数
ElicitRequestParams = ElicitRequestFormParams | ElicitRequestURLParams
# 定义 Elicitation 请求结构体,表示服务端向客户端发送的请求
class ElicitRequest(MCPModel):
# method 字段,请求的方法名,固定为 "elicitation/create"
method: Literal["elicitation/create"] = "elicitation/create"
# params 字段,请求所需参数,可为 Form 或 URL 模式参数,默认为 None
params: ElicitRequestFormParams | ElicitRequestURLParams = None
# 定义 Elicitation 结果结构体,表示客户端返回给服务端的结果
class ElicitResult(MCPModel):
# action 字段,客户端选择的操作,可为 "accept"、"decline" 或 "cancel",默认为 "cancel"
action: Literal["accept", "decline", "cancel"] = "cancel"
# content 字段,内容的数据体,可为包含多种基本类型的字典,也可以为 None
content: dict[str, str | int | float | bool | list[str] | None] | None = None
# 定义调用工具响应结构体
class CallToolResult(MCPModel):
# 内容字段,由 TextContent 或字典组成的列表,默认为空列表
content: list[TextContent | dict[str, Any]] = Field(default_factory=list)
# 结构化内容字段,可为字典或 None
structured_content: dict[str, Any] | None = None
# 错误标志字段,标识是否为错误结果,默认为 False
is_error: bool = False
# 针对 content 字段的字段校验器,模型初始化前调用
@field_validator("content", mode="before")
@classmethod
def _parse_content(cls, v):
# 如果传入的值不是列表,直接返回
if not isinstance(v, list):
return v
# 初始化输出列表
out = []
# 遍历每一项
for item in v:
# 如果项是字典且类型为 "text",转换为 TextContent 实例
if isinstance(item, dict) and item.get("type") == "text":
out.append(TextContent(text=item.get("text", "")))
# 否则,原样加入输出列表
else:
out.append(item)
# 返回处理后的列表
return out 7.工作流程 #
7.1. 整体架构 #
MCP 采样的核心是:服务端在工具执行时向客户端发起 sampling/createMessage 请求,由客户端调用 LLM(如 DeepSeek)并返回结果。
7.2. 数据流概览 #
工具执行 (generate_poem)
→ ctx.session.create_message(messages, max_tokens)
→ _SessionAdapter 用 CreateMessageRequestParams 构建 params
→ ElicitationSession.send_request("sampling/createMessage", params)
→ 写入 stdout(客户端从 subprocess.stdout 读取)
→ 客户端 _request 循环收到请求
→ 调用 sampling_callback(params)
→ handle_sampling 调用 DeepSeek API
→ 返回 CreateMessageResult
→ 客户端发送 JSON-RPC 响应
→ 服务端 ElicitationSession 从 stdin 读取
→ create_message 返回 CreateMessageResult
→ 工具拿到 result.content.text 并返回7.3 时序图 #
sequenceDiagram
participant User as 用户/调用方
participant Client as MCP 客户端<br/>(sampling_client)
participant Server as MCP 服务端<br/>(sampling_server)
participant Tool as 工具<br/>(generate_poem)
participant LLM as DeepSeek API
User->>Client: 启动 sampling_client.py
Client->>Server: stdio_client 启动子进程
Client->>Server: initialize (声明 sampling 能力)
Server-->>Client: InitializeResult (声明 sampling 能力)
User->>Client: session.call_tool("generate_poem", {topic: "自然"})
Client->>Server: tools/call (generate_poem)
Server->>Tool: 执行 generate_poem(topic, ctx)
Tool->>Tool: ctx.session.create_message(messages, max_tokens)
Note over Tool,Client: _SessionAdapter 构建 CreateMessageRequestParams
Tool->>Client: sampling/createMessage (JSON-RPC 请求)
Client->>Client: handle_sampling(params)
Client->>Client: _messages_to_deepseek(messages)
Client->>LLM: POST /v1/chat/completions (DeepSeek API)
LLM-->>Client: 模型回复文本
Client->>Client: CreateMessageResult(role, content, model, stop_reason)
Client->>Server: JSON-RPC 响应 (CreateMessageResult)
Server->>Tool: create_message 返回 result
Tool->>Tool: return result.content.text
Tool->>Server: CallToolResult(content=[诗歌文本])
Server->>Client: tools/call 响应
Client->>User: 打印 [Result] 诗歌内容
7.4. 各模块修改 #
7.4.1 sampling_client.py(客户端) #
handle_sampling:处理服务端发来的sampling/createMessage,将CreateMessageRequestParams转为 DeepSeek API 格式,调用_call_deepseek,返回CreateMessageResult。main():同步流程:with stdio_client(...) as (read, write)→ClientSession(read, write, sampling_callback=handle_sampling)→session.initialize()→session.call_tool(...)。_messages_to_deepseek:把 MCP 的SamplingMessage转为 DeepSeek 的{role, content}列表。
7.4.2 sampling_server.py(服务端) #
generate_poem/summarize:工具中通过ctx.session.create_message(messages=[...], max_tokens=...)向客户端发起采样。ctx.session:由 FastMCP 注入,内部通过ElicitationSession.send_request("sampling/createMessage", params)与客户端通信。
7.4.3 mcp_lite/client/session.py(客户端会话) #
__init__:新增sampling_callback参数。_request循环:在等待响应时,若收到sampling/createMessage:- 用
CreateMessageRequestParams解析params - 调用
sampling_callback(None, params)(支持 async,用asyncio.run执行) - 若返回
CreateMessageResult,则构造 JSON-RPC 响应并发送;否则返回错误。
- 用
initialize():若提供sampling_callback,则在ClientCapabilities中声明sampling: SamplingCapability()。
7.4.4 mcp_lite/server/fastmcp/__init__.py(服务端 FastMCP) #
_SessionAdapter:create_message(messages, max_tokens, system_prompt, ...)用CreateMessageRequestParams构建参数,model_dump(by_alias=True)得到 camelCase 的 JSON。- 调用
elicitation_session.send_request("sampling/createMessage", params)发送请求并等待响应。 - 将响应解析为
CreateMessageResult返回。
Context.session:属性,返回_SessionAdapter,供工具调用ctx.session.create_message()。initialize:在ServerCapabilities中声明sampling=SamplingCapability()。
7.4.5 mcp_lite/types.py(类型定义) #
SamplingCapability:采样能力声明。SamplingMessage:role+content(TextContent或列表)。CreateMessageRequestParams:messages、max_tokens、system_prompt、temperature、stop_sequences。CreateMessageResult:role、content、model、stop_reason。ClientCapabilities/ServerCapabilities:新增sampling字段。
7.5 要点总结 #
| 角色 | 职责 |
|---|---|
| MCP 服务端 | 提供工具;工具执行时通过 ctx.session.create_message() 向客户端发起采样请求 |
| MCP 客户端 | 持有 LLM/API;收到 sampling/createMessage 后调用 sampling_callback,转调 DeepSeek 等 API,返回 CreateMessageResult |
| ElicitationSession | 复用 stdio 双向通道,用于服务端向客户端发送 sampling/createMessage 并等待响应 |
| _SessionAdapter | 封装 create_message,负责构建参数、发送请求、解析响应 |
整体上,采样请求由服务端发起,客户端负责实际 LLM 调用,通过 stdio 上的 JSON-RPC 完成往返通信。