1. MCP 授权 #
本节介绍 MCP(Microservice Control Platform,微服务控制平台)中的授权机制,采用 OAuth2 标准架构实现资源保护与访问控制。MCP 的授权机制分为认证服务器(auth server)和资源服务器(resource server)两部分,两者通过 OAuth2 Token Introspection(令牌自省,见 RFC 7662)协议解耦。
典型流程为:
令牌颁发
用户(Resource Owner)通过认证服务器,使用用户名密码方式(Resource Owner Password Credentials Grant)获取 access token。资源访问
客户端带上 Bearer Token(即 access token),访问 MCP 资源服务器的受保护接口。令牌验证
资源服务器收到请求后,并不自行校验令牌,而是通过 introspect 端点向认证服务器查询该 access token 是否有效、属于谁、scope等。权限控制
认证服务器在 introspection 响应中指明令牌的有效性、拥有的 scopes、所属 client_id 等。资源服务器据此决定是否放行,以及允许访问哪些资源。
优势说明:
- 实现了认证服务器与资源服务器的解耦,易于分布式部署。
- 支持细粒度权限与多终端/多应用授权。
- 可无缝对接第三方 OAuth2 生态软件。
2. auth_server.py #
auth_server.py
# OAuth2 认证服务器(独立运行)
"""
OAuth2 认证服务器(独立运行)
与资源服务器分离,负责:
- 令牌颁发:POST /oauth/token(Resource Owner Password Credentials 授权)
- 令牌自省:POST /oauth/introspect(RFC 7662,供资源服务器验证令牌)
演示用户:alice/123456、bob/123456
"""
# 兼容未来注解特性
from __future__ import annotations
# 导入异步、os操作、加密、系统、时间和类型相关模块
import asyncio
import os
import secrets
import sys
import time
from typing import Any
# 导入 Starlette 框架相关对象
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
from uvicorn import Config, Server
# 定义演示用户,用户名到密码的映射
DEMO_USERS: dict[str, str] = {
"alice": "123456",
"bob": "123456",
}
# 已颁发令牌的存储,token 到包含 client_id、scopes、expires_at 的字典
_tokens: dict[str, dict[str, Any]] = {}
# 生成新的访问令牌
def _issue_token(username: str, scopes: list[str] | None = None) -> dict[str, Any]:
"""颁发访问令牌。"""
# 生成随机令牌
token = f"mcp_{secrets.token_hex(24)}"
# 设置令牌过期时间为当前时间加一小时
expires_at = int(time.time()) + 3600
# 保存令牌数据到全局 _tokens 字典
_tokens[token] = {
"client_id": username,
"scopes": scopes or ["user"],
"expires_at": expires_at,
}
# 返回包含令牌及相关信息的响应内容
return {
"access_token": token,
"token_type": "Bearer",
"expires_in": 3600,
"scope": " ".join(scopes or ["user"]),
}
# 自省令牌:检查令牌是否有效并返回相关信息
def _introspect_token(token: str) -> dict[str, Any] | None:
"""自省令牌,返回 RFC 7662 格式。"""
# 取出令牌数据
data = _tokens.get(token)
# 若找不到则视为无效
if not data:
return None
# 若令牌已过期则删除并视为无效
if data["expires_at"] < time.time():
del _tokens[token]
return None
# 构造自省结果
return {
"active": True,
"client_id": data["client_id"],
"scope": " ".join(data["scopes"]),
"exp": data["expires_at"],
"iat": int(time.time()),
"token_type": "Bearer",
}
# 令牌颁发端点(Resource Owner Password Credentials)
async def token_endpoint(request: Request) -> JSONResponse:
"""OAuth2 令牌端点(Resource Owner Password Credentials Grant)。
POST /oauth/token
Content-Type: application/x-www-form-urlencoded
grant_type=password&username=alice&password=123456
"""
# 尝试获取表单数据
try:
form = await request.form()
except Exception:
# 表单解析失败
return JSONResponse(
{"error": "invalid_request", "error_description": "Invalid form data"},
status_code=400,
)
# 获取授权类型
grant_type = form.get("grant_type")
# 只支持 password 授权类型
if grant_type != "password":
return JSONResponse(
{"error": "unsupported_grant_type", "error_description": "Only password grant supported"},
status_code=400,
)
# 获取用户名和密码
username = form.get("username")
password = form.get("password")
# 检查用户名和密码是否提供
if not username or not password:
return JSONResponse(
{"error": "invalid_request", "error_description": "Missing username or password"},
status_code=400,
)
# 检查参数类型,必须为字符串
if not isinstance(username, str) or not isinstance(password, str):
return JSONResponse(
{"error": "invalid_request", "error_description": "Invalid parameter types"},
status_code=400,
)
# 验证用户名密码是否正确
if DEMO_USERS.get(username) != password:
return JSONResponse(
{"error": "invalid_grant", "error_description": "Invalid credentials"},
status_code=401,
)
# 获取 scope(权限),默认 user
scope_str = form.get("scope", "user")
scopes = scope_str.split() if isinstance(scope_str, str) else ["user"]
# 颁发令牌
token_data = _issue_token(username, scopes)
# 返回 JSON 响应
return JSONResponse(token_data)
# 令牌自省端点(供资源服务器验证令牌)
async def introspect_endpoint(request: Request) -> JSONResponse:
"""OAuth2 令牌自省端点(RFC 7662)。
资源服务器调用此端点验证客户端提供的 Bearer 令牌。
POST /oauth/introspect
Content-Type: application/x-www-form-urlencoded
token=<access_token>
"""
# 获取表单数据
try:
form = await request.form()
except Exception:
# 表单解析失败返回 active: False
return JSONResponse({"active": False}, status_code=400)
# 获取待自省的 token
token = form.get("token")
# 未提供 token 或类型不对
if not token or not isinstance(token, str):
return JSONResponse({"active": False})
# 查询 token
result = _introspect_token(token)
# 查询不到或已过期即 active: False
if not result:
return JSONResponse({"active": False})
# 返回自省结果
return JSONResponse(result)
# 创建 Starlette 应用并注册路由
def create_app() -> Starlette:
# 路由列表,包含令牌和自省两个端点
routes = [
Route("/oauth/token", token_endpoint, methods=["POST"]),
Route("/oauth/introspect", introspect_endpoint, methods=["POST"]),
]
# 返回应用
return Starlette(routes=routes)
# 启动 Uvicorn 服务器
async def run_server(host: str, port: int) -> None:
# 创建应用
app = create_app()
# 配置 uvicorn
config = Config(app, host=host, port=port, log_level="info")
# 创建服务器
server = Server(config)
# 启动服务器
await server.serve()
# 主函数,处理环境变量与启动
def main() -> int:
# 读取主机和端口(默认127.0.0.1:9000)
host = os.environ.get("OAUTH_AUTH_HOST", "127.0.0.1")
port = int(os.environ.get("OAUTH_AUTH_PORT", "9000"))
# 设置终端编码为utf-8
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
# 打印服务信息
print(f"OAuth2 认证服务器: http://{host}:{port}")
print(" 令牌端点: POST /oauth/token (grant_type=password&username=alice&password=123456)")
print(" 自省端点: POST /oauth/introspect")
print(" 演示用户: alice/123456, bob/123456")
print()
# 启动服务器
asyncio.run(run_server(host, port))
return 0
# Python 主程序入口
if __name__ == "__main__":
sys.exit(main())
3. mcp_client.py #
mcp_client.py
# MCP OAuth 客户端(完整流程)
"""
MCP OAuth 客户端(完整流程)
1. 向认证服务器获取令牌(Resource Owner Password Credentials)
2. 使用 Bearer 令牌访问 MCP 资源服务器
"""
# 导入未来注解特性
from __future__ import annotations
# 导入标准库
import argparse # 命令行参数解析
import asyncio # 异步支持
import os # 操作系统交互
import sys # 系统功能
from typing import Any # 类型提示
# 导入第三方库 httpx
import httpx # 异步 HTTP 客户端
# 导入 MCP 库
from mcp import ClientSession # MCP 客户端会话
from mcp.client.streamable_http import streamable_http_client # 可流式 HTTP 客户端
from pydantic import AnyUrl # URL 类型校验
# 从 MCP 结果提取文本内容
def _extract_texts(result: Any) -> list[str]:
# 初始化文本列表
texts: list[str] = []
# 尝试获取 result.content,如果不存在则尝试 result.contents
content = getattr(result, "content", None) or getattr(result, "contents", [])
# 遍历每个内容块,提取 text 字段
for block in content:
if hasattr(block, "text"):
texts.append(str(block.text))
elif hasattr(block, "type") and getattr(block, "type") == "text":
texts.append(str(getattr(block, "text", "")))
# 返回所有提取到的文本
return texts
# 异步获取访问令牌
async def fetch_token(client: httpx.AsyncClient, auth_url: str, username: str, password: str) -> str:
# 组织令牌请求 URL
token_url = f"{auth_url.rstrip('/')}/oauth/token"
# 发送 POST 请求,携带用户名和密码
response = await client.post(
token_url,
data={
"grant_type": "password",
"username": username,
"password": password,
},
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
# 检查响应是否成功,否则抛出异常
response.raise_for_status()
# 解析 JSON 数据
data = response.json()
# 获取 access_token 字段
access_token = data.get("access_token")
# 如果没有 access_token,抛异常
if not access_token:
raise ValueError("认证服务器未返回 access_token")
# 返回 access_token
return access_token
# OAuth 客户端主逻辑
async def run_client(
auth_url: str, # 认证服务器地址
resource_url: str, # 资源服务器地址
username: str, # 用户名
password: str, # 密码
) -> None:
# 创建 httpx 异步客户端(用于令牌获取)
async with httpx.AsyncClient(timeout=30.0) as http_client:
# 第一步:获取访问令牌
print("正在向认证服务器获取令牌...")
token = await fetch_token(http_client, auth_url, username, password)
print(f"令牌获取成功\n")
# 第二步:准备带有 Bearer 令牌的请求头
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
# 使用令牌访问 MCP 资源服务器,创建新的 httpx 客户端
async with httpx.AsyncClient(headers=headers, timeout=60.0) as mcp_client:
# 建立 MCP Streamable HTTP 通信通道 (得到读写流,第三个未用变量用 _ 占位)
async with streamable_http_client(resource_url, http_client=mcp_client) as (
read_stream,
write_stream,
_,
):
# 创建 MCP 客户端会话
async with ClientSession(read_stream, write_stream) as session:
# 初始化会话
print("会话初始化...")
await session.initialize()
print("会话初始化成功\n")
# 尝试列举受保护资源
try:
resources = await session.list_resources()
uris = [r.uri for r in resources.resources]
print(f"受保护资源: {uris}")
except Exception as e:
print(f"获取资源列表失败: {e}")
raise
# 尝试列举可用工具
try:
tools = await session.list_tools()
names = [t.name for t in tools.tools]
print(f"受保护工具: {names}\n")
except Exception as e:
print(f"获取工具列表失败: {e}")
raise
# 尝试读取 user://profile 资源
try:
result = await session.read_resource(AnyUrl("user://profile"))
texts = _extract_texts(result)
print(f"读取 user://profile: {' | '.join(texts) or '(空)'}")
except Exception as e:
print(f"读取 user://profile 失败: {e}")
# 尝试读取 user://stats 资源
try:
result = await session.read_resource(AnyUrl("user://stats"))
texts = _extract_texts(result)
print(f"读取 user://stats: {' | '.join(texts) or '(空)'}")
except Exception as e:
print(f"读取 user://stats 失败: {e}")
# 调用 get_user_data 工具
try:
result = await session.call_tool("get_user_data", {"user_id": "current"})
texts = _extract_texts(result)
print(f"调用 get_user_data(current): {' | '.join(texts) or '(空)'}")
except Exception as e:
print(f"调用 get_user_data 失败: {e}")
# 结束提示
print("\nOAuth 客户端演示完成")
# 主程序入口点
def main() -> int:
# 创建命令行参数解析器
parser = argparse.ArgumentParser(description="MCP OAuth 客户端(完整流程)")
# 添加认证服务器参数
parser.add_argument(
"--auth-url",
default=os.environ.get("OAUTH_AUTH_SERVER", "http://127.0.0.1:9000"),
help="认证服务器 URL",
)
# 添加资源服务器参数
parser.add_argument(
"--resource-url",
default=os.environ.get("MCP_OAUTH_URL", "http://127.0.0.1:8000/mcp"),
help="MCP 资源服务器 URL",
)
# 添加用户名参数
parser.add_argument(
"--username",
default=os.environ.get("OAUTH_USERNAME", "alice"),
help="用户名(演示: alice, bob)",
)
# 添加密码参数
parser.add_argument(
"--password",
default=os.environ.get("OAUTH_PASSWORD", "123456"),
help="密码(演示: 123456)",
)
# 解析命令行参数为 args
args = parser.parse_args()
# 打印相关参数
print(f"认证服务器: {args.auth_url}")
print(f"资源服务器: {args.resource_url}")
print(f"用户: {args.username}\n")
# 主执行过程,处理异常和退出码
try:
# 运行主客户端协程
asyncio.run(run_client(args.auth_url, args.resource_url, args.username, args.password))
# 成功返回 0
return 0
except httpx.HTTPStatusError as e:
# 捕获 httpx 的 HTTP 状态异常
if e.response.status_code == 401:
print("认证失败:用户名或密码错误")
else:
print(f"HTTP 错误 {e.response.status_code}: {e}")
return 1
except Exception as e:
# 其他异常进一步判断401或输出异常信息
err_str = str(e).lower()
if "401" in err_str or "unauthorized" in err_str:
print("认证失败:请检查用户名和密码")
else:
print(f"连接失败: {e}")
import traceback
traceback.print_exc()
return 1
# 仅在直接运行本文件时执行
if __name__ == "__main__":
# 判断及设置标准输出输入的编码
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
sys.stdin.reconfigure(encoding="utf-8")
try:
# 运行主逻辑并传递退出码
sys.exit(main())
except KeyboardInterrupt:
# 捕获 Ctrl+C 中断优雅退出
print("\n客户端已停止")
sys.exit(130)
4. mcp_server.py #
mcp_server.py
# MCP 资源服务器(OAuth2 分离架构)
"""
MCP 资源服务器(OAuth2 分离架构)
与认证服务器分离,通过 Token Introspection(RFC 7662)验证令牌。
- 认证服务器:oauth_auth_server.py(端口 9000)
- 资源服务器:本服务(端口 8000)
"""
# 导入操作系统相关模块
import os
# 导入系统功能模块
import sys
# 导入类型提示
from typing import Any
# 从 pydantic 导入 AnyHttpUrl 类型,用于校验 URL
from pydantic import AnyHttpUrl
# 导入 MCP 的认证设置
from mcp.server.auth.settings import AuthSettings
# 导入 MCP 上下文与主服务类
from mcp.server.fastmcp import Context, FastMCP
# 导入自定义的令牌自省验证器
from token_verifier import IntrospectionTokenVerifier
# 获取当前用户 ID 的工具函数
def _get_current_user_id() -> str | None:
# 从认证上下文获取当前用户 ID
try:
from mcp.server.auth.middleware.auth_context import get_access_token
# 获取当前 access_token
token = get_access_token()
# 返回 client_id(用户名),若无则返回 None
return token.client_id if token else None
except Exception:
# 如果获取失败则返回 None
return None
# 配置:资源服务器与认证服务器分离
# 获取主机地址,先查 FASTMCP_HOST,再查 MCP_OAUTH_HOST,否则用 127.0.0.1
HOST = os.environ.get("FASTMCP_HOST", os.environ.get("MCP_OAUTH_HOST", "127.0.0.1"))
# 获取端口号,先查 FASTMCP_PORT,再查 MCP_OAUTH_PORT,否则用 8000
PORT = int(os.environ.get("FASTMCP_PORT", os.environ.get("MCP_OAUTH_PORT", "8000")))
# 获取认证服务器地址,默认为 http://127.0.0.1:9000
AUTH_SERVER = os.environ.get("OAUTH_AUTH_SERVER", "http://127.0.0.1:9000")
# 资源服务器自身地址
RESOURCE_URL = f"http://{HOST}:{PORT}"
# 令牌自省端点
INTROSPECTION_URL = f"{AUTH_SERVER.rstrip('/')}/oauth/introspect"
# 创建 FastMCP 资源服务器实例
mcp = FastMCP(
# 服务器名称
name="OAuth Resource Server",
# 令牌验证器:通过 introspect 端点验证
token_verifier=IntrospectionTokenVerifier(INTROSPECTION_URL),
# 认证设置
auth=AuthSettings(
issuer_url=AnyHttpUrl(AUTH_SERVER), # 认证服务器 url
resource_server_url=AnyHttpUrl(RESOURCE_URL), # 资源服务器 url
required_scopes=["user"], # 要求 scope "user"
),
)
# 注册受保护资源 user://profile
@mcp.resource("user://profile")
def get_user_profile() -> str:
# 获取当前用户 ID,若获取不到则为 "anonymous"
user_id = _get_current_user_id() or "anonymous"
# 返回用户基本资料(JSON 字符串)
return f'''{{
"user_id": "{user_id}",
"name": "User {user_id}",
"email": "{user_id}@example.com",
"role": "user"
}}'''
# 注册受保护资源 user://stats
@mcp.resource("user://stats")
def get_user_stats() -> str:
# 获取当前用户 ID,若获取不到则为 "anonymous"
user_id = _get_current_user_id() or "anonymous"
# 返回用户统计数据(JSON 字符串)
return f'''{{
"user_id": "{user_id}",
"requests_today": 42,
"last_active": "2024-01-15T10:30:00Z"
}}'''
# 注册受保护工具 get_user_data
@mcp.tool()
def get_user_data(user_id: str = "current", ctx: Context | None = None) -> dict[str, Any]:
# 获取当前用户 ID
current = _get_current_user_id()
# 请求为当前用户
if user_id == "current":
# 如果没有认证,返回未认证信息
if not current:
return {"error": "未认证", "user_id": None}
# 返回当前用户的基本数据
return {
"user_id": current,
"status": "active",
"last_login": "2024-01-15T10:30:00Z",
}
# 请求的 user_id 等于当前用户
if current and user_id == current:
return {
"user_id": user_id,
"status": "active",
"last_login": "2024-01-15T10:30:00Z",
}
# 既不是当前用户,也无权访问
return {"user_id": user_id, "status": "unknown", "error": "无权访问或用户不存在"}
# 仅主程序直接运行时启用
if __name__ == "__main__":
# 检查 stdout 是否支持 reconfigure(Py3.7+)
if hasattr(sys.stdout, "reconfigure"):
# 设置 stdout 的编码为 utf-8
sys.stdout.reconfigure(encoding="utf-8")
sys.stdin.reconfigure(encoding="utf-8")
# 设置环境变量,用于内部配置
os.environ.setdefault("FASTMCP_HOST", HOST)
os.environ.setdefault("FASTMCP_PORT", str(PORT))
# 打印资源服务器地址
print(f"MCP 资源服务器: http://{HOST}:{PORT}/mcp")
# 打印认证服务器地址
print(f"认证服务器: {AUTH_SERVER}")
# 提示先启动认证服务器
print("请先启动 oauth_auth_server.py,再启动本服务")
# 启动 FastMCP 服务(streamable-http 模式)
mcp.run(transport="streamable-http")
5. token_verifier.py #
token_verifier.py
"""OAuth2 令牌自省验证器(RFC 7662)
资源服务器通过调用认证服务器的 introspection 端点验证 Bearer 令牌。
"""
# 导入未来注解特性(兼容旧版Python类型提示)
from __future__ import annotations
# 导入 httpx 库用于发送 HTTP 请求
import httpx
# 从 mcp.server.auth.provider 模块导入 AccessToken 和 TokenVerifier
from mcp.server.auth.provider import AccessToken, TokenVerifier
# 定义 IntrospectionTokenVerifier 类,继承自 TokenVerifier
class IntrospectionTokenVerifier(TokenVerifier):
"""通过 OAuth2 Token Introspection(RFC 7662)验证令牌。"""
# 构造方法,接收 introspection_endpoint(自省端点地址)
def __init__(self, introspection_endpoint: str):
# 保存自省端点地址到实例变量
self.introspection_endpoint = introspection_endpoint
# 异步方法,用于验证令牌
async def verify_token(self, token: str) -> AccessToken | None:
"""调用认证服务器自省端点验证令牌。"""
# 安全检查,introspection_endpoint 必须以指定前缀(http://localhost等)开头
if not self.introspection_endpoint.startswith(
("https://", "http://localhost", "http://127.0.0.1")
):
# 若地址不合法,直接返回 None
return None
# 使用 httpx 异步客户端创建会话,请求超时 10 秒
async with httpx.AsyncClient(timeout=10.0) as client:
try:
# 向自省端点发送 POST 请求,带上 token 字段
response = await client.post(
self.introspection_endpoint,
data={"token": token},
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
# 如果响应状态码不是 200,令牌无效,返回 None
if response.status_code != 200:
return None
# 解析响应 JSON 数据
data = response.json()
# 如果 active 字段为 False 或不存在,说明令牌无效,返回 None
if not data.get("active", False):
return None
# 构造 AccessToken 对象并返回
return AccessToken(
token=token,
client_id=data.get("client_id", "unknown"),
scopes=data.get("scope", "").split() if data.get("scope") else [],
expires_at=data.get("exp"),
resource=data.get("aud"),
)
except Exception:
# 捕获异常,如请求异常等,返回 None
return None6. 工作流程 #
6.1 整体架构 #
这是一个 OAuth2 认证服务器与资源服务器分离 的 MCP 示例,包含四个部分:
| 组件 | 文件 | 端口 | 职责 |
|---|---|---|---|
| 认证服务器 | auth_server.py |
9000 | 颁发令牌、自省令牌 |
| 资源服务器 | mcp_server.py |
8000 | 提供 MCP 工具和资源 |
| 令牌验证器 | token_verifier.py |
- | 调用认证服务器自省接口 |
| 客户端 | mcp_client.py |
- | 获取令牌并访问资源 |
6.2 各文件说明 #
6.2.1. auth_server.py — 认证服务器 #
基于 Starlette,提供两个 HTTP 端点:
① /oauth/token(令牌颁发)
- 使用 Resource Owner Password Credentials(密码模式)
- 请求:
POST,application/x-www-form-urlencoded - 参数:
grant_type=password、username、password - 校验
DEMO_USERS(alice/123456、bob/123456) - 成功后:生成
mcp_{hex(48)}令牌,存入_tokens,返回access_token、expires_in等
② /oauth/introspect(令牌自省,RFC 7662)
- 资源服务器用来验证令牌
- 请求:
POST,application/x-www-form-urlencoded,参数token - 根据
_tokens检查:- 存在且未过期 →
{"active": true, "client_id": "...", "scope": "...", "exp": ...} - 不存在或过期 →
{"active": false}
- 存在且未过期 →
6.2.2. token_verifier.py — 令牌验证器 #
实现 MCP 的 TokenVerifier 协议:
- 构造函数:接收
introspection_endpoint(如http://127.0.0.1:9000/oauth/introspect) verify_token(token):异步调用POST /oauth/introspect,传入token- 若
active: true→ 构造并返回AccessToken(client_id、scopes、expires_at 等) - 否则或异常 → 返回
None
6.2.3. mcp_server.py — 资源服务器 #
配置:
token_verifier=IntrospectionTokenVerifier(INTROSPECTION_URL):用自省接口验证AuthSettings:issuer_url、resource_server_url、required_scopes=["user"]
资源与工具:
user://profile:返回当前用户资料user://stats:返回当前用户统计get_user_data:根据user_id返回用户数据(支持current)
用户身份:
- 通过
get_access_token()从认证上下文获取当前AccessToken - 使用
client_id作为用户标识(如 alice、bob)
6.2.4. mcp_client.py — 客户端 #
流程:
- 获取令牌:
fetch_token()→POST /oauth/token,使用username、password - 访问资源:
streamable_http_client()带上Authorization: Bearer {token} - MCP 操作:
list_resources、list_tools、read_resource、call_tool
参数:
--auth-url、--resource-url、--username、--password(或环境变量)
6.3 时序图 #
6.3.1 完整 #
┌─────────────┐ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Client │ │ Auth Server │ │ Resource Server │ │ Token Verifier │
│ mcp_client │ │ auth_server │ │ mcp_server │ │ token_verifier │
└──────┬──────┘ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘
│ │ │ │
│ ① POST /oauth/token │ │
│ grant_type=password │ │
│ username=alice&password=123456 │ │
│────────────────────────────────────────────► │
│ │ │ │
│ ② 200 OK │ │ │
│ {access_token, ...}│ │ │
│◄──────────────────────────────────────────── │
│ │ │ │
│ ③ MCP 请求 (Streamable HTTP) │ │
│ Authorization: Bearer <token> │ │
│──────────────────────────────────────────────────────────────────────►
│ │ │ │
│ │ │ ④ verify_token(token)│
│ │ │ │
│ │ │ ⑤ POST /oauth/introspect
│ │ │ token=<access_token> │
│ │ │──────────────────────►│
│ │ │ │
│ │ ⑥ POST /oauth/introspect │
│ │ token=<access_token> │ │
│ │◄──────────────────────│ │
│ │ │ │
│ │ ⑦ 200 OK │ │
│ │ {active:true, │ │
│ │ client_id:alice} │ │
│ │──────────────────────►│ │
│ │ │ │
│ │ │ ⑧ AccessToken │
│ │ │◄──────────────────────│
│ │ │ │
│ ⑨ MCP 响应 (资源/工具结果) │ │
│◄──────────────────────────────────────────────────────────────────────
│ │ │ │6.3.2 (客户端视角) #
┌─────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Client │ │ Auth Server │ │ Resource Server │
└──────┬──────┘ └────────┬────────┘ └────────┬────────┘
│ │ │
│ ① 获取令牌 │ │
│ POST /oauth/token │ │
│ (username, password) │ │
│──────────────────────────►│ │
│ │ │
│ ② access_token │ │
│◄──────────────────────────│ │
│ │ │
│ ③ MCP 请求 │ │
│ Bearer <token> │ │
│─────────────────────────────────────────────────────────►│
│ │ │
│ │ ④ 自省验证 │
│ │ POST /oauth/introspect │
│ │◄────────────────────────────│
│ │ │
│ │ ⑤ {active, client_id} │
│ │─────────────────────────────►│
│ │ │
│ ⑥ MCP 响应 (资源/工具) │ │
│◄─────────────────────────────────────────────────────────│
│ │ │6.4 时序图 #
6.5 数据流小结 #
- 令牌获取:Client → Auth Server:
POST /oauth/token,返回access_token。 - 令牌携带:Client 在后续请求中带上
Authorization: Bearer <token>。 - 令牌验证:Resource Server 收到请求后,通过
TokenVerifier.verify_token()调用 Auth Server 的/oauth/introspect。 - 权限与身份:自省返回
active、client_id、scope等,Resource Server 据此决定是否允许访问并识别当前用户。