1. 什么是 MCP 授权? #
授权(Authorization)用来控制「谁可以访问什么」:只有通过身份验证、且被允许的用户,才能访问 MCP 服务器提供的敏感资源或执行敏感操作。
| 概念 | 通俗理解 |
|---|---|
| 无授权 | 任何人连上服务器就能用,适合本地、测试 |
| 有授权 | 需先登录、同意权限,才能调用工具或资源 |
MCP 使用 OAuth 2.1 标准实现授权,不绑定特定身份系统,可与各类授权服务(如 Keycloak、Auth0)配合。
MCP 授权基于以下标准:
| 标准 | 用途 |
|---|---|
| OAuth 2.1 | 核心授权框架 |
| RFC 8414 | 授权服务器元数据发现 |
| RFC 7591 | 动态客户端注册 |
| RFC 9728 | 受保护资源元数据 |
| RFC 8707 | 资源指示符 |
2. 本章你将学到 #
- 什么时候需要为 MCP 服务器加授权
- 本地服务器 vs 远程服务器在授权上的区别
- OAuth 授权流程的 6 个步骤
- 用 Keycloak + Python 搭建一个带授权的 MCP 服务器
- 常见安全陷阱及规避方法
3. 何时需要授权? #
授权是可选的,但以下场景强烈建议使用:
| 场景 | 说明 |
|---|---|
| 访问用户数据 | 邮件、文档、数据库等敏感信息 |
| 需要审计 | 记录谁在何时执行了哪些操作 |
| 第三方 API 需用户同意 | 如 OAuth 授权访问 Google、GitHub 等 |
| 企业环境 | 有严格访问控制要求 |
| 按用户限流或计费 | 需要区分不同用户的使用量 |
3.1 本地 vs 远程:授权方式不同 #
| 传输方式 | 典型场景 | 授权方式 |
|---|---|---|
| STDIO(本地) | 客户端和服务器在同一台机器 | 可用环境变量、本地凭据或第三方库,不一定走 OAuth |
| HTTP/SSE(远程) | 服务器在远程,客户端通过网络连接 | 通常用 OAuth:用户登录、授权,客户端拿到令牌后访问 |
小结:本地 STDIO 服务器授权方式灵活;远程 HTTP 服务器通常用 OAuth 建立「用户已授权」的信任。
4. 授权流程概览 #
当客户端连接受保护的 MCP 服务器时,大致经历以下步骤:
5. 授权流程:分步说明 #
5.1 初始握手:401 + 元数据地址 #
客户端首次请求时,服务器返回 401 Unauthorized,并在响应头中告诉客户端「去哪获取授权信息」:
HTTP/1.1 401 Unauthorized
WWW-Authenticate: Bearer realm="mcp",
resource_metadata="http://127.0.0.1:8000/.well-known/oauth-protected-resource/mcp"客户端据此知道:需要授权,且元数据文档在 resource_metadata 指向的地址。
5.2 受保护资源元数据(PRM) #
客户端请求该元数据文档,获取授权服务器地址、支持的 scope 等:
{
"resource": "http://127.0.0.1:8000/mcp",
"authorization_servers": [
"http://127.0.0.1:9000/"
],
"scopes_supported": [
"user"
],
"bearer_methods_supported": [
"header"
]
}更多字段见 RFC 9728 第 3.2 节。
5.3 授权服务器发现 #
客户端根据 PRM 中的 authorization_servers,请求授权服务器的元数据(OIDC Discovery 或 OAuth 2.0 元数据),获取登录、换令牌等端点:
http://127.0.0.1:9000/.well-known/oauth-authorization-server
{
"issuer": "http://127.0.0.1:9000",
"authorization_endpoint": "http://127.0.0.1:9000/oauth/authorize",
"token_endpoint": "http://127.0.0.1:9000/oauth/token",
"registration_endpoint": "http://127.0.0.1:9000/oauth/register",
"introspection_endpoint": "http://127.0.0.1:9000/oauth/introspect",
"response_types_supported": [
"code"
],
"grant_types_supported": [
"authorization_code"
],
"scopes_supported": [
"user"
]
}5.4 客户端注册 #
客户端需要在授权服务器上「登记」自己,有两种方式:
| 方式 | 说明 |
|---|---|
| 预注册 | 管理员提前在授权服务器中创建客户端,客户端内置 client_id 等 |
| 动态注册(DCR) | 客户端向 registration_endpoint 提交信息,自动完成注册 |
动态注册示例请求:
{
"client_name": "MCP OAuth 客户端",
"redirect_uris": ["http://127.0.0.1:8000/callback"],
}若授权服务器不支持 DCR,且客户端未预注册,则需要用户手动输入客户端信息。
5.5 用户授权 #
客户端打开浏览器,跳转到 /authorize,用户登录并同意授权。授权服务器重定向回客户端,并带上授权码;客户端用授权码换取访问令牌:
{
"access_token": "eyJhbGciOiJSUzI1NiIs...",# 访问令牌
"token_type": "Bearer", # 令牌类型为Bearer
"expires_in": 3600, # 距离过期剩余秒数
"scope": ["user"], # 空格分隔的作用域字符串
}5.6 发起已认证请求 #
客户端在后续请求中,将 access_token 放在 Authorization 头中:
GET http://127.0.0.1:8000/mcp HTTP/1.1
Authorization: Bearer eyJhbGciOiJSUzI1NiIs...服务器验证令牌有效性及 scope,通过后处理请求。
1. 启动MCP服务器 #
uv add "mcp[cli]"1.1. .env #
.env
MCP_HOST=127.0.0.1
MCP_PORT=80001.2. mcp_server.py #
mcp_server.py
# MCP 资源服务器(OAuth2 分离架构)
"""
MCP 资源服务器(OAuth2 分离架构)
与认证服务器分离,通过令牌自省(RFC 7662)验证令牌。
- 认证服务器:oauth_auth_server.py(端口 9000)
- 资源服务器:本服务(端口 8000)
"""
# 导入操作系统相关模块
import os
# 导入系统功能模块
import sys
# 导入 MCP 上下文与主服务类
from mcp.server.fastmcp import FastMCP
# 读取主机地址,优先使用环境变量 MCP_HOS,否则默认 '127.0.0.1'
HOST = os.environ.get("MCP_HOST", "127.0.0.1")
# 读取端口号,优先使用环境变量 MCP_PORT,否则默认 8000(需转换为 int)
PORT = int(os.environ.get("MCP_PORT", "8000"))
# 资源服务器自身的公开 URL,带 /mcp 路径用于元数据发现
RESOURCE_URL = f"http://{HOST}:{PORT}/mcp"
# 创建 FastMCP 资源服务器实例
mcp = FastMCP(
# 服务器名称
name="OAuth资源服务器",
host=HOST,
port=PORT
)
# 仅当该文件作为主程序直接运行时执行下列代码
if __name__ == "__main__":
# 打印资源服务器地址
print(f"MCP 资源服务器: {RESOURCE_URL}")
# 启动 FastMCP 服务,采用 streamable-http 模式
mcp.run(transport="streamable-http")2. 启动MCP客户端 #
2.1. mcp_client.py #
mcp_client.py
"""
MCP OAuth 客户端
按如下顺序执行:
1) 请求受保护资源,拿到 401 + 资源元数据地址
2) 获取受保护资源元数据(授权服务器地址)
3) 获取授权服务器元数据(授权端点、令牌端点、注册端点)
4) 动态注册客户端
5) 引导用户授权(此演示用用户名/密码 + approve=true 模拟)
6) 用授权码换取访问令牌
7) 携带访问令牌访问 MCP 受保护资源
"""
# 导入系统模块
import sys
# 导入正则表达式模块
import re
# 导入异步IO模块
import asyncio
# 导入httpx,用于处理HTTP请求
import httpx
# 从"www-authenticate"头部解析出resource_metadata字段
def _parse_resource_metadata_from_www_authenticate(www_authenticate):
# 头部值不存在直接返回None
if not www_authenticate:
return None
# 匹配resource_metadata字段的内容
match = re.search(r'resource_metadata="([^"]+)"', www_authenticate, flags=re.IGNORECASE)
# 有匹配返回第一个分组,否则返回None
return match.group(1) if match else None
# 发现受保护资源的元数据地址(401响应推断或头部直接获取)
async def discover_resource_metadata(client, resource_url):
# 无令牌访问资源预期获得401
response = await client.get(resource_url)
# 如果不是401,说明流程有误
if response.status_code != 401:
raise RuntimeError(f"预期先返回 401,但收到 {response.status_code}")
# 从头部解析resource_metadata字段
return _parse_resource_metadata_from_www_authenticate(response.headers.get("www-authenticate"))
# 主运行入口,完成OAuth全流程
async def run_client(resource_url, username, password, redirect_uri):
# 创建默认30秒超时的httpx异步客户端,自动跟随重定向
async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as http_client:
# 第一步:无token请求受保护资源
print("1) 请求受保护资源(无 token)...")
resource_metadata_url = await discover_resource_metadata(http_client, resource_url)
print(f" 收到资源元数据地址: {resource_metadata_url}")
# 命令行主入口
def main():
try:
# 受保护资源的元数据地址
resource_url = "http://127.0.0.1:8000/mcp"
# 用户名(模拟登录用)
username = "admin"
# 密码(模拟登录用)
password = "123456"
# 回调地址(OAuth2 授权码流程需要,客户端监听此地址以接收授权码)
redirect_uri = "http://127.0.0.1:8000/callback"
# 运行主业务流程
asyncio.run(run_client(resource_url, username, password, redirect_uri))
return 0
except httpx.HTTPStatusError as e:
# HTTP请求状态异常统一输出
print(f"HTTP 请求错误 {e.response.status_code}: {e.response.text}")
return 1
except Exception as e:
# 其他异常统一输出
print(f"执行失败: {e}")
import traceback
traceback.print_exc()
return 1
# 判断是否作为主程序入口执行
if __name__ == "__main__":
# 设置标准输入输出流的编码为utf-8(如果支持该方法)
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
sys.stdin.reconfigure(encoding="utf-8")
try:
# 启动主函数,异常时按约定返回码
sys.exit(main())
except KeyboardInterrupt:
# 捕获Ctrl+C中断,友好打印并退出
print("\n客户端已停止")
sys.exit(130)
3. 元数据地址 #
http://127.0.0.1:8000/.well-known/oauth-protected-resource/mcp
{
"resource": "http://127.0.0.1:8000/mcp",
"authorization_servers": [
"http://127.0.0.1:9000/"
],
"scopes_supported": [
"user"
],
"bearer_methods_supported": [
"header"
]
}3.1. token_verifier.py #
token_verifier.py
"""
OAuth2 令牌自省验证器(RFC 7662)
资源服务器通过调用认证服务器的令牌自省端点验证 Bearer 令牌。
"""
# 导入 httpx 库用于发送 HTTP 请求
import httpx
# 从 mcp.server.auth.provider 模块导入 AccessToken 和 TokenVerifier
from mcp.server.auth.provider import AccessToken, TokenVerifier
# 定义 IntrospectionTokenVerifier 类,继承自 TokenVerifier
class IntrospectionTokenVerifier(TokenVerifier):
# 通过 OAuth2 令牌自省(RFC 7662)验证令牌的类文档字符串
"""通过 OAuth2 令牌自省(RFC 7662)验证令牌。"""
# 构造方法,接收自省端点地址 introspection_endpoint
def __init__(self, introspection_endpoint):
# 将自省端点地址保存到实例变量
self.introspection_endpoint = introspection_endpoint
3.2. mcp_server.py #
mcp_server.py
# MCP 资源服务器(OAuth2 分离架构)
"""
MCP 资源服务器(OAuth2 分离架构)
与认证服务器分离,通过令牌自省(RFC 7662)验证令牌。
- 认证服务器:oauth_auth_server.py(端口 9000)
- 资源服务器:本服务(端口 8000)
"""
# 导入操作系统相关模块
import os
# 导入系统功能模块
import sys
# 导入 MCP 上下文与主服务类
from mcp.server.fastmcp import FastMCP
# 导入 MCP 的认证设置
+from mcp.server.auth.settings import AuthSettings
# 从 pydantic 导入 AnyHttpUrl 类型,用于校验 URL
+from pydantic import AnyHttpUrl
# 读取主机地址,优先使用环境变量 MCP_HOS,否则默认 '127.0.0.1'
HOST = os.environ.get("MCP_HOST", "127.0.0.1")
# 读取端口号,优先使用环境变量 MCP_PORT,否则默认 8000(需转换为 int)
PORT = int(os.environ.get("MCP_PORT", "8000"))
# 资源服务器自身的公开 URL,带 /mcp 路径用于元数据发现
RESOURCE_URL = f"http://{HOST}:{PORT}/mcp"
# 导入自定义的令牌自省验证器
+from token_verifier import IntrospectionTokenVerifier
# 认证服务器地址,优先通过环境变量 OAUTH_AUTH_SERVER,否则默认为 http://127.0.0.1:9000
+AUTH_SERVER = os.environ.get("OAUTH_AUTH_SERVER", "http://127.0.0.1:9000")
# 令牌自省端点 URL
+INTROSPECTION_URL = f"{AUTH_SERVER.rstrip('/')}/oauth/introspect"
# 创建 FastMCP 资源服务器实例
mcp = FastMCP(
# 服务器名称
name="OAuth资源服务器",
host=HOST,
+ port=PORT,
# 令牌验证器:通过 introspect 端点验证
+ token_verifier=IntrospectionTokenVerifier(INTROSPECTION_URL),
# 认证相关设置
+ auth=AuthSettings(
# 指定认证服务器 Issuer 地址
+ issuer_url=AnyHttpUrl(AUTH_SERVER),
# 资源服务器自身的 URL
+ resource_server_url=AnyHttpUrl(RESOURCE_URL),
# 需要的 scope
+ required_scopes=["user"],
+ ),
)
# 仅当该文件作为主程序直接运行时执行下列代码
if __name__ == "__main__":
# 打印资源服务器地址
print(f"MCP 资源服务器: {RESOURCE_URL}")
# 启动 FastMCP 服务,采用 streamable-http 模式
mcp.run(transport="streamable-http")
4. 获取受保护资源元数据 #
4.1. mcp_client.py #
mcp_client.py
"""
MCP OAuth 客户端
按如下顺序执行:
1) 请求受保护资源,拿到 401 + 资源元数据地址
2) 获取受保护资源元数据(授权服务器地址)
3) 获取授权服务器元数据(授权端点、令牌端点、注册端点)
4) 动态注册客户端
5) 引导用户授权(此演示用用户名/密码 + approve=true 模拟)
6) 用授权码换取访问令牌
7) 携带访问令牌访问 MCP 受保护资源
"""
# 导入系统模块
import sys
# 导入正则表达式模块
import re
# 导入异步IO模块
import asyncio
# 导入httpx,用于处理HTTP请求
import httpx
# 从"www-authenticate"头部解析出resource_metadata字段
def _parse_resource_metadata_from_www_authenticate(www_authenticate):
# 头部值不存在直接返回None
if not www_authenticate:
return None
# 匹配resource_metadata字段的内容
match = re.search(r'resource_metadata="([^"]+)"', www_authenticate, flags=re.IGNORECASE)
# 有匹配返回第一个分组,否则返回None
return match.group(1) if match else None
# 发现受保护资源的元数据地址(401响应推断或头部直接获取)
async def discover_resource_metadata(client, resource_url):
# 无令牌访问资源预期获得401
response = await client.get(resource_url)
# 如果不是401,说明流程有误
if response.status_code != 401:
raise RuntimeError(f"预期先返回 401,但收到 {response.status_code}")
# 从头部解析resource_metadata字段
return _parse_resource_metadata_from_www_authenticate(response.headers.get("www-authenticate"))
# 异步获取JSON,并校验为字典对象
+async def fetch_json(client, url):
# 发起GET请求
+ resp = await client.get(url)
# 请求状态非200会引发异常
+ resp.raise_for_status()
# 解析响应内容为JSON
+ data = resp.json()
# 必须是字典类型
+ if not isinstance(data, dict):
+ raise ValueError(f"元数据响应不是 JSON 对象: {url}")
# 返回解析结果
+ return data
# 主运行入口,完成OAuth全流程
async def run_client(resource_url, username, password, redirect_uri):
# 创建默认30秒超时的httpx异步客户端,自动跟随重定向
async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as http_client:
# 第一步:无token请求受保护资源
print("1) 请求受保护资源(无 token)...")
resource_metadata_url = await discover_resource_metadata(http_client, resource_url)
print(f" 收到资源元数据地址: {resource_metadata_url}")
# 第二步:获取资源元数据
+ print("2) 获取受保护资源元数据...")
+ resource_metadata = await fetch_json(http_client, resource_metadata_url)
# 检查authorization_servers字段
+ auth_servers = resource_metadata.get("authorization_servers")
+ if not isinstance(auth_servers, list) or not auth_servers or not isinstance(auth_servers[0], str):
+ raise ValueError("受保护资源元数据缺少 authorization_servers 字段")
# 选取第一个授权服务器
+ auth_server = auth_servers[0].rstrip("/")
+ print(f" 授权服务器: {auth_server}")
# 命令行主入口
def main():
try:
# 受保护资源的元数据地址
resource_url = "http://127.0.0.1:8000/mcp"
# 用户名(模拟登录用)
username = "admin"
# 密码(模拟登录用)
password = "123456"
# 回调地址(OAuth2 授权码流程需要,客户端监听此地址以接收授权码)
redirect_uri = "http://127.0.0.1:8000/callback"
# 运行主业务流程
asyncio.run(run_client(resource_url, username, password, redirect_uri))
return 0
except httpx.HTTPStatusError as e:
# HTTP请求状态异常统一输出
print(f"HTTP 请求错误 {e.response.status_code}: {e.response.text}")
return 1
except Exception as e:
# 其他异常统一输出
print(f"执行失败: {e}")
import traceback
traceback.print_exc()
return 1
# 判断是否作为主程序入口执行
if __name__ == "__main__":
# 设置标准输入输出流的编码为utf-8(如果支持该方法)
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
sys.stdin.reconfigure(encoding="utf-8")
try:
# 启动主函数,异常时按约定返回码
sys.exit(main())
except KeyboardInterrupt:
# 捕获Ctrl+C中断,友好打印并退出
print("\n客户端已停止")
sys.exit(130)
5. 获取授权服务器元数据 #
http://127.0.0.1:9000/.well-known/oauth-authorization-server{
"issuer": "http://127.0.0.1:9000", # 认证服务器地址
"authorization_endpoint": "http://127.0.0.1:9000/oauth/authorize", # 授权端点
"token_endpoint": "http://127.0.0.1:9000/oauth/token", # 令牌端点
"registration_endpoint": "http://127.0.0.1:9000/oauth/register", # 客户端动态注册端点
"introspection_endpoint": "http://127.0.0.1:9000/oauth/introspect", # 令牌自省端点
"response_types_supported": [
"code" # 支持的 response_type
],
"grant_types_supported": [
"authorization_code" # 支持的授权类型
],
"scopes_supported": [
"user" # 支持的 scope(权限域)
]
}5.1. auth_server.py #
auth_server.py
"""
OAuth2 认证服务器(独立运行)
实现完整授权码流程(含元数据发现与动态注册):
- 授权服务器元数据:GET /.well-known/oauth-authorization-server
- 动态客户端注册:POST /oauth/register
- 授权端点:GET /oauth/authorize
- 令牌端点:POST /oauth/token(authorization_code)
- 令牌自省:POST /oauth/introspect(RFC 7662)
"""
# 导入异步IO支持
import asyncio
# 导入操作系统环境变量支持
import os
# 导入随机数生成工具
import secrets
# 导入系统库函数
import sys
# 导入时间函数
import time
# 导入URL编码工具
from urllib.parse import urlencode
# 导入Starlette微框架相关组件
from starlette.applications import Starlette
from starlette.responses import JSONResponse, RedirectResponse
from starlette.routing import Route
# 导入uvicorn用于运行ASGI应用
from uvicorn import Config, Server
# 从请求对象获取服务器的issuer信息
def _issuer_from_request(request):
# request.url.scheme:获取请求的协议,通常是 http 或 https。
# request.url.netloc:获取请求的“网络位置”,即域名(或 IP 地址)加上可选的端口号。例如 example.com、localhost:8080。
return f"{request.url.scheme}://{request.url.netloc}"
# 授权服务器元数据发现端点
async def authorization_server_metadata(request):
# 获取issuer信息
issuer = _issuer_from_request(request)
# 授权端点 用于引导用户授权
authorization_endpoint = f"{issuer}/oauth/authorize"
# 令牌端点 用于获取访问令牌
token_endpoint = f"{issuer}/oauth/token"
# 注册端点 用于动态注册客户端
registration_endpoint = f"{issuer}/oauth/register"
# 自省端点 用于验证令牌
introspection_endpoint = f"{issuer}/oauth/introspect"
# 构造元数据响应
metadata = {
"issuer": issuer,# 认证服务器地址
"authorization_endpoint": authorization_endpoint, # 授权端点
"token_endpoint": token_endpoint, # 令牌端点
"registration_endpoint": registration_endpoint, # 注册端点
"introspection_endpoint": introspection_endpoint, # 自省端点
"response_types_supported": ["code"], # 支持的响应类型
"grant_types_supported": ["authorization_code"], # 支持的授权类型
"scopes_supported": ["user"], # 支持的scope,scope指的是访问资源范围
}
# 返回JSON响应
return JSONResponse(metadata)
# 创建Starlette应用,并注册路由
def create_app():
routes = [
Route("/.well-known/oauth-authorization-server", authorization_server_metadata, methods=["GET"])
]
return Starlette(routes=routes)
# 运行ASGI服务器
async def run_server(host, port):
# 创建应用
app = create_app()
# 配置uvicorn
config = Config(app, host=host, port=port, log_level="info")
# 创建服务器对象
server = Server(config)
# 启动服务器服务
await server.serve()
# 入口函数,负责环境和服务启动
def main():
# 从环境变量获取服务IP和端口,默认127.0.0.1:9000
host = os.environ.get("OAUTH_AUTH_HOST", "127.0.0.1")
port = int(os.environ.get("OAUTH_AUTH_PORT", "9000"))
# 服务端点说明
print(f"OAuth2 认证服务器: http://{host}:{port}")
print(" 元数据端点: GET /.well-known/oauth-authorization-server")
print(" 动态注册端点: POST /oauth/register")
print(" 授权端点: GET /oauth/authorize")
print(" 令牌端点: POST /oauth/token (grant_type=authorization_code)")
print(" 自省端点: POST /oauth/introspect")
# 启动服务
asyncio.run(run_server(host, port))
return 0
# 程序入口
if __name__ == "__main__":
sys.exit(main())
5.2. .env #
.env
MCP_HOST=127.0.0.1
MCP_PORT=8000
+OAUTH_AUTH_HOST=127.0.0.1
+OAUTH_AUTH_PORT=90005.3. mcp_client.py #
mcp_client.py
"""
MCP OAuth 客户端
按如下顺序执行:
1) 请求受保护资源,拿到 401 + 资源元数据地址
2) 获取受保护资源元数据(授权服务器地址)
3) 获取授权服务器元数据(授权端点、令牌端点、注册端点)
4) 动态注册客户端
5) 引导用户授权(此演示用用户名/密码 + approve=true 模拟)
6) 用授权码换取访问令牌
7) 携带访问令牌访问 MCP 受保护资源
"""
# 导入系统模块
import sys
# 导入正则表达式模块
import re
# 导入异步IO模块
import asyncio
# 导入httpx,用于处理HTTP请求
import httpx
# 从"www-authenticate"头部解析出resource_metadata字段
def _parse_resource_metadata_from_www_authenticate(www_authenticate):
# 头部值不存在直接返回None
if not www_authenticate:
return None
# 匹配resource_metadata字段的内容
match = re.search(r'resource_metadata="([^"]+)"', www_authenticate, flags=re.IGNORECASE)
# 有匹配返回第一个分组,否则返回None
return match.group(1) if match else None
# 发现受保护资源的元数据地址(401响应推断或头部直接获取)
async def discover_resource_metadata(client, resource_url):
# 无令牌访问资源预期获得401
response = await client.get(resource_url)
# 如果不是401,说明流程有误
if response.status_code != 401:
raise RuntimeError(f"预期先返回 401,但收到 {response.status_code}")
# 从头部解析resource_metadata字段
return _parse_resource_metadata_from_www_authenticate(response.headers.get("www-authenticate"))
# 异步获取JSON,并校验为字典对象
async def fetch_json(client, url):
# 发起GET请求
resp = await client.get(url)
# 请求状态非200会引发异常
resp.raise_for_status()
# 解析响应内容为JSON
data = resp.json()
# 必须是字典类型
if not isinstance(data, dict):
raise ValueError(f"元数据响应不是 JSON 对象: {url}")
# 返回解析结果
return data
# 主运行入口,完成OAuth全流程
async def run_client(resource_url, username, password, redirect_uri):
# 创建默认30秒超时的httpx异步客户端,自动跟随重定向
async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as http_client:
# 第一步:无token请求受保护资源
print("1) 请求受保护资源(无 token)...")
resource_metadata_url = await discover_resource_metadata(http_client, resource_url)
print(f" 收到资源元数据地址: {resource_metadata_url}")
# 第二步:获取资源元数据
print("2) 获取受保护资源元数据...")
resource_metadata = await fetch_json(http_client, resource_metadata_url)
# 检查authorization_servers字段
auth_servers = resource_metadata.get("authorization_servers")
if not isinstance(auth_servers, list) or not auth_servers or not isinstance(auth_servers[0], str):
raise ValueError("受保护资源元数据缺少 authorization_servers 字段")
# 选取第一个授权服务器
auth_server = auth_servers[0].rstrip("/")
print(f" 授权服务器: {auth_server}")
# 第三步:获取授权服务器元数据
+ print("3) 获取授权服务器元数据...")
# 构造授权服务器元数据地址
+ auth_metadata_url = f"{auth_server}/.well-known/oauth-authorization-server"
+ print(f" 授权服务器元数据地址: {auth_metadata_url}")
# 异步获取授权服务器元数据(JSON)
+ auth_metadata = await fetch_json(http_client, auth_metadata_url)
# 从元数据中获取授权端点
+ authorization_endpoint = str(auth_metadata.get("authorization_endpoint", ""))
# 从元数据中获取令牌端点
+ token_endpoint = str(auth_metadata.get("token_endpoint", ""))
# 从元数据中获取注册端点
+ registration_endpoint = str(auth_metadata.get("registration_endpoint", ""))
# 检查三个端点是否都存在,否则抛出异常
+ if not authorization_endpoint or not token_endpoint or not registration_endpoint:
+ raise ValueError("授权服务器元数据缺少关键端点")
# 打印授权端点
+ print(f" 授权端点: {authorization_endpoint}")
# 打印令牌端点
+ print(f" 令牌端点: {token_endpoint}")
# 打印注册端点
+ print(f" 注册端点: {registration_endpoint}")
# 命令行主入口
def main():
try:
# 受保护资源的元数据地址
resource_url = "http://127.0.0.1:8000/mcp"
# 用户名(模拟登录用)
username = "admin"
# 密码(模拟登录用)
password = "123456"
# 回调地址(OAuth2 授权码流程需要,客户端监听此地址以接收授权码)
redirect_uri = "http://127.0.0.1:8000/callback"
# 运行主业务流程
asyncio.run(run_client(resource_url, username, password, redirect_uri))
return 0
except httpx.HTTPStatusError as e:
# HTTP请求状态异常统一输出
print(f"HTTP 请求错误 {e.response.status_code}: {e.response.text}")
return 1
except Exception as e:
# 其他异常统一输出
print(f"执行失败: {e}")
import traceback
traceback.print_exc()
return 1
# 判断是否作为主程序入口执行
if __name__ == "__main__":
# 设置标准输入输出流的编码为utf-8(如果支持该方法)
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
sys.stdin.reconfigure(encoding="utf-8")
try:
# 启动主函数,异常时按约定返回码
sys.exit(main())
except KeyboardInterrupt:
# 捕获Ctrl+C中断,友好打印并退出
print("\n客户端已停止")
sys.exit(130)
6. 客户端注册(动态注册) #
6.1. auth_server.py #
auth_server.py
"""
OAuth2 认证服务器(独立运行)
实现完整授权码流程(含元数据发现与动态注册):
- 授权服务器元数据:GET /.well-known/oauth-authorization-server
- 动态客户端注册:POST /oauth/register
- 授权端点:GET /oauth/authorize
- 令牌端点:POST /oauth/token(authorization_code)
- 令牌自省:POST /oauth/introspect(RFC 7662)
"""
# 导入异步IO支持
import asyncio
# 导入操作系统环境变量支持
import os
# 导入随机数生成工具
import secrets
# 导入系统库函数
import sys
# 导入时间函数
import time
# 导入URL编码工具
from urllib.parse import urlencode
# 导入Starlette微框架相关组件
from starlette.applications import Starlette
from starlette.responses import JSONResponse, RedirectResponse
from starlette.routing import Route
# 导入uvicorn用于运行ASGI应用
from uvicorn import Config, Server
# 客户端注册信息字典
+_clients = {}
# 从请求对象获取服务器的issuer信息
def _issuer_from_request(request):
# request.url.scheme:获取请求的协议,通常是 http 或 https。
# request.url.netloc:获取请求的“网络位置”,即域名(或 IP 地址)加上可选的端口号。例如 example.com、localhost:8080。
return f"{request.url.scheme}://{request.url.netloc}"
# 授权服务器元数据发现端点
async def authorization_server_metadata(request):
# 获取issuer信息
issuer = _issuer_from_request(request)
# 授权端点 用于引导用户授权
authorization_endpoint = f"{issuer}/oauth/authorize"
# 令牌端点 用于获取访问令牌
token_endpoint = f"{issuer}/oauth/token"
# 注册端点 用于动态注册客户端
registration_endpoint = f"{issuer}/oauth/register"
# 自省端点 用于验证令牌
introspection_endpoint = f"{issuer}/oauth/introspect"
# 构造元数据响应
metadata = {
"issuer": issuer,# 认证服务器地址
"authorization_endpoint": authorization_endpoint, # 授权端点
"token_endpoint": token_endpoint, # 令牌端点
"registration_endpoint": registration_endpoint, # 注册端点
"introspection_endpoint": introspection_endpoint, # 自省端点
"response_types_supported": ["code"], # 支持的响应类型
"grant_types_supported": ["authorization_code"], # 支持的授权类型
"scopes_supported": ["user"], # 支持的scope,scope指的是访问资源范围
}
# 返回JSON响应
return JSONResponse(metadata)
# 动态客户端注册端点
+async def register_client(request):
+ try:
# 解析请求体JSON
+ body = await request.json()
+ except Exception:
# JSON解析失败,返回错误
+ return JSONResponse(
+ {"error": "invalid_client_metadata", "error_description": "JSON 请求体无效"},
+ status_code=400,
+ )
# 获取redirect_uris参数
+ redirect_uris = body.get("redirect_uris")
# 校验redirect_uris类型和内容
+ if not isinstance(redirect_uris, list) or not redirect_uris:
+ return JSONResponse(
+ {"error": "invalid_redirect_uri", "error_description": "缺少 redirect_uris"},
+ status_code=400,
+ )
+ if not all(isinstance(uri, str) and uri.startswith(("http://", "https://")) for uri in redirect_uris):
+ return JSONResponse(
+ {"error": "invalid_redirect_uri", "error_description": "redirect_uris 必须是 HTTP/HTTPS 地址"},
+ status_code=400,
+ )
# 生成client_id
+ client_id = f"client_{secrets.token_hex(8)}"
# 构造客户端注册信息
+ client = {
+ "client_id": client_id,
+ "client_name": body.get("client_name", "MCP OAuth 客户端"),
+ "redirect_uris": redirect_uris,
+ "grant_types": ["authorization_code"],
+ "response_types": ["code"],
+ "scope": "user",
+ }
# 存储客户端信息
+ _clients[client_id] = client
# 返回注册结果
+ return JSONResponse(client, status_code=201)
# 创建Starlette应用,并注册路由
def create_app():
routes = [
+ Route("/.well-known/oauth-authorization-server", authorization_server_metadata, methods=["GET"]),
+ Route("/oauth/register", register_client, methods=["POST"]),
]
return Starlette(routes=routes)
# 运行ASGI服务器
async def run_server(host, port):
# 创建应用
app = create_app()
# 配置uvicorn
config = Config(app, host=host, port=port, log_level="info")
# 创建服务器对象
server = Server(config)
# 启动服务器服务
await server.serve()
# 入口函数,负责环境和服务启动
def main():
# 从环境变量获取服务IP和端口,默认127.0.0.1:9000
host = os.environ.get("OAUTH_AUTH_HOST", "127.0.0.1")
port = int(os.environ.get("OAUTH_AUTH_PORT", "9000"))
# 服务端点说明
print(f"OAuth2 认证服务器: http://{host}:{port}")
print(" 元数据端点: GET /.well-known/oauth-authorization-server")
print(" 动态注册端点: POST /oauth/register")
print(" 授权端点: GET /oauth/authorize")
print(" 令牌端点: POST /oauth/token (grant_type=authorization_code)")
print(" 自省端点: POST /oauth/introspect")
# 启动服务
asyncio.run(run_server(host, port))
return 0
# 程序入口
if __name__ == "__main__":
sys.exit(main())
6.2. mcp_client.py #
mcp_client.py
"""
MCP OAuth 客户端
按如下顺序执行:
1) 请求受保护资源,拿到 401 + 资源元数据地址
2) 获取受保护资源元数据(授权服务器地址)
3) 获取授权服务器元数据(授权端点、令牌端点、注册端点)
4) 动态注册客户端
5) 引导用户授权(此演示用用户名/密码 + approve=true 模拟)
6) 用授权码换取访问令牌
7) 携带访问令牌访问 MCP 受保护资源
"""
# 导入系统模块
import sys
# 导入正则表达式模块
import re
# 导入异步IO模块
import asyncio
# 导入httpx,用于处理HTTP请求
import httpx
# 从"www-authenticate"头部解析出resource_metadata字段
def _parse_resource_metadata_from_www_authenticate(www_authenticate):
# 头部值不存在直接返回None
if not www_authenticate:
return None
# 匹配resource_metadata字段的内容
match = re.search(r'resource_metadata="([^"]+)"', www_authenticate, flags=re.IGNORECASE)
# 有匹配返回第一个分组,否则返回None
return match.group(1) if match else None
# 发现受保护资源的元数据地址(401响应推断或头部直接获取)
async def discover_resource_metadata(client, resource_url):
# 无令牌访问资源预期获得401
response = await client.get(resource_url)
# 如果不是401,说明流程有误
if response.status_code != 401:
raise RuntimeError(f"预期先返回 401,但收到 {response.status_code}")
# 从头部解析resource_metadata字段
return _parse_resource_metadata_from_www_authenticate(response.headers.get("www-authenticate"))
# 异步获取JSON,并校验为字典对象
async def fetch_json(client, url):
# 发起GET请求
resp = await client.get(url)
# 请求状态非200会引发异常
resp.raise_for_status()
# 解析响应内容为JSON
data = resp.json()
# 必须是字典类型
if not isinstance(data, dict):
raise ValueError(f"元数据响应不是 JSON 对象: {url}")
# 返回解析结果
return data
# 异步动态客户端注册
+async def register_client(client, registration_endpoint, redirect_uri):
# 通过POST向注册端点发起注册请求
+ resp = await client.post(
+ registration_endpoint,
+ json={
+ "client_name": "MCP OAuth 客户端",
+ "redirect_uris": [redirect_uri],
+ },
+ )
# 检查响应状态
+ resp.raise_for_status()
# 解析响应
+ data = resp.json()
# 获取client_id
+ client_id = data.get("client_id")
# client_id需为非空字符串
+ if not isinstance(client_id, str) or not client_id:
+ raise ValueError("动态注册失败:未返回客户端标识")
# 返回client_id
+ return client_id
# 主运行入口,完成OAuth全流程
async def run_client(resource_url, username, password, redirect_uri):
# 创建默认30秒超时的httpx异步客户端,自动跟随重定向
async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as http_client:
# 第一步:无token请求受保护资源
print("1) 请求受保护资源(无 token)...")
resource_metadata_url = await discover_resource_metadata(http_client, resource_url)
print(f" 收到资源元数据地址: {resource_metadata_url}")
# 第二步:获取资源元数据
print("2) 获取受保护资源元数据...")
resource_metadata = await fetch_json(http_client, resource_metadata_url)
# 检查authorization_servers字段
auth_servers = resource_metadata.get("authorization_servers")
if not isinstance(auth_servers, list) or not auth_servers or not isinstance(auth_servers[0], str):
raise ValueError("受保护资源元数据缺少 authorization_servers 字段")
# 选取第一个授权服务器
auth_server = auth_servers[0].rstrip("/")
print(f" 授权服务器: {auth_server}")
# 第三步:获取授权服务器元数据
print("3) 获取授权服务器元数据...")
# 构造授权服务器元数据地址
auth_metadata_url = f"{auth_server}/.well-known/oauth-authorization-server"
print(f" 授权服务器元数据地址: {auth_metadata_url}")
# 异步获取授权服务器元数据(JSON)
auth_metadata = await fetch_json(http_client, auth_metadata_url)
# 从元数据中获取授权端点
authorization_endpoint = str(auth_metadata.get("authorization_endpoint", ""))
# 从元数据中获取令牌端点
token_endpoint = str(auth_metadata.get("token_endpoint", ""))
# 从元数据中获取注册端点
registration_endpoint = str(auth_metadata.get("registration_endpoint", ""))
# 检查三个端点是否都存在,否则抛出异常
if not authorization_endpoint or not token_endpoint or not registration_endpoint:
raise ValueError("授权服务器元数据缺少关键端点")
# 打印授权端点
print(f" 授权端点: {authorization_endpoint}")
# 打印令牌端点
print(f" 令牌端点: {token_endpoint}")
# 打印注册端点
print(f" 注册端点: {registration_endpoint}")
# 第四步:动态客户端注册
+ print("4) 动态客户端注册...")
+ client_id = await register_client(http_client, registration_endpoint, redirect_uri)
+ print(f" 客户端标识: {client_id}")
# 命令行主入口
def main():
try:
# 受保护资源的元数据地址
resource_url = "http://127.0.0.1:8000/mcp"
# 用户名(模拟登录用)
username = "admin"
# 密码(模拟登录用)
password = "123456"
# 回调地址(OAuth2 授权码流程需要,客户端监听此地址以接收授权码)
redirect_uri = "http://127.0.0.1:8000/callback"
# 运行主业务流程
asyncio.run(run_client(resource_url, username, password, redirect_uri))
return 0
except httpx.HTTPStatusError as e:
# HTTP请求状态异常统一输出
print(f"HTTP 请求错误 {e.response.status_code}: {e.response.text}")
return 1
except Exception as e:
# 其他异常统一输出
print(f"执行失败: {e}")
import traceback
traceback.print_exc()
return 1
# 判断是否作为主程序入口执行
if __name__ == "__main__":
# 设置标准输入输出流的编码为utf-8(如果支持该方法)
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
sys.stdin.reconfigure(encoding="utf-8")
try:
# 启动主函数,异常时按约定返回码
sys.exit(main())
except KeyboardInterrupt:
# 捕获Ctrl+C中断,友好打印并退出
print("\n客户端已停止")
sys.exit(130)6.3 start.py #
# 一键编排启动脚本说明
"""
一键编排启动脚本:
1) 启动 auth_server.py
2) 启动 mcp_server.py
3) 等待两个服务就绪
4) 运行 mcp_client.py
5) 结束后自动停止后台服务
"""
# 导入标准库模块
import os
import subprocess
import sys
import time
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
# 加载 .env 文件中的环境变量到当前进程
def load_dotenv(dotenv_path: Path) -> None:
# 检查 .env 文件是否存在
if not dotenv_path.exists():
return
# 逐行读取 .env 文件内容
for raw_line in dotenv_path.read_text(encoding="utf-8").splitlines():
line = raw_line.strip()
# 跳过空行、注释行和无等号行
if not line or line.startswith("#") or "=" not in line:
continue
# 拆分 KEY=VALUE 结构
key, value = line.split("=", 1)
key = key.strip()
value = value.strip()
# 设置环境变量
if key:
os.environ[key] = value
# 用于轮询检测 HTTP 服务是否就绪,直到收到期望状态码为止
def wait_http_ready(
name: str,
url: str,
expected_codes: set[int],
timeout_seconds: int = 40,
) -> None:
# 计算超时截止时间
# time.monotonic() 返回一个**单调递增**的时间值(单位秒),不会受系统时间被手动修改、NTP 校时、时区变化影响。
# + timeout_seconds` 把允许等待的秒数加上去,得到“最晚到什么时候必须结束等待”。
deadline = time.monotonic() + timeout_seconds
while time.monotonic() < deadline:
# 构造 HTTP GET 请求
req = Request(url=url, method="GET")
try:
with urlopen(req, timeout=2) as resp:
# 获取响应状态码
status = int(resp.status)
# 如果是期望的状态码则认为服务就绪
if status in expected_codes:
print(f"{name} is ready: {url} (HTTP {status})")
return
except HTTPError as exc:
# 某些接口如受保护资源返回 401 也可作为就绪信号
status = int(exc.code)
if status in expected_codes:
print(f"{name} is ready: {url} (HTTP {status})")
return
except URLError:
# 网络错误忽略,继续重试
pass
# 没有就绪则每秒重试
time.sleep(1)
# 超时后抛出异常
raise TimeoutError(f"{name} did not become ready in {timeout_seconds}s: {url}")
# 停止后台进程(先 terminate,失败再 kill),支持异常处理
def stop_process(process: subprocess.Popen | None, name: str) -> None:
# 检查进程是否存在且未退出
if process is None or process.poll() is not None:
return
try:
# 尝试优雅终止
process.terminate()
process.wait(timeout=5)
except Exception:
# 若无法优雅终止,则强制杀死
process.kill()
process.wait(timeout=5)
# 打印停止信息
print(f"Stopped {name} (pid={process.pid})")
# 主流程入口
def main() -> int:
# 获取代码仓库根目录
repo_root = Path(__file__).resolve().parent
# 切换工作目录到代码根目录
os.chdir(repo_root)
# 加载根目录下的 .env 环境变量
load_dotenv(repo_root / ".env")
# 读取或设置各服务的主机和端口
mcp_host = os.environ.get("MCP_HOST", "127.0.0.1")
mcp_port = int(os.environ.get("MCP_PORT", "8000"))
auth_host = os.environ.get("OAUTH_AUTH_HOST", "127.0.0.1")
auth_port = int(os.environ.get("OAUTH_AUTH_PORT", "9000"))
# 设置默认的 OAuth 认证服务器地址
os.environ.setdefault("OAUTH_AUTH_SERVER", f"http://{auth_host}:{auth_port}")
# 创建 logs 目录用于保存日志
logs_dir = repo_root / "logs"
logs_dir.mkdir(exist_ok=True)
# 获取 Python 可执行文件路径
python_exe = sys.executable
# 进程对象初始化
auth_proc: subprocess.Popen | None = None
mcp_proc: subprocess.Popen | None = None
# 打开日志文件用于保存标准输出和标准错误
auth_out = (logs_dir / "auth_server.out.log").open("w", encoding="utf-8")
auth_err = (logs_dir / "auth_server.err.log").open("w", encoding="utf-8")
mcp_out = (logs_dir / "mcp_server.out.log").open("w", encoding="utf-8")
mcp_err = (logs_dir / "mcp_server.err.log").open("w", encoding="utf-8")
try:
# 启动 OAuth 认证服务器(auth_server.py)进程
print("Starting OAuth auth server...")
# 启动 OAuth 认证服务器(auth_server.py)进程,返回码为 subprocess.Popen() 的返回值
auth_proc = subprocess.Popen(
[python_exe, "auth_server.py"],
cwd=repo_root,
stdout=auth_out,
stderr=auth_err,
text=True,
)
# 启动 MCP 资源服务器(mcp_server.py)进程
print("Starting MCP resource server...")
# 启动 MCP 资源服务器(mcp_server.py)进程,返回码为 subprocess.Popen() 的返回值
mcp_proc = subprocess.Popen(
[python_exe, "mcp_server.py"],
cwd=repo_root,
stdout=mcp_out,
stderr=mcp_err,
text=True,
)
# 等待认证服务器元数据接口返回 200,确认其就绪
wait_http_ready(
name="Auth server",
url=f"http://{auth_host}:{auth_port}/.well-known/oauth-authorization-server",
expected_codes={200},
timeout_seconds=40,
)
# 等待MCP资源服务器元数据或资源端点返回 200 或 401
wait_http_ready(
name="MCP server",
url=f"http://{mcp_host}:{mcp_port}/mcp",
expected_codes={200, 401},
timeout_seconds=40,
)
# 运行 MCP 客户端 (mcp_client.py),输出会直接打印到当前控制台
print("Running MCP client...")
# 运行 MCP 客户端 (mcp_client.py),返回码为 subprocess.run() 的返回值
client_result = subprocess.run(
[python_exe, "mcp_client.py"],
cwd=repo_root,
text=True,
)
# 返回 MCP 客户端进程的返回码
return int(client_result.returncode)
finally:
# 关闭日志文件
auth_out.close()
auth_err.close()
mcp_out.close()
mcp_err.close()
# 停止已启动的后台服务进程
stop_process(mcp_proc, "mcp_server.py")
stop_process(auth_proc, "auth_server.py")
# 打印结束信息
print("Background servers stopped.")
# 判断是否作为脚本主程序运行
if __name__ == "__main__":
# 退出码为主流程的返回结果
sys.exit(main())7. 引导用户授权 #
7.1. auth_server.py #
auth_server.py
"""
OAuth2 认证服务器(独立运行)
实现完整授权码流程(含元数据发现与动态注册):
- 授权服务器元数据:GET /.well-known/oauth-authorization-server
- 动态客户端注册:POST /oauth/register
- 授权端点:GET /oauth/authorize
- 令牌端点:POST /oauth/token(authorization_code)
- 令牌自省:POST /oauth/introspect(RFC 7662)
"""
# 导入异步IO支持
import asyncio
# 导入操作系统环境变量支持
import os
# 导入随机数生成工具
import secrets
# 导入系统库函数
import sys
# 导入时间函数
import time
# 导入URL编码工具
from urllib.parse import urlencode
# 导入Starlette微框架相关组件
from starlette.applications import Starlette
from starlette.responses import JSONResponse, RedirectResponse
from starlette.routing import Route
# 导入uvicorn用于运行ASGI应用
from uvicorn import Config, Server
# 用户数据,用户名:密码
+USERS = {
+ "admin": "123456"
+}
# 客户端注册信息字典
_clients = {}
# 授权码存储字典
+_auth_codes = {}
# 规范化scope参数,默认为user
+def _normalize_scopes(scope_str):
+ if not scope_str:
+ return ["user"]
+ scopes = [s for s in scope_str.split() if s]
+ return scopes or ["user"]
# 从请求对象获取服务器的issuer信息
def _issuer_from_request(request):
# request.url.scheme:获取请求的协议,通常是 http 或 https。
# request.url.netloc:获取请求的“网络位置”,即域名(或 IP 地址)加上可选的端口号。例如 example.com、localhost:8080。
return f"{request.url.scheme}://{request.url.netloc}"
# 授权服务器元数据发现端点
async def authorization_server_metadata(request):
# 获取issuer信息
issuer = _issuer_from_request(request)
# 授权端点 用于引导用户授权
authorization_endpoint = f"{issuer}/oauth/authorize"
# 令牌端点 用于获取访问令牌
token_endpoint = f"{issuer}/oauth/token"
# 注册端点 用于动态注册客户端
registration_endpoint = f"{issuer}/oauth/register"
# 自省端点 用于验证令牌
introspection_endpoint = f"{issuer}/oauth/introspect"
# 构造元数据响应
metadata = {
"issuer": issuer,# 认证服务器地址
"authorization_endpoint": authorization_endpoint, # 授权端点
"token_endpoint": token_endpoint, # 令牌端点
"registration_endpoint": registration_endpoint, # 注册端点
"introspection_endpoint": introspection_endpoint, # 自省端点
"response_types_supported": ["code"], # 支持的响应类型
"grant_types_supported": ["authorization_code"], # 支持的授权类型
"scopes_supported": ["user"], # 支持的scope,scope指的是访问资源范围
}
# 返回JSON响应
return JSONResponse(metadata)
# 动态客户端注册端点
async def register_client(request):
try:
# 解析请求体JSON
body = await request.json()
except Exception:
# JSON解析失败,返回错误
return JSONResponse(
{"error": "invalid_client_metadata", "error_description": "JSON 请求体无效"},
status_code=400,
)
# 获取redirect_uris参数
redirect_uris = body.get("redirect_uris")
# 校验redirect_uris类型和内容
if not isinstance(redirect_uris, list) or not redirect_uris:
return JSONResponse(
{"error": "invalid_redirect_uri", "error_description": "缺少 redirect_uris"},
status_code=400,
)
if not all(isinstance(uri, str) and uri.startswith(("http://", "https://")) for uri in redirect_uris):
return JSONResponse(
{"error": "invalid_redirect_uri", "error_description": "redirect_uris 必须是 HTTP/HTTPS 地址"},
status_code=400,
)
# 生成client_id
client_id = f"client_{secrets.token_hex(8)}"
# 构造客户端注册信息
client = {
"client_id": client_id,
"client_name": body.get("client_name", "MCP OAuth 客户端"),
"redirect_uris": redirect_uris,
"grant_types": ["authorization_code"],
"response_types": ["code"],
"scope": "user",
}
# 存储客户端信息
_clients[client_id] = client
# 返回注册结果
return JSONResponse(client, status_code=201)
# 授权端点处理函数,处理用户授权请求
+async def authorize_endpoint(request):
# 获取URL查询参数对象
+ query = request.query_params
# 获取response_type参数
+ response_type = query.get("response_type")
# 获取client_id参数
+ client_id = query.get("client_id")
# 获取redirect_uri参数
+ redirect_uri = query.get("redirect_uri")
# 获取scope参数,默认为"user"
+ scope = query.get("scope", "user")
# 获取state参数
+ state = query.get("state")
# 获取用户名
+ username = query.get("username")
# 获取密码
+ password = query.get("password")
# 获取approve参数,并转换为布尔值,判断用户是否同意授权
+ approve = query.get("approve", "false").lower() == "true"
# 检查response_type是否为"code",否则返回错误响应
+ if response_type != "code":
+ return JSONResponse({"error": "unsupported_response_type"}, status_code=400)
# 校验client_id和redirect_uri两个必要参数,缺少则返回错误
+ if not client_id or not redirect_uri:
+ return JSONResponse({"error": "invalid_request", "error_description": "缺少 client_id 或 redirect_uri"}, status_code=400)
# 根据client_id查找已注册的客户端,未找到则返回错误
+ client = _clients.get(client_id)
+ if not client:
+ return JSONResponse({"error": "unauthorized_client", "error_description": "未知的 client_id"}, status_code=400)
# 校验redirect_uri是否是在注册信息中登记过的回调地址
+ if redirect_uri not in client.get("redirect_uris", []):
+ return JSONResponse({"error": "invalid_request", "error_description": "redirect_uri 未注册"}, status_code=400)
# 判断用户名、密码和用户同意(approve)是否都已提交,否则返回需要用户交互的错误
+ if not username or not password or not approve:
+ return JSONResponse(
+ {
+ "error": "interaction_required", # 需要用户进一步操作
+ "error_description": "需要用户凭据并同意授权", # 错误说明
+ "hint": "请调用 /oauth/authorize 并携带 username、password、approve=true", # 提示
+ "required_params": ["username", "password", "approve=true"], # 要求这些参数
+ },
+ status_code=400,
+ )
# 校验用户名和密码,若不正确则返回拒绝访问
+ if USERS.get(username) != password:
+ return JSONResponse({"error": "access_denied", "error_description": "用户名或密码错误"}, status_code=401)
# 生成授权码,格式为code_xxx
+ code = f"code_{secrets.token_hex(16)}"
# 存储授权码相关数据,包括客户端ID、回调地址、用户名、作用域、过期时间
+ _auth_codes[code] = {
+ "client_id": client_id, #客户端标识
+ "redirect_uri": redirect_uri, #回调地址
+ "username": username, #用户名
+ "scopes": _normalize_scopes(scope), #作用域
+ "expires_at": int(time.time()) + 300, # 授权码5分钟后过期时间
+ }
# 构造回调参数字典,包含授权码
+ params = {"code": code}
# 如果state参数存在,也一并回传
+ if state:
+ params["state"] = state
# 拼接完整回调地址,附带参数
+ callback_url = f"{redirect_uri}?{urlencode(params)}"
# 重定向到客户端回调地址,并返回授权码等信息
+ return RedirectResponse(url=callback_url, status_code=302)
# 创建Starlette应用,并注册路由
def create_app():
routes = [
Route("/.well-known/oauth-authorization-server", authorization_server_metadata, methods=["GET"]),
Route("/oauth/register", register_client, methods=["POST"]),
+ Route("/oauth/authorize", authorize_endpoint, methods=["GET"])
]
return Starlette(routes=routes)
# 运行ASGI服务器
async def run_server(host, port):
# 创建应用
app = create_app()
# 配置uvicorn
config = Config(app, host=host, port=port, log_level="info")
# 创建服务器对象
server = Server(config)
# 启动服务器服务
await server.serve()
# 入口函数,负责环境和服务启动
def main():
# 从环境变量获取服务IP和端口,默认127.0.0.1:9000
host = os.environ.get("OAUTH_AUTH_HOST", "127.0.0.1")
port = int(os.environ.get("OAUTH_AUTH_PORT", "9000"))
# 服务端点说明
print(f"OAuth2 认证服务器: http://{host}:{port}")
print(" 元数据端点: GET /.well-known/oauth-authorization-server")
print(" 动态注册端点: POST /oauth/register")
print(" 授权端点: GET /oauth/authorize")
print(" 令牌端点: POST /oauth/token (grant_type=authorization_code)")
print(" 自省端点: POST /oauth/introspect")
# 启动服务
asyncio.run(run_server(host, port))
return 0
# 程序入口
if __name__ == "__main__":
sys.exit(main())
7.2. mcp_client.py #
mcp_client.py
"""
MCP OAuth 客户端
按如下顺序执行:
1) 请求受保护资源,拿到 401 + 资源元数据地址
2) 获取受保护资源元数据(授权服务器地址)
3) 获取授权服务器元数据(授权端点、令牌端点、注册端点)
4) 动态注册客户端
5) 引导用户授权(此演示用用户名/密码 + approve=true 模拟)
6) 用授权码换取访问令牌
7) 携带访问令牌访问 MCP 受保护资源
"""
# 导入系统模块
import sys
# 导入正则表达式模块
import re
# 导入安全随机数模块,用于生成不可预测的 state
+import secrets
# 导入异步IO模块
import asyncio
# 导入httpx,用于处理HTTP请求
import httpx
# 导入URL解析相关方法
+from urllib.parse import parse_qs, urlparse
# 从"www-authenticate"头部解析出resource_metadata字段
def _parse_resource_metadata_from_www_authenticate(www_authenticate):
# 头部值不存在直接返回None
if not www_authenticate:
return None
# 匹配resource_metadata字段的内容
match = re.search(r'resource_metadata="([^"]+)"', www_authenticate, flags=re.IGNORECASE)
# 有匹配返回第一个分组,否则返回None
return match.group(1) if match else None
# 发现受保护资源的元数据地址(401响应推断或头部直接获取)
async def discover_resource_metadata(client, resource_url):
# 无令牌访问资源预期获得401
response = await client.get(resource_url)
# 如果不是401,说明流程有误
if response.status_code != 401:
raise RuntimeError(f"预期先返回 401,但收到 {response.status_code}")
# 从头部解析resource_metadata字段
return _parse_resource_metadata_from_www_authenticate(response.headers.get("www-authenticate"))
# 异步获取JSON,并校验为字典对象
async def fetch_json(client, url):
# 发起GET请求
resp = await client.get(url)
# 请求状态非200会引发异常
resp.raise_for_status()
# 解析响应内容为JSON
data = resp.json()
# 必须是字典类型
if not isinstance(data, dict):
raise ValueError(f"元数据响应不是 JSON 对象: {url}")
# 返回解析结果
return data
# 异步动态客户端注册
async def register_client(client, registration_endpoint, redirect_uri):
# 通过POST向注册端点发起注册请求
resp = await client.post(
registration_endpoint,
json={
"client_name": "MCP OAuth 客户端",
"redirect_uris": [redirect_uri],
},
)
# 检查响应状态
resp.raise_for_status()
# 解析响应
data = resp.json()
# 获取client_id
client_id = data.get("client_id")
# client_id需为非空字符串
if not isinstance(client_id, str) or not client_id:
raise ValueError("动态注册失败:未返回客户端标识")
# 返回client_id
return client_id
# 用户授权并获取授权码(用用户名密码)
+async def authorize_with_user(
+ client,#httpx异步客户端
+ authorization_endpoint,#授权端点
+ client_id,#客户端标识
+ redirect_uri,#回调地址
+ username,#用户名
+ password,#密码
+):
# 固定state用于校验,用于防止CSRF(Cross-Site Request Forgery,跨站请求伪造)攻击
# CSRF是指攻击者利用你“已登录某网站”的身份,在你不知情时让浏览器替你发起恶意请求。
# 客户端发起授权请求时带上 state,授权服务器重定向回来时原样带回 state,客户端只接受“回调里 state 与自己最初保存的 state 完全一致”的响应
# 攻击者即使诱导用户点击恶意链接,也很难猜中客户端会话里的 state,回调校验会失败,从而拒绝这次授权结果
# 使用高熵随机 state 防止 CSRF,必须每次授权请求都不同且不可预测。
+ state = secrets.token_urlsafe(32)
# 构造GET到授权端点,携带相关参数(用户名等,模拟快速通过)
+ resp = await client.get(
+ authorization_endpoint,#授权端点
+ params={
+ "response_type": "code",
+ "client_id": client_id,#客户端标识
+ "redirect_uri": redirect_uri,#回调地址
+ "scope": "user",#作用域
+ "state": state,#状态
+ "username": username,#用户名
+ "password": password,#密码
+ "approve": "true",#批准
+ },
+ follow_redirects=False,#不跟随重定向
+ )
# 授权返回302/303为正常
+ if resp.status_code not in (302, 303):
+ raise RuntimeError(f"授权失败,状态码: {resp.status_code}, body: {resp.text}")
# 获取重定向Location
+ location = resp.headers.get("location")
+ if not location:
+ raise RuntimeError("授权失败:未返回回调地址")
# 解析location为URL
+ parsed = urlparse(location)
# 解析query部分
+ qs = parse_qs(parsed.query)
# 获取授权码与state
+ code = (qs.get("code") or [None])[0]
+ returned_state = (qs.get("state") or [None])[0]
# 校验state
+ if returned_state != state:
+ raise RuntimeError("状态参数校验失败")
# 校验授权码
+ if not code:
+ raise RuntimeError("授权失败:未返回授权码")
# 返回code
+ return code
# 主运行入口,完成OAuth全流程
async def run_client(resource_url, username, password, redirect_uri):
# 创建默认30秒超时的httpx异步客户端,自动跟随重定向
async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as http_client:
# 第一步:无token请求受保护资源
print("1) 请求受保护资源(无 token)...")
resource_metadata_url = await discover_resource_metadata(http_client, resource_url)
print(f" 收到资源元数据地址: {resource_metadata_url}")
# 第二步:获取资源元数据
print("2) 获取受保护资源元数据...")
resource_metadata = await fetch_json(http_client, resource_metadata_url)
# 检查authorization_servers字段
auth_servers = resource_metadata.get("authorization_servers")
if not isinstance(auth_servers, list) or not auth_servers or not isinstance(auth_servers[0], str):
raise ValueError("受保护资源元数据缺少 authorization_servers 字段")
# 选取第一个授权服务器
auth_server = auth_servers[0].rstrip("/")
print(f" 授权服务器: {auth_server}")
# 第三步:获取授权服务器元数据
print("3) 获取授权服务器元数据...")
# 构造授权服务器元数据地址
auth_metadata_url = f"{auth_server}/.well-known/oauth-authorization-server"
print(f" 授权服务器元数据地址: {auth_metadata_url}")
# 异步获取授权服务器元数据(JSON)
auth_metadata = await fetch_json(http_client, auth_metadata_url)
# 从元数据中获取授权端点
authorization_endpoint = str(auth_metadata.get("authorization_endpoint", ""))
# 从元数据中获取令牌端点
token_endpoint = str(auth_metadata.get("token_endpoint", ""))
# 从元数据中获取注册端点
registration_endpoint = str(auth_metadata.get("registration_endpoint", ""))
# 检查三个端点是否都存在,否则抛出异常
if not authorization_endpoint or not token_endpoint or not registration_endpoint:
raise ValueError("授权服务器元数据缺少关键端点")
# 打印授权端点
print(f" 授权端点: {authorization_endpoint}")
# 打印令牌端点
print(f" 令牌端点: {token_endpoint}")
# 打印注册端点
print(f" 注册端点: {registration_endpoint}")
# 第四步:动态客户端注册
print("4) 动态客户端注册...")
client_id = await register_client(http_client, registration_endpoint, redirect_uri)
print(f" 客户端标识: {client_id}")
# 第五步:授权获取授权码
+ print("5) 引导用户授权并获取授权码...")
+ code = await authorize_with_user(
+ http_client,#httpx异步客户端
+ authorization_endpoint,#授权端点
+ client_id,#客户端标识
+ redirect_uri,#回调地址
+ username,#用户名
+ password,#密码
+ )#返回授权码
+ print(f" 授权码: {code}")#打印授权码
# 命令行主入口
def main():
try:
# 受保护资源的元数据地址
resource_url = "http://127.0.0.1:8000/mcp"
# 用户名(模拟登录用)
username = "admin"
# 密码(模拟登录用)
password = "123456"
# 回调地址(OAuth2 授权码流程需要,客户端监听此地址以接收授权码)
redirect_uri = "http://127.0.0.1:8000/callback"
# 运行主业务流程
asyncio.run(run_client(resource_url, username, password, redirect_uri))
return 0
except httpx.HTTPStatusError as e:
# HTTP请求状态异常统一输出
print(f"HTTP 请求错误 {e.response.status_code}: {e.response.text}")
return 1
except Exception as e:
# 其他异常统一输出
print(f"执行失败: {e}")
import traceback
traceback.print_exc()
return 1
# 判断是否作为主程序入口执行
if __name__ == "__main__":
# 设置标准输入输出流的编码为utf-8(如果支持该方法)
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
sys.stdin.reconfigure(encoding="utf-8")
try:
# 启动主函数,异常时按约定返回码
sys.exit(main())
except KeyboardInterrupt:
# 捕获Ctrl+C中断,友好打印并退出
print("\n客户端已停止")
sys.exit(130)8. 使用授权码换取access_token #
8.1. auth_server.py #
auth_server.py
"""
OAuth2 认证服务器(独立运行)
实现完整授权码流程(含元数据发现与动态注册):
- 授权服务器元数据:GET /.well-known/oauth-authorization-server
- 动态客户端注册:POST /oauth/register
- 授权端点:GET /oauth/authorize
- 令牌端点:POST /oauth/token(authorization_code)
- 令牌自省:POST /oauth/introspect(RFC 7662)
"""
# 导入异步IO支持
import asyncio
# 导入操作系统环境变量支持
import os
# 导入随机数生成工具
import secrets
# 导入系统库函数
import sys
# 导入时间函数
import time
# 导入URL编码工具
from urllib.parse import urlencode
# 导入Starlette微框架相关组件
from starlette.applications import Starlette
from starlette.responses import JSONResponse, RedirectResponse
from starlette.routing import Route
# 导入uvicorn用于运行ASGI应用
from uvicorn import Config, Server
# 用户数据,用户名:密码
USERS = {
"admin": "123456"
}
# 客户端注册信息字典
_clients = {}
# 授权码存储字典
_auth_codes = {}
# 令牌存储字典
+_tokens = {}
# 规范化scope参数,默认为user
def _normalize_scopes(scope_str):
if not scope_str:
return ["user"]
scopes = [s for s in scope_str.split() if s]
return scopes or ["user"]
# 从请求对象获取服务器的issuer信息
def _issuer_from_request(request):
# request.url.scheme:获取请求的协议,通常是 http 或 https。
# request.url.netloc:获取请求的“网络位置”,即域名(或 IP 地址)加上可选的端口号。例如 example.com、localhost:8080。
return f"{request.url.scheme}://{request.url.netloc}"
# 授权服务器元数据发现端点
async def authorization_server_metadata(request):
# 获取issuer信息
issuer = _issuer_from_request(request)
# 授权端点 用于引导用户授权
authorization_endpoint = f"{issuer}/oauth/authorize"
# 令牌端点 用于获取访问令牌
token_endpoint = f"{issuer}/oauth/token"
# 注册端点 用于动态注册客户端
registration_endpoint = f"{issuer}/oauth/register"
# 自省端点 用于验证令牌
introspection_endpoint = f"{issuer}/oauth/introspect"
# 构造元数据响应
metadata = {
"issuer": issuer,# 认证服务器地址
"authorization_endpoint": authorization_endpoint, # 授权端点
"token_endpoint": token_endpoint, # 令牌端点
"registration_endpoint": registration_endpoint, # 注册端点
"introspection_endpoint": introspection_endpoint, # 自省端点
"response_types_supported": ["code"], # 支持的响应类型
"grant_types_supported": ["authorization_code"], # 支持的授权类型
"scopes_supported": ["user"], # 支持的scope,scope指的是访问资源范围
}
# 返回JSON响应
return JSONResponse(metadata)
# 动态客户端注册端点
async def register_client(request):
try:
# 解析请求体JSON
body = await request.json()
except Exception:
# JSON解析失败,返回错误
return JSONResponse(
{"error": "invalid_client_metadata", "error_description": "JSON 请求体无效"},
status_code=400,
)
# 获取redirect_uris参数
redirect_uris = body.get("redirect_uris")
# 校验redirect_uris类型和内容
if not isinstance(redirect_uris, list) or not redirect_uris:
return JSONResponse(
{"error": "invalid_redirect_uri", "error_description": "缺少 redirect_uris"},
status_code=400,
)
if not all(isinstance(uri, str) and uri.startswith(("http://", "https://")) for uri in redirect_uris):
return JSONResponse(
{"error": "invalid_redirect_uri", "error_description": "redirect_uris 必须是 HTTP/HTTPS 地址"},
status_code=400,
)
# 生成client_id
client_id = f"client_{secrets.token_hex(8)}"
# 构造客户端注册信息
client = {
"client_id": client_id,
"client_name": body.get("client_name", "MCP OAuth 客户端"),
"redirect_uris": redirect_uris,
"grant_types": ["authorization_code"],
"response_types": ["code"],
"scope": "user",
}
# 存储客户端信息
_clients[client_id] = client
# 返回注册结果
return JSONResponse(client, status_code=201)
# 授权端点处理函数,处理用户授权请求
async def authorize_endpoint(request):
# 获取URL查询参数对象
query = request.query_params
# 获取response_type参数
response_type = query.get("response_type")
# 获取client_id参数
client_id = query.get("client_id")
# 获取redirect_uri参数
redirect_uri = query.get("redirect_uri")
# 获取scope参数,默认为"user"
scope = query.get("scope", "user")
# 获取state参数
state = query.get("state")
# 获取用户名
username = query.get("username")
# 获取密码
password = query.get("password")
# 获取approve参数,并转换为布尔值,判断用户是否同意授权
approve = query.get("approve", "false").lower() == "true"
# 检查response_type是否为"code",否则返回错误响应
if response_type != "code":
return JSONResponse({"error": "unsupported_response_type"}, status_code=400)
# 校验client_id和redirect_uri两个必要参数,缺少则返回错误
if not client_id or not redirect_uri:
return JSONResponse({"error": "invalid_request", "error_description": "缺少 client_id 或 redirect_uri"}, status_code=400)
# 根据client_id查找已注册的客户端,未找到则返回错误
client = _clients.get(client_id)
if not client:
return JSONResponse({"error": "unauthorized_client", "error_description": "未知的 client_id"}, status_code=400)
# 校验redirect_uri是否是在注册信息中登记过的回调地址
if redirect_uri not in client.get("redirect_uris", []):
return JSONResponse({"error": "invalid_request", "error_description": "redirect_uri 未注册"}, status_code=400)
# 判断用户名、密码和用户同意(approve)是否都已提交,否则返回需要用户交互的错误
if not username or not password or not approve:
return JSONResponse(
{
"error": "interaction_required", # 需要用户进一步操作
"error_description": "需要用户凭据并同意授权", # 错误说明
"hint": "请调用 /oauth/authorize 并携带 username、password、approve=true", # 提示
"required_params": ["username", "password", "approve=true"], # 要求这些参数
},
status_code=400,
)
# 校验用户名和密码,若不正确则返回拒绝访问
if USERS.get(username) != password:
return JSONResponse({"error": "access_denied", "error_description": "用户名或密码错误"}, status_code=401)
# 生成授权码,格式为code_xxx
code = f"code_{secrets.token_hex(16)}"
# 存储授权码相关数据,包括客户端ID、回调地址、用户名、作用域、过期时间
_auth_codes[code] = {
"client_id": client_id, #客户端标识
"redirect_uri": redirect_uri, #回调地址
"username": username, #用户名
"scopes": _normalize_scopes(scope), #作用域
"expires_at": int(time.time()) + 300, # 授权码5分钟后过期时间
}
# 构造回调参数字典,包含授权码
params = {"code": code}
# 如果state参数存在,也一并回传
if state:
params["state"] = state
# 拼接完整回调地址,附带参数
callback_url = f"{redirect_uri}?{urlencode(params)}"
# 重定向到客户端回调地址,并返回授权码等信息
return RedirectResponse(url=callback_url, status_code=302)
# 生成访问令牌并存储,返回令牌相关数据
+def _issue_token(client_id, subject, scopes=None):
# 生成一个随机的访问令牌,前缀为"mcp_"
+ token = f"mcp_{secrets.token_hex(24)}"
# 令牌有效时间,单位为秒,这里为3600秒(1小时)
+ expires_in = 3600
# 计算令牌到期的时间戳(当前时间+有效期)
+ expires_at = int(time.time()) + expires_in
# 在全局_tokens字典中存储该令牌对应的信息,便于后续校验和自省
+ _tokens[token] = {
+ "client_id": client_id, # 客户端标识
+ "sub": subject, # 用户唯一标识,通常为用户名
+ "scopes": scopes or ["user"], # 令牌作用域,默认为["user"]
+ "expires_at": expires_at, # 令牌过期时间戳
+ }
# 返回令牌响应内容,包括token本身、类型、有效期、作用域
+ return {
+ "access_token": token, # 访问令牌
+ "token_type": "Bearer", # 令牌类型为Bearer
+ "expires_in": expires_in, # 距离过期剩余秒数
+ "scope": " ".join(scopes or ["user"]),# 空格分隔的作用域字符串
+ }
# 令牌颁发端点(authorization_code模式)
+async def token_endpoint(request):
+ try:
# 解析表单数据
+ form = await request.form()
+ except Exception:
# 表单解析失败
+ return JSONResponse(
+ {"error": "invalid_request", "error_description": "表单数据无效"},
+ status_code=400,
+ )
# 获取授权模式
+ grant_type = form.get("grant_type")
+ if grant_type != "authorization_code":
+ return JSONResponse(
+ {"error": "unsupported_grant_type", "error_description": "仅支持 authorization_code 授权类型"},
+ status_code=400,
+ )
# 获取参数
+ code = form.get("code")
+ client_id = form.get("client_id")
+ redirect_uri = form.get("redirect_uri")
# 校验参数存在性
+ if not code or not client_id or not redirect_uri:
+ return JSONResponse(
+ {"error": "invalid_request", "error_description": "缺少 code/client_id/redirect_uri"},
+ status_code=400,
+ )
# 校验参数类型
+ if not isinstance(code, str) or not isinstance(client_id, str) or not isinstance(redirect_uri, str):
+ return JSONResponse(
+ {"error": "invalid_request", "error_description": "参数类型错误"},
+ status_code=400,
+ )
# 获取授权码对应的数据
+ code_data = _auth_codes.get(code)
+ if not code_data:
+ return JSONResponse({"error": "invalid_grant", "error_description": "授权码不存在"}, status_code=400)
# 检查授权码是否过期
+ if code_data["expires_at"] < time.time():
+ del _auth_codes[code]
+ return JSONResponse({"error": "invalid_grant", "error_description": "授权码已过期"}, status_code=400)
# 校验client_id和redirect_uri是否匹配
+ if code_data["client_id"] != client_id or code_data["redirect_uri"] != redirect_uri:
+ return JSONResponse({"error": "invalid_grant", "error_description": "授权码校验失败"}, status_code=400)
# 授权码只能用一次,用后删除
+ del _auth_codes[code]
# 发放访问令牌
+ token_data = _issue_token(client_id=client_id, subject=code_data["username"], scopes=code_data["scopes"])
# 返回令牌
+ return JSONResponse(token_data)
# 创建Starlette应用,并注册路由
def create_app():
routes = [
Route("/.well-known/oauth-authorization-server", authorization_server_metadata, methods=["GET"]),
Route("/oauth/register", register_client, methods=["POST"]),
+ Route("/oauth/authorize", authorize_endpoint, methods=["GET"]),
+ Route("/oauth/token", token_endpoint, methods=["POST"]),
]
return Starlette(routes=routes)
# 运行ASGI服务器
async def run_server(host, port):
# 创建应用
app = create_app()
# 配置uvicorn
config = Config(app, host=host, port=port, log_level="info")
# 创建服务器对象
server = Server(config)
# 启动服务器服务
await server.serve()
# 入口函数,负责环境和服务启动
def main():
# 从环境变量获取服务IP和端口,默认127.0.0.1:9000
host = os.environ.get("OAUTH_AUTH_HOST", "127.0.0.1")
port = int(os.environ.get("OAUTH_AUTH_PORT", "9000"))
# 服务端点说明
print(f"OAuth2 认证服务器: http://{host}:{port}")
print(" 元数据端点: GET /.well-known/oauth-authorization-server")
print(" 动态注册端点: POST /oauth/register")
print(" 授权端点: GET /oauth/authorize")
print(" 令牌端点: POST /oauth/token (grant_type=authorization_code)")
print(" 自省端点: POST /oauth/introspect")
# 启动服务
asyncio.run(run_server(host, port))
return 0
# 程序入口
if __name__ == "__main__":
sys.exit(main())
8.2. mcp_client.py #
mcp_client.py
"""
MCP OAuth 客户端
按如下顺序执行:
1) 请求受保护资源,拿到 401 + 资源元数据地址
2) 获取受保护资源元数据(授权服务器地址)
3) 获取授权服务器元数据(授权端点、令牌端点、注册端点)
4) 动态注册客户端
5) 引导用户授权(此演示用用户名/密码 + approve=true 模拟)
6) 用授权码换取访问令牌
7) 携带访问令牌访问 MCP 受保护资源
"""
# 导入系统模块
import sys
# 导入正则表达式模块
import re
# 导入安全随机数模块,用于生成不可预测的 state
import secrets
# 导入异步IO模块
import asyncio
# 导入httpx,用于处理HTTP请求
import httpx
# 导入URL解析相关方法
from urllib.parse import parse_qs, urlparse
# 从"www-authenticate"头部解析出resource_metadata字段
def _parse_resource_metadata_from_www_authenticate(www_authenticate):
# 头部值不存在直接返回None
if not www_authenticate:
return None
# 匹配resource_metadata字段的内容
match = re.search(r'resource_metadata="([^"]+)"', www_authenticate, flags=re.IGNORECASE)
# 有匹配返回第一个分组,否则返回None
return match.group(1) if match else None
# 发现受保护资源的元数据地址(401响应推断或头部直接获取)
async def discover_resource_metadata(client, resource_url):
# 无令牌访问资源预期获得401
response = await client.get(resource_url)
# 如果不是401,说明流程有误
if response.status_code != 401:
raise RuntimeError(f"预期先返回 401,但收到 {response.status_code}")
# 从头部解析resource_metadata字段
return _parse_resource_metadata_from_www_authenticate(response.headers.get("www-authenticate"))
# 异步获取JSON,并校验为字典对象
async def fetch_json(client, url):
# 发起GET请求
resp = await client.get(url)
# 请求状态非200会引发异常
resp.raise_for_status()
# 解析响应内容为JSON
data = resp.json()
# 必须是字典类型
if not isinstance(data, dict):
raise ValueError(f"元数据响应不是 JSON 对象: {url}")
# 返回解析结果
return data
# 异步动态客户端注册
async def register_client(client, registration_endpoint, redirect_uri):
# 通过POST向注册端点发起注册请求
resp = await client.post(
registration_endpoint,
json={
"client_name": "MCP OAuth 客户端",
"redirect_uris": [redirect_uri],
},
)
# 检查响应状态
resp.raise_for_status()
# 解析响应
data = resp.json()
# 获取client_id
client_id = data.get("client_id")
# client_id需为非空字符串
if not isinstance(client_id, str) or not client_id:
raise ValueError("动态注册失败:未返回客户端标识")
# 返回client_id
return client_id
# 用户授权并获取授权码(用用户名密码)
async def authorize_with_user(
client,#httpx异步客户端
authorization_endpoint,#授权端点
client_id,#客户端标识
redirect_uri,#回调地址
username,#用户名
password,#密码
):
# 固定state用于校验,用于防止CSRF(Cross-Site Request Forgery,跨站请求伪造)攻击
# CSRF是指攻击者利用你“已登录某网站”的身份,在你不知情时让浏览器替你发起恶意请求。
# 客户端发起授权请求时带上 state,授权服务器重定向回来时原样带回 state,客户端只接受“回调里 state 与自己最初保存的 state 完全一致”的响应
# 攻击者即使诱导用户点击恶意链接,也很难猜中客户端会话里的 state,回调校验会失败,从而拒绝这次授权结果
# 使用高熵随机 state 防止 CSRF,必须每次授权请求都不同且不可预测。
state = secrets.token_urlsafe(32)
# 构造GET到授权端点,携带相关参数(用户名等,模拟快速通过)
resp = await client.get(
authorization_endpoint,#授权端点
params={
"response_type": "code",
"client_id": client_id,#客户端标识
"redirect_uri": redirect_uri,#回调地址
"scope": "user",#作用域
"state": state,#状态
"username": username,#用户名
"password": password,#密码
"approve": "true",#批准
},
follow_redirects=False,#不跟随重定向
)
# 授权返回302/303为正常
if resp.status_code not in (302, 303):
raise RuntimeError(f"授权失败,状态码: {resp.status_code}, body: {resp.text}")
# 获取重定向Location
location = resp.headers.get("location")
if not location:
raise RuntimeError("授权失败:未返回回调地址")
# 解析location为URL
parsed = urlparse(location)
# 解析query部分
qs = parse_qs(parsed.query)
# 获取授权码与state
code = (qs.get("code") or [None])[0]
returned_state = (qs.get("state") or [None])[0]
# 校验state
if returned_state != state:
raise RuntimeError("状态参数校验失败")
# 校验授权码
if not code:
raise RuntimeError("授权失败:未返回授权码")
# 返回code
+ return code
# 异步函数:使用授权码换取访问令牌
+async def exchange_code_for_token(
+ client, # httpx异步客户端实例
+ token_endpoint, # 令牌端点URL
+ client_id, # 客户端标识
+ code, # 授权码
+ redirect_uri, # 回调地址
+):
# 向令牌端点发起POST请求,提交用于授权码换取访问令牌的数据
+ resp = await client.post(
+ token_endpoint, # 令牌端点URL
+ data={
+ "grant_type": "authorization_code", # 授权类型为authorization_code
+ "client_id": client_id, # 客户端标识
+ "code": code, # 授权码
+ "redirect_uri": redirect_uri, # 回调地址
+ },
+ headers={"Content-Type": "application/x-www-form-urlencoded"}, # 指定表单编码
+ )
# 如果响应状态码不是200,抛出异常
+ resp.raise_for_status()
# 解析响应内容为JSON格式
+ data = resp.json()
# 从返回数据中获取access_token字段
+ access_token = data.get("access_token")
# 如果access_token字段不是字符串或为空,抛出异常
+ if not isinstance(access_token, str) or not access_token:
+ raise ValueError("令牌端点未返回访问令牌")
# 返回最终获取到的令牌字符串
+ return access_token
# 主运行入口,完成OAuth全流程
async def run_client(resource_url, username, password, redirect_uri):
# 创建默认30秒超时的httpx异步客户端,自动跟随重定向
async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as http_client:
# 第一步:无token请求受保护资源
print("1) 请求受保护资源(无 token)...")
resource_metadata_url = await discover_resource_metadata(http_client, resource_url)
print(f" 收到资源元数据地址: {resource_metadata_url}")
# 第二步:获取资源元数据
print("2) 获取受保护资源元数据...")
resource_metadata = await fetch_json(http_client, resource_metadata_url)
# 检查authorization_servers字段
auth_servers = resource_metadata.get("authorization_servers")
if not isinstance(auth_servers, list) or not auth_servers or not isinstance(auth_servers[0], str):
raise ValueError("受保护资源元数据缺少 authorization_servers 字段")
# 选取第一个授权服务器
auth_server = auth_servers[0].rstrip("/")
print(f" 授权服务器: {auth_server}")
# 第三步:获取授权服务器元数据
print("3) 获取授权服务器元数据...")
# 构造授权服务器元数据地址
auth_metadata_url = f"{auth_server}/.well-known/oauth-authorization-server"
print(f" 授权服务器元数据地址: {auth_metadata_url}")
# 异步获取授权服务器元数据(JSON)
auth_metadata = await fetch_json(http_client, auth_metadata_url)
# 从元数据中获取授权端点
authorization_endpoint = str(auth_metadata.get("authorization_endpoint", ""))
# 从元数据中获取令牌端点
token_endpoint = str(auth_metadata.get("token_endpoint", ""))
# 从元数据中获取注册端点
registration_endpoint = str(auth_metadata.get("registration_endpoint", ""))
# 检查三个端点是否都存在,否则抛出异常
if not authorization_endpoint or not token_endpoint or not registration_endpoint:
raise ValueError("授权服务器元数据缺少关键端点")
# 打印授权端点
print(f" 授权端点: {authorization_endpoint}")
# 打印令牌端点
print(f" 令牌端点: {token_endpoint}")
# 打印注册端点
print(f" 注册端点: {registration_endpoint}")
# 第四步:动态客户端注册
print("4) 动态客户端注册...")
client_id = await register_client(http_client, registration_endpoint, redirect_uri)
print(f" 客户端标识: {client_id}")
# 第五步:授权获取授权码
print("5) 引导用户授权并获取授权码...")
code = await authorize_with_user(
http_client,#httpx异步客户端
authorization_endpoint,#授权端点
client_id,#客户端标识
redirect_uri,#回调地址
username,#用户名
password,#密码
)#返回授权码
print(f" 授权码: {code}")#打印授权码
# 第六步:授权码换令牌
+ print("6) 使用授权码换取访问令牌...")
+ token = await exchange_code_for_token(http_client, token_endpoint, client_id, code, redirect_uri)
+ print(f" 访问令牌获取成功: {token}")
# 命令行主入口
def main():
try:
# 受保护资源的元数据地址
resource_url = "http://127.0.0.1:8000/mcp"
# 用户名(模拟登录用)
username = "admin"
# 密码(模拟登录用)
password = "123456"
# 回调地址(OAuth2 授权码流程需要,客户端监听此地址以接收授权码)
redirect_uri = "http://127.0.0.1:8000/callback"
# 运行主业务流程
asyncio.run(run_client(resource_url, username, password, redirect_uri))
return 0
except httpx.HTTPStatusError as e:
# HTTP请求状态异常统一输出
print(f"HTTP 请求错误 {e.response.status_code}: {e.response.text}")
return 1
except Exception as e:
# 其他异常统一输出
print(f"执行失败: {e}")
import traceback
traceback.print_exc()
return 1
# 判断是否作为主程序入口执行
if __name__ == "__main__":
# 设置标准输入输出流的编码为utf-8(如果支持该方法)
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
sys.stdin.reconfigure(encoding="utf-8")
try:
# 启动主函数,异常时按约定返回码
sys.exit(main())
except KeyboardInterrupt:
# 捕获Ctrl+C中断,友好打印并退出
print("\n客户端已停止")
sys.exit(130)9. 携带access_token访问受保护资源 #
9.1. auth_server.py #
auth_server.py
"""
OAuth2 认证服务器(独立运行)
实现完整授权码流程(含元数据发现与动态注册):
- 授权服务器元数据:GET /.well-known/oauth-authorization-server
- 动态客户端注册:POST /oauth/register
- 授权端点:GET /oauth/authorize
- 令牌端点:POST /oauth/token(authorization_code)
- 令牌自省:POST /oauth/introspect(RFC 7662)
"""
# 导入异步IO支持
import asyncio
# 导入操作系统环境变量支持
import os
# 导入随机数生成工具
import secrets
# 导入系统库函数
import sys
# 导入时间函数
import time
# 导入URL编码工具
from urllib.parse import urlencode
# 导入Starlette微框架相关组件
from starlette.applications import Starlette
from starlette.responses import JSONResponse, RedirectResponse
from starlette.routing import Route
# 导入uvicorn用于运行ASGI应用
from uvicorn import Config, Server
# 用户数据,用户名:密码
USERS = {
"admin": "123456"
}
# 客户端注册信息字典
_clients = {}
# 授权码存储字典
_auth_codes = {}
# 令牌存储字典
_tokens = {}
# 规范化scope参数,默认为user
def _normalize_scopes(scope_str):
if not scope_str:
return ["user"]
scopes = [s for s in scope_str.split() if s]
return scopes or ["user"]
# 从请求对象获取服务器的issuer信息
def _issuer_from_request(request):
# request.url.scheme:获取请求的协议,通常是 http 或 https。
# request.url.netloc:获取请求的“网络位置”,即域名(或 IP 地址)加上可选的端口号。例如 example.com、localhost:8080。
return f"{request.url.scheme}://{request.url.netloc}"
# 授权服务器元数据发现端点
async def authorization_server_metadata(request):
# 获取issuer信息
issuer = _issuer_from_request(request)
# 授权端点 用于引导用户授权
authorization_endpoint = f"{issuer}/oauth/authorize"
# 令牌端点 用于获取访问令牌
token_endpoint = f"{issuer}/oauth/token"
# 注册端点 用于动态注册客户端
registration_endpoint = f"{issuer}/oauth/register"
# 自省端点 用于验证令牌
introspection_endpoint = f"{issuer}/oauth/introspect"
# 构造元数据响应
metadata = {
"issuer": issuer,# 认证服务器地址
"authorization_endpoint": authorization_endpoint, # 授权端点
"token_endpoint": token_endpoint, # 令牌端点
"registration_endpoint": registration_endpoint, # 注册端点
"introspection_endpoint": introspection_endpoint, # 自省端点
"response_types_supported": ["code"], # 支持的响应类型
"grant_types_supported": ["authorization_code"], # 支持的授权类型
"scopes_supported": ["user"], # 支持的scope,scope指的是访问资源范围
}
# 返回JSON响应
return JSONResponse(metadata)
# 动态客户端注册端点
async def register_client(request):
try:
# 解析请求体JSON
body = await request.json()
except Exception:
# JSON解析失败,返回错误
return JSONResponse(
{"error": "invalid_client_metadata", "error_description": "JSON 请求体无效"},
status_code=400,
)
# 获取redirect_uris参数
redirect_uris = body.get("redirect_uris")
# 校验redirect_uris类型和内容
if not isinstance(redirect_uris, list) or not redirect_uris:
return JSONResponse(
{"error": "invalid_redirect_uri", "error_description": "缺少 redirect_uris"},
status_code=400,
)
if not all(isinstance(uri, str) and uri.startswith(("http://", "https://")) for uri in redirect_uris):
return JSONResponse(
{"error": "invalid_redirect_uri", "error_description": "redirect_uris 必须是 HTTP/HTTPS 地址"},
status_code=400,
)
# 生成client_id
client_id = f"client_{secrets.token_hex(8)}"
# 构造客户端注册信息
client = {
"client_id": client_id,
"client_name": body.get("client_name", "MCP OAuth 客户端"),
"redirect_uris": redirect_uris,
"grant_types": ["authorization_code"],
"response_types": ["code"],
"scope": "user",
}
# 存储客户端信息
_clients[client_id] = client
# 返回注册结果
return JSONResponse(client, status_code=201)
# 授权端点处理函数,处理用户授权请求
async def authorize_endpoint(request):
# 获取URL查询参数对象
query = request.query_params
# 获取response_type参数
response_type = query.get("response_type")
# 获取client_id参数
client_id = query.get("client_id")
# 获取redirect_uri参数
redirect_uri = query.get("redirect_uri")
# 获取scope参数,默认为"user"
scope = query.get("scope", "user")
# 获取state参数
state = query.get("state")
# 获取用户名
username = query.get("username")
# 获取密码
password = query.get("password")
# 获取approve参数,并转换为布尔值,判断用户是否同意授权
approve = query.get("approve", "false").lower() == "true"
# 检查response_type是否为"code",否则返回错误响应
if response_type != "code":
return JSONResponse({"error": "unsupported_response_type"}, status_code=400)
# 校验client_id和redirect_uri两个必要参数,缺少则返回错误
if not client_id or not redirect_uri:
return JSONResponse({"error": "invalid_request", "error_description": "缺少 client_id 或 redirect_uri"}, status_code=400)
# 根据client_id查找已注册的客户端,未找到则返回错误
client = _clients.get(client_id)
if not client:
return JSONResponse({"error": "unauthorized_client", "error_description": "未知的 client_id"}, status_code=400)
# 校验redirect_uri是否是在注册信息中登记过的回调地址
if redirect_uri not in client.get("redirect_uris", []):
return JSONResponse({"error": "invalid_request", "error_description": "redirect_uri 未注册"}, status_code=400)
# 判断用户名、密码和用户同意(approve)是否都已提交,否则返回需要用户交互的错误
if not username or not password or not approve:
return JSONResponse(
{
"error": "interaction_required", # 需要用户进一步操作
"error_description": "需要用户凭据并同意授权", # 错误说明
"hint": "请调用 /oauth/authorize 并携带 username、password、approve=true", # 提示
"required_params": ["username", "password", "approve=true"], # 要求这些参数
},
status_code=400,
)
# 校验用户名和密码,若不正确则返回拒绝访问
if USERS.get(username) != password:
return JSONResponse({"error": "access_denied", "error_description": "用户名或密码错误"}, status_code=401)
# 生成授权码,格式为code_xxx
code = f"code_{secrets.token_hex(16)}"
# 存储授权码相关数据,包括客户端ID、回调地址、用户名、作用域、过期时间
_auth_codes[code] = {
"client_id": client_id, #客户端标识
"redirect_uri": redirect_uri, #回调地址
"username": username, #用户名
"scopes": _normalize_scopes(scope), #作用域
"expires_at": int(time.time()) + 300, # 授权码5分钟后过期时间
}
# 构造回调参数字典,包含授权码
params = {"code": code}
# 如果state参数存在,也一并回传
if state:
params["state"] = state
# 拼接完整回调地址,附带参数
callback_url = f"{redirect_uri}?{urlencode(params)}"
# 重定向到客户端回调地址,并返回授权码等信息
return RedirectResponse(url=callback_url, status_code=302)
# 生成访问令牌并存储,返回令牌相关数据
def _issue_token(client_id, subject, scopes=None):
# 生成一个随机的访问令牌,前缀为"mcp_"
token = f"mcp_{secrets.token_hex(24)}"
# 令牌有效时间,单位为秒,这里为3600秒(1小时)
expires_in = 3600
# 计算令牌到期的时间戳(当前时间+有效期)
expires_at = int(time.time()) + expires_in
# 在全局_tokens字典中存储该令牌对应的信息,便于后续校验和自省
_tokens[token] = {
"client_id": client_id, # 客户端标识
"sub": subject, # 用户唯一标识,通常为用户名
"scopes": scopes or ["user"], # 令牌作用域,默认为["user"]
"expires_at": expires_at, # 令牌过期时间戳
}
# 返回令牌响应内容,包括token本身、类型、有效期、作用域
return {
"access_token": token, # 访问令牌
"token_type": "Bearer", # 令牌类型为Bearer
"expires_in": expires_in, # 距离过期剩余秒数
"scope": " ".join(scopes or ["user"]),# 空格分隔的作用域字符串
}
# 检查令牌有效性,返回自省结果
+def _introspect_token(token):
# 从全局_tokens字典中获取该token的信息
+ data = _tokens.get(token)
# 如果未找到对应的token信息,则认为无效,返回None
+ if not data:
+ return None
# 若令牌已过期(当前时间大于expires_at),则从dict中删除该token并返回None
+ if data["expires_at"] < time.time():
+ del _tokens[token]
+ return None
# 如果令牌有效,构造自省返回字典
+ return {
# 标记令牌为激活状态
+ "active": True,
# 返回token绑定的客户端标识
+ "client_id": data["client_id"],
# 返回该token对应的用户唯一标识
+ "sub": data["sub"],
# 将scope列表连接为以空格分隔的字符串
+ "scope": " ".join(data["scopes"]),
# 返回token的过期时间戳
+ "exp": data["expires_at"],
# 颁发本自省响应时的iat时间戳
+ "iat": int(time.time()),
# 令牌类型为Bearer
+ "token_type": "Bearer",
+ }
# 令牌自省端点
+async def introspect_endpoint(request):
+ try:
# 解析表单数据
+ form = await request.form()
+ except Exception:
# 解析失败,假定令牌无效
+ return JSONResponse({"active": False}, status_code=400)
# 获取待自省的token
+ token = form.get("token")
# 校验token
+ if not token or not isinstance(token, str):
+ return JSONResponse({"active": False})
# 执行自省
+ result = _introspect_token(token)
+ if not result:
+ return JSONResponse({"active": False})
# 返回自省信息
+ return JSONResponse(result)
# 令牌颁发端点(authorization_code模式)
async def token_endpoint(request):
try:
# 解析表单数据
form = await request.form()
except Exception:
# 表单解析失败
return JSONResponse(
{"error": "invalid_request", "error_description": "表单数据无效"},
status_code=400,
)
# 获取授权模式
grant_type = form.get("grant_type")
if grant_type != "authorization_code":
return JSONResponse(
{"error": "unsupported_grant_type", "error_description": "仅支持 authorization_code 授权类型"},
status_code=400,
)
# 获取参数
code = form.get("code")
client_id = form.get("client_id")
redirect_uri = form.get("redirect_uri")
# 校验参数存在性
if not code or not client_id or not redirect_uri:
return JSONResponse(
{"error": "invalid_request", "error_description": "缺少 code/client_id/redirect_uri"},
status_code=400,
)
# 校验参数类型
if not isinstance(code, str) or not isinstance(client_id, str) or not isinstance(redirect_uri, str):
return JSONResponse(
{"error": "invalid_request", "error_description": "参数类型错误"},
status_code=400,
)
# 获取授权码对应的数据
code_data = _auth_codes.get(code)
if not code_data:
return JSONResponse({"error": "invalid_grant", "error_description": "授权码不存在"}, status_code=400)
# 检查授权码是否过期
if code_data["expires_at"] < time.time():
del _auth_codes[code]
return JSONResponse({"error": "invalid_grant", "error_description": "授权码已过期"}, status_code=400)
# 校验client_id和redirect_uri是否匹配
if code_data["client_id"] != client_id or code_data["redirect_uri"] != redirect_uri:
return JSONResponse({"error": "invalid_grant", "error_description": "授权码校验失败"}, status_code=400)
# 授权码只能用一次,用后删除
del _auth_codes[code]
# 发放访问令牌
token_data = _issue_token(client_id=client_id, subject=code_data["username"], scopes=code_data["scopes"])
# 返回令牌
return JSONResponse(token_data)
# 创建Starlette应用,并注册路由
def create_app():
routes = [
Route("/.well-known/oauth-authorization-server", authorization_server_metadata, methods=["GET"]),
Route("/oauth/register", register_client, methods=["POST"]),
Route("/oauth/authorize", authorize_endpoint, methods=["GET"]),
Route("/oauth/token", token_endpoint, methods=["POST"]),
+ Route("/oauth/introspect", introspect_endpoint, methods=["POST"]),
]
return Starlette(routes=routes)
# 运行ASGI服务器
async def run_server(host, port):
# 创建应用
app = create_app()
# 配置uvicorn
config = Config(app, host=host, port=port, log_level="info")
# 创建服务器对象
server = Server(config)
# 启动服务器服务
await server.serve()
# 入口函数,负责环境和服务启动
def main():
# 从环境变量获取服务IP和端口,默认127.0.0.1:9000
host = os.environ.get("OAUTH_AUTH_HOST", "127.0.0.1")
port = int(os.environ.get("OAUTH_AUTH_PORT", "9000"))
# 服务端点说明
print(f"OAuth2 认证服务器: http://{host}:{port}")
print(" 元数据端点: GET /.well-known/oauth-authorization-server")
print(" 动态注册端点: POST /oauth/register")
print(" 授权端点: GET /oauth/authorize")
print(" 令牌端点: POST /oauth/token (grant_type=authorization_code)")
print(" 自省端点: POST /oauth/introspect")
# 启动服务
asyncio.run(run_server(host, port))
return 0
# 程序入口
if __name__ == "__main__":
sys.exit(main())
9.2. mcp_client.py #
mcp_client.py
"""
MCP OAuth 客户端
按如下顺序执行:
1) 请求受保护资源,拿到 401 + 资源元数据地址
2) 获取受保护资源元数据(授权服务器地址)
3) 获取授权服务器元数据(授权端点、令牌端点、注册端点)
4) 动态注册客户端
5) 引导用户授权(此演示用用户名/密码 + approve=true 模拟)
6) 用授权码换取访问令牌
7) 携带访问令牌访问 MCP 受保护资源
"""
# 导入系统模块
import sys
# 导入正则表达式模块
import re
# 导入安全随机数模块,用于生成不可预测的 state
import secrets
# 导入异步IO模块
import asyncio
# 导入httpx,用于处理HTTP请求
import httpx
# 导入URL解析相关方法
from urllib.parse import parse_qs, urlparse
# 导入MCP的客户端会话管理
+from mcp import ClientSession
# 导入用于MCP流式HTTP通信的客户端
+from mcp.client.streamable_http import streamable_http_client
# 从"www-authenticate"头部解析出resource_metadata字段
def _parse_resource_metadata_from_www_authenticate(www_authenticate):
# 头部值不存在直接返回None
if not www_authenticate:
return None
# 匹配resource_metadata字段的内容
match = re.search(r'resource_metadata="([^"]+)"', www_authenticate, flags=re.IGNORECASE)
# 有匹配返回第一个分组,否则返回None
return match.group(1) if match else None
# 发现受保护资源的元数据地址(401响应推断或头部直接获取)
async def discover_resource_metadata(client, resource_url):
# 无令牌访问资源预期获得401
response = await client.get(resource_url)
# 如果不是401,说明流程有误
if response.status_code != 401:
raise RuntimeError(f"预期先返回 401,但收到 {response.status_code}")
# 从头部解析resource_metadata字段
return _parse_resource_metadata_from_www_authenticate(response.headers.get("www-authenticate"))
# 异步获取JSON,并校验为字典对象
async def fetch_json(client, url):
# 发起GET请求
resp = await client.get(url)
# 请求状态非200会引发异常
resp.raise_for_status()
# 解析响应内容为JSON
data = resp.json()
# 必须是字典类型
if not isinstance(data, dict):
raise ValueError(f"元数据响应不是 JSON 对象: {url}")
# 返回解析结果
return data
# 异步动态客户端注册
async def register_client(client, registration_endpoint, redirect_uri):
# 通过POST向注册端点发起注册请求
resp = await client.post(
registration_endpoint,
json={
"client_name": "MCP OAuth 客户端",
"redirect_uris": [redirect_uri],
},
)
# 检查响应状态
resp.raise_for_status()
# 解析响应
data = resp.json()
# 获取client_id
client_id = data.get("client_id")
# client_id需为非空字符串
if not isinstance(client_id, str) or not client_id:
raise ValueError("动态注册失败:未返回客户端标识")
# 返回client_id
return client_id
# 用户授权并获取授权码(用用户名密码)
async def authorize_with_user(
client,#httpx异步客户端
authorization_endpoint,#授权端点
client_id,#客户端标识
redirect_uri,#回调地址
username,#用户名
password,#密码
):
# 固定state用于校验,用于防止CSRF(Cross-Site Request Forgery,跨站请求伪造)攻击
# CSRF是指攻击者利用你“已登录某网站”的身份,在你不知情时让浏览器替你发起恶意请求。
# 客户端发起授权请求时带上 state,授权服务器重定向回来时原样带回 state,客户端只接受“回调里 state 与自己最初保存的 state 完全一致”的响应
# 攻击者即使诱导用户点击恶意链接,也很难猜中客户端会话里的 state,回调校验会失败,从而拒绝这次授权结果
# 使用高熵随机 state 防止 CSRF,必须每次授权请求都不同且不可预测。
state = secrets.token_urlsafe(32)
# 构造GET到授权端点,携带相关参数(用户名等,模拟快速通过)
resp = await client.get(
authorization_endpoint,#授权端点
params={
"response_type": "code",
"client_id": client_id,#客户端标识
"redirect_uri": redirect_uri,#回调地址
"scope": "user",#作用域
"state": state,#状态
"username": username,#用户名
"password": password,#密码
"approve": "true",#批准
},
follow_redirects=False,#不跟随重定向
)
# 授权返回302/303为正常
if resp.status_code not in (302, 303):
raise RuntimeError(f"授权失败,状态码: {resp.status_code}, body: {resp.text}")
# 获取重定向Location
location = resp.headers.get("location")
if not location:
raise RuntimeError("授权失败:未返回回调地址")
# 解析location为URL
parsed = urlparse(location)
# 解析query部分
qs = parse_qs(parsed.query)
# 获取授权码与state
code = (qs.get("code") or [None])[0]
returned_state = (qs.get("state") or [None])[0]
# 校验state
if returned_state != state:
raise RuntimeError("状态参数校验失败")
# 校验授权码
if not code:
raise RuntimeError("授权失败:未返回授权码")
# 返回code
return code
# 异步函数:使用授权码换取访问令牌
async def exchange_code_for_token(
client, # httpx异步客户端实例
token_endpoint, # 令牌端点URL
client_id, # 客户端标识
code, # 授权码
redirect_uri, # 回调地址
):
# 向令牌端点发起POST请求,提交用于授权码换取访问令牌的数据
resp = await client.post(
token_endpoint, # 令牌端点URL
data={
"grant_type": "authorization_code", # 授权类型为authorization_code
"client_id": client_id, # 客户端标识
"code": code, # 授权码
"redirect_uri": redirect_uri, # 回调地址
},
headers={"Content-Type": "application/x-www-form-urlencoded"}, # 指定表单编码
)
# 如果响应状态码不是200,抛出异常
resp.raise_for_status()
# 解析响应内容为JSON格式
data = resp.json()
# 从返回数据中获取access_token字段
access_token = data.get("access_token")
# 如果access_token字段不是字符串或为空,抛出异常
if not isinstance(access_token, str) or not access_token:
raise ValueError("令牌端点未返回访问令牌")
# 返回最终获取到的令牌字符串
return access_token
# 从MCP接口结果中提取文本内容
+def _extract_texts(result):
# 用于存储所有文本块
+ texts = []
# 尝试获取content或contents属性
+ content = getattr(result, "content", None) or getattr(result, "contents", [])
# 遍历所有块
+ for block in content:
# 如果有text属性,则提取
+ if hasattr(block, "text"):
+ texts.append(str(block.text))
# 如果有type属性且为"text",提取text
+ elif hasattr(block, "type") and getattr(block, "type") == "text":
+ texts.append(str(getattr(block, "text", "")))
# 返回提取到的所有文本
+ return texts
# 携带令牌访问受保护的MCP资源
+async def call_protected_mcp(resource_url, token):
# 构建请求头,包含Bearer类型的访问令牌及指定内容类型为json
+ headers = {
+ "Authorization": f"Bearer {token}",
+ "Content-Type": "application/json",
+ }
# 创建带认证头和60秒超时时间的httpx异步客户端
+ async with httpx.AsyncClient(headers=headers, timeout=60.0) as mcp_http_client:
# 以认证客户端与受保护资源建立可流式操作的数据会话
+ async with streamable_http_client(resource_url, http_client=mcp_http_client) as (
+ read_stream, # 读取数据流
+ write_stream, # 写入数据流
+ _, # 占位符(忽略的返回值)
+ ):
# 创建新的MCP客户端会话,传入读写流
+ async with ClientSession(read_stream, write_stream) as session:
# 初始化MCP连接(完成协议握手)
+ await session.initialize()
# 打印初始化成功提示
+ print("MCP 会话初始化成功")
# 调用API列出所有受保护的资源对象
+ resources = await session.list_resources()
# 提取资源对象的URI,生成列表
+ uris = [r.uri for r in resources.resources]
# 打印所有可访问的受保护资源URI
+ print(f"受保护资源: {uris}")
# 调用API获取当前可用方法工具列表
+ tools = await session.list_tools()
# 提取工具名称,生成名称列表
+ names = [t.name for t in tools.tools]
# 打印所有可访问工具列表
+ print(f"受保护工具: {names}")
# 调用受保护工具get_user_data,传入user_id为current
+ result = await session.call_tool("get_user_data", {"user_id": "current"})
# 解析工具响应,提取可用文本输出
+ texts = _extract_texts(result)
# 打印工具调用结果(多个文本用|拼接,没有则显示(空))
+ print(f"调用 get_user_data(current): {' | '.join(texts) or '(空)'}")
# 主运行入口,完成OAuth全流程
async def run_client(resource_url, username, password, redirect_uri):
# 创建默认30秒超时的httpx异步客户端,自动跟随重定向
async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as http_client:
# 第一步:无token请求受保护资源
print("1) 请求受保护资源(无 token)...")
resource_metadata_url = await discover_resource_metadata(http_client, resource_url)
print(f" 收到资源元数据地址: {resource_metadata_url}")
# 第二步:获取资源元数据
print("2) 获取受保护资源元数据...")
resource_metadata = await fetch_json(http_client, resource_metadata_url)
# 检查authorization_servers字段
auth_servers = resource_metadata.get("authorization_servers")
if not isinstance(auth_servers, list) or not auth_servers or not isinstance(auth_servers[0], str):
raise ValueError("受保护资源元数据缺少 authorization_servers 字段")
# 选取第一个授权服务器
auth_server = auth_servers[0].rstrip("/")
print(f" 授权服务器: {auth_server}")
# 第三步:获取授权服务器元数据
print("3) 获取授权服务器元数据...")
# 构造授权服务器元数据地址
auth_metadata_url = f"{auth_server}/.well-known/oauth-authorization-server"
print(f" 授权服务器元数据地址: {auth_metadata_url}")
# 异步获取授权服务器元数据(JSON)
auth_metadata = await fetch_json(http_client, auth_metadata_url)
# 从元数据中获取授权端点
authorization_endpoint = str(auth_metadata.get("authorization_endpoint", ""))
# 从元数据中获取令牌端点
token_endpoint = str(auth_metadata.get("token_endpoint", ""))
# 从元数据中获取注册端点
registration_endpoint = str(auth_metadata.get("registration_endpoint", ""))
# 检查三个端点是否都存在,否则抛出异常
if not authorization_endpoint or not token_endpoint or not registration_endpoint:
raise ValueError("授权服务器元数据缺少关键端点")
# 打印授权端点
print(f" 授权端点: {authorization_endpoint}")
# 打印令牌端点
print(f" 令牌端点: {token_endpoint}")
# 打印注册端点
print(f" 注册端点: {registration_endpoint}")
# 第四步:动态客户端注册
print("4) 动态客户端注册...")
client_id = await register_client(http_client, registration_endpoint, redirect_uri)
print(f" 客户端标识: {client_id}")
# 第五步:授权获取授权码
print("5) 引导用户授权并获取授权码...")
code = await authorize_with_user(
http_client,#httpx异步客户端
authorization_endpoint,#授权端点
client_id,#客户端标识
redirect_uri,#回调地址
username,#用户名
password,#密码
)#返回授权码
print(f" 授权码: {code}")#打印授权码
# 第六步:授权码换令牌
print("6) 使用授权码换取访问令牌...")
token = await exchange_code_for_token(http_client, token_endpoint, client_id, code, redirect_uri)
print(f" 访问令牌获取成功: {token}")
# 第七步:携令牌访问受保护MCP资源
+ print("7) 携带访问令牌请求 MCP 受保护资源...")
+ await call_protected_mcp(resource_url, token)
+ print("\n完整 OAuth 流程完成")
# 命令行主入口
def main():
try:
# 受保护资源的元数据地址
resource_url = "http://127.0.0.1:8000/mcp"
# 用户名(模拟登录用)
username = "admin"
# 密码(模拟登录用)
password = "123456"
# 回调地址(OAuth2 授权码流程需要,客户端监听此地址以接收授权码)
redirect_uri = "http://127.0.0.1:8000/callback"
# 运行主业务流程
asyncio.run(run_client(resource_url, username, password, redirect_uri))
return 0
except httpx.HTTPStatusError as e:
# HTTP请求状态异常统一输出
print(f"HTTP 请求错误 {e.response.status_code}: {e.response.text}")
return 1
except Exception as e:
# 其他异常统一输出
print(f"执行失败: {e}")
import traceback
traceback.print_exc()
return 1
# 判断是否作为主程序入口执行
if __name__ == "__main__":
# 设置标准输入输出流的编码为utf-8(如果支持该方法)
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
sys.stdin.reconfigure(encoding="utf-8")
try:
# 启动主函数,异常时按约定返回码
sys.exit(main())
except KeyboardInterrupt:
# 捕获Ctrl+C中断,友好打印并退出
print("\n客户端已停止")
sys.exit(130)
9.3. mcp_server.py #
mcp_server.py
# MCP 资源服务器(OAuth2 分离架构)
"""
MCP 资源服务器(OAuth2 分离架构)
与认证服务器分离,通过令牌自省(RFC 7662)验证令牌。
- 认证服务器:oauth_auth_server.py(端口 9000)
- 资源服务器:本服务(端口 8000)
"""
# 导入操作系统相关模块
import os
# 导入系统功能模块
import sys
# 导入 MCP 上下文与主服务类
from mcp.server.fastmcp import FastMCP
# 导入 MCP 的认证设置
from mcp.server.auth.settings import AuthSettings
# 从 pydantic 导入 AnyHttpUrl 类型,用于校验 URL
from pydantic import AnyHttpUrl
# 导入获取 access_token 的方法
+from mcp.server.auth.middleware.auth_context import get_access_token
# 读取主机地址,优先使用环境变量 MCP_HOS,否则默认 '127.0.0.1'
HOST = os.environ.get("MCP_HOST", "127.0.0.1")
# 读取端口号,优先使用环境变量 MCP_PORT,否则默认 8000(需转换为 int)
PORT = int(os.environ.get("MCP_PORT", "8000"))
# 资源服务器自身的公开 URL,带 /mcp 路径用于元数据发现
RESOURCE_URL = f"http://{HOST}:{PORT}/mcp"
# 导入自定义的令牌自省验证器
from token_verifier import IntrospectionTokenVerifier
# 认证服务器地址,优先通过环境变量 OAUTH_AUTH_SERVER,否则默认为 http://127.0.0.1:9000
AUTH_SERVER = os.environ.get("OAUTH_AUTH_SERVER", "http://127.0.0.1:9000")
# 令牌自省端点 URL
INTROSPECTION_URL = f"{AUTH_SERVER.rstrip('/')}/oauth/introspect"
# 创建 FastMCP 资源服务器实例
mcp = FastMCP(
# 服务器名称
name="OAuth资源服务器",
host=HOST,
port=PORT,
# 令牌验证器:通过 introspect 端点验证
token_verifier=IntrospectionTokenVerifier(INTROSPECTION_URL),
# 认证相关设置
auth=AuthSettings(
# 指定认证服务器 Issuer 地址
issuer_url=AnyHttpUrl(AUTH_SERVER),
# 资源服务器自身的 URL
resource_server_url=AnyHttpUrl(RESOURCE_URL),
# 需要的 scope
required_scopes=["user"],
),
)
# 获取当前用户 ID 的工具函数
+def _get_current_user_id():
# 从认证上下文获取当前用户 ID
+ try:
# 获取当前 access_token
+ token = get_access_token()
# 返回 client_id(用户名),若无则返回 None
+ return token.client_id if token else None
+ except Exception:
# 如果获取失败则返回 None
+ return None
# 注册受保护资源 user://profile
+@mcp.resource("user://profile")
+def get_user_profile():
# 获取当前用户 ID,若获取不到则用“匿名用户”
+ user_id = _get_current_user_id() or "匿名用户"
# 返回当前用户的基本资料字符串(JSON 结构)
+ return f'''{{
+ "user_id": "{user_id}",
+ "name": "用户 {user_id}",
+ "email": "{user_id}@example.com",
+ "role": "user"
+}}'''
# 注册受保护资源 user://stats
+@mcp.resource("user://stats")
+def get_user_stats():
# 获取当前用户 ID,若获取不到则用“匿名用户”
+ user_id = _get_current_user_id() or "匿名用户"
# 返回用户统计数据字符串(JSON 结构)
+ return f'''{{
+ "user_id": "{user_id}",
+ "requests_today": 42,
+ "last_active": "2024-01-15T10:30:00Z"
+}}'''
# 注册受保护工具 get_user_data
+@mcp.tool()
+def get_user_data(user_id="current", ctx=None):
# 获取当前用户 ID
+ current = _get_current_user_id()
# 判断请求的 user_id 是否为当前用户
+ if user_id == "current":
# 如果当前未认证(current 为空),返回未认证信息
+ if not current:
+ return {"error": "未认证", "user_id": None}
# 否则返回当前用户的基本信息
+ return {
+ "user_id": current,
+ "status": "active",
+ "last_login": "2024-01-15T10:30:00Z",
+ }
# 如果请求的 user_id 等于当前用户
+ if current and user_id == current:
+ return {
+ "user_id": user_id,
+ "status": "active",
+ "last_login": "2024-01-15T10:30:00Z",
+ }
# 既不是当前用户,也不是已知用户,返回无权访问或用户不存在
+ return {"user_id": user_id, "status": "unknown", "error": "无权访问或用户不存在"}
# 仅当该文件作为主程序直接运行时执行下列代码
if __name__ == "__main__":
# 打印资源服务器地址
print(f"MCP 资源服务器: {RESOURCE_URL}")
# 启动 FastMCP 服务,采用 streamable-http 模式
mcp.run(transport="streamable-http")
9.4. token_verifier.py #
token_verifier.py
"""
OAuth2 令牌自省验证器(RFC 7662)
资源服务器通过调用认证服务器的令牌自省端点验证 Bearer 令牌。
"""
# 导入 httpx 库用于发送 HTTP 请求
import httpx
# 从 mcp.server.auth.provider 模块导入 AccessToken 和 TokenVerifier
from mcp.server.auth.provider import AccessToken, TokenVerifier
# 定义 IntrospectionTokenVerifier 类,继承自 TokenVerifier
class IntrospectionTokenVerifier(TokenVerifier):
# 通过 OAuth2 令牌自省(RFC 7662)验证令牌的类文档字符串
"""通过 OAuth2 令牌自省(RFC 7662)验证令牌。"""
# 构造方法,接收自省端点地址 introspection_endpoint
def __init__(self, introspection_endpoint):
# 将自省端点地址保存到实例变量
self.introspection_endpoint = introspection_endpoint
# 定义异步验证令牌的方法
+ async def verify_token(self, token):
# 方法文档,说明功能为调用认证服务器自省端点验证令牌
+ """调用认证服务器自省端点验证令牌。"""
# 检查自省端点地址,只允许部分安全前缀
+ if not self.introspection_endpoint.startswith(
+ ("https://", "http://localhost", "http://127.0.0.1")
+ ):
# 如果地址非法,立即返回 None
+ return None
# 使用 httpx 异步客户端,并设置超时时间为 10 秒
+ async with httpx.AsyncClient(timeout=10.0) as client:
+ try:
# 通过 POST 请求向自省端点发送带 token 字段的表单数据
+ response = await client.post(
+ self.introspection_endpoint,
+ data={"token": token},
+ headers={"Content-Type": "application/x-www-form-urlencoded"},
+ )
# 如果响应的状态码不是 200,表示校验失败,直接返回 None
+ if response.status_code != 200:
+ return None
# 解析响应体的 JSON 内容
+ data = response.json()
# 检查 active 字段是否为 True,如果为 False 或缺失则返回 None
+ if not data.get("active", False):
+ return None
# 获取 sub(通常为当前用户)用于构造 AccessToken
+ subject = data.get("sub")
# 返回 AccessToken 对象,字段包括令牌、client_id、scope、exp、aud
+ return AccessToken(
+ token=token,
# 若 sub 存在并为字符串则用作 client_id,否则取 client_id 字段或 "unknown"
+ client_id=subject if isinstance(subject, str) and subject else data.get("client_id", "unknown"),
# 解析 scope 字段(若有),按空格分割为列表,否则为空列表
+ scopes=data.get("scope", "").split() if data.get("scope") else [],
# 有效期使用 exp 字段
+ expires_at=data.get("exp"),
# aud 字段作为 resource 赋值
+ resource=data.get("aud"),
+ )
+ except Exception:
# 捕获并忽略所有异常,直接返回 None
+ return None