导航菜单

  • 1.什么是MCP
  • 2.MCP架构
  • 3.MCP服务器
  • 4.MCP客户端
  • 5.版本控制
  • 6.连接MCP服务器
  • 7.SDKs
  • 8.Inspector
  • 9.规范
  • 10.架构
  • 11.协议
  • 12.生命周期
  • 13.工具
  • 14.资源
  • 15.提示
  • 16.日志
  • 17.进度
  • 18.传输
  • 19.补全
  • 20.引导
  • 21.采样
  • 22.任务
  • 23.取消
  • 24.Ping
  • 25.根
  • 26.分页
  • 27.授权
  • 28.初始化
  • 29.工具
  • 30.资源
  • 31.结构化输出
  • 32.提示词
  • 33.上下文
  • 34.StreamableHTTP
  • 35.参数补全
  • 36.引导
  • 37.采样
  • 38.LowLevel
  • 39.任务
  • 40.取消
  • 41.ping
  • 42.根
  • 43.分页
  • 44.授权
  • 45.FunctionCalling
  • starlette
  • FastAPI
  • Keycloak
  • asyncio
  • contextlib
  • httpx
  • pathlib
  • pydantic
  • queue
  • subprocess
  • threading
  • uvicorn
  • JSON-RPC
  • LiteLLM
  • pydantic-settings
  • ai_agent
  • format
  • diff
  • mcp_server
  • 1.任务
  • 2. task_client.py
  • 3. task_server.py
  • 4. 工作流程
    • 4.1 整体执行流程
      • 4.1.1. 启动与初始化
      • 4.1.2. 任务调用主流程(与普通工具调用的区别)
    • 4.2 时序图
      • 4.2.1. 任务方式调用 long_running_task 全流程
      • 4.2.2. run_task 内部流程
      • 4.2.3. 普通调用 vs 任务调用 分支
      • 4.2.4. 服务端启动时预创建演示任务
    • 4.3 关键概念
      • 4.4 消息流概览

1.任务 #

  • 什么是任务?

本章节介绍了如何在 MCP(Multi-Channel Protocol)中实现和使用“任务(Task)”功能,包括服务端和客户端的基本用法。
在任务模型下,客户端可以以“任务”方式调用一些耗时工具,服务端收到任务型请求后会立即返回一个任务ID,实际处理过程在后台异步执行。客户端可以通过任务ID轮询任务进度和最终结果。这样设计可以有效提升用户体验,避免长时间阻塞等待。

主要内容包括:

  • 任务服务端(task_server.py):演示如何定义支持任务的工具,注册任务支持,并实现任务状态更新、进度推送等。
  • 任务客户端(task_client.py):展示如何以任务模式调用服务端工具,获取任务ID、轮询进度、最终获取结果等完整流程。

适合需要处理大型、长时间运行的操作场景(如文件转换、外部API耗时任务等)。

2. task_client.py #

task_client.py

# MCP 任务客户端
"""
演示如何调用任务型工具:发送任务增强请求后立即获得任务 ID,
通过轮询获取状态,完成后获取最终结果。
"""

# 导入 asyncio 模块用于异步操作
import asyncio
# 导入 os 模块用于文件路径操作
import os
# 导入 sys 模块用于系统相关操作
import sys

# 如果 sys.stdout 支持 reconfigure,则重设控制台编码为 utf-8,避免中文乱码
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8")
    sys.stdin.reconfigure(encoding="utf-8")

# 从 mcp 包中导入所需类
from mcp import ClientSession, StdioServerParameters, stdio_client
# 导入任务结果、文本内容相关类型
from mcp.types import CallToolResult, TextContent


# 定义异步主函数
async def run() -> None:
    # 获取当前脚本文件所在的目录
    base_dir = os.path.dirname(os.path.abspath(__file__))
    # 拼接出 task_server.py 文件的完整路径
    server_path = os.path.join(base_dir, "task_server.py")

    # 拼接虚拟环境中 python 解释器路径
    venv_python = os.path.join(base_dir, ".venv", "Scripts", "python.exe")
    # 构造启动服务端所需的参数
    server_params = StdioServerParameters(
        command=venv_python,       # 指定使用的 python 解释器
        args=[server_path],        # 指定启动的服务端脚本
        env={},                    # 不额外设置环境变量
        cwd=base_dir,              # 设置工作目录
    )

    # 使用 stdio_client 启动并连接到服务端
    async with stdio_client(server_params) as (read, write):
        # 创建与服务端通信的客户端会话
        async with ClientSession(read, write) as session:
            # 初始化会话,进行握手
            await session.initialize()

            # 列出可用工具(同步服务端 list_tools)
            tools = await session.list_tools()
            print(f"可用工具: {[t.name for t in tools.tools]}")

            # 以任务方式调用工具
            print("\n以任务方式调用 long_running_task...")

            # 调用 experimental task 模式,创建任务并获取任务 ID
            result = await session.experimental.call_tool_as_task(
                "long_running_task",
                arguments={},        # 传递给工具的参数,此处为空对象
                ttl=60000,          # 任务存活时间,单位毫秒
            )
            # 从结果中获取新的任务 ID
            task_id = result.task.taskId
            print(f"任务已创建: {task_id}")

            # 状态变量初始化
            status = None
            # 轮询任务状态,直到完成(服务端会指定合适的 pollInterval)
            async for status in session.experimental.poll_task(task_id):
                # 获取状态消息(statusMessage 字段,若没有则为 "")
                msg = getattr(status, "statusMessage", None) or ""
                print(f"  状态: {status.status} - {msg}")

            # 如果任务被终止或异常(非 completed 状态),打印原因并返回
            if status and status.status != "completed":
                print(f"任务以状态 {status.status} 结束")
                return

            # 任务已完成,获取最终结果
            task_result = await session.experimental.get_task_result(
                task_id, CallToolResult
            )
            # 获取结果的第一个内容块
            content = task_result.content[0]
            # 如果内容块类型为文本,打印结果内容
            if isinstance(content, TextContent):
                print(f"\n结果: {content.text}")


# 判断当前脚本是否为主程序入口
if __name__ == "__main__":
    # 异步运行主逻辑
    asyncio.run(run())

3. task_server.py #

task_server.py

# MCP 任务服务端说明
"""MCP 任务服务端

任务是 MCP 的实验性功能,用于处理长时间运行的操作。
请求方发送任务增强请求后,接收方立即返回任务 ID,
实际结果通过轮询或延迟获取拿到。
"""

# 导入sys模块,用于系统相关操作
import sys

# 导入anyio,用于异步操作
import anyio
# 从mcp.types中导入类型定义
import mcp.types as types
# 导入Server类
from mcp.server import Server
# 导入实验性任务上下文ServerTaskContext
from mcp.server.experimental.task_context import ServerTaskContext
# 导入stdio_server,用于标准输入输出通讯
from mcp.server.stdio import stdio_server

# 实例化一个Server对象,名称为"task-server"
server = Server("task-server")

# 注册list_tools处理函数,返回可用工具列表
@server.list_tools()
async def handle_list_tools() -> types.ListToolsResult:
    # 返回一个只有long_running_task的工具列表
    return types.ListToolsResult(
        tools=[
            types.Tool(
                name="long_running_task",  # 工具名称
                description="一个需要几秒钟完成的任务,会输出进度状态。支持普通调用和任务调用。",  # 工具描述
                inputSchema={"type": "object", "properties": {}},  # 输入参数定义
                execution=types.ToolExecution(task_support=types.TASK_OPTIONAL),  # 支持任务可选
            )
        ]
    )

# 定义实际耗时任务的模拟实现函数
def _do_long_running_work() -> types.CallToolResult:
    # 执行耗时逻辑,返回结果
    return types.CallToolResult(
        content=[types.TextContent(type="text", text="任务完成!")]
    )

# 注册call_tool处理函数,处理工具调用
@server.call_tool()
async def handle_call_tool(
    name: str, arguments: dict | None
) -> types.CallToolResult | types.CreateTaskResult:
    # 若调用的是long_running_task
    if name == "long_running_task":
        # 获取请求上下文
        ctx = server.request_context
        # 校验是否支持任务模式
        ctx.experimental.validate_task_mode(types.TASK_OPTIONAL)

        # 定义异步工作函数,传入任务上下文
        async def work(task: ServerTaskContext) -> types.CallToolResult:
            # 更新任务状态为开始执行
            await task.update_status("开始执行...")
            await anyio.sleep(1)

            # 更新任务状态为“处理步骤 1...”
            await task.update_status("处理步骤 1...")
            await anyio.sleep(1)

            # 更新任务状态为“处理步骤 2...”
            await task.update_status("处理步骤 2...")
            await anyio.sleep(1)

            # 返回最终结果
            return _do_long_running_work()

        # 如果请求以任务方式调用,则后台执行,返回创建任务结果
        if ctx.experimental.is_task:
            return await ctx.experimental.run_task(work)

        # 普通方式调用,同步逐步等待三秒后返回
        await anyio.sleep(1)
        await anyio.sleep(1)
        await anyio.sleep(1)
        return _do_long_running_work()

    # 未知工具,返回错误提示
    return types.CallToolResult(
        content=[types.TextContent(type="text", text=f"未知工具: {name}")],
        isError=True,
    )

# 启用任务支持,自动注册 tasks/get、tasks/result、tasks/list、tasks/cancel 等接口
task_support = server.experimental.enable_tasks()

# 异步主函数,服务端入口
async def main() -> None:
    # 为 MCP Inspector 预创建一个演示任务,以便 List Tasks 能看到内容
    if task_support:
        await task_support.store.create_task(
            types.TaskMetadata(ttl=300000),  # 任务元数据,设置存活时间
            task_id="demo-task",   # 指定任务ID
        )
        await task_support.store.update_task(
            "demo-task",
            status=types.TASK_STATUS_COMPLETED,   # 设置任务为已完成
            status_message="已完成",  # 完成状态消息
        )
        await task_support.store.store_result(
            "demo-task",
            types.CallToolResult(
                content=[
                    types.TextContent(
                        type="text",
                        text="此任务用于List Tasks 功能",
                    )
                ]
            ),
        )

    # 启动stdio_server,等待客户端连接
    async with stdio_server() as (read_stream, write_stream):
        # 运行MCP服务器,处理请求
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options(),
        )

# 当脚本作为主程序入口时执行
if __name__ == "__main__":
    # 若sys.stdout支持reconfigure,则设置编码为utf-8,避免中文乱码
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
        sys.stdin.reconfigure(encoding="utf-8")
    # 启动异步主函数
    anyio.run(main)

4. 工作流程 #

4.1 整体执行流程 #

4.1.1. 启动与初始化 #

用户执行 uv run task_client.py
        │
        ▼
stdio_client 启动子进程 python task_server.py
        │
        ▼
服务端 main() 执行
        │
        ├── task_support.store.create_task() 创建演示任务 demo-task
        ├── stdio_server() 建立 stdin/stdout 流
        └── server.run() 进入消息循环
        │
        ▼
客户端 session.initialize() 完成 MCP 握手

4.1.2. 任务调用主流程(与普通工具调用的区别) #

普通 tools/call:请求 → 等待执行 → 返回 CallToolResult

任务 tools/call:请求(带 task: {ttl})→ 立即返回 CreateTaskResult(含 taskId)
        │
        ▼
客户端轮询 tasks/get 获取状态
        │
        ▼
服务端后台执行 work(),通过 task.update_status() 更新状态
        │
        ▼
状态变为 completed 后,客户端调用 tasks/result 获取 CallToolResult

4.2 时序图 #

4.2.1. 任务方式调用 long_running_task 全流程 #

sequenceDiagram participant Client as task_client participant Session as ClientSession participant Stream as stdio participant Server as task_server participant Handler as handle_call_tool participant TaskSupport as TaskSupport participant Work as work() 后台任务 Client->>Session: call_tool_as_task("long_running_task", ttl=60000) Session->>Stream: tools/call + params.task: {ttl: 60000} Stream->>Server: CallToolRequest (task-augmented) Server->>Handler: handle_call_tool("long_running_task", {}) Handler->>Handler: ctx.experimental.is_task == True Handler->>Handler: ctx.experimental.run_task(work) Note over Handler,TaskSupport: run_task 内部 Handler->>TaskSupport: store.create_task(metadata, task_id) TaskSupport->>TaskSupport: 创建任务,状态=working Handler->>TaskSupport: task_group.start_soon(execute) Note over TaskSupport,Work: 后台启动 work() Handler->>Stream: 立即返回 CreateTaskResult(taskId=xxx) Stream->>Session: CreateTaskResult Session->>Client: result.task.taskId Client->>Client: task_id = result.task.taskId Client->>Client: print("任务已创建: xxx") loop 轮询 poll_task Client->>Session: get_task(task_id) [tasks/get] Session->>Stream: GetTaskRequest Stream->>Server: GetTaskRequest Server->>TaskSupport: store.get_task(task_id) TaskSupport->>Server: GetTaskResult(status, statusMessage) Server->>Stream: GetTaskResult Stream->>Session: GetTaskResult Session->>Client: status (working / completed) Client->>Client: print("状态: working - 开始执行...") end Note over Work: work() 在后台执行 Work->>Work: task.update_status("开始执行...") Work->>Server: send TaskStatusNotification Work->>Work: anyio.sleep(1) Work->>Work: task.update_status("处理步骤 1...") Work->>Work: anyio.sleep(1) Work->>Work: task.update_status("处理步骤 2...") Work->>Work: anyio.sleep(1) Work->>Work: return CallToolResult Work->>TaskSupport: store.store_result(task_id, result) Work->>TaskSupport: task.status = completed Client->>Session: get_task_result(task_id) [tasks/result] Session->>Stream: GetTaskPayloadRequest Stream->>Server: GetTaskPayloadRequest Server->>TaskSupport: handler.handle() → 从 store 取 result TaskSupport->>Server: CallToolResult Server->>Stream: GetTaskPayloadResult (CallToolResult) Stream->>Session: CallToolResult Session->>Client: task_result Client->>Client: print("结果: 任务完成!")

4.2.2. run_task 内部流程 #

sequenceDiagram participant Handler as handle_call_tool participant Experimental as ctx.experimental participant Store as InMemoryTaskStore participant TaskGroup as task_group participant Work as work(task_ctx) Handler->>Experimental: run_task(work) Experimental->>Experimental: 校验 task_metadata 存在 Experimental->>Store: create_task(metadata, task_id) Store->>Experimental: Task(taskId, status=working) Experimental->>Experimental: 创建 ServerTaskContext(task_ctx) Experimental->>TaskGroup: start_soon(execute) Note over TaskGroup: 异步执行,不阻塞 Experimental->>Handler: 立即返回 CreateTaskResult(task) par 后台执行 TaskGroup->>Work: execute() 调用 work(task_ctx) Work->>Work: task_ctx.update_status("开始执行...") Work->>Work: anyio.sleep(1) Work->>Work: task_ctx.update_status("处理步骤 1...") Work->>Work: anyio.sleep(1) Work->>Work: task_ctx.update_status("处理步骤 2...") Work->>Work: anyio.sleep(1) Work->>Work: return CallToolResult Work->>Store: store_result() + update_task(completed) end

4.2.3. 普通调用 vs 任务调用 分支 #

flowchart TB A[handle_call_tool 收到请求] --> B{ctx.experimental.is_task?} B -->|是| C[run_task work] C --> D[创建任务、后台启动 work] D --> E[立即返回 CreateTaskResult] E --> F[客户端轮询 tasks/get] F --> G[客户端 tasks/result 取结果] B -->|否| H[anyio.sleep 1+1+1] H --> I[return _do_long_running_work] I --> J[直接返回 CallToolResult]

4.2.4. 服务端启动时预创建演示任务 #

sequenceDiagram participant Main as main() participant TaskSupport as task_support participant Store as InMemoryTaskStore Main->>TaskSupport: enable_tasks() 已调用 Main->>Store: create_task(metadata, "demo-task") Store->>Store: 创建 Task(status=working) Main->>Store: update_task("demo-task", status=completed) Main->>Store: store_result("demo-task", CallToolResult) Main->>Main: stdio_server() + server.run() Note over Main: 演示任务已存在,List Tasks 可见

4.3 关键概念 #

概念 说明
任务增强请求 tools/call 的 params 中带 task: { ttl: 60000 },表示以任务方式调用
CreateTaskResult 服务端立即返回,包含 task.taskId,不等待实际执行完成
tasks/get 客户端轮询任务状态,返回 status、statusMessage 等
tasks/result 任务完成后,客户端用此接口获取最终 CallToolResult
run_task(work) 在服务端创建任务、后台执行 work,并立即返回 CreateTaskResult
ServerTaskContext work 的参数,提供 update_status() 更新进度
TASK_OPTIONAL 工具支持普通调用和任务调用两种方式

4.4 消息流概览 #

客户端                                    服务端
   │                                         │
   │  tools/call (task: {ttl})               │
   │ ──────────────────────────────────────>│
   │                                         │ run_task → 后台启动 work
   │  CreateTaskResult (taskId)              │
   │ <──────────────────────────────────────│
   │                                         │
   │  tasks/get (taskId)                     │
   │ ──────────────────────────────────────>│  store.get_task
   │  GetTaskResult (working, "开始执行...")  │
   │ <──────────────────────────────────────│
   │  ... 轮询 ...                           │  work() 执行中
   │  tasks/get                              │
   │ ──────────────────────────────────────>│
   │  GetTaskResult (completed)              │
   │ <──────────────────────────────────────│
   │                                         │
   │  tasks/result (taskId)                  │
   │ ──────────────────────────────────────>│  store.get_result
   │  CallToolResult ("任务完成!")           │
   │ <──────────────────────────────────────│
← 上一节 38.LowLevel 下一节 40.取消 →

访问验证

请输入访问令牌

Token不正确,请重新输入