导航菜单

  • 1.什么是MCP
  • 2.MCP架构
  • 3.MCP服务器
  • 4.MCP客户端
  • 5.版本控制
  • 6.连接MCP服务器
  • 7.SDKs
  • 8.Inspector
  • 9.规范
  • 10.架构
  • 11.协议
  • 12.生命周期
  • 13.工具
  • 14.资源
  • 15.提示
  • 16.日志
  • 17.进度
  • 18.传输
  • 19.补全
  • 20.引导
  • 21.采样
  • 22.任务
  • 23.取消
  • 24.Ping
  • 25.根
  • 26.分页
  • 27.授权
  • 28.初始化
  • 29.工具
  • 30.资源
  • 31.结构化输出
  • 32.提示词
  • 33.上下文
  • 34.StreamableHTTP
  • 35.参数补全
  • 36.引导
  • 37.采样
  • 38.LowLevel
  • 39.任务
  • 40.取消
  • 41.ping
  • 42.根
  • 43.分页
  • 44.授权
  • 45.FunctionCalling
  • starlette
  • FastAPI
  • Keycloak
  • asyncio
  • contextlib
  • httpx
  • pathlib
  • pydantic
  • queue
  • subprocess
  • threading
  • uvicorn
  • JSON-RPC
  • LiteLLM
  • pydantic-settings
  • ai_agent
  • format
  • diff
  • mcp_server
  • 1. 什么是 MCP 授权?
  • 2. 本章你将学到
  • 3. 何时需要授权?
    • 3.1 本地 vs 远程:授权方式不同
  • 4. 授权流程概览
  • 5. 授权流程:分步说明
    • 5.1 初始握手:401 + 元数据地址
    • 5.2 受保护资源元数据(PRM)
    • 5.3 授权服务器发现
    • 5.4 客户端注册
    • 5.5 用户授权
    • 5.6 发起已认证请求
  • 1. 启动MCP服务器
    • 1.1. .env
    • 1.2. mcp_server.py
  • 2. 启动MCP客户端
    • 2.1. mcp_client.py
  • 3. 元数据地址
    • 3.1. token_verifier.py
    • 3.2. mcp_server.py
  • 4. 获取受保护资源元数据
    • 4.1. mcp_client.py
  • 5. 获取授权服务器元数据
    • 5.1. auth_server.py
    • 5.2. .env
    • 5.3. mcp_client.py
  • 6. 客户端注册(动态注册)
    • 6.1. auth_server.py
    • 6.2. mcp_client.py
    • 6.3 start.py
  • 7. 引导用户授权
    • 7.1. auth_server.py
    • 7.2. mcp_client.py
  • 8. 使用授权码换取access_token
    • 8.1. auth_server.py
    • 8.2. mcp_client.py
  • 9. 携带access_token访问受保护资源
    • 9.1. auth_server.py
    • 9.2. mcp_client.py
    • 9.3. mcp_server.py
    • 9.4. token_verifier.py

1. 什么是 MCP 授权? #

  • 什么是 MCP 授权?
  • MCP 授权

授权(Authorization)用来控制「谁可以访问什么」:只有通过身份验证、且被允许的用户,才能访问 MCP 服务器提供的敏感资源或执行敏感操作。

概念 通俗理解
无授权 任何人连上服务器就能用,适合本地、测试
有授权 需先登录、同意权限,才能调用工具或资源

MCP 使用 OAuth 2.1 标准实现授权,不绑定特定身份系统,可与各类授权服务(如 Keycloak、Auth0)配合。

MCP 授权基于以下标准:

标准 用途
OAuth 2.1 核心授权框架
RFC 8414 授权服务器元数据发现
RFC 7591 动态客户端注册
RFC 9728 受保护资源元数据
RFC 8707 资源指示符

2. 本章你将学到 #

  • 什么时候需要为 MCP 服务器加授权
  • 本地服务器 vs 远程服务器在授权上的区别
  • OAuth 授权流程的 6 个步骤
  • 用 Keycloak + Python 搭建一个带授权的 MCP 服务器
  • 常见安全陷阱及规避方法

3. 何时需要授权? #

授权是可选的,但以下场景强烈建议使用:

场景 说明
访问用户数据 邮件、文档、数据库等敏感信息
需要审计 记录谁在何时执行了哪些操作
第三方 API 需用户同意 如 OAuth 授权访问 Google、GitHub 等
企业环境 有严格访问控制要求
按用户限流或计费 需要区分不同用户的使用量

3.1 本地 vs 远程:授权方式不同 #

传输方式 典型场景 授权方式
STDIO(本地) 客户端和服务器在同一台机器 可用环境变量、本地凭据或第三方库,不一定走 OAuth
HTTP/SSE(远程) 服务器在远程,客户端通过网络连接 通常用 OAuth:用户登录、授权,客户端拿到令牌后访问

小结:本地 STDIO 服务器授权方式灵活;远程 HTTP 服务器通常用 OAuth 建立「用户已授权」的信任。

4. 授权流程概览 #

当客户端连接受保护的 MCP 服务器时,大致经历以下步骤:

sequenceDiagram participant Client as 客户端 participant MCPServer as MCP服务器(受保护) participant ResMetadata as 受保护资源元数据端点 participant AuthMetadata as 授权服务器元数据端点 participant AuthServer as 授权服务器 participant User as 用户 Client->>MCPServer: 请求受保护资源 MCPServer-->>Client: 401 Unauthorized + 元数据地址 Client->>ResMetadata: 获取受保护资源元数据 ResMetadata-->>Client: 返回元数据(包含授权服务器地址) Client->>AuthMetadata: 获取授权服务器元数据 AuthMetadata-->>Client: 返回元数据(授权端点、令牌端点等) Client->>AuthServer: 客户端注册(动态注册) AuthServer-->>Client: 返回 client_id 等凭证 Client->>User: 引导用户授权 User->>AuthServer: 登录并授权 AuthServer-->>Client: 返回授权码 Client->>AuthServer: 使用授权码换取 access_token AuthServer-->>Client: 返回 access_token Client->>MCPServer: 携带 access_token 请求受保护资源 MCPServer->>AuthServer: (可选)内省或验证 token AuthServer-->>MCPServer: token 有效 MCPServer-->>Client: 返回受保护资源

5. 授权流程:分步说明 #

5.1 初始握手:401 + 元数据地址 #

客户端首次请求时,服务器返回 401 Unauthorized,并在响应头中告诉客户端「去哪获取授权信息」:

HTTP/1.1 401 Unauthorized
WWW-Authenticate: Bearer realm="mcp",
  resource_metadata="http://127.0.0.1:8000/.well-known/oauth-protected-resource/mcp"

客户端据此知道:需要授权,且元数据文档在 resource_metadata 指向的地址。

5.2 受保护资源元数据(PRM) #

客户端请求该元数据文档,获取授权服务器地址、支持的 scope 等:

{
  "resource": "http://127.0.0.1:8000/mcp",
  "authorization_servers": [
    "http://127.0.0.1:9000/"
  ],
  "scopes_supported": [
    "user"
  ],
  "bearer_methods_supported": [
    "header"
  ]
}

更多字段见 RFC 9728 第 3.2 节。

5.3 授权服务器发现 #

客户端根据 PRM 中的 authorization_servers,请求授权服务器的元数据(OIDC Discovery 或 OAuth 2.0 元数据),获取登录、换令牌等端点:

http://127.0.0.1:9000/.well-known/oauth-authorization-server

{
  "issuer": "http://127.0.0.1:9000",
  "authorization_endpoint": "http://127.0.0.1:9000/oauth/authorize",
  "token_endpoint": "http://127.0.0.1:9000/oauth/token",
  "registration_endpoint": "http://127.0.0.1:9000/oauth/register",
  "introspection_endpoint": "http://127.0.0.1:9000/oauth/introspect",
  "response_types_supported": [
    "code"
  ],
  "grant_types_supported": [
    "authorization_code"
  ],
  "scopes_supported": [
    "user"
  ]
}

5.4 客户端注册 #

客户端需要在授权服务器上「登记」自己,有两种方式:

方式 说明
预注册 管理员提前在授权服务器中创建客户端,客户端内置 client_id 等
动态注册(DCR) 客户端向 registration_endpoint 提交信息,自动完成注册

动态注册示例请求:

{
    "client_name": "MCP OAuth 客户端",
    "redirect_uris": ["http://127.0.0.1:8000/callback"],
}

若授权服务器不支持 DCR,且客户端未预注册,则需要用户手动输入客户端信息。

5.5 用户授权 #

客户端打开浏览器,跳转到 /authorize,用户登录并同意授权。授权服务器重定向回客户端,并带上授权码;客户端用授权码换取访问令牌:

{
"access_token": "eyJhbGciOiJSUzI1NiIs...",# 访问令牌
"token_type": "Bearer",                   # 令牌类型为Bearer
"expires_in": 3600,                       # 距离过期剩余秒数
"scope":  ["user"],                       # 空格分隔的作用域字符串
}

5.6 发起已认证请求 #

客户端在后续请求中,将 access_token 放在 Authorization 头中:

GET http://127.0.0.1:8000/mcp HTTP/1.1
Authorization: Bearer eyJhbGciOiJSUzI1NiIs...

服务器验证令牌有效性及 scope,通过后处理请求。

1. 启动MCP服务器 #

uv add "mcp[cli]"

1.1. .env #

.env

MCP_HOST=127.0.0.1
MCP_PORT=8000

1.2. mcp_server.py #

mcp_server.py

# MCP 资源服务器(OAuth2 分离架构)
"""
MCP 资源服务器(OAuth2 分离架构)

与认证服务器分离,通过令牌自省(RFC 7662)验证令牌。
- 认证服务器:oauth_auth_server.py(端口 9000)
- 资源服务器:本服务(端口 8000)
"""

# 导入操作系统相关模块
import os
# 导入系统功能模块
import sys
# 导入 MCP 上下文与主服务类
from mcp.server.fastmcp import FastMCP
# 读取主机地址,优先使用环境变量 MCP_HOS,否则默认 '127.0.0.1'
HOST = os.environ.get("MCP_HOST", "127.0.0.1")
# 读取端口号,优先使用环境变量 MCP_PORT,否则默认 8000(需转换为 int)
PORT = int(os.environ.get("MCP_PORT", "8000"))
# 资源服务器自身的公开 URL,带 /mcp 路径用于元数据发现
RESOURCE_URL = f"http://{HOST}:{PORT}/mcp"
# 创建 FastMCP 资源服务器实例
mcp = FastMCP(
    # 服务器名称
    name="OAuth资源服务器",
    host=HOST,
    port=PORT
)

# 仅当该文件作为主程序直接运行时执行下列代码
if __name__ == "__main__":
    # 打印资源服务器地址
    print(f"MCP 资源服务器: {RESOURCE_URL}")
    # 启动 FastMCP 服务,采用 streamable-http 模式
    mcp.run(transport="streamable-http")

2. 启动MCP客户端 #

2.1. mcp_client.py #

mcp_client.py

"""
MCP OAuth 客户端

按如下顺序执行:
1) 请求受保护资源,拿到 401 + 资源元数据地址
2) 获取受保护资源元数据(授权服务器地址)
3) 获取授权服务器元数据(授权端点、令牌端点、注册端点)
4) 动态注册客户端
5) 引导用户授权(此演示用用户名/密码 + approve=true 模拟)
6) 用授权码换取访问令牌
7) 携带访问令牌访问 MCP 受保护资源
"""
# 导入系统模块
import sys
# 导入正则表达式模块
import re
# 导入异步IO模块
import asyncio
# 导入httpx,用于处理HTTP请求
import httpx
# 从"www-authenticate"头部解析出resource_metadata字段
def _parse_resource_metadata_from_www_authenticate(www_authenticate):
    # 头部值不存在直接返回None
    if not www_authenticate:
        return None
    # 匹配resource_metadata字段的内容
    match = re.search(r'resource_metadata="([^"]+)"', www_authenticate, flags=re.IGNORECASE)
    # 有匹配返回第一个分组,否则返回None
    return match.group(1) if match else None
# 发现受保护资源的元数据地址(401响应推断或头部直接获取)
async def discover_resource_metadata(client, resource_url):
    # 无令牌访问资源预期获得401
    response = await client.get(resource_url)
    # 如果不是401,说明流程有误
    if response.status_code != 401:
        raise RuntimeError(f"预期先返回 401,但收到 {response.status_code}")

    # 从头部解析resource_metadata字段
    return _parse_resource_metadata_from_www_authenticate(response.headers.get("www-authenticate"))

# 主运行入口,完成OAuth全流程
async def run_client(resource_url, username, password, redirect_uri):
    # 创建默认30秒超时的httpx异步客户端,自动跟随重定向
    async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as http_client:
        # 第一步:无token请求受保护资源
        print("1) 请求受保护资源(无 token)...")
        resource_metadata_url = await discover_resource_metadata(http_client, resource_url)
        print(f"   收到资源元数据地址: {resource_metadata_url}")

# 命令行主入口
def main():
    try:
        # 受保护资源的元数据地址
        resource_url = "http://127.0.0.1:8000/mcp"
        # 用户名(模拟登录用)
        username = "admin"
        # 密码(模拟登录用)
        password = "123456"
        # 回调地址(OAuth2 授权码流程需要,客户端监听此地址以接收授权码)
        redirect_uri = "http://127.0.0.1:8000/callback"
        # 运行主业务流程
        asyncio.run(run_client(resource_url, username, password, redirect_uri))
        return 0
    except httpx.HTTPStatusError as e:
        # HTTP请求状态异常统一输出
        print(f"HTTP 请求错误 {e.response.status_code}: {e.response.text}")
        return 1
    except Exception as e:
        # 其他异常统一输出
        print(f"执行失败: {e}")
        import traceback

        traceback.print_exc()
        return 1


# 判断是否作为主程序入口执行
if __name__ == "__main__":
    # 设置标准输入输出流的编码为utf-8(如果支持该方法)
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
        sys.stdin.reconfigure(encoding="utf-8")
    try:
        # 启动主函数,异常时按约定返回码
        sys.exit(main())
    except KeyboardInterrupt:
        # 捕获Ctrl+C中断,友好打印并退出
        print("\n客户端已停止")
        sys.exit(130)

3. 元数据地址 #

http://127.0.0.1:8000/.well-known/oauth-protected-resource/mcp

{
  "resource": "http://127.0.0.1:8000/mcp",
  "authorization_servers": [
    "http://127.0.0.1:9000/"
  ],
  "scopes_supported": [
    "user"
  ],
  "bearer_methods_supported": [
    "header"
  ]
}

3.1. token_verifier.py #

token_verifier.py

"""
OAuth2 令牌自省验证器(RFC 7662)

资源服务器通过调用认证服务器的令牌自省端点验证 Bearer 令牌。
"""

# 导入 httpx 库用于发送 HTTP 请求
import httpx

# 从 mcp.server.auth.provider 模块导入 AccessToken 和 TokenVerifier
from mcp.server.auth.provider import AccessToken, TokenVerifier

# 定义 IntrospectionTokenVerifier 类,继承自 TokenVerifier
class IntrospectionTokenVerifier(TokenVerifier):
     # 通过 OAuth2 令牌自省(RFC 7662)验证令牌的类文档字符串
    """通过 OAuth2 令牌自省(RFC 7662)验证令牌。"""

    # 构造方法,接收自省端点地址 introspection_endpoint
    def __init__(self, introspection_endpoint):
        # 将自省端点地址保存到实例变量
        self.introspection_endpoint = introspection_endpoint

3.2. mcp_server.py #

mcp_server.py

# MCP 资源服务器(OAuth2 分离架构)
"""
MCP 资源服务器(OAuth2 分离架构)

与认证服务器分离,通过令牌自省(RFC 7662)验证令牌。
- 认证服务器:oauth_auth_server.py(端口 9000)
- 资源服务器:本服务(端口 8000)
"""

# 导入操作系统相关模块
import os
# 导入系统功能模块
import sys
# 导入 MCP 上下文与主服务类
from mcp.server.fastmcp import FastMCP
# 导入 MCP 的认证设置
+from mcp.server.auth.settings import AuthSettings
# 从 pydantic 导入 AnyHttpUrl 类型,用于校验 URL
+from pydantic import AnyHttpUrl
# 读取主机地址,优先使用环境变量 MCP_HOS,否则默认 '127.0.0.1'
HOST = os.environ.get("MCP_HOST", "127.0.0.1")
# 读取端口号,优先使用环境变量 MCP_PORT,否则默认 8000(需转换为 int)
PORT = int(os.environ.get("MCP_PORT", "8000"))
# 资源服务器自身的公开 URL,带 /mcp 路径用于元数据发现
RESOURCE_URL = f"http://{HOST}:{PORT}/mcp"
# 导入自定义的令牌自省验证器
+from token_verifier import IntrospectionTokenVerifier
# 认证服务器地址,优先通过环境变量 OAUTH_AUTH_SERVER,否则默认为 http://127.0.0.1:9000
+AUTH_SERVER = os.environ.get("OAUTH_AUTH_SERVER", "http://127.0.0.1:9000")
# 令牌自省端点 URL
+INTROSPECTION_URL = f"{AUTH_SERVER.rstrip('/')}/oauth/introspect"
# 创建 FastMCP 资源服务器实例
mcp = FastMCP(
    # 服务器名称
    name="OAuth资源服务器",
    host=HOST,
+   port=PORT,
    # 令牌验证器:通过 introspect 端点验证
+   token_verifier=IntrospectionTokenVerifier(INTROSPECTION_URL),
    # 认证相关设置
+   auth=AuthSettings(
        # 指定认证服务器 Issuer 地址
+       issuer_url=AnyHttpUrl(AUTH_SERVER),
        # 资源服务器自身的 URL
+       resource_server_url=AnyHttpUrl(RESOURCE_URL),
        # 需要的 scope
+       required_scopes=["user"],
+   ),
)

# 仅当该文件作为主程序直接运行时执行下列代码
if __name__ == "__main__":
    # 打印资源服务器地址
    print(f"MCP 资源服务器: {RESOURCE_URL}")
    # 启动 FastMCP 服务,采用 streamable-http 模式
    mcp.run(transport="streamable-http")

4. 获取受保护资源元数据 #

4.1. mcp_client.py #

mcp_client.py

"""
MCP OAuth 客户端

按如下顺序执行:
1) 请求受保护资源,拿到 401 + 资源元数据地址
2) 获取受保护资源元数据(授权服务器地址)
3) 获取授权服务器元数据(授权端点、令牌端点、注册端点)
4) 动态注册客户端
5) 引导用户授权(此演示用用户名/密码 + approve=true 模拟)
6) 用授权码换取访问令牌
7) 携带访问令牌访问 MCP 受保护资源
"""
# 导入系统模块
import sys
# 导入正则表达式模块
import re
# 导入异步IO模块
import asyncio
# 导入httpx,用于处理HTTP请求
import httpx
# 从"www-authenticate"头部解析出resource_metadata字段
def _parse_resource_metadata_from_www_authenticate(www_authenticate):
    # 头部值不存在直接返回None
    if not www_authenticate:
        return None
    # 匹配resource_metadata字段的内容
    match = re.search(r'resource_metadata="([^"]+)"', www_authenticate, flags=re.IGNORECASE)
    # 有匹配返回第一个分组,否则返回None
    return match.group(1) if match else None
# 发现受保护资源的元数据地址(401响应推断或头部直接获取)
async def discover_resource_metadata(client, resource_url):
    # 无令牌访问资源预期获得401
    response = await client.get(resource_url)
    # 如果不是401,说明流程有误
    if response.status_code != 401:
        raise RuntimeError(f"预期先返回 401,但收到 {response.status_code}")

    # 从头部解析resource_metadata字段
    return _parse_resource_metadata_from_www_authenticate(response.headers.get("www-authenticate"))
# 异步获取JSON,并校验为字典对象
+async def fetch_json(client, url):
    # 发起GET请求
+   resp = await client.get(url)
    # 请求状态非200会引发异常
+   resp.raise_for_status()
    # 解析响应内容为JSON
+   data = resp.json()
    # 必须是字典类型
+   if not isinstance(data, dict):
+       raise ValueError(f"元数据响应不是 JSON 对象: {url}")
    # 返回解析结果
+   return data
# 主运行入口,完成OAuth全流程
async def run_client(resource_url, username, password, redirect_uri):
    # 创建默认30秒超时的httpx异步客户端,自动跟随重定向
    async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as http_client:
        # 第一步:无token请求受保护资源
        print("1) 请求受保护资源(无 token)...")
        resource_metadata_url = await discover_resource_metadata(http_client, resource_url)
        print(f"   收到资源元数据地址: {resource_metadata_url}")
         # 第二步:获取资源元数据
+       print("2) 获取受保护资源元数据...")
+       resource_metadata = await fetch_json(http_client, resource_metadata_url)
        # 检查authorization_servers字段
+       auth_servers = resource_metadata.get("authorization_servers")
+       if not isinstance(auth_servers, list) or not auth_servers or not isinstance(auth_servers[0], str):
+           raise ValueError("受保护资源元数据缺少 authorization_servers 字段")
        # 选取第一个授权服务器
+       auth_server = auth_servers[0].rstrip("/")
+       print(f"   授权服务器: {auth_server}")

# 命令行主入口
def main():
    try:
        # 受保护资源的元数据地址
        resource_url = "http://127.0.0.1:8000/mcp"
        # 用户名(模拟登录用)
        username = "admin"
        # 密码(模拟登录用)
        password = "123456"
        # 回调地址(OAuth2 授权码流程需要,客户端监听此地址以接收授权码)
        redirect_uri = "http://127.0.0.1:8000/callback"
        # 运行主业务流程
        asyncio.run(run_client(resource_url, username, password, redirect_uri))
        return 0
    except httpx.HTTPStatusError as e:
        # HTTP请求状态异常统一输出
        print(f"HTTP 请求错误 {e.response.status_code}: {e.response.text}")
        return 1
    except Exception as e:
        # 其他异常统一输出
        print(f"执行失败: {e}")
        import traceback

        traceback.print_exc()
        return 1


# 判断是否作为主程序入口执行
if __name__ == "__main__":
    # 设置标准输入输出流的编码为utf-8(如果支持该方法)
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
        sys.stdin.reconfigure(encoding="utf-8")
    try:
        # 启动主函数,异常时按约定返回码
        sys.exit(main())
    except KeyboardInterrupt:
        # 捕获Ctrl+C中断,友好打印并退出
        print("\n客户端已停止")
        sys.exit(130)

5. 获取授权服务器元数据 #

http://127.0.0.1:9000/.well-known/oauth-authorization-server
{
  "issuer": "http://127.0.0.1:9000",                                   # 认证服务器地址
  "authorization_endpoint": "http://127.0.0.1:9000/oauth/authorize",   # 授权端点
  "token_endpoint": "http://127.0.0.1:9000/oauth/token",               # 令牌端点
  "registration_endpoint": "http://127.0.0.1:9000/oauth/register",     # 客户端动态注册端点
  "introspection_endpoint": "http://127.0.0.1:9000/oauth/introspect",  # 令牌自省端点
  "response_types_supported": [
    "code"                                                              # 支持的 response_type
  ],
  "grant_types_supported": [
    "authorization_code"                                                # 支持的授权类型
  ],
  "scopes_supported": [
    "user"                                                              # 支持的 scope(权限域)
  ]
}

5.1. auth_server.py #

auth_server.py

"""
OAuth2 认证服务器(独立运行)

实现完整授权码流程(含元数据发现与动态注册):
- 授权服务器元数据:GET /.well-known/oauth-authorization-server
- 动态客户端注册:POST /oauth/register
- 授权端点:GET /oauth/authorize
- 令牌端点:POST /oauth/token(authorization_code)
- 令牌自省:POST /oauth/introspect(RFC 7662)
"""

# 导入异步IO支持
import asyncio
# 导入操作系统环境变量支持
import os
# 导入随机数生成工具
import secrets
# 导入系统库函数
import sys
# 导入时间函数
import time
# 导入URL编码工具
from urllib.parse import urlencode
# 导入Starlette微框架相关组件
from starlette.applications import Starlette
from starlette.responses import JSONResponse, RedirectResponse
from starlette.routing import Route
# 导入uvicorn用于运行ASGI应用
from uvicorn import Config, Server
# 从请求对象获取服务器的issuer信息
def _issuer_from_request(request):
    # request.url.scheme:获取请求的协议,通常是 http 或 https。
    # request.url.netloc:获取请求的“网络位置”,即域名(或 IP 地址)加上可选的端口号。例如 example.com、localhost:8080。
    return f"{request.url.scheme}://{request.url.netloc}"
# 授权服务器元数据发现端点
async def authorization_server_metadata(request):
    # 获取issuer信息
    issuer = _issuer_from_request(request)
    # 授权端点 用于引导用户授权
    authorization_endpoint = f"{issuer}/oauth/authorize"
    # 令牌端点  用于获取访问令牌
    token_endpoint = f"{issuer}/oauth/token"
    # 注册端点 用于动态注册客户端
    registration_endpoint = f"{issuer}/oauth/register"
    # 自省端点 用于验证令牌
    introspection_endpoint = f"{issuer}/oauth/introspect"
    # 构造元数据响应
    metadata = {
        "issuer": issuer,# 认证服务器地址
        "authorization_endpoint": authorization_endpoint, # 授权端点
        "token_endpoint": token_endpoint, # 令牌端点
        "registration_endpoint": registration_endpoint, # 注册端点
        "introspection_endpoint": introspection_endpoint, # 自省端点
        "response_types_supported": ["code"], # 支持的响应类型
        "grant_types_supported": ["authorization_code"], # 支持的授权类型
        "scopes_supported": ["user"], # 支持的scope,scope指的是访问资源范围
    }
    # 返回JSON响应
    return JSONResponse(metadata)

# 创建Starlette应用,并注册路由
def create_app():
    routes = [
        Route("/.well-known/oauth-authorization-server", authorization_server_metadata, methods=["GET"])
    ]
    return Starlette(routes=routes)
# 运行ASGI服务器
async def run_server(host, port):
    # 创建应用
    app = create_app()
    # 配置uvicorn
    config = Config(app, host=host, port=port, log_level="info")
    # 创建服务器对象
    server = Server(config)
    # 启动服务器服务
    await server.serve()

# 入口函数,负责环境和服务启动
def main():
    # 从环境变量获取服务IP和端口,默认127.0.0.1:9000
    host = os.environ.get("OAUTH_AUTH_HOST", "127.0.0.1")
    port = int(os.environ.get("OAUTH_AUTH_PORT", "9000"))

    # 服务端点说明
    print(f"OAuth2 认证服务器: http://{host}:{port}")
    print("  元数据端点: GET /.well-known/oauth-authorization-server")
    print("  动态注册端点: POST /oauth/register")
    print("  授权端点: GET /oauth/authorize")
    print("  令牌端点: POST /oauth/token (grant_type=authorization_code)")
    print("  自省端点: POST /oauth/introspect")

    # 启动服务
    asyncio.run(run_server(host, port))
    return 0

# 程序入口
if __name__ == "__main__":
    sys.exit(main())

5.2. .env #

.env

MCP_HOST=127.0.0.1
MCP_PORT=8000
+OAUTH_AUTH_HOST=127.0.0.1
+OAUTH_AUTH_PORT=9000

5.3. mcp_client.py #

mcp_client.py

"""
MCP OAuth 客户端

按如下顺序执行:
1) 请求受保护资源,拿到 401 + 资源元数据地址
2) 获取受保护资源元数据(授权服务器地址)
3) 获取授权服务器元数据(授权端点、令牌端点、注册端点)
4) 动态注册客户端
5) 引导用户授权(此演示用用户名/密码 + approve=true 模拟)
6) 用授权码换取访问令牌
7) 携带访问令牌访问 MCP 受保护资源
"""
# 导入系统模块
import sys
# 导入正则表达式模块
import re
# 导入异步IO模块
import asyncio
# 导入httpx,用于处理HTTP请求
import httpx
# 从"www-authenticate"头部解析出resource_metadata字段
def _parse_resource_metadata_from_www_authenticate(www_authenticate):
    # 头部值不存在直接返回None
    if not www_authenticate:
        return None
    # 匹配resource_metadata字段的内容
    match = re.search(r'resource_metadata="([^"]+)"', www_authenticate, flags=re.IGNORECASE)
    # 有匹配返回第一个分组,否则返回None
    return match.group(1) if match else None
# 发现受保护资源的元数据地址(401响应推断或头部直接获取)
async def discover_resource_metadata(client, resource_url):
    # 无令牌访问资源预期获得401
    response = await client.get(resource_url)
    # 如果不是401,说明流程有误
    if response.status_code != 401:
        raise RuntimeError(f"预期先返回 401,但收到 {response.status_code}")

    # 从头部解析resource_metadata字段
    return _parse_resource_metadata_from_www_authenticate(response.headers.get("www-authenticate"))
# 异步获取JSON,并校验为字典对象
async def fetch_json(client, url):
    # 发起GET请求
    resp = await client.get(url)
    # 请求状态非200会引发异常
    resp.raise_for_status()
    # 解析响应内容为JSON
    data = resp.json()
    # 必须是字典类型
    if not isinstance(data, dict):
        raise ValueError(f"元数据响应不是 JSON 对象: {url}")
    # 返回解析结果
    return data
# 主运行入口,完成OAuth全流程
async def run_client(resource_url, username, password, redirect_uri):
    # 创建默认30秒超时的httpx异步客户端,自动跟随重定向
    async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as http_client:
        # 第一步:无token请求受保护资源
        print("1) 请求受保护资源(无 token)...")
        resource_metadata_url = await discover_resource_metadata(http_client, resource_url)
        print(f"   收到资源元数据地址: {resource_metadata_url}")
         # 第二步:获取资源元数据
        print("2) 获取受保护资源元数据...")
        resource_metadata = await fetch_json(http_client, resource_metadata_url)
        # 检查authorization_servers字段
        auth_servers = resource_metadata.get("authorization_servers")
        if not isinstance(auth_servers, list) or not auth_servers or not isinstance(auth_servers[0], str):
            raise ValueError("受保护资源元数据缺少 authorization_servers 字段")
        # 选取第一个授权服务器
        auth_server = auth_servers[0].rstrip("/")
        print(f"   授权服务器: {auth_server}")
        # 第三步:获取授权服务器元数据
+       print("3) 获取授权服务器元数据...")
        # 构造授权服务器元数据地址
+       auth_metadata_url = f"{auth_server}/.well-known/oauth-authorization-server"
+       print(f"   授权服务器元数据地址: {auth_metadata_url}")
        # 异步获取授权服务器元数据(JSON)
+       auth_metadata = await fetch_json(http_client, auth_metadata_url)
        # 从元数据中获取授权端点
+       authorization_endpoint = str(auth_metadata.get("authorization_endpoint", ""))
        # 从元数据中获取令牌端点
+       token_endpoint = str(auth_metadata.get("token_endpoint", ""))
        # 从元数据中获取注册端点
+       registration_endpoint = str(auth_metadata.get("registration_endpoint", ""))
        # 检查三个端点是否都存在,否则抛出异常
+       if not authorization_endpoint or not token_endpoint or not registration_endpoint:
+           raise ValueError("授权服务器元数据缺少关键端点")
        # 打印授权端点
+       print(f"   授权端点: {authorization_endpoint}")
        # 打印令牌端点
+       print(f"   令牌端点: {token_endpoint}")
        # 打印注册端点
+       print(f"   注册端点: {registration_endpoint}")

# 命令行主入口
def main():
    try:
        # 受保护资源的元数据地址
        resource_url = "http://127.0.0.1:8000/mcp"
        # 用户名(模拟登录用)
        username = "admin"
        # 密码(模拟登录用)
        password = "123456"
        # 回调地址(OAuth2 授权码流程需要,客户端监听此地址以接收授权码)
        redirect_uri = "http://127.0.0.1:8000/callback"
        # 运行主业务流程
        asyncio.run(run_client(resource_url, username, password, redirect_uri))
        return 0
    except httpx.HTTPStatusError as e:
        # HTTP请求状态异常统一输出
        print(f"HTTP 请求错误 {e.response.status_code}: {e.response.text}")
        return 1
    except Exception as e:
        # 其他异常统一输出
        print(f"执行失败: {e}")
        import traceback

        traceback.print_exc()
        return 1


# 判断是否作为主程序入口执行
if __name__ == "__main__":
    # 设置标准输入输出流的编码为utf-8(如果支持该方法)
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
        sys.stdin.reconfigure(encoding="utf-8")
    try:
        # 启动主函数,异常时按约定返回码
        sys.exit(main())
    except KeyboardInterrupt:
        # 捕获Ctrl+C中断,友好打印并退出
        print("\n客户端已停止")
        sys.exit(130)

6. 客户端注册(动态注册) #

6.1. auth_server.py #

auth_server.py

"""
OAuth2 认证服务器(独立运行)

实现完整授权码流程(含元数据发现与动态注册):
- 授权服务器元数据:GET /.well-known/oauth-authorization-server
- 动态客户端注册:POST /oauth/register
- 授权端点:GET /oauth/authorize
- 令牌端点:POST /oauth/token(authorization_code)
- 令牌自省:POST /oauth/introspect(RFC 7662)
"""

# 导入异步IO支持
import asyncio
# 导入操作系统环境变量支持
import os
# 导入随机数生成工具
import secrets
# 导入系统库函数
import sys
# 导入时间函数
import time
# 导入URL编码工具
from urllib.parse import urlencode
# 导入Starlette微框架相关组件
from starlette.applications import Starlette
from starlette.responses import JSONResponse, RedirectResponse
from starlette.routing import Route
# 导入uvicorn用于运行ASGI应用
from uvicorn import Config, Server
# 客户端注册信息字典
+_clients = {}
# 从请求对象获取服务器的issuer信息
def _issuer_from_request(request):
    # request.url.scheme:获取请求的协议,通常是 http 或 https。
    # request.url.netloc:获取请求的“网络位置”,即域名(或 IP 地址)加上可选的端口号。例如 example.com、localhost:8080。
    return f"{request.url.scheme}://{request.url.netloc}"
# 授权服务器元数据发现端点
async def authorization_server_metadata(request):
    # 获取issuer信息
    issuer = _issuer_from_request(request)
    # 授权端点 用于引导用户授权
    authorization_endpoint = f"{issuer}/oauth/authorize"
    # 令牌端点  用于获取访问令牌
    token_endpoint = f"{issuer}/oauth/token"
    # 注册端点 用于动态注册客户端
    registration_endpoint = f"{issuer}/oauth/register"
    # 自省端点 用于验证令牌
    introspection_endpoint = f"{issuer}/oauth/introspect"
    # 构造元数据响应
    metadata = {
        "issuer": issuer,# 认证服务器地址
        "authorization_endpoint": authorization_endpoint, # 授权端点
        "token_endpoint": token_endpoint, # 令牌端点
        "registration_endpoint": registration_endpoint, # 注册端点
        "introspection_endpoint": introspection_endpoint, # 自省端点
        "response_types_supported": ["code"], # 支持的响应类型
        "grant_types_supported": ["authorization_code"], # 支持的授权类型
        "scopes_supported": ["user"], # 支持的scope,scope指的是访问资源范围
    }
    # 返回JSON响应
    return JSONResponse(metadata)


# 动态客户端注册端点
+async def register_client(request):
+   try:
        # 解析请求体JSON
+       body = await request.json()
+   except Exception:
        # JSON解析失败,返回错误
+       return JSONResponse(
+           {"error": "invalid_client_metadata", "error_description": "JSON 请求体无效"},
+           status_code=400,
+       )

    # 获取redirect_uris参数
+   redirect_uris = body.get("redirect_uris")
    # 校验redirect_uris类型和内容
+   if not isinstance(redirect_uris, list) or not redirect_uris:
+       return JSONResponse(
+           {"error": "invalid_redirect_uri", "error_description": "缺少 redirect_uris"},
+           status_code=400,
+       )
+   if not all(isinstance(uri, str) and uri.startswith(("http://", "https://")) for uri in redirect_uris):
+       return JSONResponse(
+           {"error": "invalid_redirect_uri", "error_description": "redirect_uris 必须是 HTTP/HTTPS 地址"},
+           status_code=400,
+       )

    # 生成client_id
+   client_id = f"client_{secrets.token_hex(8)}"
    # 构造客户端注册信息
+   client = {
+       "client_id": client_id,
+       "client_name": body.get("client_name", "MCP OAuth 客户端"),
+       "redirect_uris": redirect_uris,
+       "grant_types": ["authorization_code"],
+       "response_types": ["code"],
+       "scope": "user",
+   }
    # 存储客户端信息
+   _clients[client_id] = client
    # 返回注册结果
+   return JSONResponse(client, status_code=201)

# 创建Starlette应用,并注册路由
def create_app():
    routes = [
+       Route("/.well-known/oauth-authorization-server", authorization_server_metadata, methods=["GET"]),
+       Route("/oauth/register", register_client, methods=["POST"]),
    ]
    return Starlette(routes=routes)
# 运行ASGI服务器
async def run_server(host, port):
    # 创建应用
    app = create_app()
    # 配置uvicorn
    config = Config(app, host=host, port=port, log_level="info")
    # 创建服务器对象
    server = Server(config)
    # 启动服务器服务
    await server.serve()

# 入口函数,负责环境和服务启动
def main():
    # 从环境变量获取服务IP和端口,默认127.0.0.1:9000
    host = os.environ.get("OAUTH_AUTH_HOST", "127.0.0.1")
    port = int(os.environ.get("OAUTH_AUTH_PORT", "9000"))

    # 服务端点说明
    print(f"OAuth2 认证服务器: http://{host}:{port}")
    print("  元数据端点: GET /.well-known/oauth-authorization-server")
    print("  动态注册端点: POST /oauth/register")
    print("  授权端点: GET /oauth/authorize")
    print("  令牌端点: POST /oauth/token (grant_type=authorization_code)")
    print("  自省端点: POST /oauth/introspect")

    # 启动服务
    asyncio.run(run_server(host, port))
    return 0

# 程序入口
if __name__ == "__main__":
    sys.exit(main())

6.2. mcp_client.py #

mcp_client.py

"""
MCP OAuth 客户端

按如下顺序执行:
1) 请求受保护资源,拿到 401 + 资源元数据地址
2) 获取受保护资源元数据(授权服务器地址)
3) 获取授权服务器元数据(授权端点、令牌端点、注册端点)
4) 动态注册客户端
5) 引导用户授权(此演示用用户名/密码 + approve=true 模拟)
6) 用授权码换取访问令牌
7) 携带访问令牌访问 MCP 受保护资源
"""
# 导入系统模块
import sys
# 导入正则表达式模块
import re
# 导入异步IO模块
import asyncio
# 导入httpx,用于处理HTTP请求
import httpx
# 从"www-authenticate"头部解析出resource_metadata字段
def _parse_resource_metadata_from_www_authenticate(www_authenticate):
    # 头部值不存在直接返回None
    if not www_authenticate:
        return None
    # 匹配resource_metadata字段的内容
    match = re.search(r'resource_metadata="([^"]+)"', www_authenticate, flags=re.IGNORECASE)
    # 有匹配返回第一个分组,否则返回None
    return match.group(1) if match else None
# 发现受保护资源的元数据地址(401响应推断或头部直接获取)
async def discover_resource_metadata(client, resource_url):
    # 无令牌访问资源预期获得401
    response = await client.get(resource_url)
    # 如果不是401,说明流程有误
    if response.status_code != 401:
        raise RuntimeError(f"预期先返回 401,但收到 {response.status_code}")

    # 从头部解析resource_metadata字段
    return _parse_resource_metadata_from_www_authenticate(response.headers.get("www-authenticate"))
# 异步获取JSON,并校验为字典对象
async def fetch_json(client, url):
    # 发起GET请求
    resp = await client.get(url)
    # 请求状态非200会引发异常
    resp.raise_for_status()
    # 解析响应内容为JSON
    data = resp.json()
    # 必须是字典类型
    if not isinstance(data, dict):
        raise ValueError(f"元数据响应不是 JSON 对象: {url}")
    # 返回解析结果
    return data
# 异步动态客户端注册
+async def register_client(client, registration_endpoint, redirect_uri):
    # 通过POST向注册端点发起注册请求
+   resp = await client.post(
+       registration_endpoint,
+       json={
+           "client_name": "MCP OAuth 客户端",
+           "redirect_uris": [redirect_uri],
+       },
+   )
    # 检查响应状态
+   resp.raise_for_status()
    # 解析响应
+   data = resp.json()
    # 获取client_id
+   client_id = data.get("client_id")
    # client_id需为非空字符串
+   if not isinstance(client_id, str) or not client_id:
+       raise ValueError("动态注册失败:未返回客户端标识")
    # 返回client_id
+   return client_id    
# 主运行入口,完成OAuth全流程
async def run_client(resource_url, username, password, redirect_uri):
    # 创建默认30秒超时的httpx异步客户端,自动跟随重定向
    async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as http_client:
        # 第一步:无token请求受保护资源
        print("1) 请求受保护资源(无 token)...")
        resource_metadata_url = await discover_resource_metadata(http_client, resource_url)
        print(f"   收到资源元数据地址: {resource_metadata_url}")
         # 第二步:获取资源元数据
        print("2) 获取受保护资源元数据...")
        resource_metadata = await fetch_json(http_client, resource_metadata_url)
        # 检查authorization_servers字段
        auth_servers = resource_metadata.get("authorization_servers")
        if not isinstance(auth_servers, list) or not auth_servers or not isinstance(auth_servers[0], str):
            raise ValueError("受保护资源元数据缺少 authorization_servers 字段")
        # 选取第一个授权服务器
        auth_server = auth_servers[0].rstrip("/")
        print(f"   授权服务器: {auth_server}")
        # 第三步:获取授权服务器元数据
        print("3) 获取授权服务器元数据...")
        # 构造授权服务器元数据地址
        auth_metadata_url = f"{auth_server}/.well-known/oauth-authorization-server"
        print(f"   授权服务器元数据地址: {auth_metadata_url}")
        # 异步获取授权服务器元数据(JSON)
        auth_metadata = await fetch_json(http_client, auth_metadata_url)
        # 从元数据中获取授权端点
        authorization_endpoint = str(auth_metadata.get("authorization_endpoint", ""))
        # 从元数据中获取令牌端点
        token_endpoint = str(auth_metadata.get("token_endpoint", ""))
        # 从元数据中获取注册端点
        registration_endpoint = str(auth_metadata.get("registration_endpoint", ""))
        # 检查三个端点是否都存在,否则抛出异常
        if not authorization_endpoint or not token_endpoint or not registration_endpoint:
            raise ValueError("授权服务器元数据缺少关键端点")
        # 打印授权端点
        print(f"   授权端点: {authorization_endpoint}")
        # 打印令牌端点
        print(f"   令牌端点: {token_endpoint}")
        # 打印注册端点
        print(f"   注册端点: {registration_endpoint}")
        # 第四步:动态客户端注册
+       print("4) 动态客户端注册...")
+       client_id = await register_client(http_client, registration_endpoint, redirect_uri)
+       print(f"   客户端标识: {client_id}")

# 命令行主入口
def main():
    try:
        # 受保护资源的元数据地址
        resource_url = "http://127.0.0.1:8000/mcp"
        # 用户名(模拟登录用)
        username = "admin"
        # 密码(模拟登录用)
        password = "123456"
        # 回调地址(OAuth2 授权码流程需要,客户端监听此地址以接收授权码)
        redirect_uri = "http://127.0.0.1:8000/callback"
        # 运行主业务流程
        asyncio.run(run_client(resource_url, username, password, redirect_uri))
        return 0
    except httpx.HTTPStatusError as e:
        # HTTP请求状态异常统一输出
        print(f"HTTP 请求错误 {e.response.status_code}: {e.response.text}")
        return 1
    except Exception as e:
        # 其他异常统一输出
        print(f"执行失败: {e}")
        import traceback

        traceback.print_exc()
        return 1


# 判断是否作为主程序入口执行
if __name__ == "__main__":
    # 设置标准输入输出流的编码为utf-8(如果支持该方法)
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
        sys.stdin.reconfigure(encoding="utf-8")
    try:
        # 启动主函数,异常时按约定返回码
        sys.exit(main())
    except KeyboardInterrupt:
        # 捕获Ctrl+C中断,友好打印并退出
        print("\n客户端已停止")
        sys.exit(130)

6.3 start.py #

# 一键编排启动脚本说明
"""
一键编排启动脚本:
1) 启动 auth_server.py
2) 启动 mcp_server.py
3) 等待两个服务就绪
4) 运行 mcp_client.py
5) 结束后自动停止后台服务
"""

# 导入标准库模块
import os
import subprocess
import sys
import time
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

# 加载 .env 文件中的环境变量到当前进程
def load_dotenv(dotenv_path: Path) -> None:
    # 检查 .env 文件是否存在
    if not dotenv_path.exists():
        return

    # 逐行读取 .env 文件内容
    for raw_line in dotenv_path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        # 跳过空行、注释行和无等号行
        if not line or line.startswith("#") or "=" not in line:
            continue
        # 拆分 KEY=VALUE 结构
        key, value = line.split("=", 1)
        key = key.strip()
        value = value.strip()
        # 设置环境变量
        if key:
            os.environ[key] = value

# 用于轮询检测 HTTP 服务是否就绪,直到收到期望状态码为止
def wait_http_ready(
    name: str,
    url: str,
    expected_codes: set[int],
    timeout_seconds: int = 40,
) -> None:
    # 计算超时截止时间
    # time.monotonic() 返回一个**单调递增**的时间值(单位秒),不会受系统时间被手动修改、NTP 校时、时区变化影响。
    # + timeout_seconds` 把允许等待的秒数加上去,得到“最晚到什么时候必须结束等待”。
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        # 构造 HTTP GET 请求
        req = Request(url=url, method="GET")
        try:
            with urlopen(req, timeout=2) as resp:
                # 获取响应状态码
                status = int(resp.status)
                # 如果是期望的状态码则认为服务就绪
                if status in expected_codes:
                    print(f"{name} is ready: {url} (HTTP {status})")
                    return
        except HTTPError as exc:
            # 某些接口如受保护资源返回 401 也可作为就绪信号
            status = int(exc.code)
            if status in expected_codes:
                print(f"{name} is ready: {url} (HTTP {status})")
                return
        except URLError:
            # 网络错误忽略,继续重试
            pass
        # 没有就绪则每秒重试
        time.sleep(1)
    # 超时后抛出异常
    raise TimeoutError(f"{name} did not become ready in {timeout_seconds}s: {url}")

# 停止后台进程(先 terminate,失败再 kill),支持异常处理
def stop_process(process: subprocess.Popen | None, name: str) -> None:
    # 检查进程是否存在且未退出
    if process is None or process.poll() is not None:
        return
    try:
        # 尝试优雅终止
        process.terminate()
        process.wait(timeout=5)
    except Exception:
        # 若无法优雅终止,则强制杀死
        process.kill()
        process.wait(timeout=5)
    # 打印停止信息
    print(f"Stopped {name} (pid={process.pid})")

# 主流程入口
def main() -> int:
    # 获取代码仓库根目录
    repo_root = Path(__file__).resolve().parent
    # 切换工作目录到代码根目录
    os.chdir(repo_root)

    # 加载根目录下的 .env 环境变量
    load_dotenv(repo_root / ".env")

    # 读取或设置各服务的主机和端口
    mcp_host = os.environ.get("MCP_HOST", "127.0.0.1")
    mcp_port = int(os.environ.get("MCP_PORT", "8000"))
    auth_host = os.environ.get("OAUTH_AUTH_HOST", "127.0.0.1")
    auth_port = int(os.environ.get("OAUTH_AUTH_PORT", "9000"))
    # 设置默认的 OAuth 认证服务器地址
    os.environ.setdefault("OAUTH_AUTH_SERVER", f"http://{auth_host}:{auth_port}")

    # 创建 logs 目录用于保存日志
    logs_dir = repo_root / "logs"
    logs_dir.mkdir(exist_ok=True)

    # 获取 Python 可执行文件路径
    python_exe = sys.executable
    # 进程对象初始化
    auth_proc: subprocess.Popen | None = None
    mcp_proc: subprocess.Popen | None = None

    # 打开日志文件用于保存标准输出和标准错误
    auth_out = (logs_dir / "auth_server.out.log").open("w", encoding="utf-8")
    auth_err = (logs_dir / "auth_server.err.log").open("w", encoding="utf-8")
    mcp_out = (logs_dir / "mcp_server.out.log").open("w", encoding="utf-8")
    mcp_err = (logs_dir / "mcp_server.err.log").open("w", encoding="utf-8")

    try:
        # 启动 OAuth 认证服务器(auth_server.py)进程
        print("Starting OAuth auth server...")
        # 启动 OAuth 认证服务器(auth_server.py)进程,返回码为 subprocess.Popen() 的返回值
        auth_proc = subprocess.Popen(
            [python_exe, "auth_server.py"],
            cwd=repo_root,
            stdout=auth_out,
            stderr=auth_err,
            text=True,
        )

        # 启动 MCP 资源服务器(mcp_server.py)进程
        print("Starting MCP resource server...")
        # 启动 MCP 资源服务器(mcp_server.py)进程,返回码为 subprocess.Popen() 的返回值
        mcp_proc = subprocess.Popen(
            [python_exe, "mcp_server.py"],
            cwd=repo_root,
            stdout=mcp_out,
            stderr=mcp_err,
            text=True,
        )

        # 等待认证服务器元数据接口返回 200,确认其就绪
        wait_http_ready(
            name="Auth server",
            url=f"http://{auth_host}:{auth_port}/.well-known/oauth-authorization-server",
            expected_codes={200},
            timeout_seconds=40,
        )
        # 等待MCP资源服务器元数据或资源端点返回 200 或 401
        wait_http_ready(
            name="MCP server",
            url=f"http://{mcp_host}:{mcp_port}/mcp",
            expected_codes={200, 401},
            timeout_seconds=40,
        )

        # 运行 MCP 客户端 (mcp_client.py),输出会直接打印到当前控制台
        print("Running MCP client...")
        # 运行 MCP 客户端 (mcp_client.py),返回码为 subprocess.run() 的返回值
        client_result = subprocess.run(
            [python_exe, "mcp_client.py"],
            cwd=repo_root,
            text=True,
        )
        # 返回 MCP 客户端进程的返回码
        return int(client_result.returncode)
    finally:
        # 关闭日志文件
        auth_out.close()
        auth_err.close()
        mcp_out.close()
        mcp_err.close()
        # 停止已启动的后台服务进程
        stop_process(mcp_proc, "mcp_server.py")
        stop_process(auth_proc, "auth_server.py")
        # 打印结束信息
        print("Background servers stopped.")

# 判断是否作为脚本主程序运行
if __name__ == "__main__":
    # 退出码为主流程的返回结果
    sys.exit(main())

7. 引导用户授权 #

7.1. auth_server.py #

auth_server.py

"""
OAuth2 认证服务器(独立运行)

实现完整授权码流程(含元数据发现与动态注册):
- 授权服务器元数据:GET /.well-known/oauth-authorization-server
- 动态客户端注册:POST /oauth/register
- 授权端点:GET /oauth/authorize
- 令牌端点:POST /oauth/token(authorization_code)
- 令牌自省:POST /oauth/introspect(RFC 7662)
"""

# 导入异步IO支持
import asyncio
# 导入操作系统环境变量支持
import os
# 导入随机数生成工具
import secrets
# 导入系统库函数
import sys
# 导入时间函数
import time
# 导入URL编码工具
from urllib.parse import urlencode
# 导入Starlette微框架相关组件
from starlette.applications import Starlette
from starlette.responses import JSONResponse, RedirectResponse
from starlette.routing import Route
# 导入uvicorn用于运行ASGI应用
from uvicorn import Config, Server
# 用户数据,用户名:密码
+USERS = {
+   "admin": "123456"
+}
# 客户端注册信息字典
_clients = {}
# 授权码存储字典
+_auth_codes = {}
# 规范化scope参数,默认为user
+def _normalize_scopes(scope_str):
+   if not scope_str:
+       return ["user"]
+   scopes = [s for s in scope_str.split() if s]
+   return scopes or ["user"]
# 从请求对象获取服务器的issuer信息
def _issuer_from_request(request):
    # request.url.scheme:获取请求的协议,通常是 http 或 https。
    # request.url.netloc:获取请求的“网络位置”,即域名(或 IP 地址)加上可选的端口号。例如 example.com、localhost:8080。
    return f"{request.url.scheme}://{request.url.netloc}"
# 授权服务器元数据发现端点
async def authorization_server_metadata(request):
    # 获取issuer信息
    issuer = _issuer_from_request(request)
    # 授权端点 用于引导用户授权
    authorization_endpoint = f"{issuer}/oauth/authorize"
    # 令牌端点  用于获取访问令牌
    token_endpoint = f"{issuer}/oauth/token"
    # 注册端点 用于动态注册客户端
    registration_endpoint = f"{issuer}/oauth/register"
    # 自省端点 用于验证令牌
    introspection_endpoint = f"{issuer}/oauth/introspect"
    # 构造元数据响应
    metadata = {
        "issuer": issuer,# 认证服务器地址
        "authorization_endpoint": authorization_endpoint, # 授权端点
        "token_endpoint": token_endpoint, # 令牌端点
        "registration_endpoint": registration_endpoint, # 注册端点
        "introspection_endpoint": introspection_endpoint, # 自省端点
        "response_types_supported": ["code"], # 支持的响应类型
        "grant_types_supported": ["authorization_code"], # 支持的授权类型
        "scopes_supported": ["user"], # 支持的scope,scope指的是访问资源范围
    }
    # 返回JSON响应
    return JSONResponse(metadata)


# 动态客户端注册端点
async def register_client(request):
    try:
        # 解析请求体JSON
        body = await request.json()
    except Exception:
        # JSON解析失败,返回错误
        return JSONResponse(
            {"error": "invalid_client_metadata", "error_description": "JSON 请求体无效"},
            status_code=400,
        )

    # 获取redirect_uris参数
    redirect_uris = body.get("redirect_uris")
    # 校验redirect_uris类型和内容
    if not isinstance(redirect_uris, list) or not redirect_uris:
        return JSONResponse(
            {"error": "invalid_redirect_uri", "error_description": "缺少 redirect_uris"},
            status_code=400,
        )
    if not all(isinstance(uri, str) and uri.startswith(("http://", "https://")) for uri in redirect_uris):
        return JSONResponse(
            {"error": "invalid_redirect_uri", "error_description": "redirect_uris 必须是 HTTP/HTTPS 地址"},
            status_code=400,
        )

    # 生成client_id
    client_id = f"client_{secrets.token_hex(8)}"
    # 构造客户端注册信息
    client = {
        "client_id": client_id,
        "client_name": body.get("client_name", "MCP OAuth 客户端"),
        "redirect_uris": redirect_uris,
        "grant_types": ["authorization_code"],
        "response_types": ["code"],
        "scope": "user",
    }
    # 存储客户端信息
    _clients[client_id] = client
    # 返回注册结果
    return JSONResponse(client, status_code=201)

# 授权端点处理函数,处理用户授权请求
+async def authorize_endpoint(request):
    # 获取URL查询参数对象
+   query = request.query_params
    # 获取response_type参数
+   response_type = query.get("response_type")
    # 获取client_id参数
+   client_id = query.get("client_id")
    # 获取redirect_uri参数
+   redirect_uri = query.get("redirect_uri")
    # 获取scope参数,默认为"user"
+   scope = query.get("scope", "user")
    # 获取state参数
+   state = query.get("state")
    # 获取用户名
+   username = query.get("username")
    # 获取密码
+   password = query.get("password")
    # 获取approve参数,并转换为布尔值,判断用户是否同意授权
+   approve = query.get("approve", "false").lower() == "true"

    # 检查response_type是否为"code",否则返回错误响应
+   if response_type != "code":
+       return JSONResponse({"error": "unsupported_response_type"}, status_code=400)
    # 校验client_id和redirect_uri两个必要参数,缺少则返回错误
+   if not client_id or not redirect_uri:
+       return JSONResponse({"error": "invalid_request", "error_description": "缺少 client_id 或 redirect_uri"}, status_code=400)

    # 根据client_id查找已注册的客户端,未找到则返回错误
+   client = _clients.get(client_id)
+   if not client:
+       return JSONResponse({"error": "unauthorized_client", "error_description": "未知的 client_id"}, status_code=400)
    # 校验redirect_uri是否是在注册信息中登记过的回调地址
+   if redirect_uri not in client.get("redirect_uris", []):
+       return JSONResponse({"error": "invalid_request", "error_description": "redirect_uri 未注册"}, status_code=400)

    # 判断用户名、密码和用户同意(approve)是否都已提交,否则返回需要用户交互的错误
+   if not username or not password or not approve:
+       return JSONResponse(
+           {
+               "error": "interaction_required",  # 需要用户进一步操作
+               "error_description": "需要用户凭据并同意授权",  # 错误说明
+               "hint": "请调用 /oauth/authorize 并携带 username、password、approve=true",  # 提示
+               "required_params": ["username", "password", "approve=true"],  # 要求这些参数
+           },
+           status_code=400,
+       )

    # 校验用户名和密码,若不正确则返回拒绝访问
+   if USERS.get(username) != password:
+       return JSONResponse({"error": "access_denied", "error_description": "用户名或密码错误"}, status_code=401)

    # 生成授权码,格式为code_xxx
+   code = f"code_{secrets.token_hex(16)}"
    # 存储授权码相关数据,包括客户端ID、回调地址、用户名、作用域、过期时间
+   _auth_codes[code] = {
+       "client_id": client_id, #客户端标识
+       "redirect_uri": redirect_uri, #回调地址
+       "username": username, #用户名
+       "scopes": _normalize_scopes(scope), #作用域
+       "expires_at": int(time.time()) + 300,  # 授权码5分钟后过期时间
+   }

    # 构造回调参数字典,包含授权码
+   params = {"code": code}
    # 如果state参数存在,也一并回传
+   if state:
+       params["state"] = state
    # 拼接完整回调地址,附带参数
+   callback_url = f"{redirect_uri}?{urlencode(params)}"
    # 重定向到客户端回调地址,并返回授权码等信息
+   return RedirectResponse(url=callback_url, status_code=302)

# 创建Starlette应用,并注册路由
def create_app():
    routes = [
        Route("/.well-known/oauth-authorization-server", authorization_server_metadata, methods=["GET"]),
        Route("/oauth/register", register_client, methods=["POST"]),
+        Route("/oauth/authorize", authorize_endpoint, methods=["GET"])
    ]
    return Starlette(routes=routes)
# 运行ASGI服务器
async def run_server(host, port):
    # 创建应用
    app = create_app()
    # 配置uvicorn
    config = Config(app, host=host, port=port, log_level="info")
    # 创建服务器对象
    server = Server(config)
    # 启动服务器服务
    await server.serve()

# 入口函数,负责环境和服务启动
def main():
    # 从环境变量获取服务IP和端口,默认127.0.0.1:9000
    host = os.environ.get("OAUTH_AUTH_HOST", "127.0.0.1")
    port = int(os.environ.get("OAUTH_AUTH_PORT", "9000"))

    # 服务端点说明
    print(f"OAuth2 认证服务器: http://{host}:{port}")
    print("  元数据端点: GET /.well-known/oauth-authorization-server")
    print("  动态注册端点: POST /oauth/register")
    print("  授权端点: GET /oauth/authorize")
    print("  令牌端点: POST /oauth/token (grant_type=authorization_code)")
    print("  自省端点: POST /oauth/introspect")

    # 启动服务
    asyncio.run(run_server(host, port))
    return 0

# 程序入口
if __name__ == "__main__":
    sys.exit(main())

7.2. mcp_client.py #

mcp_client.py

"""
MCP OAuth 客户端

按如下顺序执行:
1) 请求受保护资源,拿到 401 + 资源元数据地址
2) 获取受保护资源元数据(授权服务器地址)
3) 获取授权服务器元数据(授权端点、令牌端点、注册端点)
4) 动态注册客户端
5) 引导用户授权(此演示用用户名/密码 + approve=true 模拟)
6) 用授权码换取访问令牌
7) 携带访问令牌访问 MCP 受保护资源
"""
# 导入系统模块
import sys
# 导入正则表达式模块
import re
# 导入安全随机数模块,用于生成不可预测的 state
+import secrets
# 导入异步IO模块
import asyncio
# 导入httpx,用于处理HTTP请求
import httpx
# 导入URL解析相关方法
+from urllib.parse import parse_qs, urlparse
# 从"www-authenticate"头部解析出resource_metadata字段
def _parse_resource_metadata_from_www_authenticate(www_authenticate):
    # 头部值不存在直接返回None
    if not www_authenticate:
        return None
    # 匹配resource_metadata字段的内容
    match = re.search(r'resource_metadata="([^"]+)"', www_authenticate, flags=re.IGNORECASE)
    # 有匹配返回第一个分组,否则返回None
    return match.group(1) if match else None
# 发现受保护资源的元数据地址(401响应推断或头部直接获取)
async def discover_resource_metadata(client, resource_url):
    # 无令牌访问资源预期获得401
    response = await client.get(resource_url)
    # 如果不是401,说明流程有误
    if response.status_code != 401:
        raise RuntimeError(f"预期先返回 401,但收到 {response.status_code}")

    # 从头部解析resource_metadata字段
    return _parse_resource_metadata_from_www_authenticate(response.headers.get("www-authenticate"))
# 异步获取JSON,并校验为字典对象
async def fetch_json(client, url):
    # 发起GET请求
    resp = await client.get(url)
    # 请求状态非200会引发异常
    resp.raise_for_status()
    # 解析响应内容为JSON
    data = resp.json()
    # 必须是字典类型
    if not isinstance(data, dict):
        raise ValueError(f"元数据响应不是 JSON 对象: {url}")
    # 返回解析结果
    return data
# 异步动态客户端注册
async def register_client(client, registration_endpoint, redirect_uri):
    # 通过POST向注册端点发起注册请求
    resp = await client.post(
        registration_endpoint,
        json={
            "client_name": "MCP OAuth 客户端",
            "redirect_uris": [redirect_uri],
        },
    )
    # 检查响应状态
    resp.raise_for_status()
    # 解析响应
    data = resp.json()
    # 获取client_id
    client_id = data.get("client_id")
    # client_id需为非空字符串
    if not isinstance(client_id, str) or not client_id:
        raise ValueError("动态注册失败:未返回客户端标识")
    # 返回client_id
    return client_id    
# 用户授权并获取授权码(用用户名密码)
+async def authorize_with_user(
+   client,#httpx异步客户端
+   authorization_endpoint,#授权端点
+   client_id,#客户端标识
+   redirect_uri,#回调地址
+   username,#用户名
+   password,#密码
+):

    # 固定state用于校验,用于防止CSRF(Cross-Site Request Forgery,跨站请求伪造)攻击
    # CSRF是指攻击者利用你“已登录某网站”的身份,在你不知情时让浏览器替你发起恶意请求。
    # 客户端发起授权请求时带上 state,授权服务器重定向回来时原样带回 state,客户端只接受“回调里 state 与自己最初保存的 state 完全一致”的响应
    # 攻击者即使诱导用户点击恶意链接,也很难猜中客户端会话里的 state,回调校验会失败,从而拒绝这次授权结果
    # 使用高熵随机 state 防止 CSRF,必须每次授权请求都不同且不可预测。
+   state = secrets.token_urlsafe(32)
    # 构造GET到授权端点,携带相关参数(用户名等,模拟快速通过)
+   resp = await client.get(
+       authorization_endpoint,#授权端点
+       params={
+           "response_type": "code",
+           "client_id": client_id,#客户端标识
+           "redirect_uri": redirect_uri,#回调地址
+           "scope": "user",#作用域
+           "state": state,#状态
+           "username": username,#用户名
+           "password": password,#密码
+           "approve": "true",#批准
+       },
+       follow_redirects=False,#不跟随重定向
+   )
    # 授权返回302/303为正常
+   if resp.status_code not in (302, 303):
+       raise RuntimeError(f"授权失败,状态码: {resp.status_code}, body: {resp.text}")

    # 获取重定向Location
+   location = resp.headers.get("location")
+   if not location:
+       raise RuntimeError("授权失败:未返回回调地址")
    # 解析location为URL
+   parsed = urlparse(location)
    # 解析query部分
+   qs = parse_qs(parsed.query)
    # 获取授权码与state
+   code = (qs.get("code") or [None])[0]
+   returned_state = (qs.get("state") or [None])[0]
    # 校验state
+   if returned_state != state:
+       raise RuntimeError("状态参数校验失败")
    # 校验授权码
+   if not code:
+       raise RuntimeError("授权失败:未返回授权码")
    # 返回code
+   return code    
# 主运行入口,完成OAuth全流程
async def run_client(resource_url, username, password, redirect_uri):
    # 创建默认30秒超时的httpx异步客户端,自动跟随重定向
    async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as http_client:
        # 第一步:无token请求受保护资源
        print("1) 请求受保护资源(无 token)...")
        resource_metadata_url = await discover_resource_metadata(http_client, resource_url)
        print(f"   收到资源元数据地址: {resource_metadata_url}")
         # 第二步:获取资源元数据
        print("2) 获取受保护资源元数据...")
        resource_metadata = await fetch_json(http_client, resource_metadata_url)
        # 检查authorization_servers字段
        auth_servers = resource_metadata.get("authorization_servers")
        if not isinstance(auth_servers, list) or not auth_servers or not isinstance(auth_servers[0], str):
            raise ValueError("受保护资源元数据缺少 authorization_servers 字段")
        # 选取第一个授权服务器
        auth_server = auth_servers[0].rstrip("/")
        print(f"   授权服务器: {auth_server}")
        # 第三步:获取授权服务器元数据
        print("3) 获取授权服务器元数据...")
        # 构造授权服务器元数据地址
        auth_metadata_url = f"{auth_server}/.well-known/oauth-authorization-server"
        print(f"   授权服务器元数据地址: {auth_metadata_url}")
        # 异步获取授权服务器元数据(JSON)
        auth_metadata = await fetch_json(http_client, auth_metadata_url)
        # 从元数据中获取授权端点
        authorization_endpoint = str(auth_metadata.get("authorization_endpoint", ""))
        # 从元数据中获取令牌端点
        token_endpoint = str(auth_metadata.get("token_endpoint", ""))
        # 从元数据中获取注册端点
        registration_endpoint = str(auth_metadata.get("registration_endpoint", ""))
        # 检查三个端点是否都存在,否则抛出异常
        if not authorization_endpoint or not token_endpoint or not registration_endpoint:
            raise ValueError("授权服务器元数据缺少关键端点")
        # 打印授权端点
        print(f"   授权端点: {authorization_endpoint}")
        # 打印令牌端点
        print(f"   令牌端点: {token_endpoint}")
        # 打印注册端点
        print(f"   注册端点: {registration_endpoint}")
        # 第四步:动态客户端注册
        print("4) 动态客户端注册...")
        client_id = await register_client(http_client, registration_endpoint, redirect_uri)
        print(f"   客户端标识: {client_id}")
        # 第五步:授权获取授权码
+       print("5) 引导用户授权并获取授权码...")
+       code = await authorize_with_user(
+           http_client,#httpx异步客户端
+           authorization_endpoint,#授权端点
+           client_id,#客户端标识
+           redirect_uri,#回调地址
+           username,#用户名
+           password,#密码
+       )#返回授权码
+       print(f"   授权码: {code}")#打印授权码

# 命令行主入口
def main():
    try:
        # 受保护资源的元数据地址
        resource_url = "http://127.0.0.1:8000/mcp"
        # 用户名(模拟登录用)
        username = "admin"
        # 密码(模拟登录用)
        password = "123456"
        # 回调地址(OAuth2 授权码流程需要,客户端监听此地址以接收授权码)
        redirect_uri = "http://127.0.0.1:8000/callback"
        # 运行主业务流程
        asyncio.run(run_client(resource_url, username, password, redirect_uri))
        return 0
    except httpx.HTTPStatusError as e:
        # HTTP请求状态异常统一输出
        print(f"HTTP 请求错误 {e.response.status_code}: {e.response.text}")
        return 1
    except Exception as e:
        # 其他异常统一输出
        print(f"执行失败: {e}")
        import traceback

        traceback.print_exc()
        return 1


# 判断是否作为主程序入口执行
if __name__ == "__main__":
    # 设置标准输入输出流的编码为utf-8(如果支持该方法)
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
        sys.stdin.reconfigure(encoding="utf-8")
    try:
        # 启动主函数,异常时按约定返回码
        sys.exit(main())
    except KeyboardInterrupt:
        # 捕获Ctrl+C中断,友好打印并退出
        print("\n客户端已停止")
        sys.exit(130)

8. 使用授权码换取access_token #

8.1. auth_server.py #

auth_server.py

"""
OAuth2 认证服务器(独立运行)

实现完整授权码流程(含元数据发现与动态注册):
- 授权服务器元数据:GET /.well-known/oauth-authorization-server
- 动态客户端注册:POST /oauth/register
- 授权端点:GET /oauth/authorize
- 令牌端点:POST /oauth/token(authorization_code)
- 令牌自省:POST /oauth/introspect(RFC 7662)
"""

# 导入异步IO支持
import asyncio
# 导入操作系统环境变量支持
import os
# 导入随机数生成工具
import secrets
# 导入系统库函数
import sys
# 导入时间函数
import time
# 导入URL编码工具
from urllib.parse import urlencode
# 导入Starlette微框架相关组件
from starlette.applications import Starlette
from starlette.responses import JSONResponse, RedirectResponse
from starlette.routing import Route
# 导入uvicorn用于运行ASGI应用
from uvicorn import Config, Server
# 用户数据,用户名:密码
USERS = {
    "admin": "123456"
}
# 客户端注册信息字典
_clients = {}
# 授权码存储字典
_auth_codes = {}
# 令牌存储字典
+_tokens = {}
# 规范化scope参数,默认为user
def _normalize_scopes(scope_str):
    if not scope_str:
        return ["user"]
    scopes = [s for s in scope_str.split() if s]
    return scopes or ["user"]
# 从请求对象获取服务器的issuer信息
def _issuer_from_request(request):
    # request.url.scheme:获取请求的协议,通常是 http 或 https。
    # request.url.netloc:获取请求的“网络位置”,即域名(或 IP 地址)加上可选的端口号。例如 example.com、localhost:8080。
    return f"{request.url.scheme}://{request.url.netloc}"
# 授权服务器元数据发现端点
async def authorization_server_metadata(request):
    # 获取issuer信息
    issuer = _issuer_from_request(request)
    # 授权端点 用于引导用户授权
    authorization_endpoint = f"{issuer}/oauth/authorize"
    # 令牌端点  用于获取访问令牌
    token_endpoint = f"{issuer}/oauth/token"
    # 注册端点 用于动态注册客户端
    registration_endpoint = f"{issuer}/oauth/register"
    # 自省端点 用于验证令牌
    introspection_endpoint = f"{issuer}/oauth/introspect"
    # 构造元数据响应
    metadata = {
        "issuer": issuer,# 认证服务器地址
        "authorization_endpoint": authorization_endpoint, # 授权端点
        "token_endpoint": token_endpoint, # 令牌端点
        "registration_endpoint": registration_endpoint, # 注册端点
        "introspection_endpoint": introspection_endpoint, # 自省端点
        "response_types_supported": ["code"], # 支持的响应类型
        "grant_types_supported": ["authorization_code"], # 支持的授权类型
        "scopes_supported": ["user"], # 支持的scope,scope指的是访问资源范围
    }
    # 返回JSON响应
    return JSONResponse(metadata)


# 动态客户端注册端点
async def register_client(request):
    try:
        # 解析请求体JSON
        body = await request.json()
    except Exception:
        # JSON解析失败,返回错误
        return JSONResponse(
            {"error": "invalid_client_metadata", "error_description": "JSON 请求体无效"},
            status_code=400,
        )

    # 获取redirect_uris参数
    redirect_uris = body.get("redirect_uris")
    # 校验redirect_uris类型和内容
    if not isinstance(redirect_uris, list) or not redirect_uris:
        return JSONResponse(
            {"error": "invalid_redirect_uri", "error_description": "缺少 redirect_uris"},
            status_code=400,
        )
    if not all(isinstance(uri, str) and uri.startswith(("http://", "https://")) for uri in redirect_uris):
        return JSONResponse(
            {"error": "invalid_redirect_uri", "error_description": "redirect_uris 必须是 HTTP/HTTPS 地址"},
            status_code=400,
        )

    # 生成client_id
    client_id = f"client_{secrets.token_hex(8)}"
    # 构造客户端注册信息
    client = {
        "client_id": client_id,
        "client_name": body.get("client_name", "MCP OAuth 客户端"),
        "redirect_uris": redirect_uris,
        "grant_types": ["authorization_code"],
        "response_types": ["code"],
        "scope": "user",
    }
    # 存储客户端信息
    _clients[client_id] = client
    # 返回注册结果
    return JSONResponse(client, status_code=201)

# 授权端点处理函数,处理用户授权请求
async def authorize_endpoint(request):
    # 获取URL查询参数对象
    query = request.query_params
    # 获取response_type参数
    response_type = query.get("response_type")
    # 获取client_id参数
    client_id = query.get("client_id")
    # 获取redirect_uri参数
    redirect_uri = query.get("redirect_uri")
    # 获取scope参数,默认为"user"
    scope = query.get("scope", "user")
    # 获取state参数
    state = query.get("state")
    # 获取用户名
    username = query.get("username")
    # 获取密码
    password = query.get("password")
    # 获取approve参数,并转换为布尔值,判断用户是否同意授权
    approve = query.get("approve", "false").lower() == "true"

    # 检查response_type是否为"code",否则返回错误响应
    if response_type != "code":
        return JSONResponse({"error": "unsupported_response_type"}, status_code=400)
    # 校验client_id和redirect_uri两个必要参数,缺少则返回错误
    if not client_id or not redirect_uri:
        return JSONResponse({"error": "invalid_request", "error_description": "缺少 client_id 或 redirect_uri"}, status_code=400)

    # 根据client_id查找已注册的客户端,未找到则返回错误
    client = _clients.get(client_id)
    if not client:
        return JSONResponse({"error": "unauthorized_client", "error_description": "未知的 client_id"}, status_code=400)
    # 校验redirect_uri是否是在注册信息中登记过的回调地址
    if redirect_uri not in client.get("redirect_uris", []):
        return JSONResponse({"error": "invalid_request", "error_description": "redirect_uri 未注册"}, status_code=400)

    # 判断用户名、密码和用户同意(approve)是否都已提交,否则返回需要用户交互的错误
    if not username or not password or not approve:
        return JSONResponse(
            {
                "error": "interaction_required",  # 需要用户进一步操作
                "error_description": "需要用户凭据并同意授权",  # 错误说明
                "hint": "请调用 /oauth/authorize 并携带 username、password、approve=true",  # 提示
                "required_params": ["username", "password", "approve=true"],  # 要求这些参数
            },
            status_code=400,
        )

    # 校验用户名和密码,若不正确则返回拒绝访问
    if USERS.get(username) != password:
        return JSONResponse({"error": "access_denied", "error_description": "用户名或密码错误"}, status_code=401)

    # 生成授权码,格式为code_xxx
    code = f"code_{secrets.token_hex(16)}"
    # 存储授权码相关数据,包括客户端ID、回调地址、用户名、作用域、过期时间
    _auth_codes[code] = {
        "client_id": client_id, #客户端标识
        "redirect_uri": redirect_uri, #回调地址
        "username": username, #用户名
        "scopes": _normalize_scopes(scope), #作用域
        "expires_at": int(time.time()) + 300,  # 授权码5分钟后过期时间
    }

    # 构造回调参数字典,包含授权码
    params = {"code": code}
    # 如果state参数存在,也一并回传
    if state:
        params["state"] = state
    # 拼接完整回调地址,附带参数
    callback_url = f"{redirect_uri}?{urlencode(params)}"
    # 重定向到客户端回调地址,并返回授权码等信息
    return RedirectResponse(url=callback_url, status_code=302)

# 生成访问令牌并存储,返回令牌相关数据
+def _issue_token(client_id, subject, scopes=None):
    # 生成一个随机的访问令牌,前缀为"mcp_"
+   token = f"mcp_{secrets.token_hex(24)}"
    # 令牌有效时间,单位为秒,这里为3600秒(1小时)
+   expires_in = 3600
    # 计算令牌到期的时间戳(当前时间+有效期)
+   expires_at = int(time.time()) + expires_in
    # 在全局_tokens字典中存储该令牌对应的信息,便于后续校验和自省
+   _tokens[token] = {
+       "client_id": client_id,               # 客户端标识
+       "sub": subject,                       # 用户唯一标识,通常为用户名
+       "scopes": scopes or ["user"],         # 令牌作用域,默认为["user"]
+       "expires_at": expires_at,             # 令牌过期时间戳
+   }
    # 返回令牌响应内容,包括token本身、类型、有效期、作用域
+   return {
+       "access_token": token,                # 访问令牌
+       "token_type": "Bearer",               # 令牌类型为Bearer
+       "expires_in": expires_in,             # 距离过期剩余秒数
+       "scope": " ".join(scopes or ["user"]),# 空格分隔的作用域字符串
+   }
# 令牌颁发端点(authorization_code模式)
+async def token_endpoint(request):
+   try:
        # 解析表单数据
+       form = await request.form()
+   except Exception:
        # 表单解析失败
+       return JSONResponse(
+           {"error": "invalid_request", "error_description": "表单数据无效"},
+           status_code=400,
+       )

    # 获取授权模式
+   grant_type = form.get("grant_type")
+   if grant_type != "authorization_code":
+       return JSONResponse(
+           {"error": "unsupported_grant_type", "error_description": "仅支持 authorization_code 授权类型"},
+           status_code=400,
+       )

    # 获取参数
+   code = form.get("code")
+   client_id = form.get("client_id")
+   redirect_uri = form.get("redirect_uri")
    # 校验参数存在性
+   if not code or not client_id or not redirect_uri:
+       return JSONResponse(
+           {"error": "invalid_request", "error_description": "缺少 code/client_id/redirect_uri"},
+           status_code=400,
+       )
    # 校验参数类型
+   if not isinstance(code, str) or not isinstance(client_id, str) or not isinstance(redirect_uri, str):
+       return JSONResponse(
+           {"error": "invalid_request", "error_description": "参数类型错误"},
+           status_code=400,
+       )

    # 获取授权码对应的数据
+   code_data = _auth_codes.get(code)
+   if not code_data:
+       return JSONResponse({"error": "invalid_grant", "error_description": "授权码不存在"}, status_code=400)
    # 检查授权码是否过期
+   if code_data["expires_at"] < time.time():
+       del _auth_codes[code]
+       return JSONResponse({"error": "invalid_grant", "error_description": "授权码已过期"}, status_code=400)
    # 校验client_id和redirect_uri是否匹配
+   if code_data["client_id"] != client_id or code_data["redirect_uri"] != redirect_uri:
+       return JSONResponse({"error": "invalid_grant", "error_description": "授权码校验失败"}, status_code=400)

    # 授权码只能用一次,用后删除
+   del _auth_codes[code]
    # 发放访问令牌
+   token_data = _issue_token(client_id=client_id, subject=code_data["username"], scopes=code_data["scopes"])
    # 返回令牌
+   return JSONResponse(token_data)
# 创建Starlette应用,并注册路由
def create_app():
    routes = [
        Route("/.well-known/oauth-authorization-server", authorization_server_metadata, methods=["GET"]),
        Route("/oauth/register", register_client, methods=["POST"]),
+       Route("/oauth/authorize", authorize_endpoint, methods=["GET"]),
+       Route("/oauth/token", token_endpoint, methods=["POST"]),
    ]
    return Starlette(routes=routes)
# 运行ASGI服务器
async def run_server(host, port):
    # 创建应用
    app = create_app()
    # 配置uvicorn
    config = Config(app, host=host, port=port, log_level="info")
    # 创建服务器对象
    server = Server(config)
    # 启动服务器服务
    await server.serve()

# 入口函数,负责环境和服务启动
def main():
    # 从环境变量获取服务IP和端口,默认127.0.0.1:9000
    host = os.environ.get("OAUTH_AUTH_HOST", "127.0.0.1")
    port = int(os.environ.get("OAUTH_AUTH_PORT", "9000"))

    # 服务端点说明
    print(f"OAuth2 认证服务器: http://{host}:{port}")
    print("  元数据端点: GET /.well-known/oauth-authorization-server")
    print("  动态注册端点: POST /oauth/register")
    print("  授权端点: GET /oauth/authorize")
    print("  令牌端点: POST /oauth/token (grant_type=authorization_code)")
    print("  自省端点: POST /oauth/introspect")

    # 启动服务
    asyncio.run(run_server(host, port))
    return 0

# 程序入口
if __name__ == "__main__":
    sys.exit(main())

8.2. mcp_client.py #

mcp_client.py

"""
MCP OAuth 客户端

按如下顺序执行:
1) 请求受保护资源,拿到 401 + 资源元数据地址
2) 获取受保护资源元数据(授权服务器地址)
3) 获取授权服务器元数据(授权端点、令牌端点、注册端点)
4) 动态注册客户端
5) 引导用户授权(此演示用用户名/密码 + approve=true 模拟)
6) 用授权码换取访问令牌
7) 携带访问令牌访问 MCP 受保护资源
"""
# 导入系统模块
import sys
# 导入正则表达式模块
import re
# 导入安全随机数模块,用于生成不可预测的 state
import secrets
# 导入异步IO模块
import asyncio
# 导入httpx,用于处理HTTP请求
import httpx
# 导入URL解析相关方法
from urllib.parse import parse_qs, urlparse
# 从"www-authenticate"头部解析出resource_metadata字段
def _parse_resource_metadata_from_www_authenticate(www_authenticate):
    # 头部值不存在直接返回None
    if not www_authenticate:
        return None
    # 匹配resource_metadata字段的内容
    match = re.search(r'resource_metadata="([^"]+)"', www_authenticate, flags=re.IGNORECASE)
    # 有匹配返回第一个分组,否则返回None
    return match.group(1) if match else None
# 发现受保护资源的元数据地址(401响应推断或头部直接获取)
async def discover_resource_metadata(client, resource_url):
    # 无令牌访问资源预期获得401
    response = await client.get(resource_url)
    # 如果不是401,说明流程有误
    if response.status_code != 401:
        raise RuntimeError(f"预期先返回 401,但收到 {response.status_code}")

    # 从头部解析resource_metadata字段
    return _parse_resource_metadata_from_www_authenticate(response.headers.get("www-authenticate"))
# 异步获取JSON,并校验为字典对象
async def fetch_json(client, url):
    # 发起GET请求
    resp = await client.get(url)
    # 请求状态非200会引发异常
    resp.raise_for_status()
    # 解析响应内容为JSON
    data = resp.json()
    # 必须是字典类型
    if not isinstance(data, dict):
        raise ValueError(f"元数据响应不是 JSON 对象: {url}")
    # 返回解析结果
    return data
# 异步动态客户端注册
async def register_client(client, registration_endpoint, redirect_uri):
    # 通过POST向注册端点发起注册请求
    resp = await client.post(
        registration_endpoint,
        json={
            "client_name": "MCP OAuth 客户端",
            "redirect_uris": [redirect_uri],
        },
    )
    # 检查响应状态
    resp.raise_for_status()
    # 解析响应
    data = resp.json()
    # 获取client_id
    client_id = data.get("client_id")
    # client_id需为非空字符串
    if not isinstance(client_id, str) or not client_id:
        raise ValueError("动态注册失败:未返回客户端标识")
    # 返回client_id
    return client_id    
# 用户授权并获取授权码(用用户名密码)
async def authorize_with_user(
    client,#httpx异步客户端
    authorization_endpoint,#授权端点
    client_id,#客户端标识
    redirect_uri,#回调地址
    username,#用户名
    password,#密码
):

    # 固定state用于校验,用于防止CSRF(Cross-Site Request Forgery,跨站请求伪造)攻击
    # CSRF是指攻击者利用你“已登录某网站”的身份,在你不知情时让浏览器替你发起恶意请求。
    # 客户端发起授权请求时带上 state,授权服务器重定向回来时原样带回 state,客户端只接受“回调里 state 与自己最初保存的 state 完全一致”的响应
    # 攻击者即使诱导用户点击恶意链接,也很难猜中客户端会话里的 state,回调校验会失败,从而拒绝这次授权结果
    # 使用高熵随机 state 防止 CSRF,必须每次授权请求都不同且不可预测。
    state = secrets.token_urlsafe(32)
    # 构造GET到授权端点,携带相关参数(用户名等,模拟快速通过)
    resp = await client.get(
        authorization_endpoint,#授权端点
        params={
            "response_type": "code",
            "client_id": client_id,#客户端标识
            "redirect_uri": redirect_uri,#回调地址
            "scope": "user",#作用域
            "state": state,#状态
            "username": username,#用户名
            "password": password,#密码
            "approve": "true",#批准
        },
        follow_redirects=False,#不跟随重定向
    )
    # 授权返回302/303为正常
    if resp.status_code not in (302, 303):
        raise RuntimeError(f"授权失败,状态码: {resp.status_code}, body: {resp.text}")

    # 获取重定向Location
    location = resp.headers.get("location")
    if not location:
        raise RuntimeError("授权失败:未返回回调地址")
    # 解析location为URL
    parsed = urlparse(location)
    # 解析query部分
    qs = parse_qs(parsed.query)
    # 获取授权码与state
    code = (qs.get("code") or [None])[0]
    returned_state = (qs.get("state") or [None])[0]
    # 校验state
    if returned_state != state:
        raise RuntimeError("状态参数校验失败")
    # 校验授权码
    if not code:
        raise RuntimeError("授权失败:未返回授权码")
    # 返回code
+   return code   

# 异步函数:使用授权码换取访问令牌
+async def exchange_code_for_token(
+   client,                # httpx异步客户端实例
+   token_endpoint,        # 令牌端点URL
+   client_id,             # 客户端标识
+   code,                  # 授权码
+   redirect_uri,          # 回调地址
+):
    # 向令牌端点发起POST请求,提交用于授权码换取访问令牌的数据
+   resp = await client.post(
+       token_endpoint,     # 令牌端点URL
+       data={
+           "grant_type": "authorization_code",    # 授权类型为authorization_code
+           "client_id": client_id,                # 客户端标识
+           "code": code,                          # 授权码
+           "redirect_uri": redirect_uri,          # 回调地址
+       },
+       headers={"Content-Type": "application/x-www-form-urlencoded"}, # 指定表单编码
+   )
    # 如果响应状态码不是200,抛出异常
+   resp.raise_for_status()
    # 解析响应内容为JSON格式
+   data = resp.json()
    # 从返回数据中获取access_token字段
+   access_token = data.get("access_token")
    # 如果access_token字段不是字符串或为空,抛出异常
+   if not isinstance(access_token, str) or not access_token:
+       raise ValueError("令牌端点未返回访问令牌")
    # 返回最终获取到的令牌字符串
+   return access_token     
# 主运行入口,完成OAuth全流程
async def run_client(resource_url, username, password, redirect_uri):
    # 创建默认30秒超时的httpx异步客户端,自动跟随重定向
    async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as http_client:
        # 第一步:无token请求受保护资源
        print("1) 请求受保护资源(无 token)...")
        resource_metadata_url = await discover_resource_metadata(http_client, resource_url)
        print(f"   收到资源元数据地址: {resource_metadata_url}")
         # 第二步:获取资源元数据
        print("2) 获取受保护资源元数据...")
        resource_metadata = await fetch_json(http_client, resource_metadata_url)
        # 检查authorization_servers字段
        auth_servers = resource_metadata.get("authorization_servers")
        if not isinstance(auth_servers, list) or not auth_servers or not isinstance(auth_servers[0], str):
            raise ValueError("受保护资源元数据缺少 authorization_servers 字段")
        # 选取第一个授权服务器
        auth_server = auth_servers[0].rstrip("/")
        print(f"   授权服务器: {auth_server}")
        # 第三步:获取授权服务器元数据
        print("3) 获取授权服务器元数据...")
        # 构造授权服务器元数据地址
        auth_metadata_url = f"{auth_server}/.well-known/oauth-authorization-server"
        print(f"   授权服务器元数据地址: {auth_metadata_url}")
        # 异步获取授权服务器元数据(JSON)
        auth_metadata = await fetch_json(http_client, auth_metadata_url)
        # 从元数据中获取授权端点
        authorization_endpoint = str(auth_metadata.get("authorization_endpoint", ""))
        # 从元数据中获取令牌端点
        token_endpoint = str(auth_metadata.get("token_endpoint", ""))
        # 从元数据中获取注册端点
        registration_endpoint = str(auth_metadata.get("registration_endpoint", ""))
        # 检查三个端点是否都存在,否则抛出异常
        if not authorization_endpoint or not token_endpoint or not registration_endpoint:
            raise ValueError("授权服务器元数据缺少关键端点")
        # 打印授权端点
        print(f"   授权端点: {authorization_endpoint}")
        # 打印令牌端点
        print(f"   令牌端点: {token_endpoint}")
        # 打印注册端点
        print(f"   注册端点: {registration_endpoint}")
        # 第四步:动态客户端注册
        print("4) 动态客户端注册...")
        client_id = await register_client(http_client, registration_endpoint, redirect_uri)
        print(f"   客户端标识: {client_id}")
        # 第五步:授权获取授权码
        print("5) 引导用户授权并获取授权码...")
        code = await authorize_with_user(
            http_client,#httpx异步客户端
            authorization_endpoint,#授权端点
            client_id,#客户端标识
            redirect_uri,#回调地址
            username,#用户名
            password,#密码
        )#返回授权码
        print(f"   授权码: {code}")#打印授权码
        # 第六步:授权码换令牌
+       print("6) 使用授权码换取访问令牌...")
+       token = await exchange_code_for_token(http_client, token_endpoint, client_id, code, redirect_uri)
+       print(f"   访问令牌获取成功: {token}")

# 命令行主入口
def main():
    try:
        # 受保护资源的元数据地址
        resource_url = "http://127.0.0.1:8000/mcp"
        # 用户名(模拟登录用)
        username = "admin"
        # 密码(模拟登录用)
        password = "123456"
        # 回调地址(OAuth2 授权码流程需要,客户端监听此地址以接收授权码)
        redirect_uri = "http://127.0.0.1:8000/callback"
        # 运行主业务流程
        asyncio.run(run_client(resource_url, username, password, redirect_uri))
        return 0
    except httpx.HTTPStatusError as e:
        # HTTP请求状态异常统一输出
        print(f"HTTP 请求错误 {e.response.status_code}: {e.response.text}")
        return 1
    except Exception as e:
        # 其他异常统一输出
        print(f"执行失败: {e}")
        import traceback

        traceback.print_exc()
        return 1


# 判断是否作为主程序入口执行
if __name__ == "__main__":
    # 设置标准输入输出流的编码为utf-8(如果支持该方法)
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
        sys.stdin.reconfigure(encoding="utf-8")
    try:
        # 启动主函数,异常时按约定返回码
        sys.exit(main())
    except KeyboardInterrupt:
        # 捕获Ctrl+C中断,友好打印并退出
        print("\n客户端已停止")
        sys.exit(130)

9. 携带access_token访问受保护资源 #

9.1. auth_server.py #

auth_server.py

"""
OAuth2 认证服务器(独立运行)

实现完整授权码流程(含元数据发现与动态注册):
- 授权服务器元数据:GET /.well-known/oauth-authorization-server
- 动态客户端注册:POST /oauth/register
- 授权端点:GET /oauth/authorize
- 令牌端点:POST /oauth/token(authorization_code)
- 令牌自省:POST /oauth/introspect(RFC 7662)
"""

# 导入异步IO支持
import asyncio
# 导入操作系统环境变量支持
import os
# 导入随机数生成工具
import secrets
# 导入系统库函数
import sys
# 导入时间函数
import time
# 导入URL编码工具
from urllib.parse import urlencode
# 导入Starlette微框架相关组件
from starlette.applications import Starlette
from starlette.responses import JSONResponse, RedirectResponse
from starlette.routing import Route
# 导入uvicorn用于运行ASGI应用
from uvicorn import Config, Server
# 用户数据,用户名:密码
USERS = {
    "admin": "123456"
}
# 客户端注册信息字典
_clients = {}
# 授权码存储字典
_auth_codes = {}
# 令牌存储字典
_tokens = {}
# 规范化scope参数,默认为user
def _normalize_scopes(scope_str):
    if not scope_str:
        return ["user"]
    scopes = [s for s in scope_str.split() if s]
    return scopes or ["user"]
# 从请求对象获取服务器的issuer信息
def _issuer_from_request(request):
    # request.url.scheme:获取请求的协议,通常是 http 或 https。
    # request.url.netloc:获取请求的“网络位置”,即域名(或 IP 地址)加上可选的端口号。例如 example.com、localhost:8080。
    return f"{request.url.scheme}://{request.url.netloc}"
# 授权服务器元数据发现端点
async def authorization_server_metadata(request):
    # 获取issuer信息
    issuer = _issuer_from_request(request)
    # 授权端点 用于引导用户授权
    authorization_endpoint = f"{issuer}/oauth/authorize"
    # 令牌端点  用于获取访问令牌
    token_endpoint = f"{issuer}/oauth/token"
    # 注册端点 用于动态注册客户端
    registration_endpoint = f"{issuer}/oauth/register"
    # 自省端点 用于验证令牌
    introspection_endpoint = f"{issuer}/oauth/introspect"
    # 构造元数据响应
    metadata = {
        "issuer": issuer,# 认证服务器地址
        "authorization_endpoint": authorization_endpoint, # 授权端点
        "token_endpoint": token_endpoint, # 令牌端点
        "registration_endpoint": registration_endpoint, # 注册端点
        "introspection_endpoint": introspection_endpoint, # 自省端点
        "response_types_supported": ["code"], # 支持的响应类型
        "grant_types_supported": ["authorization_code"], # 支持的授权类型
        "scopes_supported": ["user"], # 支持的scope,scope指的是访问资源范围
    }
    # 返回JSON响应
    return JSONResponse(metadata)


# 动态客户端注册端点
async def register_client(request):
    try:
        # 解析请求体JSON
        body = await request.json()
    except Exception:
        # JSON解析失败,返回错误
        return JSONResponse(
            {"error": "invalid_client_metadata", "error_description": "JSON 请求体无效"},
            status_code=400,
        )

    # 获取redirect_uris参数
    redirect_uris = body.get("redirect_uris")
    # 校验redirect_uris类型和内容
    if not isinstance(redirect_uris, list) or not redirect_uris:
        return JSONResponse(
            {"error": "invalid_redirect_uri", "error_description": "缺少 redirect_uris"},
            status_code=400,
        )
    if not all(isinstance(uri, str) and uri.startswith(("http://", "https://")) for uri in redirect_uris):
        return JSONResponse(
            {"error": "invalid_redirect_uri", "error_description": "redirect_uris 必须是 HTTP/HTTPS 地址"},
            status_code=400,
        )

    # 生成client_id
    client_id = f"client_{secrets.token_hex(8)}"
    # 构造客户端注册信息
    client = {
        "client_id": client_id,
        "client_name": body.get("client_name", "MCP OAuth 客户端"),
        "redirect_uris": redirect_uris,
        "grant_types": ["authorization_code"],
        "response_types": ["code"],
        "scope": "user",
    }
    # 存储客户端信息
    _clients[client_id] = client
    # 返回注册结果
    return JSONResponse(client, status_code=201)

# 授权端点处理函数,处理用户授权请求
async def authorize_endpoint(request):
    # 获取URL查询参数对象
    query = request.query_params
    # 获取response_type参数
    response_type = query.get("response_type")
    # 获取client_id参数
    client_id = query.get("client_id")
    # 获取redirect_uri参数
    redirect_uri = query.get("redirect_uri")
    # 获取scope参数,默认为"user"
    scope = query.get("scope", "user")
    # 获取state参数
    state = query.get("state")
    # 获取用户名
    username = query.get("username")
    # 获取密码
    password = query.get("password")
    # 获取approve参数,并转换为布尔值,判断用户是否同意授权
    approve = query.get("approve", "false").lower() == "true"

    # 检查response_type是否为"code",否则返回错误响应
    if response_type != "code":
        return JSONResponse({"error": "unsupported_response_type"}, status_code=400)
    # 校验client_id和redirect_uri两个必要参数,缺少则返回错误
    if not client_id or not redirect_uri:
        return JSONResponse({"error": "invalid_request", "error_description": "缺少 client_id 或 redirect_uri"}, status_code=400)

    # 根据client_id查找已注册的客户端,未找到则返回错误
    client = _clients.get(client_id)
    if not client:
        return JSONResponse({"error": "unauthorized_client", "error_description": "未知的 client_id"}, status_code=400)
    # 校验redirect_uri是否是在注册信息中登记过的回调地址
    if redirect_uri not in client.get("redirect_uris", []):
        return JSONResponse({"error": "invalid_request", "error_description": "redirect_uri 未注册"}, status_code=400)

    # 判断用户名、密码和用户同意(approve)是否都已提交,否则返回需要用户交互的错误
    if not username or not password or not approve:
        return JSONResponse(
            {
                "error": "interaction_required",  # 需要用户进一步操作
                "error_description": "需要用户凭据并同意授权",  # 错误说明
                "hint": "请调用 /oauth/authorize 并携带 username、password、approve=true",  # 提示
                "required_params": ["username", "password", "approve=true"],  # 要求这些参数
            },
            status_code=400,
        )

    # 校验用户名和密码,若不正确则返回拒绝访问
    if USERS.get(username) != password:
        return JSONResponse({"error": "access_denied", "error_description": "用户名或密码错误"}, status_code=401)

    # 生成授权码,格式为code_xxx
    code = f"code_{secrets.token_hex(16)}"
    # 存储授权码相关数据,包括客户端ID、回调地址、用户名、作用域、过期时间
    _auth_codes[code] = {
        "client_id": client_id, #客户端标识
        "redirect_uri": redirect_uri, #回调地址
        "username": username, #用户名
        "scopes": _normalize_scopes(scope), #作用域
        "expires_at": int(time.time()) + 300,  # 授权码5分钟后过期时间
    }

    # 构造回调参数字典,包含授权码
    params = {"code": code}
    # 如果state参数存在,也一并回传
    if state:
        params["state"] = state
    # 拼接完整回调地址,附带参数
    callback_url = f"{redirect_uri}?{urlencode(params)}"
    # 重定向到客户端回调地址,并返回授权码等信息
    return RedirectResponse(url=callback_url, status_code=302)

# 生成访问令牌并存储,返回令牌相关数据
def _issue_token(client_id, subject, scopes=None):
    # 生成一个随机的访问令牌,前缀为"mcp_"
    token = f"mcp_{secrets.token_hex(24)}"
    # 令牌有效时间,单位为秒,这里为3600秒(1小时)
    expires_in = 3600
    # 计算令牌到期的时间戳(当前时间+有效期)
    expires_at = int(time.time()) + expires_in
    # 在全局_tokens字典中存储该令牌对应的信息,便于后续校验和自省
    _tokens[token] = {
        "client_id": client_id,               # 客户端标识
        "sub": subject,                       # 用户唯一标识,通常为用户名
        "scopes": scopes or ["user"],         # 令牌作用域,默认为["user"]
        "expires_at": expires_at,             # 令牌过期时间戳
    }
    # 返回令牌响应内容,包括token本身、类型、有效期、作用域
    return {
        "access_token": token,                # 访问令牌
        "token_type": "Bearer",               # 令牌类型为Bearer
        "expires_in": expires_in,             # 距离过期剩余秒数
        "scope": " ".join(scopes or ["user"]),# 空格分隔的作用域字符串
    }
# 检查令牌有效性,返回自省结果
+def _introspect_token(token):
    # 从全局_tokens字典中获取该token的信息
+   data = _tokens.get(token)
    # 如果未找到对应的token信息,则认为无效,返回None
+   if not data:
+       return None
    # 若令牌已过期(当前时间大于expires_at),则从dict中删除该token并返回None
+   if data["expires_at"] < time.time():
+       del _tokens[token]
+       return None
    # 如果令牌有效,构造自省返回字典
+   return {
        # 标记令牌为激活状态
+       "active": True,
        # 返回token绑定的客户端标识
+       "client_id": data["client_id"],
        # 返回该token对应的用户唯一标识
+       "sub": data["sub"],
        # 将scope列表连接为以空格分隔的字符串
+       "scope": " ".join(data["scopes"]),
        # 返回token的过期时间戳
+       "exp": data["expires_at"],
        # 颁发本自省响应时的iat时间戳
+       "iat": int(time.time()),
        # 令牌类型为Bearer
+       "token_type": "Bearer",
+   }
# 令牌自省端点
+async def introspect_endpoint(request):
+   try:
        # 解析表单数据
+       form = await request.form()
+   except Exception:
        # 解析失败,假定令牌无效
+       return JSONResponse({"active": False}, status_code=400)

    # 获取待自省的token
+   token = form.get("token")
    # 校验token
+   if not token or not isinstance(token, str):
+       return JSONResponse({"active": False})

    # 执行自省
+   result = _introspect_token(token)
+   if not result:
+       return JSONResponse({"active": False})
    # 返回自省信息
+   return JSONResponse(result)    
# 令牌颁发端点(authorization_code模式)
async def token_endpoint(request):
    try:
        # 解析表单数据
        form = await request.form()
    except Exception:
        # 表单解析失败
        return JSONResponse(
            {"error": "invalid_request", "error_description": "表单数据无效"},
            status_code=400,
        )

    # 获取授权模式
    grant_type = form.get("grant_type")
    if grant_type != "authorization_code":
        return JSONResponse(
            {"error": "unsupported_grant_type", "error_description": "仅支持 authorization_code 授权类型"},
            status_code=400,
        )

    # 获取参数
    code = form.get("code")
    client_id = form.get("client_id")
    redirect_uri = form.get("redirect_uri")
    # 校验参数存在性
    if not code or not client_id or not redirect_uri:
        return JSONResponse(
            {"error": "invalid_request", "error_description": "缺少 code/client_id/redirect_uri"},
            status_code=400,
        )
    # 校验参数类型
    if not isinstance(code, str) or not isinstance(client_id, str) or not isinstance(redirect_uri, str):
        return JSONResponse(
            {"error": "invalid_request", "error_description": "参数类型错误"},
            status_code=400,
        )

    # 获取授权码对应的数据
    code_data = _auth_codes.get(code)
    if not code_data:
        return JSONResponse({"error": "invalid_grant", "error_description": "授权码不存在"}, status_code=400)
    # 检查授权码是否过期
    if code_data["expires_at"] < time.time():
        del _auth_codes[code]
        return JSONResponse({"error": "invalid_grant", "error_description": "授权码已过期"}, status_code=400)
    # 校验client_id和redirect_uri是否匹配
    if code_data["client_id"] != client_id or code_data["redirect_uri"] != redirect_uri:
        return JSONResponse({"error": "invalid_grant", "error_description": "授权码校验失败"}, status_code=400)

    # 授权码只能用一次,用后删除
    del _auth_codes[code]
    # 发放访问令牌
    token_data = _issue_token(client_id=client_id, subject=code_data["username"], scopes=code_data["scopes"])
    # 返回令牌
    return JSONResponse(token_data)
# 创建Starlette应用,并注册路由
def create_app():
    routes = [
        Route("/.well-known/oauth-authorization-server", authorization_server_metadata, methods=["GET"]),
        Route("/oauth/register", register_client, methods=["POST"]),
        Route("/oauth/authorize", authorize_endpoint, methods=["GET"]),
        Route("/oauth/token", token_endpoint, methods=["POST"]),
+       Route("/oauth/introspect", introspect_endpoint, methods=["POST"]),
    ]
    return Starlette(routes=routes)
# 运行ASGI服务器
async def run_server(host, port):
    # 创建应用
    app = create_app()
    # 配置uvicorn
    config = Config(app, host=host, port=port, log_level="info")
    # 创建服务器对象
    server = Server(config)
    # 启动服务器服务
    await server.serve()

# 入口函数,负责环境和服务启动
def main():
    # 从环境变量获取服务IP和端口,默认127.0.0.1:9000
    host = os.environ.get("OAUTH_AUTH_HOST", "127.0.0.1")
    port = int(os.environ.get("OAUTH_AUTH_PORT", "9000"))

    # 服务端点说明
    print(f"OAuth2 认证服务器: http://{host}:{port}")
    print("  元数据端点: GET /.well-known/oauth-authorization-server")
    print("  动态注册端点: POST /oauth/register")
    print("  授权端点: GET /oauth/authorize")
    print("  令牌端点: POST /oauth/token (grant_type=authorization_code)")
    print("  自省端点: POST /oauth/introspect")

    # 启动服务
    asyncio.run(run_server(host, port))
    return 0

# 程序入口
if __name__ == "__main__":
    sys.exit(main())

9.2. mcp_client.py #

mcp_client.py

"""
MCP OAuth 客户端

按如下顺序执行:
1) 请求受保护资源,拿到 401 + 资源元数据地址
2) 获取受保护资源元数据(授权服务器地址)
3) 获取授权服务器元数据(授权端点、令牌端点、注册端点)
4) 动态注册客户端
5) 引导用户授权(此演示用用户名/密码 + approve=true 模拟)
6) 用授权码换取访问令牌
7) 携带访问令牌访问 MCP 受保护资源
"""
# 导入系统模块
import sys
# 导入正则表达式模块
import re
# 导入安全随机数模块,用于生成不可预测的 state
import secrets
# 导入异步IO模块
import asyncio
# 导入httpx,用于处理HTTP请求
import httpx
# 导入URL解析相关方法
from urllib.parse import parse_qs, urlparse
# 导入MCP的客户端会话管理
+from mcp import ClientSession
# 导入用于MCP流式HTTP通信的客户端
+from mcp.client.streamable_http import streamable_http_client
# 从"www-authenticate"头部解析出resource_metadata字段
def _parse_resource_metadata_from_www_authenticate(www_authenticate):
    # 头部值不存在直接返回None
    if not www_authenticate:
        return None
    # 匹配resource_metadata字段的内容
    match = re.search(r'resource_metadata="([^"]+)"', www_authenticate, flags=re.IGNORECASE)
    # 有匹配返回第一个分组,否则返回None
    return match.group(1) if match else None
# 发现受保护资源的元数据地址(401响应推断或头部直接获取)
async def discover_resource_metadata(client, resource_url):
    # 无令牌访问资源预期获得401
    response = await client.get(resource_url)
    # 如果不是401,说明流程有误
    if response.status_code != 401:
        raise RuntimeError(f"预期先返回 401,但收到 {response.status_code}")

    # 从头部解析resource_metadata字段
    return _parse_resource_metadata_from_www_authenticate(response.headers.get("www-authenticate"))
# 异步获取JSON,并校验为字典对象
async def fetch_json(client, url):
    # 发起GET请求
    resp = await client.get(url)
    # 请求状态非200会引发异常
    resp.raise_for_status()
    # 解析响应内容为JSON
    data = resp.json()
    # 必须是字典类型
    if not isinstance(data, dict):
        raise ValueError(f"元数据响应不是 JSON 对象: {url}")
    # 返回解析结果
    return data
# 异步动态客户端注册
async def register_client(client, registration_endpoint, redirect_uri):
    # 通过POST向注册端点发起注册请求
    resp = await client.post(
        registration_endpoint,
        json={
            "client_name": "MCP OAuth 客户端",
            "redirect_uris": [redirect_uri],
        },
    )
    # 检查响应状态
    resp.raise_for_status()
    # 解析响应
    data = resp.json()
    # 获取client_id
    client_id = data.get("client_id")
    # client_id需为非空字符串
    if not isinstance(client_id, str) or not client_id:
        raise ValueError("动态注册失败:未返回客户端标识")
    # 返回client_id
    return client_id    
# 用户授权并获取授权码(用用户名密码)
async def authorize_with_user(
    client,#httpx异步客户端
    authorization_endpoint,#授权端点
    client_id,#客户端标识
    redirect_uri,#回调地址
    username,#用户名
    password,#密码
):

    # 固定state用于校验,用于防止CSRF(Cross-Site Request Forgery,跨站请求伪造)攻击
    # CSRF是指攻击者利用你“已登录某网站”的身份,在你不知情时让浏览器替你发起恶意请求。
    # 客户端发起授权请求时带上 state,授权服务器重定向回来时原样带回 state,客户端只接受“回调里 state 与自己最初保存的 state 完全一致”的响应
    # 攻击者即使诱导用户点击恶意链接,也很难猜中客户端会话里的 state,回调校验会失败,从而拒绝这次授权结果
    # 使用高熵随机 state 防止 CSRF,必须每次授权请求都不同且不可预测。
    state = secrets.token_urlsafe(32)
    # 构造GET到授权端点,携带相关参数(用户名等,模拟快速通过)
    resp = await client.get(
        authorization_endpoint,#授权端点
        params={
            "response_type": "code",
            "client_id": client_id,#客户端标识
            "redirect_uri": redirect_uri,#回调地址
            "scope": "user",#作用域
            "state": state,#状态
            "username": username,#用户名
            "password": password,#密码
            "approve": "true",#批准
        },
        follow_redirects=False,#不跟随重定向
    )
    # 授权返回302/303为正常
    if resp.status_code not in (302, 303):
        raise RuntimeError(f"授权失败,状态码: {resp.status_code}, body: {resp.text}")

    # 获取重定向Location
    location = resp.headers.get("location")
    if not location:
        raise RuntimeError("授权失败:未返回回调地址")
    # 解析location为URL
    parsed = urlparse(location)
    # 解析query部分
    qs = parse_qs(parsed.query)
    # 获取授权码与state
    code = (qs.get("code") or [None])[0]
    returned_state = (qs.get("state") or [None])[0]
    # 校验state
    if returned_state != state:
        raise RuntimeError("状态参数校验失败")
    # 校验授权码
    if not code:
        raise RuntimeError("授权失败:未返回授权码")
    # 返回code
    return code   

# 异步函数:使用授权码换取访问令牌
async def exchange_code_for_token(
    client,                # httpx异步客户端实例
    token_endpoint,        # 令牌端点URL
    client_id,             # 客户端标识
    code,                  # 授权码
    redirect_uri,          # 回调地址
):
    # 向令牌端点发起POST请求,提交用于授权码换取访问令牌的数据
    resp = await client.post(
        token_endpoint,     # 令牌端点URL
        data={
            "grant_type": "authorization_code",    # 授权类型为authorization_code
            "client_id": client_id,                # 客户端标识
            "code": code,                          # 授权码
            "redirect_uri": redirect_uri,          # 回调地址
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"}, # 指定表单编码
    )
    # 如果响应状态码不是200,抛出异常
    resp.raise_for_status()
    # 解析响应内容为JSON格式
    data = resp.json()
    # 从返回数据中获取access_token字段
    access_token = data.get("access_token")
    # 如果access_token字段不是字符串或为空,抛出异常
    if not isinstance(access_token, str) or not access_token:
        raise ValueError("令牌端点未返回访问令牌")
    # 返回最终获取到的令牌字符串
    return access_token     

# 从MCP接口结果中提取文本内容
+def _extract_texts(result):
    # 用于存储所有文本块
+   texts = []
    # 尝试获取content或contents属性
+   content = getattr(result, "content", None) or getattr(result, "contents", [])
    # 遍历所有块
+   for block in content:
        # 如果有text属性,则提取
+       if hasattr(block, "text"):
+           texts.append(str(block.text))
        # 如果有type属性且为"text",提取text
+       elif hasattr(block, "type") and getattr(block, "type") == "text":
+           texts.append(str(getattr(block, "text", "")))
    # 返回提取到的所有文本
+   return texts
# 携带令牌访问受保护的MCP资源
+async def call_protected_mcp(resource_url, token):
    # 构建请求头,包含Bearer类型的访问令牌及指定内容类型为json
+   headers = {
+       "Authorization": f"Bearer {token}",
+       "Content-Type": "application/json",
+   }
    # 创建带认证头和60秒超时时间的httpx异步客户端
+   async with httpx.AsyncClient(headers=headers, timeout=60.0) as mcp_http_client:
        # 以认证客户端与受保护资源建立可流式操作的数据会话
+       async with streamable_http_client(resource_url, http_client=mcp_http_client) as (
+           read_stream,         # 读取数据流
+           write_stream,        # 写入数据流
+           _,                   # 占位符(忽略的返回值)
+       ):
            # 创建新的MCP客户端会话,传入读写流
+           async with ClientSession(read_stream, write_stream) as session:
                # 初始化MCP连接(完成协议握手)
+               await session.initialize()
                # 打印初始化成功提示
+               print("MCP 会话初始化成功")

                # 调用API列出所有受保护的资源对象
+               resources = await session.list_resources()
                # 提取资源对象的URI,生成列表
+               uris = [r.uri for r in resources.resources]
                # 打印所有可访问的受保护资源URI
+               print(f"受保护资源: {uris}")

                # 调用API获取当前可用方法工具列表
+               tools = await session.list_tools()
                # 提取工具名称,生成名称列表
+               names = [t.name for t in tools.tools]
                # 打印所有可访问工具列表
+               print(f"受保护工具: {names}")

                # 调用受保护工具get_user_data,传入user_id为current
+               result = await session.call_tool("get_user_data", {"user_id": "current"})
                # 解析工具响应,提取可用文本输出
+               texts = _extract_texts(result)
                # 打印工具调用结果(多个文本用|拼接,没有则显示(空))
+               print(f"调用 get_user_data(current): {' | '.join(texts) or '(空)'}")    
# 主运行入口,完成OAuth全流程
async def run_client(resource_url, username, password, redirect_uri):
    # 创建默认30秒超时的httpx异步客户端,自动跟随重定向
    async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as http_client:
        # 第一步:无token请求受保护资源
        print("1) 请求受保护资源(无 token)...")
        resource_metadata_url = await discover_resource_metadata(http_client, resource_url)
        print(f"   收到资源元数据地址: {resource_metadata_url}")
         # 第二步:获取资源元数据
        print("2) 获取受保护资源元数据...")
        resource_metadata = await fetch_json(http_client, resource_metadata_url)
        # 检查authorization_servers字段
        auth_servers = resource_metadata.get("authorization_servers")
        if not isinstance(auth_servers, list) or not auth_servers or not isinstance(auth_servers[0], str):
            raise ValueError("受保护资源元数据缺少 authorization_servers 字段")
        # 选取第一个授权服务器
        auth_server = auth_servers[0].rstrip("/")
        print(f"   授权服务器: {auth_server}")
        # 第三步:获取授权服务器元数据
        print("3) 获取授权服务器元数据...")
        # 构造授权服务器元数据地址
        auth_metadata_url = f"{auth_server}/.well-known/oauth-authorization-server"
        print(f"   授权服务器元数据地址: {auth_metadata_url}")
        # 异步获取授权服务器元数据(JSON)
        auth_metadata = await fetch_json(http_client, auth_metadata_url)
        # 从元数据中获取授权端点
        authorization_endpoint = str(auth_metadata.get("authorization_endpoint", ""))
        # 从元数据中获取令牌端点
        token_endpoint = str(auth_metadata.get("token_endpoint", ""))
        # 从元数据中获取注册端点
        registration_endpoint = str(auth_metadata.get("registration_endpoint", ""))
        # 检查三个端点是否都存在,否则抛出异常
        if not authorization_endpoint or not token_endpoint or not registration_endpoint:
            raise ValueError("授权服务器元数据缺少关键端点")
        # 打印授权端点
        print(f"   授权端点: {authorization_endpoint}")
        # 打印令牌端点
        print(f"   令牌端点: {token_endpoint}")
        # 打印注册端点
        print(f"   注册端点: {registration_endpoint}")
        # 第四步:动态客户端注册
        print("4) 动态客户端注册...")
        client_id = await register_client(http_client, registration_endpoint, redirect_uri)
        print(f"   客户端标识: {client_id}")
        # 第五步:授权获取授权码
        print("5) 引导用户授权并获取授权码...")
        code = await authorize_with_user(
            http_client,#httpx异步客户端
            authorization_endpoint,#授权端点
            client_id,#客户端标识
            redirect_uri,#回调地址
            username,#用户名
            password,#密码
        )#返回授权码
        print(f"   授权码: {code}")#打印授权码
        # 第六步:授权码换令牌
        print("6) 使用授权码换取访问令牌...")
        token = await exchange_code_for_token(http_client, token_endpoint, client_id, code, redirect_uri)
        print(f"   访问令牌获取成功: {token}")
         # 第七步:携令牌访问受保护MCP资源
+       print("7) 携带访问令牌请求 MCP 受保护资源...")
+       await call_protected_mcp(resource_url, token)
+       print("\n完整 OAuth 流程完成")

# 命令行主入口
def main():
    try:
        # 受保护资源的元数据地址
        resource_url = "http://127.0.0.1:8000/mcp"
        # 用户名(模拟登录用)
        username = "admin"
        # 密码(模拟登录用)
        password = "123456"
        # 回调地址(OAuth2 授权码流程需要,客户端监听此地址以接收授权码)
        redirect_uri = "http://127.0.0.1:8000/callback"
        # 运行主业务流程
        asyncio.run(run_client(resource_url, username, password, redirect_uri))
        return 0
    except httpx.HTTPStatusError as e:
        # HTTP请求状态异常统一输出
        print(f"HTTP 请求错误 {e.response.status_code}: {e.response.text}")
        return 1
    except Exception as e:
        # 其他异常统一输出
        print(f"执行失败: {e}")
        import traceback

        traceback.print_exc()
        return 1


# 判断是否作为主程序入口执行
if __name__ == "__main__":
    # 设置标准输入输出流的编码为utf-8(如果支持该方法)
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
        sys.stdin.reconfigure(encoding="utf-8")
    try:
        # 启动主函数,异常时按约定返回码
        sys.exit(main())
    except KeyboardInterrupt:
        # 捕获Ctrl+C中断,友好打印并退出
        print("\n客户端已停止")
        sys.exit(130)

9.3. mcp_server.py #

mcp_server.py

# MCP 资源服务器(OAuth2 分离架构)
"""
MCP 资源服务器(OAuth2 分离架构)

与认证服务器分离,通过令牌自省(RFC 7662)验证令牌。
- 认证服务器:oauth_auth_server.py(端口 9000)
- 资源服务器:本服务(端口 8000)
"""

# 导入操作系统相关模块
import os
# 导入系统功能模块
import sys
# 导入 MCP 上下文与主服务类
from mcp.server.fastmcp import FastMCP
# 导入 MCP 的认证设置
from mcp.server.auth.settings import AuthSettings
# 从 pydantic 导入 AnyHttpUrl 类型,用于校验 URL
from pydantic import AnyHttpUrl
# 导入获取 access_token 的方法
+from mcp.server.auth.middleware.auth_context import get_access_token
# 读取主机地址,优先使用环境变量 MCP_HOS,否则默认 '127.0.0.1'
HOST = os.environ.get("MCP_HOST", "127.0.0.1")
# 读取端口号,优先使用环境变量 MCP_PORT,否则默认 8000(需转换为 int)
PORT = int(os.environ.get("MCP_PORT", "8000"))
# 资源服务器自身的公开 URL,带 /mcp 路径用于元数据发现
RESOURCE_URL = f"http://{HOST}:{PORT}/mcp"
# 导入自定义的令牌自省验证器
from token_verifier import IntrospectionTokenVerifier
# 认证服务器地址,优先通过环境变量 OAUTH_AUTH_SERVER,否则默认为 http://127.0.0.1:9000
AUTH_SERVER = os.environ.get("OAUTH_AUTH_SERVER", "http://127.0.0.1:9000")
# 令牌自省端点 URL
INTROSPECTION_URL = f"{AUTH_SERVER.rstrip('/')}/oauth/introspect"
# 创建 FastMCP 资源服务器实例
mcp = FastMCP(
    # 服务器名称
    name="OAuth资源服务器",
    host=HOST,
    port=PORT,
    # 令牌验证器:通过 introspect 端点验证
    token_verifier=IntrospectionTokenVerifier(INTROSPECTION_URL),
    # 认证相关设置
    auth=AuthSettings(
        # 指定认证服务器 Issuer 地址
        issuer_url=AnyHttpUrl(AUTH_SERVER),
        # 资源服务器自身的 URL
        resource_server_url=AnyHttpUrl(RESOURCE_URL),
        # 需要的 scope
        required_scopes=["user"],
    ),
)
# 获取当前用户 ID 的工具函数
+def _get_current_user_id():
    # 从认证上下文获取当前用户 ID
+   try:
        # 获取当前 access_token
+       token = get_access_token()
        # 返回 client_id(用户名),若无则返回 None
+       return token.client_id if token else None
+   except Exception:
        # 如果获取失败则返回 None
+       return None
# 注册受保护资源 user://profile
+@mcp.resource("user://profile")
+def get_user_profile():
    # 获取当前用户 ID,若获取不到则用“匿名用户”
+   user_id = _get_current_user_id() or "匿名用户"
    # 返回当前用户的基本资料字符串(JSON 结构)
+   return f'''{{
+ "user_id": "{user_id}",
+ "name": "用户 {user_id}",
+ "email": "{user_id}@example.com",
+ "role": "user"
+}}'''

# 注册受保护资源 user://stats
+@mcp.resource("user://stats")
+def get_user_stats():
    # 获取当前用户 ID,若获取不到则用“匿名用户”
+   user_id = _get_current_user_id() or "匿名用户"
    # 返回用户统计数据字符串(JSON 结构)
+   return f'''{{
+ "user_id": "{user_id}",
+ "requests_today": 42,
+ "last_active": "2024-01-15T10:30:00Z"
+}}'''

# 注册受保护工具 get_user_data
+@mcp.tool()
+def get_user_data(user_id="current", ctx=None):
    # 获取当前用户 ID
+   current = _get_current_user_id()
    # 判断请求的 user_id 是否为当前用户
+   if user_id == "current":
        # 如果当前未认证(current 为空),返回未认证信息
+       if not current:
+           return {"error": "未认证", "user_id": None}
        # 否则返回当前用户的基本信息
+       return {
+           "user_id": current,
+           "status": "active",
+           "last_login": "2024-01-15T10:30:00Z",
+       }
    # 如果请求的 user_id 等于当前用户
+   if current and user_id == current:
+       return {
+           "user_id": user_id,
+           "status": "active",
+           "last_login": "2024-01-15T10:30:00Z",
+       }
    # 既不是当前用户,也不是已知用户,返回无权访问或用户不存在
+   return {"user_id": user_id, "status": "unknown", "error": "无权访问或用户不存在"}

# 仅当该文件作为主程序直接运行时执行下列代码
if __name__ == "__main__":
    # 打印资源服务器地址
    print(f"MCP 资源服务器: {RESOURCE_URL}")
    # 启动 FastMCP 服务,采用 streamable-http 模式
    mcp.run(transport="streamable-http")

9.4. token_verifier.py #

token_verifier.py

"""
OAuth2 令牌自省验证器(RFC 7662)

资源服务器通过调用认证服务器的令牌自省端点验证 Bearer 令牌。
"""

# 导入 httpx 库用于发送 HTTP 请求
import httpx

# 从 mcp.server.auth.provider 模块导入 AccessToken 和 TokenVerifier
from mcp.server.auth.provider import AccessToken, TokenVerifier

# 定义 IntrospectionTokenVerifier 类,继承自 TokenVerifier
class IntrospectionTokenVerifier(TokenVerifier):
     # 通过 OAuth2 令牌自省(RFC 7662)验证令牌的类文档字符串
    """通过 OAuth2 令牌自省(RFC 7662)验证令牌。"""

    # 构造方法,接收自省端点地址 introspection_endpoint
    def __init__(self, introspection_endpoint):
        # 将自省端点地址保存到实例变量
        self.introspection_endpoint = introspection_endpoint
     # 定义异步验证令牌的方法
+   async def verify_token(self, token):
        # 方法文档,说明功能为调用认证服务器自省端点验证令牌
+       """调用认证服务器自省端点验证令牌。"""
        # 检查自省端点地址,只允许部分安全前缀
+       if not self.introspection_endpoint.startswith(
+           ("https://", "http://localhost", "http://127.0.0.1")
+       ):
            # 如果地址非法,立即返回 None
+           return None

        # 使用 httpx 异步客户端,并设置超时时间为 10 秒
+       async with httpx.AsyncClient(timeout=10.0) as client:
+           try:
                # 通过 POST 请求向自省端点发送带 token 字段的表单数据
+               response = await client.post(
+                   self.introspection_endpoint,
+                   data={"token": token},
+                   headers={"Content-Type": "application/x-www-form-urlencoded"},
+               )
                # 如果响应的状态码不是 200,表示校验失败,直接返回 None
+               if response.status_code != 200:
+                   return None

                # 解析响应体的 JSON 内容
+               data = response.json()
                # 检查 active 字段是否为 True,如果为 False 或缺失则返回 None
+               if not data.get("active", False):
+                   return None

                # 获取 sub(通常为当前用户)用于构造 AccessToken
+               subject = data.get("sub")
                # 返回 AccessToken 对象,字段包括令牌、client_id、scope、exp、aud
+               return AccessToken(
+                   token=token,
                    # 若 sub 存在并为字符串则用作 client_id,否则取 client_id 字段或 "unknown"
+                   client_id=subject if isinstance(subject, str) and subject else data.get("client_id", "unknown"),
                    # 解析 scope 字段(若有),按空格分割为列表,否则为空列表
+                   scopes=data.get("scope", "").split() if data.get("scope") else [],
                    # 有效期使用 exp 字段
+                   expires_at=data.get("exp"),
                    # aud 字段作为 resource 赋值
+                   resource=data.get("aud"),
+               )
+           except Exception:
                # 捕获并忽略所有异常,直接返回 None
+               return None    
← 上一节 43.分页 下一节 45.FunctionCalling →

访问验证

请输入访问令牌

Token不正确,请重新输入