1.取消 #
本节介绍了 MCP 协议中「取消」(Cancel)功能的实现机制与典型用法。
取消用于提前终止正在执行的请求或任务。例如用户点击界面上的「停止」,或前端/后端检测到不再需要结果(如超时、切换了 tab)时,可通过相应的取消机制快速释放资源,提高交互体验。MCP 18.0 及以上版本原生支持取消标准,适用于长时间运行的工具调用等场景。
MCP 支持两类取消方式:
任务型请求(tasks/cancel):
主要面向以任务方式运行的工具(如调用方式call_tool_as_task),可通过接口/tasks/cancel取消后台任务。
客户端和服务端均需实现任务 API 支持,服务端在收到取消请求后应尽快终止相关处理,并返回cancelled状态。普通请求(notifications/cancelled):
针对非任务型的普通请求,可以发送notifications/cancelled通知对方取消特定的调用。该方式对服务端的消息处理模型有一定并发/异步要求,通常用于不适合任务接口的场景。
注意:若使用 tasks/cancel,服务端工具方法应采用协作式取消(如判断 canceled 状态、适当中断等待等)。本例服务端
slow_task的任务实现通过轮询进度并可被取消。
2. cancel_client.py #
cancel_client.py
"""MCP 取消功能客户端
演示客户端通过 tasks/cancel 中止正在运行的任务。
模拟用户点击「停止」按钮:以任务方式启动耗时工具,轮询 1~2 次状态后
发送 tasks/cancel 请求,服务端将中止任务并返回 cancelled 状态。
"""
# 导入操作系统相关功能,用于处理路径
import os
# 导入系统相关模块
import sys
# 导入 anyio,用于异步IO操作
import anyio
# 从 mcp 包中导入客户端会话与通信、服务器参数定义等
from mcp import ClientSession, StdioServerParameters, stdio_client
# 获取当前文件所在目录
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 根据操作系统类型拼接出虚拟环境 python 解释器的路径
VENV_PYTHON = os.path.join(
BASE_DIR,
".venv",
"Scripts" if os.name == "nt" else "bin",
"python.exe" if os.name == "nt" else "python",
)
# 定义主异步函数
async def main() -> None:
# 构造启动服务端所需的参数
server_params = StdioServerParameters(
command=VENV_PYTHON, # 指定 python 解释器路径
args=[os.path.join(BASE_DIR, "cancel_server.py")], # 指定要启动的服务端脚本
env={}, # 不额外指定环境变量
cwd=BASE_DIR, # 设置工作目录为当前目录
)
# 通过 stdio_client 启动并连接到服务端,并获得读写流
async with stdio_client(server_params) as (read, write):
# 创建与服务端通信的客户端会话
async with ClientSession(read, write) as session:
# 初始化会话,进行握手
await session.initialize()
print("会话已初始化")
# 获取服务端可用工具列表
tools = await session.list_tools()
print(f"可用工具: {[t.name for t in tools.tools]}")
# 以任务方式调用耗时工具 slow_task
print("\n以任务方式调用 slow_task...")
result = await session.experimental.call_tool_as_task(
"slow_task", # 要调用的工具名称
arguments={}, # 传递参数,这里为空对象
ttl=60000, # 任务存活时间(单位毫秒)
)
# 获取新创建任务的 ID
task_id = result.task.taskId
print(f"任务已创建: {task_id}")
# 初始化轮询计数器
poll_count = 0
# 异步轮询任务状态
async for status in session.experimental.poll_task(task_id):
# 获取当前状态的消息内容
msg = getattr(status, "statusMessage", None) or ""
print(f" 状态: {status.status} - {msg}")
poll_count += 1
# 轮询两次后发送取消请求
if poll_count >= 2:
print("\n发送 tasks/cancel 请求(模拟用户点击停止)...")
# 调用取消任务接口
cancel_result = await session.experimental.cancel_task(task_id)
print(f"取消结果: 任务状态 = {cancel_result.status}")
# 任务已取消,退出轮询
break
# 演示结束提示
print("\n取消功能演示完成")
# 判断当前脚本是否为主程序入口
if __name__ == "__main__":
# 如果 stdout 支持 reconfigure,则将编码设置为 utf-8,防止中文乱码
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
sys.stdin.reconfigure(encoding="utf-8")
# 异步运行主函数
anyio.run(main)
3. cancel_server.py #
cancel_server.py
# MCP 取消功能服务端说明文档
"""
MCP 取消功能服务端
取消允许任一方中止正在进行的请求。例如用户点击「停止」按钮,
或客户端/服务器发现请求已超时、不再需要结果时,可发送取消通知。
MCP 有两种取消方式:
- tasks/cancel:用于中止任务型请求(本演示使用此方式)
- notifications/cancelled:用于中止普通请求(需服务端并发处理消息)
本服务端提供耗时任务 slow_task,客户端可通过 tasks/cancel 中止。
"""
# 导入sys模块,用于系统相关操作
import sys
# 导入anyio库,实现异步IO
import anyio
# 导入mcp.types,包含类型定义
import mcp.types as types
# 导入MCP服务端基类
from mcp.server import Server
# 导入实验性任务上下文
from mcp.server.experimental.task_context import ServerTaskContext
# 导入标准输入输出服务端
from mcp.server.stdio import stdio_server
# 实例化Server对象,服务名为"cancel-server"
server = Server("cancel-server")
# 注册list_tools处理函数,返回可用工具列表
@server.list_tools()
async def handle_list_tools() -> types.ListToolsResult:
# 返回一个包含slow_task的工具列表
return types.ListToolsResult(
tools=[
types.Tool(
name="slow_task", # 工具名称
description="模拟耗时任务(约 5 秒),可被客户端通过 tasks/cancel 中止。", # 工具描述
inputSchema={"type": "object", "properties": {}}, # 工具输入参数定义
execution=types.ToolExecution(task_support=types.TASK_OPTIONAL), # 支持任务调用
)
]
)
# 注册call_tool处理函数,处理工具调用
@server.call_tool()
async def handle_call_tool(
name: str, arguments: dict | None
) -> types.CallToolResult | types.CreateTaskResult:
# 如果工具名称不是 slow_task,返回错误提示
if name != "slow_task":
# 返回错误结果:未知工具
return types.CallToolResult(
content=[types.TextContent(type="text", text=f"未知工具: {name}")],
isError=True,
)
# 获取服务端请求上下文对象
ctx = server.request_context
# 校验请求的任务模式(允许普通模式或任务模式)
ctx.experimental.validate_task_mode(types.TASK_OPTIONAL)
# 定义异步工作函数,处理任务型调用
async def work(task: ServerTaskContext) -> types.CallToolResult:
# 循环五次,每次模拟一步任务进度
for i in range(5):
# 更新任务当前状态
await task.update_status(f"步骤 {i + 1}/5...")
# 每一步延时1秒,模拟耗时
await anyio.sleep(1)
# 任务完成时返回结果
return types.CallToolResult(
content=[types.TextContent(type="text", text="任务完成")]
)
# 如果请求为任务模式则后台运行work函数,并返回创建任务结果
if ctx.experimental.is_task:
return await ctx.experimental.run_task(work)
# 否则为普通调用方式,直接阻塞5秒
for i in range(5):
await anyio.sleep(1)
# 普通模式下任务完成返回相同的完成结果
return types.CallToolResult(
content=[types.TextContent(type="text", text="任务完成")]
)
# 启用实验性任务支持,自动提供任务管理接口
server.experimental.enable_tasks()
# 定义主异步函数,作为服务端主入口
async def main() -> None:
# 启动stdio服务端,并获取读写流
async with stdio_server() as (read_stream, write_stream):
# 运行MCP服务器主循环,处理所有请求
await server.run(
read_stream,
write_stream,
server.create_initialization_options(),
)
# 如果当前脚本为主模块则执行以下代码
if __name__ == "__main__":
# 若标准输出支持reconfigure,则设置标准输入/输出编码为utf-8,防止中文乱码
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
sys.stdin.reconfigure(encoding="utf-8")
# 启动anyio运行主协程
anyio.run(main)4.执行流程 #
4.1 整体流程 #
演示 MCP 的 tasks/cancel 取消机制:客户端以任务方式调用耗时工具,轮询几次后发送 tasks/cancel,服务端将任务标记为 cancelled 并返回。
4.2 服务端 cancel_server.py 讲解 #
4.2.1. 核心结构 #
server = Server("cancel-server")
@server.list_tools() # 声明 tools 能力
@server.call_tool() # 处理 tools/call
server.experimental.enable_tasks() # 启用任务,注册 tasks/cancel 等4.2.2. 工具定义 #
| 字段 | 说明 |
|---|---|
| name | slow_task |
| execution.task_support | TASK_OPTIONAL:支持普通调用和任务调用 |
| work() | 5 步,每步 update_status + sleep(1) |
4.2.3. 调用分支 #
- 任务调用(
ctx.experimental.is_task):run_task(work)在后台执行,立即返回CreateTaskResult(含taskId) - 普通调用:同步执行 5 秒后返回
4.2.4. 任务支持 #
enable_tasks() 会注册:
tasks/get:查询任务状态tasks/result:获取任务结果tasks/list:列出任务tasks/cancel:取消任务(默认实现:将 store 中任务状态改为cancelled)
4.3 客户端 cancel_client.py 讲解 #
4.3.1. 核心流程 #
# 1. 以任务方式调用
result = await session.experimental.call_tool_as_task("slow_task", {}, ttl=60000)
task_id = result.task.taskId
# 2. 轮询任务状态
async for status in session.experimental.poll_task(task_id):
print(status.status, status.statusMessage)
if poll_count >= 2:
# 3. 发送取消请求
cancel_result = await session.experimental.cancel_task(task_id)
break4.3.2. 关键点 #
| 步骤 | 说明 |
|---|---|
| call_tool_as_task | 发送带 task: {ttl} 的 tools/call,服务端返回 CreateTaskResult |
| poll_task | 循环调用 tasks/get,直到任务进入终态(completed/failed/cancelled) |
| cancel_task | 发送 tasks/cancel,服务端更新 store 为 cancelled 并返回 CancelTaskResult |
4.4 时序图 #
4.5 消息格式(JSON-RPC) #
tasks/cancel 请求:
{"jsonrpc": "2.0", "id": 3, "method": "tasks/cancel", "params": {"taskId": "xxx-uuid"}}tasks/cancel 响应:
{
"jsonrpc": "2.0",
"id": 3,
"result": {
"taskId": "xxx-uuid",
"status": "cancelled",
"statusMessage": null,
"createdAt": "...",
"lastUpdatedAt": "...",
"ttl": 60000
}
}4.6 两种取消方式对比 #
| 方式 | 适用场景 | 本演示 |
|---|---|---|
| tasks/cancel | 任务型请求(call_tool_as_task 等) |
使用 |
| notifications/cancelled | 普通请求(tools/call 等),需服务端能并发处理消息 |
未演示 |
4.7 运行方式 #
uv run cancel_client.py客户端会启动服务端子进程,创建任务,轮询 2 次后发送 tasks/cancel,并打印 任务状态 = cancelled。