1. JSON Web Key Set (JWKS) #
JWKS (JSON Web Key Set) 是 OAuth 2.0、OpenID Connect (OIDC) 和 JWT (JSON Web Token) 安全机制的核心组成部分,用于存储和传输加密密钥(通常是公钥),以便验证数字签名或加密数据。
2. JWKS 是什么? #
JWKS 是一个 JSON 格式的数据结构,包含一组 JSON Web Key (JWK),用于:
- 验证 JWT 签名(如 OIDC ID Token 或 OAuth 2.0 Access Token)
- 加密/解密数据(如 JWE - JSON Web Encryption)
- 密钥管理(如密钥轮换、多密钥支持)
2.1 典型 JWKS 示例 #
{
"keys": [
{
"kty": "RSA",
"use": "sig",
"kid": "2025-08-14",
"alg": "RS256",
"n": "modulus_in_base64url...",
"e": "AQAB"
},
{
"kty": "EC",
"use": "sig",
"kid": "ec-key-1",
"alg": "ES256",
"crv": "P-256",
"x": "x_coordinate_in_base64url...",
"y": "y_coordinate_in_base64url..."
}
]
}3. JWKS 的核心字段 #
每个 JWK (JSON Web Key) 包含以下关键字段:
| 字段 | 说明 | 示例值 |
|---|---|---|
kty |
密钥类型(Key Type) | RSA、EC、oct(对称密钥) |
use |
密钥用途(Key Use) | sig(签名)、enc(加密) |
kid |
密钥 ID(Key ID),唯一标识密钥 | "2025-08-14" |
alg |
算法(Algorithm) | RS256、ES256、HS256 |
n (RSA) |
RSA 公钥模数(Modulus) | Base64URL 编码的大整数 |
e (RSA) |
RSA 公钥指数(Exponent) | 通常为 "AQAB"(即 65537) |
crv (EC) |
椭圆曲线类型(Curve) | P-256、P-384、P-521 |
x (EC) |
椭圆曲线公钥的 x 坐标 | Base64URL 编码 |
y (EC) |
椭圆曲线公钥的 y 坐标 | Base64URL 编码 |
k (对称密钥) |
对称密钥值(Key Value) | Base64URL 编码的密钥 |
4. JWKS 的工作原理 #
4.1 JWT 签名验证流程 #
- 客户端请求 Token(如 OIDC ID Token):
{ "alg": "RS256", "kid": "2025-08-14", "typ": "JWT" } - 资源服务器获取 JWKS:
- 向
jwks_uri(如https://auth.example.com/.well-known/jwks.json)请求公钥列表。
- 向
- 匹配
kid并验证签名:- 根据 JWT 头部的
kid找到对应的 JWK。 - 使用 JWK 中的公钥验证 JWT 签名是否有效。
- 根据 JWT 头部的
4.2 密钥轮换(Key Rotation) #
- 服务器可以发布多个密钥,客户端根据
kid选择正确的密钥。 - 旧密钥可以逐步淘汰,新密钥无缝替换。
5. JWKS 的常见用途 #
5.1 OAuth 2.0 / OIDC 身份验证 #
jwks_uri在 OIDC Discovery 端点中定义:{ "issuer": "https://auth.example.com", "jwks_uri": "https://auth.example.com/.well-known/jwks.json", "authorization_endpoint": "...", "token_endpoint": "..." }- 资源服务器(如 API)使用 JWKS 验证 Access Token。
5.2 JWT 签名验证 #
- 例如,微服务架构中,服务 A 签发 JWT,服务 B 用 JWKS 验证。
5.3 加密通信(JWE) #
- JWKS 可以包含公钥,用于加密数据(如 JWE)。
6. JWKS 的安全最佳实践 #
- 必须使用 HTTPS:
jwks_uri必须通过 HTTPS 提供,防止中间人攻击。
- 定期密钥轮换:
- 避免长期使用同一密钥,定期更新
kid。
- 避免长期使用同一密钥,定期更新
- 限制缓存时间:
- 客户端应缓存 JWKS,但设置合理的过期时间(如 24 小时)。
- 密钥管理:
- 私钥必须严格保护,不能泄露。
- 使用 HSM(硬件安全模块)或 KMS(密钥管理服务)存储私钥。
7. 如何生成 JWKS? #
7.1 使用 OpenSSL 生成 RSA 密钥对 #
# 生成 RSA 私钥
openssl genpkey -algorithm RSA -out private.key -pkeyopt rsa_keygen_bits:2048
# 提取公钥
openssl rsa -pubout -in private.key -out public.key
# 转换为 JWK 格式(可使用工具如 `jose`)
jose jwk gen -i public.key -o jwks.json7.2 使用在线工具 #
- jwt.io 提供 JWK 生成器。
- Auth0 JWK Generator 可生成 JWKS。
8. 常见问题(FAQ) #
8.1 JWKS 和 PEM 有什么区别? #
- JWKS 是 JSON 格式,用于 Web 应用(如 OAuth/OIDC)。
- PEM 是文本格式(如
-----BEGIN PUBLIC KEY-----),常用于传统 PKI。
8.2 对称密钥(HS256)能用 JWKS 吗? #
- 可以,但不推荐,因为对称密钥必须保密,而 JWKS 通常是公开的。
8.3 如果 kid 不匹配怎么办? #
- 客户端应检查所有可用密钥,如果没有匹配的
kid,则拒绝令牌。
9. 实际应用示例 #
9.1 基础JWKS生成和验证示例 #
# 导入必要的模块
import json
import base64
import hashlib
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, ec
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
import jwt
from datetime import datetime, timedelta
# 定义JWKS工具类
class JWKSHelper:
# 生成RSA密钥对并转换为JWK格式
def generate_rsa_jwk(self, key_id: str = None, key_size: int = 2048) -> dict:
# 生成RSA私钥
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size
)
# 获取公钥
public_key = private_key.public_key()
# 序列化公钥为PEM格式
pem_public_key = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
# 提取RSA公钥参数
rsa_numbers = public_key.public_numbers()
# 转换为JWK格式
jwk = {
"kty": "RSA",
"use": "sig",
"kid": key_id or f"rsa-{datetime.now().strftime('%Y%m%d')}",
"alg": "RS256",
"n": self._int_to_base64url(rsa_numbers.n),
"e": self._int_to_base64url(rsa_numbers.e)
}
return jwk, private_key
# 生成椭圆曲线密钥对并转换为JWK格式
def generate_ec_jwk(self, curve_name: str = "P-256", key_id: str = None) -> dict:
# 选择椭圆曲线
if curve_name == "P-256":
curve = ec.SECP256R1()
alg = "ES256"
elif curve_name == "P-384":
curve = ec.SECP384R1()
alg = "ES384"
elif curve_name == "P-521":
curve = ec.SECP521R1()
alg = "ES512"
else:
raise ValueError(f"不支持的椭圆曲线: {curve_name}")
# 生成私钥
private_key = ec.generate_private_key(curve)
# 获取公钥
public_key = private_key.public_key()
# 获取公钥坐标
public_numbers = public_key.public_numbers()
# 转换为JWK格式
jwk = {
"kty": "EC",
"use": "sig",
"kid": key_id or f"ec-{curve_name.lower()}-{datetime.now().strftime('%Y%m%d')}",
"alg": alg,
"crv": curve_name,
"x": self._int_to_base64url(public_numbers.x),
"y": self._int_to_base64url(public_numbers.y)
}
return jwk, private_key
# 生成对称密钥JWK(仅用于演示,生产环境不推荐)
def generate_oct_jwk(self, key_size: int = 256, key_id: str = None) -> dict:
# 生成随机密钥
import secrets
key_bytes = secrets.token_bytes(key_size // 8)
# 转换为JWK格式
jwk = {
"kty": "oct",
"use": "sig",
"kid": key_id or f"oct-{key_size}-{datetime.now().strftime('%Y%m%d')}",
"alg": f"HS{key_size}",
"k": base64.urlsafe_b64encode(key_bytes).decode('utf-8').rstrip('=')
}
return jwk, key_bytes
# 将大整数转换为Base64URL编码
def _int_to_base64url(self, value: int) -> str:
# 将整数转换为字节
byte_length = (value.bit_length() + 7) // 8
value_bytes = value.to_bytes(byte_length, byteorder='big')
# 转换为Base64URL编码
base64_value = base64.urlsafe_b64encode(value_bytes).decode('utf-8')
# 移除填充字符
return base64_value.rstrip('=')
# 生成完整的JWKS
def generate_jwks(self, keys: list) -> dict:
# 构建JWKS结构
jwks = {
"keys": keys
}
return jwks
# 验证JWK格式
def validate_jwk(self, jwk: dict) -> bool:
# 检查必需字段
required_fields = ["kty", "use", "kid", "alg"]
for field in required_fields:
if field not in jwk:
print(f" 缺少必需字段: {field}")
return False
# 根据密钥类型检查特定字段
if jwk["kty"] == "RSA":
required_rsa_fields = ["n", "e"]
for field in required_rsa_fields:
if field not in jwk:
print(f" RSA密钥缺少字段: {field}")
return False
elif jwk["kty"] == "EC":
required_ec_fields = ["crv", "x", "y"]
for field in required_ec_fields:
if field not in jwk:
print(f" EC密钥缺少字段: {field}")
return False
elif jwk["kty"] == "oct":
if "k" not in jwk:
print(" 对称密钥缺少字段: k")
return False
else:
print(f" 不支持的密钥类型: {jwk['kty']}")
return False
return True
# 定义JWT验证工具类
class JWTValidator:
# 验证JWT令牌
def validate_jwt(self, token: str, jwks: dict) -> dict:
try:
# 解码JWT头部(不验证签名)
header = jwt.get_unverified_header(token)
# 获取算法和密钥ID
alg = header.get("alg")
kid = header.get("kid")
if not kid:
raise ValueError("JWT缺少kid字段")
# 在JWKS中查找对应的密钥
jwk = self._find_jwk_by_kid(jwks, kid)
if not jwk:
raise ValueError(f"未找到密钥ID: {kid}")
# 验证JWT签名
decoded = jwt.decode(
token,
jwk,
algorithms=[alg],
options={"verify_signature": True}
)
return {
"valid": True,
"payload": decoded,
"header": header,
"jwk": jwk
}
except jwt.InvalidTokenError as e:
return {
"valid": False,
"error": str(e)
}
except Exception as e:
return {
"valid": False,
"error": f"验证失败: {str(e)}"
}
# 根据kid查找JWK
def _find_jwk_by_kid(self, jwks: dict, kid: str) -> dict:
for key in jwks.get("keys", []):
if key.get("kid") == kid:
return key
return None
# 使用示例
def main():
# 创建JWKS助手实例
jwks_helper = JWKSHelper()
print(" JWKS生成和验证示例")
print("=" * 50)
# 生成RSA密钥对
print("\n1️⃣ 生成RSA密钥对:")
rsa_jwk, rsa_private_key = jwks_helper.generate_rsa_jwk("rsa-example", 2048)
print(f" RSA JWK: {json.dumps(rsa_jwk, indent=2)}")
# 生成椭圆曲线密钥对
print("\n2️⃣ 生成椭圆曲线密钥对:")
ec_jwk, ec_private_key = jwks_helper.generate_ec_jwk("P-256", "ec-example")
print(f" EC JWK: {json.dumps(ec_jwk, indent=2)}")
# 生成对称密钥
print("\n3️⃣ 生成对称密钥:")
oct_jwk, oct_key = jwks_helper.generate_oct_jwk(256, "oct-example")
print(f" 对称密钥JWK: {json.dumps(oct_jwk, indent=2)}")
# 验证JWK格式
print("\n4️⃣ 验证JWK格式:")
jwks_list = [rsa_jwk, ec_jwk, oct_jwk]
for i, jwk in enumerate(jwks_list):
is_valid = jwks_helper.validate_jwk(jwk)
print(f" JWK {i+1} ({jwk['kty']}): {' 有效' if is_valid else ' 无效'}")
# 生成完整JWKS
print("\n5️⃣ 生成完整JWKS:")
jwks = jwks_helper.generate_jwks(jwks_list)
print(f" JWKS: {json.dumps(jwks, indent=2)}")
# 保存JWKS到文件
with open("jwks_example.json", "w", encoding="utf-8") as f:
json.dump(jwks, f, indent=2, ensure_ascii=False)
print(" 💾 JWKS已保存到 jwks_example.json")
# 演示JWT验证
print("\n6️⃣ 演示JWT验证:")
try:
# 使用RSA私钥创建JWT
payload = {
"sub": "user123",
"name": "示例用户",
"iat": datetime.utcnow(),
"exp": datetime.utcnow() + timedelta(hours=1)
}
rsa_token = jwt.encode(
payload,
rsa_private_key,
algorithm="RS256",
headers={"kid": rsa_jwk["kid"]}
)
print(f" 生成的JWT: {rsa_token[:50]}...")
# 验证JWT
validator = JWTValidator()
validation_result = validator.validate_jwt(rsa_token, jwks)
if validation_result["valid"]:
print(" JWT验证成功")
print(f" 载荷: {validation_result['payload']}")
else:
print(f" JWT验证失败: {validation_result['error']}")
except Exception as e:
print(f" JWT创建/验证失败: {e}")
# 如果直接运行此脚本
if __name__ == "__main__":
main()9.2 完整的JWKS管理器和验证器 #
# 导入必要的模块
import requests
import json
import time
from typing import Dict, List, Optional, Union
from cryptography.hazmat.primitives.asymmetric import rsa, ec
from cryptography.hazmat.primitives import serialization
import jwt
from datetime import datetime, timedelta
# 定义JWKS管理器类
class JWKSManager:
# 初始化JWKS管理器
def __init__(self, jwks_uri: str = None):
# 设置JWKS端点URI
self.jwks_uri = jwks_uri
# 初始化会话
self.session = requests.Session()
# 存储JWKS缓存
self.jwks_cache = None
# 缓存时间戳
self.cache_timestamp = 0
# 缓存过期时间(秒)
self.cache_expiry = 86400 # 24小时
# 从远程端点获取JWKS
def fetch_jwks(self, force_refresh: bool = False) -> Optional[Dict]:
# 检查缓存是否有效
if not force_refresh and self.jwks_cache and (time.time() - self.cache_timestamp) < self.cache_expiry:
print(" 使用缓存的JWKS")
return self.jwks_cache
if not self.jwks_uri:
print(" 未设置JWKS端点URI")
return None
try:
# 发送GET请求获取JWKS
print(f"🔍 正在获取JWKS: {self.jwks_uri}")
response = self.session.get(self.jwks_uri, timeout=10)
response.raise_for_status()
# 解析JWKS
jwks = response.json()
# 验证JWKS格式
if not self._validate_jwks_format(jwks):
print(" JWKS格式验证失败")
return None
# 更新缓存
self.jwks_cache = jwks
self.cache_timestamp = time.time()
print(" 成功获取并缓存JWKS")
return jwks
except requests.exceptions.RequestException as e:
print(f" 获取JWKS失败: {e}")
return None
except json.JSONDecodeError as e:
print(f" 解析JWKS失败: {e}")
return None
# 验证JWKS格式
def _validate_jwks_format(self, jwks: Dict) -> bool:
# 检查必需字段
if "keys" not in jwks:
print(" JWKS缺少keys字段")
return False
if not isinstance(jwks["keys"], list):
print(" keys字段必须是数组")
return False
# 验证每个JWK
for i, key in enumerate(jwks["keys"]):
if not self._validate_jwk_format(key):
print(f" 密钥 {i+1} 格式无效")
return False
return True
# 验证单个JWK格式
def _validate_jwk_format(self, jwk: Dict) -> bool:
# 检查必需字段
required_fields = ["kty", "use", "kid", "alg"]
for field in required_fields:
if field not in jwk:
print(f" JWK缺少字段: {field}")
return False
# 根据密钥类型检查特定字段
if jwk["kty"] == "RSA":
required_rsa_fields = ["n", "e"]
for field in required_rsa_fields:
if field not in jwk:
print(f" RSA密钥缺少字段: {field}")
return False
elif jwk["kty"] == "EC":
required_ec_fields = ["crv", "x", "y"]
for field in required_ec_fields:
if field not in jwk:
print(f" EC密钥缺少字段: {field}")
return False
return True
# 根据kid查找JWK
def find_jwk_by_kid(self, kid: str) -> Optional[Dict]:
# 获取JWKS
jwks = self.fetch_jwks()
if not jwks:
return None
# 查找匹配的密钥
for key in jwks.get("keys", []):
if key.get("kid") == kid:
return key
return None
# 获取支持的算法列表
def get_supported_algorithms(self) -> List[str]:
# 获取JWKS
jwks = self.fetch_jwks()
if not jwks:
return []
# 提取所有支持的算法
algorithms = set()
for key in jwks.get("keys", []):
if "alg" in key:
algorithms.add(key["alg"])
return list(algorithms)
# 获取密钥统计信息
def get_key_statistics(self) -> Dict:
# 获取JWKS
jwks = self.fetch_jwks()
if not jwks:
return {}
# 统计密钥信息
stats = {
"total_keys": len(jwks.get("keys", [])),
"key_types": {},
"algorithms": {},
"key_uses": {}
}
for key in jwks.get("keys", []):
# 统计密钥类型
kty = key.get("kty", "unknown")
stats["key_types"][kty] = stats["key_types"].get(kty, 0) + 1
# 统计算法
alg = key.get("alg", "unknown")
stats["algorithms"][alg] = stats["algorithms"].get(alg, 0) + 1
# 统计用途
use = key.get("use", "unknown")
stats["key_uses"][use] = stats["key_uses"].get(use, 0) + 1
return stats
# 清除缓存
def clear_cache(self):
# 清空JWKS缓存
self.jwks_cache = None
self.cache_timestamp = 0
print("🗑️ JWKS缓存已清除")
# 获取缓存状态
def get_cache_status(self) -> Dict:
if not self.jwks_cache:
return {"status": "no_cache"}
age = time.time() - self.cache_timestamp
is_expired = age > self.cache_expiry
return {
"status": "cached",
"age_seconds": int(age),
"age_hours": round(age / 3600, 2),
"is_expired": is_expired,
"expires_in_seconds": max(0, self.cache_expiry - age)
}
# 定义JWT验证器类
class JWTValidator:
# 初始化JWT验证器
def __init__(self, jwks_manager: JWKSManager):
# 设置JWKS管理器
self.jwks_manager = jwks_manager
# 验证JWT令牌
def validate_jwt(self, token: str) -> Dict:
try:
# 解码JWT头部(不验证签名)
header = jwt.get_unverified_header(token)
# 获取算法和密钥ID
alg = header.get("alg")
kid = header.get("kid")
if not kid:
return {
"valid": False,
"error": "JWT缺少kid字段"
}
# 查找对应的JWK
jwk = self.jwks_manager.find_jwk_by_kid(kid)
if not jwk:
return {
"valid": False,
"error": f"未找到密钥ID: {kid}"
}
# 验证算法是否匹配
if jwk.get("alg") != alg:
return {
"valid": False,
"error": f"算法不匹配: JWT使用{alg},JWK支持{jwk.get('alg')}"
}
# 验证JWT签名
decoded = jwt.decode(
token,
jwk,
algorithms=[alg],
options={"verify_signature": True}
)
return {
"valid": True,
"payload": decoded,
"header": header,
"jwk": jwk
}
except jwt.ExpiredSignatureError:
return {
"valid": False,
"error": "JWT已过期"
}
except jwt.InvalidTokenError as e:
return {
"valid": False,
"error": f"JWT无效: {str(e)}"
}
except Exception as e:
return {
"valid": False,
"error": f"验证失败: {str(e)}"
}
# 批量验证JWT令牌
def validate_multiple_jwts(self, tokens: List[str]) -> List[Dict]:
# 批量验证多个JWT令牌
results = []
for i, token in enumerate(tokens):
print(f"🔍 验证JWT {i+1}/{len(tokens)}...")
result = self.validate_jwt(token)
results.append({
"token_index": i,
"token": token[:50] + "..." if len(token) > 50 else token,
"result": result
})
return results
# 使用示例
def main():
# 配置JWKS端点(示例)
jwks_uri = "https://auth.example.com/.well-known/jwks.json"
print(" JWKS管理器和验证器示例")
print("=" * 60)
# 创建JWKS管理器
manager = JWKSManager(jwks_uri)
# 获取JWKS(如果端点可用)
print("\n1️⃣ 获取JWKS:")
jwks = manager.fetch_jwks()
if jwks:
print(" 成功获取JWKS")
# 显示密钥统计信息
print("\n2️⃣ JWKS统计信息:")
stats = manager.get_key_statistics()
print(f" 总密钥数: {stats['total_keys']}")
print(f" 密钥类型: {stats['key_types']}")
print(f" 支持算法: {stats['algorithms']}")
print(f" 密钥用途: {stats['key_uses']}")
# 显示支持的算法
print("\n3️⃣ 支持的算法:")
algorithms = manager.get_supported_algorithms()
for alg in algorithms:
print(f" - {alg}")
# 创建JWT验证器
print("\n4️⃣ 创建JWT验证器:")
validator = JWTValidator(manager)
print(" JWT验证器创建成功")
else:
print("⚠️ 无法获取JWKS,使用模拟数据演示")
# 创建模拟JWKS数据
mock_jwks = {
"keys": [
{
"kty": "RSA",
"use": "sig",
"kid": "mock-rsa-key",
"alg": "RS256",
"n": "mock-modulus",
"e": "AQAB"
}
]
}
# 模拟获取JWKS
manager.jwks_cache = mock_jwks
manager.cache_timestamp = time.time()
print(" 使用模拟JWKS数据")
# 显示缓存状态
print("\n5️⃣ 缓存状态:")
cache_status = manager.get_cache_status()
for key, value in cache_status.items():
print(f" {key}: {value}")
# 清除缓存
print("\n6️⃣ 清除缓存:")
manager.clear_cache()
print("\n JWKS管理器示例演示完成!")
# 如果直接运行此脚本
if __name__ == "__main__":
main()10. 总结 #
- JWKS 是存储和传输公钥的标准方式,用于 JWT/OAuth/OIDC 安全验证。
jwks_uri是 OIDC 发现端点的一部分,客户端通过它获取公钥。- 密钥轮换 通过
kid实现,提高安全性。 - 必须使用 HTTPS 保护 JWKS 传输,防止 MITM 攻击。
通过使用上述代码示例,您可以:
- 生成JWKS:支持RSA、椭圆曲线和对称密钥
- 管理JWKS:远程获取、缓存管理和格式验证
- 验证JWT:使用JWKS验证JWT签名
- 密钥管理:密钥轮换和算法支持
- 安全集成:与OAuth 2.0和OIDC系统集成
这些示例代码都是完整可独立运行的,包含了详细的中文注释,可以帮助您快速理解和实现JWKS的所有功能。
JWKS 是现代身份验证和 API 安全的关键组件,正确使用可大幅提升系统安全性。