1. 引导 #
如何在该项目中实现“elicitation”功能,即服务端主动向客户端弹出表单请求信息并等待用户填写的过程。在 mcp_lite 体系下,elicitation 支持服务端和客户端能力协商,对表单采集、确认交互、用户输入等场景非常友好。
1.1. 什么是 Elicitation #
Elicitation 通常用于 AI Agent 或工具需要主动请求用户输入、补充数据或确认操作时。服务端可以通过 Elicitation 主动弹出表单,并实现与客户端交互采集,例如:
- 填写任务详情
- 确认某项操作
- 补充所需配置项或参数
1.2. 协议能力声明 #
在协议协商(如 initialize)时,客户端和服务端可用 ElicitationCapability 类声明自身支持的 elicitation 能力(如支持 form、url 模式)。
1.3. 客户端流程(elicitation_client.py) #
客户端需要提供 elicitation_callback 回调(如 handle_elicitation),用于:
- 接收服务端发来的表单请求(通常 mode 为 "form")
- 解析 schema 与必填字段,展示问题、收集输入
- 返回用户填写的结果,或拒绝/取消操作
1.4. 服务端流程 #
服务端(如 fastmcp、工具 handler)可调用 ctx.elicit() 发起表单、确认等交互请求。
- 服务端根据客户端声明的能力(如支持 form)动态构造表单 schema 和提示
- 调用 ctx.elicit(schema=..., message=...),等待客户端填写数据
- 根据收到的结果决定后续业务流程
1.5. stdio 通道集成 #
在 stdio 通道下,服务端调用 ElicitationSession.send_request 通过标准输入输出协议与客户端同步/异步交换请求与回复,并保证 id 唯一性和结果校验。
1.6. 场景举例 #
如在脚本或 session 中,服务端触发工具 create_task、collect_preferences、confirm_action 时,会弹出表单采集任务标题、优先级、用户偏好,以及询问继续/拒绝原因。
1.7 启动调试 #
npx @modelcontextprotocol/inspector uv --directory D:/forever/docs/mcpsdk2 run elicitation_server.py
mcp dev elicitation_server.py| 命令 | 核心作用 | 依赖 | 特点 |
|---|---|---|---|
npx @modelcontextprotocol/inspector uv --directory D:/forever/docs/mcpsdk2 run elicitation_server.py |
直接使用 MCP Inspector(Node.js 版本)启动调试,需要手动指定服务器的启动命令 | Node.js + npx | 通用性强,不依赖 Python 的 mcp CLI,适合任何语言实现的服务器 |
mcp dev elicitation_server.py |
使用 Python 版的 mcp CLI 提供的快捷命令,自动构建服务器启动命令并打开 Inspector |
Python + mcp[cli] |
专为 Python 服务器优化,自动处理路径和解释器(如 uv run 或 python),更简洁 |
mcp dev的内部实现:当你执行mcp dev elicitation_server.py时,它实际上会做两件事:- 根据你当前的项目环境(是否使用
uv、poetry或直接python)自动生成一个合适的服务器启动命令(例如uv run elicitation_server.py)。 - 调用本地的 MCP Inspector(可能通过内置逻辑或启动一个 Node 子进程)并将这个命令传递给它,从而打开调试界面。
- 根据你当前的项目环境(是否使用
npx inspector的手动方式:你手动指定了完整的服务器启动命令(uv --directory ... run ...),Inspector 直接使用它启动服务器并建立连接。
所以最终结果都是:Inspector 界面打开,你的服务器被加载,你可以测试工具、资源和提示。
2. elicitation_client.py #
elicitation_client.py
# 导入os模块处理路径
import os
# 导入sys模块用于标准输入输出
import sys
# 从mcp包导入客户端会话、标准输入输出服务器参数和类型
from mcp_lite import ClientSession, StdioServerParameters, types
# 从mcp.client.stdio导入stdio客户端工具
from mcp_lite.client.stdio import stdio_client
# 定义elicitation回调函数,处理服务端发起的表单收集请求
async def handle_elicitation(
context, # RequestContext[ClientSession, Any]
params: types.ElicitRequestParams,
) -> types.ElicitResult:
# 只支持form模式(表单),如果不是直接拒绝
if getattr(params, "mode", None) != "form":
return types.ElicitResult(action="decline")
# 获取服务端请求信息和schema
message = getattr(params, "message", "请填写以下信息:")
schema = getattr(params, "requested_schema", None) or getattr(
params, "requestedSchema", {}
)
# 打印服务端消息提示
print(f"\n{'='*50}", file=sys.stderr)
print(f"[Elicitation] {message}", file=sys.stderr)
print("=" * 50, file=sys.stderr)
# 从schema中提取属性(fields)和必填项
properties = schema.get("properties", {}) if isinstance(schema, dict) else {}
required = set(schema.get("required", [])) if isinstance(schema, dict) else set()
# 如果没有需要填写的字段,直接让用户选择是否接受或拒绝
if not properties:
print("无表单字段,请选择 (a)接受 (d)拒绝 (c)取消 [a]: ", end="", file=sys.stderr)
try:
choice = input().strip().lower() or "a"
except EOFError:
return types.ElicitResult(action="cancel")
if choice in ("d", "decline"):
return types.ElicitResult(action="decline")
if choice in ("c", "cancel"):
return types.ElicitResult(action="cancel")
return types.ElicitResult(action="accept", content={})
# 依次提示用户输入每个字段
content = {}
for prop_name, prop_info in properties.items():
# 获取字段描述
desc = prop_info.get("description", prop_name) if isinstance(prop_info, dict) else prop_name
# 获取字段类型,默认为string
prop_type = prop_info.get("type", "string") if isinstance(prop_info, dict) else "string"
# 判断当前字段是否为必填
is_required = prop_name in required
# 根据必填或可选设置提示语
prompt_suffix = " (必填)" if is_required else " (可选,直接回车跳过)"
try:
raw = input(f" {prop_name} [{desc}]{prompt_suffix}: ").strip()
except EOFError:
return types.ElicitResult(action="cancel")
# 可选字段未填写则跳过
if not raw and not is_required:
continue
# 必填字段未填写则取消
if not raw and is_required:
print(" 必填项不能为空,取消本次 elicitation", file=sys.stderr)
return types.ElicitResult(action="cancel")
# 按字段类型转换用户的输入
if prop_type == "boolean":
content[prop_name] = raw.lower() in ("true", "1", "yes", "y", "是")
elif prop_type == "integer":
try:
content[prop_name] = int(raw)
except ValueError:
content[prop_name] = raw
elif prop_type == "number":
try:
content[prop_name] = float(raw)
except ValueError:
content[prop_name] = raw
else:
content[prop_name] = raw
# 输入完所有字段后询问用户提交、拒绝或取消
print("\n (a)接受并提交 (d)拒绝 (c)取消 [a]: ", end="", file=sys.stderr)
try:
choice = input().strip().lower() or "a"
except EOFError:
return types.ElicitResult(action="cancel")
# 处理用户选择并返回对应的结果
if choice in ("d", "decline"):
return types.ElicitResult(action="decline")
if choice in ("c", "cancel"):
return types.ElicitResult(action="cancel")
return types.ElicitResult(action="accept", content=content)
# 定义主函数(mcp_lite 使用同步 API)
def main():
# 获取本文件脚本所在目录
base_dir = os.path.dirname(os.path.abspath(__file__))
# 拼接elicitation_server.py脚本路径
server_path = os.path.join(base_dir, "elicitation_server.py")
# 配置stdio服务器参数(用python运行服务器脚本)
server_params = StdioServerParameters(
command="python",
args=[server_path],
env={},
)
# 使用stdio_client连接服务器,并传递elicitation回调(mcp_lite 为同步 context manager)
with stdio_client(server_params) as (read, write):
session = ClientSession(read, write, elicitation_callback=handle_elicitation)
# 初始化会话,获取服务端基本信息
result = session.initialize()
info = getattr(result, "server_info", None) or getattr(result, "serverInfo", None)
server_name = info.name if info else "unknown"
print(f"[Server] {server_name}", file=sys.stderr)
# 获取工具列表并打印
tools_result = session.list_tools()
tool_names = [t.name for t in tools_result.tools]
print(f"[Tools] {tool_names}", file=sys.stderr)
# 演示一:调用create_task工具(触发表单收集title和priority)
print("\n--- 演示 create_task ---", file=sys.stderr)
r1 = session.call_tool("create_task", arguments={})
if r1.content:
for block in r1.content:
if hasattr(block, "text"):
print(f"[Result] {block.text}")
# 演示二:调用collect_preferences工具
print("\n--- 演示 collect_preferences ---", file=sys.stderr)
r2 = session.call_tool("collect_preferences", arguments={})
if r2.content:
for block in r2.content:
if hasattr(block, "text"):
print(f"[Result] {block.text}")
# 演示三:调用confirm_action工具
print("\n--- 演示 confirm_action ---", file=sys.stderr)
r3 = session.call_tool("confirm_action", arguments={})
if r3.content:
for block in r3.content:
if hasattr(block, "text"):
print(f"[Result] {block.text}")
# 程序入口
if __name__ == "__main__":
# 如果标准输出支持reconfigure则设置为utf-8编码
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
sys.stdin.reconfigure(encoding="utf-8")
main()
官方代码
# 导入os模块处理路径
import os
# 导入sys模块用于标准输入输出
import sys
import asyncio
# 从mcp包导入客户端会话、标准输入输出服务器参数和类型
from mcp import ClientSession, StdioServerParameters, types
# 从mcp.client.stdio导入stdio客户端工具
from mcp.client.stdio import stdio_client
# 定义elicitation回调函数,处理服务端发起的表单收集请求
async def handle_elicitation(
context, # RequestContext[ClientSession, Any]
params: types.ElicitRequestParams,
) -> types.ElicitResult:
# 只支持form模式(表单),如果不是直接拒绝
if getattr(params, "mode", None) != "form":
return types.ElicitResult(action="decline")
# 获取服务端请求信息和schema
message = getattr(params, "message", "请填写以下信息:")
schema = getattr(params, "requested_schema", None) or getattr(
params, "requestedSchema", {}
)
# 打印服务端消息提示
print(f"\n{'='*50}", file=sys.stderr)
print(f"[Elicitation] {message}", file=sys.stderr)
print("=" * 50, file=sys.stderr)
# 从schema中提取属性(fields)和必填项
properties = schema.get("properties", {}) if isinstance(schema, dict) else {}
required = set(schema.get("required", [])) if isinstance(schema, dict) else set()
# 如果没有需要填写的字段,直接让用户选择是否接受或拒绝
if not properties:
print("无表单字段,请选择 (a)接受 (d)拒绝 (c)取消 [a]: ", end="", file=sys.stderr)
try:
choice = input().strip().lower() or "a"
except EOFError:
return types.ElicitResult(action="cancel")
if choice in ("d", "decline"):
return types.ElicitResult(action="decline")
if choice in ("c", "cancel"):
return types.ElicitResult(action="cancel")
return types.ElicitResult(action="accept", content={})
# 依次提示用户输入每个字段
content = {}
for prop_name, prop_info in properties.items():
# 获取字段描述
desc = prop_info.get("description", prop_name) if isinstance(prop_info, dict) else prop_name
# 获取字段类型,默认为string
prop_type = prop_info.get("type", "string") if isinstance(prop_info, dict) else "string"
# 判断当前字段是否为必填
is_required = prop_name in required
# 根据必填或可选设置提示语
prompt_suffix = " (必填)" if is_required else " (可选,直接回车跳过)"
try:
raw = input(f" {prop_name} [{desc}]{prompt_suffix}: ").strip()
except EOFError:
return types.ElicitResult(action="cancel")
# 可选字段未填写则跳过
if not raw and not is_required:
continue
# 必填字段未填写则取消
if not raw and is_required:
print(" 必填项不能为空,取消本次 elicitation", file=sys.stderr)
return types.ElicitResult(action="cancel")
# 按字段类型转换用户的输入
if prop_type == "boolean":
content[prop_name] = raw.lower() in ("true", "1", "yes", "y", "是")
elif prop_type == "integer":
try:
content[prop_name] = int(raw)
except ValueError:
content[prop_name] = raw
elif prop_type == "number":
try:
content[prop_name] = float(raw)
except ValueError:
content[prop_name] = raw
else:
content[prop_name] = raw
# 输入完所有字段后询问用户提交、拒绝或取消
print("\n (a)接受并提交 (d)拒绝 (c)取消 [a]: ", end="", file=sys.stderr)
try:
choice = input().strip().lower() or "a"
except EOFError:
return types.ElicitResult(action="cancel")
# 处理用户选择并返回对应的结果
if choice in ("d", "decline"):
return types.ElicitResult(action="decline")
if choice in ("c", "cancel"):
return types.ElicitResult(action="cancel")
return types.ElicitResult(action="accept", content=content)
# 定义主函数(mcp_lite 使用同步 API)
async def main():
# 获取本文件脚本所在目录
base_dir = os.path.dirname(os.path.abspath(__file__))
# 拼接elicitation_server.py脚本路径
server_path = os.path.join(base_dir, "elicitation_server.py")
# 配置stdio服务器参数(用python运行服务器脚本)
server_params = StdioServerParameters(
command="python",
args=[server_path],
env={},
)
# 使用stdio_client连接服务器,并传递elicitation回调(mcp_lite 为同步 context manager)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write, elicitation_callback=handle_elicitation) as session:
# 初始化会话,获取服务端基本信息
result = await session.initialize()
info = getattr(result, "server_info", None) or getattr(result, "serverInfo", None)
server_name = info.name if info else "unknown"
print(f"[Server] {server_name}", file=sys.stderr)
# 获取工具列表并打印
tools_result = await session.list_tools()
tool_names = [t.name for t in tools_result.tools]
print(f"[Tools] {tool_names}", file=sys.stderr)
# 演示一:调用create_task工具(触发表单收集title和priority)
print("\n--- 演示 create_task ---", file=sys.stderr)
r1 = await session.call_tool("create_task", arguments={})
if r1.content:
for block in r1.content:
if hasattr(block, "text"):
print(f"[Result] {block.text}")
# 演示二:调用collect_preferences工具
print("\n--- 演示 collect_preferences ---", file=sys.stderr)
r2 = await session.call_tool("collect_preferences", arguments={})
if r2.content:
for block in r2.content:
if hasattr(block, "text"):
print(f"[Result] {block.text}")
# 演示三:调用confirm_action工具
print("\n--- 演示 confirm_action ---", file=sys.stderr)
r3 = await session.call_tool("confirm_action", arguments={})
if r3.content:
for block in r3.content:
if hasattr(block, "text"):
print(f"[Result] {block.text}")
# 程序入口
if __name__ == "__main__":
# 如果标准输出支持reconfigure则设置为utf-8编码
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
sys.stdin.reconfigure(encoding="utf-8")
asyncio.run(main())
3. elicitation_server.py #
elicitation_server.py
# 导入 sys 模块,用于访问标准输入输出流
import sys
# 导入 Pydantic 的相关类,用于数据模型和字段校验
from pydantic import BaseModel, Field, field_validator
# 导入 MCP 服务器主类和上下文类型
from mcp_lite.server.fastmcp import FastMCP, Context
# 创建 FastMCP 服务器实例,命名为 "Elicitation Server"
mcp = FastMCP(name="Elicitation Server")
# 定义任务信息的数据结构
class TaskInfo(BaseModel):
# 任务信息数据结构说明文档字符串
"""任务信息数据结构"""
# 任务标题,字符串类型,添加描述
title: str = Field(description="任务标题")
# 任务优先级,字符串类型,要求为 low/medium/high
priority: str = Field(description="任务优先级 (low/medium/high)")
# 定义优先级字段的校验器
@field_validator("priority")
def validate_priority(cls, v):
# 判断优先级是否为允许的值,不是则抛异常
if v.lower() not in ["low", "medium", "high"]:
raise ValueError("优先级必须是 low、medium 或 high")
# 返回小写格式
return v.lower()
# 定义用户偏好设置的数据结构
class UserPreferences(BaseModel):
# 用户偏好设置说明文档字符串
"""用户偏好设置数据结构"""
# 是否检查替代选项,布尔类型
checkAlternative: bool = Field(description="是否检查替代选项?")
# 替代选项名称,字符串类型,默认值为 "option_b"
alternativeOption: str = Field(default="option_b", description="替代选项名称")
# 定义确认操作的数据结构
class ConfirmationData(BaseModel):
# 确认操作数据结构说明文档字符串
"""确认操作数据结构"""
# 是否继续执行,布尔类型
proceed: bool = Field(description="是否继续执行?")
# 继续或取消的原因,字符串类型,默认为空字符串
reason: str = Field(default="", description="继续或取消的原因(可选)")
# 工具1:创建任务(使用 elicitation 收集任务信息)
@mcp.tool()
async def create_task(ctx) -> str:
# 工具的说明文档字符串
"""创建任务,使用 elicitation 收集任务信息"""
try:
# 通过 elicitation 发起用户引导并收集任务信息
result = await ctx.elicit(message="请提供任务信息:", schema=TaskInfo)
# 如果用户接受并填写了数据
if result.action == "accept" and result.data:
# 获取用户提交的任务信息对象
task_info = result.data
# 返回任务创建成功的提示信息
return f" 任务创建成功:{task_info.title} (优先级: {task_info.priority})"
# 如果用户拒绝
elif result.action == "decline":
# 返回用户拒绝的提示信息
return " 用户拒绝了任务创建请求"
# 其他情况(如取消)
else:
# 返回任务创建被取消的提示信息
return " 任务创建被取消"
# 捕捉异常并返回错误信息
except Exception as e:
return f" 任务创建失败:{str(e)}"
# 工具2:收集用户偏好
@mcp.tool()
async def collect_preferences(ctx) -> str:
# 工具的说明文档字符串
"""收集用户偏好设置"""
try:
# 通过 elicitation 收集用户偏好设置
result = await ctx.elicit(
message="请告诉我你的偏好设置:", schema=UserPreferences
)
# 如果用户接受并填写了数据
if result.action == "accept" and result.data:
# 获取用户提交的偏好设置对象
prefs = result.data
# 返回收集到的偏好设置信息
return f" 偏好设置已收集:检查替代={prefs.checkAlternative}, 替代选项={prefs.alternativeOption}"
# 如果用户拒绝
elif result.action == "decline":
# 返回用户拒绝收集偏好的提示
return " 用户拒绝了偏好设置收集"
# 其他情况(如取消)
else:
# 返回用户取消收集偏好的提示
return " 偏好设置收集被取消"
# 捕捉异常并返回错误信息
except Exception as e:
return f" 偏好设置收集失败:{str(e)}"
# 工具3:确认操作
@mcp.tool()
async def confirm_action(ctx) -> str:
# 工具的说明文档字符串
"""确认重要操作"""
try:
# 通过 elicitation 收集用户确认信息
result = await ctx.elicit(
message="这是一个重要操作,请确认是否继续:", schema=ConfirmationData
)
# 如果用户接受并填写了数据
if result.action == "accept" and result.data:
# 获取用户填写的确认数据
confirm_data = result.data
# 如果用户同意继续
if confirm_data.proceed:
# 取出填写的原因,如未填写则给出默认原因
reason = confirm_data.reason or "未提供原因"
return f" 操作已确认,继续执行。原因:{reason}"
# 如果用户选择不继续
else:
# 取出填写的原因,如未填写则给出默认原因
reason = confirm_data.reason or "未提供原因"
return f" 操作被拒绝。原因:{reason}"
# 如果用户拒绝
elif result.action == "decline":
# 返回用户拒绝确认操作的提示
return " 用户拒绝了确认请求"
# 其他情况(如取消)
else:
# 返回用户取消确认操作的提示
return " 确认请求被取消"
# 捕捉异常并返回错误信息
except Exception as e:
return f" 确认操作失败:{str(e)}"
# 主函数:启动服务器
def main():
# 在标准错误输出提示启动服务器
print("启动 Elicitation Server...", file=sys.stderr)
# 在标准错误输出可用工具列表
print("可用工具:create_task, collect_preferences, confirm_action", file=sys.stderr)
# 启动服务器,使用 stdio 方式进行通信
mcp.run(transport="stdio")
# 程序入口
if __name__ == "__main__":
try:
# 检查 sys.stdout 是否有 reconfigure 方法(Python3.7及以上)
if hasattr(sys.stdout, "reconfigure"):
# 设置标准输出流编码为 utf-8
sys.stdout.reconfigure(encoding="utf-8")
# 设置标准输入流编码为 utf-8
sys.stdin.reconfigure(encoding="utf-8")
# 调用主函数,启动服务器
main()
# 捕获键盘中断异常(Ctrl+C)
except KeyboardInterrupt:
# 在标准错误输出服务器已停止提示
print("\n服务器已停止", file=sys.stderr)
# 捕获其他启动异常
except Exception as e:
# 输出错误信息
print(f"服务器启动失败:{e}", file=sys.stderr)
# 输出建议检查 mcp 包版本
print("建议:检查 mcp 包版本是否支持 elicitation 功能", file=sys.stderr)
官方代码
# 导入 sys 模块,用于访问标准输入输出流
import sys
# 导入 Pydantic 的相关类,用于数据模型和字段校验
from pydantic import BaseModel, Field, field_validator
# 导入 MCP 服务器主类和上下文类型
from mcp.server.fastmcp import FastMCP, Context
# 创建 FastMCP 服务器实例,命名为 "Elicitation Server"
mcp = FastMCP(name="Elicitation Server")
# 定义任务信息的数据结构
class TaskInfo(BaseModel):
# 任务信息数据结构说明文档字符串
"""任务信息数据结构"""
# 任务标题,字符串类型,添加描述
title: str = Field(description="任务标题")
# 任务优先级,字符串类型,要求为 low/medium/high
priority: str = Field(description="任务优先级 (low/medium/high)")
# 定义优先级字段的校验器
@field_validator("priority")
def validate_priority(cls, v):
# 判断优先级是否为允许的值,不是则抛异常
if v.lower() not in ["low", "medium", "high"]:
raise ValueError("优先级必须是 low、medium 或 high")
# 返回小写格式
return v.lower()
# 定义用户偏好设置的数据结构
class UserPreferences(BaseModel):
# 用户偏好设置说明文档字符串
"""用户偏好设置数据结构"""
# 是否检查替代选项,布尔类型
checkAlternative: bool = Field(description="是否检查替代选项?")
# 替代选项名称,字符串类型,默认值为 "option_b"
alternativeOption: str = Field(default="option_b", description="替代选项名称")
# 定义确认操作的数据结构
class ConfirmationData(BaseModel):
# 确认操作数据结构说明文档字符串
"""确认操作数据结构"""
# 是否继续执行,布尔类型
proceed: bool = Field(description="是否继续执行?")
# 继续或取消的原因,字符串类型,默认为空字符串
reason: str = Field(default="", description="继续或取消的原因(可选)")
# 工具1:创建任务(使用 elicitation 收集任务信息)
@mcp.tool()
async def create_task(ctx: Context) -> str:
# 工具的说明文档字符串
"""创建任务,使用 elicitation 收集任务信息"""
try:
# 通过 elicitation 发起用户引导并收集任务信息
result = await ctx.elicit(message="请提供任务信息:", schema=TaskInfo)
# 如果用户接受并填写了数据
if result.action == "accept" and result.data:
# 获取用户提交的任务信息对象
task_info = result.data
# 返回任务创建成功的提示信息
return f" 任务创建成功:{task_info.title} (优先级: {task_info.priority})"
# 如果用户拒绝
elif result.action == "decline":
# 返回用户拒绝的提示信息
return " 用户拒绝了任务创建请求"
# 其他情况(如取消)
else:
# 返回任务创建被取消的提示信息
return " 任务创建被取消"
# 捕捉异常并返回错误信息
except Exception as e:
return f" 任务创建失败:{str(e)}"
# 工具2:收集用户偏好
@mcp.tool()
async def collect_preferences(ctx: Context) -> str:
# 工具的说明文档字符串
"""收集用户偏好设置"""
try:
# 通过 elicitation 收集用户偏好设置
result = await ctx.elicit(
message="请告诉我你的偏好设置:", schema=UserPreferences
)
# 如果用户接受并填写了数据
if result.action == "accept" and result.data:
# 获取用户提交的偏好设置对象
prefs = result.data
# 返回收集到的偏好设置信息
return f" 偏好设置已收集:检查替代={prefs.checkAlternative}, 替代选项={prefs.alternativeOption}"
# 如果用户拒绝
elif result.action == "decline":
# 返回用户拒绝收集偏好的提示
return " 用户拒绝了偏好设置收集"
# 其他情况(如取消)
else:
# 返回用户取消收集偏好的提示
return " 偏好设置收集被取消"
# 捕捉异常并返回错误信息
except Exception as e:
return f" 偏好设置收集失败:{str(e)}"
# 工具3:确认操作
@mcp.tool()
async def confirm_action(ctx: Context) -> str:
# 工具的说明文档字符串
"""确认重要操作"""
try:
# 通过 elicitation 收集用户确认信息
result = await ctx.elicit(
message="这是一个重要操作,请确认是否继续:", schema=ConfirmationData
)
# 如果用户接受并填写了数据
if result.action == "accept" and result.data:
# 获取用户填写的确认数据
confirm_data = result.data
# 如果用户同意继续
if confirm_data.proceed:
# 取出填写的原因,如未填写则给出默认原因
reason = confirm_data.reason or "未提供原因"
return f" 操作已确认,继续执行。原因:{reason}"
# 如果用户选择不继续
else:
# 取出填写的原因,如未填写则给出默认原因
reason = confirm_data.reason or "未提供原因"
return f" 操作被拒绝。原因:{reason}"
# 如果用户拒绝
elif result.action == "decline":
# 返回用户拒绝确认操作的提示
return " 用户拒绝了确认请求"
# 其他情况(如取消)
else:
# 返回用户取消确认操作的提示
return " 确认请求被取消"
# 捕捉异常并返回错误信息
except Exception as e:
return f" 确认操作失败:{str(e)}"
# 主函数:启动服务器
def main():
# 在标准错误输出提示启动服务器
print("启动 Elicitation Server...", file=sys.stderr)
# 在标准错误输出可用工具列表
print("可用工具:create_task, collect_preferences, confirm_action", file=sys.stderr)
# 启动服务器,使用 stdio 方式进行通信
mcp.run(transport="stdio")
# 程序入口
if __name__ == "__main__":
try:
# 检查 sys.stdout 是否有 reconfigure 方法(Python3.7及以上)
if hasattr(sys.stdout, "reconfigure"):
# 设置标准输出流编码为 utf-8
sys.stdout.reconfigure(encoding="utf-8")
# 设置标准输入流编码为 utf-8
sys.stdin.reconfigure(encoding="utf-8")
# 调用主函数,启动服务器
main()
# 捕获键盘中断异常(Ctrl+C)
except KeyboardInterrupt:
# 在标准错误输出服务器已停止提示
print("\n服务器已停止", file=sys.stderr)
# 捕获其他启动异常
except Exception as e:
# 输出错误信息
print(f"服务器启动失败:{e}", file=sys.stderr)
# 输出建议检查 mcp 包版本
print("建议:检查 mcp 包版本是否支持 elicitation 功能", file=sys.stderr)
4. session.py #
mcp_lite/client/session.py
import sys
# 导入 SessionMessage 类
from mcp_lite.message import SessionMessage
# 导入 asyncio 用于运行异步 elicitation 回调
+import asyncio
# 从 mcp_lite.types 模块导入相关类型和常量
from mcp_lite.types import ( # 从 mcp_lite.types 模块导入以下类型
InitializeRequestParams, # 初始化请求参数类型
InitializeRequest, # 初始化请求类型
LATEST_PROTOCOL_VERSION, # 最新协议版本常量
ClientCapabilities, # 客户端能力类型
+ ElicitationCapability, # Elicitation 能力类型
+ FormElicitationCapability, # Form elicitation 能力
+ UrlElicitationCapability, # URL elicitation 能力
Implementation, # 实现信息类型
InitializedNotification, # 初始化完成通知类型
+ ElicitRequestFormParams, # Elicitation 请求参数(form)
+ ElicitResult, # Elicitation 结果类型
InitializeResult, # 初始化结果类型
JSONRPCRequest, # JSONRPC请求类型
JSONRPCResponse, # JSONRPC响应类型
JSONRPCError, # JSONRPC错误类型
JSONRPCNotification, # JSONRPC通知类型
LoggingMessageNotificationParams, # 日志消息通知参数
ListToolsRequest, # 工具列表请求类型
ListToolsResult, # 工具列表结果类型
CallToolResult, # 调用工具响应类型
CallToolRequest, # 调用工具请求类型
CallToolRequestParams, # 调用工具请求参数类型
ListResourcesRequest, # 资源列表请求类型
ListResourcesResult, # 资源列表响应类型
ListResourceTemplatesRequest, # 资源模板列表请求类型
ListResourceTemplatesResult, # 资源模板列表响应类型
ReadResourceRequest, # 读取资源请求类型
ReadResourceRequestParams, # 读取资源请求参数类型
ReadResourceResult, # 读取资源响应类型
ListPromptsRequest, # Prompt 列表请求类型
ListPromptsResult, # Prompt 列表响应类型
GetPromptRequest, # 获取 Prompt 请求类型
GetPromptRequestParams, # 获取 Prompt 请求参数类型
GetPromptResult, # 获取 Prompt 响应类型
CompleteRequest, # 补全请求类型
CompleteRequestParams, # 补全请求参数类型
CompleteResult, # 补全响应类型
CompletionArgument, # 补全参数类型
CompletionContext, # 补全上下文类型
)
# 定义 MCPError 异常类,用于表示协议相关错误
class MCPError(Exception):
# 初始化方法,接收错误码、错误信息和可选的数据
def __init__(self, code, message, data=None):
# 将错误码、错误信息、附加数据赋值为实例属性
self.code, self.message, self.data = code, message, data
# 调用父类 Exception 的初始化,只传递错误信息
super().__init__(message)
# 类方法,利用 JSONRPCError 对象创建 MCPError 实例
@classmethod
def from_jsonrpc(cls, err):
# 从 JSONRPCError.error 对象中取出 code/message/data 创建 MCPError
return cls(err.error.code, err.error.message, err.error.data)
# 定义 ClientSession 类,用于描述客户端会话
class ClientSession:
# 构造方法,接收读取流和写入流,可选 elicitation_callback 处理服务端发起的 elicitation 请求
+ def __init__(self, read_stream, write_stream, elicitation_callback=None):
# 赋值读取流和写入流
self._read, self._write = read_stream, write_stream
# 初始化请求 id 计数器为 0
self._req_id = 0
# elicitation 回调:当服务端发送 elicitation/create 时调用,返回 ElicitResult
+ self._elicitation_callback = elicitation_callback
# 内部方法,发送请求并同步等待响应
def _request(self, req, progress_callback=None, message_handler=None, logging_callback=None):
# 当前请求 id
rid = self._req_id
# 自增请求 id,确保下次唯一
self._req_id += 1
# 对请求对象进行序列化,转为 dict
d = req.model_dump(by_alias=True, mode="json", exclude_none=True)
# 构建 JSONRPCRequest,再封装入 SessionMessage,通过写入流发送
jreq = JSONRPCRequest(jsonrpc="2.0", id=rid, method=d["method"], params=d.get("params"))
# 将请求对象封装成 SessionMessage 后发送
self._write.send(SessionMessage(message=jreq))
# 循环等待响应
while True:
# 从读取流取出一条消息
msg = self._read.get()
# 若取到 None,说明连接断开,抛出异常
if msg is None:
raise MCPError(-32000, "Connection closed")
# 若消息类型不是 SessionMessage,忽略继续等
if not isinstance(msg, SessionMessage):
continue
# 取消息内容
m = msg.message
# 服务端发起的 elicitation/create 请求(有 id 和 method)
# 判断是否有 elicitation 回调,且消息 m 有 id 且 method 为 "elicitation/create"
+ if (
+ self._elicitation_callback
+ and getattr(m, "id", None) is not None
+ and getattr(m, "method", "") == "elicitation/create"
+ ):
# 获取消息中的 params 字段,若没有则为 {}
+ params_raw = getattr(m, "params", None) or {}
+ try:
# 尝试用 ElicitRequestFormParams 校验并解析参数
+ params = ElicitRequestFormParams.model_validate(params_raw, by_name=False)
+ except Exception:
# 校验失败则降级为原始 dict,不做结构检查
+ params = params_raw
+ try:
# 调用回调处理表单请求,异步支持
+ result = self._elicitation_callback(None, params)
# 检查回调是否为协程,如果是则用 asyncio.run 执行
+ if asyncio.iscoroutine(result):
+ result = asyncio.run(result)
# 若返回类型正确,构造正常响应
+ if isinstance(result, ElicitResult):
+ resp = JSONRPCResponse(
+ jsonrpc="2.0",
+ id=m.id,
+ result=result.model_dump(by_alias=True, exclude_none=True)
+ )
+ else:
# 若返回结果不是 ElicitResult,则以 cancel 响应
+ resp = JSONRPCResponse(
+ jsonrpc="2.0",
+ id=m.id,
+ result=ElicitResult(action="cancel").model_dump(by_alias=True, exclude_none=True)
+ )
# 发送封装后的响应 SessionMessage
+ self._write.send(SessionMessage(message=resp))
+ except Exception:
# 回调过程异常,反馈 cancel,保证流程不挂死
+ resp = JSONRPCResponse(
+ jsonrpc="2.0",
+ id=m.id,
+ result=ElicitResult(action="cancel").model_dump(by_alias=True, exclude_none=True)
+ )
+ self._write.send(SessionMessage(message=resp))
# 处理完表单请求,进入下一个消息处理循环
+ continue
# 判断当前消息是否为通知(即没有 id 字段且具有 method 字段)
if getattr(m, "id", None) is None and hasattr(m, "method"):
# 获取 method 名称
method = getattr(m, "method", "")
# 获取 params 内容(默认为空字典)
params = getattr(m, "params", None) or {}
# 若为进度通知,且传入了进度回调 progress_callback
if method == "notifications/progress" and progress_callback:
# 调用进度回调,传递 progress/total/message 参数
progress_callback(
params.get("progress", 0),
params.get("total"),
params.get("message"),
)
# 若为普通日志/信息通知
elif method == "notifications/message":
# 如果提供日志回调 logging_callback,则尝试解析为日志参数类型并调用
if logging_callback:
try:
# 使用 LoggingMessageNotificationParams 类型校验参数
p = LoggingMessageNotificationParams.model_validate(params, by_name=False)
# 调用日志回调
logging_callback(p)
except Exception:
# 校验失败则直接将原始 params 传递给日志回调
logging_callback(params)
# 如果指定了通用消息处理回调,则调用
if message_handler:
message_handler(m)
# 处理完通知类型后,继续等待下一个消息
continue
# 若为 Response 或 Error,且 id 匹配
if isinstance(m, (JSONRPCResponse, JSONRPCError)) and getattr(m, "id", None) == rid:
# 若为错误,抛出 MCPError 异常
if isinstance(m, JSONRPCError):
raise MCPError.from_jsonrpc(m)
# 为正常响应则返回结果数据
return m.result
# 内部方法,发送通知消息
def _notify(self, n):
# 通知对象序列化为 dict
d = n.model_dump(by_alias=True, mode="json", exclude_none=True)
# 封装为 JSONRPCNotification,再封装成 SessionMessage 后发出
self._write.send(SessionMessage(
message=JSONRPCNotification(
jsonrpc="2.0",
method=d["method"],
params=d.get("params"),
)
))
# 初始化过程,发送初始化请求,收到响应后再发初始化完成通知
def initialize(self):
# 若提供 elicitation_callback 则声明 ElicitationCapability
+ caps = ClientCapabilities()
# 如果存在 elicitation_callback 回调,则配置客户端能力中的 elicitation 字段
+ if self._elicitation_callback:
# 为 elicitation 能力赋值,包括 form 和 url 两种能力
+ caps.elicitation = ElicitationCapability(
+ form=FormElicitationCapability(),
+ url=UrlElicitationCapability()
+ )
# 构造初始化请求,并发送等待返回
r = self._request(InitializeRequest(
params=InitializeRequestParams(
protocol_version=LATEST_PROTOCOL_VERSION, # 协议版本号
+ capabilities=caps, # 客户端能力
client_info=Implementation(name="mcp_lite", version="0.1.0"), # 客户端信息
)
))
# 发送"初始化完成"通知
self._notify(InitializedNotification())
# 用 InitializeResult 验证响应并返回
return InitializeResult.model_validate(r, by_name=False)
def list_tools(self):
# 发送 ListToolsRequest 请求,使用 ListToolsResult 校验并返回
return ListToolsResult.model_validate(self._request(ListToolsRequest()), by_name=False)
# 调用指定工具方法,传入工具名和参数
# 定义 call_tool 方法,用于调用指定名称的工具
def call_tool(self, name, arguments=None, progress_callback=None, message_handler=None, logging_callback=None):
# 如果提供了 progress_callback,则将当前请求 id 作为 progressToken 加入 meta,便于服务端推送进度通知
rid = self._req_id
meta = {"progressToken": rid} if progress_callback else None
# 构建工具调用请求参数,包含名称、参数和 meta 信息
params = CallToolRequestParams(name=name, arguments=arguments or {}, meta=meta)
# 调用 _request 方法发送工具调用请求,并用 CallToolResult 校验和结构化响应
return CallToolResult.model_validate(
self._request(
CallToolRequest(params=params),
progress_callback=progress_callback, # 进度回调函数
message_handler=message_handler, # 通用消息处理回调
logging_callback=logging_callback, # 日志回调函数
),
by_name=False, # 按字段名校验
)
# 列出已注册的静态资源
def list_resources(self):
# 调用 _request 方法发送 ListResourcesRequest 请求
# 使用 ListResourcesResult 的 model_validate 进行结果校验和结构化
return ListResourcesResult.model_validate(
self._request(ListResourcesRequest()),
by_name=False,
)
# 列出资源模板(带占位符的资源)
def list_resource_templates(self):
# 调用 _request 方法发送 ListResourceTemplatesRequest 请求
# 使用 ListResourceTemplatesResult 的 model_validate 进行结果校验和结构化
return ListResourceTemplatesResult.model_validate(
self._request(ListResourceTemplatesRequest()),
by_name=False,
)
# 读取指定 URI 的资源内容
def read_resource(self, uri):
# uri 可为 str 或 AnyUrl 类型
# 构造 ReadResourceRequestParams 并发送 ReadResourceRequest 请求
# 使用 ReadResourceResult 的 model_validate 进行结果校验和结构化
return ReadResourceResult.model_validate(
self._request(ReadResourceRequest(params=ReadResourceRequestParams(uri=str(uri)))),
by_name=False,
)
# 列出已注册的 Prompt
# 定义 list_prompts 方法,用于获取所有已注册的 Prompt
def list_prompts(self):
# 发送 ListPromptsRequest 请求,校验响应并结构化为 ListPromptsResult
return ListPromptsResult.model_validate(
self._request(ListPromptsRequest()), # 请求所有 Prompt
by_name=False, # 按字段名校验
)
# 请求 Prompt 或资源模板参数的补全建议
def complete(self, ref, argument, context_arguments=None):
# 构建补全上下文,若有 context_arguments 则封装为 CompletionContext
context = CompletionContext(arguments=context_arguments) if context_arguments else None
# 若 argument 为字典则转为 CompletionArgument
arg = argument if isinstance(argument, CompletionArgument) else CompletionArgument(**argument)
# 发送 CompleteRequest,校验并返回 CompleteResult
return CompleteResult.model_validate(
self._request(CompleteRequest(
params=CompleteRequestParams(ref=ref, argument=arg, context=context)
)),
by_name=False,
)
# 获取指定 Prompt 的内容
# 定义 get_prompt 方法,根据名称和参数获取指定 Prompt 的详情
def get_prompt(self, name, arguments=None):
# 发送 GetPromptRequest,请求参数包含指定名称和参数,响应结构化为 GetPromptResult
return GetPromptResult.model_validate(
self._request(GetPromptRequest( # 发送请求
params=GetPromptRequestParams( # 构造请求参数
name=name, # Prompt 名称
arguments=arguments or {} # Prompt 参数(默认为空字典)
)
)),
by_name=False, # 按字段名校验
)
5. init.py #
mcp_lite/server/fastmcp/init.py
# FastMCP 包:从原 fastmcp 模块导入所有内容以保持向后兼容
# 原 fastmcp.py 的内容已移入此包,通过 __init__ 导出
# 导入系统模块
import sys
# 导入 base64 编码库
import base64
# 导入 json 序列化
import json
# 导入正则表达式模块
import re
# 导入 url 解码工具
from urllib.parse import unquote
# 导入类型相关辅助函数
from typing import get_args, get_origin, get_type_hints
# 导入 SessionMessage 类型
from mcp_lite.message import SessionMessage
# 导入 stdio 通信模块
from mcp_lite.server import stdio
# 导入函数签名和异步工具
import inspect
import asyncio
from mcp_lite.types import ( # 导入 mcp_lite.types 模块中的多个类型
JSONRPCRequest, # JSONRPC 请求类型
JSONRPCNotification, # JSONRPC 通知类型
JSONRPCError, # JSONRPC 错误响应类型
ErrorData, # 错误数据类型
InitializeResult, # 初始化结果类型
LATEST_PROTOCOL_VERSION, # 最新协议版本号
ToolsCapability, # 工具相关能力描述类型
ResourcesCapability, # 资源相关能力描述类型
PromptsCapability, # Prompt 相关能力描述类型
CompletionsCapability, # 补全能力描述类型
+ ElicitationCapability, # Elicitation 能力类型
+ FormElicitationCapability, # Form elicitation 能力
+ UrlElicitationCapability, # URL elicitation 能力
ServerCapabilities, # 服务器能力类型
Implementation, # 实现信息类型
JSONRPCResponse, # JSONRPC 响应类型
ListToolsResult, # 列出工具结果类型
Tool, # 单个工具类型
TextContent, # 文本内容类型
CallToolRequestParams, # 调用工具请求参数类型
CallToolResult, # 调用工具响应结果类型
+ ElicitResult, # Elicitation 结果类型
Resource, # 静态资源类型
ResourceTemplate, # 资源模板类型
ListResourcesResult, # 资源列表返回结果类型
ListResourceTemplatesResult, # 资源模板列表返回结果类型
ReadResourceRequestParams, # 读取资源请求参数类型
ReadResourceResult, # 读取资源请求响应类型
TextResourceContents, # 资源文本内容类型
BlobResourceContents, # 资源二进制内容类型
Prompt, # Prompt 类型
PromptArgument, # Prompt 参数类型
ListPromptsResult, # Prompt 列表结果类型
GetPromptRequestParams, # 获取 Prompt 请求参数类型
GetPromptResult, # 获取 Prompt 响应结果类型
PromptMessage, # Prompt 消息类型
Completion, # 补全结果内容类型
CompleteRequestParams, # 补全请求参数类型
CompleteResult, # 补全响应类型
)
# 导入 pydantic 的基础模型作为类型判定
from pydantic import BaseModel as PydanticBaseModel
# 导入 Typeddict 测试
from typing_extensions import is_typeddict as _is_typeddict
# 导入本包下 prompts 子模块
from . import prompts
# 辅助函数:提取函数输出 schema 及包裹需求
def _output_schema_and_wrap(fn, structured_output: bool):
# 若不是结构化输出,直接返回
if not structured_output:
return None, False
try:
# 获取函数签名
sig = inspect.signature(fn)
# 获取返回类型注解
ann = sig.return_annotation
# 如果没有注解,返回
if ann is inspect.Parameter.empty:
return None, False
except Exception:
return None, False
# 生成 schema
return _schema_from_annotation(ann, fn.__name__)
# 辅助函数:根据注解和函数名生成 schema
def _schema_from_annotation(ann, func_name: str):
# 没有注解直接返回
if ann is inspect.Parameter.empty:
return None, False
# 获取注解的原类型
origin = get_origin(ann)
wrap = False
schema = None
# 如果是 Pydantic 的模型子类
if PydanticBaseModel and isinstance(ann, type) and issubclass(ann, PydanticBaseModel):
schema = ann.model_json_schema()
return schema, False
# 如果是 Typeddict
if hasattr(ann, "__annotations__") and not (origin is dict or origin is list):
if _is_typeddict(ann):
hints = get_type_hints(ann) if hasattr(ann, "__annotations__") else {}
props = {}
for k, v in hints.items():
t = v
if t is int or t is type(None) and int in get_args(v):
props[k] = {"type": "integer"}
elif t is float:
props[k] = {"type": "number"}
elif t is str:
props[k] = {"type": "string"}
elif t is bool:
props[k] = {"type": "boolean"}
else:
props[k] = {"type": "string"}
schema = {"type": "object", "properties": props, "required": list(props)}
return schema, False
try:
hints = get_type_hints(ann)
if hints:
props = {}
for k, v in hints.items():
if v is int or v is type(None):
props[k] = {"type": "integer"}
elif v is float:
props[k] = {"type": "number"}
elif v is str:
props[k] = {"type": "string"}
elif v is bool:
props[k] = {"type": "boolean"}
elif get_origin(v) is list:
props[k] = {"type": "array", "items": {"type": "string"}}
else:
props[k] = {"type": "string"}
schema = {"type": "object", "properties": props}
return schema, False
except Exception:
pass
# 如果是 dict 类型
if origin is dict:
args = get_args(ann)
if len(args) == 2 and args[0] is str:
vt = args[1]
# 判断 value 类型
if vt is float:
schema = {"type": "object", "additionalProperties": {"type": "number"}}
elif vt is int:
schema = {"type": "object", "additionalProperties": {"type": "integer"}}
elif vt is str:
schema = {"type": "object", "additionalProperties": {"type": "string"}}
else:
schema = {"type": "object", "additionalProperties": {}}
return schema, False
wrap = True
# 如果是 list、基础类型等需要包裹
if origin is list or ann in (int, float, str, bool, type(None)):
wrap = True
# 包裹输出情况
if wrap:
result_schema = {"type": "string"}
if ann is int:
result_schema = {"type": "integer"}
elif ann is float:
result_schema = {"type": "number"}
elif ann is bool:
result_schema = {"type": "boolean"}
elif origin is list:
result_schema = {"type": "array", "items": {"type": "string"}}
schema = {"type": "object", "properties": {"result": result_schema}, "required": ["result"]}
return schema, True
return None, False
# 辅助:对象输出转换为结构化内容
def _to_structured(out, output_schema, wrap_output):
if output_schema is None:
return None
try:
# 如果是 Pydantic 模型
if PydanticBaseModel and isinstance(out, PydanticBaseModel):
return out.model_dump(mode="json")
# 如果需要包裹
if wrap_output:
return {"result": out}
# 如果是字典
if isinstance(out, dict):
return dict(out)
# 带有 __annotations__ 的对象,提取属性
if hasattr(out, "__annotations__"):
hints = get_type_hints(type(out)) if hasattr(type(out), "__annotations__") else getattr(out, "__annotations__", {})
return {k: getattr(out, k) for k in hints if hasattr(out, k)}
# 其它直接包裹
return {"result": out}
except Exception:
# 发生异常返回字符串
return {"result": str(out)}
# 根据函数参数生成 schema
def _schema(fn):
sig = inspect.signature(fn)
props = {}
req = []
for n, p in sig.parameters.items():
# 跳过以下划线开头的参数
if n.startswith("_"):
continue
# 跳过 Context 类型参数(由框架注入)
if _find_context_parameter(fn) == n:
continue
# 参数类型为字符串,标题为参数名
props[n] = {"type": "string", "title": n}
# 必填参数加入 required
if p.default is inspect.Parameter.empty:
req.append(n)
return {"type": "object", "properties": props, "required": req}
# 检测工具函数是否包含 Context 参数
def _find_context_parameter(fn):
+ """若函数有 ctx: Context 参数,或参数名为 ctx,返回参数名;否则返回 None。"""
try:
sig = inspect.signature(fn)
hints = get_type_hints(fn) if hasattr(fn, "__annotations__") else {}
for name, param in sig.parameters.items():
ann = hints.get(name, param.annotation)
if ann is not inspect.Parameter.empty:
n = getattr(ann, "__name__", None) or str(ann)
if n == "Context" or (isinstance(n, str) and n.endswith(".Context")):
return name
# 约定:参数名为 ctx 时也视为 Context 注入
+ if name == "ctx":
+ return name
except Exception:
pass
return None
# 根据 prompt 函数生成 prompt schema
def _prompt_schema(fn):
"""根据函数签名生成 Prompt 参数 schema。"""
sig = inspect.signature(fn)
props = {}
req = []
for n, p in sig.parameters.items():
if n.startswith("_"):
continue
props[n] = {"type": "string", "title": n}
if p.default is inspect.Parameter.empty:
req.append(n)
return {"type": "object", "properties": props, "required": req}
# 简化的 Elicitation 结果类型(与 ctx.elicit 返回的 result.data 兼容)
+class _ElicitationResult:
+ """elicit() 返回的结果,支持 result.action 和 result.data。"""
+ def __init__(self, action: str, data=None):
+ self.action = action
+ self.data = data
# Context 类:工具执行时的上下文,支持 read_resource、report_progress、debug、info、elicit
class Context:
+ """工具执行上下文,提供资源读取、进度报告、日志发送、elicitation 等能力。"""
+ def __init__(self, mcp_server, request_id, send_notification=None, progress_token=None, elicitation_session=None):
self._mcp = mcp_server
self.request_id = request_id
self._send = send_notification or (lambda _: None)
self._progress_token = progress_token
+ self._elicitation_session = elicitation_session
+ async def elicit(self, message: str, schema: type):
+ """通过 elicitation 向客户端收集表单数据。schema 为 Pydantic BaseModel 子类。"""
+ if self._elicitation_session is None:
+ raise RuntimeError("Elicitation not supported: no elicitation session (stdio transport required)")
+ json_schema = schema.model_json_schema() if hasattr(schema, "model_json_schema") else {}
+ params = {"mode": "form", "message": message, "requestedSchema": json_schema}
+ raw = self._elicitation_session.send_request("elicitation/create", params)
+ result = ElicitResult.model_validate(raw, by_name=False)
+ if result.action == "accept" and result.content:
+ validated = schema.model_validate(result.content)
+ return _ElicitationResult(action="accept", data=validated)
+ return _ElicitationResult(action=result.action, data=None)
async def read_resource(self, uri: str):
"""异步读取资源,返回内容块列表(与 ReadResourceResult.contents 结构一致)。"""
uri_str = str(uri)
# 静态资源
if res := self._mcp._resources.get(uri_str):
out = res.run()
from mcp_lite.types import TextResourceContents
return [TextResourceContents(uri=uri_str, text=str(out), mime_type=res.mime_type)]
# 模板资源
for template in self._mcp._resource_templates.values():
if args := template.matches(uri_str):
out = template.run(args)
from mcp_lite.types import TextResourceContents, BlobResourceContents
if isinstance(out, bytes):
return [BlobResourceContents(uri=uri_str, blob=base64.b64encode(out).decode(), mime_type=template.mime_type or "application/octet-stream")]
return [TextResourceContents(uri=uri_str, text=str(out), mime_type=template.mime_type or "text/plain")]
return []
async def report_progress(self, progress: float, total: float | None = None, message: str | None = None):
"""发送进度通知(仅当客户端提供了 progressToken 时有效)。"""
if self._progress_token is not None:
self._send({"method": "notifications/progress", "params": {"progressToken": self._progress_token, "progress": progress, "total": total, "message": message}})
async def debug(self, data):
"""发送 debug 级别日志。"""
self._send({"method": "notifications/message", "params": {"level": "debug", "data": data}})
async def info(self, data):
"""发送 info 级别日志。"""
self._send({"method": "notifications/message", "params": {"level": "info", "data": data}})
# 工具函数封装类
class _Tool:
def __init__(self, fn, name=None, desc=None, structured_output=True):
# 函数本体
self.fn = fn
# 名称
self.name = name or fn.__name__
# 描述
self.desc = desc or (fn.__doc__ or "").strip()
# 入参 schema
self.schema = _schema(fn)
# 是否是异步函数
self.async_fn = asyncio.iscoroutinefunction(fn)
# 是否结构化输出
self.structured_output = structured_output
# 输出 schema 及需否包裹
self.output_schema, self.wrap_output = _output_schema_and_wrap(fn, structured_output) if structured_output else (None, False)
# Context 参数名(若有)
self._ctx_param = _find_context_parameter(fn)
# 转换为 Tool 对象
def to_tool(self):
return Tool(name=self.name, description=self.desc or None, input_schema=self.schema, output_schema=self.output_schema)
# 执行工具(自动支持异步),支持注入 Context
+ def run(self, args, mcp_server=None, request_id=None, progress_token=None, send_notification=None, elicitation_session=None):
if self._ctx_param is not None and mcp_server is not None:
+ ctx = Context(mcp_server, request_id, send_notification, progress_token, elicitation_session)
args = dict(args) if args else {}
args[self._ctx_param] = ctx
if self.async_fn:
return asyncio.run(self.fn(**args))
return self.fn(**args)
# 静态资源封装类
class _Resource:
def __init__(self, uri, fn, mime_type="text/plain"):
# 资源 URI
self.uri = uri
# 回调函数
self.fn = fn
# MIME 类型
self.mime_type = mime_type
# 是否异步
self.async_fn = asyncio.iscoroutinefunction(fn)
# 资源运行,自动支持异步
def run(self):
if self.async_fn:
return asyncio.run(self.fn())
return self.fn()
# 资源模板封装类(支持 URI 参数模板)
class _ResourceTemplate:
def __init__(self, uri_template, fn, mime_type="text/plain"):
# URI 模板
self.uri_template = uri_template
# 生成资源内容的函数
self.fn = fn
# MIME 类型
self.mime_type = mime_type
# 是否异步
self.async_fn = asyncio.iscoroutinefunction(fn)
# 提取函数里除了 _ 开头之外的参数名
sig = inspect.signature(fn)
self.param_names = [n for n in sig.parameters if not n.startswith("_")]
# 检查 URI 是否与模板匹配,并解析参数
def matches(self, uri):
pattern = self.uri_template.replace("{", "(?P<").replace("}", ">[^/]+)")
m = re.match(f"^{pattern}$", uri)
if m:
return {k: unquote(v) for k, v in m.groupdict().items()}
return None
# 执行模板内容生成函数
def run(self, args):
if self.async_fn:
return asyncio.run(self.fn(**args))
return self.fn(**args)
# Prompt 封装
class _Prompt:
"""内部 Prompt 封装类。"""
def __init__(self, fn, name=None, title=None, description=None):
# prompt 生成函数
self.fn = fn
# prompt 名称
self.name = name or fn.__name__
# prompt 标题
self.title = title
# prompt 描述
self.description = description or (fn.__doc__ or "").strip()
# 通用 schema
self.schema = _prompt_schema(fn)
# 是否异步
self.async_fn = asyncio.iscoroutinefunction(fn)
# 从 schema 生成 PromptArgument 列表
args_list = []
if "properties" in self.schema:
required = set(self.schema.get("required", []))
for pname, pinfo in self.schema["properties"].items():
args_list.append(
PromptArgument(
name=pname,
description=pinfo.get("title"),
required=pname in required,
)
)
self.arguments = args_list
# 转成 Prompt 对象
def to_prompt(self):
return Prompt(
name=self.name,
title=self.title,
description=self.description or None,
arguments=self.arguments if self.arguments else None,
)
# prompt 执行
def run(self, args):
if self.async_fn:
return asyncio.run(self.fn(**args))
return self.fn(**args)
# FastMCP 主类
class FastMCP:
def __init__(self, name="mcp-server"):
# 服务器名称
self.name = name
# 工具注册表
self._tools = {}
# 静态资源注册表
self._resources = {}
# 资源模板注册表
self._resource_templates = {}
# prompt 注册表
self._prompts = {}
# 补全处理器(注册后用于 completion/complete 请求)
self._completion_handler = None
# 注册工具的装饰器
def tool(self, name=None, description=None, structured_output=True):
def deco(fn):
t = _Tool(fn, name, description, structured_output=structured_output)
self._tools[t.name] = t
return fn
return deco
# 注册资源或模板资源的装饰器
def resource(self, uri, mime_type="text/plain"):
def deco(fn):
# 判断带模板参数还是静态资源
if "{" in uri and "}" in uri:
template = _ResourceTemplate(uri, fn, mime_type)
self._resource_templates[uri] = template
else:
res = _Resource(uri, fn, mime_type)
self._resources[uri] = res
return fn
return deco
# 注册补全处理器的装饰器
def completion(self):
"""注册参数补全处理器,用于 Prompt 或资源模板参数的自动补全。"""
def deco(fn):
self._completion_handler = fn
return fn
return deco
# 注册 Prompt 的装饰器
def prompt(self, name=None, title=None, description=None):
"""注册 Prompt 的装饰器。"""
def deco(fn):
p = _Prompt(fn, name=name, title=title, description=description)
self._prompts[p.name] = p
return fn
return deco
# 核心 RPC 方法分发
+ def _handle(self, req, elicitation_session=None):
# 获取方法名、参数和请求 id
method, params, rid = req.method, req.params or {}, req.id
# 初始化请求
if method == "initialize":
caps = ServerCapabilities(
tools=ToolsCapability(),
resources=ResourcesCapability(),
prompts=PromptsCapability() if self._prompts else None,
completions=CompletionsCapability() if self._completion_handler else None,
+ elicitation=ElicitationCapability(form=FormElicitationCapability(), url=UrlElicitationCapability()),
)
r = InitializeResult(
protocol_version=LATEST_PROTOCOL_VERSION,
capabilities=caps,
server_info=Implementation(name=self.name, version="0.1.0"),
)
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 工具列表请求
if method == "tools/list":
r = ListToolsResult(tools=[t.to_tool() for t in self._tools.values()])
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 工具调用请求
if method == "tools/call":
p = CallToolRequestParams.model_validate(params, by_name=False)
t = self._tools.get(p.name)
if not t:
return JSONRPCError(
jsonrpc="2.0",
id=rid,
error=ErrorData(code=-32602, message=f"Unknown tool: {p.name}")
)
# 从 meta/_meta 提取 progressToken,用于发送进度/日志通知
meta = p.meta or {}
progress_token = meta.get("progressToken") or meta.get("progress_token")
notifications = []
def _send_notification(payload):
method_name = payload.get("method", "")
params = payload.get("params")
notifications.append(JSONRPCNotification(jsonrpc="2.0", method=method_name, params=params))
try:
out = t.run(
p.arguments or {},
mcp_server=self,
request_id=rid,
progress_token=progress_token,
send_notification=_send_notification,
+ elicitation_session=elicitation_session,
)
# 如果直接返回的是 CallToolResult
if isinstance(out, CallToolResult):
r = out
else:
struct = None
# 结构化输出
if t.output_schema is not None:
struct = _to_structured(out, t.output_schema, t.wrap_output)
# 字符串直接用文本包装
if isinstance(out, str):
c = [TextContent(text=out)]
elif out is None:
c = []
elif struct is not None:
# 结构化结果按 JSON 格式化后返回
c = [TextContent(text=json.dumps(struct, ensure_ascii=False, indent=2))]
else:
# 普通对象尝试转 json,否则转字符串
try:
if PydanticBaseModel and isinstance(out, PydanticBaseModel):
text = json.dumps(out.model_dump(mode="json"), ensure_ascii=False, indent=2)
elif isinstance(out, dict):
text = json.dumps(out, ensure_ascii=False, indent=2)
else:
text = str(out)
except Exception:
text = str(out)
c = [TextContent(text=text)]
r = CallToolResult(content=c, structured_content=struct)
except Exception as e:
r = CallToolResult(content=[TextContent(text=str(e))], is_error=True)
resp = JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
if notifications:
return (notifications, resp)
return resp
# prompts/list 请求
if method == "prompts/list":
prompts_list = [p.to_prompt() for p in self._prompts.values()]
r = ListPromptsResult(prompts=prompts_list)
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# prompts/get 请求
if method == "prompts/get":
p = GetPromptRequestParams.model_validate(params, by_name=False)
prompt_obj = self._prompts.get(p.name)
if not prompt_obj:
return JSONRPCError(
jsonrpc="2.0",
id=rid,
error=ErrorData(code=-32602, message=f"Unknown prompt: {p.name}")
)
try:
args = p.arguments or {}
result = prompt_obj.run(args)
messages = []
# 单条消息包装为列表
if not isinstance(result, (list, tuple)):
result = [result]
for msg in result:
# 若为内置的 Message 对象
if isinstance(msg, prompts.base.Message):
content = msg.content
if isinstance(content, str):
content = TextContent(type="text", text=content)
messages.append(PromptMessage(role=msg.role, content=content))
# 字符串消息按 user 角色组装
elif isinstance(msg, str):
messages.append(PromptMessage(role="user", content=TextContent(type="text", text=msg)))
# dict 消息,读 role 和 content
elif isinstance(msg, dict):
role = msg.get("role", "user")
cnt = msg.get("content", "")
if isinstance(cnt, dict) and cnt.get("type") == "text":
content = TextContent(**cnt)
else:
content = TextContent(type="text", text=str(cnt) if not isinstance(cnt, str) else cnt)
messages.append(PromptMessage(role=role, content=content))
# 其它均归为 user 文本
else:
messages.append(PromptMessage(role="user", content=TextContent(type="text", text=str(msg))))
r = GetPromptResult(description=prompt_obj.description, messages=messages)
except Exception as e:
return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32603, message=str(e)))
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 资源静态列表
if method == "resources/list":
resources = [
Resource(uri=u, name=u, mime_type=r.mime_type)
for u, r in self._resources.items()
]
r = ListResourcesResult(resources=resources)
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 资源模板列表
if method == "resources/templates/list":
templates = [
ResourceTemplate(uri_template=u, name=u, mime_type=t.mime_type)
for u, t in self._resource_templates.items()
]
r = ListResourceTemplatesResult(resource_templates=templates)
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 处理补全请求 completion/complete
if method == "completion/complete":
# 如果没有补全处理器,返回方法未找到错误
if not self._completion_handler:
return JSONRPCError(
jsonrpc="2.0",
id=rid,
error=ErrorData(code=-32601, message="Method not found: completion/complete"),
)
# 校验并解析补全请求参数
p = CompleteRequestParams.model_validate(params, by_name=False)
try:
# 获取补全处理函数
fn = self._completion_handler
# 如果处理函数是协程,则用 asyncio.run 执行,否则直接调用
result = (
asyncio.run(fn(p.ref, p.argument, p.context))
if asyncio.iscoroutinefunction(fn)
else fn(p.ref, p.argument, p.context)
)
# 若处理函数无结果则返回空补全
if result is None:
result = Completion(values=[], total=None, has_more=None)
# 封装补全响应对象
r = CompleteResult(completion=result)
# 构造并返回 JSONRPCResponse 响应
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
except Exception as e:
# 捕获异常并返回内部错误
return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32603, message=str(e)))
# 读取资源接口
if method == "resources/read":
p = ReadResourceRequestParams.model_validate(params, by_name=False)
uri_str = str(p.uri)
try:
# 静态资源优先
if res := self._resources.get(uri_str):
out = res.run()
content = TextResourceContents(uri=uri_str, text=str(out), mime_type=res.mime_type)
r = ReadResourceResult(contents=[content])
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 匹配模板资源
for template in self._resource_templates.values():
if args := template.matches(uri_str):
out = template.run(args)
if isinstance(out, bytes):
content = BlobResourceContents(
uri=uri_str,
blob=base64.b64encode(out).decode(),
mime_type=template.mime_type or "application/octet-stream",
)
else:
content = TextResourceContents(
uri=uri_str,
text=str(out),
mime_type=template.mime_type or "text/plain",
)
r = ReadResourceResult(contents=[content])
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 找不到资源
return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32602, message=f"Unknown resource: {uri_str}"))
except Exception as e:
return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32603, message=str(e)))
# 未知方法
return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32601, message=f"Method not found: {method}"))
# 消息包装与请求
+ def _handle_msg(self, msg, elicitation_session=None):
# 非 SessionMessage 类型忽略
if not isinstance(msg, SessionMessage):
return None
m = msg.message
# 必须是带 id 的 jsonrpc 请求
if not isinstance(m, JSONRPCRequest) or getattr(m, "id", None) is None:
return None
#print("[Server] Request:", m.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
try:
+ resp = self._handle(m, elicitation_session)
#print("[Server] Response:", resp.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
return resp
except Exception as e:
err = JSONRPCError(jsonrpc="2.0", id=m.id, error=ErrorData(code=-32603, message=str(e)))
#print("[Server] Response (error):", err.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
return err
# 运行服务器主逻辑
def run(self, transport="stdio", host: str = "127.0.0.1", port: int = 8000):
# 如果传输方式为 stdio,则启动标准输入输出服务器
if transport == "stdio":
stdio.stdio_server(self._handle_msg)
# 如果传输方式为 streamable-http,则启动基于 HTTP 的服务器
elif transport == "streamable-http":
# 导入 Starlette 框架相关模块
from starlette.applications import Starlette
from starlette.routing import Mount
# 导入 streamable_http_app 工厂方法
from mcp_lite.server.streamable_http import streamable_http_app
# 导入 uvicorn 用于启动 ASGI 服务
import uvicorn
# 创建 Streamable HTTP ASGI 应用
app = streamable_http_app(self._handle_msg, path="/")
# 将 /mcp 路径挂载到应用
full_app = Starlette(routes=[Mount("/mcp", app=app)])
# 启动 uvicorn 服务,监听指定 host 和 port
uvicorn.run(full_app, host=host, port=port)
# 如果传输方式不被支持,则抛出异常
else:
raise ValueError(f"unsupported transport: {transport}")
# 明确导出接口
__all__ = ["FastMCP", "Context", "prompts"]
6. stdio.py #
mcp_lite/server/stdio.py
# 导入 sys 模块,用于标准输入输出操作
import sys
# 从 mcp_lite.message 模块导入 SessionMessage 类
from mcp_lite.message import SessionMessage
# 从 mcp_lite.types 模块导入 jsonrpc_message_adapter、JSONRPCRequest、JSONRPCResponse、JSONRPCError、ElicitRequest、ElicitResult
+from mcp_lite.types import (
+ jsonrpc_message_adapter,
+ JSONRPCRequest,
+ JSONRPCResponse,
+ JSONRPCError,
+ ElicitResult,
+)
+class ElicitationSession:
+ """服务端向客户端发起 elicitation 请求的会话,用于 tools/call 中 ctx.elicit() 时发送 elicitation/create 并等待响应。"""
+ def __init__(self, stdin, stdout):
+ self._stdin = stdin
+ self._stdout = stdout
+ self._req_id = 10000 # 使用独立 id 区间,避免与客户端请求 id 冲突
+ def send_request(self, method: str, params: dict) -> dict:
+ """发送 JSON-RPC 请求并同步等待响应。"""
+ self._req_id += 1
+ rid = self._req_id
+ req = JSONRPCRequest(jsonrpc="2.0", id=rid, method=method, params=params)
+ self._stdout.write(req.model_dump_json(by_alias=True, exclude_unset=True) + "\n")
+ self._stdout.flush()
+ line = self._stdin.readline()
+ if not line:
+ raise RuntimeError("Elicitation: connection closed while waiting for response")
+ line = line.strip()
+ if not line:
+ raise RuntimeError("Elicitation: empty response")
+ msg = jsonrpc_message_adapter.validate_json(line, by_name=False)
+ if isinstance(msg, JSONRPCError):
+ raise RuntimeError(f"Elicitation error: {msg.error.message}")
+ if isinstance(msg, JSONRPCResponse) and getattr(msg, "id", None) == rid:
+ return msg.result or {}
+ raise RuntimeError("Elicitation: unexpected response")
# 定义 stdio_server 函数,参数 handler 是处理消息的回调函数
def stdio_server(handler):
# 进入一个无限循环,持续处理标准输入的数据
while True:
# 从标准输入读取一行
line = sys.stdin.readline()
# 如果读取到空字符串,说明 EOF,跳出循环
if not line:
break
# 去除读取行首尾的空白字符
line = line.strip()
# 如果去除空白后字符串仍为空,继续读取下一行
if not line:
continue
try:
# 使用 jsonrpc_message_adapter 对输入字符串进行解析为消息对象
msg = jsonrpc_message_adapter.validate_json(line, by_name=False)
# 创建 elicitation 会话,供 tools/call 中 ctx.elicit() 使用
+ elicitation_session = ElicitationSession(sys.stdin, sys.stdout)
# 使用解析后的消息包装成 SessionMessage 再传递给 handler 处理,获取响应
+ response = handler(SessionMessage(message=msg), elicitation_session)
# 判断 handler 是否返回了响应结果
if response is not None:
# 判断 response 是否为一个长度为 2 的元组(即 (notifications, response))
if isinstance(response, tuple) and len(response) == 2:
# 解包 notifications 和 resp
notifications, resp = response
# 遍历所有通知消息
for n in notifications:
# 将每个通知对象序列化为 JSON 字符串写入标准输出,并添加换行
sys.stdout.write(n.model_dump_json(by_alias=True, exclude_unset=True) + "\n")
# 立即刷新标准输出缓冲区,确保消息及时输出到客户端
sys.stdout.flush()
# 将 response 设置为实际的响应对象
response = resp
# 如果响应对象有 message 属性,则直接使用;否则把响应包装成 SessionMessage
r = response if hasattr(response, "message") else SessionMessage(message=response)
# 将响应的 message 属性序列化为 JSON 字符串,写入标准输出,并换行
sys.stdout.write(r.message.model_dump_json(by_alias=True, exclude_unset=True) + "\n")
# 立即刷新标准输出缓冲区,确保数据被及时输出
sys.stdout.flush()
except Exception:
print("[Server] Error:", sys.exc_info(), file=sys.stderr)
# 捕获所有异常,忽略错误继续处理下一行
pass7. types.py #
mcp_lite/types.py
# 从 pydantic 导入 BaseModel、ConfigDict、Field、TypeAdapter、field_validator
from pydantic import BaseModel, ConfigDict, Field, TypeAdapter, field_validator
# 导入 pydantic 的 to_camel 驼峰命名生成器
from pydantic.alias_generators import to_camel
# 导入 Any、Literal、Annotated 类型注解
from typing import Any, Literal, Annotated
# 定义 RequestId 类型,既可以是 int 也可以是 str
RequestId = int | str
# 定义 MCP 的基础模型类,支持驼峰命名和按名称填充
class MCPModel(BaseModel):
# 指定 Pydantic 模型配置:使用驼峰形式命名和按名称填充
model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)
# Elicitation 能力(用于 initialize 能力协商,需在 ClientCapabilities/ServerCapabilities 前定义)
+class FormElicitationCapability(MCPModel):
+ pass
+class UrlElicitationCapability(MCPModel):
+ pass
+class ElicitationCapability(MCPModel):
+ form: FormElicitationCapability | None = None
+ url: UrlElicitationCapability | None = None
# 定义客户端能力结构体
class ClientCapabilities(MCPModel):
# 可选字段:客户端实验性能力扩展,键为 str,值为字典
experimental: dict[str, dict[str, Any]] | None = None
# 可选字段:客户端 Elicitation 能力(提供 elicitation_callback 时声明)
+ elicitation: ElicitationCapability | None = None
# 定义服务器或客户端的实现信息结构体
class Implementation(MCPModel):
# 实现的名称
name: str = ""
# 实现的版本号
version: str = ""
# 可选字段:人类可读的标题
title: str | None = None
# 可选字段:实现的描述
description: str | None = None
# 定义初始化请求参数结构体
class InitializeRequestParams(MCPModel):
# 协议版本号
protocol_version: str = ""
# 可选字段:客户端能力描述
capabilities: ClientCapabilities = None
# 可选字段:客户端实现信息
client_info: Implementation = None
# 定义初始化请求结构体
class InitializeRequest(MCPModel):
# 方法名,固定为 "initialize"
method: Literal["initialize"] = "initialize"
# 可选字段:参数,类型为 InitializeRequestParams
params: InitializeRequestParams = None
# 当前协议的最新版本号
LATEST_PROTOCOL_VERSION = "2024-11-05"
# 定义工具相关能力结构体
class ToolsCapability(MCPModel):
# 工具列表是否发生变化,可选布尔类型
list_changed: bool | None = None
# 定义资源相关能力结构体
class ResourcesCapability(MCPModel):
# 是否支持资源订阅
subscribe: bool | None = None
# 资源列表是否变化通知
list_changed: bool | None = None
# 定义 Prompt 相关能力结构体
class PromptsCapability(MCPModel):
list_changed: bool | None = None
# 定义补全能力描述(服务端声明支持补全时使用)
class CompletionsCapability(MCPModel):
pass
# 定义服务端能力描述结构体
class ServerCapabilities(MCPModel):
# 可选字段:实验性能力扩展
experimental: dict[str, dict[str, Any]] | None = None
# 可选字段:工具能力
tools: ToolsCapability | None = None
# 可选字段:资源能力
resources: ResourcesCapability | None = None
# 可选字段:Prompt 能力
prompts: PromptsCapability | None = None
# 可选字段:补全能力(注册了 @completion 时声明)
completions: CompletionsCapability | None = None
# 可选字段:Elicitation 能力(服务端支持 elicitation 时声明)
+ elicitation: ElicitationCapability | None = None
# 定义初始化响应结构体
class InitializeResult(MCPModel):
# 协议版本号
protocol_version: str = ""
# 可选字段:服务端能力描述
capabilities: ServerCapabilities = None
# 可选字段:服务端实现信息
server_info: Implementation = None
# 可选字段:初始化说明信息
instructions: str | None = None
# 定义客户端初始化完成通知结构体
class InitializedNotification(MCPModel):
# 方法名,固定为 "notifications/initialized"
method: Literal["notifications/initialized"] = "notifications/initialized"
# 可选字段:通知参数,可以为字典或 None
params: dict[str, Any] | None = None
# 定义 JSONRPC 请求的数据结构
class JSONRPCRequest(BaseModel):
# jsonrpc 协议版本,固定为 "2.0"
jsonrpc: Literal["2.0"] = "2.0"
# 请求 ID,可以为 int 或 str 类型
id: RequestId = None
# 方法名称,字符串类型
method: str = ""
# 方法参数,为一个字典或 None
params: dict[str, Any] | None = None
# 定义 JSONRPC 通知的数据结构(没有 id 字段)
class JSONRPCNotification(BaseModel):
# jsonrpc 协议版本,固定为 "2.0"
jsonrpc: Literal["2.0"] = "2.0"
# 通知的方法名称
method: str = ""
# 通知参数,可以为字典或者 None
params: dict[str, Any] | None = None
# 定义 JSONRPC 响应的数据结构
class JSONRPCResponse(BaseModel):
# jsonrpc 协议版本,固定为 "2.0"
jsonrpc: Literal["2.0"] = "2.0"
# 响应的 ID,需要与请求 ID 匹配
id: RequestId = None
# 响应结果,可以为字典或 None
result: dict[str, Any] = None
# 定义错误的数据结构
class ErrorData(BaseModel):
# 错误码,默认值为 0
code: int = 0
# 错误信息,默认值为空字符串
message: str = ""
# 附加的错误数据,可以为任意类型或 None
data: Any = None
# 定义 JSONRPC 错误消息的数据结构
class JSONRPCError(BaseModel):
# jsonrpc 协议版本,固定为 "2.0"
jsonrpc: Literal["2.0"] = "2.0"
# 错误对应的请求 ID,可以为 None
id: RequestId | None = None
# 错误的详细信息,类型为 ErrorData
error: ErrorData = None
# 定义所有 JSONRPC 消息的联合类型
JSONRPCMessage = JSONRPCRequest | JSONRPCNotification | JSONRPCResponse | JSONRPCError
# 进度通知参数(notifications/progress)
class ProgressNotificationParams(MCPModel):
progress_token: str | int = ""
progress: float = 0.0
total: float | None = None
message: str | None = None
# 日志消息通知参数(notifications/message)
class LoggingMessageNotificationParams(MCPModel):
level: str = "info"
data: Any = None
logger: str | None = None
# 定义 JSONRPC 消息适配器,用于类型自动推断和校验
jsonrpc_message_adapter = TypeAdapter(JSONRPCMessage)
# 定义工具结果中的文本内容块
class TextContent(MCPModel):
type: Literal["text"] = "text"
text: str = ""
# 定义工具描述数据结构体
class Tool(MCPModel):
# 工具名称
name: str = ""
# 可选字段:工具描述
description: str | None = None
# 工具输入参数的 schema,默认为空字典
input_schema: dict[str, Any] = Field(default_factory=dict)
# 可选字段:工具输出 schema
output_schema: dict[str, Any] | None = None
# 定义获取工具列表请求结构体
class ListToolsRequest(MCPModel):
# 方法名,固定为 "tools/list"
method: Literal["tools/list"] = "tools/list"
# 可选字段:参数,可以为字典或 None
params: dict[str, Any] | None = None
# 定义获取工具列表响应结构体
class ListToolsResult(MCPModel):
# 工具列表,默认为空列表
tools: list[Tool] = []
# 可选字段:分页游标,可为 None
next_cursor: str | None = None
# 定义资源元数据结构体
class Resource(MCPModel):
# 资源 URI
uri: str = ""
# 可选:资源名称
name: str | None = None
# 可选:人类可读标题
title: str | None = None
# 可选:资源描述
description: str | None = None
# 可选:MIME 类型
mime_type: str | None = None
# 定义资源模板结构体
class ResourceTemplate(MCPModel):
# URI 模板,如 greeting://{name}
uri_template: str = ""
# 可选:模板名称
name: str | None = None
# 可选:人类可读标题
title: str | None = None
# 可选:模板描述
description: str | None = None
# 可选:MIME 类型
mime_type: str | None = None
# 定义 resources/list 请求结构体
class ListResourcesRequest(MCPModel):
method: Literal["resources/list"] = "resources/list"
params: dict[str, Any] | None = None
# 定义 resources/list 响应结构体
class ListResourcesResult(MCPModel):
resources: list[Resource] = []
next_cursor: str | None = None
# 定义 resources/templates/list 请求结构体
class ListResourceTemplatesRequest(MCPModel):
method: Literal["resources/templates/list"] = "resources/templates/list"
params: dict[str, Any] | None = None
# 定义 resources/templates/list 响应结构体
class ListResourceTemplatesResult(MCPModel):
resource_templates: list[ResourceTemplate] = []
next_cursor: str | None = None
# 定义 resources/read 请求参数结构体
class ReadResourceRequestParams(MCPModel):
uri: str = ""
# 定义 resources/read 请求结构体
class ReadResourceRequest(MCPModel):
method: Literal["resources/read"] = "resources/read"
params: ReadResourceRequestParams = None
# 定义资源内容基类(文本)
class TextResourceContents(MCPModel):
uri: str = ""
mime_type: str | None = None
text: str = ""
# 定义资源内容基类(二进制)
class BlobResourceContents(MCPModel):
uri: str = ""
mime_type: str | None = None
blob: str = "" # base64 编码
# 定义 resources/read 响应结构体
class ReadResourceResult(MCPModel):
contents: list[TextResourceContents | BlobResourceContents] = []
# 定义 Prompt 参数结构体
# 定义代表 Prompt 参数的模型
class PromptArgument(MCPModel):
# 参数名称
name: str = ""
# 参数描述,可为 None
description: str | None = None
# 是否为必填参数,可为 None
required: bool | None = None
# 定义 Prompt 元数据结构体
# 定义代表 Prompt 元数据的模型
class Prompt(MCPModel):
# Prompt 名称
name: str = ""
# Prompt 描述,可为 None
description: str | None = None
# Prompt 参数列表,可为 None
arguments: list[PromptArgument] | None = None
# Prompt 标题,可为 None
title: str | None = None
# 定义 prompts/list 请求结构体
# 定义 prompts/list 请求的数据模型
class ListPromptsRequest(MCPModel):
# 固定方法字段
method: Literal["prompts/list"] = "prompts/list"
# 请求参数,可为 None
params: dict[str, Any] | None = None
# 定义 prompts/list 响应结构体
# 定义 prompts/list 响应的数据模型
class ListPromptsResult(MCPModel):
# 返回的 Prompt 列表
prompts: list[Prompt] = []
# 下一页游标,可为 None
next_cursor: str | None = None
# 定义 prompts/get 请求参数结构体
# 定义 prompts/get 的参数模型
class GetPromptRequestParams(MCPModel):
# Prompt 名称
name: str = ""
# 传递给 Prompt 的参数,可为 None
arguments: dict[str, str] | None = None
# 定义 prompts/get 请求结构体
# 定义 prompts/get 请求的数据模型
class GetPromptRequest(MCPModel):
# 固定方法字段
method: Literal["prompts/get"] = "prompts/get"
# 请求参数,可为 None
params: GetPromptRequestParams | None = None
# 定义 Prompt 消息结构体(role + content)
# 定义表示 Prompt 里的单条消息的数据模型
class PromptMessage(MCPModel):
# 消息角色(user 或 assistant)
role: Literal["user", "assistant"] = "user"
# 消息内容,可以是 TextContent 或字典,默认为空文本
content: TextContent | dict[str, Any] = Field(default_factory=lambda: TextContent(text=""))
# 字段校验器:在模型初始化前解析 content 字段
@field_validator("content", mode="before")
@classmethod
def _parse_content(cls, v):
# 如果是字典且 type 为 "text",转换为 TextContent 实例
if isinstance(v, dict) and v.get("type") == "text":
return TextContent(text=v.get("text", ""))
# 否则原样返回
return v
# 定义 prompts/get 响应结构体
# 定义 prompts/get 的响应数据模型
class GetPromptResult(MCPModel):
# Prompt 的描述,可为 None
description: str | None = None
# Prompt 返回的消息列表
messages: list[PromptMessage] = []
# 定义 Prompt 引用类型(用于补全请求)
class PromptReference(MCPModel):
# 类型字段,固定为 "ref/prompt"
type: Literal["ref/prompt"] = "ref/prompt"
# Prompt 名称
name: str = ""
# 定义资源模板引用类型(用于补全请求)
class ResourceTemplateReference(MCPModel):
# 类型字段,固定为 "ref/resource"
type: Literal["ref/resource"] = "ref/resource"
# 资源模板的 URI
uri: str = ""
# 定义补全参数类型
class CompletionArgument(MCPModel):
# 参数名称
name: str = ""
# 参数值
value: str = ""
# 定义补全上下文类型(已解析的其它参数)
class CompletionContext(MCPModel):
# 补全上下文参数,字典类型,可能为 None
arguments: dict[str, str] | None = None
# 定义补全结果内容类型
class Completion(MCPModel):
# 候选内容列表,默认为空列表
values: list[str] = Field(default_factory=list)
# 总数,可为 None
total: int | None = None
# 是否有更多结果,可为 None
has_more: bool | None = None
# 定义 completion/complete 请求参数
class CompleteRequestParams(MCPModel):
# 引用字段,用 type 字段区分 PromptReference 与 ResourceTemplateReference
ref: Annotated[
PromptReference | ResourceTemplateReference,
Field(discriminator="type"),
] = None
# 需要补全的参数
argument: CompletionArgument = None
# 辅助的上下文字段,可为 None
context: CompletionContext | None = None
# 定义 completion/complete 请求
class CompleteRequest(MCPModel):
# 方法名,固定为 "completion/complete"
method: Literal["completion/complete"] = "completion/complete"
# 请求参数
params: CompleteRequestParams = None
# 定义 completion/complete 响应
class CompleteResult(MCPModel):
# 补全结果,类型为 Completion
completion: Completion = None
# 定义调用工具请求参数结构体
class CallToolRequestParams(MCPModel):
# 工具名称
name: str = ""
# 可选字段:输入参数,为字典类型或 None
arguments: dict[str, Any] | None = None
# 可选:元数据,含 progressToken 时服务端可发送进度通知
meta: dict[str, Any] | None = Field(default=None, alias="_meta")
# 定义调用工具请求结构体
class CallToolRequest(MCPModel):
# 方法名,固定为 "tools/call"
method: Literal["tools/call"] = "tools/call"
# 可选字段:参数,类型为 CallToolRequestParams
params: CallToolRequestParams = None
# === Elicitation 相关类型 ===
# 定义 Elicitation 请求的 schema 类型,为一个包含 str 键和任意值的字典,用于描述请求的 JSON Schema 子集
+ElicitRequestedSchema: type = dict[str, Any]
# 定义 Form 模式的 elicitation 请求参数结构体
+class ElicitRequestFormParams(MCPModel):
# mode 字段,固定为 "form"
+ mode: Literal["form"] = "form"
# message 字段,请求给用户展示的消息,默认为空字符串
+ message: str = ""
# requested_schema 字段,表示所请求的 schema,默认为空字典
+ requested_schema: dict[str, Any] = Field(default_factory=dict)
# 定义 URL 模式的 elicitation 请求参数结构体(暂时保留,未实现)
+class ElicitRequestURLParams(MCPModel):
# mode 字段,固定为 "url"
+ mode: Literal["url"] = "url"
# message 字段,请求给用户展示的消息,默认为空字符串
+ message: str = ""
# url 字段,请求跳转的 URL,默认为空字符串
+ url: str = ""
# elicitation_id 字段,当前 elicitation 的唯一标识,默认为空字符串
+ elicitation_id: str = ""
# 定义 Elicitation 请求参数的联合类型,可以为表单模式参数或 URL 模式参数
+ElicitRequestParams = ElicitRequestFormParams | ElicitRequestURLParams
# 定义 Elicitation 请求结构体,表示服务端向客户端发送的请求
+class ElicitRequest(MCPModel):
# method 字段,请求的方法名,固定为 "elicitation/create"
+ method: Literal["elicitation/create"] = "elicitation/create"
# params 字段,请求所需参数,可为 Form 或 URL 模式参数,默认为 None
+ params: ElicitRequestFormParams | ElicitRequestURLParams = None
# 定义 Elicitation 结果结构体,表示客户端返回给服务端的结果
+class ElicitResult(MCPModel):
# action 字段,客户端选择的操作,可为 "accept"、"decline" 或 "cancel",默认为 "cancel"
+ action: Literal["accept", "decline", "cancel"] = "cancel"
# content 字段,内容的数据体,可为包含多种基本类型的字典,也可以为 None
+ content: dict[str, str | int | float | bool | list[str] | None] | None = None
# 定义调用工具响应结构体
class CallToolResult(MCPModel):
# 内容字段,由 TextContent 或字典组成的列表,默认为空列表
content: list[TextContent | dict[str, Any]] = Field(default_factory=list)
# 结构化内容字段,可为字典或 None
structured_content: dict[str, Any] | None = None
# 错误标志字段,标识是否为错误结果,默认为 False
is_error: bool = False
# 针对 content 字段的字段校验器,模型初始化前调用
@field_validator("content", mode="before")
@classmethod
def _parse_content(cls, v):
# 如果传入的值不是列表,直接返回
if not isinstance(v, list):
return v
# 初始化输出列表
out = []
# 遍历每一项
for item in v:
# 如果项是字典且类型为 "text",转换为 TextContent 实例
if isinstance(item, dict) and item.get("type") == "text":
out.append(TextContent(text=item.get("text", "")))
# 否则,原样加入输出列表
else:
out.append(item)
# 返回处理后的列表
return out 8.工作流程 #
8.1 功能概述 #
Elicitation 是 MCP 中的引导式表单收集:服务端在工具执行中向客户端发起表单请求,客户端收集用户输入后返回,服务端再继续执行。
8.2 时序图 #
8.3 各模块修改说明 #
8.3.1. 类型层(mcp_lite/types.py) #
新增与 Elicitation 相关的类型:
| 类型 | 用途 |
|---|---|
FormElicitationCapability / UrlElicitationCapability |
声明 form / url 模式能力 |
ElicitationCapability |
封装 form、url 能力 |
ElicitRequestFormParams |
form 模式参数:mode, message, requested_schema |
ElicitRequestURLParams |
url 模式参数(预留) |
ElicitResult |
客户端返回:action(accept/decline/cancel)、content(表单数据) |
ClientCapabilities.elicitation |
客户端声明支持 elicitation |
ServerCapabilities.elicitation |
服务端声明支持 elicitation |
8.3.2. 服务端 stdio(mcp_lite/server/stdio.py) #
ElicitationSession:在 tools/call 中向客户端发起 elicitation 请求并同步等待响应send_request(method, params):构造 JSON-RPC 请求写入 stdout,从 stdin 读取响应- 使用独立 id 区间(10000+)避免与客户端请求 id 冲突
stdio_server:为每次请求创建ElicitationSession,并作为第二参数传给 handler:handler(msg, elicitation_session)
8.3.3. 服务端 FastMCP(mcp_lite/server/fastmcp/__init__.py) #
Context.elicit(message, schema)- 将 Pydantic schema 转为 JSON Schema
- 通过
elicitation_session.send_request("elicitation/create", params)发送请求 - 解析客户端返回的
ElicitResult,若action=="accept"则用 schema 校验content - 返回
_ElicitationResult(action, data),供工具使用result.action和result.data
_find_context_parameter:增加对参数名ctx的识别,即使无类型注解也注入 Context_Tool.run:接收elicitation_session并传给Context_handle:在 tools/call 中把elicitation_session传给_Tool.run;在 initialize 中声明elicitation能力
8.3.4. 客户端(mcp_lite/client/session.py) #
ClientSession.__init__:新增可选参数elicitation_callbackinitialize():若提供elicitation_callback,则在 capabilities 中声明ElicitationCapability_request()循环:- 收到
method == "elicitation/create"且带 id 的消息时,视为服务端发起的 elicitation 请求 - 解析 params,调用
elicitation_callback(None, params) - 支持同步和异步 callback(异步时用
asyncio.run) - 将 callback 返回的
ElicitResult封装为 JSON-RPC 响应发回服务端 - 处理完后
continue,继续等待原请求(如 tools/call)的响应
- 收到
8.3.5. 示例代码 #
elicitation_server.py:三个工具分别用ctx.elicit(message, schema)收集TaskInfo、UserPreferences、ConfirmationDataelicitation_client.py:实现handle_elicitation,按 schema 的properties和required逐项提示用户输入,并返回ElicitResult
8.4 流程要点 #
双向请求:客户端发 tools/call,服务端在处理过程中再发 elicitation/create;客户端在等待 tools/call 响应时,会先处理 elicitation/create 并回复。
同步阻塞:
ElicitationSession.send_request在 stdin 上阻塞等待 elicitation 响应;ClientSession._request在收到 elicitation 请求时立即处理并回复,然后继续等待 tools/call 的响应。id 隔离:elicitation 使用 10000+ 的 id,避免与客户端 0、1、2… 的请求 id 冲突。
ctx 注入:工具函数若包含名为
ctx的参数,即使无类型注解,也会被注入Context,从而可使用ctx.elicit()。