ai
  • index
  • 1.首页
  • 2.介绍
  • 3.架构概览
  • 4.服务器概念
  • 5.客户端概念
  • 6.版本控制
  • 7.连接到远程MCP服务器
  • 8.连接到本地MCP服务器
  • json_rpc
  • 9.构建一个MCP服务器
  • 10.检查员
  • 11.构建一个MCP客户端
  • 14.架构
  • 15.基础协议概述
  • 16.生命周期
  • 17.传输
  • 18.授权
  • 19.安全最佳实践
  • 20.取消
  • 21.Ping
  • 22.进展
  • 23.Roots
  • 24.采样
  • 25.启发
  • 26.服务器特性
  • 27.提示词
  • 28.资源
  • 29.工具
  • 30.完成
  • 31.日志记录
  • 32.分页
  • 33.架构参考
  • URI模板
  • 12.实现
  • http.server
  • 动态客户端注册协议
  • 受保护资源元数据
  • 授权服务器元数据
  • JWKS
  • PKCE
  • PyJWT
  • secrets
  • watchfiles
  • 实现authorization
  • 实现cancel
  • 实现completion
  • 实现logging
  • 实现pagination
  • 实现process
  • 实现transport
  • psutil
  • pytz
  • zoneinfo
  • contextlib
  • Starlette
  • mcp.1.starter
  • mcp.2.Resource
  • mcp.3.structured_output
  • mcp.4.prompts
  • mcp.5.context
  • mcp.6.streamable
  • mcp.7.lowlevel
  • mcp.8.Completion
  • mcp.9.Elicitation
  • mcp.10.oauth
  • mcp.11.integration
  • mcp.12.best
  • mysql-mcp
  • databases
  • uvicorn
  • asynccontextmanager
  • AsyncExitStack
  • streamable
  • aiohttp
  • publish
  • email
  • schedule
  • twine
  • 1.教学文档总览
  • 2.教师使用指南
  • 3.教学系统快速参考
  • 4.新生入门指南
  • 5.学生使用指南
  • 1. OAuth 2.0 动态客户端注册协议
  • 2. 核心目标与适用场景
    • 2.1 解决的问题
    • 2.2 适用场景
  • 3. 协议流程
    • 3.1 客户端注册请求
    • 3.2 授权服务器响应
  • 4. 关键客户端元数据字段
    • 4.1 必需字段
    • 4.2 可选字段
  • 5. 安全性设计
    • 5.1 认证要求
    • 5.2 敏感字段保护
    • 5.3 客户端分类处理
  • 6. 客户端管理操作
    • 6.1 更新客户端
    • 6.2 删除客户端
  • 7. 错误处理
  • 8. 实际应用示例
    • 8.1 基础动态客户端注册示例
    • 8.2 完整的动态客户端注册管理器
    • 8.3 与 OIDC 的集成示例
  • 9. 与相关规范的关联
  • 10. 总结

1. OAuth 2.0 动态客户端注册协议 #

OAuth 2.0 动态客户端注册协议(RFC 7591)定义了一种标准化的方法,允许 OAuth 客户端在授权服务器上动态注册,无需手动配置客户端凭据(如 client_id 和 client_secret)。这一机制特别适合多租户应用、移动应用或 CI/CD 自动化场景。

2. 核心目标与适用场景 #

2.1 解决的问题 #

  • 减少手动配置:避免管理员手动预注册客户端。
  • 支持多租户架构:例如 SaaS 应用为每个租户自动创建客户端。
  • 增强灵活性:移动应用或 IoT 设备可在首次运行时动态注册。

2.2 适用场景 #

  • 公共客户端(如 SPA、移动应用)。
  • 需要自动化管理的机密客户端(如微服务)。

3. 协议流程 #

动态客户端注册分为两个主要阶段:

  1. 客户端注册请求 → 2. 授权服务器响应。

3.1 客户端注册请求 #

客户端向授权服务器的注册端点(registration_endpoint,通常通过 RFC 8414 元数据发现)发送 HTTP POST 请求,包含客户端元数据(JSON 格式)。

请求示例:

POST /oauth/register HTTP/1.1
Host: auth.example.com
Content-Type: application/json
Accept: application/json

{
  "client_name": "My Example App",
  "redirect_uris": ["https://client.example.org/callback"],
  "grant_types": ["authorization_code", "refresh_token"],
  "response_types": ["code"],
  "token_endpoint_auth_method": "client_secret_basic",
  "scope": "read write"
}

3.2 授权服务器响应 #

成功时返回 201 Created,包含生成的客户端凭据和注册信息。

响应示例:

{
  "client_id": "s6BhdRkqt3",
  "client_secret": "cf136dc3c1fc93f31185e5885805d",
  "client_secret_expires_at": 0, // 0 表示永不过期
  "client_id_issued_at": 1609459200,
  "redirect_uris": ["https://client.example.org/callback"],
  "grant_types": ["authorization_code", "refresh_token"],
  "scope": "read write"
}

4. 关键客户端元数据字段 #

4.1 必需字段 #

字段名 描述
redirect_uris 允许的重定向 URI(必须与授权请求中的 redirect_uri 精确匹配)。
client_name 客户端的人类可读名称(用于用户同意界面)。

4.2 可选字段 #

字段名 描述
grant_types 支持的授权类型(如 authorization_code、client_credentials)。
response_types 支持的响应类型(如 code、token)。
token_endpoint_auth_method Access Token端点认证方式(如 client_secret_basic、none 用于公共客户端)。
scope 默认请求的权限范围。
jwks_uri 客户端公钥集 URL(用于 private_key_jwt 认证)。

5. 安全性设计 #

5.1 认证要求 #

  • 初始注册:通常无需认证(依赖其他机制如 TLS 或 IP 白名单)。
  • 后续管理:需认证(如使用 registration_access_token)。

5.2 敏感字段保护 #

  • client_secret:仅在响应中返回一次,建议客户端立即安全存储。
  • registration_access_token:用于更新或删除客户端(类似 OAuth 的 refresh_token)。

5.3 客户端分类处理 #

客户端类型 认证方式建议
公共客户端 token_endpoint_auth_method=none
机密客户端 client_secret_basic 或 private_key_jwt

6. 客户端管理操作 #

6.1 更新客户端 #

使用 registration_access_token 发送 HTTP PUT 请求:

PUT /oauth/register/s6BhdRkqt3 HTTP/1.1
Host: auth.example.com
Authorization: Bearer reg-23410913-abewfq.123483
Content-Type: application/json

{
  "client_name": "Updated App Name",
  "redirect_uris": ["https://client.example.org/new_callback"]
}

6.2 删除客户端 #

发送 HTTP DELETE 请求:

DELETE /oauth/register/s6BhdRkqt3 HTTP/1.1
Host: auth.example.com
Authorization: Bearer reg-23410913-abewfq.123483

7. 错误处理 #

授权服务器返回标准 OAuth 错误格式(RFC 6749):

{
  "error": "invalid_redirect_uri",
  "error_description": "The redirect_uri is not allowed."
}

常见错误码:

  • invalid_redirect_uri:重定向 URI 不符合策略。
  • invalid_client_metadata:元数据字段无效。
  • access_denied:管理操作未授权。

8. 实际应用示例 #

8.1 基础动态客户端注册示例 #

# 导入必要的模块
import requests
import json
from urllib.parse import urljoin

# 定义授权服务器的基础URL
base_url = "https://auth.example.com"

# 构建元数据发现端点URL
metadata_url = urljoin(base_url, "/.well-known/oauth-authorization-server")

try:
    # 获取授权服务器元数据
    print("🔍 正在获取授权服务器元数据...")
    metadata_response = requests.get(metadata_url, timeout=10)
    metadata_response.raise_for_status()

    # 解析元数据
    metadata = metadata_response.json()

    # 检查是否支持动态客户端注册
    registration_endpoint = metadata.get("registration_endpoint")
    if not registration_endpoint:
        print(" 此授权服务器不支持动态客户端注册")
        exit(1)

    print(f" 找到注册端点: {registration_endpoint}")

    # 准备客户端注册数据
    client_data = {
        "client_name": "我的示例应用",
        "redirect_uris": ["https://myapp.example/callback"],
        "grant_types": ["authorization_code"]
    }

    # 发送客户端注册请求
    print("📝 正在注册新客户端...")
    registration_response = requests.post(
        registration_endpoint,
        json=client_data,
        headers={"Content-Type": "application/json"},
        timeout=10
    )

    # 检查注册响应
    if registration_response.status_code == 201:
        # 注册成功
        client_info = registration_response.json()
        print(" 客户端注册成功!")
        print(f" 客户端ID: {client_info['client_id']}")
        print(f" 客户端密钥: {client_info['client_secret']}")
        print(f"⏰ 注册时间: {client_info['client_id_issued_at']}")

        # 保存客户端信息(实际应用中应加密存储)
        with open("client_info.json", "w", encoding="utf-8") as f:
            json.dump(client_info, f, indent=2, ensure_ascii=False)
        print("💾 客户端信息已保存到 client_info.json")

    else:
        # 注册失败
        print(f" 客户端注册失败: {registration_response.status_code}")
        error_info = registration_response.json()
        print(f"错误类型: {error_info['error']}")
        print(f"错误描述: {error_info['error_description']}")

except requests.exceptions.RequestException as e:
    print(f" 网络请求失败: {e}")
except json.JSONDecodeError as e:
    print(f" JSON解析失败: {e}")
except Exception as e:
    print(f" 发生未知错误: {e}")

8.2 完整的动态客户端注册管理器 #

# 导入必要的模块
import requests
import json
import time
from urllib.parse import urljoin
from typing import Dict, List, Optional, Union
import os

# 定义动态客户端注册管理器类
class DynamicClientRegistrationManager:
    # 初始化注册管理器
    def __init__(self, base_url: str):
        # 设置授权服务器基础URL
        self.base_url = base_url
        # 初始化会话
        self.session = requests.Session()
        # 存储元数据
        self.metadata = None
        # 存储注册端点
        self.registration_endpoint = None

    # 发现授权服务器元数据
    def discover_metadata(self) -> bool:
        # 构建元数据发现端点URL
        metadata_url = urljoin(self.base_url, "/.well-known/oauth-authorization-server")

        try:
            # 发送GET请求获取元数据
            print(f"🔍 正在发现授权服务器元数据: {metadata_url}")
            response = self.session.get(metadata_url, timeout=10)
            response.raise_for_status()

            # 解析元数据
            self.metadata = response.json()

            # 检查是否支持动态客户端注册
            self.registration_endpoint = self.metadata.get("registration_endpoint")
            if not self.registration_endpoint:
                print(" 此授权服务器不支持动态客户端注册")
                return False

            print(f" 成功发现元数据,注册端点: {self.registration_endpoint}")
            return True

        except requests.exceptions.RequestException as e:
            print(f" 获取元数据失败: {e}")
            return False
        except json.JSONDecodeError as e:
            print(f" 解析元数据失败: {e}")
            return False

    # 注册新客户端
    def register_client(self, client_metadata: Dict) -> Optional[Dict]:
        # 检查是否已发现元数据
        if not self.registration_endpoint:
            print(" 请先调用discover_metadata()发现元数据")
            return None

        try:
            # 发送POST请求注册客户端
            print("📝 正在注册新客户端...")
            response = self.session.post(
                self.registration_endpoint,
                json=client_metadata,
                headers={"Content-Type": "application/json"},
                timeout=10
            )

            # 检查响应状态
            if response.status_code == 201:
                # 注册成功
client_info = response.json()
                print(" 客户端注册成功!")
                return client_info
            else:
                # 注册失败
                print(f" 客户端注册失败: {response.status_code}")
                error_info = response.json()
                print(f"错误类型: {error_info['error']}")
                print(f"错误描述: {error_info['error_description']}")
                return None

        except requests.exceptions.RequestException as e:
            print(f" 注册请求失败: {e}")
            return None
        except json.JSONDecodeError as e:
            print(f" 解析响应失败: {e}")
            return None

    # 更新客户端信息
    def update_client(self, client_id: str, registration_access_token: str, 
                     updated_metadata: Dict) -> Optional[Dict]:
        # 构建更新端点URL
        update_url = f"{self.registration_endpoint}/{client_id}"

        try:
            # 发送PUT请求更新客户端
            print(f"🔄 正在更新客户端: {client_id}")
            response = self.session.put(
                update_url,
                json=updated_metadata,
                headers={
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {registration_access_token}"
                },
                timeout=10
            )

            # 检查响应状态
            if response.status_code == 200:
                # 更新成功
                updated_info = response.json()
                print(" 客户端更新成功!")
                return updated_info
            else:
                # 更新失败
                print(f" 客户端更新失败: {response.status_code}")
                error_info = response.json()
                print(f"错误类型: {error_info['error']}")
                print(f"错误描述: {error_info['error_description']}")
                return None

        except requests.exceptions.RequestException as e:
            print(f" 更新请求失败: {e}")
            return None
        except json.JSONDecodeError as e:
            print(f" 解析响应失败: {e}")
            return None

    # 删除客户端
    def delete_client(self, client_id: str, registration_access_token: str) -> bool:
        # 构建删除端点URL
        delete_url = f"{self.registration_endpoint}/{client_id}"

        try:
            # 发送DELETE请求删除客户端
            print(f"🗑️ 正在删除客户端: {client_id}")
            response = self.session.delete(
                delete_url,
                headers={"Authorization": f"Bearer {registration_access_token}"},
                timeout=10
            )

            # 检查响应状态
            if response.status_code == 204:
                # 删除成功
                print(" 客户端删除成功!")
                return True
            else:
                # 删除失败
                print(f" 客户端删除失败: {response.status_code}")
                error_info = response.json()
                print(f"错误类型: {error_info['error']}")
                print(f"错误描述: {error_info['error_description']}")
                return False

        except requests.exceptions.RequestException as e:
            print(f" 删除请求失败: {e}")
            return False
        except json.JSONDecodeError as e:
            print(f" 解析响应失败: {e}")
            return False

    # 获取支持的客户端元数据字段
    def get_supported_metadata_fields(self) -> Dict:
        # 从元数据中提取支持的字段信息
        if not self.metadata:
            return {}

        supported_fields = {}

        # 检查支持的授权类型
        if "grant_types_supported" in self.metadata:
            supported_fields["grant_types_supported"] = self.metadata["grant_types_supported"]

        # 检查支持的响应类型
        if "response_types_supported" in self.metadata:
            supported_fields["response_types_supported"] = self.metadata["response_types_supported"]

        # 检查支持的Access Token端点认证方法
        if "token_endpoint_auth_methods_supported" in self.metadata:
            supported_fields["token_endpoint_auth_methods_supported"] = self.metadata["token_endpoint_auth_methods_supported"]

        # 检查支持的代码挑战方法
        if "code_challenge_methods_supported" in self.metadata:
            supported_fields["code_challenge_methods_supported"] = self.metadata["code_challenge_methods_supported"]

        return supported_fields

# 定义客户端元数据构建器类
class ClientMetadataBuilder:
    # 构建公共客户端元数据
    @staticmethod
    def build_public_client(client_name: str, redirect_uris: List[str], 
                           scopes: List[str] = None) -> Dict:
        # 构建公共客户端元数据
        metadata = {
            "client_name": client_name,
            "redirect_uris": redirect_uris,
            "grant_types": ["authorization_code"],
            "response_types": ["code"],
            "token_endpoint_auth_method": "none",  # 公共客户端无需认证
            "scope": " ".join(scopes) if scopes else "openid profile"
        }
        return metadata

    # 构建机密客户端元数据
    @staticmethod
    def build_confidential_client(client_name: str, redirect_uris: List[str],
                                grant_types: List[str] = None, 
                                scopes: List[str] = None) -> Dict:
        # 构建机密客户端元数据
        if grant_types is None:
            grant_types = ["authorization_code", "refresh_token"]

        metadata = {
            "client_name": client_name,
            "redirect_uris": redirect_uris,
            "grant_types": grant_types,
            "response_types": ["code"],
            "token_endpoint_auth_method": "client_secret_basic",
            "scope": " ".join(scopes) if scopes else "openid profile email"
        }
        return metadata

    # 构建OpenID Connect客户端元数据
    @staticmethod
    def build_oidc_client(client_name: str, redirect_uris: List[str],
                         application_type: str = "web",
                         contacts: List[str] = None) -> Dict:
        # 构建OpenID Connect客户端元数据
        metadata = {
            "client_name": client_name,
            "redirect_uris": redirect_uris,
            "grant_types": ["authorization_code"],
            "response_types": ["code"],
            "token_endpoint_auth_method": "client_secret_basic",
            "scope": "openid profile email",
            "application_type": application_type
        }

        # 添加联系人信息(如果提供)
        if contacts:
            metadata["contacts"] = contacts

        return metadata

# 使用示例
def main():
    # 配置授权服务器URL
    base_url = "https://auth.example.com"

    # 创建动态客户端注册管理器
    manager = DynamicClientRegistrationManager(base_url)

    # 发现授权服务器元数据
    if not manager.discover_metadata():
        print(" 无法发现授权服务器元数据,退出程序")
        return

    # 显示支持的元数据字段
    print("\n 支持的元数据字段:")
    supported_fields = manager.get_supported_metadata_fields()
    for field, values in supported_fields.items():
        print(f"  {field}: {values}")

    # 创建客户端元数据构建器
    builder = ClientMetadataBuilder()

    # 构建公共客户端元数据
    print("\n🔧 构建公共客户端元数据...")
    public_client_metadata = builder.build_public_client(
        client_name="我的公共应用",
        redirect_uris=["https://myapp.example.com/callback"],
        scopes=["openid", "profile"]
    )

    # 注册公共客户端
    print("📝 注册公共客户端...")
    public_client_info = manager.register_client(public_client_metadata)

    if public_client_info:
        print(f" 公共客户端注册成功,ID: {public_client_info.get('client_id')}")

        # 保存客户端信息
        filename = f"public_client_{int(time.time())}.json"
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(public_client_info, f, indent=2, ensure_ascii=False)
        print(f"💾 客户端信息已保存到 {filename}")

    # 构建机密客户端元数据
    print("\n🔧 构建机密客户端元数据...")
    confidential_client_metadata = builder.build_confidential_client(
        client_name="我的机密应用",
        redirect_uris=["https://myapp.example.com/callback"],
        grant_types=["authorization_code", "refresh_token", "client_credentials"],
        scopes=["openid", "profile", "email", "read", "write"]
    )

    # 注册机密客户端
    print("📝 注册机密客户端...")
    confidential_client_info = manager.register_client(confidential_client_metadata)

    if confidential_client_info:
        print(f" 机密客户端注册成功,ID: {confidential_client_info.get('client_id')}")

        # 保存客户端信息
        filename = f"confidential_client_{int(time.time())}.json"
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(confidential_client_info, f, indent=2, ensure_ascii=False)
        print(f"💾 客户端信息已保存到 {filename}")

        # 演示更新客户端(需要registration_access_token)
        registration_access_token = confidential_client_info.get("registration_access_token")
        if registration_access_token:
            print("\n🔄 演示更新客户端...")
            updated_metadata = {
                "client_name": "更新后的机密应用名称",
                "scope": "openid profile email read"
            }

            updated_info = manager.update_client(
                confidential_client_info["client_id"],
                registration_access_token,
                updated_metadata
            )

            if updated_info:
                print(" 客户端更新成功!")

    # 构建OpenID Connect客户端元数据
    print("\n🔧 构建OpenID Connect客户端元数据...")
    oidc_client_metadata = builder.build_oidc_client(
        client_name="我的OIDC应用",
        redirect_uris=["https://myapp.example.com/callback"],
        application_type="web",
        contacts=["admin@example.com"]
    )

    # 注册OpenID Connect客户端
    print("📝 注册OpenID Connect客户端...")
    oidc_client_info = manager.register_client(oidc_client_metadata)

    if oidc_client_info:
        print(f" OpenID Connect客户端注册成功,ID: {oidc_client_info.get('client_id')}")

        # 保存客户端信息
        filename = f"oidc_client_{int(time.time())}.json"
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(oidc_client_info, f, indent=2, ensure_ascii=False)
        print(f"💾 客户端信息已保存到 {filename}")

# 如果直接运行此脚本
if __name__ == "__main__":
    main()

8.3 与 OIDC 的集成示例 #

# 导入必要的模块
import requests
import json
from urllib.parse import urljoin

# 定义OpenID Connect动态客户端注册示例
def oidc_dynamic_registration_example():
    # 配置授权服务器URL
    base_url = "https://auth.example.com"

    # 构建OpenID Connect发现端点
    oidc_discovery_url = urljoin(base_url, "/.well-known/openid-configuration")

    try:
        # 获取OpenID Connect配置
        print("🔍 正在获取OpenID Connect配置...")
        oidc_response = requests.get(oidc_discovery_url, timeout=10)
        oidc_response.raise_for_status()

        # 解析OpenID Connect配置
        oidc_config = oidc_response.json()

        # 检查是否支持动态客户端注册
        registration_endpoint = oidc_config.get("registration_endpoint")
        if not registration_endpoint:
            print(" 此OpenID Connect提供商不支持动态客户端注册")
            return

        print(f" 找到OIDC注册端点: {registration_endpoint}")

        # 构建OpenID Connect客户端元数据
        oidc_client_metadata = {
  "application_type": "web",
            "client_name": "我的OpenID Connect应用",
            "redirect_uris": ["https://myapp.example.com/callback"],
            "grant_types": ["authorization_code"],
            "response_types": ["code"],
            "token_endpoint_auth_method": "client_secret_basic",
            "scope": "openid profile email",
  "contacts": ["admin@example.com"],
            "logo_uri": "https://myapp.example.com/logo.png",
            "policy_uri": "https://myapp.example.com/policy",
            "terms_of_service_uri": "https://myapp.example.com/terms"
        }

        # 发送OIDC客户端注册请求
        print("📝 正在注册OpenID Connect客户端...")
        registration_response = requests.post(
            registration_endpoint,
            json=oidc_client_metadata,
            headers={"Content-Type": "application/json"},
            timeout=10
        )

        # 检查注册响应
        if registration_response.status_code == 201:
            # 注册成功
            client_info = registration_response.json()
            print(" OpenID Connect客户端注册成功!")
            print(f" 客户端ID: {client_info.get('client_id')}")
            print(f" 客户端密钥: {client_info.get('client_secret')}")
            print(f"⏰ 注册时间: {client_info.get('client_id_issued_at')}")

            # 保存客户端信息
            with open("oidc_client_info.json", "w", encoding="utf-8") as f:
                json.dump(client_info, f, indent=2, ensure_ascii=False)
            print("💾 OIDC客户端信息已保存到 oidc_client_info.json")

        else:
            # 注册失败
            print(f" OIDC客户端注册失败: {registration_response.status_code}")
            error_info = registration_response.json()
            print(f"错误类型: {error_info.get('error')}")
            print(f"错误描述: {error_info.get('error_description')}")

    except requests.exceptions.RequestException as e:
        print(f" 网络请求失败: {e}")
    except json.JSONDecodeError as e:
        print(f" JSON解析失败: {e}")
    except Exception as e:
        print(f" 发生未知错误: {e}")

# 运行OIDC示例
if __name__ == "__main__":
    oidc_dynamic_registration_example()

9. 与相关规范的关联 #

  • RFC 8414(元数据):提供 registration_endpoint 的发现机制。
  • RFC 7592(客户端配置管理):定义客户端配置的读写操作。
  • OIDC 动态注册:扩展了 OAuth 注册协议,支持 OpenID 特有字段。

10. 总结 #

RFC 7591 通过标准化动态客户端注册流程,显著提升了 OAuth 2.0 生态的自动化能力。开发者应注意:

  1. 安全存储凭据:client_secret 和 registration_access_token 需加密保存。
  2. 最小权限原则:注册时仅请求必要的 scope 和 redirect_uris。
  3. 生命周期管理:及时更新或删除不再使用的客户端。

通过使用上述代码示例,您可以:

  1. 自动发现授权服务器的注册端点
  2. 动态注册不同类型的OAuth客户端
  3. 管理客户端生命周期(更新、删除)
  4. 支持多种客户端类型(公共、机密、OIDC)
  5. 构建灵活的客户端元数据

访问验证

请输入访问令牌

Token不正确,请重新输入