1.任务 #
本章节介绍了如何在 MCP(Multi-Channel Protocol)中实现和使用“任务(Task)”功能,包括服务端和客户端的基本用法。
在任务模型下,客户端可以以“任务”方式调用一些耗时工具,服务端收到任务型请求后会立即返回一个任务ID,实际处理过程在后台异步执行。客户端可以通过任务ID轮询任务进度和最终结果。这样设计可以有效提升用户体验,避免长时间阻塞等待。
主要内容包括:
- 任务服务端(
task_server.py):演示如何定义支持任务的工具,注册任务支持,并实现任务状态更新、进度推送等。 - 任务客户端(
task_client.py):展示如何以任务模式调用服务端工具,获取任务ID、轮询进度、最终获取结果等完整流程。
适合需要处理大型、长时间运行的操作场景(如文件转换、外部API耗时任务等)。
2. task_client.py #
task_client.py
# MCP 任务客户端
"""
演示如何调用任务型工具:发送任务增强请求后立即获得任务 ID,
通过轮询获取状态,完成后获取最终结果。
"""
# 导入 asyncio 模块用于异步操作
import asyncio
# 导入 os 模块用于文件路径操作
import os
# 导入 sys 模块用于系统相关操作
import sys
# 如果 sys.stdout 支持 reconfigure,则重设控制台编码为 utf-8,避免中文乱码
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
sys.stdin.reconfigure(encoding="utf-8")
# 从 mcp 包中导入所需类
from mcp import ClientSession, StdioServerParameters, stdio_client
# 导入任务结果、文本内容相关类型
from mcp.types import CallToolResult, TextContent
# 定义异步主函数
async def run() -> None:
# 获取当前脚本文件所在的目录
base_dir = os.path.dirname(os.path.abspath(__file__))
# 拼接出 task_server.py 文件的完整路径
server_path = os.path.join(base_dir, "task_server.py")
# 拼接虚拟环境中 python 解释器路径
venv_python = os.path.join(base_dir, ".venv", "Scripts", "python.exe")
# 构造启动服务端所需的参数
server_params = StdioServerParameters(
command=venv_python, # 指定使用的 python 解释器
args=[server_path], # 指定启动的服务端脚本
env={}, # 不额外设置环境变量
cwd=base_dir, # 设置工作目录
)
# 使用 stdio_client 启动并连接到服务端
async with stdio_client(server_params) as (read, write):
# 创建与服务端通信的客户端会话
async with ClientSession(read, write) as session:
# 初始化会话,进行握手
await session.initialize()
# 列出可用工具(同步服务端 list_tools)
tools = await session.list_tools()
print(f"可用工具: {[t.name for t in tools.tools]}")
# 以任务方式调用工具
print("\n以任务方式调用 long_running_task...")
# 调用 experimental task 模式,创建任务并获取任务 ID
result = await session.experimental.call_tool_as_task(
"long_running_task",
arguments={}, # 传递给工具的参数,此处为空对象
ttl=60000, # 任务存活时间,单位毫秒
)
# 从结果中获取新的任务 ID
task_id = result.task.taskId
print(f"任务已创建: {task_id}")
# 状态变量初始化
status = None
# 轮询任务状态,直到完成(服务端会指定合适的 pollInterval)
async for status in session.experimental.poll_task(task_id):
# 获取状态消息(statusMessage 字段,若没有则为 "")
msg = getattr(status, "statusMessage", None) or ""
print(f" 状态: {status.status} - {msg}")
# 如果任务被终止或异常(非 completed 状态),打印原因并返回
if status and status.status != "completed":
print(f"任务以状态 {status.status} 结束")
return
# 任务已完成,获取最终结果
task_result = await session.experimental.get_task_result(
task_id, CallToolResult
)
# 获取结果的第一个内容块
content = task_result.content[0]
# 如果内容块类型为文本,打印结果内容
if isinstance(content, TextContent):
print(f"\n结果: {content.text}")
# 判断当前脚本是否为主程序入口
if __name__ == "__main__":
# 异步运行主逻辑
asyncio.run(run())
3. task_server.py #
task_server.py
# MCP 任务服务端说明
"""MCP 任务服务端
任务是 MCP 的实验性功能,用于处理长时间运行的操作。
请求方发送任务增强请求后,接收方立即返回任务 ID,
实际结果通过轮询或延迟获取拿到。
"""
# 导入sys模块,用于系统相关操作
import sys
# 导入anyio,用于异步操作
import anyio
# 从mcp.types中导入类型定义
import mcp.types as types
# 导入Server类
from mcp.server import Server
# 导入实验性任务上下文ServerTaskContext
from mcp.server.experimental.task_context import ServerTaskContext
# 导入stdio_server,用于标准输入输出通讯
from mcp.server.stdio import stdio_server
# 实例化一个Server对象,名称为"task-server"
server = Server("task-server")
# 注册list_tools处理函数,返回可用工具列表
@server.list_tools()
async def handle_list_tools() -> types.ListToolsResult:
# 返回一个只有long_running_task的工具列表
return types.ListToolsResult(
tools=[
types.Tool(
name="long_running_task", # 工具名称
description="一个需要几秒钟完成的任务,会输出进度状态。支持普通调用和任务调用。", # 工具描述
inputSchema={"type": "object", "properties": {}}, # 输入参数定义
execution=types.ToolExecution(task_support=types.TASK_OPTIONAL), # 支持任务可选
)
]
)
# 定义实际耗时任务的模拟实现函数
def _do_long_running_work() -> types.CallToolResult:
# 执行耗时逻辑,返回结果
return types.CallToolResult(
content=[types.TextContent(type="text", text="任务完成!")]
)
# 注册call_tool处理函数,处理工具调用
@server.call_tool()
async def handle_call_tool(
name: str, arguments: dict | None
) -> types.CallToolResult | types.CreateTaskResult:
# 若调用的是long_running_task
if name == "long_running_task":
# 获取请求上下文
ctx = server.request_context
# 校验是否支持任务模式
ctx.experimental.validate_task_mode(types.TASK_OPTIONAL)
# 定义异步工作函数,传入任务上下文
async def work(task: ServerTaskContext) -> types.CallToolResult:
# 更新任务状态为开始执行
await task.update_status("开始执行...")
await anyio.sleep(1)
# 更新任务状态为“处理步骤 1...”
await task.update_status("处理步骤 1...")
await anyio.sleep(1)
# 更新任务状态为“处理步骤 2...”
await task.update_status("处理步骤 2...")
await anyio.sleep(1)
# 返回最终结果
return _do_long_running_work()
# 如果请求以任务方式调用,则后台执行,返回创建任务结果
if ctx.experimental.is_task:
return await ctx.experimental.run_task(work)
# 普通方式调用,同步逐步等待三秒后返回
await anyio.sleep(1)
await anyio.sleep(1)
await anyio.sleep(1)
return _do_long_running_work()
# 未知工具,返回错误提示
return types.CallToolResult(
content=[types.TextContent(type="text", text=f"未知工具: {name}")],
isError=True,
)
# 启用任务支持,自动注册 tasks/get、tasks/result、tasks/list、tasks/cancel 等接口
task_support = server.experimental.enable_tasks()
# 异步主函数,服务端入口
async def main() -> None:
# 为 MCP Inspector 预创建一个演示任务,以便 List Tasks 能看到内容
if task_support:
await task_support.store.create_task(
types.TaskMetadata(ttl=300000), # 任务元数据,设置存活时间
task_id="demo-task", # 指定任务ID
)
await task_support.store.update_task(
"demo-task",
status=types.TASK_STATUS_COMPLETED, # 设置任务为已完成
status_message="已完成", # 完成状态消息
)
await task_support.store.store_result(
"demo-task",
types.CallToolResult(
content=[
types.TextContent(
type="text",
text="此任务用于List Tasks 功能",
)
]
),
)
# 启动stdio_server,等待客户端连接
async with stdio_server() as (read_stream, write_stream):
# 运行MCP服务器,处理请求
await server.run(
read_stream,
write_stream,
server.create_initialization_options(),
)
# 当脚本作为主程序入口时执行
if __name__ == "__main__":
# 若sys.stdout支持reconfigure,则设置编码为utf-8,避免中文乱码
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
sys.stdin.reconfigure(encoding="utf-8")
# 启动异步主函数
anyio.run(main)4. 工作流程 #
4.1 整体执行流程 #
4.1.1. 启动与初始化 #
用户执行 uv run task_client.py
│
▼
stdio_client 启动子进程 python task_server.py
│
▼
服务端 main() 执行
│
├── task_support.store.create_task() 创建演示任务 demo-task
├── stdio_server() 建立 stdin/stdout 流
└── server.run() 进入消息循环
│
▼
客户端 session.initialize() 完成 MCP 握手4.1.2. 任务调用主流程(与普通工具调用的区别) #
普通 tools/call:请求 → 等待执行 → 返回 CallToolResult
任务 tools/call:请求(带 task: {ttl})→ 立即返回 CreateTaskResult(含 taskId)
│
▼
客户端轮询 tasks/get 获取状态
│
▼
服务端后台执行 work(),通过 task.update_status() 更新状态
│
▼
状态变为 completed 后,客户端调用 tasks/result 获取 CallToolResult4.2 时序图 #
4.2.1. 任务方式调用 long_running_task 全流程 #
sequenceDiagram
participant Client as task_client
participant Session as ClientSession
participant Stream as stdio
participant Server as task_server
participant Handler as handle_call_tool
participant TaskSupport as TaskSupport
participant Work as work() 后台任务
Client->>Session: call_tool_as_task("long_running_task", ttl=60000)
Session->>Stream: tools/call + params.task: {ttl: 60000}
Stream->>Server: CallToolRequest (task-augmented)
Server->>Handler: handle_call_tool("long_running_task", {})
Handler->>Handler: ctx.experimental.is_task == True
Handler->>Handler: ctx.experimental.run_task(work)
Note over Handler,TaskSupport: run_task 内部
Handler->>TaskSupport: store.create_task(metadata, task_id)
TaskSupport->>TaskSupport: 创建任务,状态=working
Handler->>TaskSupport: task_group.start_soon(execute)
Note over TaskSupport,Work: 后台启动 work()
Handler->>Stream: 立即返回 CreateTaskResult(taskId=xxx)
Stream->>Session: CreateTaskResult
Session->>Client: result.task.taskId
Client->>Client: task_id = result.task.taskId
Client->>Client: print("任务已创建: xxx")
loop 轮询 poll_task
Client->>Session: get_task(task_id) [tasks/get]
Session->>Stream: GetTaskRequest
Stream->>Server: GetTaskRequest
Server->>TaskSupport: store.get_task(task_id)
TaskSupport->>Server: GetTaskResult(status, statusMessage)
Server->>Stream: GetTaskResult
Stream->>Session: GetTaskResult
Session->>Client: status (working / completed)
Client->>Client: print("状态: working - 开始执行...")
end
Note over Work: work() 在后台执行
Work->>Work: task.update_status("开始执行...")
Work->>Server: send TaskStatusNotification
Work->>Work: anyio.sleep(1)
Work->>Work: task.update_status("处理步骤 1...")
Work->>Work: anyio.sleep(1)
Work->>Work: task.update_status("处理步骤 2...")
Work->>Work: anyio.sleep(1)
Work->>Work: return CallToolResult
Work->>TaskSupport: store.store_result(task_id, result)
Work->>TaskSupport: task.status = completed
Client->>Session: get_task_result(task_id) [tasks/result]
Session->>Stream: GetTaskPayloadRequest
Stream->>Server: GetTaskPayloadRequest
Server->>TaskSupport: handler.handle() → 从 store 取 result
TaskSupport->>Server: CallToolResult
Server->>Stream: GetTaskPayloadResult (CallToolResult)
Stream->>Session: CallToolResult
Session->>Client: task_result
Client->>Client: print("结果: 任务完成!")
4.2.2. run_task 内部流程 #
sequenceDiagram
participant Handler as handle_call_tool
participant Experimental as ctx.experimental
participant Store as InMemoryTaskStore
participant TaskGroup as task_group
participant Work as work(task_ctx)
Handler->>Experimental: run_task(work)
Experimental->>Experimental: 校验 task_metadata 存在
Experimental->>Store: create_task(metadata, task_id)
Store->>Experimental: Task(taskId, status=working)
Experimental->>Experimental: 创建 ServerTaskContext(task_ctx)
Experimental->>TaskGroup: start_soon(execute)
Note over TaskGroup: 异步执行,不阻塞
Experimental->>Handler: 立即返回 CreateTaskResult(task)
par 后台执行
TaskGroup->>Work: execute() 调用 work(task_ctx)
Work->>Work: task_ctx.update_status("开始执行...")
Work->>Work: anyio.sleep(1)
Work->>Work: task_ctx.update_status("处理步骤 1...")
Work->>Work: anyio.sleep(1)
Work->>Work: task_ctx.update_status("处理步骤 2...")
Work->>Work: anyio.sleep(1)
Work->>Work: return CallToolResult
Work->>Store: store_result() + update_task(completed)
end
4.2.3. 普通调用 vs 任务调用 分支 #
flowchart TB
A[handle_call_tool 收到请求] --> B{ctx.experimental.is_task?}
B -->|是| C[run_task work]
C --> D[创建任务、后台启动 work]
D --> E[立即返回 CreateTaskResult]
E --> F[客户端轮询 tasks/get]
F --> G[客户端 tasks/result 取结果]
B -->|否| H[anyio.sleep 1+1+1]
H --> I[return _do_long_running_work]
I --> J[直接返回 CallToolResult]
4.2.4. 服务端启动时预创建演示任务 #
sequenceDiagram
participant Main as main()
participant TaskSupport as task_support
participant Store as InMemoryTaskStore
Main->>TaskSupport: enable_tasks() 已调用
Main->>Store: create_task(metadata, "demo-task")
Store->>Store: 创建 Task(status=working)
Main->>Store: update_task("demo-task", status=completed)
Main->>Store: store_result("demo-task", CallToolResult)
Main->>Main: stdio_server() + server.run()
Note over Main: 演示任务已存在,List Tasks 可见
4.3 关键概念 #
| 概念 | 说明 |
|---|---|
| 任务增强请求 | tools/call 的 params 中带 task: { ttl: 60000 },表示以任务方式调用 |
| CreateTaskResult | 服务端立即返回,包含 task.taskId,不等待实际执行完成 |
| tasks/get | 客户端轮询任务状态,返回 status、statusMessage 等 |
| tasks/result | 任务完成后,客户端用此接口获取最终 CallToolResult |
| run_task(work) | 在服务端创建任务、后台执行 work,并立即返回 CreateTaskResult |
| ServerTaskContext | work 的参数,提供 update_status() 更新进度 |
| TASK_OPTIONAL | 工具支持普通调用和任务调用两种方式 |
4.4 消息流概览 #
客户端 服务端
│ │
│ tools/call (task: {ttl}) │
│ ──────────────────────────────────────>│
│ │ run_task → 后台启动 work
│ CreateTaskResult (taskId) │
│ <──────────────────────────────────────│
│ │
│ tasks/get (taskId) │
│ ──────────────────────────────────────>│ store.get_task
│ GetTaskResult (working, "开始执行...") │
│ <──────────────────────────────────────│
│ ... 轮询 ... │ work() 执行中
│ tasks/get │
│ ──────────────────────────────────────>│
│ GetTaskResult (completed) │
│ <──────────────────────────────────────│
│ │
│ tasks/result (taskId) │
│ ──────────────────────────────────────>│ store.get_result
│ CallToolResult ("任务完成!") │
│ <──────────────────────────────────────│