ai
  • index
  • 1.首页
  • 2.介绍
  • 3.架构概览
  • 4.服务器概念
  • 5.客户端概念
  • 6.版本控制
  • 7.连接到远程MCP服务器
  • 8.连接到本地MCP服务器
  • json_rpc
  • 9.构建一个MCP服务器
  • 10.检查员
  • 11.构建一个MCP客户端
  • 14.架构
  • 15.基础协议概述
  • 16.生命周期
  • 17.传输
  • 18.授权
  • 19.安全最佳实践
  • 20.取消
  • 21.Ping
  • 22.进展
  • 23.Roots
  • 24.采样
  • 25.启发
  • 26.服务器特性
  • 27.提示词
  • 28.资源
  • 29.工具
  • 30.完成
  • 31.日志记录
  • 32.分页
  • 33.架构参考
  • URI模板
  • 12.实现
  • http.server
  • 动态客户端注册协议
  • 受保护资源元数据
  • 授权服务器元数据
  • JWKS
  • PKCE
  • PyJWT
  • secrets
  • watchfiles
  • 实现authorization
  • 实现cancel
  • 实现completion
  • 实现logging
  • 实现pagination
  • 实现process
  • 实现transport
  • psutil
  • pytz
  • zoneinfo
  • contextlib
  • Starlette
  • mcp.1.starter
  • mcp.2.Resource
  • mcp.3.structured_output
  • mcp.4.prompts
  • mcp.5.context
  • mcp.6.streamable
  • mcp.7.lowlevel
  • mcp.8.Completion
  • mcp.9.Elicitation
  • mcp.10.oauth
  • mcp.11.integration
  • mcp.12.best
  • mysql-mcp
  • databases
  • uvicorn
  • asynccontextmanager
  • AsyncExitStack
  • streamable
  • aiohttp
  • publish
  • email
  • schedule
  • twine
  • 1.教学文档总览
  • 2.教师使用指南
  • 3.教学系统快速参考
  • 4.新生入门指南
  • 5.学生使用指南
  • 1.发现授权服务器 (RFC9728)
    • 1.1. client.py
    • 1.2. server.py
  • 2.动态客户端注册 (RFC7591)
    • 2.1. auth_server.py
    • 2.2. client.py
  • 3. OAuth授权码流程
    • 3.1. auth_server.py
    • 3.2. client.py
  • 4.模拟授权过程
    • 4.1. client.py
  • 5.交换访问令牌
    • 5.1. auth_server.py
    • 5.2. client.py
  • 6.步骤6: MCP服务器访问
    • 6.1. tools.py
    • 6.2. client.py
    • 6.3. server.py
  • 步骤7: 调用calculate工具
    • 7.1. client.py
    • 7.2. server.py

1.发现授权服务器 (RFC9728) #

uv init mcp_protocal
uv add requests pyjwt 
pip install watchfiles
watchfiles "uv run auth_server.py" .
watchfiles "uv run server.py" .
watchfiles "uv run client.py" .

1.1. client.py #

client.py

# 导入requests库,用于发送HTTP请求
import requests


# 定义OAuthClient类,用于与MCP服务器和OAuth服务器交互
class OAuthClient:

    # 构造函数,初始化MCP服务器和OAuth服务器的URL
    def __init__(
        self,
        mcp_server_url="http://localhost:8000",
        oauth_server_url="http://localhost:8001",
    ):
        # 去除MCP服务器URL末尾的斜杠
        self.mcp_server_url = mcp_server_url.rstrip("/")
        # 去除OAuth服务器URL末尾的斜杠
        self.oauth_server_url = oauth_server_url.rstrip("/")

    # 发现授权服务器的方法
    def discover_authorization_server(self):
        try:
            # 发送GET请求获取资源服务器的元数据
            response = requests.get(
                f"{self.mcp_server_url}/.well-known/oauth-protected-resource"
            )
            # 如果响应状态码为200,表示请求成功
            if response.status_code == 200:
                # 解析响应的JSON数据
                resource_metadata = response.json()
                # 打印获取资源元数据成功的信息
                print(f" 获取资源元数据成功")
                # 打印资源标识符
                print(f"   资源标识符: {resource_metadata.get('resource')}")

                # 获取授权服务器列表
                auth_servers = resource_metadata.get("authorization_servers", [])
                # 如果存在授权服务器
                if auth_servers:
                    # 取第一个授权服务器
                    auth_server = auth_servers[0]
                    # 保存授权服务器的元数据
                    self.authorization_server_metadata = auth_server
                    # 获取发行者信息
                    issuer = auth_server.get("issuer")
                    # 获取注册端点
                    registration_endpoint = auth_server.get("registration_endpoint")
                    # 获取授权端点
                    authorization_endpoint = auth_server.get("authorization_endpoint")
                    # 获取令牌端点
                    token_endpoint = auth_server.get("token_endpoint")
                    # 获取支持的作用域
                    scopes_supported = auth_server.get("scopes_supported")
                    # 获取支持的PKCE方法
                    code_challenge_methods_supported = auth_server.get(
                        "code_challenge_methods_supported"
                    )

                    # 打印发现授权服务器成功的信息
                    print(f" 发现授权服务器")
                    # 打印发行者
                    print(f"   发行者: {issuer}")
                    # 打印注册端点
                    print(f"   注册端点: {registration_endpoint}")
                    # 打印授权端点
                    print(f"   授权端点: {authorization_endpoint}")
                    # 打印令牌端点
                    print(f"   令牌端点: {token_endpoint}")
                    # 打印支持的作用域
                    print(f"   支持的作用域: {', '.join(scopes_supported)}")
                    # 打印PKCE支持的方法
                    print(f"   PKCE支持: {', '.join(code_challenge_methods_supported)}")
                    # 返回True,表示发现成功
                    return True
                else:
                    # 未找到授权服务器,打印错误信息
                    print(" 未找到授权服务器")
                    return False
            else:
                # 获取资源元数据失败,打印状态码
                print(f" 获取资源元数据失败: {response.status_code}")
                return False

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 发现授权服务器失败: {e}")
            return False

    # 运行演示流程的方法
    def run(self):
        # 如果未能发现授权服务器,打印失败信息
        if not self.discover_authorization_server():
            print(" 演示失败:无法发现授权服务器")
            return False


# 主函数,程序入口
def main():
    try:
        # 创建OAuthClient实例
        client = OAuthClient()
        # 运行演示流程
        client.run()
    # 捕获用户中断异常
    except KeyboardInterrupt:
        print("\n\n演示被用户中断")
    # 捕获其他异常并打印错误信息
    except Exception as e:
        print(f"\n演示过程中发生错误: {e}")


# 判断是否为主程序入口
if __name__ == "__main__":
    # 调用主函数
    main()

1.2. server.py #

server.py

# 导入日志模块
import logging
# 从http.server模块导入HTTPServer和BaseHTTPRequestHandler类
from http.server import HTTPServer, BaseHTTPRequestHandler
# 导入json模块
import json

# 配置日志记录级别为INFO
logging.basicConfig(level=logging.INFO)

# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)


# 定义MCPServer类,作为MCP服务器的核心类
class MCPServer:
    """MCP服务器核心类,支持OAuth 2.1授权"""

    # 初始化方法
    def __init__(self):
        # 协议版本号
        self.protocol_version = "2025-06-18"
        # 服务器信息,包括名称和版本
        self.server_info = {"name": "MCP服务器", "version": "1.0.0"}
        # 服务器能力,包括工具、资源和提示
        self.capabilities = {"tools": {}, "resources": {}, "prompts": {}}
        # OAuth 2.1授权相关配置
        self.oauth_config = {
            # 授权端点
            "authorization_endpoint": "http://localhost:8001/oauth/authorize",
            # 令牌端点
            "token_endpoint": "http://localhost:8001/oauth/token",
            # 注册端点
            "registration_endpoint": "http://localhost:8001/oauth/register",
            # 发行者
            "issuer": "http://localhost:8001",
            # 支持的作用域
            "scopes_supported": ["mcp:tools", "mcp:resources", "mcp:prompts"],
            # 支持的响应类型
            "response_types_supported": ["code"],
            # 支持的授权类型
            "grant_types_supported": ["authorization_code", "refresh_token"],
            # 令牌端点支持的认证方法
            "token_endpoint_auth_methods_supported": [
                "client_secret_basic",
                "client_secret_post",
            ],
            # 支持的PKCE方法
            "code_challenge_methods_supported": ["S256"],  # PKCE支持
        }

    # 获取OAuth 2.0 Protected Resource Metadata (RFC9728)
    def get_resource_metadata(self):
        """获取资源服务器元数据"""
        # 返回资源元数据字典
        return {
            "resource": "http://localhost:8000",
            "authorization_servers": [
                {
                    "issuer": self.oauth_config["issuer"],# 发行者
                    "authorization_endpoint": self.oauth_config["authorization_endpoint"],# 授权端点
                    "token_endpoint": self.oauth_config["token_endpoint"],# 令牌端点
                    "registration_endpoint": self.oauth_config["registration_endpoint"],# 注册端点
                    "scopes_supported": self.oauth_config["scopes_supported"],# 支持的作用域
                    "response_types_supported": self.oauth_config["response_types_supported"],# 支持的响应类型
                    "grant_types_supported": self.oauth_config["grant_types_supported"],# 支持的授权类型
                    "token_endpoint_auth_methods_supported": self.oauth_config["token_endpoint_auth_methods_supported"],# 令牌端点支持的认证方法
                    "code_challenge_methods_supported": self.oauth_config["code_challenge_methods_supported"],# 支持的PKCE方法
                }
            ],
        }


# 定义HTTP请求处理类
class MCPHTTPHandler(BaseHTTPRequestHandler):
    # 初始化方法,接收mcp_server参数
    def __init__(self, *args, mcp_server=None, **kwargs):
        # 保存MCP服务器实例
        self.mcp_server = mcp_server
        # 调用父类的初始化方法
        super().__init__(*args, **kwargs)

    # 处理OAuth资源元数据端点
    def _handle_resource_metadata(self):
        try:
            # 获取资源元数据
            metadata = self.mcp_server.get_resource_metadata()
            # 发送200响应
            self.send_response(200)
            # 设置响应头为application/json
            self.send_header("Content-Type", "application/json")
            # 结束响应头
            self.end_headers()
            # 写入JSON数据
            self.wfile.write(json.dumps(metadata).encode("utf-8"))
        except Exception as e:
            # 记录错误日志
            logger.error(f"处理资源元数据错误: {e}")
            # 返回500错误
            self.send_error(500, "Internal Server Error")
    # 处理GET请求
    def do_GET(self):
        try:
            # 如果请求路径为OAuth资源元数据端点
            if self.path == "/.well-known/oauth-protected-resource":
                self._handle_resource_metadata()
                return

            # 如果请求根路径,返回欢迎信息
            if self.path == "/":
                self.send_response(200)
                self.send_header("Content-Type", "text/html")
                self.end_headers()
                self.wfile.write(b"Welcome to MCP server!")
                return
        except Exception as e:
            # 处理GET请求出错,记录日志
            logger.error(f"GET处理错误: {e}")
            # 返回500错误
            self.send_error(500, "Internal Server Error")


# 定义运行HTTP服务器的函数
def run_http_server(mcp_server: MCPServer, host: str = "localhost", port: int = 8000):
    """运行HTTP服务器"""

    # 定义处理器工厂函数,用于传递mcp_server实例
    def handler_factory(*args, **kwargs):
        return MCPHTTPHandler(*args, mcp_server=mcp_server, **kwargs)

    # 创建HTTPServer实例,绑定主机和端口
    server = HTTPServer((host, port), handler_factory)
    # 记录服务器启动信息
    logger.info(f"HTTP服务器运行在 http://{host}:{port}/mcp")

    try:
        # 启动服务器,进入循环监听请求
        server.serve_forever()
    except KeyboardInterrupt:
        # 捕获Ctrl+C,记录服务器停止
        logger.info("HTTP服务器已停止")
    finally:
        # 关闭服务器
        server.server_close()


# 定义主函数
def main():
    """主函数"""
    # 导入argparse模块,用于解析命令行参数
    import argparse

    # 创建ArgumentParser对象,设置描述信息
    parser = argparse.ArgumentParser(description="MCP HTTP服务器")
    # 添加--host参数,指定服务器主机,默认localhost
    parser.add_argument(
        "--host", default="localhost", help="HTTP服务器主机 (默认: localhost)"
    )
    # 添加--port参数,指定服务器端口,默认8000
    parser.add_argument(
        "--port", type=int, default=8000, help="HTTP服务器端口 (默认: 8000)"
    )

    # 解析命令行参数
    args = parser.parse_args()

    # 创建MCP服务器实例
    mcp_server = MCPServer()

    # 运行HTTP服务器
    run_http_server(mcp_server, args.host, args.port)


# 判断是否为主程序入口
if __name__ == "__main__":
    # 调用主函数
    main()

2.动态客户端注册 (RFC7591) #

2.1. auth_server.py #

auth_server.py

# 导入日志模块
import logging

# 从http.server模块导入HTTPServer和BaseHTTPRequestHandler类
from http.server import HTTPServer, BaseHTTPRequestHandler

# 导入json模块
import json

# 导入secrets模块用于生成安全的随机数
import secrets

# 导入datetime相关类
from datetime import datetime, timedelta, timezone

# 配置日志记录级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)


# 定义OAuthServer类,作为OAuth 2.1授权服务器
class OAuthServer:
    """OAuth 2.1授权服务器"""

    # 初始化方法
    def __init__(self):
        # 存储注册的客户端信息
        self.clients = {}  # 注册的客户端
        # 存储授权码
        self.authorization_codes = {}  # 授权码
        # 存储访问令牌
        self.access_tokens = {}  # 访问令牌
        # 服务器密钥
        self.server_secret = "oauth_server_secret_key_2025"

        # 服务器元数据
        self.metadata = {
            # 发行者
            "issuer": "http://localhost:8001",
            # 授权端点
            "authorization_endpoint": "http://localhost:8001/oauth/authorize",
            # 令牌端点
            "token_endpoint": "http://localhost:8001/oauth/token",
            # 客户端注册端点
            "registration_endpoint": "http://localhost:8001/oauth/register",
            # 支持的作用域
            "scopes_supported": ["mcp:tools", "mcp:resources", "mcp:prompts"],
            # 支持的响应类型
            "response_types_supported": ["code"],
            # 支持的授权类型
            "grant_types_supported": ["authorization_code", "refresh_token"],
            # 令牌端点支持的认证方法
            "token_endpoint_auth_methods_supported": [
                "client_secret_basic",
                "client_secret_post",
            ],
            # 支持的PKCE方法
            "code_challenge_methods_supported": ["S256"],
            # 服务文档地址
            "service_documentation": "https://modelcontextprotocol.io/specification/2025-06-18/basic/authorization",
        }

    # 动态客户端注册方法 (RFC7591)
    def register_client(self, client_metadata):
        """动态客户端注册 (RFC7591)"""
        # 生成客户端ID
        client_id = str(secrets.token_urlsafe(16))
        # 生成客户端密钥
        client_secret = str(secrets.token_urlsafe(32))

        # 构造客户端信息字典
        client_info = {
            "client_id": client_id,
            "client_secret": client_secret,
            # 客户端ID签发时间(UTC时间戳)
            "client_id_issued_at": int(datetime.now(timezone.utc).timestamp()),
            # 客户端密钥过期时间(0表示永不过期)
            "client_secret_expires_at": 0,  # 永不过期
            # 合并客户端元数据
            **client_metadata,
        }

        # 保存客户端信息到clients字典
        self.clients[client_id] = client_info
        # 记录新客户端注册日志
        logger.info(f"新客户端注册: {client_id}")

        # 返回客户端信息
        return client_info


# 定义OAuthHandler类,继承自BaseHTTPRequestHandler
class OAuthHandler(BaseHTTPRequestHandler):
    """OAuth HTTP处理器"""

    # 初始化方法,接收oauth_server参数
    def __init__(self, *args, oauth_server=None, **kwargs):
        # 保存OAuthServer实例
        self.oauth_server = oauth_server
        # 调用父类初始化方法
        super().__init__(*args, **kwargs)

    # 处理客户端注册请求的方法
    def _handle_client_registration(self):
        """处理客户端注册请求"""
        try:
            # 获取请求体长度
            content_length = int(self.headers.get("Content-Length", 0))
            # 读取请求体内容
            body = self.rfile.read(content_length)
            # 解析JSON格式的客户端元数据
            client_metadata = json.loads(body.decode("utf-8"))

            # 调用OAuthServer的register_client方法注册客户端
            client_info = self.oauth_server.register_client(client_metadata)

            # 发送201响应,表示创建成功
            self.send_response(201)
            # 设置响应头为application/json
            self.send_header("Content-Type", "application/json")
            # 结束响应头
            self.end_headers()
            # 返回注册成功的客户端信息(JSON格式)
            self.wfile.write(json.dumps(client_info).encode("utf-8"))

        except Exception as e:
            # 记录错误日志
            logger.error(f"客户端注册错误: {e}")
            # 返回400错误
            self.send_error(400, "Bad Request")

    # 处理POST请求的方法
    def do_POST(self):
        """处理POST请求"""
        try:
            # 如果请求路径为/oauth/register,处理客户端注册
            if self.path == "/oauth/register":
                self._handle_client_registration()
            # 其他路径返回404
            else:
                self.send_error(404, "Not Found")
        except Exception as e:
            # 记录POST处理错误日志
            logger.error(f"POST处理错误: {e}")
            # 返回500错误
            self.send_error(500, "Internal Server Error")


# 运行OAuth授权服务器的函数
def run_oauth_server(host="localhost", port=8001):
    """运行OAuth授权服务器"""
    # 创建OAuthServer实例
    oauth_server = OAuthServer()

    # 定义处理器工厂函数,用于传递oauth_server实例
    def handler_factory(*args, **kwargs):
        return OAuthHandler(*args, oauth_server=oauth_server, **kwargs)

    # 创建HTTPServer实例,绑定主机和端口
    server = HTTPServer((host, port), handler_factory)
    # 记录服务器启动信息
    logger.info(f"OAuth授权服务器运行在 http://{host}:{port}")
    # 记录支持的端点信息
    logger.info("支持的端点:")
    logger.info(
        f"  - 服务器元数据: http://{host}:{port}/.well-known/oauth-authorization-server"
    )
    logger.info(f"  - 授权端点: http://{host}:{port}/oauth/authorize")
    logger.info(f"  - 令牌端点: http://{host}:{port}/oauth/token")
    logger.info(f"  - 客户端注册: http://{host}:{port}/oauth/register")

    try:
        # 启动服务器,进入循环监听请求
        server.serve_forever()
    except KeyboardInterrupt:
        # 捕获Ctrl+C,记录服务器停止
        logger.info("OAuth授权服务器已停止")
    finally:
        # 关闭服务器
        server.server_close()


# 判断是否为主程序入口
if __name__ == "__main__":
    # 导入argparse模块,用于解析命令行参数
    import argparse

    # 创建ArgumentParser对象,设置描述信息
    parser = argparse.ArgumentParser(description="MCP OAuth授权服务器")
    # 添加--host参数,指定服务器主机,默认localhost
    parser.add_argument(
        "--host", default="localhost", help="服务器主机 (默认: localhost)"
    )
    # 添加--port参数,指定服务器端口,默认8001
    parser.add_argument(
        "--port", type=int, default=8001, help="服务器端口 (默认: 8001)"
    )

    # 解析命令行参数
    args = parser.parse_args()
    # 启动OAuth授权服务器
    run_oauth_server(args.host, args.port)

2.2. client.py #

client.py

# 导入requests库,用于发送HTTP请求
import requests

# 定义OAuthClient类,用于与MCP服务器和OAuth服务器交互
class OAuthClient:

    # 构造函数,初始化MCP服务器和OAuth服务器的URL
    def __init__(
        self,
        mcp_server_url="http://localhost:8000",
        oauth_server_url="http://localhost:8001",
    ):
        # 去除MCP服务器URL末尾的斜杠
        self.mcp_server_url = mcp_server_url.rstrip("/")
        # 去除OAuth服务器URL末尾的斜杠
        self.oauth_server_url = oauth_server_url.rstrip("/")

    # 动态客户端注册方法
+   def register_client(self, client_name="MCP客户端"):
        # 打印步骤提示
+       print("\n📝 步骤2: 动态客户端注册")
+       print("=" * 50)

+       try:
            # 检查是否已发现授权服务器
+           if not self.authorization_server_metadata:
+               print(" 请先发现授权服务器")
+               return False

            # 获取注册端点
+           registration_endpoint = self.authorization_server_metadata[
+               "registration_endpoint"
+           ]

            # 构造客户端注册请求体
+           client_metadata = {
+               "client_name": client_name, # 客户端名称
+               "redirect_uris": ["http://localhost:8080/callback"], # 重定向URI
+               "grant_types": ["authorization_code"], # 授权类型
+               "response_types": ["code"], # 响应类型
+               "token_endpoint_auth_method": "client_secret_basic", # 令牌端点认证方法
+               "scope": "mcp:tools mcp:resources", # 作用域
+               "application_type": "web", # 应用类型
+           }

            # 打印注册请求相关信息
+           print(f"📤 发送客户端注册请求到: {registration_endpoint}")
+           print(f"   客户端名称: {client_name}")
+           print(f"   重定向URI: {client_metadata['redirect_uris']}")
+           print(f"   授权类型: {client_metadata['grant_types']}")
+           print(f"   作用域: {client_metadata['scope']}")

            # 发送POST请求进行客户端注册
+           response = requests.post(registration_endpoint, json=client_metadata)
            # 判断注册是否成功
+           if response.status_code == 201:
                # 解析返回的客户端信息
+               client_info = response.json()
                # 保存客户端ID
+               self.client_id = client_info["client_id"]
                # 保存客户端密钥
+               self.client_secret = client_info["client_secret"]

                # 打印注册成功信息
+               print(" 客户端注册成功")
+               print(f"   客户端ID: {self.client_id}")
+               print(f"   客户端密钥: {self.client_secret[:8]}...")
+               return True
+           else:
                # 注册失败,打印错误信息
+               print(f" 客户端注册失败: {response.status_code}")
+               print(f"   错误信息: {response.text}")
+               return False

        # 捕获异常并打印错误信息
+       except Exception as e:
+           print(f" 客户端注册失败: {e}")
+           return False

    # 发现授权服务器的方法
    def discover_authorization_server(self):
        try:
            # 发送GET请求获取资源服务器的元数据
            response = requests.get(
                f"{self.mcp_server_url}/.well-known/oauth-protected-resource"
            )
            # 判断响应状态码是否为200
            if response.status_code == 200:
                # 解析响应的JSON数据
                resource_metadata = response.json()
                # 打印获取资源元数据成功的信息
                print(f" 获取资源元数据成功")
                # 打印资源标识符
                print(f"   资源标识符: {resource_metadata.get('resource')}")

                # 获取授权服务器列表
                auth_servers = resource_metadata.get("authorization_servers", [])
                # 判断是否存在授权服务器
                if auth_servers:
                    # 取第一个授权服务器
                    auth_server = auth_servers[0]
                    # 保存授权服务器的元数据
                    self.authorization_server_metadata = auth_server
                    # 获取发行者信息
                    issuer = auth_server.get("issuer")
                    # 获取注册端点
                    registration_endpoint = auth_server.get("registration_endpoint")
                    # 获取授权端点
                    authorization_endpoint = auth_server.get("authorization_endpoint")
                    # 获取令牌端点
                    token_endpoint = auth_server.get("token_endpoint")
                    # 获取支持的作用域
                    scopes_supported = auth_server.get("scopes_supported")
                    # 获取支持的PKCE方法
                    code_challenge_methods_supported = auth_server.get(
                        "code_challenge_methods_supported"
                    )

                    # 打印发现授权服务器成功的信息
                    print(f" 发现授权服务器")
                    # 打印发行者
                    print(f"   发行者: {issuer}")
                    # 打印注册端点
                    print(f"   注册端点: {registration_endpoint}")
                    # 打印授权端点
                    print(f"   授权端点: {authorization_endpoint}")
                    # 打印令牌端点
                    print(f"   令牌端点: {token_endpoint}")
                    # 打印支持的作用域
                    print(f"   支持的作用域: {', '.join(scopes_supported)}")
                    # 打印PKCE支持的方法
                    print(f"   PKCE支持: {', '.join(code_challenge_methods_supported)}")
                    # 返回True,表示发现成功
                    return True
                else:
                    # 未找到授权服务器,打印错误信息
                    print(" 未找到授权服务器")
                    return False
            else:
                # 获取资源元数据失败,打印状态码
                print(f" 获取资源元数据失败: {response.status_code}")
                return False

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 发现授权服务器失败: {e}")
            return False

    # 运行演示流程的方法
    def run(self):
        # 步骤1: 发现授权服务器
        if not self.discover_authorization_server():
            print(" 演示失败:无法发现授权服务器")
            return False
        # 步骤2: 注册客户端
+       if not self.register_client():
+           print(" 演示失败:客户端注册失败")
+           return False

# 主函数,程序入口
def main():
    try:
        # 创建OAuthClient实例
        client = OAuthClient()
        # 运行演示流程
        client.run()
    # 捕获用户中断异常
    except KeyboardInterrupt:
        print("\n\n演示被用户中断")
    # 捕获其他异常并打印错误信息
    except Exception as e:
        print(f"\n演示过程中发生错误: {e}")

# 判断是否为主程序入口
if __name__ == "__main__":
    # 调用主函数
    main()

3. OAuth授权码流程 #

3.1. auth_server.py #

auth_server.py

# 导入日志模块
import logging

# 从http.server模块导入HTTPServer和BaseHTTPRequestHandler类
from http.server import HTTPServer, BaseHTTPRequestHandler

# 导入json模块
import json

# 导入secrets模块用于生成安全的随机数
import secrets

# 导入datetime相关类
from datetime import datetime, timedelta, timezone
+from urllib.parse import parse_qs, urlparse

# 配置日志记录级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)

# 定义OAuthServer类,作为OAuth 2.1授权服务器
class OAuthServer:
    """OAuth 2.1授权服务器"""

    # 初始化方法
    def __init__(self):
        # 存储注册的客户端信息
        self.clients = {}  # 注册的客户端
        # 存储授权码
        self.authorization_codes = {}  # 授权码
        # 存储访问令牌
        self.access_tokens = {}  # 访问令牌
        # 服务器密钥
        self.server_secret = "oauth_server_secret_key_2025"

        # 服务器元数据
        self.metadata = {
            # 发行者
            "issuer": "http://localhost:8001",
            # 授权端点
            "authorization_endpoint": "http://localhost:8001/oauth/authorize",
            # 令牌端点
            "token_endpoint": "http://localhost:8001/oauth/token",
            # 客户端注册端点
            "registration_endpoint": "http://localhost:8001/oauth/register",
            # 支持的作用域
            "scopes_supported": ["mcp:tools", "mcp:resources", "mcp:prompts"],
            # 支持的响应类型
            "response_types_supported": ["code"],
            # 支持的授权类型
            "grant_types_supported": ["authorization_code", "refresh_token"],
            # 令牌端点支持的认证方法
            "token_endpoint_auth_methods_supported": [
                "client_secret_basic",
                "client_secret_post",
            ],
            # 支持的PKCE方法
            "code_challenge_methods_supported": ["S256"],
            # 服务文档地址
            "service_documentation": "https://modelcontextprotocol.io/specification/2025-06-18/basic/authorization",
        }

    # 创建授权码方法
+   def create_authorization_code(
+       self, client_id, redirect_uri, scope, state, code_challenge=None
+   ):
+       """创建授权码"""
        # 检查client_id是否已注册
+       if client_id not in self.clients:
+           return None

        # 生成授权码
+       code = str(secrets.token_urlsafe(32))
        # 保存授权码及相关信息
+       self.authorization_codes[code] = {
+           "client_id": client_id, # 客户端ID
+           "redirect_uri": redirect_uri, # 重定向URI
+           "scope": scope, # 作用域
+           "state": state, # 状态
+           "code_challenge": code_challenge, # PKCE挑战码
+           "expires_at": datetime.now(timezone.utc) + timedelta(minutes=10), # 过期时间
+           "created_at": datetime.now(timezone.utc), # 创建时间
+       }

        # 记录授权码创建日志
+       logger.info(f"创建授权码: {code} for client: {client_id}")
        # 返回授权码
+       return code

    # 动态客户端注册方法 (RFC7591)
    def register_client(self, client_metadata):
        """动态客户端注册 (RFC7591)"""
        # 生成客户端ID
        client_id = str(secrets.token_urlsafe(16))
        # 生成客户端密钥
        client_secret = str(secrets.token_urlsafe(32))

        # 构造客户端信息字典
        client_info = {
            "client_id": client_id,
            "client_secret": client_secret,
            # 客户端ID签发时间(UTC时间戳)
            "client_id_issued_at": int(datetime.now(timezone.utc).timestamp()),
            # 客户端密钥过期时间(0表示永不过期)
            "client_secret_expires_at": 0,  # 永不过期
            # 合并客户端元数据
            **client_metadata,
        }

        # 保存客户端信息到clients字典
        self.clients[client_id] = client_info
        # 记录新客户端注册日志
        logger.info(f"新客户端注册: {client_id}")

        # 返回客户端信息
        return client_info

# 定义OAuthHandler类,继承自BaseHTTPRequestHandler
class OAuthHandler(BaseHTTPRequestHandler):
    """OAuth HTTP处理器"""

    # 初始化方法,接收oauth_server参数
    def __init__(self, *args, oauth_server=None, **kwargs):
        # 保存OAuthServer实例
        self.oauth_server = oauth_server
        # 调用父类初始化方法
        super().__init__(*args, **kwargs)

    # 构建查询字符串的方法
+   def _build_query_string(self, params):
+       """构建查询字符串"""
+       return "&".join([f"{k}={v}" for k, v in params.items()])

    # 处理授权请求的方法
+   def _handle_authorization(self):
+       """处理授权请求"""
+       try:
            # 解析查询参数
+           query = urlparse(self.path).query
+           params = parse_qs(query)
            # 获取client_id参数
+           client_id = params.get("client_id", [""])[0]
            # 获取redirect_uri参数
+           redirect_uri = params.get("redirect_uri", [""])[0]
            # 获取scope参数
+           scope = params.get("scope", [""])[0]
            # 获取state参数
+           state = params.get("state", [""])[0]
            # 获取code_challenge参数
+           code_challenge = params.get("code_challenge", [""])[0]
            # 获取resource参数(RFC8707资源指示器)
+           resource = params.get("resource", [""])[0]

            # 记录授权请求日志
+           logger.info(
+               f"授权请求: client_id={client_id}, scope={scope}, resource={resource}"
+           )

            # 验证客户端是否已注册
+           if client_id not in self.oauth_server.clients:
+               self.send_error(400, "Invalid client_id")
+               return

            # 创建授权码
+           code = self.oauth_server.create_authorization_code(
+               client_id, redirect_uri, scope, state, code_challenge
+           )

            # 检查授权码是否创建成功
+           if not code:
+               self.send_error(400, "Failed to create authorization code")
+               return

            # 构造重定向参数
+           redirect_params = {"code": code, "state": state}

            # 构造重定向URL
+           redirect_url = f"{redirect_uri}?{self._build_query_string(redirect_params)}"

            # 发送重定向响应
+           self.send_response(302)
+           self.send_header("Location", redirect_url)
+           self.end_headers()

            # 记录授权成功日志
+           logger.info(f"授权成功,重定向到: {redirect_url}")

+       except Exception as e:
            # 记录授权处理错误日志
+           logger.error(f"授权处理错误: {e}")
            # 返回400错误
+           self.send_error(400, "Bad Request")

    # 处理GET请求的方法
+   def do_GET(self):
+       """处理GET请求"""
+       try:
            # 判断请求路径是否为授权端点
+           if self.path.startswith("/oauth/authorize"):
+               self._handle_authorization()
+           else:
+               self.send_error(404, "Not Found")
+       except Exception as e:
            # 记录GET处理错误日志
+           logger.error(f"GET处理错误: {e}")
            # 返回500错误
+           self.send_error(500, "Internal Server Error")

    # 处理客户端注册请求的方法
    def _handle_client_registration(self):
        """处理客户端注册请求"""
        try:
            # 获取请求体长度
            content_length = int(self.headers.get("Content-Length", 0))
            # 读取请求体内容
            body = self.rfile.read(content_length)
            # 解析JSON格式的客户端元数据
            client_metadata = json.loads(body.decode("utf-8"))

            # 调用OAuthServer的register_client方法注册客户端
            client_info = self.oauth_server.register_client(client_metadata)

            # 发送201响应,表示创建成功
            self.send_response(201)
            # 设置响应头为application/json
            self.send_header("Content-Type", "application/json")
            # 结束响应头
            self.end_headers()
            # 返回注册成功的客户端信息(JSON格式)
            self.wfile.write(json.dumps(client_info).encode("utf-8"))

        except Exception as e:
            # 记录错误日志
            logger.error(f"客户端注册错误: {e}")
            # 返回400错误
            self.send_error(400, "Bad Request")

    # 处理POST请求的方法
    def do_POST(self):
        """处理POST请求"""
        try:
            # 如果请求路径为/oauth/register,处理客户端注册
            if self.path == "/oauth/register":
                self._handle_client_registration()
            # 其他路径返回404
            else:
                self.send_error(404, "Not Found")
        except Exception as e:
            # 记录POST处理错误日志
            logger.error(f"POST处理错误: {e}")
            # 返回500错误
            self.send_error(500, "Internal Server Error")

# 运行OAuth授权服务器的函数
def run_oauth_server(host="localhost", port=8001):
    """运行OAuth授权服务器"""
    # 创建OAuthServer实例
    oauth_server = OAuthServer()

    # 定义处理器工厂函数,用于传递oauth_server实例
    def handler_factory(*args, **kwargs):
        return OAuthHandler(*args, oauth_server=oauth_server, **kwargs)

    # 创建HTTPServer实例,绑定主机和端口
    server = HTTPServer((host, port), handler_factory)
    # 记录服务器启动信息
    logger.info(f"OAuth授权服务器运行在 http://{host}:{port}")
    # 记录支持的端点信息
    logger.info("支持的端点:")
    logger.info(
        f"  - 服务器元数据: http://{host}:{port}/.well-known/oauth-authorization-server"
    )
    logger.info(f"  - 授权端点: http://{host}:{port}/oauth/authorize")
    logger.info(f"  - 令牌端点: http://{host}:{port}/oauth/token")
    logger.info(f"  - 客户端注册: http://{host}:{port}/oauth/register")

    try:
        # 启动服务器,进入循环监听请求
        server.serve_forever()
    except KeyboardInterrupt:
        # 捕获Ctrl+C,记录服务器停止
        logger.info("OAuth授权服务器已停止")
    finally:
        # 关闭服务器
        server.server_close()

# 判断是否为主程序入口
if __name__ == "__main__":
    # 导入argparse模块,用于解析命令行参数
    import argparse

    # 创建ArgumentParser对象,设置描述信息
    parser = argparse.ArgumentParser(description="MCP OAuth授权服务器")
    # 添加--host参数,指定服务器主机,默认localhost
    parser.add_argument(
        "--host", default="localhost", help="服务器主机 (默认: localhost)"
    )
    # 添加--port参数,指定服务器端口,默认8001
    parser.add_argument(
        "--port", type=int, default=8001, help="服务器端口 (默认: 8001)"
    )

    # 解析命令行参数
    args = parser.parse_args()
    # 启动OAuth授权服务器
    run_oauth_server(args.host, args.port)

3.2. client.py #

client.py

# 导入requests库,用于发送HTTP请求
import requests
# 导入secrets库,用于生成安全的随机字符串
+import secrets
# 从urllib.parse导入urlencode,用于URL参数编码
+from urllib.parse import urlencode
# 导入hashlib库,用于哈希运算
+import hashlib
# 导入base64库,用于base64编码
+import base64

# 定义OAuthClient类,用于与MCP服务器和OAuth服务器交互
class OAuthClient:

    # 构造函数,初始化MCP服务器和OAuth服务器的URL
    def __init__(
        self,
        mcp_server_url="http://localhost:8000",
        oauth_server_url="http://localhost:8001",
    ):
        # 去除MCP服务器URL末尾的斜杠
        self.mcp_server_url = mcp_server_url.rstrip("/")
        # 去除OAuth服务器URL末尾的斜杠
        self.oauth_server_url = oauth_server_url.rstrip("/")

    # 生成PKCE挑战码的方法
+   def generate_pkce_challenge(self):
        # 生成code_verifier(高强度随机字符串)
+       code_verifier = secrets.token_urlsafe(32)
        # 计算SHA256哈希并进行base64-url编码,去除末尾的等号
+       code_challenge = (
+           base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest())
+           .decode()
+           .rstrip("=")
+       )
        # 返回code_verifier和code_challenge
+       return code_verifier, code_challenge

    # 开始OAuth授权流程的方法
+   def start_authorization_flow(self, scope="mcp:tools"):
        # 打印流程提示信息
+       print("\n 步骤3: 开始OAuth授权流程")
+       print("=" * 50)

+       try:
            # 检查是否已发现授权服务器
+           if not self.authorization_server_metadata:
+               print(" 请先发现授权服务器")
+               return None

            # 获取授权端点URL
+           authorization_endpoint = self.authorization_server_metadata[
+               "authorization_endpoint"
+           ]

            # 生成PKCE挑战码
+           code_verifier, code_challenge = self.generate_pkce_challenge()

            # 构造授权请求参数
+           auth_params = {
+               "response_type": "code",# 响应类型
+               "client_id": self.client_id, # 客户端ID
+               "redirect_uri": "http://localhost:8080/callback", # 重定向URI
+               "scope": scope, # 作用域
+               "state": secrets.token_urlsafe(16), # 状态
+               "code_challenge": code_challenge, # PKCE挑战码
+               "code_challenge_method": "S256", # PKCE挑战方法
+               "resource": self.mcp_server_url,  # RFC8707资源指示器
+           }

            # 构造授权URL
+           auth_url = f"{authorization_endpoint}?{urlencode(auth_params)}"

            # 打印授权请求参数
+           print(" OAuth授权请求参数:")
+           print(f"   响应类型: {auth_params['response_type']}")
+           print(f"   客户端ID: {auth_params['client_id']}")
+           print(f"   重定向URI: {auth_params['redirect_uri']}")
+           print(f"   作用域: {auth_params['scope']}")
+           print(f"   状态: {auth_params['state']}")
+           print(f"   代码挑战: {auth_params['code_challenge'][:16]}...")
+           print(f"   代码挑战方法: {auth_params['code_challenge_method']}")
+           print(f"   资源指示器: {auth_params['resource']}")

            # 打印授权URL
+           print(f"\n🔗 授权URL:")
+           print(f"   {auth_url}")

            # 提示用户在浏览器中访问授权URL
+           print(f"\n 请在浏览器中访问上述URL完成授权")
+           print(f" 注意: 实际应用中需要用户交互")

            # 返回授权流程相关信息
+           return {
+               "auth_url": auth_url, # 授权URL
+               "code_verifier": code_verifier, # PKCE挑战码
+               "state": auth_params["state"], # 状态
+               "auth_params": auth_params, # 授权参数
+           }

        # 捕获异常并打印错误信息
+       except Exception as e:
+           print(f" 开始授权流程失败: {e}")
+           return None

    # 动态客户端注册方法
    def register_client(self, client_name="MCP客户端"):
        # 打印步骤提示
        print("\n📝 步骤2: 动态客户端注册")
        print("=" * 50)

        try:
            # 检查是否已发现授权服务器
            if not self.authorization_server_metadata:
                print(" 请先发现授权服务器")
                return False

            # 获取注册端点
            registration_endpoint = self.authorization_server_metadata[
                "registration_endpoint"
            ]

            # 构造客户端注册请求体
            client_metadata = {
                "client_name": client_name, # 客户端名称
                "redirect_uris": ["http://localhost:8080/callback"], # 重定向URI
                "grant_types": ["authorization_code"], # 授权类型
                "response_types": ["code"], # 响应类型
                "token_endpoint_auth_method": "client_secret_basic", # 令牌端点认证方法
                "scope": "mcp:tools mcp:resources", # 作用域
                "application_type": "web", # 应用类型
            }

            # 打印注册请求相关信息
            print(f"📤 发送客户端注册请求到: {registration_endpoint}")
            print(f"   客户端名称: {client_name}")
            print(f"   重定向URI: {client_metadata['redirect_uris']}")
            print(f"   授权类型: {client_metadata['grant_types']}")
            print(f"   作用域: {client_metadata['scope']}")

            # 发送POST请求进行客户端注册
            response = requests.post(registration_endpoint, json=client_metadata)
            # 判断注册是否成功
            if response.status_code == 201:
                # 解析返回的客户端信息
                client_info = response.json()
                # 保存客户端ID
                self.client_id = client_info["client_id"]
                # 保存客户端密钥
                self.client_secret = client_info["client_secret"]

                # 打印注册成功信息
                print(" 客户端注册成功")
                print(f"   客户端ID: {self.client_id}")
                print(f"   客户端密钥: {self.client_secret[:8]}...")
                return True
            else:
                # 注册失败,打印错误信息
                print(f" 客户端注册失败: {response.status_code}")
                print(f"   错误信息: {response.text}")
                return False

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 客户端注册失败: {e}")
            return False

    # 发现授权服务器的方法
    def discover_authorization_server(self):
        try:
            # 发送GET请求获取资源服务器的元数据
            response = requests.get(
                f"{self.mcp_server_url}/.well-known/oauth-protected-resource"
            )
            # 判断响应状态码是否为200
            if response.status_code == 200:
                # 解析响应的JSON数据
                resource_metadata = response.json()
                # 打印获取资源元数据成功的信息
                print(f" 获取资源元数据成功")
                # 打印资源标识符
                print(f"   资源标识符: {resource_metadata.get('resource')}")

                # 获取授权服务器列表
                auth_servers = resource_metadata.get("authorization_servers", [])
                # 判断是否存在授权服务器
                if auth_servers:
                    # 取第一个授权服务器
                    auth_server = auth_servers[0]
                    # 保存授权服务器的元数据
                    self.authorization_server_metadata = auth_server
                    # 获取发行者信息
                    issuer = auth_server.get("issuer")
                    # 获取注册端点
                    registration_endpoint = auth_server.get("registration_endpoint")
                    # 获取授权端点
                    authorization_endpoint = auth_server.get("authorization_endpoint")
                    # 获取令牌端点
                    token_endpoint = auth_server.get("token_endpoint")
                    # 获取支持的作用域
                    scopes_supported = auth_server.get("scopes_supported")
                    # 获取支持的PKCE方法
                    code_challenge_methods_supported = auth_server.get(
                        "code_challenge_methods_supported"
                    )

                    # 打印发现授权服务器成功的信息
                    print(f" 发现授权服务器")
                    # 打印发行者
                    print(f"   发行者: {issuer}")
                    # 打印注册端点
                    print(f"   注册端点: {registration_endpoint}")
                    # 打印授权端点
                    print(f"   授权端点: {authorization_endpoint}")
                    # 打印令牌端点
                    print(f"   令牌端点: {token_endpoint}")
                    # 打印支持的作用域
                    print(f"   支持的作用域: {', '.join(scopes_supported)}")
                    # 打印PKCE支持的方法
                    print(f"   PKCE支持: {', '.join(code_challenge_methods_supported)}")
                    # 返回True,表示发现成功
                    return True
                else:
                    # 未找到授权服务器,打印错误信息
                    print(" 未找到授权服务器")
                    return False
            else:
                # 获取资源元数据失败,打印状态码
                print(f" 获取资源元数据失败: {response.status_code}")
                return False

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 发现授权服务器失败: {e}")
            return False

    # 运行演示流程的方法
    def run(self):
        # 步骤1: 发现授权服务器
        if not self.discover_authorization_server():
            print(" 演示失败:无法发现授权服务器")
            return False
        # 步骤2: 注册客户端
        if not self.register_client():
            print(" 演示失败:客户端注册失败")
            return False
        # 步骤3: 开始授权流程
+       auth_flow = self.start_authorization_flow()
+       if not auth_flow:
+           print(" 演示失败:无法开始授权流程")
+           return False

# 主函数,程序入口
def main():
    try:
        # 创建OAuthClient实例
        client = OAuthClient()
        # 运行演示流程
        client.run()
    # 捕获用户中断异常
    except KeyboardInterrupt:
        print("\n\n演示被用户中断")
    # 捕获其他异常并打印错误信息
    except Exception as e:
        print(f"\n演示过程中发生错误: {e}")

# 判断是否为主程序入口
if __name__ == "__main__":
    # 调用主函数
    main()

4.模拟授权过程 #

4.1. client.py #

client.py

# 导入requests库,用于发送HTTP请求
import requests

# 导入secrets库,用于生成安全的随机字符串
import secrets

# 从urllib.parse导入urlencode、parse_qs、urlparse,用于URL参数编码和解析
+from urllib.parse import urlencode, parse_qs, urlparse

# 导入hashlib库,用于哈希运算
import hashlib

# 导入base64库,用于base64编码
import base64

# 定义OAuthClient类,用于与MCP服务器和OAuth服务器交互
class OAuthClient:

    # 构造函数,初始化MCP服务器和OAuth服务器的URL
    def __init__(
        self,
        mcp_server_url="http://localhost:8000",
        oauth_server_url="http://localhost:8001",
    ):
        # 去除MCP服务器URL末尾的斜杠
        self.mcp_server_url = mcp_server_url.rstrip("/")
        # 去除OAuth服务器URL末尾的斜杠
        self.oauth_server_url = oauth_server_url.rstrip("/")

    # 生成PKCE挑战码的方法
    def generate_pkce_challenge(self):
        # 生成code_verifier(高强度随机字符串)
        code_verifier = secrets.token_urlsafe(32)
        # 计算SHA256哈希并进行base64-url编码,去除末尾的等号
        code_challenge = (
            base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest())
            .decode()
            .rstrip("=")
        )
        # 返回code_verifier和code_challenge
        return code_verifier, code_challenge

    # 模拟授权过程
+   def simulate_authorization(self, auth_flow):
        # 打印模拟授权过程的提示信息
+       print("\n🎭 步骤4: 模拟授权过程")
+       print("=" * 50)

+       try:
            # 从auth_flow中获取授权URL
+           auth_url = auth_flow["auth_url"]
            # 发送GET请求到授权端点,不自动重定向
+           response = requests.get(auth_url, allow_redirects=False)

            # 如果返回302重定向
+           if response.status_code == 302:
                # 获取重定向的Location头
+               redirect_url = response.headers.get("Location")
+               if redirect_url:
                    # 解析重定向URL
+                   parsed_url = urlparse(redirect_url)
                    # 解析URL中的查询参数
+                   query_params = parse_qs(parsed_url.query)

                    # 获取授权码
+                   authorization_code = query_params.get("code", [None])[0]
                    # 获取state参数
+                   state = query_params.get("state", [None])[0]

                    # 检查授权码和state是否匹配
+                   if authorization_code and state == auth_flow["state"]:
+                       print(" 模拟授权成功")
+                       print(f"   授权码: {authorization_code[:16]}...")
+                       print(f"   状态: {state}")
+                       return authorization_code
+                   else:
+                       print(" 授权码获取失败")
+                       return None
+               else:
+                   print(" 未找到重定向URL")
+                   return None
+           else:
+               print(f" 授权请求失败: {response.status_code}")
+               return None

        # 捕获异常并打印错误信息
+       except Exception as e:
+           print(f" 模拟授权失败: {e}")
+           return None

    # 开始OAuth授权流程的方法
    def start_authorization_flow(self, scope="mcp:tools"):
        # 打印流程提示信息
        print("\n 步骤3: 开始OAuth授权流程")
        print("=" * 50)

        try:
            # 检查是否已发现授权服务器
            if not self.authorization_server_metadata:
                print(" 请先发现授权服务器")
                return None

            # 获取授权端点URL
            authorization_endpoint = self.authorization_server_metadata[
                "authorization_endpoint"
            ]

            # 生成PKCE挑战码
            code_verifier, code_challenge = self.generate_pkce_challenge()

            # 构造授权请求参数
            auth_params = {
                "response_type": "code",# 响应类型
                "client_id": self.client_id, # 客户端ID
                "redirect_uri": "http://localhost:8080/callback", # 重定向URI
                "scope": scope, # 作用域
                "state": secrets.token_urlsafe(16), # 状态
                "code_challenge": code_challenge, # PKCE挑战码
                "code_challenge_method": "S256", # PKCE挑战方法
                "resource": self.mcp_server_url,  # RFC8707资源指示器
            }

            # 构造授权URL
            auth_url = f"{authorization_endpoint}?{urlencode(auth_params)}"

            # 打印授权请求参数
            print(" OAuth授权请求参数:")
            print(f"   响应类型: {auth_params['response_type']}")
            print(f"   客户端ID: {auth_params['client_id']}")
            print(f"   重定向URI: {auth_params['redirect_uri']}")
            print(f"   作用域: {auth_params['scope']}")
            print(f"   状态: {auth_params['state']}")
            print(f"   代码挑战: {auth_params['code_challenge'][:16]}...")
            print(f"   代码挑战方法: {auth_params['code_challenge_method']}")
            print(f"   资源指示器: {auth_params['resource']}")

            # 打印授权URL
            print(f"\n🔗 授权URL:")
            print(f"   {auth_url}")

            # 提示用户在浏览器中访问授权URL
            print(f"\n 请在浏览器中访问上述URL完成授权")
            print(f" 注意: 实际应用中需要用户交互")

            # 返回授权流程相关信息
            return {
                "auth_url": auth_url, # 授权URL
                "code_verifier": code_verifier, # PKCE挑战码
                "state": auth_params["state"], # 状态
                "auth_params": auth_params, # 授权参数
            }

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 开始授权流程失败: {e}")
            return None

    # 动态客户端注册方法
    def register_client(self, client_name="MCP客户端"):
        # 打印步骤提示
        print("\n📝 步骤2: 动态客户端注册")
        print("=" * 50)

        try:
            # 检查是否已发现授权服务器
            if not self.authorization_server_metadata:
                print(" 请先发现授权服务器")
                return False

            # 获取注册端点
            registration_endpoint = self.authorization_server_metadata[
                "registration_endpoint"
            ]

            # 构造客户端注册请求体
            client_metadata = {
                "client_name": client_name, # 客户端名称
                "redirect_uris": ["http://localhost:8080/callback"], # 重定向URI
                "grant_types": ["authorization_code"], # 授权类型
                "response_types": ["code"], # 响应类型
                "token_endpoint_auth_method": "client_secret_basic", # 令牌端点认证方法
                "scope": "mcp:tools mcp:resources", # 作用域
                "application_type": "web", # 应用类型
            }

            # 打印注册请求相关信息
            print(f"📤 发送客户端注册请求到: {registration_endpoint}")
            print(f"   客户端名称: {client_name}")
            print(f"   重定向URI: {client_metadata['redirect_uris']}")
            print(f"   授权类型: {client_metadata['grant_types']}")
            print(f"   作用域: {client_metadata['scope']}")

            # 发送POST请求进行客户端注册
            response = requests.post(registration_endpoint, json=client_metadata)
            # 判断注册是否成功
            if response.status_code == 201:
                # 解析返回的客户端信息
                client_info = response.json()
                # 保存客户端ID
                self.client_id = client_info["client_id"]
                # 保存客户端密钥
                self.client_secret = client_info["client_secret"]

                # 打印注册成功信息
                print(" 客户端注册成功")
                print(f"   客户端ID: {self.client_id}")
                print(f"   客户端密钥: {self.client_secret[:8]}...")
                return True
            else:
                # 注册失败,打印错误信息
                print(f" 客户端注册失败: {response.status_code}")
                print(f"   错误信息: {response.text}")
                return False

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 客户端注册失败: {e}")
            return False

    # 发现授权服务器的方法
    def discover_authorization_server(self):
        try:
            # 发送GET请求获取资源服务器的元数据
            response = requests.get(
                f"{self.mcp_server_url}/.well-known/oauth-protected-resource"
            )
            # 判断响应状态码是否为200
            if response.status_code == 200:
                # 解析响应的JSON数据
                resource_metadata = response.json()
                # 打印获取资源元数据成功的信息
                print(f" 获取资源元数据成功")
                # 打印资源标识符
                print(f"   资源标识符: {resource_metadata.get('resource')}")

                # 获取授权服务器列表
                auth_servers = resource_metadata.get("authorization_servers", [])
                # 判断是否存在授权服务器
                if auth_servers:
                    # 取第一个授权服务器
                    auth_server = auth_servers[0]
                    # 保存授权服务器的元数据
                    self.authorization_server_metadata = auth_server
                    # 获取发行者信息
                    issuer = auth_server.get("issuer")
                    # 获取注册端点
                    registration_endpoint = auth_server.get("registration_endpoint")
                    # 获取授权端点
                    authorization_endpoint = auth_server.get("authorization_endpoint")
                    # 获取令牌端点
                    token_endpoint = auth_server.get("token_endpoint")
                    # 获取支持的作用域
                    scopes_supported = auth_server.get("scopes_supported")
                    # 获取支持的PKCE方法
                    code_challenge_methods_supported = auth_server.get(
                        "code_challenge_methods_supported"
                    )

                    # 打印发现授权服务器成功的信息
                    print(f" 发现授权服务器")
                    print(f"   发行者: {issuer}")
                    print(f"   注册端点: {registration_endpoint}")
                    print(f"   授权端点: {authorization_endpoint}")
                    print(f"   令牌端点: {token_endpoint}")
                    print(f"   支持的作用域: {', '.join(scopes_supported)}")
                    print(f"   PKCE支持: {', '.join(code_challenge_methods_supported)}")
                    # 返回True,表示发现成功
                    return True
                else:
                    # 未找到授权服务器,打印错误信息
                    print(" 未找到授权服务器")
                    return False
            else:
                # 获取资源元数据失败,打印状态码
                print(f" 获取资源元数据失败: {response.status_code}")
                return False

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 发现授权服务器失败: {e}")
            return False

    # 运行演示流程的方法
    def run(self):
        # 步骤1: 发现授权服务器
        if not self.discover_authorization_server():
            print(" 演示失败:无法发现授权服务器")
            return False
        # 步骤2: 注册客户端
        if not self.register_client():
            print(" 演示失败:客户端注册失败")
            return False
        # 步骤3: 开始授权流程
        auth_flow = self.start_authorization_flow()
        if not auth_flow:
            print(" 演示失败:无法开始授权流程")
            return False
        # 步骤4: 模拟授权
+       authorization_code = self.simulate_authorization(auth_flow)
+       if not authorization_code:
+           print(" 演示失败:授权失败")
+           return False

# 主函数,程序入口
def main():
    try:
        # 创建OAuthClient实例
        client = OAuthClient()
        # 运行演示流程
        client.run()
    # 捕获用户中断异常
    except KeyboardInterrupt:
        print("\n\n演示被用户中断")
    # 捕获其他异常并打印错误信息
    except Exception as e:
        print(f"\n演示过程中发生错误: {e}")

# 判断是否为主程序入口
if __name__ == "__main__":
    # 调用主函数
    main()

5.交换访问令牌 #

5.1. auth_server.py #

auth_server.py

# 导入日志模块
import logging

# 从http.server模块导入HTTPServer和BaseHTTPRequestHandler类
from http.server import HTTPServer, BaseHTTPRequestHandler

# 导入json模块
import json

# 导入secrets模块用于生成安全的随机数
import secrets

# 导入datetime相关类
from datetime import datetime, timedelta, timezone
from urllib.parse import parse_qs, urlparse

# 配置日志记录级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)

# 定义OAuthServer类,作为OAuth 2.1授权服务器
class OAuthServer:
    """OAuth 2.1授权服务器"""

    # 初始化方法
    def __init__(self):
        # 存储注册的客户端信息
        self.clients = {}  # 注册的客户端
        # 存储授权码
        self.authorization_codes = {}  # 授权码
        # 存储访问令牌
        self.access_tokens = {}  # 访问令牌
        # 服务器密钥
        self.server_secret = "oauth_server_secret_key_2025"

        # 服务器元数据
        self.metadata = {
            # 发行者
            "issuer": "http://localhost:8001",
            # 授权端点
            "authorization_endpoint": "http://localhost:8001/oauth/authorize",
            # 令牌端点
            "token_endpoint": "http://localhost:8001/oauth/token",
            # 客户端注册端点
            "registration_endpoint": "http://localhost:8001/oauth/register",
            # 支持的作用域
            "scopes_supported": ["mcp:tools", "mcp:resources", "mcp:prompts"],
            # 支持的响应类型
            "response_types_supported": ["code"],
            # 支持的授权类型
            "grant_types_supported": ["authorization_code", "refresh_token"],
            # 令牌端点支持的认证方法
            "token_endpoint_auth_methods_supported": [
                "client_secret_basic",
                "client_secret_post",
            ],
            # 支持的PKCE方法
            "code_challenge_methods_supported": ["S256"],
            # 服务文档地址
            "service_documentation": "https://modelcontextprotocol.io/specification/2025-06-18/basic/authorization",
        }

    # 创建授权码方法
    def create_authorization_code(
        self, client_id, redirect_uri, scope, state, code_challenge=None
    ):
        """创建授权码"""
        # 检查client_id是否已注册
        if client_id not in self.clients:
            return None

        # 生成授权码
        code = str(secrets.token_urlsafe(32))
        # 保存授权码及相关信息
        self.authorization_codes[code] = {
            "client_id": client_id, # 客户端ID
            "redirect_uri": redirect_uri, # 重定向URI
            "scope": scope, # 作用域
            "state": state, # 状态
            "code_challenge": code_challenge, # PKCE挑战码
            "expires_at": datetime.now(timezone.utc) + timedelta(minutes=10), # 过期时间
            "created_at": datetime.now(timezone.utc), # 创建时间
        }

        # 记录授权码创建日志
        logger.info(f"创建授权码: {code} for client: {client_id}")
        # 返回授权码
        return code

    # 验证授权码方法
+   def validate_authorization_code(self, code, client_id, code_verifier=None):
+       """验证授权码"""
        # 检查授权码是否存在
+       if code not in self.authorization_codes:
+           return None

        # 获取授权码信息
+       auth_code = self.authorization_codes[code]

        # 检查授权码是否过期
+       if datetime.now(timezone.utc) > auth_code["expires_at"]:
+           del self.authorization_codes[code]
+           return None

        # 检查客户端ID是否匹配
+       if auth_code["client_id"] != client_id:
+           return None

        # 如果使用了PKCE,验证code_verifier
+       if auth_code["code_challenge"] and code_verifier:
+           import hashlib
+           import base64

            # 计算code_verifier的SHA256哈希
+           code_verifier_hash = hashlib.sha256(code_verifier.encode()).digest()
            # 进行base64 url安全编码并去除末尾的等号
+           code_verifier_b64 = (
+               base64.urlsafe_b64encode(code_verifier_hash).decode().rstrip("=")
+           )

            # 比较code_verifier_b64和code_challenge
+           if code_verifier_b64 != auth_code["code_challenge"]:
+               return None

        # 删除已使用的授权码
+       result = auth_code.copy()
+       del self.authorization_codes[code]
+       return result

    # 创建JWT访问令牌方法
+   def create_access_token(self, client_id, scope, user_id="anonymous"):
+       """创建JWT访问令牌"""
        # 导入jwt模块
+       import jwt
+       from datetime import datetime, timedelta

        # 构造JWT载荷
+       payload = {
+           "iss": self.metadata["issuer"],  # 发行者
+           "aud": "http://localhost:8000",  # 目标资源(MCP服务器)
+           "sub": user_id,
+           "client_id": client_id,
+           "scope": scope,
+           "iat": datetime.now(timezone.utc),
+           "exp": datetime.now(timezone.utc) + timedelta(hours=1),
+       }

        # 使用JWT编码令牌,明确指定audience
+       token = jwt.encode(payload, self.server_secret, algorithm="HS256")

        # 存储令牌信息(用于调试)
+       self.access_tokens[token] = {
+           "client_id": client_id,
+           "scope": scope,
+           "user_id": user_id,
+           "created_at": datetime.now(timezone.utc),
+           "expires_at": datetime.now(timezone.utc) + timedelta(hours=1),
+       }

        # 记录令牌创建日志
+       logger.info(f"创建JWT访问令牌: {token[:16]}... for client: {client_id}")
+       logger.info(f"令牌包含audience: {payload['aud']}, scope: {payload['scope']}")
+       return token

    # 动态客户端注册方法 (RFC7591)
    def register_client(self, client_metadata):
        """动态客户端注册 (RFC7591)"""
        # 生成客户端ID
        client_id = str(secrets.token_urlsafe(16))
        # 生成客户端密钥
        client_secret = str(secrets.token_urlsafe(32))

        # 构造客户端信息字典
        client_info = {
            "client_id": client_id,
            "client_secret": client_secret,
            # 客户端ID签发时间(UTC时间戳)
            "client_id_issued_at": int(datetime.now(timezone.utc).timestamp()),
            # 客户端密钥过期时间(0表示永不过期)
            "client_secret_expires_at": 0,  # 永不过期
            # 合并客户端元数据
            **client_metadata,
        }

        # 保存客户端信息到clients字典
        self.clients[client_id] = client_info
        # 记录新客户端注册日志
        logger.info(f"新客户端注册: {client_id}")

        # 返回客户端信息
        return client_info

# 定义OAuthHandler类,继承自BaseHTTPRequestHandler
class OAuthHandler(BaseHTTPRequestHandler):
    """OAuth HTTP处理器"""

    # 初始化方法,接收oauth_server参数
    def __init__(self, *args, oauth_server=None, **kwargs):
        # 保存OAuthServer实例
        self.oauth_server = oauth_server
        # 调用父类初始化方法
        super().__init__(*args, **kwargs)

    # 构建查询字符串的方法
    def _build_query_string(self, params):
        """构建查询字符串"""
        # 将参数字典拼接为查询字符串
        return "&".join([f"{k}={v}" for k, v in params.items()])

    # 处理授权请求的方法
    def _handle_authorization(self):
        """处理授权请求"""
        try:
            # 解析查询参数
            query = urlparse(self.path).query
            params = parse_qs(query)
            # 获取client_id参数
            client_id = params.get("client_id", [""])[0]
            # 获取redirect_uri参数
            redirect_uri = params.get("redirect_uri", [""])[0]
            # 获取scope参数
            scope = params.get("scope", [""])[0]
            # 获取state参数
            state = params.get("state", [""])[0]
            # 获取code_challenge参数
            code_challenge = params.get("code_challenge", [""])[0]
            # 获取resource参数(RFC8707资源指示器)
            resource = params.get("resource", [""])[0]

            # 记录授权请求日志
            logger.info(
                f"授权请求: client_id={client_id}, scope={scope}, resource={resource}"
            )

            # 验证客户端是否已注册
            if client_id not in self.oauth_server.clients:
                self.send_error(400, "Invalid client_id")
                return

            # 创建授权码
            code = self.oauth_server.create_authorization_code(
                client_id, redirect_uri, scope, state, code_challenge
            )

            # 检查授权码是否创建成功
            if not code:
                self.send_error(400, "Failed to create authorization code")
                return

            # 构造重定向参数
            redirect_params = {"code": code, "state": state}

            # 构造重定向URL
            redirect_url = f"{redirect_uri}?{self._build_query_string(redirect_params)}"

            # 发送重定向响应
            self.send_response(302)
            self.send_header("Location", redirect_url)
            self.end_headers()

            # 记录授权成功日志
            logger.info(f"授权成功,重定向到: {redirect_url}")

        except Exception as e:
            # 记录授权处理错误日志
            logger.error(f"授权处理错误: {e}")
            # 返回400错误
            self.send_error(400, "Bad Request")

    # 处理GET请求的方法
    def do_GET(self):
        """处理GET请求"""
        try:
            # 判断请求路径是否为授权端点
            if self.path.startswith("/oauth/authorize"):
                self._handle_authorization()
            else:
                self.send_error(404, "Not Found")
        except Exception as e:
            # 记录GET处理错误日志
            logger.error(f"GET处理错误: {e}")
            # 返回500错误
            self.send_error(500, "Internal Server Error")

    # 处理授权码授权类型
+   def _handle_authorization_code_grant(self, token_request):
+       """处理授权码授权类型"""
        # 获取code参数
+       code = token_request.get("code", [""])[0]
        # 获取client_id参数
+       client_id = token_request.get("client_id", [""])[0]
        # 获取code_verifier参数
+       code_verifier = token_request.get("code_verifier", [""])[0]
        # 获取resource参数(RFC8707资源指示器)
+       resource = token_request.get("resource", [""])[0]

        # 记录令牌请求日志
+       logger.info(
+           f"令牌请求: code={code[:16]}..., client_id={client_id}, resource={resource}"
+       )

        # 验证授权码
+       auth_code = self.oauth_server.validate_authorization_code(
+           code, client_id, code_verifier
+       )
        # 如果授权码验证失败
+       if not auth_code:
+           logger.error("授权码验证失败")
+           self.send_error(400, "Invalid authorization code")
+           return None

        # 记录授权码验证成功日志
+       logger.info(f"授权码验证成功,scope: {auth_code['scope']}")

        # 创建访问令牌
+       access_token = self.oauth_server.create_access_token(
+           client_id, auth_code["scope"]
+       )

        # 记录访问令牌创建成功日志
+       logger.info(f"访问令牌创建成功,包含scope: {auth_code['scope']}")

        # 返回令牌响应
+       return {
+           "access_token": access_token,
+           "token_type": "Bearer",
+           "expires_in": 3600,
+           "scope": auth_code["scope"],
+       }

    # 创建JWT访问令牌(此方法未被调用,保留)
+   def create_access_token(self, client_id, scope, user_id="anonymous"):
+       """创建JWT访问令牌"""
        # 导入jwt模块
+       import jwt
+       from datetime import datetime, timedelta

        # 构造JWT载荷
+       payload = {
+           "iss": self.metadata["issuer"],  # 发行者
+           "aud": "http://localhost:8000",  # 目标资源(MCP服务器)
+           "sub": user_id,
+           "client_id": client_id,
+           "scope": scope,
+           "iat": datetime.now(timezone.utc),
+           "exp": datetime.now(timezone.utc) + timedelta(hours=1),
+       }

        # 使用JWT编码令牌,明确指定audience
+       token = jwt.encode(payload, self.server_secret, algorithm="HS256")

        # 存储令牌信息(用于调试)
+       self.access_tokens[token] = {
+           "client_id": client_id, # 客户端ID
+           "scope": scope, # 作用域
+           "user_id": user_id, # 用户ID
+           "created_at": datetime.now(timezone.utc), # 创建时间
+           "expires_at": datetime.now(timezone.utc) + timedelta(hours=1), # 过期时间   
+       }

        # 记录令牌创建日志
+       logger.info(f"创建JWT访问令牌: {token[:16]}... for client: {client_id}")
+       logger.info(f"令牌包含audience: {payload['aud']}, scope: {payload['scope']}")
+       return token

    # 处理令牌请求的方法
+   def _handle_token(self):
+       """处理令牌请求"""
+       try:
            # 获取请求体长度
+           content_length = int(self.headers.get("Content-Length", 0))
            # 读取请求体内容
+           body = self.rfile.read(content_length)
            # 解析请求体为字典
+           token_request = parse_qs(body.decode("utf-8"))

            # 获取grant_type参数
+           grant_type = token_request.get("grant_type", [""])[0]

            # 判断授权类型
+           if grant_type == "authorization_code":
+               response = self._handle_authorization_code_grant(token_request)
+           else:
+               self.send_error(400, "Unsupported grant type")
+               return

            # 如果有响应,返回JSON
+           if response:
+               self.send_response(200)
+               self.send_header("Content-Type", "application/json")
+               self.end_headers()
+               self.wfile.write(json.dumps(response).encode("utf-8"))

+       except Exception as e:
            # 记录令牌请求错误日志
+           logger.error(f"令牌请求错误: {e}")
+           self.send_error(400, "Bad Request")

    # 处理客户端注册请求的方法
    def _handle_client_registration(self):
        """处理客户端注册请求"""
        try:
            # 获取请求体长度
            content_length = int(self.headers.get("Content-Length", 0))
            # 读取请求体内容
            body = self.rfile.read(content_length)
            # 解析JSON格式的客户端元数据
            client_metadata = json.loads(body.decode("utf-8"))

            # 调用OAuthServer的register_client方法注册客户端
            client_info = self.oauth_server.register_client(client_metadata)

            # 发送201响应,表示创建成功
            self.send_response(201)
            # 设置响应头为application/json
            self.send_header("Content-Type", "application/json")
            # 结束响应头
            self.end_headers()
            # 返回注册成功的客户端信息(JSON格式)
            self.wfile.write(json.dumps(client_info).encode("utf-8"))

        except Exception as e:
            # 记录错误日志
            logger.error(f"客户端注册错误: {e}")
            # 返回400错误
            self.send_error(400, "Bad Request")

    # 处理POST请求的方法
    def do_POST(self):
        """处理POST请求"""
        try:
            # 如果请求路径为/oauth/register,处理客户端注册
            if self.path == "/oauth/register":
                self._handle_client_registration()
            # 如果请求路径为/oauth/token,处理令牌请求
+           elif self.path == "/oauth/token":
+               self._handle_token()
            # 其他路径返回404
            else:
                self.send_error(404, "Not Found")
        except Exception as e:
            # 记录POST处理错误日志
            logger.error(f"POST处理错误: {e}")
            # 返回500错误
            self.send_error(500, "Internal Server Error")

# 运行OAuth授权服务器的函数
def run_oauth_server(host="localhost", port=8001):
    """运行OAuth授权服务器"""
    # 创建OAuthServer实例
    oauth_server = OAuthServer()

    # 定义处理器工厂函数,用于传递oauth_server实例
    def handler_factory(*args, **kwargs):
        return OAuthHandler(*args, oauth_server=oauth_server, **kwargs)

    # 创建HTTPServer实例,绑定主机和端口
    server = HTTPServer((host, port), handler_factory)
    # 记录服务器启动信息
    logger.info(f"OAuth授权服务器运行在 http://{host}:{port}")
    # 记录支持的端点信息
    logger.info("支持的端点:")
    logger.info(
        f"  - 服务器元数据: http://{host}:{port}/.well-known/oauth-authorization-server"
    )
    logger.info(f"  - 授权端点: http://{host}:{port}/oauth/authorize")
    logger.info(f"  - 令牌端点: http://{host}:{port}/oauth/token")
    logger.info(f"  - 客户端注册: http://{host}:{port}/oauth/register")

    try:
        # 启动服务器,进入循环监听请求
        server.serve_forever()
    except KeyboardInterrupt:
        # 捕获Ctrl+C,记录服务器停止
        logger.info("OAuth授权服务器已停止")
    finally:
        # 关闭服务器
        server.server_close()

# 判断是否为主程序入口
if __name__ == "__main__":
    # 导入argparse模块,用于解析命令行参数
    import argparse

    # 创建ArgumentParser对象,设置描述信息
    parser = argparse.ArgumentParser(description="MCP OAuth授权服务器")
    # 添加--host参数,指定服务器主机,默认localhost
    parser.add_argument(
        "--host", default="localhost", help="服务器主机 (默认: localhost)"
    )
    # 添加--port参数,指定服务器端口,默认8001
    parser.add_argument(
        "--port", type=int, default=8001, help="服务器端口 (默认: 8001)"
    )

    # 解析命令行参数
    args = parser.parse_args()
    # 启动OAuth授权服务器
    run_oauth_server(args.host, args.port)

5.2. client.py #

client.py

# 导入requests库,用于发送HTTP请求
import requests

# 导入secrets库,用于生成安全的随机字符串
import secrets

# 从urllib.parse导入urlencode、parse_qs、urlparse,用于URL参数编码和解析
from urllib.parse import urlencode, parse_qs, urlparse

# 导入hashlib库,用于哈希运算
import hashlib

# 导入base64库,用于base64编码
import base64


# 定义OAuthClient类,用于与MCP服务器和OAuth服务器交互
class OAuthClient:

    # 构造函数,初始化MCP服务器和OAuth服务器的URL
    def __init__(
        self,
        mcp_server_url="http://localhost:8000",
        oauth_server_url="http://localhost:8001",
    ):
        # 去除MCP服务器URL末尾的斜杠
        self.mcp_server_url = mcp_server_url.rstrip("/")
        # 去除OAuth服务器URL末尾的斜杠
        self.oauth_server_url = oauth_server_url.rstrip("/")

    # 生成PKCE挑战码的方法
    def generate_pkce_challenge(self):
        # 生成高强度随机字符串作为code_verifier
        code_verifier = secrets.token_urlsafe(32)
        # 对code_verifier进行SHA256哈希,然后base64-url编码,去除末尾的等号
        code_challenge = (
            base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest())
            .decode()
            .rstrip("=")
        )
        # 返回code_verifier和code_challenge
        return code_verifier, code_challenge

    # 模拟授权过程
    def simulate_authorization(self, auth_flow):
        # 打印模拟授权过程的提示信息
        print("\n🎭 步骤4: 模拟授权过程")
        print("=" * 50)

        try:
            # 从auth_flow中获取授权URL
            auth_url = auth_flow["auth_url"]
            # 发送GET请求到授权端点,不自动重定向
            response = requests.get(auth_url, allow_redirects=False)

            # 如果返回302重定向
            if response.status_code == 302:
                # 获取重定向的Location头
                redirect_url = response.headers.get("Location")
                if redirect_url:
                    # 解析重定向URL
                    parsed_url = urlparse(redirect_url)
                    # 解析URL中的查询参数
                    query_params = parse_qs(parsed_url.query)

                    # 获取授权码
                    authorization_code = query_params.get("code", [None])[0]
                    # 获取state参数
                    state = query_params.get("state", [None])[0]

                    # 检查授权码和state是否匹配
                    if authorization_code and state == auth_flow["state"]:
                        # 打印模拟授权成功信息
                        print(" 模拟授权成功")
                        print(f"   授权码: {authorization_code[:16]}...")
                        print(f"   状态: {state}")
                        # 返回授权码
                        return authorization_code
                    else:
                        # 授权码获取失败
                        print(" 授权码获取失败")
                        return None
                else:
                    # 未找到重定向URL
                    print(" 未找到重定向URL")
                    return None
            else:
                # 授权请求失败,打印状态码
                print(f" 授权请求失败: {response.status_code}")
                return None

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 模拟授权失败: {e}")
            return None

    # 使用授权码交换访问令牌
+   def exchange_authorization_code(self, authorization_code, code_verifier):
        # 步骤5: 使用授权码交换访问令牌
+       print("\n🔄 步骤5: 交换访问令牌")
+       print("=" * 50)

+       try:
            # 获取令牌端点
+           token_endpoint = self.authorization_server_metadata["token_endpoint"]

            # 构造令牌请求参数
+           token_request = {
+               "grant_type": "authorization_code", # 授权类型
+               "code": authorization_code, # 授权码
+               "redirect_uri": "http://localhost:8080/callback", # 重定向URI
+               "client_id": self.client_id, # 客户端ID
+               "code_verifier": code_verifier, # PKCE挑战码
+               "resource": self.mcp_server_url,  # RFC8707资源指示器
+           }

            # 打印令牌请求参数
+           print("📤 令牌请求参数:")
+           print(f"   授权类型: {token_request['grant_type']}")
+           print(f"   授权码: {token_request['code'][:16]}...")
+           print(f"   重定向URI: {token_request['redirect_uri']}")
+           print(f"   客户端ID: {token_request['client_id']}")
+           print(f"   代码验证器: {token_request['code_verifier'][:16]}...")
+           print(f"   资源指示器: {token_request['resource']}")

            # 使用客户端凭据进行认证
+           auth = (self.client_id, self.client_secret)

            # 发送POST请求到令牌端点
+           response = requests.post(token_endpoint, data=token_request, auth=auth)
            # 如果响应状态码为200,表示成功
+           if response.status_code == 200:
                # 解析返回的JSON数据
+               token_response = response.json()
                # 保存访问令牌
+               self.access_token = token_response["access_token"]

                # 打印成功获取访问令牌的信息
+               print(" 成功获取访问令牌")
+               print(f"   访问令牌: {self.access_token[:32]}...")
+               print(f"   令牌类型: {token_response['token_type']}")
+               print(f"   过期时间: {token_response['expires_in']}秒")
+               print(f"   作用域: {token_response['scope']}")
+               return True
+           else:
                # 获取访问令牌失败,打印错误信息
+               print(f" 获取访问令牌失败: {response.status_code}")
+               print(f"   错误信息: {response.text}")
+               return False

        # 捕获异常并打印错误信息
+       except Exception as e:
+           print(f" 交换访问令牌失败: {e}")
+           return False

    # 开始OAuth授权流程的方法
    def start_authorization_flow(self, scope="mcp:tools"):
        # 打印流程提示信息
        print("\n 步骤3: 开始OAuth授权流程")
        print("=" * 50)

        try:
            # 检查是否已发现授权服务器
            if not self.authorization_server_metadata:
                print(" 请先发现授权服务器")
                return None

            # 获取授权端点URL
            authorization_endpoint = self.authorization_server_metadata[
                "authorization_endpoint"
            ]

            # 生成PKCE挑战码
            code_verifier, code_challenge = self.generate_pkce_challenge()

            # 构造授权请求参数
            auth_params = {
                "response_type": "code", # 响应类型
                "client_id": self.client_id, # 客户端ID
                "redirect_uri": "http://localhost:8080/callback", # 重定向URI
                "scope": scope, # 作用域
                "state": secrets.token_urlsafe(16), # 状态
                "code_challenge": code_challenge, # PKCE挑战码
                "code_challenge_method": "S256", # PKCE挑战方法
                "resource": self.mcp_server_url,  # RFC8707资源指示器
            }

            # 构造授权URL
            auth_url = f"{authorization_endpoint}?{urlencode(auth_params)}"

            # 打印授权请求参数
            print(" OAuth授权请求参数:")
            print(f"   响应类型: {auth_params['response_type']}")
            print(f"   客户端ID: {auth_params['client_id']}")
            print(f"   重定向URI: {auth_params['redirect_uri']}")
            print(f"   作用域: {auth_params['scope']}")
            print(f"   状态: {auth_params['state']}")
            print(f"   代码挑战: {auth_params['code_challenge'][:16]}...")
            print(f"   代码挑战方法: {auth_params['code_challenge_method']}")
            print(f"   资源指示器: {auth_params['resource']}")

            # 打印授权URL
            print(f"\n🔗 授权URL:")
            print(f"   {auth_url}")

            # 提示用户在浏览器中访问授权URL
            print(f"\n 请在浏览器中访问上述URL完成授权")
            print(f" 注意: 实际应用中需要用户交互")

            # 返回授权流程相关信息
            return {
                "auth_url": auth_url, # 授权URL
                "code_verifier": code_verifier, # PKCE挑战码
                "state": auth_params["state"], # 状态
                "auth_params": auth_params, # 授权参数
            }

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 开始授权流程失败: {e}")
            return None

    # 动态客户端注册方法
    def register_client(self, client_name="MCP客户端"):
        # 打印步骤提示
        print("\n📝 步骤2: 动态客户端注册")
        print("=" * 50)

        try:
            # 检查是否已发现授权服务器
            if not self.authorization_server_metadata:
                print(" 请先发现授权服务器")
                return False

            # 获取注册端点
            registration_endpoint = self.authorization_server_metadata[
                "registration_endpoint"
            ]

            # 构造客户端注册请求体
            client_metadata = {
                "client_name": client_name, # 客户端名称
                "redirect_uris": ["http://localhost:8080/callback"], # 重定向URI
                "grant_types": ["authorization_code"], # 授权类型
                "response_types": ["code"], # 响应类型
                "token_endpoint_auth_method": "client_secret_basic", # 令牌端点认证方法
                "scope": "mcp:tools mcp:resources", # 作用域
                "application_type": "web", # 应用类型
            }

            # 打印注册请求相关信息
            print(f"📤 发送客户端注册请求到: {registration_endpoint}")
            print(f"   客户端名称: {client_name}")
            print(f"   重定向URI: {client_metadata['redirect_uris']}")
            print(f"   授权类型: {client_metadata['grant_types']}")
            print(f"   作用域: {client_metadata['scope']}")

            # 发送POST请求进行客户端注册
            response = requests.post(registration_endpoint, json=client_metadata)
            # 判断注册是否成功
            if response.status_code == 201:
                # 解析返回的客户端信息
                client_info = response.json()
                # 保存客户端ID
                self.client_id = client_info["client_id"]
                # 保存客户端密钥
                self.client_secret = client_info["client_secret"]

                # 打印注册成功信息
                print(" 客户端注册成功")
                print(f"   客户端ID: {self.client_id}")
                print(f"   客户端密钥: {self.client_secret[:8]}...")
                return True
            else:
                # 注册失败,打印错误信息
                print(f" 客户端注册失败: {response.status_code}")
                print(f"   错误信息: {response.text}")
                return False

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 客户端注册失败: {e}")
            return False

    # 发现授权服务器的方法
    def discover_authorization_server(self):
        try:
            # 发送GET请求获取资源服务器的元数据
            response = requests.get(
                f"{self.mcp_server_url}/.well-known/oauth-protected-resource"
            )
            # 判断响应状态码是否为200
            if response.status_code == 200:
                # 解析响应的JSON数据
                resource_metadata = response.json()
                # 打印获取资源元数据成功的信息
                print(f" 获取资源元数据成功")
                # 打印资源标识符
                print(f"   资源标识符: {resource_metadata.get('resource')}")

                # 获取授权服务器列表
                auth_servers = resource_metadata.get("authorization_servers", [])
                # 判断是否存在授权服务器
                if auth_servers:
                    # 取第一个授权服务器
                    auth_server = auth_servers[0]
                    # 保存授权服务器的元数据
                    self.authorization_server_metadata = auth_server
                    # 获取发行者信息
                    issuer = auth_server.get("issuer")
                    # 获取注册端点
                    registration_endpoint = auth_server.get("registration_endpoint")
                    # 获取授权端点
                    authorization_endpoint = auth_server.get("authorization_endpoint")
                    # 获取令牌端点
                    token_endpoint = auth_server.get("token_endpoint")
                    # 获取支持的作用域
                    scopes_supported = auth_server.get("scopes_supported")
                    # 获取支持的PKCE方法
                    code_challenge_methods_supported = auth_server.get(
                        "code_challenge_methods_supported"
                    )

                    # 打印发现授权服务器成功的信息
                    print(f" 发现授权服务器")
                    print(f"   发行者: {issuer}")
                    print(f"   注册端点: {registration_endpoint}")
                    print(f"   授权端点: {authorization_endpoint}")
                    print(f"   令牌端点: {token_endpoint}")
                    print(f"   支持的作用域: {', '.join(scopes_supported)}")
                    print(f"   PKCE支持: {', '.join(code_challenge_methods_supported)}")
                    # 返回True,表示发现成功
                    return True
                else:
                    # 未找到授权服务器,打印错误信息
                    print(" 未找到授权服务器")
                    return False
            else:
                # 获取资源元数据失败,打印状态码
                print(f" 获取资源元数据失败: {response.status_code}")
                return False

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 发现授权服务器失败: {e}")
            return False

    # 运行演示流程的方法
    def run(self):
        # 步骤1: 发现授权服务器
        if not self.discover_authorization_server():
            # 发现授权服务器失败
            print(" 演示失败:无法发现授权服务器")
            return False
        # 步骤2: 注册客户端
        if not self.register_client():
            # 客户端注册失败
            print(" 演示失败:客户端注册失败")
            return False
        # 步骤3: 开始授权流程
        auth_flow = self.start_authorization_flow()
        if not auth_flow:
            # 无法开始授权流程
            print(" 演示失败:无法开始授权流程")
            return False
        # 步骤4: 模拟授权
        authorization_code = self.simulate_authorization(auth_flow)
        if not authorization_code:
            # 授权失败
            print(" 演示失败:授权失败")
            return False
        # 步骤5: 交换访问令牌
+       if not self.exchange_authorization_code(
+           authorization_code, auth_flow["code_verifier"]
+       ):
            # 无法获取访问令牌
+           print(" 演示失败:无法获取访问令牌")
+           return False


# 主函数,程序入口
def main():
    try:
        # 创建OAuthClient实例
        client = OAuthClient()
        # 运行演示流程
        client.run()
    # 捕获用户中断异常
    except KeyboardInterrupt:
        # 打印用户中断信息
        print("\n\n演示被用户中断")
    # 捕获其他异常并打印错误信息
    except Exception as e:
        # 打印错误信息
        print(f"\n演示过程中发生错误: {e}")


# 判断是否为主程序入口
if __name__ == "__main__":
    # 调用主函数
    main()

6.步骤6: MCP服务器访问 #

6.1. tools.py #

tools.py

# MCP工具定义

tools = [
    {
        "name": "calculate",
        "description": "执行数学计算",
        "inputSchema": {
            "type": "object",
            "properties": {
                "expression": {
                    "type": "string",
                    "description": "数学表达式",
                }
            },
            "required": ["expression"],
        },
    },
]

6.2. client.py #

client.py

# 导入requests库,用于发送HTTP请求
import requests

# 导入secrets库,用于生成安全的随机字符串
import secrets

# 从urllib.parse导入urlencode、parse_qs、urlparse,用于URL参数编码和解析
from urllib.parse import urlencode, parse_qs, urlparse

# 导入hashlib库,用于哈希运算
import hashlib

# 导入base64库,用于base64编码
import base64

# 定义OAuthClient类,用于与MCP服务器和OAuth服务器交互
class OAuthClient:

    # 构造函数,初始化MCP服务器和OAuth服务器的URL
    def __init__(
        self,
        mcp_server_url="http://localhost:8000",
        oauth_server_url="http://localhost:8001",
    ):
        # 去除MCP服务器URL末尾的斜杠
        self.mcp_server_url = mcp_server_url.rstrip("/")
        # 去除OAuth服务器URL末尾的斜杠
        self.oauth_server_url = oauth_server_url.rstrip("/")

    # 生成PKCE挑战码的方法
    def generate_pkce_challenge(self):
        # 生成高强度随机字符串作为code_verifier
        code_verifier = secrets.token_urlsafe(32)
        # 对code_verifier进行SHA256哈希,然后base64-url编码,去除末尾的等号
        code_challenge = (
            base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest())
            .decode()
            .rstrip("=")
        )
        # 返回code_verifier和code_challenge
        return code_verifier, code_challenge

    # 模拟授权过程
    def simulate_authorization(self, auth_flow):
        # 打印模拟授权过程的提示信息
        print("\n🎭 步骤4: 模拟授权过程")
        print("=" * 50)

        try:
            # 从auth_flow中获取授权URL
            auth_url = auth_flow["auth_url"]
            # 发送GET请求到授权端点,不自动重定向
            response = requests.get(auth_url, allow_redirects=False)

            # 如果返回302重定向
            if response.status_code == 302:
                # 获取重定向的Location头
                redirect_url = response.headers.get("Location")
                if redirect_url:
                    # 解析重定向URL
                    parsed_url = urlparse(redirect_url)
                    # 解析URL中的查询参数
                    query_params = parse_qs(parsed_url.query)

                    # 获取授权码
                    authorization_code = query_params.get("code", [None])[0]
                    # 获取state参数
                    state = query_params.get("state", [None])[0]

                    # 检查授权码和state是否匹配
                    if authorization_code and state == auth_flow["state"]:
                        # 打印模拟授权成功信息
                        print(" 模拟授权成功")
                        print(f"   授权码: {authorization_code[:16]}...")
                        print(f"   状态: {state}")
                        # 返回授权码
                        return authorization_code
                    else:
                        # 授权码获取失败
                        print(" 授权码获取失败")
                        return None
                else:
                    # 未找到重定向URL
                    print(" 未找到重定向URL")
                    return None
            else:
                # 授权请求失败,打印状态码
                print(f" 授权请求失败: {response.status_code}")
                return None

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 模拟授权失败: {e}")
            return None

    # 步骤6: 测试使用访问令牌访问MCP服务器
+   def mcp_server_access(self):
        # 打印步骤信息
+       print("\n 步骤6: 测试MCP服务器访问")
+       print("=" * 50)

+       try:
            # 检查是否已获取访问令牌
+           if not self.access_token:
+               print(" 请先获取访问令牌")
+               return False

            # 设置授权头
+           headers = {
+               "Authorization": f"Bearer {self.access_token}",
+               "Accept": "application/json",
+               "MCP-Protocol-Version": "2025-06-18",
+           }

            # 打印发送MCP初始化请求信息
+           print("📤 发送MCP初始化请求")
+           print(f"   授权头: Bearer {self.access_token[:32]}...")

            # 构造MCP初始化请求体
+           init_request = {
+               "jsonrpc": "2.0",
+               "id": "1",
+               "method": "initialize",
+               "params": {
+                   "protocolVersion": "2025-06-18",
+                   "capabilities": {},
+                   "clientInfo": {"name": "OAuth客户端", "version": "1.0.0"},
+               },
+           }

            # 发送POST请求到MCP服务器
+           response = requests.post(
+               f"{self.mcp_server_url}/mcp", json=init_request, headers=headers
+           )

            # 判断初始化是否成功
+           if response.status_code == 200:
                # 解析初始化响应
+               init_response = response.json()
+               print(" MCP初始化成功")
+               print(f"   协议版本: {init_response['result']['protocolVersion']}")
+               print(f"   会话ID: {init_response['result']['sessionId']}")

                # 打印工具列表请求信息
+               print("\n 测试工具列表请求")
                # 构造工具列表请求体
+               tools_request = {
+                   "jsonrpc": "2.0",
+                   "id": "2",
+                   "method": "tools/list",
+                   "params": {},
+               }

                # 发送POST请求获取工具列表
+               tools_response = requests.post(
+                   f"{self.mcp_server_url}/mcp", json=tools_request, headers=headers
+               )

                # 判断工具列表获取是否成功
+               if tools_response.status_code == 200:
                    # 解析工具列表响应
+                   tools_result = tools_response.json()
+                   tools_list = tools_result["result"]["tools"]
+                   print(" 工具列表获取成功")
+                   print(f"   可用工具数量: {len(tools_list)}")
                    # 遍历并打印每个工具信息
+                   for tool in tools_list:
+                       print(f"   - {tool['name']}: {tool['description']}")
+               else:
                    # 工具列表获取失败
+                   print(f" 工具列表获取失败: {tools_response.status_code}")

+               return True
+           else:
                # MCP初始化失败
+               print(f" MCP初始化失败: {response.status_code}")
+               print(f"   错误信息: {response.text}")
+               return False

        # 捕获异常并打印错误信息
+       except Exception as e:
+           print(f" 测试MCP服务器访问失败: {e}")
+           return False

    # 使用授权码交换访问令牌
    def exchange_authorization_code(self, authorization_code, code_verifier):
        # 打印步骤信息
        print("\n🔄 步骤5: 交换访问令牌")
        print("=" * 50)

        try:
            # 获取令牌端点
            token_endpoint = self.authorization_server_metadata["token_endpoint"]

            # 构造令牌请求参数
            token_request = {
                "grant_type": "authorization_code", # 授权类型
                "code": authorization_code, # 授权码
                "redirect_uri": "http://localhost:8080/callback", # 重定向URI
                "client_id": self.client_id, # 客户端ID
                "code_verifier": code_verifier, # PKCE挑战码
                "resource": self.mcp_server_url,  # RFC8707资源指示器
            }

            # 打印令牌请求参数
            print("📤 令牌请求参数:")
            print(f"   授权类型: {token_request['grant_type']}")
            print(f"   授权码: {token_request['code'][:16]}...")
            print(f"   重定向URI: {token_request['redirect_uri']}")
            print(f"   客户端ID: {token_request['client_id']}")
            print(f"   代码验证器: {token_request['code_verifier'][:16]}...")
            print(f"   资源指示器: {token_request['resource']}")

            # 使用客户端凭据进行认证
            auth = (self.client_id, self.client_secret)

            # 发送POST请求到令牌端点
            response = requests.post(token_endpoint, data=token_request, auth=auth)
            # 判断响应状态码是否为200
            if response.status_code == 200:
                # 解析返回的JSON数据
                token_response = response.json()
                # 保存访问令牌
                self.access_token = token_response["access_token"]

                # 打印成功获取访问令牌的信息
                print(" 成功获取访问令牌")
                print(f"   访问令牌: {self.access_token[:32]}...")
                print(f"   令牌类型: {token_response['token_type']}")
                print(f"   过期时间: {token_response['expires_in']}秒")
                print(f"   作用域: {token_response['scope']}")
                return True
            else:
                # 获取访问令牌失败,打印错误信息
                print(f" 获取访问令牌失败: {response.status_code}")
                print(f"   错误信息: {response.text}")
                return False

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 交换访问令牌失败: {e}")
            return False

    # 开始OAuth授权流程的方法
    def start_authorization_flow(self, scope="mcp:tools"):
        # 打印流程提示信息
        print("\n 步骤3: 开始OAuth授权流程")
        print("=" * 50)

        try:
            # 检查是否已发现授权服务器
            if not self.authorization_server_metadata:
                print(" 请先发现授权服务器")
                return None

            # 获取授权端点URL
            authorization_endpoint = self.authorization_server_metadata[
                "authorization_endpoint"
            ]

            # 生成PKCE挑战码
            code_verifier, code_challenge = self.generate_pkce_challenge()

            # 构造授权请求参数
            auth_params = {
                "response_type": "code", # 响应类型
                "client_id": self.client_id, # 客户端ID
                "redirect_uri": "http://localhost:8080/callback", # 重定向URI
                "scope": scope, # 作用域
                "state": secrets.token_urlsafe(16), # 状态
                "code_challenge": code_challenge, # PKCE挑战码
                "code_challenge_method": "S256", # PKCE挑战方法
                "resource": self.mcp_server_url,  # RFC8707资源指示器
            }

            # 构造授权URL
            auth_url = f"{authorization_endpoint}?{urlencode(auth_params)}"

            # 打印授权请求参数
            print(" OAuth授权请求参数:")
            print(f"   响应类型: {auth_params['response_type']}")
            print(f"   客户端ID: {auth_params['client_id']}")
            print(f"   重定向URI: {auth_params['redirect_uri']}")
            print(f"   作用域: {auth_params['scope']}")
            print(f"   状态: {auth_params['state']}")
            print(f"   代码挑战: {auth_params['code_challenge'][:16]}...")
            print(f"   代码挑战方法: {auth_params['code_challenge_method']}")
            print(f"   资源指示器: {auth_params['resource']}")

            # 打印授权URL
            print(f"\n🔗 授权URL:")
            print(f"   {auth_url}")

            # 提示用户在浏览器中访问授权URL
            print(f"\n 请在浏览器中访问上述URL完成授权")
            print(f" 注意: 实际应用中需要用户交互")

            # 返回授权流程相关信息
            return {
                "auth_url": auth_url, # 授权URL
                "code_verifier": code_verifier, # PKCE挑战码
                "state": auth_params["state"], # 状态
                "auth_params": auth_params, # 授权参数
            }

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 开始授权流程失败: {e}")
            return None

    # 动态客户端注册方法
    def register_client(self, client_name="MCP客户端"):
        # 打印步骤提示
        print("\n📝 步骤2: 动态客户端注册")
        print("=" * 50)

        try:
            # 检查是否已发现授权服务器
            if not self.authorization_server_metadata:
                print(" 请先发现授权服务器")
                return False

            # 获取注册端点
            registration_endpoint = self.authorization_server_metadata[
                "registration_endpoint"
            ]

            # 构造客户端注册请求体
            client_metadata = {
                "client_name": client_name, # 客户端名称
                "redirect_uris": ["http://localhost:8080/callback"], # 重定向URI
                "grant_types": ["authorization_code"], # 授权类型
                "response_types": ["code"], # 响应类型
                "token_endpoint_auth_method": "client_secret_basic", # 令牌端点认证方法
                "scope": "mcp:tools mcp:resources", # 作用域
                "application_type": "web", # 应用类型
            }

            # 打印注册请求相关信息
            print(f"📤 发送客户端注册请求到: {registration_endpoint}")
            print(f"   客户端名称: {client_name}")
            print(f"   重定向URI: {client_metadata['redirect_uris']}")
            print(f"   授权类型: {client_metadata['grant_types']}")
            print(f"   作用域: {client_metadata['scope']}")

            # 发送POST请求进行客户端注册
            response = requests.post(registration_endpoint, json=client_metadata)
            # 判断注册是否成功
            if response.status_code == 201:
                # 解析返回的客户端信息
                client_info = response.json()
                # 保存客户端ID
                self.client_id = client_info["client_id"]
                # 保存客户端密钥
                self.client_secret = client_info["client_secret"]

                # 打印注册成功信息
                print(" 客户端注册成功")
                print(f"   客户端ID: {self.client_id}")
                print(f"   客户端密钥: {self.client_secret[:8]}...")
                return True
            else:
                # 注册失败,打印错误信息
                print(f" 客户端注册失败: {response.status_code}")
                print(f"   错误信息: {response.text}")
                return False

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 客户端注册失败: {e}")
            return False

    # 发现授权服务器的方法
    def discover_authorization_server(self):
        try:
            # 发送GET请求获取资源服务器的元数据
            response = requests.get(
                f"{self.mcp_server_url}/.well-known/oauth-protected-resource"
            )
            # 判断响应状态码是否为200
            if response.status_code == 200:
                # 解析响应的JSON数据
                resource_metadata = response.json()
                # 打印获取资源元数据成功的信息
                print(f" 获取资源元数据成功")
                # 打印资源标识符
                print(f"   资源标识符: {resource_metadata.get('resource')}")

                # 获取授权服务器列表
                auth_servers = resource_metadata.get("authorization_servers", [])
                # 判断是否存在授权服务器
                if auth_servers:
                    # 取第一个授权服务器
                    auth_server = auth_servers[0]
                    # 保存授权服务器的元数据
                    self.authorization_server_metadata = auth_server
                    # 获取发行者信息
                    issuer = auth_server.get("issuer")
                    # 获取注册端点
                    registration_endpoint = auth_server.get("registration_endpoint")
                    # 获取授权端点
                    authorization_endpoint = auth_server.get("authorization_endpoint")
                    # 获取令牌端点
                    token_endpoint = auth_server.get("token_endpoint")
                    # 获取支持的作用域
                    scopes_supported = auth_server.get("scopes_supported")
                    # 获取支持的PKCE方法
                    code_challenge_methods_supported = auth_server.get(
                        "code_challenge_methods_supported"
                    )

                    # 打印发现授权服务器成功的信息
                    print(f" 发现授权服务器")
                    print(f"   发行者: {issuer}")
                    print(f"   注册端点: {registration_endpoint}")
                    print(f"   授权端点: {authorization_endpoint}")
                    print(f"   令牌端点: {token_endpoint}")
                    print(f"   支持的作用域: {', '.join(scopes_supported)}")
                    print(f"   PKCE支持: {', '.join(code_challenge_methods_supported)}")
                    # 返回True,表示发现成功
                    return True
                else:
                    # 未找到授权服务器,打印错误信息
                    print(" 未找到授权服务器")
                    return False
            else:
                # 获取资源元数据失败,打印状态码
                print(f" 获取资源元数据失败: {response.status_code}")
                return False

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 发现授权服务器失败: {e}")
            return False

    # 运行演示流程的方法
    def run(self):
        # 步骤1: 发现授权服务器
        if not self.discover_authorization_server():
            # 发现授权服务器失败
            print(" 演示失败:无法发现授权服务器")
            return False
        # 步骤2: 注册客户端
        if not self.register_client():
            # 客户端注册失败
            print(" 演示失败:客户端注册失败")
            return False
        # 步骤3: 开始授权流程
        auth_flow = self.start_authorization_flow()
        if not auth_flow:
            # 无法开始授权流程
            print(" 演示失败:无法开始授权流程")
            return False
        # 步骤4: 模拟授权
        authorization_code = self.simulate_authorization(auth_flow)
        if not authorization_code:
            # 授权失败
            print(" 演示失败:授权失败")
            return False
        # 步骤5: 交换访问令牌
        if not self.exchange_authorization_code(
            authorization_code, auth_flow["code_verifier"]
        ):
            # 无法获取访问令牌
            print(" 演示失败:无法获取访问令牌")
            return False
        # 步骤6: 测试MCP服务器访问
+       if not self.mcp_server_access():
+           print(" 演示失败:无法访问MCP服务器")
+           return False


# 主函数,程序入口
def main():
    try:
        # 创建OAuthClient实例
        client = OAuthClient()
        # 运行演示流程
        client.run()
    # 捕获用户中断异常
    except KeyboardInterrupt:
        # 打印用户中断信息
        print("\n\n演示被用户中断")
    # 捕获其他异常并打印错误信息
    except Exception as e:
        # 打印错误信息
        print(f"\n演示过程中发生错误: {e}")


# 判断是否为主程序入口
if __name__ == "__main__":
    # 调用主函数
    main()

6.3. server.py #

server.py

# 导入日志模块
import logging

# 导入时间模块
+import time

# 从http.server模块导入HTTPServer和BaseHTTPRequestHandler类
from http.server import HTTPServer, BaseHTTPRequestHandler

# 导入jwt模块
+import jwt

# 导入json模块
import json

# 从datetime模块导入datetime和timezone
+from datetime import datetime, timezone

# 导入uuid模块
+import uuid

# 从tools模块导入tools对象
+from tools import tools

# 配置日志记录级别为INFO
logging.basicConfig(level=logging.INFO)

# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)


# 定义MCPServer类,作为MCP服务器的核心类
class MCPServer:
    """MCP服务器核心类,支持OAuth 2.1授权"""

    # 初始化方法
    def __init__(self):
        # 协议版本号
        self.protocol_version = "2025-06-18"
        # 服务器信息,包括名称和版本
        self.server_info = {"name": "MCP服务器", "version": "1.0.0"}
        # 服务器能力,包括工具、资源和提示
        self.capabilities = {"tools": {}, "resources": {}, "prompts": {}}
        # 会话管理字典
+       self.sessions = {}
        # 生成服务器密钥(实际应用中应该使用安全的密钥管理)
+       self.server_secret = "mcp_server_secret_key_2025"
        # OAuth 2.1授权相关配置
        self.oauth_config = {
            # 授权端点
            "authorization_endpoint": "http://localhost:8001/oauth/authorize",
            # 令牌端点
            "token_endpoint": "http://localhost:8001/oauth/token",
            # 注册端点
            "registration_endpoint": "http://localhost:8001/oauth/register",
            # 发行者
            "issuer": "http://localhost:8001",
            # 支持的作用域 工具、资源、提示
            "scopes_supported": ["mcp:tools", "mcp:resources", "mcp:prompts"],
            # 支持的响应类型
            "response_types_supported": ["code"],
            # 支持的授权类型 授权码和刷新令牌
            "grant_types_supported": ["authorization_code", "refresh_token"],
            # 令牌端点支持的认证方法 客户端密钥基本认证和客户端密钥POST认证
            "token_endpoint_auth_methods_supported": [
                "client_secret_basic", # 客户端密钥基本认证
                "client_secret_post", # 客户端密钥POST认证
            ],
            # 支持的PKCE方法
            "code_challenge_methods_supported": ["S256"],  # PKCE支持
        }

    # 验证访问令牌
+   def validate_access_token(self, token: str) -> dict:
+       """验证访问令牌"""
+       try:
            # 记录开始验证访问令牌
+           logger.info(f"开始验证访问令牌: {token[:16]}...")

            # 首先尝试使用服务器密钥验证(自签名令牌)
+           try:
                # 使用服务器密钥解码JWT
+               payload = jwt.decode(
+                   token,
+                   self.server_secret,
+                   algorithms=["HS256"],
+                   audience="http://localhost:8000",
+               )
                # 记录服务器密钥验证成功
+               logger.info("使用服务器密钥验证成功")
+           except jwt.InvalidTokenError:
                # 记录服务器密钥验证失败,尝试OAuth授权服务器密钥
+               logger.info("服务器密钥验证失败,尝试OAuth授权服务器密钥")
                # 如果失败,尝试使用OAuth授权服务器的密钥验证
                # 这里我们使用一个共享的密钥
+               oauth_server_secret = "oauth_server_secret_key_2025"  # 与授权服务器共享
+               try:
                    # 使用OAuth服务器密钥解码JWT
+                   payload = jwt.decode(
+                       token,
+                       oauth_server_secret,
+                       algorithms=["HS256"],
+                       audience="http://localhost:8000",
+                   )
                    # 记录OAuth授权服务器密钥验证成功
+                   logger.info("使用OAuth授权服务器密钥验证成功")
+               except jwt.InvalidTokenError:
                    # 记录OAuth授权服务器密钥验证也失败
+                   logger.error("OAuth授权服务器密钥验证也失败")
+                   return None

            # 检查令牌是否过期
+           if (
+               "exp" in payload
+               and datetime.now(timezone.utc).timestamp() > payload["exp"]
+           ):
                # 记录令牌已过期
+               logger.error("令牌已过期")
+               return None

            # 检查令牌是否针对此服务器(audience已经在JWT解码时验证过了)
            # 但我们可以再次确认
+           if "aud" in payload and payload["aud"] != "http://localhost:8000":
                # 记录令牌audience不匹配
+               logger.error(
+                   f"令牌audience不匹配: {payload['aud']} != http://localhost:8000"
+               )
+               return None

            # 记录令牌验证成功,包含scope和client_id
+           logger.info(
+               f"令牌验证成功,包含scope: {payload.get('scope')}, client_id: {payload.get('client_id')}"
+           )
            # 返回payload
+           return payload
+       except Exception as e:
            # 记录令牌验证错误
+           logger.error(f"令牌验证错误: {e}")
+           return None

    # 生成唯一的会话ID
+   def _generate_session_id(self) -> str:
+       """生成唯一的会话ID"""
        # 使用uuid4生成唯一ID
+       return str(uuid.uuid4())

    # 获取或创建会话
+   def _get_or_create_session(self, session_id: str = None):
+       """获取或创建会话"""
        # 如果没有传入会话ID,则生成一个新的
+       if not session_id:
+           session_id = self._generate_session_id()
        # 如果会话ID不存在于sessions字典中,则新建一个会话
+       if session_id not in self.sessions:
+           self.sessions[session_id] = {
+               "created_at": time.time(),
+               "last_activity": time.time(),
+               "message_count": 0,
+               "client_info": None,
+           }
            # 记录新会话的创建
+           logger.info(f"创建新会话: {session_id}")
        # 更新会话的最后活动时间
+       self.sessions[session_id]["last_activity"] = time.time()
        # 增加消息计数
+       self.sessions[session_id]["message_count"] += 1
        # 返回会话ID
+       return session_id

    # 处理工具列表请求
+   def handle_tools_list(self, params=None, session_id: str = None):
+       """处理工具列表请求"""
        # 如果有会话ID,则获取或创建会话
+       if session_id:
+           self._get_or_create_session(session_id)
        # 返回工具列表
+       return {"tools": tools}

    # 处理初始化请求(实际生效的方法,覆盖上面的方法)
+   def handle_initialize(self, params, session_id: str = None):
+       """处理初始化请求"""
        # 获取客户端信息
+       client_info = params.get("clientInfo", {})
        # 获取或创建会话ID
+       session_id = self._get_or_create_session(session_id)
        # 存储客户端信息到会话
+       self.sessions[session_id]["client_info"] = client_info
        # 记录客户端初始化信息
+       logger.info(f"客户端初始化: {client_info}, 会话ID: {session_id}")
        # 返回初始化结果,包括协议版本、能力、服务器信息和会话ID
+       return {
+           "protocolVersion": self.protocol_version, # 协议版本
+           "capabilities": self.capabilities, # 能力
+           "serverInfo": self.server_info, # 服务器信息
+           "sessionId": session_id,  # 返回会话ID给客户端
+       }

    # 获取OAuth 2.0 Protected Resource Metadata (RFC9728)
    def get_resource_metadata(self):
        """获取资源服务器元数据"""
        # 返回资源元数据字典
        return {
            "resource": "http://localhost:8000",
            "authorization_servers": [
                {
                    "issuer": self.oauth_config["issuer"], # 发行者
                    "authorization_endpoint": self.oauth_config["authorization_endpoint"], # 授权端点
                    "token_endpoint": self.oauth_config["token_endpoint"], # 令牌端点
                    "registration_endpoint": self.oauth_config["registration_endpoint"], # 注册端点
                    "scopes_supported": self.oauth_config["scopes_supported"], # 支持的作用域
                    "response_types_supported": self.oauth_config["response_types_supported"], # 支持的响应类型
                    "grant_types_supported": self.oauth_config["grant_types_supported"], # 支持的授权类型
                    "token_endpoint_auth_methods_supported": self.oauth_config["token_endpoint_auth_methods_supported"], # 令牌端点支持的认证方法
                    "code_challenge_methods_supported": self.oauth_config["code_challenge_methods_supported"], # 支持的PKCE方法
                }
            ],
        }


# 定义HTTP请求处理类
class MCPHTTPHandler(BaseHTTPRequestHandler):
    # 初始化方法,接收mcp_server参数
    def __init__(self, *args, mcp_server=None, **kwargs):
        # 保存MCP服务器实例
        self.mcp_server = mcp_server
        # 调用父类的初始化方法
        super().__init__(*args, **kwargs)

    # 处理OAuth资源元数据端点
    def _handle_resource_metadata(self):
        try:
            # 获取资源元数据
            metadata = self.mcp_server.get_resource_metadata()
            # 发送200响应
            self.send_response(200)
            # 设置响应头为application/json
            self.send_header("Content-Type", "application/json")
            # 结束响应头
            self.end_headers()
            # 写入JSON数据
            self.wfile.write(json.dumps(metadata).encode("utf-8"))
        except Exception as e:
            # 记录错误日志
            logger.error(f"处理资源元数据错误: {e}")
            # 返回500错误
            self.send_error(500, "Internal Server Error")

    # 处理GET请求
    def do_GET(self):
        try:
            # 如果请求路径为OAuth资源元数据端点
            if self.path == "/.well-known/oauth-protected-resource":
                self._handle_resource_metadata()
                return

            # 如果请求根路径,返回欢迎信息
            if self.path == "/":
                self.send_response(200)
                self.send_header("Content-Type", "text/html")
                self.end_headers()
                self.wfile.write(b"Welcome to MCP server!")
                return
        except Exception as e:
            # 处理GET请求出错,记录日志
            logger.error(f"GET处理错误: {e}")
            # 返回500错误
            self.send_error(500, "Internal Server Error")

    # 发送未授权响应
+   def _send_unauthorized_response(self):
+       """发送未授权响应,包含WWW-Authenticate头"""
        # 发送401未授权响应
+       self.send_response(401)
        # 设置WWW-Authenticate响应头
+       self.send_header(
+           "WWW-Authenticate", 'Bearer realm="MCP Server", error="invalid_token"'
+       )
        # 设置Content-Type为application/json
+       self.send_header("Content-Type", "application/json")
        # 结束响应头
+       self.end_headers()

        # 构造错误响应体
+       error_response = {
+           "error": "invalid_token",
+           "error_description": "The access token is invalid or expired",
+       }
        # 写入错误响应体
+       self.wfile.write(json.dumps(error_response).encode("utf-8"))

    # 验证授权
+   def _validate_authorization(self):
+       """验证访问令牌"""
        # 获取Authorization头
+       auth_header = self.headers.get("Authorization")
        # 检查Authorization头是否存在且格式正确
+       if not auth_header or not auth_header.startswith("Bearer "):
            # 记录警告
+           logger.warning("缺少Authorization头或格式不正确")
            # 发送未授权响应
+           self._send_unauthorized_response()
+           return False

        # 去掉 "Bearer " 前缀,获取token
+       token = auth_header[7:]
        # 记录正在验证访问令牌
+       logger.info(f"正在验证访问令牌: {token[:16]}...")

        # 调用MCPServer的validate_access_token方法验证token
+       payload = self.mcp_server.validate_access_token(token)

        # 如果payload为None,说明验证失败
+       if not payload:
            # 记录警告
+           logger.warning("访问令牌验证失败")
            # 发送未授权响应
+           self._send_unauthorized_response()
+           return False

        # 记录访问令牌验证成功
+       logger.info(f"访问令牌验证成功,客户端ID: {payload.get('client_id')}")
+       return True

    # 处理MCP消息
+   def _handle_mcp_message(self, message, session_id: str = None):
+       """处理MCP消息"""
        # 获取方法名
+       method = message.get("method")
        # 获取参数
+       params = message.get("params", {})
        # 获取消息ID
+       msg_id = message.get("id")
        # 判断方法类型
+       if method == "initialize":
            # 调用服务器的handle_initialize方法
+           result = self.mcp_server.handle_initialize(params, session_id)
            # 如果有消息ID,返回带ID的结果
+           return {"id": msg_id, "result": result} if msg_id else None
+       elif method == "tools/list":
            # 处理工具列表请求
+           result = self.mcp_server.handle_tools_list(params, session_id)
+           return {"id": msg_id, "result": result} if msg_id else None
+       else:
            # 如果方法不存在,返回错误
+           if msg_id:
+               return {
+                   "id": msg_id,
+                   "error": {"code": -32601, "message": f"Method not found: {method}"},
+               }
            # 没有ID则返回None
+           return None

    # 处理POST请求(发送消息到服务器)
+   def do_POST(self):
+       """处理POST请求(发送消息到服务器)"""
+       try:
            # 检查请求路径是否为/mcp
+           if self.path != "/mcp":
+               self.send_error(404, "MCP endpoint not found")
+               return
            # 检查Accept头是否包含application/json或text/event-stream
+           accept_header = self.headers.get("Accept", "")
+           if (
+               "application/json" not in accept_header
+               and "text/event-stream" not in accept_header
+           ):
+               self.send_error(400, "Missing required Accept header")
+               return

            # 验证访问令牌(除了初始化请求)
+           if self.path == "/mcp":
                # 先读取请求体进行令牌验证
+               content_length = int(self.headers.get("Content-Length", 0))
+               body = self.rfile.read(content_length)
+               try:
                    # 解码请求体为JSON
+                   message = json.loads(body.decode("utf-8"))
                    # 如果不是初始化请求,则验证令牌
+                   if message and message.get("method") != "initialize":
+                       if not self._validate_authorization():
+                           return
+               except json.JSONDecodeError:
                    # 请求体不是有效的JSON,返回400
+                   self.send_error(400, "Invalid JSON")
+                   return
            # 检查MCP-Protocol-Version头是否匹配
+           protocol_version = self.headers.get("MCP-Protocol-Version")
+           if (
+               protocol_version
+               and protocol_version != self.mcp_server.protocol_version
+           ):
                # 协议版本不支持,返回400
+               self.send_error(
+                   400, f"Unsupported protocol version: {protocol_version}"
+               )
+               return
            # 获取会话ID
+           session_id = self.headers.get("Mcp-Session-Id")
            # 请求体已经在上面读取过了,直接使用
            # 处理MCP消息
+           response = self._handle_mcp_message(message, session_id)
            # 发送响应
+           if response:
                # 发送200响应码
+               self.send_response(200)
                # 设置响应头Content-Type为application/json
+               self.send_header("Content-Type", "application/json")
                # 设置协议版本头
+               self.send_header(
+                   "MCP-Protocol-Version", self.mcp_server.protocol_version
+               )
                # 如果响应中包含会话ID,则添加到响应头
+               if "result" in response and "sessionId" in response["result"]:
+                   self.send_header("Mcp-Session-Id", response["result"]["sessionId"])
                # 结束响应头
+               self.end_headers()
                # 写入JSON响应体
+               self.wfile.write(json.dumps(response).encode("utf-8"))
+           else:
                # 没有响应内容,返回202 Accepted
+               self.send_response(202)  # Accepted
+               self.end_headers()
+       except Exception as e:
            # 记录错误日志
+           logger.error(f"POST处理错误: {e}")
            # 返回500内部服务器错误
+           self.send_error(500, "Internal Server Error")


# 定义运行HTTP服务器的函数
def run_http_server(mcp_server: MCPServer, host: str = "localhost", port: int = 8000):
    """运行HTTP服务器"""

    # 定义处理器工厂函数,用于传递mcp_server实例
    def handler_factory(*args, **kwargs):
        return MCPHTTPHandler(*args, mcp_server=mcp_server, **kwargs)

    # 创建HTTPServer实例,绑定主机和端口
    server = HTTPServer((host, port), handler_factory)
    # 记录服务器启动信息
    logger.info(f"HTTP服务器运行在 http://{host}:{port}/mcp")

    try:
        # 启动服务器,进入循环监听请求
        server.serve_forever()
    except KeyboardInterrupt:
        # 捕获Ctrl+C,记录服务器停止
        logger.info("HTTP服务器已停止")
    finally:
        # 关闭服务器
        server.server_close()


# 定义主函数
def main():
    """主函数"""
    # 导入argparse模块,用于解析命令行参数
    import argparse

    # 创建ArgumentParser对象,设置描述信息
    parser = argparse.ArgumentParser(description="MCP HTTP服务器")
    # 添加--host参数,指定服务器主机,默认localhost
    parser.add_argument(
        "--host", default="localhost", help="HTTP服务器主机 (默认: localhost)"
    )
    # 添加--port参数,指定服务器端口,默认8000
    parser.add_argument(
        "--port", type=int, default=8000, help="HTTP服务器端口 (默认: 8000)"
    )

    # 解析命令行参数
    args = parser.parse_args()

    # 创建MCP服务器实例
    mcp_server = MCPServer()

    # 运行HTTP服务器
    run_http_server(mcp_server, args.host, args.port)


# 判断是否为主程序入口
if __name__ == "__main__":
    # 调用主函数
    main()

步骤7: 调用calculate工具 #

7.1. client.py #

client.py

# 导入requests库,用于发送HTTP请求
import requests

# 导入secrets库,用于生成安全的随机字符串
import secrets

# 从urllib.parse导入urlencode、parse_qs、urlparse,用于URL参数编码和解析
from urllib.parse import urlencode, parse_qs, urlparse

# 导入hashlib库,用于哈希运算
import hashlib

# 导入base64库,用于base64编码
import base64


# 定义OAuthClient类,用于与MCP服务器和OAuth服务器交互
class OAuthClient:

    # 构造函数,初始化MCP服务器和OAuth服务器的URL
    def __init__(
        self,
        mcp_server_url="http://localhost:8000",
        oauth_server_url="http://localhost:8001",
    ):
        # 去除MCP服务器URL末尾的斜杠
        self.mcp_server_url = mcp_server_url.rstrip("/")
        # 去除OAuth服务器URL末尾的斜杠
        self.oauth_server_url = oauth_server_url.rstrip("/")

    # 生成PKCE挑战码的方法
    def generate_pkce_challenge(self):
        # 生成高强度随机字符串作为code_verifier
        code_verifier = secrets.token_urlsafe(32)
        # 对code_verifier进行SHA256哈希,然后base64-url编码,去除末尾的等号
        code_challenge = (
            base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest())
            .decode()
            .rstrip("=")
        )
        # 返回code_verifier和code_challenge
        return code_verifier, code_challenge

    # 模拟授权过程
    def simulate_authorization(self, auth_flow):
        # 打印模拟授权过程的提示信息
        print("\n🎭 步骤4: 模拟授权过程")
        print("=" * 50)

        try:
            # 从auth_flow中获取授权URL
            auth_url = auth_flow["auth_url"]
            # 发送GET请求到授权端点,不自动重定向
            response = requests.get(auth_url, allow_redirects=False)

            # 如果返回302重定向
            if response.status_code == 302:
                # 获取重定向的Location头
                redirect_url = response.headers.get("Location")
                if redirect_url:
                    # 解析重定向URL
                    parsed_url = urlparse(redirect_url)
                    # 解析URL中的查询参数
                    query_params = parse_qs(parsed_url.query)

                    # 获取授权码
                    authorization_code = query_params.get("code", [None])[0]
                    # 获取state参数
                    state = query_params.get("state", [None])[0]

                    # 检查授权码和state是否匹配
                    if authorization_code and state == auth_flow["state"]:
                        # 打印模拟授权成功信息
                        print(" 模拟授权成功")
                        print(f"   授权码: {authorization_code[:16]}...")
                        print(f"   状态: {state}")
                        # 返回授权码
                        return authorization_code
                    else:
                        # 授权码获取失败
                        print(" 授权码获取失败")
                        return None
                else:
                    # 未找到重定向URL
                    print(" 未找到重定向URL")
                    return None
            else:
                # 授权请求失败,打印状态码
                print(f" 授权请求失败: {response.status_code}")
                return None

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 模拟授权失败: {e}")
            return None

    # 步骤6: 测试使用访问令牌访问MCP服务器
    def mcp_server_access(self):
        # 打印步骤信息
        print("\n 步骤6: 测试MCP服务器访问")
        print("=" * 50)

        try:
            # 检查是否已获取访问令牌
            if not self.access_token:
                print(" 请先获取访问令牌")
                return False

            # 设置授权头
            headers = {
                "Authorization": f"Bearer {self.access_token}", # 授权头
                "Accept": "application/json", # 接受类型
                "MCP-Protocol-Version": "2025-06-18", # MCP协议版本
            }

            # 打印发送MCP初始化请求信息
            print("📤 发送MCP初始化请求")
            print(f"   授权头: Bearer {self.access_token[:32]}...")

            # 构造MCP初始化请求体
            init_request = {
                "jsonrpc": "2.0", # JSON-RPC版本
                "id": "1", # 请求ID
                "method": "initialize", # 方法
                "params": {
                    "protocolVersion": "2025-06-18", # 协议版本
                    "capabilities": {}, # 能力
                    "clientInfo": {"name": "OAuth客户端", "version": "1.0.0"}, # 客户端信息
                },
            }

            # 发送POST请求到MCP服务器
            response = requests.post(
                f"{self.mcp_server_url}/mcp", json=init_request, headers=headers
            )

            # 判断初始化是否成功
            if response.status_code == 200:
                # 解析初始化响应
                init_response = response.json()
                print(" MCP初始化成功")
                print(f"   协议版本: {init_response['result']['protocolVersion']}")
                print(f"   会话ID: {init_response['result']['sessionId']}")

                # 打印工具列表请求信息
                print("\n 测试工具列表请求")
                # 构造工具列表请求体
                tools_request = {
                    "jsonrpc": "2.0", # JSON-RPC版本
                    "id": "2", # 请求ID
                    "method": "tools/list", # 方法
                    "params": {},
                }

                # 发送POST请求获取工具列表
                tools_response = requests.post(
                    f"{self.mcp_server_url}/mcp", json=tools_request, headers=headers
                )

                # 判断工具列表获取是否成功
                if tools_response.status_code == 200:
                    # 解析工具列表响应
                    tools_result = tools_response.json()
                    tools_list = tools_result["result"]["tools"]
                    print(" 工具列表获取成功")
                    print(f"   可用工具数量: {len(tools_list)}")
                    # 遍历并打印每个工具信息
                    for tool in tools_list:
                        print(f"   - {tool['name']}: {tool['description']}")
                else:
                    # 工具列表获取失败
                    print(f" 工具列表获取失败: {tools_response.status_code}")

                return True
            else:
                # MCP初始化失败
                print(f" MCP初始化失败: {response.status_code}")
                print(f"   错误信息: {response.text}")
                return False

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 测试MCP服务器访问失败: {e}")
            return False

    # 使用授权码交换访问令牌
    def exchange_authorization_code(self, authorization_code, code_verifier):
        # 打印步骤信息
        print("\n🔄 步骤5: 交换访问令牌")
        print("=" * 50)

        try:
            # 获取令牌端点
            token_endpoint = self.authorization_server_metadata["token_endpoint"]

            # 构造令牌请求参数
            token_request = {
                "grant_type": "authorization_code", # 授权类型
                "code": authorization_code, # 授权码
                "redirect_uri": "http://localhost:8080/callback", # 重定向URI
                "client_id": self.client_id, # 客户端ID
                "code_verifier": code_verifier, # PKCE挑战码
                "resource": self.mcp_server_url,  # RFC8707资源指示器
            }

            # 打印令牌请求参数
            print("📤 令牌请求参数:")
            print(f"   授权类型: {token_request['grant_type']}")
            print(f"   授权码: {token_request['code'][:16]}...")
            print(f"   重定向URI: {token_request['redirect_uri']}")
            print(f"   客户端ID: {token_request['client_id']}")
            print(f"   代码验证器: {token_request['code_verifier'][:16]}...")
            print(f"   资源指示器: {token_request['resource']}")

            # 使用客户端凭据进行认证
            auth = (self.client_id, self.client_secret)

            # 发送POST请求到令牌端点
            response = requests.post(token_endpoint, data=token_request, auth=auth)
            # 判断响应状态码是否为200
            if response.status_code == 200:
                # 解析返回的JSON数据
                token_response = response.json()
                # 保存访问令牌
                self.access_token = token_response["access_token"]

                # 打印成功获取访问令牌的信息
                print(" 成功获取访问令牌")
                print(f"   访问令牌: {self.access_token[:32]}...")
                print(f"   令牌类型: {token_response['token_type']}")
                print(f"   过期时间: {token_response['expires_in']}秒")
                print(f"   作用域: {token_response['scope']}")
                return True
            else:
                # 获取访问令牌失败,打印错误信息
                print(f" 获取访问令牌失败: {response.status_code}")
                print(f"   错误信息: {response.text}")
                return False

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 交换访问令牌失败: {e}")
            return False

    # 开始OAuth授权流程的方法
    def start_authorization_flow(self, scope="mcp:tools"):
        # 打印流程提示信息
        print("\n 步骤3: 开始OAuth授权流程")
        print("=" * 50)

        try:
            # 检查是否已发现授权服务器
            if not self.authorization_server_metadata:
                print(" 请先发现授权服务器")
                return None

            # 获取授权端点URL
            authorization_endpoint = self.authorization_server_metadata[
                "authorization_endpoint"
            ]

            # 生成PKCE挑战码
            code_verifier, code_challenge = self.generate_pkce_challenge()

            # 构造授权请求参数
            auth_params = {
                "response_type": "code", # 响应类型
                "client_id": self.client_id, # 客户端ID
                "redirect_uri": "http://localhost:8080/callback", # 重定向URI
                "scope": scope, # 作用域
                "state": secrets.token_urlsafe(16), # 状态
                "code_challenge": code_challenge, # PKCE挑战码
                "code_challenge_method": "S256", # PKCE挑战方法
                "resource": self.mcp_server_url,  # RFC8707资源指示器
            }

            # 构造授权URL
            auth_url = f"{authorization_endpoint}?{urlencode(auth_params)}"

            # 打印授权请求参数
            print(" OAuth授权请求参数:")
            print(f"   响应类型: {auth_params['response_type']}")
            print(f"   客户端ID: {auth_params['client_id']}")
            print(f"   重定向URI: {auth_params['redirect_uri']}")
            print(f"   作用域: {auth_params['scope']}")
            print(f"   状态: {auth_params['state']}")
            print(f"   代码挑战: {auth_params['code_challenge'][:16]}...")
            print(f"   代码挑战方法: {auth_params['code_challenge_method']}")
            print(f"   资源指示器: {auth_params['resource']}")

            # 打印授权URL
            print(f"\n🔗 授权URL:")
            print(f"   {auth_url}")

            # 提示用户在浏览器中访问授权URL
            print(f"\n 请在浏览器中访问上述URL完成授权")
            print(f" 注意: 实际应用中需要用户交互")

            # 返回授权流程相关信息
            return {
                "auth_url": auth_url,
                "code_verifier": code_verifier,
                "state": auth_params["state"],
                "auth_params": auth_params,
            }

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 开始授权流程失败: {e}")
            return None

    # 动态客户端注册方法
    def register_client(self, client_name="MCP客户端"):
        # 打印步骤提示
        print("\n📝 步骤2: 动态客户端注册")
        print("=" * 50)

        try:
            # 检查是否已发现授权服务器
            if not self.authorization_server_metadata:
                print(" 请先发现授权服务器")
                return False

            # 获取注册端点
            registration_endpoint = self.authorization_server_metadata[
                "registration_endpoint"
            ]

            # 构造客户端注册请求体
            client_metadata = {
                "client_name": client_name, # 客户端名称
                "redirect_uris": ["http://localhost:8080/callback"], # 重定向URI
                "grant_types": ["authorization_code"], # 授权类型
                "response_types": ["code"], # 响应类型
                "token_endpoint_auth_method": "client_secret_basic", # 令牌端点认证方法
                "scope": "mcp:tools mcp:resources", # 作用域
                "application_type": "web", # 应用类型
            }

            # 打印注册请求相关信息
            print(f"📤 发送客户端注册请求到: {registration_endpoint}")
            print(f"   客户端名称: {client_name}")
            print(f"   重定向URI: {client_metadata['redirect_uris']}")
            print(f"   授权类型: {client_metadata['grant_types']}")
            print(f"   作用域: {client_metadata['scope']}")

            # 发送POST请求进行客户端注册
            response = requests.post(registration_endpoint, json=client_metadata)
            # 判断注册是否成功
            if response.status_code == 201:
                # 解析返回的客户端信息
                client_info = response.json()
                # 保存客户端ID
                self.client_id = client_info["client_id"]
                # 保存客户端密钥
                self.client_secret = client_info["client_secret"]

                # 打印注册成功信息
                print(" 客户端注册成功")
                print(f"   客户端ID: {self.client_id}")
                print(f"   客户端密钥: {self.client_secret[:8]}...")
                return True
            else:
                # 注册失败,打印错误信息
                print(f" 客户端注册失败: {response.status_code}")
                print(f"   错误信息: {response.text}")
                return False

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 客户端注册失败: {e}")
            return False

    # 发现授权服务器的方法
    def discover_authorization_server(self):
        try:
            # 发送GET请求获取资源服务器的元数据
            response = requests.get(
                f"{self.mcp_server_url}/.well-known/oauth-protected-resource"
            )
            # 判断响应状态码是否为200
            if response.status_code == 200:
                # 解析响应的JSON数据
                resource_metadata = response.json()
                # 打印获取资源元数据成功的信息
                print(f" 获取资源元数据成功")
                # 打印资源标识符
                print(f"   资源标识符: {resource_metadata.get('resource')}")

                # 获取授权服务器列表
                auth_servers = resource_metadata.get("authorization_servers", [])
                # 判断是否存在授权服务器
                if auth_servers:
                    # 取第一个授权服务器
                    auth_server = auth_servers[0]
                    # 保存授权服务器的元数据
                    self.authorization_server_metadata = auth_server
                    # 获取发行者信息
                    issuer = auth_server.get("issuer")
                    # 获取注册端点
                    registration_endpoint = auth_server.get("registration_endpoint")
                    # 获取授权端点
                    authorization_endpoint = auth_server.get("authorization_endpoint")
                    # 获取令牌端点
                    token_endpoint = auth_server.get("token_endpoint")
                    # 获取支持的作用域
                    scopes_supported = auth_server.get("scopes_supported")
                    # 获取支持的PKCE方法
                    code_challenge_methods_supported = auth_server.get(
                        "code_challenge_methods_supported"
                    )

                    # 打印发现授权服务器成功的信息
                    print(f" 发现授权服务器")
                    print(f"   发行者: {issuer}")
                    print(f"   注册端点: {registration_endpoint}")
                    print(f"   授权端点: {authorization_endpoint}")
                    print(f"   令牌端点: {token_endpoint}")
                    print(f"   支持的作用域: {', '.join(scopes_supported)}")
                    print(f"   PKCE支持: {', '.join(code_challenge_methods_supported)}")
                    # 返回True,表示发现成功
                    return True
                else:
                    # 未找到授权服务器,打印错误信息
                    print(" 未找到授权服务器")
                    return False
            else:
                # 获取资源元数据失败,打印状态码
                print(f" 获取资源元数据失败: {response.status_code}")
                return False

        # 捕获异常并打印错误信息
        except Exception as e:
            print(f" 发现授权服务器失败: {e}")
            return False

    # 定义调用calculate工具的方法
+   def call_calculate_tool(self):
        # 步骤7: 调用calculate工具
+       print("\n🧮 步骤7: 调用calculate工具")
        # 打印分隔线
+       print("=" * 50)

+       try:
            # 检查是否已获取访问令牌
+           if not self.access_token:
+               print(" 请先获取访问令牌")
+               return False

            # 设置HTTP请求头,包括授权、返回类型和协议版本
+           headers = {
+               "Authorization": f"Bearer {self.access_token}", # 授权头
+               "Accept": "application/json", # 接受类型
+               "MCP-Protocol-Version": "2025-06-18", # MCP协议版本
+           }

            # 打印发送请求提示
+           print("📤 发送calculate工具调用请求")

            # 构造calculate工具的请求体
+           calculate_request = {
+               "jsonrpc": "2.0", # JSON-RPC版本
+               "id": "3", # 请求ID
+               "method": "tools/call", # 方法
+               "params": {
+                   "calls": [
+                       {
+                           "name": "calculate", # 工具名称
+                           "arguments": {"expression": "10 * 5 + 20 / 4"}, # 参数
+                       }
+                   ]
+               },
+           }

            # 打印本次计算的表达式
+           print("🔢 计算表达式: 10 * 5 + 20 / 4")
            # 打印预期结果
+           print(" 预期结果: 10 * 5 + 20 / 4 = 50 + 5 = 55")

            # 发送POST请求到MCP服务器
+           response = requests.post(
+               f"{self.mcp_server_url}/mcp", json=calculate_request, headers=headers
+           )

            # 判断请求是否成功
+           if response.status_code == 200:
                # 解析响应结果为JSON
+               calculate_result = response.json()
+               print(" calculate工具调用成功")

                # 判断响应中是否包含result和calls字段
+               if (
+                   "result" in calculate_result
+                   and "calls" in calculate_result["result"]
+               ):
                    # 获取calls列表
+                   calls = calculate_result["result"]["calls"]
                    # 判断calls是否非空
+                   if calls and len(calls) > 0:
                        # 取第一个调用结果
+                       call_result = calls[0]
                        # 判断是否有content字段且非空
+                       if "content" in call_result and len(call_result["content"]) > 0:
                            # 获取content内容
+                           content = call_result["content"][0]
                            # 判断content中是否有text字段
+                           if "text" in content:
+                               print(f"📊 计算结果: {content['text']}")
+                           else:
+                               print(f"📊 结果内容: {content}")
                        # 判断是否有错误
+                       elif "isError" in call_result and call_result["isError"]:
                            # 获取错误信息
+                           error_msg = call_result.get("error", {}).get(
+                               "message", "未知错误"
+                           )
+                           print(f" 计算错误: {error_msg}")
+                       else:
+                           print(f"📊 调用结果: {call_result}")
+                   else:
+                       print(" 未找到调用结果")
+               else:
                    # 响应格式不符,打印完整响应
+                   print(f"📊 完整响应: {calculate_result}")

                # 测试更多计算表达式
+               print("\n🧮 测试更多计算表达式:")
                # 定义测试表达式列表
+               test_expressions = [
+                   "2 + 2 * 3",
+                   "sin(45) + cos(30)",
+                   "sqrt(16) + pow(2, 3)",
+                   "100 / 4 - 5",
+               ]

                # 遍历每个测试表达式,i从4开始编号
+               for i, expr in enumerate(test_expressions, 4):
                    # 构造每个测试表达式的请求体
+                   test_request = {
+                       "jsonrpc": "2.0",
+                       "id": str(i),
+                       "method": "tools/call",
+                       "params": {
+                           "calls": [
+                               {"name": "calculate", "arguments": {"expression": expr}}
+                           ]
+                       },
+                   }

                    # 打印当前测试表达式
+                   print(f"   📝 测试 {i}: {expr}")
                    # 发送POST请求
+                   test_response = requests.post(
+                       f"{self.mcp_server_url}/mcp", json=test_request, headers=headers
+                   )

                    # 判断请求是否成功
+                   if test_response.status_code == 200:
                        # 解析响应
+                       test_result = test_response.json()
                        # 判断响应格式
+                       if "result" in test_result and "calls" in test_result["result"]:
                            # 获取calls列表
+                           test_calls = test_result["result"]["calls"]
                            # 判断calls是否非空
+                           if test_calls and len(test_calls) > 0:
                                # 取第一个调用结果
+                               test_call = test_calls[0]
                                # 判断是否有content且非空
+                               if (
+                                   "content" in test_call
+                                   and len(test_call["content"]) > 0
+                               ):
                                    # 获取content内容
+                                   test_content = test_call["content"][0]
                                    # 判断是否有text字段
+                                   if "text" in test_content:
+                                       print(f"       结果: {test_content['text']}")
+                                   else:
+                                       print(f"      📊 结果: {test_content}")
                                # 判断是否有错误
+                               elif "isError" in test_call and test_call["isError"]:
                                    # 获取错误信息
+                                   error_msg = test_call.get("error", {}).get(
+                                       "message", "未知错误"
+                                   )
+                                   print(f"       错误: {error_msg}")
+                               else:
+                                   print(f"      📊 结果: {test_call}")
+                           else:
+                               print(f"       无结果")
+                       else:
+                           print(f"       响应格式错误")
+                   else:
                        # 请求失败,打印状态码
+                       print(f"       请求失败: {test_response.status_code}")

                # 所有测试完成,返回True
+               return True
+           else:
                # calculate工具调用失败,打印状态码和错误信息
+               print(f" calculate工具调用失败: {response.status_code}")
+               print(f"   错误信息: {response.text}")
+               return False

        # 捕获异常并打印错误信息
+       except Exception as e:
+           print(f" 调用calculate工具失败: {e}")
+           return False

    # 运行演示流程的方法
    def run(self):
        # 步骤1: 发现授权服务器
        if not self.discover_authorization_server():
            # 发现授权服务器失败
            print(" 演示失败:无法发现授权服务器")
            return False
        # 步骤2: 注册客户端
        if not self.register_client():
            # 客户端注册失败
            print(" 演示失败:客户端注册失败")
            return False
        # 步骤3: 开始授权流程
        auth_flow = self.start_authorization_flow()
        if not auth_flow:
            # 无法开始授权流程
            print(" 演示失败:无法开始授权流程")
            return False
        # 步骤4: 模拟授权
        authorization_code = self.simulate_authorization(auth_flow)
        if not authorization_code:
            # 授权失败
            print(" 演示失败:授权失败")
            return False
        # 步骤5: 交换访问令牌
        if not self.exchange_authorization_code(
            authorization_code, auth_flow["code_verifier"]
        ):
            # 无法获取访问令牌
            print(" 演示失败:无法获取访问令牌")
            return False
        # 步骤6: 测试MCP服务器访问
        if not self.mcp_server_access():
            print(" 演示失败:无法访问MCP服务器")
            return False
        # 步骤7: 调用calculate工具
+       if not self.call_calculate_tool():
+           print(" 演示失败:无法调用calculate工具")
+           return False


# 主函数,程序入口
def main():
    try:
        # 创建OAuthClient实例
        client = OAuthClient()
        # 运行演示流程
        client.run()
    # 捕获用户中断异常
    except KeyboardInterrupt:
        # 打印用户中断信息
        print("\n\n演示被用户中断")
    # 捕获其他异常并打印错误信息
    except Exception as e:
        # 打印错误信息
        print(f"\n演示过程中发生错误: {e}")


# 判断是否为主程序入口
if __name__ == "__main__":
    # 调用主函数
    main()

7.2. server.py #

server.py

# 导入日志模块
import logging

# 导入时间模块
import time

# 从http.server模块导入HTTPServer和BaseHTTPRequestHandler类
from http.server import HTTPServer, BaseHTTPRequestHandler

# 导入jwt模块
import jwt

# 导入json模块
import json

# 从datetime模块导入datetime和timezone
from datetime import datetime, timezone

# 导入uuid模块
import uuid

# 从tools模块导入tools对象
from tools import tools

# 配置日志记录级别为INFO
logging.basicConfig(level=logging.INFO)

# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)


# 定义MCPServer类,作为MCP服务器的核心类
class MCPServer:
    """MCP服务器核心类,支持OAuth 2.1授权"""

    # 初始化方法
    def __init__(self):
        # 协议版本号
        self.protocol_version = "2025-06-18"
        # 服务器信息,包括名称和版本
        self.server_info = {"name": "MCP服务器", "version": "1.0.0"}
        # 服务器能力,包括工具、资源和提示
        self.capabilities = {"tools": {}, "resources": {}, "prompts": {}}
        # 会话管理字典
        self.sessions = {}
        # 生成服务器密钥(实际应用中应该使用安全的密钥管理)
        self.server_secret = "mcp_server_secret_key_2025"
        # OAuth 2.1授权相关配置
        self.oauth_config = {
            # 授权端点
            "authorization_endpoint": "http://localhost:8001/oauth/authorize",
            # 令牌端点
            "token_endpoint": "http://localhost:8001/oauth/token",
            # 注册端点
            "registration_endpoint": "http://localhost:8001/oauth/register",
            # 发行者
            "issuer": "http://localhost:8001",
            # 支持的作用域 工具、资源、提示
            "scopes_supported": ["mcp:tools", "mcp:resources", "mcp:prompts"],
            # 支持的响应类型 授权码
            "response_types_supported": ["code"],
            # 支持的授权类型 授权码和刷新令牌
            "grant_types_supported": ["authorization_code", "refresh_token"],
            # 令牌端点支持的认证方法 客户端密钥基本认证和客户端密钥POST认证
            "token_endpoint_auth_methods_supported": [
                "client_secret_basic", # 客户端密钥基本认证
                "client_secret_post", # 客户端密钥POST认证
            ],
            # 支持的PKCE方法
            "code_challenge_methods_supported": ["S256"],  # PKCE支持
        }

    # 处理工具调用请求
+   def handle_tools_call(self, params, session_id: str = None):
+       """处理工具调用请求"""
        # 如果有会话ID,则获取或创建会话
+       if session_id:
+           self._get_or_create_session(session_id)

        # 获取调用列表
+       calls = params.get("calls", [])
        # 初始化结果列表
+       results = []

        # 遍历每个调用
+       for call in calls:
            # 获取工具名称
+           tool_name = call.get("name")
            # 获取参数
+           arguments = call.get("arguments", {})

            # 如果工具名称为"calculate"
+           if tool_name == "calculate":
                # 获取表达式
+               expression = arguments.get("expression", "")
+               try:
                    # 计算表达式结果
+                   result = eval(expression)
                    # 添加成功结果到结果列表
+                   results.append(
+                       {
+                           "name": tool_name,
+                           "content": [
+                               {
+                                   "type": "text",
+                                   "text": f"计算结果: {expression} = {result}",
+                               }
+                           ],
+                       }
+                   )
+               except Exception as e:
                    # 计算出错,添加错误信息
+                   results.append(
+                       {
+                           "name": tool_name,
+                           "isError": True,
+                           "error": {"message": f"计算错误: {str(e)}"},
+                       }
+                   )
+           else:
                # 未知工具,添加错误信息
+               results.append(
+                   {
+                       "name": tool_name,
+                       "isError": True,
+                       "error": {"message": f"未知工具: {tool_name}"},
+                   }
+               )

        # 返回所有调用的结果
+       return {"calls": results}

    # 验证访问令牌
    def validate_access_token(self, token: str) -> dict:
        """验证访问令牌"""
        try:
            # 记录开始验证访问令牌
            logger.info(f"开始验证访问令牌: {token[:16]}...")

            # 首先尝试使用服务器密钥验证(自签名令牌)
            try:
                # 使用服务器密钥解码JWT
                payload = jwt.decode(
                    token, # 令牌
                    self.server_secret, # 服务器密钥
                    algorithms=["HS256"], # 算法
                    audience="http://localhost:8000", # 受众
                )
                # 记录服务器密钥验证成功
                logger.info("使用服务器密钥验证成功")
            except jwt.InvalidTokenError:
                # 记录服务器密钥验证失败,尝试OAuth授权服务器密钥
                logger.info("服务器密钥验证失败,尝试OAuth授权服务器密钥")
                # 如果失败,尝试使用OAuth授权服务器的密钥验证
                # 这里我们使用一个共享的密钥
                oauth_server_secret = "oauth_server_secret_key_2025"  # 与授权服务器共享
                try:
                    # 使用OAuth服务器密钥解码JWT
                    payload = jwt.decode(
                        token, # 令牌
                        oauth_server_secret, # OAuth服务器密钥
                        algorithms=["HS256"], # 算法
                        audience="http://localhost:8000", # 受众
                    )
                    # 记录OAuth授权服务器密钥验证成功
                    logger.info("使用OAuth授权服务器密钥验证成功")
                except jwt.InvalidTokenError:
                    # 记录OAuth授权服务器密钥验证也失败
                    logger.error("OAuth授权服务器密钥验证也失败")
                    return None

            # 检查令牌是否过期 如果过期则返回None
            if (
                "exp" in payload
                and datetime.now(timezone.utc).timestamp() > payload["exp"]
            ):
                # 记录令牌已过期
                logger.error("令牌已过期")
                return None

            # 检查令牌是否针对此服务器(audience已经在JWT解码时验证过了) 但我们可以再次确认
            if "aud" in payload and payload["aud"] != "http://localhost:8000":
                # 记录令牌audience不匹配
                logger.error(
                    f"令牌audience不匹配: {payload['aud']} != http://localhost:8000"
                )
                return None

            # 记录令牌验证成功,包含scope和client_id 包含scope和client_id
            logger.info(
                f"令牌验证成功,包含scope: {payload.get('scope')}, client_id: {payload.get('client_id')}"
            )
            # 返回payload
            return payload
        except Exception as e:
            # 记录令牌验证错误
            logger.error(f"令牌验证错误: {e}")
            return None

    # 生成唯一的会话ID
    def _generate_session_id(self) -> str:
        """生成唯一的会话ID"""
        # 使用uuid4生成唯一ID
        return str(uuid.uuid4())

    # 获取或创建会话
    def _get_or_create_session(self, session_id: str = None):
        """获取或创建会话"""
        # 如果没有传入会话ID,则生成一个新的
        if not session_id:
            session_id = self._generate_session_id()
        # 如果会话ID不存在于sessions字典中,则新建一个会话
        if session_id not in self.sessions:
            self.sessions[session_id] = {
                "created_at": time.time(),
                "last_activity": time.time(),
                "message_count": 0,
                "client_info": None,
            }
            # 记录新会话的创建
            logger.info(f"创建新会话: {session_id}")
        # 更新会话的最后活动时间
        self.sessions[session_id]["last_activity"] = time.time()
        # 增加消息计数
        self.sessions[session_id]["message_count"] += 1
        # 返回会话ID
        return session_id

    # 处理工具列表请求
    def handle_tools_list(self, params=None, session_id: str = None):
        """处理工具列表请求"""
        # 如果有会话ID,则获取或创建会话
        if session_id:
            self._get_or_create_session(session_id)
        # 返回工具列表
        return {"tools": tools}

    # 处理初始化请求(实际生效的方法,覆盖上面的方法)
    def handle_initialize(self, params, session_id: str = None):
        """处理初始化请求"""
        # 获取客户端信息
        client_info = params.get("clientInfo", {})
        # 获取或创建会话ID
        session_id = self._get_or_create_session(session_id)
        # 存储客户端信息到会话
        self.sessions[session_id]["client_info"] = client_info
        # 记录客户端初始化信息
        logger.info(f"客户端初始化: {client_info}, 会话ID: {session_id}")
        # 返回初始化结果,包括协议版本、能力、服务器信息和会话ID
        return {
            "protocolVersion": self.protocol_version, # 协议版本
            "capabilities": self.capabilities, # 能力
            "serverInfo": self.server_info, # 服务器信息
            "sessionId": session_id,  # 返回会话ID给客户端
        }

    # 获取OAuth 2.0 Protected Resource Metadata (RFC9728)
    def get_resource_metadata(self):
        """获取资源服务器元数据"""
        # 返回资源元数据字典
        return {
            "resource": "http://localhost:8000",
            "authorization_servers": [
                {
                    "issuer": self.oauth_config["issuer"], # 发行者
                    "authorization_endpoint": self.oauth_config["authorization_endpoint"], # 授权端点
                    "token_endpoint": self.oauth_config["token_endpoint"], # 令牌端点
                    "registration_endpoint": self.oauth_config["registration_endpoint"], # 注册端点
                    "scopes_supported": self.oauth_config["scopes_supported"], # 支持的作用域
                    "response_types_supported": self.oauth_config["response_types_supported"], # 支持的响应类型
                    "grant_types_supported": self.oauth_config["grant_types_supported"], # 支持的授权类型
                    "token_endpoint_auth_methods_supported": self.oauth_config["token_endpoint_auth_methods_supported"], # 令牌端点支持的认证方法
                    "code_challenge_methods_supported": self.oauth_config["code_challenge_methods_supported"], # 支持的PKCE方法
                }
            ],
        }


# 定义HTTP请求处理类
class MCPHTTPHandler(BaseHTTPRequestHandler):
    # 初始化方法,接收mcp_server参数
    def __init__(self, *args, mcp_server=None, **kwargs):
        # 保存MCP服务器实例
        self.mcp_server = mcp_server
        # 调用父类的初始化方法
        super().__init__(*args, **kwargs)

    # 处理OAuth资源元数据端点
    def _handle_resource_metadata(self):
        try:
            # 获取资源元数据
            metadata = self.mcp_server.get_resource_metadata()
            # 发送200响应
            self.send_response(200)
            # 设置响应头为application/json
            self.send_header("Content-Type", "application/json")
            # 结束响应头
            self.end_headers()
            # 写入JSON数据
            self.wfile.write(json.dumps(metadata).encode("utf-8"))
        except Exception as e:
            # 记录错误日志
            logger.error(f"处理资源元数据错误: {e}")
            # 返回500错误
            self.send_error(500, "Internal Server Error")

    # 处理GET请求
    def do_GET(self):
        try:
            # 如果请求路径为OAuth资源元数据端点
            if self.path == "/.well-known/oauth-protected-resource":
                self._handle_resource_metadata()
                return

            # 如果请求根路径,返回欢迎信息
            if self.path == "/":
                self.send_response(200)
                self.send_header("Content-Type", "text/html")
                self.end_headers()
                self.wfile.write(b"Welcome to MCP server!")
                return
        except Exception as e:
            # 处理GET请求出错,记录日志
            logger.error(f"GET处理错误: {e}")
            # 返回500错误
            self.send_error(500, "Internal Server Error")

    # 发送未授权响应
    def _send_unauthorized_response(self):
        """发送未授权响应,包含WWW-Authenticate头"""
        # 发送401未授权响应
        self.send_response(401)
        # 设置WWW-Authenticate响应头
        self.send_header(
            "WWW-Authenticate", 'Bearer realm="MCP Server", error="invalid_token"'
        )
        # 设置Content-Type为application/json
        self.send_header("Content-Type", "application/json")
        # 结束响应头
        self.end_headers()

        # 构造错误响应体
        error_response = {
            "error": "invalid_token",
            "error_description": "The access token is invalid or expired",
        }
        # 写入错误响应体
        self.wfile.write(json.dumps(error_response).encode("utf-8"))

    # 验证授权
    def _validate_authorization(self):
        """验证访问令牌"""
        # 获取Authorization头
        auth_header = self.headers.get("Authorization")
        # 检查Authorization头是否存在且格式正确
        if not auth_header or not auth_header.startswith("Bearer "):
            # 记录警告
            logger.warning("缺少Authorization头或格式不正确")
            # 发送未授权响应
            self._send_unauthorized_response()
            return False

        # 去掉 "Bearer " 前缀,获取token
        token = auth_header[7:]
        # 记录正在验证访问令牌
        logger.info(f"正在验证访问令牌: {token[:16]}...")

        # 调用MCPServer的validate_access_token方法验证token
        payload = self.mcp_server.validate_access_token(token)

        # 如果payload为None,说明验证失败
        if not payload:
            # 记录警告
            logger.warning("访问令牌验证失败")
            # 发送未授权响应
            self._send_unauthorized_response()
            return False

        # 记录访问令牌验证成功
        logger.info(f"访问令牌验证成功,客户端ID: {payload.get('client_id')}")
        return True

    # 处理MCP消息
    def _handle_mcp_message(self, message, session_id: str = None):
        """处理MCP消息"""
        # 获取方法名
        method = message.get("method")
        # 获取参数
        params = message.get("params", {})
        # 获取消息ID
        msg_id = message.get("id")
        # 判断方法类型
        if method == "initialize":
            # 调用服务器的handle_initialize方法
            result = self.mcp_server.handle_initialize(params, session_id)
            # 如果有消息ID,返回带ID的结果
            return {"id": msg_id, "result": result} if msg_id else None
        elif method == "tools/list":
            # 处理工具列表请求
            result = self.mcp_server.handle_tools_list(params, session_id)
            return {"id": msg_id, "result": result} if msg_id else None
+       elif method == "tools/call":
            # 处理工具调用请求
+           result = self.mcp_server.handle_tools_call(params, session_id)
+           return {"id": msg_id, "result": result} if msg_id else None
        else:
            # 如果方法不存在,返回错误
            if msg_id:
                return {
                    "id": msg_id,
                    "error": {"code": -32601, "message": f"Method not found: {method}"},
                }
            # 没有ID则返回None
            return None

    # 处理POST请求(发送消息到服务器)
    def do_POST(self):
        """处理POST请求(发送消息到服务器)"""
        try:
            # 检查请求路径是否为/mcp
            if self.path != "/mcp":
                self.send_error(404, "MCP endpoint not found")
                return
            # 检查Accept头是否包含application/json或text/event-stream
            accept_header = self.headers.get("Accept", "")
            if (
                "application/json" not in accept_header
                and "text/event-stream" not in accept_header
            ):
                self.send_error(400, "Missing required Accept header")
                return

            # 验证访问令牌(除了初始化请求)
            if self.path == "/mcp":
                # 先读取请求体进行令牌验证
                content_length = int(self.headers.get("Content-Length", 0))
                body = self.rfile.read(content_length)
                try:
                    # 解码请求体为JSON
                    message = json.loads(body.decode("utf-8"))
                    # 如果不是初始化请求,则验证令牌
                    if message and message.get("method") != "initialize":
                        if not self._validate_authorization():
                            return
                except json.JSONDecodeError:
                    # 请求体不是有效的JSON,返回400
                    self.send_error(400, "Invalid JSON")
                    return
            # 检查MCP-Protocol-Version头是否匹配
            protocol_version = self.headers.get("MCP-Protocol-Version")
            if (
                protocol_version
                and protocol_version != self.mcp_server.protocol_version
            ):
                # 协议版本不支持,返回400
                self.send_error(
                    400, f"Unsupported protocol version: {protocol_version}"
                )
                return
            # 获取会话ID
            session_id = self.headers.get("Mcp-Session-Id")
            # 请求体已经在上面读取过了,直接使用
            # 处理MCP消息
            response = self._handle_mcp_message(message, session_id)
            # 发送响应
            if response:
                # 发送200响应码
                self.send_response(200)
                # 设置响应头Content-Type为application/json
                self.send_header("Content-Type", "application/json")
                # 设置协议版本头
                self.send_header(
                    "MCP-Protocol-Version", self.mcp_server.protocol_version
                )
                # 如果响应中包含会话ID,则添加到响应头
                if "result" in response and "sessionId" in response["result"]:
                    self.send_header("Mcp-Session-Id", response["result"]["sessionId"])
                # 结束响应头
                self.end_headers()
                # 写入JSON响应体
                self.wfile.write(json.dumps(response).encode("utf-8"))
            else:
                # 没有响应内容,返回202 Accepted
                self.send_response(202)  # Accepted
                self.end_headers()
        except Exception as e:
            # 记录错误日志
            logger.error(f"POST处理错误: {e}")
            # 返回500内部服务器错误
            self.send_error(500, "Internal Server Error")


# 定义运行HTTP服务器的函数
def run_http_server(mcp_server: MCPServer, host: str = "localhost", port: int = 8000):
    """运行HTTP服务器"""

    # 定义处理器工厂函数,用于传递mcp_server实例
    def handler_factory(*args, **kwargs):
        return MCPHTTPHandler(*args, mcp_server=mcp_server, **kwargs)

    # 创建HTTPServer实例,绑定主机和端口
    server = HTTPServer((host, port), handler_factory)
    # 记录服务器启动信息
    logger.info(f"HTTP服务器运行在 http://{host}:{port}/mcp")

    try:
        # 启动服务器,进入循环监听请求
        server.serve_forever()
    except KeyboardInterrupt:
        # 捕获Ctrl+C,记录服务器停止
        logger.info("HTTP服务器已停止")
    finally:
        # 关闭服务器
        server.server_close()


# 定义主函数
def main():
    """主函数"""
    # 导入argparse模块,用于解析命令行参数
    import argparse

    # 创建ArgumentParser对象,设置描述信息
    parser = argparse.ArgumentParser(description="MCP HTTP服务器")
    # 添加--host参数,指定服务器主机,默认localhost
    parser.add_argument(
        "--host", default="localhost", help="HTTP服务器主机 (默认: localhost)"
    )
    # 添加--port参数,指定服务器端口,默认8000
    parser.add_argument(
        "--port", type=int, default=8000, help="HTTP服务器端口 (默认: 8000)"
    )

    # 解析命令行参数
    args = parser.parse_args()

    # 创建MCP服务器实例
    mcp_server = MCPServer()

    # 运行HTTP服务器
    run_http_server(mcp_server, args.host, args.port)


# 判断是否为主程序入口
if __name__ == "__main__":
    # 调用主函数
    main()

访问验证

请输入访问令牌

Token不正确,请重新输入