ai
  • index
  • 1.首页
  • 2.介绍
  • 3.架构概览
  • 4.服务器概念
  • 5.客户端概念
  • 6.版本控制
  • 7.连接到远程MCP服务器
  • 8.连接到本地MCP服务器
  • json_rpc
  • 9.构建一个MCP服务器
  • 10.检查员
  • 11.构建一个MCP客户端
  • 14.架构
  • 15.基础协议概述
  • 16.生命周期
  • 17.传输
  • 18.授权
  • 19.安全最佳实践
  • 20.取消
  • 21.Ping
  • 22.进展
  • 23.Roots
  • 24.采样
  • 25.启发
  • 26.服务器特性
  • 27.提示词
  • 28.资源
  • 29.工具
  • 30.完成
  • 31.日志记录
  • 32.分页
  • 33.架构参考
  • URI模板
  • 12.实现
  • http.server
  • 动态客户端注册协议
  • 受保护资源元数据
  • 授权服务器元数据
  • JWKS
  • PKCE
  • PyJWT
  • secrets
  • watchfiles
  • 实现authorization
  • 实现cancel
  • 实现completion
  • 实现logging
  • 实现pagination
  • 实现process
  • 实现transport
  • psutil
  • pytz
  • zoneinfo
  • contextlib
  • Starlette
  • mcp.1.starter
  • mcp.2.Resource
  • mcp.3.structured_output
  • mcp.4.prompts
  • mcp.5.context
  • mcp.6.streamable
  • mcp.7.lowlevel
  • mcp.8.Completion
  • mcp.9.Elicitation
  • mcp.10.oauth
  • mcp.11.integration
  • mcp.12.best
  • mysql-mcp
  • databases
  • uvicorn
  • asynccontextmanager
  • AsyncExitStack
  • streamable
  • aiohttp
  • publish
  • email
  • schedule
  • twine
  • 1.教学文档总览
  • 2.教师使用指南
  • 3.教学系统快速参考
  • 4.新生入门指南
  • 5.学生使用指南
  • 1. OAuth 2.0 授权服务器元数据
  • 2. 核心概念
    • 2.1 元数据的作用
    • 2.2 适用场景
  • 3. 元数据端点
    • 3.1 发现机制
    • 3.2 响应格式
  • 4. 关键元数据字段
    • 4.1 必需字段
    • 4.2 可选字段
  • 5. 动态客户端注册(可选)
  • 6. 安全性考虑
  • 7. 实际应用示例
    • 7.1 客户端发现流程
    • 7.2 代码示例(Python)
      • 基础元数据获取示例
      • 完整的OAuth客户端配置示例
      • 元数据验证和缓存示例
  • 8. 与 OpenID Connect 的关系
  • 9. 总结

1. OAuth 2.0 授权服务器元数据 #

OAuth 2.0 授权服务器元数据(RFC 8414)定义了一种标准化的方式,允许客户端自动发现授权服务器的配置信息(如端点 URL、支持的算法、能力声明等)。这简化了客户端的集成,并减少了手动配置错误的风险。

2. 核心概念 #

2.1 元数据的作用 #

  • 自动发现:客户端可通过固定或配置的元数据端点获取授权服务器的详细信息。
  • 标准化接口:统一了不同 OAuth 2.0/OpenID Connect 实现的配置格式。
  • 动态适应:支持运行时获取服务器配置(如端点变更、算法更新)。

2.2 适用场景 #

  • OAuth 2.0 授权服务器(如 Keycloak、Auth0、Azure AD)。
  • OpenID Connect 提供商(OIDC 是 OAuth 2.0 的扩展,复用此元数据规范)。

3. 元数据端点 #

3.1 发现机制 #

  • 固定路径:
    授权服务器通常在 /.well-known/oauth-authorization-server 或 /.well-known/openid-configuration(OIDC)提供元数据。
    GET /.well-known/oauth-authorization-server HTTP/1.1
    Host: auth.example.com

3.2 响应格式 #

元数据以 JSON 格式返回,字段遵循 IANA OAuth 参数注册表。

示例响应:

{
  "issuer": "https://auth.example.com",
  "authorization_endpoint": "https://auth.example.com/oauth/authorize",
  "token_endpoint": "https://auth.example.com/oauth/token",
  "jwks_uri": "https://auth.example.com/oauth/jwks",
  "scopes_supported": ["openid", "profile", "email"],
  "response_types_supported": ["code", "token"],
  "grant_types_supported": ["authorization_code", "client_credentials"],
  "token_endpoint_auth_methods_supported": ["client_secret_basic", "private_key_jwt"],
  "code_challenge_methods_supported": ["S256", "plain"]
}

4. 关键元数据字段 #

4.1 必需字段 #

字段名 描述
issuer 授权服务器的唯一标识(必须为 HTTPS URL)。
authorization_endpoint 授权端点 URL(用户登录和同意页面)。
token_endpoint Access Token端点 URL(用于交换 code 或获取 access_token)。
jwks_uri JWK Set 文档的 URL,包含用于验证Access Token的公钥。

4.2 可选字段 #

字段名 描述
scopes_supported 支持的权限范围(如 openid、profile)。
response_types_supported 支持的响应类型(如 code、token)。
grant_types_supported 支持的授权类型(如 authorization_code、client_credentials)。
token_endpoint_auth_methods_supported Access Token端点的客户端认证方式(如 client_secret_basic、private_key_jwt)。
code_challenge_methods_supported 支持的 PKCE 代码挑战方法(如 S256)。
revocation_endpoint Access Token撤销端点 URL(RFC 7009)。
introspection_endpoint Access Token自省端点 URL(RFC 7662)。

5. 动态客户端注册(可选) #

如果授权服务器支持 RFC 7591,元数据中可能包含以下字段:

{
  "registration_endpoint": "https://auth.example.com/oauth/register",
  "registration_management_endpoint": "https://auth.example.com/oauth/register/{client_id}"
}

6. 安全性考虑 #

  1. HTTPS 强制要求:
    元数据必须通过 HTTPS 传输,防止中间人攻击。
  2. 缓存控制:
    客户端应缓存元数据,但需遵循 Cache-Control 头部(如 max-age=86400)。
  3. 签名验证(OIDC):
    OpenID Connect 要求元数据文档可通过 issuer 的 TLS 证书或签名验证。

7. 实际应用示例 #

7.1 客户端发现流程 #

  1. 客户端访问元数据端点:
    curl https://auth.example.com/.well-known/oauth-authorization-server
  2. 解析响应并配置自身:
    • 使用 authorization_endpoint 发起授权请求。
    • 使用 token_endpoint 交换Access Token。
    • 使用 jwks_uri 验证 JWT 签名。

7.2 代码示例(Python) #

基础元数据获取示例 #

# 导入必要的模块
import requests
import json
from urllib.parse import urljoin

# 定义授权服务器的基础URL
base_url = "https://auth.example.com"

# 构建元数据端点URL
metadata_url = urljoin(base_url, "/.well-known/oauth-authorization-server")

# 发送GET请求获取元数据
response = requests.get(metadata_url)

# 检查请求是否成功
if response.status_code == 200:
    # 解析JSON响应
    metadata = response.json()

    # 打印获取到的元数据
    print(" 成功获取授权服务器元数据:")
    print(json.dumps(metadata, indent=2, ensure_ascii=False))

    # 提取关键信息
    issuer = metadata.get("issuer")
    auth_endpoint = metadata.get("authorization_endpoint")
    token_endpoint = metadata.get("token_endpoint")
    jwks_uri = metadata.get("jwks_uri")

    print(f"\n 服务器标识: {issuer}")
    print(f" 授权端点: {auth_endpoint}")
    print(f"🎫 Access Token端点: {token_endpoint}")
    print(f" JWKS端点: {jwks_uri}")

else:
    # 处理错误情况
    print(f" 获取元数据失败: {response.status_code}")
    print(f"错误信息: {response.text}")

完整的OAuth客户端配置示例 #

# 导入必要的模块
import requests
import json
from urllib.parse import urljoin, urlencode
import secrets
import hashlib
import base64

# 定义OAuth客户端配置类
class OAuthClient:
    # 初始化客户端
    def __init__(self, base_url, client_id, client_secret):
        # 设置基础URL
        self.base_url = base_url
        # 设置客户端ID
        self.client_id = client_id
        # 设置客户端密钥
        self.client_secret = client_secret
        # 初始化元数据
        self.metadata = None
        # 初始化会话
        self.session = requests.Session()

    # 获取授权服务器元数据
    def discover_metadata(self):
        # 构建元数据端点URL
        metadata_url = urljoin(self.base_url, "/.well-known/oauth-authorization-server")

        try:
            # 发送GET请求获取元数据
            response = self.session.get(metadata_url)
            # 检查响应状态
            response.raise_for_status()

            # 解析JSON响应
            self.metadata = response.json()
            print(" 成功获取授权服务器元数据")
            return True

        except requests.exceptions.RequestException as e:
            # 处理请求异常
            print(f" 获取元数据失败: {e}")
            return False

    # 生成授权URL
    def build_authorization_url(self, redirect_uri, scope="openid profile", state=None):
        # 检查元数据是否已获取
        if not self.metadata:
            print(" 请先调用discover_metadata()获取元数据")
            return None

        # 获取授权端点
        auth_endpoint = self.metadata.get("authorization_endpoint")
        if not auth_endpoint:
            print(" 元数据中未找到授权端点")
            return None

        # 生成随机状态值(如果未提供)
        if not state:
            state = secrets.token_urlsafe(32)

        # 生成PKCE代码挑战
        code_verifier = secrets.token_urlsafe(32)
        code_challenge = base64.urlsafe_b64encode(
            hashlib.sha256(code_verifier.encode()).digest()
        ).decode().rstrip('=')

        # 构建授权参数
        params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'redirect_uri': redirect_uri,
            'scope': scope,
            'state': state,
            'code_challenge': code_challenge,
            'code_challenge_method': 'S256'
        }

        # 构建完整URL
        auth_url = f"{auth_endpoint}?{urlencode(params)}"

        # 保存代码验证器(实际应用中应存储在会话中)
        self.code_verifier = code_verifier

        return auth_url

    # 交换授权码获取访问Access Token
    def exchange_code_for_token(self, authorization_code, redirect_uri):
        # 检查元数据是否已获取
        if not self.metadata:
            print(" 请先调用discover_metadata()获取元数据")
            return None

        # 获取Access Token端点
        token_endpoint = self.metadata.get("token_endpoint")
        if not token_endpoint:
            print(" 元数据中未找到Access Token端点")
            return None

        # 准备Access Token请求数据
        token_data = {
            'grant_type': 'authorization_code',
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'code': authorization_code,
            'redirect_uri': redirect_uri,
            'code_verifier': getattr(self, 'code_verifier', '')
        }

        try:
            # 发送POST请求获取Access Token
            response = self.session.post(token_endpoint, data=token_data)
            # 检查响应状态
            response.raise_for_status()

            # 解析Access Token响应
            token_response = response.json()
            print(" 成功获取访问Access Token")
            return token_response

        except requests.exceptions.RequestException as e:
            # 处理请求异常
            print(f" 获取Access Token失败: {e}")
            return None

    # 获取用户信息(如果支持OpenID Connect)
    def get_user_info(self, access_token):
        # 检查是否支持用户信息端点
        userinfo_endpoint = self.metadata.get("userinfo_endpoint")
        if not userinfo_endpoint:
            print(" 此授权服务器不支持用户信息端点")
            return None

        # 设置授权头
        headers = {'Authorization': f'Bearer {access_token}'}

        try:
            # 发送GET请求获取用户信息
            response = self.session.get(userinfo_endpoint, headers=headers)
            # 检查响应状态
            response.raise_for_status()

            # 解析用户信息
            user_info = response.json()
            print(" 成功获取用户信息")
            return user_info

        except requests.exceptions.RequestException as e:
            # 处理请求异常
            print(f" 获取用户信息失败: {e}")
            return None

# 使用示例
def main():
    # 配置客户端参数
    base_url = "https://auth.example.com"
    client_id = "your_client_id"
    client_secret = "your_client_secret"
    redirect_uri = "https://your-app.com/callback"

    # 创建OAuth客户端实例
    client = OAuthClient(base_url, client_id, client_secret)

    # 获取授权服务器元数据
    if client.discover_metadata():
        # 构建授权URL
        auth_url = client.build_authorization_url(redirect_uri)
        if auth_url:
            print(f"\n🔗 授权URL: {auth_url}")
            print("\n📝 请将此URL发送给用户进行授权")

        # 模拟授权码(实际应用中从回调中获取)
        auth_code = "sample_authorization_code"

        # 交换Access Token
        token_response = client.exchange_code_for_token(auth_code, redirect_uri)
        if token_response:
            print(f"\n🎫 Access Token响应: {json.dumps(token_response, indent=2, ensure_ascii=False)}")

            # 获取用户信息
            access_token = token_response.get("access_token")
            if access_token:
                user_info = client.get_user_info(access_token)
                if user_info:
                    print(f"\n👤 用户信息: {json.dumps(user_info, indent=2, ensure_ascii=False)}")

# 如果直接运行此脚本
if __name__ == "__main__":
    main()

元数据验证和缓存示例 #

# 导入必要的模块
import requests
import json
import time
from urllib.parse import urljoin
from typing import Dict, Optional

# 定义元数据管理器类
class MetadataManager:
    # 初始化元数据管理器
    def __init__(self, base_url: str):
        # 设置基础URL
        self.base_url = base_url
        # 初始化元数据缓存
        self.metadata_cache = {}
        # 设置缓存过期时间(秒)
        self.cache_expiry = 86400  # 24小时

    # 获取元数据端点URL
    def get_metadata_url(self, endpoint_type: str = "oauth") -> str:
        # 根据类型选择端点
        if endpoint_type == "oauth":
            # OAuth 2.0 元数据端点
            path = "/.well-known/oauth-authorization-server"
        elif endpoint_type == "oidc":
            # OpenID Connect 元数据端点
            path = "/.well-known/openid-configuration"
        else:
            # 默认使用OAuth端点
            path = "/.well-known/oauth-authorization-server"

        # 构建完整URL
        return urljoin(self.base_url, path)

    # 验证元数据格式
    def validate_metadata(self, metadata: Dict) -> tuple[bool, list]:
        # 初始化错误列表
        errors = []

        # 检查必需字段
        required_fields = [
            "issuer", 
            "authorization_endpoint", 
            "token_endpoint"
        ]

        # 验证每个必需字段
        for field in required_fields:
            if field not in metadata:
                errors.append(f"缺少必需字段: {field}")
            elif not metadata[field]:
                errors.append(f"字段 {field} 不能为空")

        # 验证issuer字段格式
        if "issuer" in metadata:
            issuer = metadata["issuer"]
            if not issuer.startswith("https://"):
                errors.append("issuer字段必须是HTTPS URL")
            if issuer != self.base_url.rstrip('/'):
                errors.append("issuer字段必须与基础URL匹配")

        # 验证端点URL格式
        url_fields = ["authorization_endpoint", "token_endpoint", "jwks_uri"]
        for field in url_fields:
            if field in metadata and metadata[field]:
                url = metadata[field]
                if not url.startswith("https://"):
                    errors.append(f"字段 {field} 必须是HTTPS URL")

        # 检查是否支持必要的授权类型
        if "grant_types_supported" in metadata:
            supported_types = metadata["grant_types_supported"]
            if "authorization_code" not in supported_types:
                errors.append("必须支持authorization_code授权类型")

        # 返回验证结果
        is_valid = len(errors) == 0
        return is_valid, errors

    # 获取元数据(带缓存)
    def get_metadata(self, endpoint_type: str = "oauth", force_refresh: bool = False) -> Optional[Dict]:
        # 构建缓存键
        cache_key = f"{endpoint_type}_{self.base_url}"

        # 检查缓存是否有效
        if not force_refresh and cache_key in self.metadata_cache:
            cached_data = self.metadata_cache[cache_key]
            # 检查缓存是否过期
            if time.time() - cached_data["timestamp"] < self.cache_expiry:
                print(" 使用缓存的元数据")
                return cached_data["metadata"]
            else:
                # 缓存已过期,删除
                del self.metadata_cache[cache_key]

        # 获取元数据端点URL
        metadata_url = self.get_metadata_url(endpoint_type)

        try:
            # 发送GET请求获取元数据
            print(f"🔄 正在获取元数据: {metadata_url}")
            response = requests.get(metadata_url, timeout=10)

            # 检查响应状态
            response.raise_for_status()

            # 解析JSON响应
            metadata = response.json()

            # 验证元数据格式
            is_valid, errors = self.validate_metadata(metadata)

            if not is_valid:
                # 打印验证错误
                print(" 元数据验证失败:")
                for error in errors:
                    print(f"  - {error}")
                return None

            # 缓存元数据
            self.metadata_cache[cache_key] = {
                "metadata": metadata,
                "timestamp": time.time()
            }

            print(" 成功获取并验证元数据")
            return metadata

        except requests.exceptions.RequestException as e:
            # 处理请求异常
            print(f" 获取元数据失败: {e}")
            return None
        except json.JSONDecodeError as e:
            # 处理JSON解析错误
            print(f" JSON解析失败: {e}")
            return None

    # 清除缓存
    def clear_cache(self):
        # 清空元数据缓存
        self.metadata_cache.clear()
        print("🗑️ 元数据缓存已清除")

    # 获取缓存状态
    def get_cache_status(self) -> Dict:
        # 构建缓存状态信息
        cache_status = {}

        for cache_key, cached_data in self.metadata_cache.items():
            # 计算缓存年龄
            age = time.time() - cached_data["timestamp"]
            # 检查是否过期
            is_expired = age > self.cache_expiry

            cache_status[cache_key] = {
                "age_seconds": int(age),
                "age_hours": round(age / 3600, 2),
                "is_expired": is_expired,
                "expires_in_seconds": max(0, self.cache_expiry - age)
            }

        return cache_status

# 使用示例
def main():
    # 配置授权服务器URL
    base_url = "https://auth.example.com"

    # 创建元数据管理器实例
    manager = MetadataManager(base_url)

    # 获取OAuth元数据
    print("🔍 获取OAuth 2.0元数据...")
    oauth_metadata = manager.get_metadata("oauth")

    if oauth_metadata:
        print("\n OAuth元数据:")
        print(json.dumps(oauth_metadata, indent=2, ensure_ascii=False))

    # 获取OpenID Connect元数据
    print("\n🔍 获取OpenID Connect元数据...")
    oidc_metadata = manager.get_metadata("oidc")

    if oidc_metadata:
        print("\n OIDC元数据:")
        print(json.dumps(oidc_metadata, indent=2, ensure_ascii=False))

    # 显示缓存状态
    print("\n📊 缓存状态:")
    cache_status = manager.get_cache_status()
    for key, status in cache_status.items():
        print(f"  {key}:")
        print(f"    年龄: {status['age_hours']}小时")
        print(f"    是否过期: {'是' if status['is_expired'] else '否'}")
        print(f"    剩余时间: {status['expires_in_seconds']}秒")

    # 测试强制刷新
    print("\n🔄 强制刷新元数据...")
    refreshed_metadata = manager.get_metadata("oauth", force_refresh=True)

    if refreshed_metadata:
        print(" 元数据刷新成功")

    # 清除缓存
    print("\n🗑️ 清除缓存...")
    manager.clear_cache()

# 如果直接运行此脚本
if __name__ == "__main__":
    main()

8. 与 OpenID Connect 的关系 #

  • OIDC 复用元数据:
    OIDC 的发现端点(/.well-known/openid-configuration)扩展了 OAuth 2.0 元数据,添加了 userinfo_endpoint、end_session_endpoint 等字段。
  • 兼容性:
    纯 OAuth 2.0 服务器可能不提供 OIDC 特有的字段(如 claims_supported)。

9. 总结 #

RFC 8414 通过标准化元数据格式,显著简化了 OAuth 2.0 客户端的集成流程,同时提升了安全性和可维护性。开发者应优先使用自动发现机制,而非硬编码服务器配置。

通过使用上述代码示例,您可以:

  1. 自动发现授权服务器配置
  2. 验证元数据格式和内容
  3. 实现完整的OAuth流程
  4. 管理元数据缓存
  5. 支持多种端点类型

访问验证

请输入访问令牌

Token不正确,请重新输入