导航菜单

  • 1.什么是MCP
  • 2.MCP架构
  • 3.MCP服务器
  • 4.MCP客户端
  • 5.版本控制
  • 6.连接MCP服务器
  • 7.SDKs
  • 8.Inspector
  • 9.规范
  • 10.架构
  • 11.协议
  • 12.生命周期
  • 13.工具
  • 14.资源
  • 15.提示
  • 16.日志
  • 17.进度
  • 18.传输
  • 19.补全
  • 20.引导
  • 21.采样
  • 22.任务
  • 23.取消
  • 24.Ping
  • 25.根
  • 26.分页
  • 27.授权
  • 28.初始化
  • 29.工具
  • 30.资源
  • 31.结构化输出
  • 32.提示词
  • 33.上下文
  • 34.StreamableHTTP
  • 35.参数补全
  • 36.引导
  • 37.采样
  • 38.LowLevel
  • 39.任务
  • 40.取消
  • 41.ping
  • 42.根
  • 43.分页
  • 44.授权
  • 45.授权
  • Keycloak
  • asyncio
  • contextlib
  • httpx
  • pathlib
  • pydantic
  • queue
  • starlette
  • subprocess
  • threading
  • uvicorn
  • JSON-RPC
  • z
  • 1. MCP 授权
  • 2. auth_server.py
  • 3. mcp_client.py
  • 4. mcp_server.py
  • 5. token_verifier.py
  • 6. 工作流程
    • 6.1 整体架构
    • 6.2 各文件说明
      • 6.2.1. auth_server.py — 认证服务器
      • 6.2.2. token_verifier.py — 令牌验证器
      • 6.2.3. mcp_server.py — 资源服务器
      • 6.2.4. mcp_client.py — 客户端
    • 6.3 时序图
      • 6.3.1 完整
      • 6.3.2 (客户端视角)
    • 6.4 时序图
    • 6.5 数据流小结

1. MCP 授权 #

本节介绍 MCP(Microservice Control Platform,微服务控制平台)中的授权机制,采用 OAuth2 标准架构实现资源保护与访问控制。MCP 的授权机制分为认证服务器(auth server)和资源服务器(resource server)两部分,两者通过 OAuth2 Token Introspection(令牌自省,见 RFC 7662)协议解耦。

典型流程为:

  1. 令牌颁发
    用户(Resource Owner)通过认证服务器,使用用户名密码方式(Resource Owner Password Credentials Grant)获取 access token。

  2. 资源访问
    客户端带上 Bearer Token(即 access token),访问 MCP 资源服务器的受保护接口。

  3. 令牌验证
    资源服务器收到请求后,并不自行校验令牌,而是通过 introspect 端点向认证服务器查询该 access token 是否有效、属于谁、scope等。

  4. 权限控制
    认证服务器在 introspection 响应中指明令牌的有效性、拥有的 scopes、所属 client_id 等。资源服务器据此决定是否放行,以及允许访问哪些资源。

优势说明:

  • 实现了认证服务器与资源服务器的解耦,易于分布式部署。
  • 支持细粒度权限与多终端/多应用授权。
  • 可无缝对接第三方 OAuth2 生态软件。

2. auth_server.py #

auth_server.py

# OAuth2 认证服务器(独立运行)
"""
OAuth2 认证服务器(独立运行)

与资源服务器分离,负责:
- 令牌颁发:POST /oauth/token(Resource Owner Password Credentials 授权)
- 令牌自省:POST /oauth/introspect(RFC 7662,供资源服务器验证令牌)

演示用户:alice/123456、bob/123456
"""

# 兼容未来注解特性
from __future__ import annotations

# 导入异步、os操作、加密、系统、时间和类型相关模块
import asyncio
import os
import secrets
import sys
import time
from typing import Any

# 导入 Starlette 框架相关对象
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
from uvicorn import Config, Server

# 定义演示用户,用户名到密码的映射
DEMO_USERS: dict[str, str] = {
    "alice": "123456",
    "bob": "123456",
}

# 已颁发令牌的存储,token 到包含 client_id、scopes、expires_at 的字典
_tokens: dict[str, dict[str, Any]] = {}

# 生成新的访问令牌
def _issue_token(username: str, scopes: list[str] | None = None) -> dict[str, Any]:
    """颁发访问令牌。"""
    # 生成随机令牌
    token = f"mcp_{secrets.token_hex(24)}"
    # 设置令牌过期时间为当前时间加一小时
    expires_at = int(time.time()) + 3600
    # 保存令牌数据到全局 _tokens 字典
    _tokens[token] = {
        "client_id": username,
        "scopes": scopes or ["user"],
        "expires_at": expires_at,
    }
    # 返回包含令牌及相关信息的响应内容
    return {
        "access_token": token,
        "token_type": "Bearer",
        "expires_in": 3600,
        "scope": " ".join(scopes or ["user"]),
    }

# 自省令牌:检查令牌是否有效并返回相关信息
def _introspect_token(token: str) -> dict[str, Any] | None:
    """自省令牌,返回 RFC 7662 格式。"""
    # 取出令牌数据
    data = _tokens.get(token)
    # 若找不到则视为无效
    if not data:
        return None
    # 若令牌已过期则删除并视为无效
    if data["expires_at"] < time.time():
        del _tokens[token]
        return None
    # 构造自省结果
    return {
        "active": True,
        "client_id": data["client_id"],
        "scope": " ".join(data["scopes"]),
        "exp": data["expires_at"],
        "iat": int(time.time()),
        "token_type": "Bearer",
    }

# 令牌颁发端点(Resource Owner Password Credentials)
async def token_endpoint(request: Request) -> JSONResponse:
    """OAuth2 令牌端点(Resource Owner Password Credentials Grant)。

    POST /oauth/token
    Content-Type: application/x-www-form-urlencoded

    grant_type=password&username=alice&password=123456
    """
    # 尝试获取表单数据
    try:
        form = await request.form()
    except Exception:
        # 表单解析失败
        return JSONResponse(
            {"error": "invalid_request", "error_description": "Invalid form data"},
            status_code=400,
        )

    # 获取授权类型
    grant_type = form.get("grant_type")
    # 只支持 password 授权类型
    if grant_type != "password":
        return JSONResponse(
            {"error": "unsupported_grant_type", "error_description": "Only password grant supported"},
            status_code=400,
        )

    # 获取用户名和密码
    username = form.get("username")
    password = form.get("password")
    # 检查用户名和密码是否提供
    if not username or not password:
        return JSONResponse(
            {"error": "invalid_request", "error_description": "Missing username or password"},
            status_code=400,
        )

    # 检查参数类型,必须为字符串
    if not isinstance(username, str) or not isinstance(password, str):
        return JSONResponse(
            {"error": "invalid_request", "error_description": "Invalid parameter types"},
            status_code=400,
        )

    # 验证用户名密码是否正确
    if DEMO_USERS.get(username) != password:
        return JSONResponse(
            {"error": "invalid_grant", "error_description": "Invalid credentials"},
            status_code=401,
        )

    # 获取 scope(权限),默认 user
    scope_str = form.get("scope", "user")
    scopes = scope_str.split() if isinstance(scope_str, str) else ["user"]

    # 颁发令牌
    token_data = _issue_token(username, scopes)
    # 返回 JSON 响应
    return JSONResponse(token_data)

# 令牌自省端点(供资源服务器验证令牌)
async def introspect_endpoint(request: Request) -> JSONResponse:
    """OAuth2 令牌自省端点(RFC 7662)。

    资源服务器调用此端点验证客户端提供的 Bearer 令牌。

    POST /oauth/introspect
    Content-Type: application/x-www-form-urlencoded

    token=<access_token>
    """
    # 获取表单数据
    try:
        form = await request.form()
    except Exception:
        # 表单解析失败返回 active: False
        return JSONResponse({"active": False}, status_code=400)

    # 获取待自省的 token
    token = form.get("token")
    # 未提供 token 或类型不对
    if not token or not isinstance(token, str):
        return JSONResponse({"active": False})

    # 查询 token
    result = _introspect_token(token)
    # 查询不到或已过期即 active: False
    if not result:
        return JSONResponse({"active": False})

    # 返回自省结果
    return JSONResponse(result)

# 创建 Starlette 应用并注册路由
def create_app() -> Starlette:
    # 路由列表,包含令牌和自省两个端点
    routes = [
        Route("/oauth/token", token_endpoint, methods=["POST"]),
        Route("/oauth/introspect", introspect_endpoint, methods=["POST"]),
    ]
    # 返回应用
    return Starlette(routes=routes)

# 启动 Uvicorn 服务器
async def run_server(host: str, port: int) -> None:
    # 创建应用
    app = create_app()
    # 配置 uvicorn
    config = Config(app, host=host, port=port, log_level="info")
    # 创建服务器
    server = Server(config)
    # 启动服务器
    await server.serve()

# 主函数,处理环境变量与启动
def main() -> int:
    # 读取主机和端口(默认127.0.0.1:9000)
    host = os.environ.get("OAUTH_AUTH_HOST", "127.0.0.1")
    port = int(os.environ.get("OAUTH_AUTH_PORT", "9000"))

    # 设置终端编码为utf-8
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")

    # 打印服务信息
    print(f"OAuth2 认证服务器: http://{host}:{port}")
    print("  令牌端点: POST /oauth/token (grant_type=password&username=alice&password=123456)")
    print("  自省端点: POST /oauth/introspect")
    print("  演示用户: alice/123456, bob/123456")
    print()

    # 启动服务器
    asyncio.run(run_server(host, port))
    return 0

# Python 主程序入口
if __name__ == "__main__":
    sys.exit(main())

3. mcp_client.py #

mcp_client.py

# MCP OAuth 客户端(完整流程)
"""
MCP OAuth 客户端(完整流程)

1. 向认证服务器获取令牌(Resource Owner Password Credentials)
2. 使用 Bearer 令牌访问 MCP 资源服务器
"""

# 导入未来注解特性
from __future__ import annotations

# 导入标准库
import argparse    # 命令行参数解析
import asyncio     # 异步支持
import os          # 操作系统交互
import sys         # 系统功能
from typing import Any    # 类型提示

# 导入第三方库 httpx
import httpx       # 异步 HTTP 客户端

# 导入 MCP 库
from mcp import ClientSession                      # MCP 客户端会话
from mcp.client.streamable_http import streamable_http_client   # 可流式 HTTP 客户端
from pydantic import AnyUrl                        # URL 类型校验

# 从 MCP 结果提取文本内容
def _extract_texts(result: Any) -> list[str]:
    # 初始化文本列表
    texts: list[str] = []
    # 尝试获取 result.content,如果不存在则尝试 result.contents
    content = getattr(result, "content", None) or getattr(result, "contents", [])
    # 遍历每个内容块,提取 text 字段
    for block in content:
        if hasattr(block, "text"):
            texts.append(str(block.text))
        elif hasattr(block, "type") and getattr(block, "type") == "text":
            texts.append(str(getattr(block, "text", "")))
    # 返回所有提取到的文本
    return texts

# 异步获取访问令牌
async def fetch_token(client: httpx.AsyncClient, auth_url: str, username: str, password: str) -> str:
    # 组织令牌请求 URL
    token_url = f"{auth_url.rstrip('/')}/oauth/token"
    # 发送 POST 请求,携带用户名和密码
    response = await client.post(
        token_url,
        data={
            "grant_type": "password",
            "username": username,
            "password": password,
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
    # 检查响应是否成功,否则抛出异常
    response.raise_for_status()
    # 解析 JSON 数据
    data = response.json()
    # 获取 access_token 字段
    access_token = data.get("access_token")
    # 如果没有 access_token,抛异常
    if not access_token:
        raise ValueError("认证服务器未返回 access_token")
    # 返回 access_token
    return access_token

# OAuth 客户端主逻辑
async def run_client(
    auth_url: str,       # 认证服务器地址
    resource_url: str,   # 资源服务器地址
    username: str,       # 用户名
    password: str,       # 密码
) -> None:
    # 创建 httpx 异步客户端(用于令牌获取)
    async with httpx.AsyncClient(timeout=30.0) as http_client:
        # 第一步:获取访问令牌
        print("正在向认证服务器获取令牌...")
        token = await fetch_token(http_client, auth_url, username, password)
        print(f"令牌获取成功\n")

        # 第二步:准备带有 Bearer 令牌的请求头
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }

        # 使用令牌访问 MCP 资源服务器,创建新的 httpx 客户端
        async with httpx.AsyncClient(headers=headers, timeout=60.0) as mcp_client:
            # 建立 MCP Streamable HTTP 通信通道 (得到读写流,第三个未用变量用 _ 占位)
            async with streamable_http_client(resource_url, http_client=mcp_client) as (
                read_stream,
                write_stream,
                _,
            ):
                # 创建 MCP 客户端会话
                async with ClientSession(read_stream, write_stream) as session:
                    # 初始化会话
                    print("会话初始化...")
                    await session.initialize()
                    print("会话初始化成功\n")

                    # 尝试列举受保护资源
                    try:
                        resources = await session.list_resources()
                        uris = [r.uri for r in resources.resources]
                        print(f"受保护资源: {uris}")
                    except Exception as e:
                        print(f"获取资源列表失败: {e}")
                        raise

                    # 尝试列举可用工具
                    try:
                        tools = await session.list_tools()
                        names = [t.name for t in tools.tools]
                        print(f"受保护工具: {names}\n")
                    except Exception as e:
                        print(f"获取工具列表失败: {e}")
                        raise

                    # 尝试读取 user://profile 资源
                    try:
                        result = await session.read_resource(AnyUrl("user://profile"))
                        texts = _extract_texts(result)
                        print(f"读取 user://profile: {' | '.join(texts) or '(空)'}")
                    except Exception as e:
                        print(f"读取 user://profile 失败: {e}")

                    # 尝试读取 user://stats 资源
                    try:
                        result = await session.read_resource(AnyUrl("user://stats"))
                        texts = _extract_texts(result)
                        print(f"读取 user://stats: {' | '.join(texts) or '(空)'}")
                    except Exception as e:
                        print(f"读取 user://stats 失败: {e}")

                    # 调用 get_user_data 工具
                    try:
                        result = await session.call_tool("get_user_data", {"user_id": "current"})
                        texts = _extract_texts(result)
                        print(f"调用 get_user_data(current): {' | '.join(texts) or '(空)'}")
                    except Exception as e:
                        print(f"调用 get_user_data 失败: {e}")

                    # 结束提示
                    print("\nOAuth 客户端演示完成")

# 主程序入口点
def main() -> int:
    # 创建命令行参数解析器
    parser = argparse.ArgumentParser(description="MCP OAuth 客户端(完整流程)")
    # 添加认证服务器参数
    parser.add_argument(
        "--auth-url",
        default=os.environ.get("OAUTH_AUTH_SERVER", "http://127.0.0.1:9000"),
        help="认证服务器 URL",
    )
    # 添加资源服务器参数
    parser.add_argument(
        "--resource-url",
        default=os.environ.get("MCP_OAUTH_URL", "http://127.0.0.1:8000/mcp"),
        help="MCP 资源服务器 URL",
    )
    # 添加用户名参数
    parser.add_argument(
        "--username",
        default=os.environ.get("OAUTH_USERNAME", "alice"),
        help="用户名(演示: alice, bob)",
    )
    # 添加密码参数
    parser.add_argument(
        "--password",
        default=os.environ.get("OAUTH_PASSWORD", "123456"),
        help="密码(演示: 123456)",
    )
    # 解析命令行参数为 args
    args = parser.parse_args()

    # 打印相关参数
    print(f"认证服务器: {args.auth_url}")
    print(f"资源服务器: {args.resource_url}")
    print(f"用户: {args.username}\n")

    # 主执行过程,处理异常和退出码
    try:
        # 运行主客户端协程
        asyncio.run(run_client(args.auth_url, args.resource_url, args.username, args.password))
        # 成功返回 0
        return 0
    except httpx.HTTPStatusError as e:
        # 捕获 httpx 的 HTTP 状态异常
        if e.response.status_code == 401:
            print("认证失败:用户名或密码错误")
        else:
            print(f"HTTP 错误 {e.response.status_code}: {e}")
        return 1
    except Exception as e:
        # 其他异常进一步判断401或输出异常信息
        err_str = str(e).lower()
        if "401" in err_str or "unauthorized" in err_str:
            print("认证失败:请检查用户名和密码")
        else:
            print(f"连接失败: {e}")
            import traceback
            traceback.print_exc()
        return 1

# 仅在直接运行本文件时执行
if __name__ == "__main__":
    # 判断及设置标准输出输入的编码
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
        sys.stdin.reconfigure(encoding="utf-8")
    try:
        # 运行主逻辑并传递退出码
        sys.exit(main())
    except KeyboardInterrupt:
        # 捕获 Ctrl+C 中断优雅退出
        print("\n客户端已停止")
        sys.exit(130)

4. mcp_server.py #

mcp_server.py

# MCP 资源服务器(OAuth2 分离架构)
"""
MCP 资源服务器(OAuth2 分离架构)

与认证服务器分离,通过 Token Introspection(RFC 7662)验证令牌。
- 认证服务器:oauth_auth_server.py(端口 9000)
- 资源服务器:本服务(端口 8000)
"""

# 导入操作系统相关模块
import os
# 导入系统功能模块
import sys
# 导入类型提示
from typing import Any

# 从 pydantic 导入 AnyHttpUrl 类型,用于校验 URL
from pydantic import AnyHttpUrl

# 导入 MCP 的认证设置
from mcp.server.auth.settings import AuthSettings
# 导入 MCP 上下文与主服务类
from mcp.server.fastmcp import Context, FastMCP

# 导入自定义的令牌自省验证器
from token_verifier import IntrospectionTokenVerifier

# 获取当前用户 ID 的工具函数
def _get_current_user_id() -> str | None:
    # 从认证上下文获取当前用户 ID
    try:
        from mcp.server.auth.middleware.auth_context import get_access_token

        # 获取当前 access_token
        token = get_access_token()
        # 返回 client_id(用户名),若无则返回 None
        return token.client_id if token else None
    except Exception:
        # 如果获取失败则返回 None
        return None

# 配置:资源服务器与认证服务器分离
# 获取主机地址,先查 FASTMCP_HOST,再查 MCP_OAUTH_HOST,否则用 127.0.0.1
HOST = os.environ.get("FASTMCP_HOST", os.environ.get("MCP_OAUTH_HOST", "127.0.0.1"))
# 获取端口号,先查 FASTMCP_PORT,再查 MCP_OAUTH_PORT,否则用 8000
PORT = int(os.environ.get("FASTMCP_PORT", os.environ.get("MCP_OAUTH_PORT", "8000")))
# 获取认证服务器地址,默认为 http://127.0.0.1:9000
AUTH_SERVER = os.environ.get("OAUTH_AUTH_SERVER", "http://127.0.0.1:9000")
# 资源服务器自身地址
RESOURCE_URL = f"http://{HOST}:{PORT}"
# 令牌自省端点
INTROSPECTION_URL = f"{AUTH_SERVER.rstrip('/')}/oauth/introspect"

# 创建 FastMCP 资源服务器实例
mcp = FastMCP(
    # 服务器名称
    name="OAuth Resource Server",
    # 令牌验证器:通过 introspect 端点验证
    token_verifier=IntrospectionTokenVerifier(INTROSPECTION_URL),
    # 认证设置
    auth=AuthSettings(
        issuer_url=AnyHttpUrl(AUTH_SERVER),              # 认证服务器 url
        resource_server_url=AnyHttpUrl(RESOURCE_URL),    # 资源服务器 url
        required_scopes=["user"],                        # 要求 scope "user"
    ),
)

# 注册受保护资源 user://profile
@mcp.resource("user://profile")
def get_user_profile() -> str:
    # 获取当前用户 ID,若获取不到则为 "anonymous"
    user_id = _get_current_user_id() or "anonymous"
    # 返回用户基本资料(JSON 字符串)
    return f'''{{
  "user_id": "{user_id}",
  "name": "User {user_id}",
  "email": "{user_id}@example.com",
  "role": "user"
}}'''

# 注册受保护资源 user://stats
@mcp.resource("user://stats")
def get_user_stats() -> str:
    # 获取当前用户 ID,若获取不到则为 "anonymous"
    user_id = _get_current_user_id() or "anonymous"
    # 返回用户统计数据(JSON 字符串)
    return f'''{{
  "user_id": "{user_id}",
  "requests_today": 42,
  "last_active": "2024-01-15T10:30:00Z"
}}'''

# 注册受保护工具 get_user_data
@mcp.tool()
def get_user_data(user_id: str = "current", ctx: Context | None = None) -> dict[str, Any]:
    # 获取当前用户 ID
    current = _get_current_user_id()
    # 请求为当前用户
    if user_id == "current":
        # 如果没有认证,返回未认证信息
        if not current:
            return {"error": "未认证", "user_id": None}
        # 返回当前用户的基本数据
        return {
            "user_id": current,
            "status": "active",
            "last_login": "2024-01-15T10:30:00Z",
        }
    # 请求的 user_id 等于当前用户
    if current and user_id == current:
        return {
            "user_id": user_id,
            "status": "active",
            "last_login": "2024-01-15T10:30:00Z",
        }
    # 既不是当前用户,也无权访问
    return {"user_id": user_id, "status": "unknown", "error": "无权访问或用户不存在"}

# 仅主程序直接运行时启用
if __name__ == "__main__":
    # 检查 stdout 是否支持 reconfigure(Py3.7+)
    if hasattr(sys.stdout, "reconfigure"):
        # 设置 stdout 的编码为 utf-8
        sys.stdout.reconfigure(encoding="utf-8")
        sys.stdin.reconfigure(encoding="utf-8")

    # 设置环境变量,用于内部配置
    os.environ.setdefault("FASTMCP_HOST", HOST)
    os.environ.setdefault("FASTMCP_PORT", str(PORT))
    # 打印资源服务器地址
    print(f"MCP 资源服务器: http://{HOST}:{PORT}/mcp")
    # 打印认证服务器地址
    print(f"认证服务器: {AUTH_SERVER}")
    # 提示先启动认证服务器
    print("请先启动 oauth_auth_server.py,再启动本服务")
    # 启动 FastMCP 服务(streamable-http 模式)
    mcp.run(transport="streamable-http")

5. token_verifier.py #

token_verifier.py

"""OAuth2 令牌自省验证器(RFC 7662)

资源服务器通过调用认证服务器的 introspection 端点验证 Bearer 令牌。
"""

# 导入未来注解特性(兼容旧版Python类型提示)
from __future__ import annotations

# 导入 httpx 库用于发送 HTTP 请求
import httpx

# 从 mcp.server.auth.provider 模块导入 AccessToken 和 TokenVerifier
from mcp.server.auth.provider import AccessToken, TokenVerifier

# 定义 IntrospectionTokenVerifier 类,继承自 TokenVerifier
class IntrospectionTokenVerifier(TokenVerifier):
    """通过 OAuth2 Token Introspection(RFC 7662)验证令牌。"""

    # 构造方法,接收 introspection_endpoint(自省端点地址)
    def __init__(self, introspection_endpoint: str):
        # 保存自省端点地址到实例变量
        self.introspection_endpoint = introspection_endpoint

    # 异步方法,用于验证令牌
    async def verify_token(self, token: str) -> AccessToken | None:
        """调用认证服务器自省端点验证令牌。"""
        # 安全检查,introspection_endpoint 必须以指定前缀(http://localhost等)开头
        if not self.introspection_endpoint.startswith(
            ("https://", "http://localhost", "http://127.0.0.1")
        ):
            # 若地址不合法,直接返回 None
            return None

        # 使用 httpx 异步客户端创建会话,请求超时 10 秒
        async with httpx.AsyncClient(timeout=10.0) as client:
            try:
                # 向自省端点发送 POST 请求,带上 token 字段
                response = await client.post(
                    self.introspection_endpoint,
                    data={"token": token},
                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                )
                # 如果响应状态码不是 200,令牌无效,返回 None
                if response.status_code != 200:
                    return None

                # 解析响应 JSON 数据
                data = response.json()
                # 如果 active 字段为 False 或不存在,说明令牌无效,返回 None
                if not data.get("active", False):
                    return None

                # 构造 AccessToken 对象并返回
                return AccessToken(
                    token=token,
                    client_id=data.get("client_id", "unknown"),
                    scopes=data.get("scope", "").split() if data.get("scope") else [],
                    expires_at=data.get("exp"),
                    resource=data.get("aud"),
                )
            except Exception:
                # 捕获异常,如请求异常等,返回 None
                return None

6. 工作流程 #

6.1 整体架构 #

这是一个 OAuth2 认证服务器与资源服务器分离 的 MCP 示例,包含四个部分:

组件 文件 端口 职责
认证服务器 auth_server.py 9000 颁发令牌、自省令牌
资源服务器 mcp_server.py 8000 提供 MCP 工具和资源
令牌验证器 token_verifier.py - 调用认证服务器自省接口
客户端 mcp_client.py - 获取令牌并访问资源

6.2 各文件说明 #

6.2.1. auth_server.py — 认证服务器 #

基于 Starlette,提供两个 HTTP 端点:

① /oauth/token(令牌颁发)

  • 使用 Resource Owner Password Credentials(密码模式)
  • 请求:POST,application/x-www-form-urlencoded
  • 参数:grant_type=password、username、password
  • 校验 DEMO_USERS(alice/123456、bob/123456)
  • 成功后:生成 mcp_{hex(48)} 令牌,存入 _tokens,返回 access_token、expires_in 等

② /oauth/introspect(令牌自省,RFC 7662)

  • 资源服务器用来验证令牌
  • 请求:POST,application/x-www-form-urlencoded,参数 token
  • 根据 _tokens 检查:
    • 存在且未过期 → {"active": true, "client_id": "...", "scope": "...", "exp": ...}
    • 不存在或过期 → {"active": false}

6.2.2. token_verifier.py — 令牌验证器 #

实现 MCP 的 TokenVerifier 协议:

  • 构造函数:接收 introspection_endpoint(如 http://127.0.0.1:9000/oauth/introspect)
  • verify_token(token):异步调用 POST /oauth/introspect,传入 token
  • 若 active: true → 构造并返回 AccessToken(client_id、scopes、expires_at 等)
  • 否则或异常 → 返回 None

6.2.3. mcp_server.py — 资源服务器 #

配置:

  • token_verifier=IntrospectionTokenVerifier(INTROSPECTION_URL):用自省接口验证
  • AuthSettings:issuer_url、resource_server_url、required_scopes=["user"]

资源与工具:

  • user://profile:返回当前用户资料
  • user://stats:返回当前用户统计
  • get_user_data:根据 user_id 返回用户数据(支持 current)

用户身份:

  • 通过 get_access_token() 从认证上下文获取当前 AccessToken
  • 使用 client_id 作为用户标识(如 alice、bob)

6.2.4. mcp_client.py — 客户端 #

流程:

  1. 获取令牌:fetch_token() → POST /oauth/token,使用 username、password
  2. 访问资源:streamable_http_client() 带上 Authorization: Bearer {token}
  3. MCP 操作:list_resources、list_tools、read_resource、call_tool

参数:

  • --auth-url、--resource-url、--username、--password(或环境变量)

6.3 时序图 #

6.3.1 完整 #

┌─────────────┐     ┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   Client    │     │  Auth Server    │     │ Resource Server │     │ Token Verifier  │
│ mcp_client  │     │   auth_server   │     │   mcp_server     │     │ token_verifier  │
└──────┬──────┘     └────────┬────────┘     └────────┬────────┘     └────────┬────────┘
       │                     │                       │                       │
       │  ① POST /oauth/token                        │                       │
       │  grant_type=password                        │                       │
       │  username=alice&password=123456             │                       │
       │────────────────────────────────────────────►                       │
       │                     │                       │                       │
       │  ② 200 OK           │                       │                       │
       │  {access_token, ...}│                       │                       │
       │◄────────────────────────────────────────────                       │
       │                     │                       │                       │
       │  ③ MCP 请求 (Streamable HTTP)               │                       │
       │  Authorization: Bearer <token>             │                       │
       │──────────────────────────────────────────────────────────────────────►
       │                     │                       │                       │
       │                     │                       │  ④ verify_token(token)│
       │                     │                       │                       │
       │                     │                       │  ⑤ POST /oauth/introspect
       │                     │                       │  token=<access_token>  │
       │                     │                       │──────────────────────►│
       │                     │                       │                       │
       │                     │  ⑥ POST /oauth/introspect                     │
       │                     │  token=<access_token>  │                       │
       │                     │◄──────────────────────│                       │
       │                     │                       │                       │
       │                     │  ⑦ 200 OK            │                       │
       │                     │  {active:true,       │                       │
       │                     │   client_id:alice}    │                       │
       │                     │──────────────────────►│                       │
       │                     │                       │                       │
       │                     │                       │  ⑧ AccessToken        │
       │                     │                       │◄──────────────────────│
       │                     │                       │                       │
       │  ⑨ MCP 响应 (资源/工具结果)               │                       │
       │◄──────────────────────────────────────────────────────────────────────
       │                     │                       │                       │

6.3.2 (客户端视角) #

┌─────────────┐           ┌─────────────────┐           ┌─────────────────┐
│   Client    │           │  Auth Server     │           │ Resource Server  │
└──────┬──────┘           └────────┬────────┘           └────────┬────────┘
       │                           │                             │
       │  ① 获取令牌               │                             │
       │  POST /oauth/token        │                             │
       │  (username, password)      │                             │
       │──────────────────────────►│                             │
       │                           │                             │
       │  ② access_token            │                             │
       │◄──────────────────────────│                             │
       │                           │                             │
       │  ③ MCP 请求               │                             │
       │  Bearer <token>           │                             │
       │─────────────────────────────────────────────────────────►│
       │                           │                             │
       │                           │  ④ 自省验证                 │
       │                           │  POST /oauth/introspect     │
       │                           │◄────────────────────────────│
       │                           │                             │
       │                           │  ⑤ {active, client_id}      │
       │                           │─────────────────────────────►│
       │                           │                             │
       │  ⑥ MCP 响应 (资源/工具)   │                             │
       │◄─────────────────────────────────────────────────────────│
       │                           │                             │

6.4 时序图 #

sequenceDiagram participant C as Client participant A as Auth Server participant R as Resource Server participant V as Token Verifier C->>A: ① POST /oauth/token (username, password) A->>C: ② access_token C->>R: ③ MCP 请求 + Bearer token R->>V: verify_token(token) V->>A: ④ POST /oauth/introspect (token) A->>V: ⑤ {active: true, client_id} V->>R: AccessToken R->>C: ⑥ MCP 响应 (资源/工具结果)

6.5 数据流小结 #

  1. 令牌获取:Client → Auth Server:POST /oauth/token,返回 access_token。
  2. 令牌携带:Client 在后续请求中带上 Authorization: Bearer <token>。
  3. 令牌验证:Resource Server 收到请求后,通过 TokenVerifier.verify_token() 调用 Auth Server 的 /oauth/introspect。
  4. 权限与身份:自省返回 active、client_id、scope 等,Resource Server 据此决定是否允许访问并识别当前用户。
← 上一节 44.授权 下一节 asyncio →

访问验证

请输入访问令牌

Token不正确,请重新输入