1. OAuth 2.0 受保护资源元数据 #
RFC 9728 定义了 OAuth 2.0 受保护资源(Protected Resource)的元数据规范,旨在标准化资源服务器(Resource Server)的能力声明和配置发现机制。它是 OAuth 2.0 生态的扩展,与授权服务器元数据(RFC 8414)形成互补,为客户端提供更完整的自动化集成能力。
2. 核心目标 #
2.1 解决的问题 #
- 资源服务器发现:客户端如何动态获取资源服务器的配置(如接口端点、支持的Access Token类型、鉴权方式等)。
- 标准化元数据:统一不同资源服务器的配置格式,减少手动配置错误。
- 安全策略透明化:明确资源服务器对访问Access Token(
access_token)的要求(如签名算法、必需声明等)。
2.2 适用场景 #
- API 网关或微服务架构中的资源服务器。
- 需要动态适应多资源服务器的客户端(如企业集成工具)。
- OpenID Connect 的 UserInfo 端点或其他 OAuth 2.0 受保护 API。
3. 元数据发现机制 #
3.1 元数据端点 #
资源服务器在固定路径提供元数据(通常为 /.well-known/oauth-resource-metadata),通过 HTTPS 访问:
GET /.well-known/oauth-resource-metadata HTTP/1.1
Host: api.example.com3.2 响应格式 #
元数据以 JSON 格式返回,字段遵循 IANA 注册的 OAuth 参数。
示例响应:
{
"resource": "https://api.example.com",
"token_endpoint": "https://auth.example.com/oauth/token",
"token_introspection_endpoint": "https://auth.example.com/oauth/introspect",
"jwks_uri": "https://auth.example.com/oauth/jwks",
"scopes_supported": ["read", "write", "delete"],
"token_signing_alg_values_supported": ["RS256", "ES256"],
"required_token_claims": ["iss", "exp", "aud"],
"access_token_lifetime": 3600,
"resource_signing_alg": "ES256"
}4. 关键元数据字段 #
4.1 必需字段 #
| 字段名 | 描述 |
|---|---|
resource |
资源服务器的唯一标识(通常为基 URL)。 |
jwks_uri |
验证Access Token的公钥集 URL(JWK Set)。 |
4.2 可选字段 #
| 字段名 | 描述 |
|---|---|
token_endpoint |
关联的Access Token端点(用于Access Token刷新或交换)。 |
token_introspection_endpoint |
Access Token自省端点(RFC 7662)。 |
scopes_supported |
资源服务器支持的权限范围。 |
token_signing_alg_values_supported |
支持的Access Token签名算法(如 RS256)。 |
required_token_claims |
访问Access Token必须包含的声明(如 aud、iss)。 |
access_token_lifetime |
资源服务器期望的Access Token有效期(秒)。 |
resource_signing_alg |
资源服务器响应数据的签名算法(如用于 JWT 封装的 UserInfo)。 |
5. 安全性设计 #
5.1 Access Token验证要求 #
- 签名验证:客户端需使用
jwks_uri的公钥验证Access Token签名。 - 声明检查:必须验证
required_token_claims中的声明(如aud是否匹配资源服务器标识)。 - 算法约束:仅接受
token_signing_alg_values_supported列出的算法。
5.2 传输安全 #
- 强制 HTTPS:元数据必须通过 TLS 传输。
- 缓存控制:建议客户端缓存元数据,但遵循
Cache-Control头部(如max-age=86400)。
6. 与相关协议的交互 #
6.1 与 RFC 8414(授权服务器元数据)的关系 #
- 互补性:
RFC 8414 描述授权服务器的能力,而 RFC 9728 描述资源服务器的能力。

6.2 与 OpenID Connect 的集成 #
- UserInfo 端点:
可视为一种受保护资源,其元数据可能包含 OIDC 特有字段(如claims_supported)。 - 混合用例:
OIDC 提供方可能同时实现 RFC 8414 和 RFC 9728。
7. 实际应用示例 #
7.1 基础资源服务器元数据发现示例 #
# 导入必要的模块
import requests
import json
from urllib.parse import urljoin
# 定义资源服务器的基础URL
base_url = "https://api.example.com"
# 构建资源服务器元数据端点URL
metadata_url = urljoin(base_url, "/.well-known/oauth-resource-metadata")
try:
# 获取资源服务器元数据
print("🔍 正在获取资源服务器元数据...")
metadata_response = requests.get(metadata_url, timeout=10)
metadata_response.raise_for_status()
# 解析元数据
resource_metadata = metadata_response.json()
# 打印获取到的元数据
print(" 成功获取资源服务器元数据:")
print(json.dumps(resource_metadata, indent=2, ensure_ascii=False))
# 提取关键信息
resource = resource_metadata.get("resource")
jwks_uri = resource_metadata.get("jwks_uri")
required_claims = resource_metadata.get("required_token_claims", [])
scopes_supported = resource_metadata.get("scopes_supported", [])
print(f"\n 资源服务器标识: {resource}")
print(f" JWKS端点: {jwks_uri}")
print(f" 必需Access Token声明: {required_claims}")
print(f" 支持的权限范围: {scopes_supported}")
# 检查必需字段
if not resource or not jwks_uri:
print("⚠️ 警告: 缺少必需的元数据字段")
else:
print(" 元数据包含所有必需字段")
except requests.exceptions.RequestException as e:
print(f" 网络请求失败: {e}")
except json.JSONDecodeError as e:
print(f" JSON解析失败: {e}")
except Exception as e:
print(f" 发生未知错误: {e}")7.2 完整的资源服务器元数据管理器 #
# 导入必要的模块
import requests
import json
import time
from urllib.parse import urljoin
from typing import Dict, List, Optional, Union
import jwt
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
# 定义资源服务器元数据管理器类
class ResourceServerMetadataManager:
# 初始化元数据管理器
def __init__(self, base_url: str):
# 设置资源服务器基础URL
self.base_url = base_url
# 初始化会话
self.session = requests.Session()
# 存储元数据
self.metadata = None
# 存储JWKS
self.jwks = None
# 设置缓存过期时间(秒)
self.cache_expiry = 86400 # 24小时
# 缓存时间戳
self.cache_timestamp = 0
# 发现资源服务器元数据
def discover_metadata(self, force_refresh: bool = False) -> bool:
# 检查缓存是否有效
if not force_refresh and self.metadata and (time.time() - self.cache_timestamp) < self.cache_expiry:
print(" 使用缓存的元数据")
return True
# 构建元数据端点URL
metadata_url = urljoin(self.base_url, "/.well-known/oauth-resource-metadata")
try:
# 发送GET请求获取元数据
print(f"🔍 正在发现资源服务器元数据: {metadata_url}")
response = self.session.get(metadata_url, timeout=10)
response.raise_for_status()
# 解析元数据
self.metadata = response.json()
# 验证必需字段
if not self.validate_metadata(self.metadata):
print(" 元数据验证失败")
return False
# 更新缓存时间戳
self.cache_timestamp = time.time()
print(" 成功发现并验证元数据")
return True
except requests.exceptions.RequestException as e:
print(f" 获取元数据失败: {e}")
return False
except json.JSONDecodeError as e:
print(f" 解析元数据失败: {e}")
return False
# 验证元数据格式
def validate_metadata(self, metadata: Dict) -> bool:
# 检查必需字段
required_fields = ["resource", "jwks_uri"]
for field in required_fields:
if field not in metadata:
print(f" 缺少必需字段: {field}")
return False
if not metadata[field]:
print(f" 字段 {field} 不能为空")
return False
# 验证resource字段格式
resource = metadata["resource"]
if not resource.startswith("https://"):
print(" resource字段必须是HTTPS URL")
return False
# 验证jwks_uri字段格式
jwks_uri = metadata["jwks_uri"]
if not jwks_uri.startswith("https://"):
print(" jwks_uri字段必须是HTTPS URL")
return False
return True
# 获取JWKS(JSON Web Key Set)
def get_jwks(self, force_refresh: bool = False) -> Optional[Dict]:
# 检查元数据是否已获取
if not self.metadata:
print(" 请先调用discover_metadata()获取元数据")
return None
# 检查缓存是否有效
if not force_refresh and self.jwks and (time.time() - self.cache_timestamp) < self.cache_expiry:
print(" 使用缓存的JWKS")
return self.jwks
# 获取JWKS端点
jwks_uri = self.metadata.get("jwks_uri")
try:
# 发送GET请求获取JWKS
print(f" 正在获取JWKS: {jwks_uri}")
response = self.session.get(jwks_uri, timeout=10)
response.raise_for_status()
# 解析JWKS
self.jwks = response.json()
print(" 成功获取JWKS")
return self.jwks
except requests.exceptions.RequestException as e:
print(f" 获取JWKS失败: {e}")
return None
except json.JSONDecodeError as e:
print(f" 解析JWKS失败: {e}")
return None
# 验证访问Access Token
def validate_access_token(self, access_token: str) -> tuple[bool, Dict]:
# 获取JWKS
jwks = self.get_jwks()
if not jwks:
return False, {"error": "无法获取JWKS"}
try:
# 解码Access Token(不验证签名,仅获取头部和载荷)
unverified_token = jwt.decode(access_token, options={"verify_signature": False})
# 检查必需声明
required_claims = self.metadata.get("required_token_claims", [])
missing_claims = []
for claim in required_claims:
if claim not in unverified_token:
missing_claims.append(claim)
if missing_claims:
return False, {
"error": "missing_required_claims",
"missing_claims": missing_claims
}
# 检查aud声明(如果存在)
if "aud" in unverified_token:
expected_aud = self.metadata.get("resource")
if unverified_token["aud"] != expected_aud:
return False, {
"error": "invalid_audience",
"expected": expected_aud,
"actual": unverified_token["aud"]
}
# 检查Access Token是否过期
if "exp" in unverified_token:
current_time = int(time.time())
if current_time > unverified_token["exp"]:
return False, {
"error": "token_expired",
"expires_at": unverified_token["exp"],
"current_time": current_time
}
# 检查签名算法
if "alg" in unverified_token:
supported_algs = self.metadata.get("token_signing_alg_values_supported", [])
if supported_algs and unverified_token["alg"] not in supported_algs:
return False, {
"error": "unsupported_algorithm",
"algorithm": unverified_token["alg"],
"supported": supported_algs
}
print(" 访问Access Token验证通过")
return True, unverified_token
except jwt.InvalidTokenError as e:
return False, {"error": "invalid_token", "details": str(e)}
except Exception as e:
return False, {"error": "validation_error", "details": str(e)}
# 获取支持的权限范围
def get_supported_scopes(self) -> List[str]:
# 从元数据中获取支持的权限范围
if not self.metadata:
return []
return self.metadata.get("scopes_supported", [])
# 检查权限范围是否支持
def is_scope_supported(self, scope: str) -> bool:
# 检查指定的权限范围是否被支持
supported_scopes = self.get_supported_scopes()
return scope in supported_scopes
# 获取Access Token自省端点
def get_introspection_endpoint(self) -> Optional[str]:
# 从元数据中获取Access Token自省端点
if not self.metadata:
return None
return self.metadata.get("token_introspection_endpoint")
# 获取Access Token端点
def get_token_endpoint(self) -> Optional[str]:
# 从元数据中获取Access Token端点
if not self.metadata:
return None
return self.metadata.get("token_endpoint")
# 获取访问Access Token生命周期
def get_access_token_lifetime(self) -> Optional[int]:
# 从元数据中获取访问Access Token生命周期
if not self.metadata:
return None
return self.metadata.get("access_token_lifetime")
# 清除缓存
def clear_cache(self):
# 清空缓存
self.metadata = None
self.jwks = None
self.cache_timestamp = 0
print("🗑️ 缓存已清除")
# 获取缓存状态
def get_cache_status(self) -> Dict:
# 构建缓存状态信息
if not self.metadata:
return {"status": "no_cache"}
age = time.time() - self.cache_timestamp
is_expired = age > self.cache_expiry
return {
"status": "cached",
"age_seconds": int(age),
"age_hours": round(age / 3600, 2),
"is_expired": is_expired,
"expires_in_seconds": max(0, self.cache_expiry - age)
}
# 定义资源服务器元数据构建器类
class ResourceServerMetadataBuilder:
# 构建基础资源服务器元数据
@staticmethod
def build_basic_metadata(resource_url: str, jwks_uri: str) -> Dict:
# 构建基础元数据
metadata = {
"resource": resource_url,
"jwks_uri": jwks_uri
}
return metadata
# 构建完整资源服务器元数据
@staticmethod
def build_full_metadata(resource_url: str, jwks_uri: str,
scopes: List[str] = None,
signing_algs: List[str] = None,
required_claims: List[str] = None) -> Dict:
# 构建完整元数据
metadata = {
"resource": resource_url,
"jwks_uri": jwks_uri
}
# 添加可选的权限范围
if scopes:
metadata["scopes_supported"] = scopes
# 添加支持的签名算法
if signing_algs:
metadata["token_signing_alg_values_supported"] = signing_algs
# 添加必需的Access Token声明
if required_claims:
metadata["required_token_claims"] = required_claims
# 添加其他常用字段
metadata["access_token_lifetime"] = 3600 # 默认1小时
return metadata
# 构建OpenID Connect资源服务器元数据
@staticmethod
def build_oidc_metadata(resource_url: str, jwks_uri: str,
claims_supported: List[str] = None) -> Dict:
# 构建OIDC资源服务器元数据
metadata = {
"resource": resource_url,
"jwks_uri": jwks_uri,
"scopes_supported": ["openid", "profile", "email"],
"token_signing_alg_values_supported": ["RS256", "ES256"],
"required_token_claims": ["iss", "exp", "aud", "sub"]
}
# 添加OIDC特有的声明支持
if claims_supported:
metadata["claims_supported"] = claims_supported
return metadata
# 使用示例
def main():
# 配置资源服务器URL
base_url = "https://api.example.com"
# 创建资源服务器元数据管理器
manager = ResourceServerMetadataManager(base_url)
# 发现资源服务器元数据
if not manager.discover_metadata():
print(" 无法发现资源服务器元数据,退出程序")
return
# 显示元数据信息
print("\n 资源服务器元数据:")
print(f" 资源标识: {manager.metadata.get('resource')}")
print(f" JWKS端点: {manager.metadata.get('jwks_uri')}")
print(f" 支持的权限范围: {manager.get_supported_scopes()}")
print(f" Access Token端点: {manager.get_token_endpoint()}")
print(f" 自省端点: {manager.get_introspection_endpoint()}")
print(f" Access Token生命周期: {manager.get_access_token_lifetime()}秒")
# 获取JWKS
jwks = manager.get_jwks()
if jwks:
print(f"\n JWKS包含 {len(jwks.get('keys', []))} 个密钥")
# 演示Access Token验证(使用示例Access Token)
print("\n🔍 演示Access Token验证...")
# 注意:这是一个示例Access Token,实际应用中应该使用真实的访问Access Token
sample_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
is_valid, result = manager.validate_access_token(sample_token)
if is_valid:
print(" 示例Access Token验证通过")
print(f" Access Token载荷: {result}")
else:
print(" 示例Access Token验证失败")
print(f" 错误信息: {result}")
# 显示缓存状态
print("\n📊 缓存状态:")
cache_status = manager.get_cache_status()
for key, value in cache_status.items():
print(f" {key}: {value}")
# 测试权限范围检查
print("\n 权限范围检查:")
test_scopes = ["read", "write", "admin"]
for scope in test_scopes:
is_supported = manager.is_scope_supported(scope)
status = " 支持" if is_supported else " 不支持"
print(f" {scope}: {status}")
# 清除缓存
print("\n🗑️ 清除缓存...")
manager.clear_cache()
# 如果直接运行此脚本
if __name__ == "__main__":
main()7.3 资源服务器实现示例 #
# 导入必要的模块
from flask import Flask, jsonify, request, make_response
import json
import time
from datetime import datetime, timedelta
# 创建Flask应用实例
app = Flask(__name__)
# 配置资源服务器元数据
RESOURCE_METADATA = {
"resource": "https://api.example.com",
"jwks_uri": "https://api.example.com/oauth/jwks",
"token_endpoint": "https://auth.example.com/oauth/token",
"token_introspection_endpoint": "https://auth.example.com/oauth/introspect",
"scopes_supported": ["read", "write", "delete", "admin"],
"token_signing_alg_values_supported": ["RS256", "ES256"],
"required_token_claims": ["iss", "exp", "aud", "sub"],
"access_token_lifetime": 3600,
"resource_signing_alg": "ES256"
}
# 模拟JWKS端点
@app.route('/oauth/jwks')
def jwks():
# 返回模拟的JWKS
jwks_data = {
"keys": [
{
"kty": "RSA",
"kid": "example-key-1",
"use": "sig",
"alg": "RS256",
"n": "example-modulus",
"e": "AQAB"
}
]
}
# 设置响应头
response = make_response(jsonify(jwks_data))
response.headers['Content-Type'] = 'application/json'
response.headers['Cache-Control'] = 'public, max-age=86400'
return response
# 资源服务器元数据端点
@app.route('/.well-known/oauth-resource-metadata')
def resource_metadata():
# 返回资源服务器元数据
response = make_response(jsonify(RESOURCE_METADATA))
response.headers['Content-Type'] = 'application/json'
response.headers['Cache-Control'] = 'public, max-age=86400'
return response
# 受保护的API端点示例
@app.route('/api/protected')
def protected_resource():
# 获取Authorization头
auth_header = request.headers.get('Authorization')
if not auth_header:
# 返回401未授权错误
return jsonify({
"error": "unauthorized",
"error_description": "缺少访问Access Token"
}), 401
# 检查BearerAccess Token格式
if not auth_header.startswith('Bearer '):
return jsonify({
"error": "invalid_request",
"error_description": "无效的授权头格式"
}), 400
# 提取访问Access Token
access_token = auth_header[7:] # 移除"Bearer "前缀
# 这里应该验证Access Token(简化示例)
# 实际应用中应该使用JWT库验证Access Token签名和声明
# 模拟Access Token验证
if not validate_token_simple(access_token):
return jsonify({
"error": "invalid_token",
"error_description": "访问Access Token无效或已过期"
}), 401
# 返回受保护的资源
return jsonify({
"message": "这是受保护的资源",
"timestamp": datetime.now().isoformat(),
"resource": "protected-api",
"access_granted": True
})
# 简化的Access Token验证函数(仅用于演示)
def validate_token_simple(token: str) -> bool:
# 这是一个简化的验证函数,实际应用中应该:
# 1. 验证JWT签名
# 2. 检查必需声明
# 3. 验证过期时间
# 4. 检查权限范围
# 简单检查:Access Token不为空且长度合理
if not token or len(token) < 10:
return False
# 模拟检查:Access Token包含"valid"字符串
if "valid" in token.lower():
return True
return False
# 健康检查端点
@app.route('/health')
def health_check():
# 返回服务健康状态
return jsonify({
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"service": "resource-server",
"version": "1.0.0"
})
# 错误处理
@app.errorhandler(404)
def not_found(error):
# 处理404错误
return jsonify({
"error": "not_found",
"error_description": "请求的资源不存在"
}), 404
@app.errorhandler(500)
def internal_error(error):
# 处理500错误
return jsonify({
"error": "internal_server_error",
"error_description": "服务器内部错误"
}), 500
# 主函数
if __name__ == '__main__':
# 打印启动信息
print(" 启动资源服务器...")
print(f" 元数据端点: http://localhost:5000/.well-known/oauth-resource-metadata")
print(f" JWKS端点: http://localhost:5000/oauth/jwks")
print(f" 受保护API: http://localhost:5000/api/protected")
print(f"💚 健康检查: http://localhost:5000/health")
# 启动Flask应用
app.run(host='0.0.0.0', port=5000, debug=True)8. 错误处理 #
资源服务器可能返回以下错误:
{
"error": "invalid_token",
"error_description": "The access token is missing required 'aud' claim."
}常见错误码:
invalid_request:元数据请求格式错误。invalid_token:Access Token验证失败(如签名无效或声明缺失)。
9. 总结与最佳实践 #
- 优先使用动态发现:避免硬编码资源服务器配置。
- 严格验证Access Token:遵循
required_token_claims和算法约束。 - 生命周期管理:定期更新缓存的元数据和 JWKS。
- 与 OIDC 协同:若资源服务器同时支持 OIDC,检查扩展元数据字段。
通过使用上述代码示例,您可以:
- 自动发现资源服务器的配置信息
- 验证访问Access Token的签名和声明
- 管理元数据缓存和JWKS更新
- 实现完整的资源服务器功能
- 支持多种Access Token验证策略
RFC 9728 填补了 OAuth 2.0 生态中资源服务器标准化配置的空白,与 RFC 8414 共同构成了完整的"客户端-授权服务器-资源服务器"自动化集成链条。