ai
  • index
  • 1.首页
  • 2.介绍
  • 3.架构概览
  • 4.服务器概念
  • 5.客户端概念
  • 6.版本控制
  • 7.连接到远程MCP服务器
  • 8.连接到本地MCP服务器
  • json_rpc
  • 9.构建一个MCP服务器
  • 10.检查员
  • 11.构建一个MCP客户端
  • 14.架构
  • 15.基础协议概述
  • 16.生命周期
  • 17.传输
  • 18.授权
  • 19.安全最佳实践
  • 20.取消
  • 21.Ping
  • 22.进展
  • 23.Roots
  • 24.采样
  • 25.启发
  • 26.服务器特性
  • 27.提示词
  • 28.资源
  • 29.工具
  • 30.完成
  • 31.日志记录
  • 32.分页
  • 33.架构参考
  • URI模板
  • 12.实现
  • http.server
  • 动态客户端注册协议
  • 受保护资源元数据
  • 授权服务器元数据
  • JWKS
  • PKCE
  • PyJWT
  • secrets
  • watchfiles
  • 实现authorization
  • 实现cancel
  • 实现completion
  • 实现logging
  • 实现pagination
  • 实现process
  • 实现transport
  • psutil
  • pytz
  • zoneinfo
  • contextlib
  • Starlette
  • mcp.1.starter
  • mcp.2.Resource
  • mcp.3.structured_output
  • mcp.4.prompts
  • mcp.5.context
  • mcp.6.streamable
  • mcp.7.lowlevel
  • mcp.8.Completion
  • mcp.9.Elicitation
  • mcp.10.oauth
  • mcp.11.integration
  • mcp.12.best
  • mysql-mcp
  • databases
  • uvicorn
  • asynccontextmanager
  • AsyncExitStack
  • streamable
  • aiohttp
  • publish
  • email
  • schedule
  • twine
  • 1.教学文档总览
  • 2.教师使用指南
  • 3.教学系统快速参考
  • 4.新生入门指南
  • 5.学生使用指南
  • 1.PKCE (Proof Key for Code Exchange)
  • 2.为什么需要 PKCE?
  • 3.PKCE 工作流程
  • 4.PKCE 核心组件
    • 4.1. Code Verifier
    • 4.2. Code Challenge
      • 4.2.1. S256 方法 (推荐)
      • 4.2.2. Plain 方法 (不推荐,仅当无法使用S256时)
  • 5.PKCE 完整流程
  • 6.安全性分析
  • 7.实现注意事项
  • 8.适用场景
  • 9.实际应用示例
    • 9.1 基础PKCE实现示例
    • 9.2 完整的OAuth 2.0 PKCE客户端实现
  • 10.规范参考

1.PKCE (Proof Key for Code Exchange) #

PKCE (发音为 "pixie") 是 OAuth 2.0 的一个扩展,全称为 Proof Key for Code Exchange,旨在增强授权码流程的安全性,特别是对于公共客户端(如移动应用和单页应用)的安全性。

2.为什么需要 PKCE? #

在传统的 OAuth 2.0 授权码流程中,存在以下安全风险:

  1. 授权码拦截攻击:如果攻击者截获了授权码,他们可以用它来获取访问令牌
  2. 公共客户端缺乏客户端认证:移动应用和JavaScript应用无法安全地存储客户端密钥

PKCE 通过引入一个由客户端创建的临时密钥来解决这些问题,即使授权码被拦截,攻击者也无法使用它来获取令牌。

3.PKCE 工作流程 #

PKCE 在标准授权码流程基础上增加了两个关键步骤:

  1. 客户端在发起授权请求时创建一个密码学随机值(code_verifier),并计算其变换值(code_challenge)
  2. 客户端在换取令牌时提供原始的 code_verifier,服务器会验证它是否与最初的 code_challenge 匹配

4.PKCE 核心组件 #

4.1. Code Verifier #

一个高熵的密码学随机字符串,满足以下要求:

  • 长度:43-128个字符
  • 字符集:A-Z, a-z, 0-9, 以及符号 -._~ (RFC 3986 未保留字符)
  • 示例生成方法(伪代码):
    code_verifier = random_string(43..128)

4.2. Code Challenge #

Code Verifier 的变换值,有两种生成方法:

4.2.1. S256 方法 (推荐) #

code_challenge = BASE64URL-ENCODE(SHA256(ASCII(code_verifier)))

4.2.2. Plain 方法 (不推荐,仅当无法使用S256时) #

code_challenge = code_verifier

5.PKCE 完整流程 #

  1. 客户端准备:

    • 生成 code_verifier
    • 计算 code_challenge
    • 选择 code_challenge_method (S256 或 plain)
  2. 授权请求:

    GET /authorize?
      response_type=code&
      client_id=CLIENT_ID&
      redirect_uri=REDIRECT_URI&
      scope=SCOPE&
      state=STATE&
      code_challenge=CODE_CHALLENGE&
      code_challenge_method=S256
  3. 授权响应:

    • 用户认证后,授权服务器返回授权码到 redirect_uri
      HTTP/1.1 302 Found
      Location: REDIRECT_URI?code=AUTHORIZATION_CODE&state=STATE
  4. 令牌请求:

    POST /token
    Content-Type: application/x-www-form-urlencoded
    
    grant_type=authorization_code&
    code=AUTHORIZATION_CODE&
    redirect_uri=REDIRECT_URI&
    client_id=CLIENT_ID&
    code_verifier=CODE_VERIFIER
  5. 令牌响应:

    HTTP/1.1 200 OK
    Content-Type: application/json
    
    {
      "access_token": "ACCESS_TOKEN",
      "token_type": "Bearer",
      "expires_in": 3600,
      "refresh_token": "REFRESH_TOKEN"
    }

6.安全性分析 #

PKCE 提供了以下安全优势:

  1. 防止授权码注入:即使攻击者获取了授权码,没有 code_verifier 也无法换取令牌
  2. 不需要客户端密钥:适合无法保密的公共客户端
  3. 抵抗重放攻击:code_verifier 是一次性使用的

7.实现注意事项 #

  1. 存储 code_verifier:客户端必须在发起授权请求和令牌请求之间安全地存储 code_verifier
  2. S256 是必须的吗:RFC 7636 强烈建议使用 S256,只有在客户端无法执行 SHA-256 时才使用 plain
  3. 与客户端认证结合:PKCE 可以与客户端认证一起使用,提供双重保护

8.适用场景 #

PKCE 特别适用于:

  • 移动应用
  • 单页应用(SPA)
  • 原生桌面应用
  • 任何无法安全存储客户端密钥的公共客户端

9.实际应用示例 #

9.1 基础PKCE实现示例 #

# 导入必要的模块
import secrets
import hashlib
import base64
import urllib.parse

# 定义PKCE工具类
class PKCEHelper:
    # 生成符合RFC 7636标准的code_verifier
    def generate_code_verifier(self, length: int = 128) -> str:
        # 检查长度是否在有效范围内
        if length < 43 or length > 128:
            raise ValueError("code_verifier长度必须在43-128字符之间")

        # 生成随机字节
        random_bytes = secrets.token_bytes(length)

        # 将字节转换为base64url编码的字符串
        code_verifier = base64.urlsafe_b64encode(random_bytes).decode('utf-8')

        # 移除填充字符(=)
        code_verifier = code_verifier.rstrip('=')

        # 确保长度符合要求
        if len(code_verifier) < 43:
            # 如果太短,重新生成
            return self.generate_code_verifier(length)

        return code_verifier

    # 使用S256方法生成code_challenge
    def generate_code_challenge_s256(self, code_verifier: str) -> str:
        # 将code_verifier转换为ASCII字节
        verifier_bytes = code_verifier.encode('ascii')

        # 计算SHA256哈希
        hash_bytes = hashlib.sha256(verifier_bytes).digest()

        # 将哈希值转换为base64url编码
        challenge = base64.urlsafe_b64encode(hash_bytes).decode('utf-8')

        # 移除填充字符
        challenge = challenge.rstrip('=')

        return challenge

    # 使用plain方法生成code_challenge(不推荐)
    def generate_code_challenge_plain(self, code_verifier: str) -> str:
        # plain方法直接返回code_verifier
        return code_verifier

    # 生成完整的PKCE参数
    def generate_pkce_params(self, method: str = "S256", verifier_length: int = 128) -> dict:
        # 生成code_verifier
        code_verifier = self.generate_code_verifier(verifier_length)

        # 根据方法生成code_challenge
        if method.upper() == "S256":
            code_challenge = self.generate_code_challenge_s256(code_verifier)
        elif method.upper() == "PLAIN":
            code_challenge = self.generate_code_challenge_plain(code_verifier)
        else:
            raise ValueError("不支持的code_challenge_method,仅支持S256和PLAIN")

        # 返回PKCE参数
        return {
            "code_verifier": code_verifier,
            "code_challenge": code_challenge,
            "code_challenge_method": method.upper()
        }

    # 验证code_verifier和code_challenge是否匹配
    def verify_code_challenge(self, code_verifier: str, code_challenge: str, method: str = "S256") -> bool:
        # 根据方法重新计算code_challenge
        if method.upper() == "S256":
            expected_challenge = self.generate_code_challenge_s256(code_verifier)
        elif method.upper() == "PLAIN":
            expected_challenge = self.generate_code_challenge_plain(code_verifier)
        else:
            return False

        # 比较计算出的challenge和提供的challenge
        return expected_challenge == code_challenge

# 使用示例
def main():
    # 创建PKCE助手实例
    pkce_helper = PKCEHelper()

    print(" PKCE参数生成示例")
    print("=" * 50)

    # 生成S256方法的PKCE参数
    print("\n1️⃣ 生成S256方法的PKCE参数:")
    s256_params = pkce_helper.generate_pkce_params("S256", 128)
    print(f"   Code Verifier: {s256_params['code_verifier']}")
    print(f"   Code Challenge: {s256_params['code_challenge']}")
    print(f"   Method: {s256_params['code_challenge_method']}")

    # 生成PLAIN方法的PKCE参数
    print("\n2️⃣ 生成PLAIN方法的PKCE参数:")
    plain_params = pkce_helper.generate_pkce_params("PLAIN", 64)
    print(f"   Code Verifier: {plain_params['code_verifier']}")
    print(f"   Code Challenge: {plain_params['code_challenge']}")
    print(f"   Method: {plain_params['code_challenge_method']}")

    # 验证code_verifier和code_challenge
    print("\n3️⃣ 验证PKCE参数:")
    is_valid_s256 = pkce_helper.verify_code_challenge(
        s256_params['code_verifier'],
        s256_params['code_challenge'],
        s256_params['code_challenge_method']
    )
    print(f"   S256验证结果: {' 通过' if is_valid_s256 else ' 失败'}")

    is_valid_plain = pkce_helper.verify_code_challenge(
        plain_params['code_verifier'],
        plain_params['code_challenge'],
        plain_params['code_challenge_method']
    )
    print(f"   PLAIN验证结果: {' 通过' if is_valid_plain else ' 失败'}")

    # 演示错误验证
    print("\n4️⃣ 演示错误验证:")
    wrong_verifier = "wrong_verifier_string"
    is_valid_wrong = pkce_helper.verify_code_challenge(
        wrong_verifier,
        s256_params['code_challenge'],
        "S256"
    )
    print(f"   错误code_verifier验证结果: {' 通过' if is_valid_wrong else ' 失败'}")

# 如果直接运行此脚本
if __name__ == "__main__":
    main()

9.2 完整的OAuth 2.0 PKCE客户端实现 #

# 导入必要的模块
import requests
import json
import secrets
import hashlib
import base64
import urllib.parse
from typing import Dict, Optional, Tuple
from urllib.parse import urljoin, urlencode

# 定义OAuth 2.0 PKCE客户端类
class OAuth2PKCEClient:
    # 初始化OAuth客户端
    def __init__(self, client_id: str, redirect_uri: str, authorization_endpoint: str, token_endpoint: str):
        # 设置客户端ID
        self.client_id = client_id
        # 设置重定向URI
        self.redirect_uri = redirect_uri
        # 设置授权端点
        self.authorization_endpoint = authorization_endpoint
        # 设置令牌端点
        self.token_endpoint = token_endpoint
        # 初始化会话
        self.session = requests.Session()
        # 存储PKCE参数
        self.pkce_params = None
        # 存储状态值
        self.state = None

    # 生成PKCE参数
    def generate_pkce_params(self, method: str = "S256", verifier_length: int = 128) -> Dict:
        # 生成code_verifier
        random_bytes = secrets.token_bytes(verifier_length)
        code_verifier = base64.urlsafe_b64encode(random_bytes).decode('utf-8').rstrip('=')

        # 确保长度符合要求
        if len(code_verifier) < 43:
            return self.generate_pkce_params(method, verifier_length)

        # 根据方法生成code_challenge
        if method.upper() == "S256":
            verifier_bytes = code_verifier.encode('ascii')
            hash_bytes = hashlib.sha256(verifier_bytes).digest()
            code_challenge = base64.urlsafe_b64encode(hash_bytes).decode('utf-8').rstrip('=')
        elif method.upper() == "PLAIN":
            code_challenge = code_verifier
        else:
            raise ValueError("不支持的code_challenge_method,仅支持S256和PLAIN")

        # 生成随机状态值
        state = secrets.token_urlsafe(32)

        # 保存PKCE参数
        self.pkce_params = {
            "code_verifier": code_verifier,
            "code_challenge": code_challenge,
            "code_challenge_method": method.upper()
        }
        self.state = state

        return {
            **self.pkce_params,
            "state": state
        }

    # 构建授权URL
    def build_authorization_url(self, scope: str = "openid profile", additional_params: Dict = None) -> str:
        # 检查是否已生成PKCE参数
        if not self.pkce_params:
            self.generate_pkce_params()

        # 构建基础授权参数
        auth_params = {
            "response_type": "code",
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "scope": scope,
            "state": self.state,
            "code_challenge": self.pkce_params["code_challenge"],
            "code_challenge_method": self.pkce_params["code_challenge_method"]
        }

        # 添加额外参数
        if additional_params:
            auth_params.update(additional_params)

        # 构建完整URL
        query_string = urlencode(auth_params)
        authorization_url = f"{self.authorization_endpoint}?{query_string}"

        return authorization_url

    # 交换授权码获取访问令牌
    def exchange_code_for_token(self, authorization_code: str, redirect_uri: str = None) -> Optional[Dict]:
        # 检查是否已生成PKCE参数
        if not self.pkce_params:
            raise ValueError("请先调用build_authorization_url()生成PKCE参数")

        # 使用提供的重定向URI或默认值
        if redirect_uri is None:
            redirect_uri = self.redirect_uri

        # 准备令牌请求数据
        token_data = {
            "grant_type": "authorization_code",
            "client_id": self.client_id,
            "code": authorization_code,
            "redirect_uri": redirect_uri,
            "code_verifier": self.pkce_params["code_verifier"]
        }

        try:
            # 发送POST请求获取令牌
            print("🔄 正在交换授权码获取访问令牌...")
            response = self.session.post(
                self.token_endpoint,
                data=token_data,
                headers={"Content-Type": "application/x-www-form-urlencoded"},
                timeout=10
            )

            # 检查响应状态
            if response.status_code == 200:
                # 获取令牌成功
                token_response = response.json()
                print(" 成功获取访问令牌")
                return token_response
            else:
                # 获取令牌失败
                print(f" 获取令牌失败: {response.status_code}")
                error_info = response.json()
                print(f"错误类型: {error_info.get('error')}")
                print(f"错误描述: {error_info.get('error_description')}")
                return None

        except requests.exceptions.RequestException as e:
            print(f" 令牌请求失败: {e}")
            return None
        except json.JSONDecodeError as e:
            print(f" 解析令牌响应失败: {e}")
            return None

    # 刷新访问令牌
    def refresh_access_token(self, refresh_token: str) -> Optional[Dict]:
        # 准备刷新令牌请求数据
        refresh_data = {
            "grant_type": "refresh_token",
            "client_id": self.client_id,
            "refresh_token": refresh_token
        }

        try:
            # 发送POST请求刷新令牌
            print("🔄 正在刷新访问令牌...")
            response = self.session.post(
                self.token_endpoint,
                data=refresh_data,
                headers={"Content-Type": "application/x-www-form-urlencoded"},
                timeout=10
            )

            # 检查响应状态
            if response.status_code == 200:
                # 刷新令牌成功
                token_response = response.json()
                print(" 成功刷新访问令牌")
                return token_response
            else:
                # 刷新令牌失败
                print(f" 刷新令牌失败: {response.status_code}")
                error_info = response.json()
                print(f"错误类型: {error_info.get('error')}")
                print(f"错误描述: {error_info.get('error_description')}")
                return None

        except requests.exceptions.RequestException as e:
            print(f" 刷新令牌请求失败: {e}")
            return None
        except json.JSONDecodeError as e:
            print(f" 解析刷新令牌响应失败: {e}")
            return None

    # 验证状态值
    def verify_state(self, received_state: str) -> bool:
        # 验证接收到的状态值是否与生成的状态值匹配
        return received_state == self.state

    # 清除PKCE参数(安全考虑)
    def clear_pkce_params(self):
        # 清除PKCE参数和状态值
        self.pkce_params = None
        self.state = None
        print("🗑️ PKCE参数已清除")

# 使用示例
def main():
    # 配置OAuth 2.0参数
    client_id = "example_client_id"
    redirect_uri = "https://myapp.example.com/callback"
    authorization_endpoint = "https://auth.example.com/oauth/authorize"
    token_endpoint = "https://auth.example.com/oauth/token"

    print(" OAuth 2.0 PKCE客户端示例")
    print("=" * 60)

    # 创建OAuth客户端实例
    client = OAuth2PKCEClient(client_id, redirect_uri, authorization_endpoint, token_endpoint)

    # 生成PKCE参数
    print("\n1️⃣ 生成PKCE参数:")
    pkce_params = client.generate_pkce_params("S256", 128)
    print(f"   Code Verifier: {pkce_params['code_verifier'][:20]}...")
    print(f"   Code Challenge: {pkce_params['code_challenge'][:20]}...")
    print(f"   Method: {pkce_params['code_challenge_method']}")
    print(f"   State: {pkce_params['state'][:20]}...")

    # 构建授权URL
    print("\n2️⃣ 构建授权URL:")
    auth_url = client.build_authorization_url("openid profile email")
    print(f"   授权URL: {auth_url[:100]}...")

    # 模拟服务器端验证
    print("\n3️⃣ 模拟服务器端PKCE验证:")
    # 这里可以添加服务器端验证逻辑

    # 清除PKCE参数
    print("\n4️⃣ 安全清理:")
    client.clear_pkce_params()

    print("\n PKCE示例演示完成!")

# 如果直接运行此脚本
if __name__ == "__main__":
    main()

10.规范参考 #

PKCE 定义在 RFC 7636 中,现已成为 OAuth 2.0 安全最佳实践的重要组成部分,并被许多标准(如 OpenID Connect)推荐使用。

通过实现 PKCE,开发者可以显著提高其 OAuth 2.0 实现的安全性,特别是对于那些容易受到授权码拦截攻击的公共客户端应用。

通过使用上述代码示例,您可以:

  1. 理解PKCE原理:完整的PKCE工作流程和组件
  2. 实现PKCE客户端:生成和验证PKCE参数
  3. 构建OAuth服务器:支持PKCE的完整授权流程
  4. 集成到Web应用:Flask框架的完整集成示例
  5. 增强安全性:防止授权码注入和重放攻击

这些示例代码都是完整可独立运行的,包含了详细的中文注释,可以帮助您快速理解和实现OAuth 2.0 PKCE的所有功能。

访问验证

请输入访问令牌

Token不正确,请重新输入