导航菜单

  • 1.什么是MCP
  • 2.MCP架构
  • 3.MCP服务器
  • 4.MCP客户端
  • 5.版本控制
  • 6.连接MCP服务器
  • 7.SDKs
  • 8.Inspector
  • 9.规范
  • 10.架构
  • 11.协议
  • 12.生命周期
  • 13.工具
  • 14.资源
  • 15.提示
  • 16.日志
  • 17.进度
  • 18.传输
  • 19.补全
  • 20.引导
  • 21.采样
  • 22.任务
  • 23.取消
  • 24.Ping
  • 25.根
  • 26.分页
  • 27.授权
  • 28.初始化
  • 29.工具
  • 30.资源
  • 31.结构化输出
  • 32.提示词
  • 33.上下文
  • 34.StreamableHTTP
  • 35.参数补全
  • 36.引导
  • 37.采样
  • 38.LowLevel
  • 39.任务
  • 40.取消
  • 41.ping
  • 42.根
  • 43.分页
  • 44.授权
  • 45.FunctionCalling
  • starlette
  • FastAPI
  • Keycloak
  • asyncio
  • contextlib
  • httpx
  • pathlib
  • pydantic
  • queue
  • subprocess
  • threading
  • uvicorn
  • JSON-RPC
  • LiteLLM
  • temp
  • 1.
    • 1.1. mcp_client.py

1. #

1.1. mcp_client.py #

14.oauth/mcp_client.py

"""
MCP OAuth客户端
按如下顺序执行
1. 请求MCP服务器上受保护的资源,拿到401+资源元数据的地址
"""

import sys
import re
import asyncio
import httpx


def parse_resource_metadata_from_www_authenticate(www_authenticate):
    # resource_metadata="http://127.0.0.1:8000/.well-known/oauth-protected-resource/mcp"
    if not www_authenticate:
        return None
    match = re.search(
        r'resource_metadata="([^"]+)"', www_authenticate, flags=re.IGNORECASE
    )
    return match.group(1) if match else None


async def discover_resource_metadata(http_client, resource_url):
    # 无令牌的情况下访问MCP资源服务器
    response = await http_client.get(
        resource_url, headers={"Accept": "application/json,text/event-stream"}
    )
    # 无token访问资源服务器预期返回401状态码
    if response.status_code != 401:
        raise RuntimeError(f"预期先返回401,但收到{response.status_code}")
    # 从响应头中解析出资源元数据
    return parse_resource_metadata_from_www_authenticate(
        response.headers.get("www-authenticate")
    )


async def fetch_json(client, url):
    # 向目标URL发起GET请求
    resp = await client.get(url)
    # 请非状态非2XX会抛异常
    resp.raise_for_status()
    # 解析响应内容为JSON
    data = resp.json()
    if not isinstance(data, dict):
        raise ValueError(f"元数据响应不是JSON对象:{url}")
    return data


async def register_client(client, registration_endpoint, redirect_uri):
    # 通过POST向注册端点发起客户端注册请求
    resp = await client.post(
        registration_endpoint,
        json={"client_name": "MCP OAUTH客户端", "redirect_uris": [redirect_uri]},
    )
    resp.raise_for_status()
    # 获取JSON格式的响应体
    data = resp.json()
    # 获取响应体里的客户端ID
    client_id = data.get("client_id")
    return client_id


async def run_client(resource_url, username, password, redirect_uri):
    async with httpx.AsyncClient(
        timeout=30,
        follow_redirects=True,
        headers={"Accept": "application/json, text/event-stream"},
    ) as http_client:
        # 第一步,无token请求受保护的资源
        print(f"第一步,无token请求受保护的资源")
        resource_metadata_url = await discover_resource_metadata(
            http_client, resource_url
        )
        # 收到资源元数据地址 http://127.0.0.1:8000/.well-known/oauth-protected-resource/mcp
        print(f"收到资源元数据地址:{resource_metadata_url}")
        # 第二步获取资源元数据
        resoure_metadata = await fetch_json(http_client, resource_metadata_url)
        # 资源元数据可能返回 authorization_servers(数组) 或 authorization_server(字符串)
        auth_servers = resoure_metadata.get("authorization_servers")
+       if isinstance(auth_servers, list) and auth_servers and isinstance(
+           auth_servers[0], str
        ):
+           auth_server = auth_servers[0]
+       else:
+           single_auth_server = resoure_metadata.get("authorization_server")
+           if isinstance(single_auth_server, str) and single_auth_server:
+               auth_server = single_auth_server
+           else:
+               raise ValueError(
+                   "受保护资源元数据缺少authorization_servers/authorization_server字段"
+               )
        print(f"第二步 授权服务器地址:{auth_server}")
        # 第三步获取授权服务器的元数据
+       auth_server = auth_server.rstrip("/")
        auth_metadata_url = f"{auth_server}/.well-known/oauth-authorization-server"
        print(f"第三步 授权服务器元数据地址:{auth_metadata_url}")
        # 异步获取授权服务器元数据JSON
        auth_metadata = await fetch_json(http_client, auth_metadata_url)
        # 从授权服务器获取元令牌 对于客户端来说,只需要知道
        authorization_endpoint = auth_metadata.get("authorization_endpoint")
        token_endpoint = auth_metadata.get("token_endpoint")
        registration_endpoint = auth_metadata.get("registration_endpoint")
        print(f"客户端引导用户去授权服务器授权的端点:{authorization_endpoint}")
        print(f"客户端通过授权码换取Token端点:{token_endpoint}")
+       print(f"客户端注册端点:{registration_endpoint}")
        # 第四步 动态客户端注册
        print(f"第四步 动态客户端注册")
        client_id = await register_client(
            http_client, registration_endpoint, redirect_uri
        )
        print(f"客户端标识:", client_id)
        # 第五步 引导用户授权


def main():
    try:
        # 1.受保护的MCP资源服务器地址
        resource_url = "http://127.0.0.1:8000/mcp"
        # 用户名
        username = "admin"
        # 密码
        password = "123456"
        # 回调地址(OAuth2授权码流程需要,客户端监听此地址以接收授权码)
        redirect_uri = "http://127.0.0.1:8000/callback"
        asyncio.run(run_client(resource_url, username, password, redirect_uri))
        return 0
    except httpx.HTTPStatusError as e:
        print(f"HTTP请求错误 {e.response.status_code}:{e.response.text}")
        return 1
    except Exception as e:
        import traceback

        traceback.print_exc()
        return 1


if __name__ == "__main__":
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        sys.exit(130)
← 上一节 subprocess 下一节 threading →

访问验证

请输入访问令牌

Token不正确,请重新输入