1. #
1.1. mcp_client.py #
14.oauth/mcp_client.py
"""
MCP OAuth客户端
按如下顺序执行
1. 请求MCP服务器上受保护的资源,拿到401+资源元数据的地址
"""
import sys
import re
import asyncio
import httpx
def parse_resource_metadata_from_www_authenticate(www_authenticate):
# resource_metadata="http://127.0.0.1:8000/.well-known/oauth-protected-resource/mcp"
if not www_authenticate:
return None
match = re.search(
r'resource_metadata="([^"]+)"', www_authenticate, flags=re.IGNORECASE
)
return match.group(1) if match else None
async def discover_resource_metadata(http_client, resource_url):
# 无令牌的情况下访问MCP资源服务器
response = await http_client.get(
resource_url, headers={"Accept": "application/json,text/event-stream"}
)
# 无token访问资源服务器预期返回401状态码
if response.status_code != 401:
raise RuntimeError(f"预期先返回401,但收到{response.status_code}")
# 从响应头中解析出资源元数据
return parse_resource_metadata_from_www_authenticate(
response.headers.get("www-authenticate")
)
async def fetch_json(client, url):
# 向目标URL发起GET请求
resp = await client.get(url)
# 请非状态非2XX会抛异常
resp.raise_for_status()
# 解析响应内容为JSON
data = resp.json()
if not isinstance(data, dict):
raise ValueError(f"元数据响应不是JSON对象:{url}")
return data
async def register_client(client, registration_endpoint, redirect_uri):
# 通过POST向注册端点发起客户端注册请求
resp = await client.post(
registration_endpoint,
json={"client_name": "MCP OAUTH客户端", "redirect_uris": [redirect_uri]},
)
resp.raise_for_status()
# 获取JSON格式的响应体
data = resp.json()
# 获取响应体里的客户端ID
client_id = data.get("client_id")
return client_id
async def run_client(resource_url, username, password, redirect_uri):
async with httpx.AsyncClient(
timeout=30,
follow_redirects=True,
headers={"Accept": "application/json, text/event-stream"},
) as http_client:
# 第一步,无token请求受保护的资源
print(f"第一步,无token请求受保护的资源")
resource_metadata_url = await discover_resource_metadata(
http_client, resource_url
)
# 收到资源元数据地址 http://127.0.0.1:8000/.well-known/oauth-protected-resource/mcp
print(f"收到资源元数据地址:{resource_metadata_url}")
# 第二步获取资源元数据
resoure_metadata = await fetch_json(http_client, resource_metadata_url)
# 资源元数据可能返回 authorization_servers(数组) 或 authorization_server(字符串)
auth_servers = resoure_metadata.get("authorization_servers")
+ if isinstance(auth_servers, list) and auth_servers and isinstance(
+ auth_servers[0], str
):
+ auth_server = auth_servers[0]
+ else:
+ single_auth_server = resoure_metadata.get("authorization_server")
+ if isinstance(single_auth_server, str) and single_auth_server:
+ auth_server = single_auth_server
+ else:
+ raise ValueError(
+ "受保护资源元数据缺少authorization_servers/authorization_server字段"
+ )
print(f"第二步 授权服务器地址:{auth_server}")
# 第三步获取授权服务器的元数据
+ auth_server = auth_server.rstrip("/")
auth_metadata_url = f"{auth_server}/.well-known/oauth-authorization-server"
print(f"第三步 授权服务器元数据地址:{auth_metadata_url}")
# 异步获取授权服务器元数据JSON
auth_metadata = await fetch_json(http_client, auth_metadata_url)
# 从授权服务器获取元令牌 对于客户端来说,只需要知道
authorization_endpoint = auth_metadata.get("authorization_endpoint")
token_endpoint = auth_metadata.get("token_endpoint")
registration_endpoint = auth_metadata.get("registration_endpoint")
print(f"客户端引导用户去授权服务器授权的端点:{authorization_endpoint}")
print(f"客户端通过授权码换取Token端点:{token_endpoint}")
+ print(f"客户端注册端点:{registration_endpoint}")
# 第四步 动态客户端注册
print(f"第四步 动态客户端注册")
client_id = await register_client(
http_client, registration_endpoint, redirect_uri
)
print(f"客户端标识:", client_id)
# 第五步 引导用户授权
def main():
try:
# 1.受保护的MCP资源服务器地址
resource_url = "http://127.0.0.1:8000/mcp"
# 用户名
username = "admin"
# 密码
password = "123456"
# 回调地址(OAuth2授权码流程需要,客户端监听此地址以接收授权码)
redirect_uri = "http://127.0.0.1:8000/callback"
asyncio.run(run_client(resource_url, username, password, redirect_uri))
return 0
except httpx.HTTPStatusError as e:
print(f"HTTP请求错误 {e.response.status_code}:{e.response.text}")
return 1
except Exception as e:
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
try:
sys.exit(main())
except KeyboardInterrupt:
sys.exit(130)