1.发现授权服务器 (RFC9728) #
uv init mcp_protocal
uv add requests pyjwt
pip install watchfiles
watchfiles "uv run auth_server.py" .
watchfiles "uv run server.py" .
watchfiles "uv run client.py" .1.1. client.py #
client.py
# 导入requests库,用于发送HTTP请求
import requests
# 定义OAuthClient类,用于与MCP服务器和OAuth服务器交互
class OAuthClient:
# 构造函数,初始化MCP服务器和OAuth服务器的URL
def __init__(
self,
mcp_server_url="http://localhost:8000",
oauth_server_url="http://localhost:8001",
):
# 去除MCP服务器URL末尾的斜杠
self.mcp_server_url = mcp_server_url.rstrip("/")
# 去除OAuth服务器URL末尾的斜杠
self.oauth_server_url = oauth_server_url.rstrip("/")
# 发现授权服务器的方法
def discover_authorization_server(self):
try:
# 发送GET请求获取资源服务器的元数据
response = requests.get(
f"{self.mcp_server_url}/.well-known/oauth-protected-resource"
)
# 如果响应状态码为200,表示请求成功
if response.status_code == 200:
# 解析响应的JSON数据
resource_metadata = response.json()
# 打印获取资源元数据成功的信息
print(f" 获取资源元数据成功")
# 打印资源标识符
print(f" 资源标识符: {resource_metadata.get('resource')}")
# 获取授权服务器列表
auth_servers = resource_metadata.get("authorization_servers", [])
# 如果存在授权服务器
if auth_servers:
# 取第一个授权服务器
auth_server = auth_servers[0]
# 保存授权服务器的元数据
self.authorization_server_metadata = auth_server
# 获取发行者信息
issuer = auth_server.get("issuer")
# 获取注册端点
registration_endpoint = auth_server.get("registration_endpoint")
# 获取授权端点
authorization_endpoint = auth_server.get("authorization_endpoint")
# 获取令牌端点
token_endpoint = auth_server.get("token_endpoint")
# 获取支持的作用域
scopes_supported = auth_server.get("scopes_supported")
# 获取支持的PKCE方法
code_challenge_methods_supported = auth_server.get(
"code_challenge_methods_supported"
)
# 打印发现授权服务器成功的信息
print(f" 发现授权服务器")
# 打印发行者
print(f" 发行者: {issuer}")
# 打印注册端点
print(f" 注册端点: {registration_endpoint}")
# 打印授权端点
print(f" 授权端点: {authorization_endpoint}")
# 打印令牌端点
print(f" 令牌端点: {token_endpoint}")
# 打印支持的作用域
print(f" 支持的作用域: {', '.join(scopes_supported)}")
# 打印PKCE支持的方法
print(f" PKCE支持: {', '.join(code_challenge_methods_supported)}")
# 返回True,表示发现成功
return True
else:
# 未找到授权服务器,打印错误信息
print(" 未找到授权服务器")
return False
else:
# 获取资源元数据失败,打印状态码
print(f" 获取资源元数据失败: {response.status_code}")
return False
# 捕获异常并打印错误信息
except Exception as e:
print(f" 发现授权服务器失败: {e}")
return False
# 运行演示流程的方法
def run(self):
# 如果未能发现授权服务器,打印失败信息
if not self.discover_authorization_server():
print(" 演示失败:无法发现授权服务器")
return False
# 主函数,程序入口
def main():
try:
# 创建OAuthClient实例
client = OAuthClient()
# 运行演示流程
client.run()
# 捕获用户中断异常
except KeyboardInterrupt:
print("\n\n演示被用户中断")
# 捕获其他异常并打印错误信息
except Exception as e:
print(f"\n演示过程中发生错误: {e}")
# 判断是否为主程序入口
if __name__ == "__main__":
# 调用主函数
main()
1.2. server.py #
server.py
# 导入日志模块
import logging
# 从http.server模块导入HTTPServer和BaseHTTPRequestHandler类
from http.server import HTTPServer, BaseHTTPRequestHandler
# 导入json模块
import json
# 配置日志记录级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
# 定义MCPServer类,作为MCP服务器的核心类
class MCPServer:
"""MCP服务器核心类,支持OAuth 2.1授权"""
# 初始化方法
def __init__(self):
# 协议版本号
self.protocol_version = "2025-06-18"
# 服务器信息,包括名称和版本
self.server_info = {"name": "MCP服务器", "version": "1.0.0"}
# 服务器能力,包括工具、资源和提示
self.capabilities = {"tools": {}, "resources": {}, "prompts": {}}
# OAuth 2.1授权相关配置
self.oauth_config = {
# 授权端点
"authorization_endpoint": "http://localhost:8001/oauth/authorize",
# 令牌端点
"token_endpoint": "http://localhost:8001/oauth/token",
# 注册端点
"registration_endpoint": "http://localhost:8001/oauth/register",
# 发行者
"issuer": "http://localhost:8001",
# 支持的作用域
"scopes_supported": ["mcp:tools", "mcp:resources", "mcp:prompts"],
# 支持的响应类型
"response_types_supported": ["code"],
# 支持的授权类型
"grant_types_supported": ["authorization_code", "refresh_token"],
# 令牌端点支持的认证方法
"token_endpoint_auth_methods_supported": [
"client_secret_basic",
"client_secret_post",
],
# 支持的PKCE方法
"code_challenge_methods_supported": ["S256"], # PKCE支持
}
# 获取OAuth 2.0 Protected Resource Metadata (RFC9728)
def get_resource_metadata(self):
"""获取资源服务器元数据"""
# 返回资源元数据字典
return {
"resource": "http://localhost:8000",
"authorization_servers": [
{
"issuer": self.oauth_config["issuer"],# 发行者
"authorization_endpoint": self.oauth_config["authorization_endpoint"],# 授权端点
"token_endpoint": self.oauth_config["token_endpoint"],# 令牌端点
"registration_endpoint": self.oauth_config["registration_endpoint"],# 注册端点
"scopes_supported": self.oauth_config["scopes_supported"],# 支持的作用域
"response_types_supported": self.oauth_config["response_types_supported"],# 支持的响应类型
"grant_types_supported": self.oauth_config["grant_types_supported"],# 支持的授权类型
"token_endpoint_auth_methods_supported": self.oauth_config["token_endpoint_auth_methods_supported"],# 令牌端点支持的认证方法
"code_challenge_methods_supported": self.oauth_config["code_challenge_methods_supported"],# 支持的PKCE方法
}
],
}
# 定义HTTP请求处理类
class MCPHTTPHandler(BaseHTTPRequestHandler):
# 初始化方法,接收mcp_server参数
def __init__(self, *args, mcp_server=None, **kwargs):
# 保存MCP服务器实例
self.mcp_server = mcp_server
# 调用父类的初始化方法
super().__init__(*args, **kwargs)
# 处理OAuth资源元数据端点
def _handle_resource_metadata(self):
try:
# 获取资源元数据
metadata = self.mcp_server.get_resource_metadata()
# 发送200响应
self.send_response(200)
# 设置响应头为application/json
self.send_header("Content-Type", "application/json")
# 结束响应头
self.end_headers()
# 写入JSON数据
self.wfile.write(json.dumps(metadata).encode("utf-8"))
except Exception as e:
# 记录错误日志
logger.error(f"处理资源元数据错误: {e}")
# 返回500错误
self.send_error(500, "Internal Server Error")
# 处理GET请求
def do_GET(self):
try:
# 如果请求路径为OAuth资源元数据端点
if self.path == "/.well-known/oauth-protected-resource":
self._handle_resource_metadata()
return
# 如果请求根路径,返回欢迎信息
if self.path == "/":
self.send_response(200)
self.send_header("Content-Type", "text/html")
self.end_headers()
self.wfile.write(b"Welcome to MCP server!")
return
except Exception as e:
# 处理GET请求出错,记录日志
logger.error(f"GET处理错误: {e}")
# 返回500错误
self.send_error(500, "Internal Server Error")
# 定义运行HTTP服务器的函数
def run_http_server(mcp_server: MCPServer, host: str = "localhost", port: int = 8000):
"""运行HTTP服务器"""
# 定义处理器工厂函数,用于传递mcp_server实例
def handler_factory(*args, **kwargs):
return MCPHTTPHandler(*args, mcp_server=mcp_server, **kwargs)
# 创建HTTPServer实例,绑定主机和端口
server = HTTPServer((host, port), handler_factory)
# 记录服务器启动信息
logger.info(f"HTTP服务器运行在 http://{host}:{port}/mcp")
try:
# 启动服务器,进入循环监听请求
server.serve_forever()
except KeyboardInterrupt:
# 捕获Ctrl+C,记录服务器停止
logger.info("HTTP服务器已停止")
finally:
# 关闭服务器
server.server_close()
# 定义主函数
def main():
"""主函数"""
# 导入argparse模块,用于解析命令行参数
import argparse
# 创建ArgumentParser对象,设置描述信息
parser = argparse.ArgumentParser(description="MCP HTTP服务器")
# 添加--host参数,指定服务器主机,默认localhost
parser.add_argument(
"--host", default="localhost", help="HTTP服务器主机 (默认: localhost)"
)
# 添加--port参数,指定服务器端口,默认8000
parser.add_argument(
"--port", type=int, default=8000, help="HTTP服务器端口 (默认: 8000)"
)
# 解析命令行参数
args = parser.parse_args()
# 创建MCP服务器实例
mcp_server = MCPServer()
# 运行HTTP服务器
run_http_server(mcp_server, args.host, args.port)
# 判断是否为主程序入口
if __name__ == "__main__":
# 调用主函数
main()
2.动态客户端注册 (RFC7591) #
2.1. auth_server.py #
auth_server.py
# 导入日志模块
import logging
# 从http.server模块导入HTTPServer和BaseHTTPRequestHandler类
from http.server import HTTPServer, BaseHTTPRequestHandler
# 导入json模块
import json
# 导入secrets模块用于生成安全的随机数
import secrets
# 导入datetime相关类
from datetime import datetime, timedelta, timezone
# 配置日志记录级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
# 定义OAuthServer类,作为OAuth 2.1授权服务器
class OAuthServer:
"""OAuth 2.1授权服务器"""
# 初始化方法
def __init__(self):
# 存储注册的客户端信息
self.clients = {} # 注册的客户端
# 存储授权码
self.authorization_codes = {} # 授权码
# 存储访问令牌
self.access_tokens = {} # 访问令牌
# 服务器密钥
self.server_secret = "oauth_server_secret_key_2025"
# 服务器元数据
self.metadata = {
# 发行者
"issuer": "http://localhost:8001",
# 授权端点
"authorization_endpoint": "http://localhost:8001/oauth/authorize",
# 令牌端点
"token_endpoint": "http://localhost:8001/oauth/token",
# 客户端注册端点
"registration_endpoint": "http://localhost:8001/oauth/register",
# 支持的作用域
"scopes_supported": ["mcp:tools", "mcp:resources", "mcp:prompts"],
# 支持的响应类型
"response_types_supported": ["code"],
# 支持的授权类型
"grant_types_supported": ["authorization_code", "refresh_token"],
# 令牌端点支持的认证方法
"token_endpoint_auth_methods_supported": [
"client_secret_basic",
"client_secret_post",
],
# 支持的PKCE方法
"code_challenge_methods_supported": ["S256"],
# 服务文档地址
"service_documentation": "https://modelcontextprotocol.io/specification/2025-06-18/basic/authorization",
}
# 动态客户端注册方法 (RFC7591)
def register_client(self, client_metadata):
"""动态客户端注册 (RFC7591)"""
# 生成客户端ID
client_id = str(secrets.token_urlsafe(16))
# 生成客户端密钥
client_secret = str(secrets.token_urlsafe(32))
# 构造客户端信息字典
client_info = {
"client_id": client_id,
"client_secret": client_secret,
# 客户端ID签发时间(UTC时间戳)
"client_id_issued_at": int(datetime.now(timezone.utc).timestamp()),
# 客户端密钥过期时间(0表示永不过期)
"client_secret_expires_at": 0, # 永不过期
# 合并客户端元数据
**client_metadata,
}
# 保存客户端信息到clients字典
self.clients[client_id] = client_info
# 记录新客户端注册日志
logger.info(f"新客户端注册: {client_id}")
# 返回客户端信息
return client_info
# 定义OAuthHandler类,继承自BaseHTTPRequestHandler
class OAuthHandler(BaseHTTPRequestHandler):
"""OAuth HTTP处理器"""
# 初始化方法,接收oauth_server参数
def __init__(self, *args, oauth_server=None, **kwargs):
# 保存OAuthServer实例
self.oauth_server = oauth_server
# 调用父类初始化方法
super().__init__(*args, **kwargs)
# 处理客户端注册请求的方法
def _handle_client_registration(self):
"""处理客户端注册请求"""
try:
# 获取请求体长度
content_length = int(self.headers.get("Content-Length", 0))
# 读取请求体内容
body = self.rfile.read(content_length)
# 解析JSON格式的客户端元数据
client_metadata = json.loads(body.decode("utf-8"))
# 调用OAuthServer的register_client方法注册客户端
client_info = self.oauth_server.register_client(client_metadata)
# 发送201响应,表示创建成功
self.send_response(201)
# 设置响应头为application/json
self.send_header("Content-Type", "application/json")
# 结束响应头
self.end_headers()
# 返回注册成功的客户端信息(JSON格式)
self.wfile.write(json.dumps(client_info).encode("utf-8"))
except Exception as e:
# 记录错误日志
logger.error(f"客户端注册错误: {e}")
# 返回400错误
self.send_error(400, "Bad Request")
# 处理POST请求的方法
def do_POST(self):
"""处理POST请求"""
try:
# 如果请求路径为/oauth/register,处理客户端注册
if self.path == "/oauth/register":
self._handle_client_registration()
# 其他路径返回404
else:
self.send_error(404, "Not Found")
except Exception as e:
# 记录POST处理错误日志
logger.error(f"POST处理错误: {e}")
# 返回500错误
self.send_error(500, "Internal Server Error")
# 运行OAuth授权服务器的函数
def run_oauth_server(host="localhost", port=8001):
"""运行OAuth授权服务器"""
# 创建OAuthServer实例
oauth_server = OAuthServer()
# 定义处理器工厂函数,用于传递oauth_server实例
def handler_factory(*args, **kwargs):
return OAuthHandler(*args, oauth_server=oauth_server, **kwargs)
# 创建HTTPServer实例,绑定主机和端口
server = HTTPServer((host, port), handler_factory)
# 记录服务器启动信息
logger.info(f"OAuth授权服务器运行在 http://{host}:{port}")
# 记录支持的端点信息
logger.info("支持的端点:")
logger.info(
f" - 服务器元数据: http://{host}:{port}/.well-known/oauth-authorization-server"
)
logger.info(f" - 授权端点: http://{host}:{port}/oauth/authorize")
logger.info(f" - 令牌端点: http://{host}:{port}/oauth/token")
logger.info(f" - 客户端注册: http://{host}:{port}/oauth/register")
try:
# 启动服务器,进入循环监听请求
server.serve_forever()
except KeyboardInterrupt:
# 捕获Ctrl+C,记录服务器停止
logger.info("OAuth授权服务器已停止")
finally:
# 关闭服务器
server.server_close()
# 判断是否为主程序入口
if __name__ == "__main__":
# 导入argparse模块,用于解析命令行参数
import argparse
# 创建ArgumentParser对象,设置描述信息
parser = argparse.ArgumentParser(description="MCP OAuth授权服务器")
# 添加--host参数,指定服务器主机,默认localhost
parser.add_argument(
"--host", default="localhost", help="服务器主机 (默认: localhost)"
)
# 添加--port参数,指定服务器端口,默认8001
parser.add_argument(
"--port", type=int, default=8001, help="服务器端口 (默认: 8001)"
)
# 解析命令行参数
args = parser.parse_args()
# 启动OAuth授权服务器
run_oauth_server(args.host, args.port)
2.2. client.py #
client.py
# 导入requests库,用于发送HTTP请求
import requests
# 定义OAuthClient类,用于与MCP服务器和OAuth服务器交互
class OAuthClient:
# 构造函数,初始化MCP服务器和OAuth服务器的URL
def __init__(
self,
mcp_server_url="http://localhost:8000",
oauth_server_url="http://localhost:8001",
):
# 去除MCP服务器URL末尾的斜杠
self.mcp_server_url = mcp_server_url.rstrip("/")
# 去除OAuth服务器URL末尾的斜杠
self.oauth_server_url = oauth_server_url.rstrip("/")
# 动态客户端注册方法
+ def register_client(self, client_name="MCP客户端"):
# 打印步骤提示
+ print("\n📝 步骤2: 动态客户端注册")
+ print("=" * 50)
+ try:
# 检查是否已发现授权服务器
+ if not self.authorization_server_metadata:
+ print(" 请先发现授权服务器")
+ return False
# 获取注册端点
+ registration_endpoint = self.authorization_server_metadata[
+ "registration_endpoint"
+ ]
# 构造客户端注册请求体
+ client_metadata = {
+ "client_name": client_name, # 客户端名称
+ "redirect_uris": ["http://localhost:8080/callback"], # 重定向URI
+ "grant_types": ["authorization_code"], # 授权类型
+ "response_types": ["code"], # 响应类型
+ "token_endpoint_auth_method": "client_secret_basic", # 令牌端点认证方法
+ "scope": "mcp:tools mcp:resources", # 作用域
+ "application_type": "web", # 应用类型
+ }
# 打印注册请求相关信息
+ print(f"📤 发送客户端注册请求到: {registration_endpoint}")
+ print(f" 客户端名称: {client_name}")
+ print(f" 重定向URI: {client_metadata['redirect_uris']}")
+ print(f" 授权类型: {client_metadata['grant_types']}")
+ print(f" 作用域: {client_metadata['scope']}")
# 发送POST请求进行客户端注册
+ response = requests.post(registration_endpoint, json=client_metadata)
# 判断注册是否成功
+ if response.status_code == 201:
# 解析返回的客户端信息
+ client_info = response.json()
# 保存客户端ID
+ self.client_id = client_info["client_id"]
# 保存客户端密钥
+ self.client_secret = client_info["client_secret"]
# 打印注册成功信息
+ print(" 客户端注册成功")
+ print(f" 客户端ID: {self.client_id}")
+ print(f" 客户端密钥: {self.client_secret[:8]}...")
+ return True
+ else:
# 注册失败,打印错误信息
+ print(f" 客户端注册失败: {response.status_code}")
+ print(f" 错误信息: {response.text}")
+ return False
# 捕获异常并打印错误信息
+ except Exception as e:
+ print(f" 客户端注册失败: {e}")
+ return False
# 发现授权服务器的方法
def discover_authorization_server(self):
try:
# 发送GET请求获取资源服务器的元数据
response = requests.get(
f"{self.mcp_server_url}/.well-known/oauth-protected-resource"
)
# 判断响应状态码是否为200
if response.status_code == 200:
# 解析响应的JSON数据
resource_metadata = response.json()
# 打印获取资源元数据成功的信息
print(f" 获取资源元数据成功")
# 打印资源标识符
print(f" 资源标识符: {resource_metadata.get('resource')}")
# 获取授权服务器列表
auth_servers = resource_metadata.get("authorization_servers", [])
# 判断是否存在授权服务器
if auth_servers:
# 取第一个授权服务器
auth_server = auth_servers[0]
# 保存授权服务器的元数据
self.authorization_server_metadata = auth_server
# 获取发行者信息
issuer = auth_server.get("issuer")
# 获取注册端点
registration_endpoint = auth_server.get("registration_endpoint")
# 获取授权端点
authorization_endpoint = auth_server.get("authorization_endpoint")
# 获取令牌端点
token_endpoint = auth_server.get("token_endpoint")
# 获取支持的作用域
scopes_supported = auth_server.get("scopes_supported")
# 获取支持的PKCE方法
code_challenge_methods_supported = auth_server.get(
"code_challenge_methods_supported"
)
# 打印发现授权服务器成功的信息
print(f" 发现授权服务器")
# 打印发行者
print(f" 发行者: {issuer}")
# 打印注册端点
print(f" 注册端点: {registration_endpoint}")
# 打印授权端点
print(f" 授权端点: {authorization_endpoint}")
# 打印令牌端点
print(f" 令牌端点: {token_endpoint}")
# 打印支持的作用域
print(f" 支持的作用域: {', '.join(scopes_supported)}")
# 打印PKCE支持的方法
print(f" PKCE支持: {', '.join(code_challenge_methods_supported)}")
# 返回True,表示发现成功
return True
else:
# 未找到授权服务器,打印错误信息
print(" 未找到授权服务器")
return False
else:
# 获取资源元数据失败,打印状态码
print(f" 获取资源元数据失败: {response.status_code}")
return False
# 捕获异常并打印错误信息
except Exception as e:
print(f" 发现授权服务器失败: {e}")
return False
# 运行演示流程的方法
def run(self):
# 步骤1: 发现授权服务器
if not self.discover_authorization_server():
print(" 演示失败:无法发现授权服务器")
return False
# 步骤2: 注册客户端
+ if not self.register_client():
+ print(" 演示失败:客户端注册失败")
+ return False
# 主函数,程序入口
def main():
try:
# 创建OAuthClient实例
client = OAuthClient()
# 运行演示流程
client.run()
# 捕获用户中断异常
except KeyboardInterrupt:
print("\n\n演示被用户中断")
# 捕获其他异常并打印错误信息
except Exception as e:
print(f"\n演示过程中发生错误: {e}")
# 判断是否为主程序入口
if __name__ == "__main__":
# 调用主函数
main()
3. OAuth授权码流程 #
3.1. auth_server.py #
auth_server.py
# 导入日志模块
import logging
# 从http.server模块导入HTTPServer和BaseHTTPRequestHandler类
from http.server import HTTPServer, BaseHTTPRequestHandler
# 导入json模块
import json
# 导入secrets模块用于生成安全的随机数
import secrets
# 导入datetime相关类
from datetime import datetime, timedelta, timezone
+from urllib.parse import parse_qs, urlparse
# 配置日志记录级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
# 定义OAuthServer类,作为OAuth 2.1授权服务器
class OAuthServer:
"""OAuth 2.1授权服务器"""
# 初始化方法
def __init__(self):
# 存储注册的客户端信息
self.clients = {} # 注册的客户端
# 存储授权码
self.authorization_codes = {} # 授权码
# 存储访问令牌
self.access_tokens = {} # 访问令牌
# 服务器密钥
self.server_secret = "oauth_server_secret_key_2025"
# 服务器元数据
self.metadata = {
# 发行者
"issuer": "http://localhost:8001",
# 授权端点
"authorization_endpoint": "http://localhost:8001/oauth/authorize",
# 令牌端点
"token_endpoint": "http://localhost:8001/oauth/token",
# 客户端注册端点
"registration_endpoint": "http://localhost:8001/oauth/register",
# 支持的作用域
"scopes_supported": ["mcp:tools", "mcp:resources", "mcp:prompts"],
# 支持的响应类型
"response_types_supported": ["code"],
# 支持的授权类型
"grant_types_supported": ["authorization_code", "refresh_token"],
# 令牌端点支持的认证方法
"token_endpoint_auth_methods_supported": [
"client_secret_basic",
"client_secret_post",
],
# 支持的PKCE方法
"code_challenge_methods_supported": ["S256"],
# 服务文档地址
"service_documentation": "https://modelcontextprotocol.io/specification/2025-06-18/basic/authorization",
}
# 创建授权码方法
+ def create_authorization_code(
+ self, client_id, redirect_uri, scope, state, code_challenge=None
+ ):
+ """创建授权码"""
# 检查client_id是否已注册
+ if client_id not in self.clients:
+ return None
# 生成授权码
+ code = str(secrets.token_urlsafe(32))
# 保存授权码及相关信息
+ self.authorization_codes[code] = {
+ "client_id": client_id, # 客户端ID
+ "redirect_uri": redirect_uri, # 重定向URI
+ "scope": scope, # 作用域
+ "state": state, # 状态
+ "code_challenge": code_challenge, # PKCE挑战码
+ "expires_at": datetime.now(timezone.utc) + timedelta(minutes=10), # 过期时间
+ "created_at": datetime.now(timezone.utc), # 创建时间
+ }
# 记录授权码创建日志
+ logger.info(f"创建授权码: {code} for client: {client_id}")
# 返回授权码
+ return code
# 动态客户端注册方法 (RFC7591)
def register_client(self, client_metadata):
"""动态客户端注册 (RFC7591)"""
# 生成客户端ID
client_id = str(secrets.token_urlsafe(16))
# 生成客户端密钥
client_secret = str(secrets.token_urlsafe(32))
# 构造客户端信息字典
client_info = {
"client_id": client_id,
"client_secret": client_secret,
# 客户端ID签发时间(UTC时间戳)
"client_id_issued_at": int(datetime.now(timezone.utc).timestamp()),
# 客户端密钥过期时间(0表示永不过期)
"client_secret_expires_at": 0, # 永不过期
# 合并客户端元数据
**client_metadata,
}
# 保存客户端信息到clients字典
self.clients[client_id] = client_info
# 记录新客户端注册日志
logger.info(f"新客户端注册: {client_id}")
# 返回客户端信息
return client_info
# 定义OAuthHandler类,继承自BaseHTTPRequestHandler
class OAuthHandler(BaseHTTPRequestHandler):
"""OAuth HTTP处理器"""
# 初始化方法,接收oauth_server参数
def __init__(self, *args, oauth_server=None, **kwargs):
# 保存OAuthServer实例
self.oauth_server = oauth_server
# 调用父类初始化方法
super().__init__(*args, **kwargs)
# 构建查询字符串的方法
+ def _build_query_string(self, params):
+ """构建查询字符串"""
+ return "&".join([f"{k}={v}" for k, v in params.items()])
# 处理授权请求的方法
+ def _handle_authorization(self):
+ """处理授权请求"""
+ try:
# 解析查询参数
+ query = urlparse(self.path).query
+ params = parse_qs(query)
# 获取client_id参数
+ client_id = params.get("client_id", [""])[0]
# 获取redirect_uri参数
+ redirect_uri = params.get("redirect_uri", [""])[0]
# 获取scope参数
+ scope = params.get("scope", [""])[0]
# 获取state参数
+ state = params.get("state", [""])[0]
# 获取code_challenge参数
+ code_challenge = params.get("code_challenge", [""])[0]
# 获取resource参数(RFC8707资源指示器)
+ resource = params.get("resource", [""])[0]
# 记录授权请求日志
+ logger.info(
+ f"授权请求: client_id={client_id}, scope={scope}, resource={resource}"
+ )
# 验证客户端是否已注册
+ if client_id not in self.oauth_server.clients:
+ self.send_error(400, "Invalid client_id")
+ return
# 创建授权码
+ code = self.oauth_server.create_authorization_code(
+ client_id, redirect_uri, scope, state, code_challenge
+ )
# 检查授权码是否创建成功
+ if not code:
+ self.send_error(400, "Failed to create authorization code")
+ return
# 构造重定向参数
+ redirect_params = {"code": code, "state": state}
# 构造重定向URL
+ redirect_url = f"{redirect_uri}?{self._build_query_string(redirect_params)}"
# 发送重定向响应
+ self.send_response(302)
+ self.send_header("Location", redirect_url)
+ self.end_headers()
# 记录授权成功日志
+ logger.info(f"授权成功,重定向到: {redirect_url}")
+ except Exception as e:
# 记录授权处理错误日志
+ logger.error(f"授权处理错误: {e}")
# 返回400错误
+ self.send_error(400, "Bad Request")
# 处理GET请求的方法
+ def do_GET(self):
+ """处理GET请求"""
+ try:
# 判断请求路径是否为授权端点
+ if self.path.startswith("/oauth/authorize"):
+ self._handle_authorization()
+ else:
+ self.send_error(404, "Not Found")
+ except Exception as e:
# 记录GET处理错误日志
+ logger.error(f"GET处理错误: {e}")
# 返回500错误
+ self.send_error(500, "Internal Server Error")
# 处理客户端注册请求的方法
def _handle_client_registration(self):
"""处理客户端注册请求"""
try:
# 获取请求体长度
content_length = int(self.headers.get("Content-Length", 0))
# 读取请求体内容
body = self.rfile.read(content_length)
# 解析JSON格式的客户端元数据
client_metadata = json.loads(body.decode("utf-8"))
# 调用OAuthServer的register_client方法注册客户端
client_info = self.oauth_server.register_client(client_metadata)
# 发送201响应,表示创建成功
self.send_response(201)
# 设置响应头为application/json
self.send_header("Content-Type", "application/json")
# 结束响应头
self.end_headers()
# 返回注册成功的客户端信息(JSON格式)
self.wfile.write(json.dumps(client_info).encode("utf-8"))
except Exception as e:
# 记录错误日志
logger.error(f"客户端注册错误: {e}")
# 返回400错误
self.send_error(400, "Bad Request")
# 处理POST请求的方法
def do_POST(self):
"""处理POST请求"""
try:
# 如果请求路径为/oauth/register,处理客户端注册
if self.path == "/oauth/register":
self._handle_client_registration()
# 其他路径返回404
else:
self.send_error(404, "Not Found")
except Exception as e:
# 记录POST处理错误日志
logger.error(f"POST处理错误: {e}")
# 返回500错误
self.send_error(500, "Internal Server Error")
# 运行OAuth授权服务器的函数
def run_oauth_server(host="localhost", port=8001):
"""运行OAuth授权服务器"""
# 创建OAuthServer实例
oauth_server = OAuthServer()
# 定义处理器工厂函数,用于传递oauth_server实例
def handler_factory(*args, **kwargs):
return OAuthHandler(*args, oauth_server=oauth_server, **kwargs)
# 创建HTTPServer实例,绑定主机和端口
server = HTTPServer((host, port), handler_factory)
# 记录服务器启动信息
logger.info(f"OAuth授权服务器运行在 http://{host}:{port}")
# 记录支持的端点信息
logger.info("支持的端点:")
logger.info(
f" - 服务器元数据: http://{host}:{port}/.well-known/oauth-authorization-server"
)
logger.info(f" - 授权端点: http://{host}:{port}/oauth/authorize")
logger.info(f" - 令牌端点: http://{host}:{port}/oauth/token")
logger.info(f" - 客户端注册: http://{host}:{port}/oauth/register")
try:
# 启动服务器,进入循环监听请求
server.serve_forever()
except KeyboardInterrupt:
# 捕获Ctrl+C,记录服务器停止
logger.info("OAuth授权服务器已停止")
finally:
# 关闭服务器
server.server_close()
# 判断是否为主程序入口
if __name__ == "__main__":
# 导入argparse模块,用于解析命令行参数
import argparse
# 创建ArgumentParser对象,设置描述信息
parser = argparse.ArgumentParser(description="MCP OAuth授权服务器")
# 添加--host参数,指定服务器主机,默认localhost
parser.add_argument(
"--host", default="localhost", help="服务器主机 (默认: localhost)"
)
# 添加--port参数,指定服务器端口,默认8001
parser.add_argument(
"--port", type=int, default=8001, help="服务器端口 (默认: 8001)"
)
# 解析命令行参数
args = parser.parse_args()
# 启动OAuth授权服务器
run_oauth_server(args.host, args.port)
3.2. client.py #
client.py
# 导入requests库,用于发送HTTP请求
import requests
# 导入secrets库,用于生成安全的随机字符串
+import secrets
# 从urllib.parse导入urlencode,用于URL参数编码
+from urllib.parse import urlencode
# 导入hashlib库,用于哈希运算
+import hashlib
# 导入base64库,用于base64编码
+import base64
# 定义OAuthClient类,用于与MCP服务器和OAuth服务器交互
class OAuthClient:
# 构造函数,初始化MCP服务器和OAuth服务器的URL
def __init__(
self,
mcp_server_url="http://localhost:8000",
oauth_server_url="http://localhost:8001",
):
# 去除MCP服务器URL末尾的斜杠
self.mcp_server_url = mcp_server_url.rstrip("/")
# 去除OAuth服务器URL末尾的斜杠
self.oauth_server_url = oauth_server_url.rstrip("/")
# 生成PKCE挑战码的方法
+ def generate_pkce_challenge(self):
# 生成code_verifier(高强度随机字符串)
+ code_verifier = secrets.token_urlsafe(32)
# 计算SHA256哈希并进行base64-url编码,去除末尾的等号
+ code_challenge = (
+ base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest())
+ .decode()
+ .rstrip("=")
+ )
# 返回code_verifier和code_challenge
+ return code_verifier, code_challenge
# 开始OAuth授权流程的方法
+ def start_authorization_flow(self, scope="mcp:tools"):
# 打印流程提示信息
+ print("\n 步骤3: 开始OAuth授权流程")
+ print("=" * 50)
+ try:
# 检查是否已发现授权服务器
+ if not self.authorization_server_metadata:
+ print(" 请先发现授权服务器")
+ return None
# 获取授权端点URL
+ authorization_endpoint = self.authorization_server_metadata[
+ "authorization_endpoint"
+ ]
# 生成PKCE挑战码
+ code_verifier, code_challenge = self.generate_pkce_challenge()
# 构造授权请求参数
+ auth_params = {
+ "response_type": "code",# 响应类型
+ "client_id": self.client_id, # 客户端ID
+ "redirect_uri": "http://localhost:8080/callback", # 重定向URI
+ "scope": scope, # 作用域
+ "state": secrets.token_urlsafe(16), # 状态
+ "code_challenge": code_challenge, # PKCE挑战码
+ "code_challenge_method": "S256", # PKCE挑战方法
+ "resource": self.mcp_server_url, # RFC8707资源指示器
+ }
# 构造授权URL
+ auth_url = f"{authorization_endpoint}?{urlencode(auth_params)}"
# 打印授权请求参数
+ print(" OAuth授权请求参数:")
+ print(f" 响应类型: {auth_params['response_type']}")
+ print(f" 客户端ID: {auth_params['client_id']}")
+ print(f" 重定向URI: {auth_params['redirect_uri']}")
+ print(f" 作用域: {auth_params['scope']}")
+ print(f" 状态: {auth_params['state']}")
+ print(f" 代码挑战: {auth_params['code_challenge'][:16]}...")
+ print(f" 代码挑战方法: {auth_params['code_challenge_method']}")
+ print(f" 资源指示器: {auth_params['resource']}")
# 打印授权URL
+ print(f"\n🔗 授权URL:")
+ print(f" {auth_url}")
# 提示用户在浏览器中访问授权URL
+ print(f"\n 请在浏览器中访问上述URL完成授权")
+ print(f" 注意: 实际应用中需要用户交互")
# 返回授权流程相关信息
+ return {
+ "auth_url": auth_url, # 授权URL
+ "code_verifier": code_verifier, # PKCE挑战码
+ "state": auth_params["state"], # 状态
+ "auth_params": auth_params, # 授权参数
+ }
# 捕获异常并打印错误信息
+ except Exception as e:
+ print(f" 开始授权流程失败: {e}")
+ return None
# 动态客户端注册方法
def register_client(self, client_name="MCP客户端"):
# 打印步骤提示
print("\n📝 步骤2: 动态客户端注册")
print("=" * 50)
try:
# 检查是否已发现授权服务器
if not self.authorization_server_metadata:
print(" 请先发现授权服务器")
return False
# 获取注册端点
registration_endpoint = self.authorization_server_metadata[
"registration_endpoint"
]
# 构造客户端注册请求体
client_metadata = {
"client_name": client_name, # 客户端名称
"redirect_uris": ["http://localhost:8080/callback"], # 重定向URI
"grant_types": ["authorization_code"], # 授权类型
"response_types": ["code"], # 响应类型
"token_endpoint_auth_method": "client_secret_basic", # 令牌端点认证方法
"scope": "mcp:tools mcp:resources", # 作用域
"application_type": "web", # 应用类型
}
# 打印注册请求相关信息
print(f"📤 发送客户端注册请求到: {registration_endpoint}")
print(f" 客户端名称: {client_name}")
print(f" 重定向URI: {client_metadata['redirect_uris']}")
print(f" 授权类型: {client_metadata['grant_types']}")
print(f" 作用域: {client_metadata['scope']}")
# 发送POST请求进行客户端注册
response = requests.post(registration_endpoint, json=client_metadata)
# 判断注册是否成功
if response.status_code == 201:
# 解析返回的客户端信息
client_info = response.json()
# 保存客户端ID
self.client_id = client_info["client_id"]
# 保存客户端密钥
self.client_secret = client_info["client_secret"]
# 打印注册成功信息
print(" 客户端注册成功")
print(f" 客户端ID: {self.client_id}")
print(f" 客户端密钥: {self.client_secret[:8]}...")
return True
else:
# 注册失败,打印错误信息
print(f" 客户端注册失败: {response.status_code}")
print(f" 错误信息: {response.text}")
return False
# 捕获异常并打印错误信息
except Exception as e:
print(f" 客户端注册失败: {e}")
return False
# 发现授权服务器的方法
def discover_authorization_server(self):
try:
# 发送GET请求获取资源服务器的元数据
response = requests.get(
f"{self.mcp_server_url}/.well-known/oauth-protected-resource"
)
# 判断响应状态码是否为200
if response.status_code == 200:
# 解析响应的JSON数据
resource_metadata = response.json()
# 打印获取资源元数据成功的信息
print(f" 获取资源元数据成功")
# 打印资源标识符
print(f" 资源标识符: {resource_metadata.get('resource')}")
# 获取授权服务器列表
auth_servers = resource_metadata.get("authorization_servers", [])
# 判断是否存在授权服务器
if auth_servers:
# 取第一个授权服务器
auth_server = auth_servers[0]
# 保存授权服务器的元数据
self.authorization_server_metadata = auth_server
# 获取发行者信息
issuer = auth_server.get("issuer")
# 获取注册端点
registration_endpoint = auth_server.get("registration_endpoint")
# 获取授权端点
authorization_endpoint = auth_server.get("authorization_endpoint")
# 获取令牌端点
token_endpoint = auth_server.get("token_endpoint")
# 获取支持的作用域
scopes_supported = auth_server.get("scopes_supported")
# 获取支持的PKCE方法
code_challenge_methods_supported = auth_server.get(
"code_challenge_methods_supported"
)
# 打印发现授权服务器成功的信息
print(f" 发现授权服务器")
# 打印发行者
print(f" 发行者: {issuer}")
# 打印注册端点
print(f" 注册端点: {registration_endpoint}")
# 打印授权端点
print(f" 授权端点: {authorization_endpoint}")
# 打印令牌端点
print(f" 令牌端点: {token_endpoint}")
# 打印支持的作用域
print(f" 支持的作用域: {', '.join(scopes_supported)}")
# 打印PKCE支持的方法
print(f" PKCE支持: {', '.join(code_challenge_methods_supported)}")
# 返回True,表示发现成功
return True
else:
# 未找到授权服务器,打印错误信息
print(" 未找到授权服务器")
return False
else:
# 获取资源元数据失败,打印状态码
print(f" 获取资源元数据失败: {response.status_code}")
return False
# 捕获异常并打印错误信息
except Exception as e:
print(f" 发现授权服务器失败: {e}")
return False
# 运行演示流程的方法
def run(self):
# 步骤1: 发现授权服务器
if not self.discover_authorization_server():
print(" 演示失败:无法发现授权服务器")
return False
# 步骤2: 注册客户端
if not self.register_client():
print(" 演示失败:客户端注册失败")
return False
# 步骤3: 开始授权流程
+ auth_flow = self.start_authorization_flow()
+ if not auth_flow:
+ print(" 演示失败:无法开始授权流程")
+ return False
# 主函数,程序入口
def main():
try:
# 创建OAuthClient实例
client = OAuthClient()
# 运行演示流程
client.run()
# 捕获用户中断异常
except KeyboardInterrupt:
print("\n\n演示被用户中断")
# 捕获其他异常并打印错误信息
except Exception as e:
print(f"\n演示过程中发生错误: {e}")
# 判断是否为主程序入口
if __name__ == "__main__":
# 调用主函数
main()
4.模拟授权过程 #
4.1. client.py #
client.py
# 导入requests库,用于发送HTTP请求
import requests
# 导入secrets库,用于生成安全的随机字符串
import secrets
# 从urllib.parse导入urlencode、parse_qs、urlparse,用于URL参数编码和解析
+from urllib.parse import urlencode, parse_qs, urlparse
# 导入hashlib库,用于哈希运算
import hashlib
# 导入base64库,用于base64编码
import base64
# 定义OAuthClient类,用于与MCP服务器和OAuth服务器交互
class OAuthClient:
# 构造函数,初始化MCP服务器和OAuth服务器的URL
def __init__(
self,
mcp_server_url="http://localhost:8000",
oauth_server_url="http://localhost:8001",
):
# 去除MCP服务器URL末尾的斜杠
self.mcp_server_url = mcp_server_url.rstrip("/")
# 去除OAuth服务器URL末尾的斜杠
self.oauth_server_url = oauth_server_url.rstrip("/")
# 生成PKCE挑战码的方法
def generate_pkce_challenge(self):
# 生成code_verifier(高强度随机字符串)
code_verifier = secrets.token_urlsafe(32)
# 计算SHA256哈希并进行base64-url编码,去除末尾的等号
code_challenge = (
base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest())
.decode()
.rstrip("=")
)
# 返回code_verifier和code_challenge
return code_verifier, code_challenge
# 模拟授权过程
+ def simulate_authorization(self, auth_flow):
# 打印模拟授权过程的提示信息
+ print("\n🎭 步骤4: 模拟授权过程")
+ print("=" * 50)
+ try:
# 从auth_flow中获取授权URL
+ auth_url = auth_flow["auth_url"]
# 发送GET请求到授权端点,不自动重定向
+ response = requests.get(auth_url, allow_redirects=False)
# 如果返回302重定向
+ if response.status_code == 302:
# 获取重定向的Location头
+ redirect_url = response.headers.get("Location")
+ if redirect_url:
# 解析重定向URL
+ parsed_url = urlparse(redirect_url)
# 解析URL中的查询参数
+ query_params = parse_qs(parsed_url.query)
# 获取授权码
+ authorization_code = query_params.get("code", [None])[0]
# 获取state参数
+ state = query_params.get("state", [None])[0]
# 检查授权码和state是否匹配
+ if authorization_code and state == auth_flow["state"]:
+ print(" 模拟授权成功")
+ print(f" 授权码: {authorization_code[:16]}...")
+ print(f" 状态: {state}")
+ return authorization_code
+ else:
+ print(" 授权码获取失败")
+ return None
+ else:
+ print(" 未找到重定向URL")
+ return None
+ else:
+ print(f" 授权请求失败: {response.status_code}")
+ return None
# 捕获异常并打印错误信息
+ except Exception as e:
+ print(f" 模拟授权失败: {e}")
+ return None
# 开始OAuth授权流程的方法
def start_authorization_flow(self, scope="mcp:tools"):
# 打印流程提示信息
print("\n 步骤3: 开始OAuth授权流程")
print("=" * 50)
try:
# 检查是否已发现授权服务器
if not self.authorization_server_metadata:
print(" 请先发现授权服务器")
return None
# 获取授权端点URL
authorization_endpoint = self.authorization_server_metadata[
"authorization_endpoint"
]
# 生成PKCE挑战码
code_verifier, code_challenge = self.generate_pkce_challenge()
# 构造授权请求参数
auth_params = {
"response_type": "code",# 响应类型
"client_id": self.client_id, # 客户端ID
"redirect_uri": "http://localhost:8080/callback", # 重定向URI
"scope": scope, # 作用域
"state": secrets.token_urlsafe(16), # 状态
"code_challenge": code_challenge, # PKCE挑战码
"code_challenge_method": "S256", # PKCE挑战方法
"resource": self.mcp_server_url, # RFC8707资源指示器
}
# 构造授权URL
auth_url = f"{authorization_endpoint}?{urlencode(auth_params)}"
# 打印授权请求参数
print(" OAuth授权请求参数:")
print(f" 响应类型: {auth_params['response_type']}")
print(f" 客户端ID: {auth_params['client_id']}")
print(f" 重定向URI: {auth_params['redirect_uri']}")
print(f" 作用域: {auth_params['scope']}")
print(f" 状态: {auth_params['state']}")
print(f" 代码挑战: {auth_params['code_challenge'][:16]}...")
print(f" 代码挑战方法: {auth_params['code_challenge_method']}")
print(f" 资源指示器: {auth_params['resource']}")
# 打印授权URL
print(f"\n🔗 授权URL:")
print(f" {auth_url}")
# 提示用户在浏览器中访问授权URL
print(f"\n 请在浏览器中访问上述URL完成授权")
print(f" 注意: 实际应用中需要用户交互")
# 返回授权流程相关信息
return {
"auth_url": auth_url, # 授权URL
"code_verifier": code_verifier, # PKCE挑战码
"state": auth_params["state"], # 状态
"auth_params": auth_params, # 授权参数
}
# 捕获异常并打印错误信息
except Exception as e:
print(f" 开始授权流程失败: {e}")
return None
# 动态客户端注册方法
def register_client(self, client_name="MCP客户端"):
# 打印步骤提示
print("\n📝 步骤2: 动态客户端注册")
print("=" * 50)
try:
# 检查是否已发现授权服务器
if not self.authorization_server_metadata:
print(" 请先发现授权服务器")
return False
# 获取注册端点
registration_endpoint = self.authorization_server_metadata[
"registration_endpoint"
]
# 构造客户端注册请求体
client_metadata = {
"client_name": client_name, # 客户端名称
"redirect_uris": ["http://localhost:8080/callback"], # 重定向URI
"grant_types": ["authorization_code"], # 授权类型
"response_types": ["code"], # 响应类型
"token_endpoint_auth_method": "client_secret_basic", # 令牌端点认证方法
"scope": "mcp:tools mcp:resources", # 作用域
"application_type": "web", # 应用类型
}
# 打印注册请求相关信息
print(f"📤 发送客户端注册请求到: {registration_endpoint}")
print(f" 客户端名称: {client_name}")
print(f" 重定向URI: {client_metadata['redirect_uris']}")
print(f" 授权类型: {client_metadata['grant_types']}")
print(f" 作用域: {client_metadata['scope']}")
# 发送POST请求进行客户端注册
response = requests.post(registration_endpoint, json=client_metadata)
# 判断注册是否成功
if response.status_code == 201:
# 解析返回的客户端信息
client_info = response.json()
# 保存客户端ID
self.client_id = client_info["client_id"]
# 保存客户端密钥
self.client_secret = client_info["client_secret"]
# 打印注册成功信息
print(" 客户端注册成功")
print(f" 客户端ID: {self.client_id}")
print(f" 客户端密钥: {self.client_secret[:8]}...")
return True
else:
# 注册失败,打印错误信息
print(f" 客户端注册失败: {response.status_code}")
print(f" 错误信息: {response.text}")
return False
# 捕获异常并打印错误信息
except Exception as e:
print(f" 客户端注册失败: {e}")
return False
# 发现授权服务器的方法
def discover_authorization_server(self):
try:
# 发送GET请求获取资源服务器的元数据
response = requests.get(
f"{self.mcp_server_url}/.well-known/oauth-protected-resource"
)
# 判断响应状态码是否为200
if response.status_code == 200:
# 解析响应的JSON数据
resource_metadata = response.json()
# 打印获取资源元数据成功的信息
print(f" 获取资源元数据成功")
# 打印资源标识符
print(f" 资源标识符: {resource_metadata.get('resource')}")
# 获取授权服务器列表
auth_servers = resource_metadata.get("authorization_servers", [])
# 判断是否存在授权服务器
if auth_servers:
# 取第一个授权服务器
auth_server = auth_servers[0]
# 保存授权服务器的元数据
self.authorization_server_metadata = auth_server
# 获取发行者信息
issuer = auth_server.get("issuer")
# 获取注册端点
registration_endpoint = auth_server.get("registration_endpoint")
# 获取授权端点
authorization_endpoint = auth_server.get("authorization_endpoint")
# 获取令牌端点
token_endpoint = auth_server.get("token_endpoint")
# 获取支持的作用域
scopes_supported = auth_server.get("scopes_supported")
# 获取支持的PKCE方法
code_challenge_methods_supported = auth_server.get(
"code_challenge_methods_supported"
)
# 打印发现授权服务器成功的信息
print(f" 发现授权服务器")
print(f" 发行者: {issuer}")
print(f" 注册端点: {registration_endpoint}")
print(f" 授权端点: {authorization_endpoint}")
print(f" 令牌端点: {token_endpoint}")
print(f" 支持的作用域: {', '.join(scopes_supported)}")
print(f" PKCE支持: {', '.join(code_challenge_methods_supported)}")
# 返回True,表示发现成功
return True
else:
# 未找到授权服务器,打印错误信息
print(" 未找到授权服务器")
return False
else:
# 获取资源元数据失败,打印状态码
print(f" 获取资源元数据失败: {response.status_code}")
return False
# 捕获异常并打印错误信息
except Exception as e:
print(f" 发现授权服务器失败: {e}")
return False
# 运行演示流程的方法
def run(self):
# 步骤1: 发现授权服务器
if not self.discover_authorization_server():
print(" 演示失败:无法发现授权服务器")
return False
# 步骤2: 注册客户端
if not self.register_client():
print(" 演示失败:客户端注册失败")
return False
# 步骤3: 开始授权流程
auth_flow = self.start_authorization_flow()
if not auth_flow:
print(" 演示失败:无法开始授权流程")
return False
# 步骤4: 模拟授权
+ authorization_code = self.simulate_authorization(auth_flow)
+ if not authorization_code:
+ print(" 演示失败:授权失败")
+ return False
# 主函数,程序入口
def main():
try:
# 创建OAuthClient实例
client = OAuthClient()
# 运行演示流程
client.run()
# 捕获用户中断异常
except KeyboardInterrupt:
print("\n\n演示被用户中断")
# 捕获其他异常并打印错误信息
except Exception as e:
print(f"\n演示过程中发生错误: {e}")
# 判断是否为主程序入口
if __name__ == "__main__":
# 调用主函数
main()
5.交换访问令牌 #
5.1. auth_server.py #
auth_server.py
# 导入日志模块
import logging
# 从http.server模块导入HTTPServer和BaseHTTPRequestHandler类
from http.server import HTTPServer, BaseHTTPRequestHandler
# 导入json模块
import json
# 导入secrets模块用于生成安全的随机数
import secrets
# 导入datetime相关类
from datetime import datetime, timedelta, timezone
from urllib.parse import parse_qs, urlparse
# 配置日志记录级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
# 定义OAuthServer类,作为OAuth 2.1授权服务器
class OAuthServer:
"""OAuth 2.1授权服务器"""
# 初始化方法
def __init__(self):
# 存储注册的客户端信息
self.clients = {} # 注册的客户端
# 存储授权码
self.authorization_codes = {} # 授权码
# 存储访问令牌
self.access_tokens = {} # 访问令牌
# 服务器密钥
self.server_secret = "oauth_server_secret_key_2025"
# 服务器元数据
self.metadata = {
# 发行者
"issuer": "http://localhost:8001",
# 授权端点
"authorization_endpoint": "http://localhost:8001/oauth/authorize",
# 令牌端点
"token_endpoint": "http://localhost:8001/oauth/token",
# 客户端注册端点
"registration_endpoint": "http://localhost:8001/oauth/register",
# 支持的作用域
"scopes_supported": ["mcp:tools", "mcp:resources", "mcp:prompts"],
# 支持的响应类型
"response_types_supported": ["code"],
# 支持的授权类型
"grant_types_supported": ["authorization_code", "refresh_token"],
# 令牌端点支持的认证方法
"token_endpoint_auth_methods_supported": [
"client_secret_basic",
"client_secret_post",
],
# 支持的PKCE方法
"code_challenge_methods_supported": ["S256"],
# 服务文档地址
"service_documentation": "https://modelcontextprotocol.io/specification/2025-06-18/basic/authorization",
}
# 创建授权码方法
def create_authorization_code(
self, client_id, redirect_uri, scope, state, code_challenge=None
):
"""创建授权码"""
# 检查client_id是否已注册
if client_id not in self.clients:
return None
# 生成授权码
code = str(secrets.token_urlsafe(32))
# 保存授权码及相关信息
self.authorization_codes[code] = {
"client_id": client_id, # 客户端ID
"redirect_uri": redirect_uri, # 重定向URI
"scope": scope, # 作用域
"state": state, # 状态
"code_challenge": code_challenge, # PKCE挑战码
"expires_at": datetime.now(timezone.utc) + timedelta(minutes=10), # 过期时间
"created_at": datetime.now(timezone.utc), # 创建时间
}
# 记录授权码创建日志
logger.info(f"创建授权码: {code} for client: {client_id}")
# 返回授权码
return code
# 验证授权码方法
+ def validate_authorization_code(self, code, client_id, code_verifier=None):
+ """验证授权码"""
# 检查授权码是否存在
+ if code not in self.authorization_codes:
+ return None
# 获取授权码信息
+ auth_code = self.authorization_codes[code]
# 检查授权码是否过期
+ if datetime.now(timezone.utc) > auth_code["expires_at"]:
+ del self.authorization_codes[code]
+ return None
# 检查客户端ID是否匹配
+ if auth_code["client_id"] != client_id:
+ return None
# 如果使用了PKCE,验证code_verifier
+ if auth_code["code_challenge"] and code_verifier:
+ import hashlib
+ import base64
# 计算code_verifier的SHA256哈希
+ code_verifier_hash = hashlib.sha256(code_verifier.encode()).digest()
# 进行base64 url安全编码并去除末尾的等号
+ code_verifier_b64 = (
+ base64.urlsafe_b64encode(code_verifier_hash).decode().rstrip("=")
+ )
# 比较code_verifier_b64和code_challenge
+ if code_verifier_b64 != auth_code["code_challenge"]:
+ return None
# 删除已使用的授权码
+ result = auth_code.copy()
+ del self.authorization_codes[code]
+ return result
# 创建JWT访问令牌方法
+ def create_access_token(self, client_id, scope, user_id="anonymous"):
+ """创建JWT访问令牌"""
# 导入jwt模块
+ import jwt
+ from datetime import datetime, timedelta
# 构造JWT载荷
+ payload = {
+ "iss": self.metadata["issuer"], # 发行者
+ "aud": "http://localhost:8000", # 目标资源(MCP服务器)
+ "sub": user_id,
+ "client_id": client_id,
+ "scope": scope,
+ "iat": datetime.now(timezone.utc),
+ "exp": datetime.now(timezone.utc) + timedelta(hours=1),
+ }
# 使用JWT编码令牌,明确指定audience
+ token = jwt.encode(payload, self.server_secret, algorithm="HS256")
# 存储令牌信息(用于调试)
+ self.access_tokens[token] = {
+ "client_id": client_id,
+ "scope": scope,
+ "user_id": user_id,
+ "created_at": datetime.now(timezone.utc),
+ "expires_at": datetime.now(timezone.utc) + timedelta(hours=1),
+ }
# 记录令牌创建日志
+ logger.info(f"创建JWT访问令牌: {token[:16]}... for client: {client_id}")
+ logger.info(f"令牌包含audience: {payload['aud']}, scope: {payload['scope']}")
+ return token
# 动态客户端注册方法 (RFC7591)
def register_client(self, client_metadata):
"""动态客户端注册 (RFC7591)"""
# 生成客户端ID
client_id = str(secrets.token_urlsafe(16))
# 生成客户端密钥
client_secret = str(secrets.token_urlsafe(32))
# 构造客户端信息字典
client_info = {
"client_id": client_id,
"client_secret": client_secret,
# 客户端ID签发时间(UTC时间戳)
"client_id_issued_at": int(datetime.now(timezone.utc).timestamp()),
# 客户端密钥过期时间(0表示永不过期)
"client_secret_expires_at": 0, # 永不过期
# 合并客户端元数据
**client_metadata,
}
# 保存客户端信息到clients字典
self.clients[client_id] = client_info
# 记录新客户端注册日志
logger.info(f"新客户端注册: {client_id}")
# 返回客户端信息
return client_info
# 定义OAuthHandler类,继承自BaseHTTPRequestHandler
class OAuthHandler(BaseHTTPRequestHandler):
"""OAuth HTTP处理器"""
# 初始化方法,接收oauth_server参数
def __init__(self, *args, oauth_server=None, **kwargs):
# 保存OAuthServer实例
self.oauth_server = oauth_server
# 调用父类初始化方法
super().__init__(*args, **kwargs)
# 构建查询字符串的方法
def _build_query_string(self, params):
"""构建查询字符串"""
# 将参数字典拼接为查询字符串
return "&".join([f"{k}={v}" for k, v in params.items()])
# 处理授权请求的方法
def _handle_authorization(self):
"""处理授权请求"""
try:
# 解析查询参数
query = urlparse(self.path).query
params = parse_qs(query)
# 获取client_id参数
client_id = params.get("client_id", [""])[0]
# 获取redirect_uri参数
redirect_uri = params.get("redirect_uri", [""])[0]
# 获取scope参数
scope = params.get("scope", [""])[0]
# 获取state参数
state = params.get("state", [""])[0]
# 获取code_challenge参数
code_challenge = params.get("code_challenge", [""])[0]
# 获取resource参数(RFC8707资源指示器)
resource = params.get("resource", [""])[0]
# 记录授权请求日志
logger.info(
f"授权请求: client_id={client_id}, scope={scope}, resource={resource}"
)
# 验证客户端是否已注册
if client_id not in self.oauth_server.clients:
self.send_error(400, "Invalid client_id")
return
# 创建授权码
code = self.oauth_server.create_authorization_code(
client_id, redirect_uri, scope, state, code_challenge
)
# 检查授权码是否创建成功
if not code:
self.send_error(400, "Failed to create authorization code")
return
# 构造重定向参数
redirect_params = {"code": code, "state": state}
# 构造重定向URL
redirect_url = f"{redirect_uri}?{self._build_query_string(redirect_params)}"
# 发送重定向响应
self.send_response(302)
self.send_header("Location", redirect_url)
self.end_headers()
# 记录授权成功日志
logger.info(f"授权成功,重定向到: {redirect_url}")
except Exception as e:
# 记录授权处理错误日志
logger.error(f"授权处理错误: {e}")
# 返回400错误
self.send_error(400, "Bad Request")
# 处理GET请求的方法
def do_GET(self):
"""处理GET请求"""
try:
# 判断请求路径是否为授权端点
if self.path.startswith("/oauth/authorize"):
self._handle_authorization()
else:
self.send_error(404, "Not Found")
except Exception as e:
# 记录GET处理错误日志
logger.error(f"GET处理错误: {e}")
# 返回500错误
self.send_error(500, "Internal Server Error")
# 处理授权码授权类型
+ def _handle_authorization_code_grant(self, token_request):
+ """处理授权码授权类型"""
# 获取code参数
+ code = token_request.get("code", [""])[0]
# 获取client_id参数
+ client_id = token_request.get("client_id", [""])[0]
# 获取code_verifier参数
+ code_verifier = token_request.get("code_verifier", [""])[0]
# 获取resource参数(RFC8707资源指示器)
+ resource = token_request.get("resource", [""])[0]
# 记录令牌请求日志
+ logger.info(
+ f"令牌请求: code={code[:16]}..., client_id={client_id}, resource={resource}"
+ )
# 验证授权码
+ auth_code = self.oauth_server.validate_authorization_code(
+ code, client_id, code_verifier
+ )
# 如果授权码验证失败
+ if not auth_code:
+ logger.error("授权码验证失败")
+ self.send_error(400, "Invalid authorization code")
+ return None
# 记录授权码验证成功日志
+ logger.info(f"授权码验证成功,scope: {auth_code['scope']}")
# 创建访问令牌
+ access_token = self.oauth_server.create_access_token(
+ client_id, auth_code["scope"]
+ )
# 记录访问令牌创建成功日志
+ logger.info(f"访问令牌创建成功,包含scope: {auth_code['scope']}")
# 返回令牌响应
+ return {
+ "access_token": access_token,
+ "token_type": "Bearer",
+ "expires_in": 3600,
+ "scope": auth_code["scope"],
+ }
# 创建JWT访问令牌(此方法未被调用,保留)
+ def create_access_token(self, client_id, scope, user_id="anonymous"):
+ """创建JWT访问令牌"""
# 导入jwt模块
+ import jwt
+ from datetime import datetime, timedelta
# 构造JWT载荷
+ payload = {
+ "iss": self.metadata["issuer"], # 发行者
+ "aud": "http://localhost:8000", # 目标资源(MCP服务器)
+ "sub": user_id,
+ "client_id": client_id,
+ "scope": scope,
+ "iat": datetime.now(timezone.utc),
+ "exp": datetime.now(timezone.utc) + timedelta(hours=1),
+ }
# 使用JWT编码令牌,明确指定audience
+ token = jwt.encode(payload, self.server_secret, algorithm="HS256")
# 存储令牌信息(用于调试)
+ self.access_tokens[token] = {
+ "client_id": client_id, # 客户端ID
+ "scope": scope, # 作用域
+ "user_id": user_id, # 用户ID
+ "created_at": datetime.now(timezone.utc), # 创建时间
+ "expires_at": datetime.now(timezone.utc) + timedelta(hours=1), # 过期时间
+ }
# 记录令牌创建日志
+ logger.info(f"创建JWT访问令牌: {token[:16]}... for client: {client_id}")
+ logger.info(f"令牌包含audience: {payload['aud']}, scope: {payload['scope']}")
+ return token
# 处理令牌请求的方法
+ def _handle_token(self):
+ """处理令牌请求"""
+ try:
# 获取请求体长度
+ content_length = int(self.headers.get("Content-Length", 0))
# 读取请求体内容
+ body = self.rfile.read(content_length)
# 解析请求体为字典
+ token_request = parse_qs(body.decode("utf-8"))
# 获取grant_type参数
+ grant_type = token_request.get("grant_type", [""])[0]
# 判断授权类型
+ if grant_type == "authorization_code":
+ response = self._handle_authorization_code_grant(token_request)
+ else:
+ self.send_error(400, "Unsupported grant type")
+ return
# 如果有响应,返回JSON
+ if response:
+ self.send_response(200)
+ self.send_header("Content-Type", "application/json")
+ self.end_headers()
+ self.wfile.write(json.dumps(response).encode("utf-8"))
+ except Exception as e:
# 记录令牌请求错误日志
+ logger.error(f"令牌请求错误: {e}")
+ self.send_error(400, "Bad Request")
# 处理客户端注册请求的方法
def _handle_client_registration(self):
"""处理客户端注册请求"""
try:
# 获取请求体长度
content_length = int(self.headers.get("Content-Length", 0))
# 读取请求体内容
body = self.rfile.read(content_length)
# 解析JSON格式的客户端元数据
client_metadata = json.loads(body.decode("utf-8"))
# 调用OAuthServer的register_client方法注册客户端
client_info = self.oauth_server.register_client(client_metadata)
# 发送201响应,表示创建成功
self.send_response(201)
# 设置响应头为application/json
self.send_header("Content-Type", "application/json")
# 结束响应头
self.end_headers()
# 返回注册成功的客户端信息(JSON格式)
self.wfile.write(json.dumps(client_info).encode("utf-8"))
except Exception as e:
# 记录错误日志
logger.error(f"客户端注册错误: {e}")
# 返回400错误
self.send_error(400, "Bad Request")
# 处理POST请求的方法
def do_POST(self):
"""处理POST请求"""
try:
# 如果请求路径为/oauth/register,处理客户端注册
if self.path == "/oauth/register":
self._handle_client_registration()
# 如果请求路径为/oauth/token,处理令牌请求
+ elif self.path == "/oauth/token":
+ self._handle_token()
# 其他路径返回404
else:
self.send_error(404, "Not Found")
except Exception as e:
# 记录POST处理错误日志
logger.error(f"POST处理错误: {e}")
# 返回500错误
self.send_error(500, "Internal Server Error")
# 运行OAuth授权服务器的函数
def run_oauth_server(host="localhost", port=8001):
"""运行OAuth授权服务器"""
# 创建OAuthServer实例
oauth_server = OAuthServer()
# 定义处理器工厂函数,用于传递oauth_server实例
def handler_factory(*args, **kwargs):
return OAuthHandler(*args, oauth_server=oauth_server, **kwargs)
# 创建HTTPServer实例,绑定主机和端口
server = HTTPServer((host, port), handler_factory)
# 记录服务器启动信息
logger.info(f"OAuth授权服务器运行在 http://{host}:{port}")
# 记录支持的端点信息
logger.info("支持的端点:")
logger.info(
f" - 服务器元数据: http://{host}:{port}/.well-known/oauth-authorization-server"
)
logger.info(f" - 授权端点: http://{host}:{port}/oauth/authorize")
logger.info(f" - 令牌端点: http://{host}:{port}/oauth/token")
logger.info(f" - 客户端注册: http://{host}:{port}/oauth/register")
try:
# 启动服务器,进入循环监听请求
server.serve_forever()
except KeyboardInterrupt:
# 捕获Ctrl+C,记录服务器停止
logger.info("OAuth授权服务器已停止")
finally:
# 关闭服务器
server.server_close()
# 判断是否为主程序入口
if __name__ == "__main__":
# 导入argparse模块,用于解析命令行参数
import argparse
# 创建ArgumentParser对象,设置描述信息
parser = argparse.ArgumentParser(description="MCP OAuth授权服务器")
# 添加--host参数,指定服务器主机,默认localhost
parser.add_argument(
"--host", default="localhost", help="服务器主机 (默认: localhost)"
)
# 添加--port参数,指定服务器端口,默认8001
parser.add_argument(
"--port", type=int, default=8001, help="服务器端口 (默认: 8001)"
)
# 解析命令行参数
args = parser.parse_args()
# 启动OAuth授权服务器
run_oauth_server(args.host, args.port)
5.2. client.py #
client.py
# 导入requests库,用于发送HTTP请求
import requests
# 导入secrets库,用于生成安全的随机字符串
import secrets
# 从urllib.parse导入urlencode、parse_qs、urlparse,用于URL参数编码和解析
from urllib.parse import urlencode, parse_qs, urlparse
# 导入hashlib库,用于哈希运算
import hashlib
# 导入base64库,用于base64编码
import base64
# 定义OAuthClient类,用于与MCP服务器和OAuth服务器交互
class OAuthClient:
# 构造函数,初始化MCP服务器和OAuth服务器的URL
def __init__(
self,
mcp_server_url="http://localhost:8000",
oauth_server_url="http://localhost:8001",
):
# 去除MCP服务器URL末尾的斜杠
self.mcp_server_url = mcp_server_url.rstrip("/")
# 去除OAuth服务器URL末尾的斜杠
self.oauth_server_url = oauth_server_url.rstrip("/")
# 生成PKCE挑战码的方法
def generate_pkce_challenge(self):
# 生成高强度随机字符串作为code_verifier
code_verifier = secrets.token_urlsafe(32)
# 对code_verifier进行SHA256哈希,然后base64-url编码,去除末尾的等号
code_challenge = (
base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest())
.decode()
.rstrip("=")
)
# 返回code_verifier和code_challenge
return code_verifier, code_challenge
# 模拟授权过程
def simulate_authorization(self, auth_flow):
# 打印模拟授权过程的提示信息
print("\n🎭 步骤4: 模拟授权过程")
print("=" * 50)
try:
# 从auth_flow中获取授权URL
auth_url = auth_flow["auth_url"]
# 发送GET请求到授权端点,不自动重定向
response = requests.get(auth_url, allow_redirects=False)
# 如果返回302重定向
if response.status_code == 302:
# 获取重定向的Location头
redirect_url = response.headers.get("Location")
if redirect_url:
# 解析重定向URL
parsed_url = urlparse(redirect_url)
# 解析URL中的查询参数
query_params = parse_qs(parsed_url.query)
# 获取授权码
authorization_code = query_params.get("code", [None])[0]
# 获取state参数
state = query_params.get("state", [None])[0]
# 检查授权码和state是否匹配
if authorization_code and state == auth_flow["state"]:
# 打印模拟授权成功信息
print(" 模拟授权成功")
print(f" 授权码: {authorization_code[:16]}...")
print(f" 状态: {state}")
# 返回授权码
return authorization_code
else:
# 授权码获取失败
print(" 授权码获取失败")
return None
else:
# 未找到重定向URL
print(" 未找到重定向URL")
return None
else:
# 授权请求失败,打印状态码
print(f" 授权请求失败: {response.status_code}")
return None
# 捕获异常并打印错误信息
except Exception as e:
print(f" 模拟授权失败: {e}")
return None
# 使用授权码交换访问令牌
+ def exchange_authorization_code(self, authorization_code, code_verifier):
# 步骤5: 使用授权码交换访问令牌
+ print("\n🔄 步骤5: 交换访问令牌")
+ print("=" * 50)
+ try:
# 获取令牌端点
+ token_endpoint = self.authorization_server_metadata["token_endpoint"]
# 构造令牌请求参数
+ token_request = {
+ "grant_type": "authorization_code", # 授权类型
+ "code": authorization_code, # 授权码
+ "redirect_uri": "http://localhost:8080/callback", # 重定向URI
+ "client_id": self.client_id, # 客户端ID
+ "code_verifier": code_verifier, # PKCE挑战码
+ "resource": self.mcp_server_url, # RFC8707资源指示器
+ }
# 打印令牌请求参数
+ print("📤 令牌请求参数:")
+ print(f" 授权类型: {token_request['grant_type']}")
+ print(f" 授权码: {token_request['code'][:16]}...")
+ print(f" 重定向URI: {token_request['redirect_uri']}")
+ print(f" 客户端ID: {token_request['client_id']}")
+ print(f" 代码验证器: {token_request['code_verifier'][:16]}...")
+ print(f" 资源指示器: {token_request['resource']}")
# 使用客户端凭据进行认证
+ auth = (self.client_id, self.client_secret)
# 发送POST请求到令牌端点
+ response = requests.post(token_endpoint, data=token_request, auth=auth)
# 如果响应状态码为200,表示成功
+ if response.status_code == 200:
# 解析返回的JSON数据
+ token_response = response.json()
# 保存访问令牌
+ self.access_token = token_response["access_token"]
# 打印成功获取访问令牌的信息
+ print(" 成功获取访问令牌")
+ print(f" 访问令牌: {self.access_token[:32]}...")
+ print(f" 令牌类型: {token_response['token_type']}")
+ print(f" 过期时间: {token_response['expires_in']}秒")
+ print(f" 作用域: {token_response['scope']}")
+ return True
+ else:
# 获取访问令牌失败,打印错误信息
+ print(f" 获取访问令牌失败: {response.status_code}")
+ print(f" 错误信息: {response.text}")
+ return False
# 捕获异常并打印错误信息
+ except Exception as e:
+ print(f" 交换访问令牌失败: {e}")
+ return False
# 开始OAuth授权流程的方法
def start_authorization_flow(self, scope="mcp:tools"):
# 打印流程提示信息
print("\n 步骤3: 开始OAuth授权流程")
print("=" * 50)
try:
# 检查是否已发现授权服务器
if not self.authorization_server_metadata:
print(" 请先发现授权服务器")
return None
# 获取授权端点URL
authorization_endpoint = self.authorization_server_metadata[
"authorization_endpoint"
]
# 生成PKCE挑战码
code_verifier, code_challenge = self.generate_pkce_challenge()
# 构造授权请求参数
auth_params = {
"response_type": "code", # 响应类型
"client_id": self.client_id, # 客户端ID
"redirect_uri": "http://localhost:8080/callback", # 重定向URI
"scope": scope, # 作用域
"state": secrets.token_urlsafe(16), # 状态
"code_challenge": code_challenge, # PKCE挑战码
"code_challenge_method": "S256", # PKCE挑战方法
"resource": self.mcp_server_url, # RFC8707资源指示器
}
# 构造授权URL
auth_url = f"{authorization_endpoint}?{urlencode(auth_params)}"
# 打印授权请求参数
print(" OAuth授权请求参数:")
print(f" 响应类型: {auth_params['response_type']}")
print(f" 客户端ID: {auth_params['client_id']}")
print(f" 重定向URI: {auth_params['redirect_uri']}")
print(f" 作用域: {auth_params['scope']}")
print(f" 状态: {auth_params['state']}")
print(f" 代码挑战: {auth_params['code_challenge'][:16]}...")
print(f" 代码挑战方法: {auth_params['code_challenge_method']}")
print(f" 资源指示器: {auth_params['resource']}")
# 打印授权URL
print(f"\n🔗 授权URL:")
print(f" {auth_url}")
# 提示用户在浏览器中访问授权URL
print(f"\n 请在浏览器中访问上述URL完成授权")
print(f" 注意: 实际应用中需要用户交互")
# 返回授权流程相关信息
return {
"auth_url": auth_url, # 授权URL
"code_verifier": code_verifier, # PKCE挑战码
"state": auth_params["state"], # 状态
"auth_params": auth_params, # 授权参数
}
# 捕获异常并打印错误信息
except Exception as e:
print(f" 开始授权流程失败: {e}")
return None
# 动态客户端注册方法
def register_client(self, client_name="MCP客户端"):
# 打印步骤提示
print("\n📝 步骤2: 动态客户端注册")
print("=" * 50)
try:
# 检查是否已发现授权服务器
if not self.authorization_server_metadata:
print(" 请先发现授权服务器")
return False
# 获取注册端点
registration_endpoint = self.authorization_server_metadata[
"registration_endpoint"
]
# 构造客户端注册请求体
client_metadata = {
"client_name": client_name, # 客户端名称
"redirect_uris": ["http://localhost:8080/callback"], # 重定向URI
"grant_types": ["authorization_code"], # 授权类型
"response_types": ["code"], # 响应类型
"token_endpoint_auth_method": "client_secret_basic", # 令牌端点认证方法
"scope": "mcp:tools mcp:resources", # 作用域
"application_type": "web", # 应用类型
}
# 打印注册请求相关信息
print(f"📤 发送客户端注册请求到: {registration_endpoint}")
print(f" 客户端名称: {client_name}")
print(f" 重定向URI: {client_metadata['redirect_uris']}")
print(f" 授权类型: {client_metadata['grant_types']}")
print(f" 作用域: {client_metadata['scope']}")
# 发送POST请求进行客户端注册
response = requests.post(registration_endpoint, json=client_metadata)
# 判断注册是否成功
if response.status_code == 201:
# 解析返回的客户端信息
client_info = response.json()
# 保存客户端ID
self.client_id = client_info["client_id"]
# 保存客户端密钥
self.client_secret = client_info["client_secret"]
# 打印注册成功信息
print(" 客户端注册成功")
print(f" 客户端ID: {self.client_id}")
print(f" 客户端密钥: {self.client_secret[:8]}...")
return True
else:
# 注册失败,打印错误信息
print(f" 客户端注册失败: {response.status_code}")
print(f" 错误信息: {response.text}")
return False
# 捕获异常并打印错误信息
except Exception as e:
print(f" 客户端注册失败: {e}")
return False
# 发现授权服务器的方法
def discover_authorization_server(self):
try:
# 发送GET请求获取资源服务器的元数据
response = requests.get(
f"{self.mcp_server_url}/.well-known/oauth-protected-resource"
)
# 判断响应状态码是否为200
if response.status_code == 200:
# 解析响应的JSON数据
resource_metadata = response.json()
# 打印获取资源元数据成功的信息
print(f" 获取资源元数据成功")
# 打印资源标识符
print(f" 资源标识符: {resource_metadata.get('resource')}")
# 获取授权服务器列表
auth_servers = resource_metadata.get("authorization_servers", [])
# 判断是否存在授权服务器
if auth_servers:
# 取第一个授权服务器
auth_server = auth_servers[0]
# 保存授权服务器的元数据
self.authorization_server_metadata = auth_server
# 获取发行者信息
issuer = auth_server.get("issuer")
# 获取注册端点
registration_endpoint = auth_server.get("registration_endpoint")
# 获取授权端点
authorization_endpoint = auth_server.get("authorization_endpoint")
# 获取令牌端点
token_endpoint = auth_server.get("token_endpoint")
# 获取支持的作用域
scopes_supported = auth_server.get("scopes_supported")
# 获取支持的PKCE方法
code_challenge_methods_supported = auth_server.get(
"code_challenge_methods_supported"
)
# 打印发现授权服务器成功的信息
print(f" 发现授权服务器")
print(f" 发行者: {issuer}")
print(f" 注册端点: {registration_endpoint}")
print(f" 授权端点: {authorization_endpoint}")
print(f" 令牌端点: {token_endpoint}")
print(f" 支持的作用域: {', '.join(scopes_supported)}")
print(f" PKCE支持: {', '.join(code_challenge_methods_supported)}")
# 返回True,表示发现成功
return True
else:
# 未找到授权服务器,打印错误信息
print(" 未找到授权服务器")
return False
else:
# 获取资源元数据失败,打印状态码
print(f" 获取资源元数据失败: {response.status_code}")
return False
# 捕获异常并打印错误信息
except Exception as e:
print(f" 发现授权服务器失败: {e}")
return False
# 运行演示流程的方法
def run(self):
# 步骤1: 发现授权服务器
if not self.discover_authorization_server():
# 发现授权服务器失败
print(" 演示失败:无法发现授权服务器")
return False
# 步骤2: 注册客户端
if not self.register_client():
# 客户端注册失败
print(" 演示失败:客户端注册失败")
return False
# 步骤3: 开始授权流程
auth_flow = self.start_authorization_flow()
if not auth_flow:
# 无法开始授权流程
print(" 演示失败:无法开始授权流程")
return False
# 步骤4: 模拟授权
authorization_code = self.simulate_authorization(auth_flow)
if not authorization_code:
# 授权失败
print(" 演示失败:授权失败")
return False
# 步骤5: 交换访问令牌
+ if not self.exchange_authorization_code(
+ authorization_code, auth_flow["code_verifier"]
+ ):
# 无法获取访问令牌
+ print(" 演示失败:无法获取访问令牌")
+ return False
# 主函数,程序入口
def main():
try:
# 创建OAuthClient实例
client = OAuthClient()
# 运行演示流程
client.run()
# 捕获用户中断异常
except KeyboardInterrupt:
# 打印用户中断信息
print("\n\n演示被用户中断")
# 捕获其他异常并打印错误信息
except Exception as e:
# 打印错误信息
print(f"\n演示过程中发生错误: {e}")
# 判断是否为主程序入口
if __name__ == "__main__":
# 调用主函数
main()
6.步骤6: MCP服务器访问 #
6.1. tools.py #
tools.py
# MCP工具定义
tools = [
{
"name": "calculate",
"description": "执行数学计算",
"inputSchema": {
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "数学表达式",
}
},
"required": ["expression"],
},
},
]
6.2. client.py #
client.py
# 导入requests库,用于发送HTTP请求
import requests
# 导入secrets库,用于生成安全的随机字符串
import secrets
# 从urllib.parse导入urlencode、parse_qs、urlparse,用于URL参数编码和解析
from urllib.parse import urlencode, parse_qs, urlparse
# 导入hashlib库,用于哈希运算
import hashlib
# 导入base64库,用于base64编码
import base64
# 定义OAuthClient类,用于与MCP服务器和OAuth服务器交互
class OAuthClient:
# 构造函数,初始化MCP服务器和OAuth服务器的URL
def __init__(
self,
mcp_server_url="http://localhost:8000",
oauth_server_url="http://localhost:8001",
):
# 去除MCP服务器URL末尾的斜杠
self.mcp_server_url = mcp_server_url.rstrip("/")
# 去除OAuth服务器URL末尾的斜杠
self.oauth_server_url = oauth_server_url.rstrip("/")
# 生成PKCE挑战码的方法
def generate_pkce_challenge(self):
# 生成高强度随机字符串作为code_verifier
code_verifier = secrets.token_urlsafe(32)
# 对code_verifier进行SHA256哈希,然后base64-url编码,去除末尾的等号
code_challenge = (
base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest())
.decode()
.rstrip("=")
)
# 返回code_verifier和code_challenge
return code_verifier, code_challenge
# 模拟授权过程
def simulate_authorization(self, auth_flow):
# 打印模拟授权过程的提示信息
print("\n🎭 步骤4: 模拟授权过程")
print("=" * 50)
try:
# 从auth_flow中获取授权URL
auth_url = auth_flow["auth_url"]
# 发送GET请求到授权端点,不自动重定向
response = requests.get(auth_url, allow_redirects=False)
# 如果返回302重定向
if response.status_code == 302:
# 获取重定向的Location头
redirect_url = response.headers.get("Location")
if redirect_url:
# 解析重定向URL
parsed_url = urlparse(redirect_url)
# 解析URL中的查询参数
query_params = parse_qs(parsed_url.query)
# 获取授权码
authorization_code = query_params.get("code", [None])[0]
# 获取state参数
state = query_params.get("state", [None])[0]
# 检查授权码和state是否匹配
if authorization_code and state == auth_flow["state"]:
# 打印模拟授权成功信息
print(" 模拟授权成功")
print(f" 授权码: {authorization_code[:16]}...")
print(f" 状态: {state}")
# 返回授权码
return authorization_code
else:
# 授权码获取失败
print(" 授权码获取失败")
return None
else:
# 未找到重定向URL
print(" 未找到重定向URL")
return None
else:
# 授权请求失败,打印状态码
print(f" 授权请求失败: {response.status_code}")
return None
# 捕获异常并打印错误信息
except Exception as e:
print(f" 模拟授权失败: {e}")
return None
# 步骤6: 测试使用访问令牌访问MCP服务器
+ def mcp_server_access(self):
# 打印步骤信息
+ print("\n 步骤6: 测试MCP服务器访问")
+ print("=" * 50)
+ try:
# 检查是否已获取访问令牌
+ if not self.access_token:
+ print(" 请先获取访问令牌")
+ return False
# 设置授权头
+ headers = {
+ "Authorization": f"Bearer {self.access_token}",
+ "Accept": "application/json",
+ "MCP-Protocol-Version": "2025-06-18",
+ }
# 打印发送MCP初始化请求信息
+ print("📤 发送MCP初始化请求")
+ print(f" 授权头: Bearer {self.access_token[:32]}...")
# 构造MCP初始化请求体
+ init_request = {
+ "jsonrpc": "2.0",
+ "id": "1",
+ "method": "initialize",
+ "params": {
+ "protocolVersion": "2025-06-18",
+ "capabilities": {},
+ "clientInfo": {"name": "OAuth客户端", "version": "1.0.0"},
+ },
+ }
# 发送POST请求到MCP服务器
+ response = requests.post(
+ f"{self.mcp_server_url}/mcp", json=init_request, headers=headers
+ )
# 判断初始化是否成功
+ if response.status_code == 200:
# 解析初始化响应
+ init_response = response.json()
+ print(" MCP初始化成功")
+ print(f" 协议版本: {init_response['result']['protocolVersion']}")
+ print(f" 会话ID: {init_response['result']['sessionId']}")
# 打印工具列表请求信息
+ print("\n 测试工具列表请求")
# 构造工具列表请求体
+ tools_request = {
+ "jsonrpc": "2.0",
+ "id": "2",
+ "method": "tools/list",
+ "params": {},
+ }
# 发送POST请求获取工具列表
+ tools_response = requests.post(
+ f"{self.mcp_server_url}/mcp", json=tools_request, headers=headers
+ )
# 判断工具列表获取是否成功
+ if tools_response.status_code == 200:
# 解析工具列表响应
+ tools_result = tools_response.json()
+ tools_list = tools_result["result"]["tools"]
+ print(" 工具列表获取成功")
+ print(f" 可用工具数量: {len(tools_list)}")
# 遍历并打印每个工具信息
+ for tool in tools_list:
+ print(f" - {tool['name']}: {tool['description']}")
+ else:
# 工具列表获取失败
+ print(f" 工具列表获取失败: {tools_response.status_code}")
+ return True
+ else:
# MCP初始化失败
+ print(f" MCP初始化失败: {response.status_code}")
+ print(f" 错误信息: {response.text}")
+ return False
# 捕获异常并打印错误信息
+ except Exception as e:
+ print(f" 测试MCP服务器访问失败: {e}")
+ return False
# 使用授权码交换访问令牌
def exchange_authorization_code(self, authorization_code, code_verifier):
# 打印步骤信息
print("\n🔄 步骤5: 交换访问令牌")
print("=" * 50)
try:
# 获取令牌端点
token_endpoint = self.authorization_server_metadata["token_endpoint"]
# 构造令牌请求参数
token_request = {
"grant_type": "authorization_code", # 授权类型
"code": authorization_code, # 授权码
"redirect_uri": "http://localhost:8080/callback", # 重定向URI
"client_id": self.client_id, # 客户端ID
"code_verifier": code_verifier, # PKCE挑战码
"resource": self.mcp_server_url, # RFC8707资源指示器
}
# 打印令牌请求参数
print("📤 令牌请求参数:")
print(f" 授权类型: {token_request['grant_type']}")
print(f" 授权码: {token_request['code'][:16]}...")
print(f" 重定向URI: {token_request['redirect_uri']}")
print(f" 客户端ID: {token_request['client_id']}")
print(f" 代码验证器: {token_request['code_verifier'][:16]}...")
print(f" 资源指示器: {token_request['resource']}")
# 使用客户端凭据进行认证
auth = (self.client_id, self.client_secret)
# 发送POST请求到令牌端点
response = requests.post(token_endpoint, data=token_request, auth=auth)
# 判断响应状态码是否为200
if response.status_code == 200:
# 解析返回的JSON数据
token_response = response.json()
# 保存访问令牌
self.access_token = token_response["access_token"]
# 打印成功获取访问令牌的信息
print(" 成功获取访问令牌")
print(f" 访问令牌: {self.access_token[:32]}...")
print(f" 令牌类型: {token_response['token_type']}")
print(f" 过期时间: {token_response['expires_in']}秒")
print(f" 作用域: {token_response['scope']}")
return True
else:
# 获取访问令牌失败,打印错误信息
print(f" 获取访问令牌失败: {response.status_code}")
print(f" 错误信息: {response.text}")
return False
# 捕获异常并打印错误信息
except Exception as e:
print(f" 交换访问令牌失败: {e}")
return False
# 开始OAuth授权流程的方法
def start_authorization_flow(self, scope="mcp:tools"):
# 打印流程提示信息
print("\n 步骤3: 开始OAuth授权流程")
print("=" * 50)
try:
# 检查是否已发现授权服务器
if not self.authorization_server_metadata:
print(" 请先发现授权服务器")
return None
# 获取授权端点URL
authorization_endpoint = self.authorization_server_metadata[
"authorization_endpoint"
]
# 生成PKCE挑战码
code_verifier, code_challenge = self.generate_pkce_challenge()
# 构造授权请求参数
auth_params = {
"response_type": "code", # 响应类型
"client_id": self.client_id, # 客户端ID
"redirect_uri": "http://localhost:8080/callback", # 重定向URI
"scope": scope, # 作用域
"state": secrets.token_urlsafe(16), # 状态
"code_challenge": code_challenge, # PKCE挑战码
"code_challenge_method": "S256", # PKCE挑战方法
"resource": self.mcp_server_url, # RFC8707资源指示器
}
# 构造授权URL
auth_url = f"{authorization_endpoint}?{urlencode(auth_params)}"
# 打印授权请求参数
print(" OAuth授权请求参数:")
print(f" 响应类型: {auth_params['response_type']}")
print(f" 客户端ID: {auth_params['client_id']}")
print(f" 重定向URI: {auth_params['redirect_uri']}")
print(f" 作用域: {auth_params['scope']}")
print(f" 状态: {auth_params['state']}")
print(f" 代码挑战: {auth_params['code_challenge'][:16]}...")
print(f" 代码挑战方法: {auth_params['code_challenge_method']}")
print(f" 资源指示器: {auth_params['resource']}")
# 打印授权URL
print(f"\n🔗 授权URL:")
print(f" {auth_url}")
# 提示用户在浏览器中访问授权URL
print(f"\n 请在浏览器中访问上述URL完成授权")
print(f" 注意: 实际应用中需要用户交互")
# 返回授权流程相关信息
return {
"auth_url": auth_url, # 授权URL
"code_verifier": code_verifier, # PKCE挑战码
"state": auth_params["state"], # 状态
"auth_params": auth_params, # 授权参数
}
# 捕获异常并打印错误信息
except Exception as e:
print(f" 开始授权流程失败: {e}")
return None
# 动态客户端注册方法
def register_client(self, client_name="MCP客户端"):
# 打印步骤提示
print("\n📝 步骤2: 动态客户端注册")
print("=" * 50)
try:
# 检查是否已发现授权服务器
if not self.authorization_server_metadata:
print(" 请先发现授权服务器")
return False
# 获取注册端点
registration_endpoint = self.authorization_server_metadata[
"registration_endpoint"
]
# 构造客户端注册请求体
client_metadata = {
"client_name": client_name, # 客户端名称
"redirect_uris": ["http://localhost:8080/callback"], # 重定向URI
"grant_types": ["authorization_code"], # 授权类型
"response_types": ["code"], # 响应类型
"token_endpoint_auth_method": "client_secret_basic", # 令牌端点认证方法
"scope": "mcp:tools mcp:resources", # 作用域
"application_type": "web", # 应用类型
}
# 打印注册请求相关信息
print(f"📤 发送客户端注册请求到: {registration_endpoint}")
print(f" 客户端名称: {client_name}")
print(f" 重定向URI: {client_metadata['redirect_uris']}")
print(f" 授权类型: {client_metadata['grant_types']}")
print(f" 作用域: {client_metadata['scope']}")
# 发送POST请求进行客户端注册
response = requests.post(registration_endpoint, json=client_metadata)
# 判断注册是否成功
if response.status_code == 201:
# 解析返回的客户端信息
client_info = response.json()
# 保存客户端ID
self.client_id = client_info["client_id"]
# 保存客户端密钥
self.client_secret = client_info["client_secret"]
# 打印注册成功信息
print(" 客户端注册成功")
print(f" 客户端ID: {self.client_id}")
print(f" 客户端密钥: {self.client_secret[:8]}...")
return True
else:
# 注册失败,打印错误信息
print(f" 客户端注册失败: {response.status_code}")
print(f" 错误信息: {response.text}")
return False
# 捕获异常并打印错误信息
except Exception as e:
print(f" 客户端注册失败: {e}")
return False
# 发现授权服务器的方法
def discover_authorization_server(self):
try:
# 发送GET请求获取资源服务器的元数据
response = requests.get(
f"{self.mcp_server_url}/.well-known/oauth-protected-resource"
)
# 判断响应状态码是否为200
if response.status_code == 200:
# 解析响应的JSON数据
resource_metadata = response.json()
# 打印获取资源元数据成功的信息
print(f" 获取资源元数据成功")
# 打印资源标识符
print(f" 资源标识符: {resource_metadata.get('resource')}")
# 获取授权服务器列表
auth_servers = resource_metadata.get("authorization_servers", [])
# 判断是否存在授权服务器
if auth_servers:
# 取第一个授权服务器
auth_server = auth_servers[0]
# 保存授权服务器的元数据
self.authorization_server_metadata = auth_server
# 获取发行者信息
issuer = auth_server.get("issuer")
# 获取注册端点
registration_endpoint = auth_server.get("registration_endpoint")
# 获取授权端点
authorization_endpoint = auth_server.get("authorization_endpoint")
# 获取令牌端点
token_endpoint = auth_server.get("token_endpoint")
# 获取支持的作用域
scopes_supported = auth_server.get("scopes_supported")
# 获取支持的PKCE方法
code_challenge_methods_supported = auth_server.get(
"code_challenge_methods_supported"
)
# 打印发现授权服务器成功的信息
print(f" 发现授权服务器")
print(f" 发行者: {issuer}")
print(f" 注册端点: {registration_endpoint}")
print(f" 授权端点: {authorization_endpoint}")
print(f" 令牌端点: {token_endpoint}")
print(f" 支持的作用域: {', '.join(scopes_supported)}")
print(f" PKCE支持: {', '.join(code_challenge_methods_supported)}")
# 返回True,表示发现成功
return True
else:
# 未找到授权服务器,打印错误信息
print(" 未找到授权服务器")
return False
else:
# 获取资源元数据失败,打印状态码
print(f" 获取资源元数据失败: {response.status_code}")
return False
# 捕获异常并打印错误信息
except Exception as e:
print(f" 发现授权服务器失败: {e}")
return False
# 运行演示流程的方法
def run(self):
# 步骤1: 发现授权服务器
if not self.discover_authorization_server():
# 发现授权服务器失败
print(" 演示失败:无法发现授权服务器")
return False
# 步骤2: 注册客户端
if not self.register_client():
# 客户端注册失败
print(" 演示失败:客户端注册失败")
return False
# 步骤3: 开始授权流程
auth_flow = self.start_authorization_flow()
if not auth_flow:
# 无法开始授权流程
print(" 演示失败:无法开始授权流程")
return False
# 步骤4: 模拟授权
authorization_code = self.simulate_authorization(auth_flow)
if not authorization_code:
# 授权失败
print(" 演示失败:授权失败")
return False
# 步骤5: 交换访问令牌
if not self.exchange_authorization_code(
authorization_code, auth_flow["code_verifier"]
):
# 无法获取访问令牌
print(" 演示失败:无法获取访问令牌")
return False
# 步骤6: 测试MCP服务器访问
+ if not self.mcp_server_access():
+ print(" 演示失败:无法访问MCP服务器")
+ return False
# 主函数,程序入口
def main():
try:
# 创建OAuthClient实例
client = OAuthClient()
# 运行演示流程
client.run()
# 捕获用户中断异常
except KeyboardInterrupt:
# 打印用户中断信息
print("\n\n演示被用户中断")
# 捕获其他异常并打印错误信息
except Exception as e:
# 打印错误信息
print(f"\n演示过程中发生错误: {e}")
# 判断是否为主程序入口
if __name__ == "__main__":
# 调用主函数
main()
6.3. server.py #
server.py
# 导入日志模块
import logging
# 导入时间模块
+import time
# 从http.server模块导入HTTPServer和BaseHTTPRequestHandler类
from http.server import HTTPServer, BaseHTTPRequestHandler
# 导入jwt模块
+import jwt
# 导入json模块
import json
# 从datetime模块导入datetime和timezone
+from datetime import datetime, timezone
# 导入uuid模块
+import uuid
# 从tools模块导入tools对象
+from tools import tools
# 配置日志记录级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
# 定义MCPServer类,作为MCP服务器的核心类
class MCPServer:
"""MCP服务器核心类,支持OAuth 2.1授权"""
# 初始化方法
def __init__(self):
# 协议版本号
self.protocol_version = "2025-06-18"
# 服务器信息,包括名称和版本
self.server_info = {"name": "MCP服务器", "version": "1.0.0"}
# 服务器能力,包括工具、资源和提示
self.capabilities = {"tools": {}, "resources": {}, "prompts": {}}
# 会话管理字典
+ self.sessions = {}
# 生成服务器密钥(实际应用中应该使用安全的密钥管理)
+ self.server_secret = "mcp_server_secret_key_2025"
# OAuth 2.1授权相关配置
self.oauth_config = {
# 授权端点
"authorization_endpoint": "http://localhost:8001/oauth/authorize",
# 令牌端点
"token_endpoint": "http://localhost:8001/oauth/token",
# 注册端点
"registration_endpoint": "http://localhost:8001/oauth/register",
# 发行者
"issuer": "http://localhost:8001",
# 支持的作用域 工具、资源、提示
"scopes_supported": ["mcp:tools", "mcp:resources", "mcp:prompts"],
# 支持的响应类型
"response_types_supported": ["code"],
# 支持的授权类型 授权码和刷新令牌
"grant_types_supported": ["authorization_code", "refresh_token"],
# 令牌端点支持的认证方法 客户端密钥基本认证和客户端密钥POST认证
"token_endpoint_auth_methods_supported": [
"client_secret_basic", # 客户端密钥基本认证
"client_secret_post", # 客户端密钥POST认证
],
# 支持的PKCE方法
"code_challenge_methods_supported": ["S256"], # PKCE支持
}
# 验证访问令牌
+ def validate_access_token(self, token: str) -> dict:
+ """验证访问令牌"""
+ try:
# 记录开始验证访问令牌
+ logger.info(f"开始验证访问令牌: {token[:16]}...")
# 首先尝试使用服务器密钥验证(自签名令牌)
+ try:
# 使用服务器密钥解码JWT
+ payload = jwt.decode(
+ token,
+ self.server_secret,
+ algorithms=["HS256"],
+ audience="http://localhost:8000",
+ )
# 记录服务器密钥验证成功
+ logger.info("使用服务器密钥验证成功")
+ except jwt.InvalidTokenError:
# 记录服务器密钥验证失败,尝试OAuth授权服务器密钥
+ logger.info("服务器密钥验证失败,尝试OAuth授权服务器密钥")
# 如果失败,尝试使用OAuth授权服务器的密钥验证
# 这里我们使用一个共享的密钥
+ oauth_server_secret = "oauth_server_secret_key_2025" # 与授权服务器共享
+ try:
# 使用OAuth服务器密钥解码JWT
+ payload = jwt.decode(
+ token,
+ oauth_server_secret,
+ algorithms=["HS256"],
+ audience="http://localhost:8000",
+ )
# 记录OAuth授权服务器密钥验证成功
+ logger.info("使用OAuth授权服务器密钥验证成功")
+ except jwt.InvalidTokenError:
# 记录OAuth授权服务器密钥验证也失败
+ logger.error("OAuth授权服务器密钥验证也失败")
+ return None
# 检查令牌是否过期
+ if (
+ "exp" in payload
+ and datetime.now(timezone.utc).timestamp() > payload["exp"]
+ ):
# 记录令牌已过期
+ logger.error("令牌已过期")
+ return None
# 检查令牌是否针对此服务器(audience已经在JWT解码时验证过了)
# 但我们可以再次确认
+ if "aud" in payload and payload["aud"] != "http://localhost:8000":
# 记录令牌audience不匹配
+ logger.error(
+ f"令牌audience不匹配: {payload['aud']} != http://localhost:8000"
+ )
+ return None
# 记录令牌验证成功,包含scope和client_id
+ logger.info(
+ f"令牌验证成功,包含scope: {payload.get('scope')}, client_id: {payload.get('client_id')}"
+ )
# 返回payload
+ return payload
+ except Exception as e:
# 记录令牌验证错误
+ logger.error(f"令牌验证错误: {e}")
+ return None
# 生成唯一的会话ID
+ def _generate_session_id(self) -> str:
+ """生成唯一的会话ID"""
# 使用uuid4生成唯一ID
+ return str(uuid.uuid4())
# 获取或创建会话
+ def _get_or_create_session(self, session_id: str = None):
+ """获取或创建会话"""
# 如果没有传入会话ID,则生成一个新的
+ if not session_id:
+ session_id = self._generate_session_id()
# 如果会话ID不存在于sessions字典中,则新建一个会话
+ if session_id not in self.sessions:
+ self.sessions[session_id] = {
+ "created_at": time.time(),
+ "last_activity": time.time(),
+ "message_count": 0,
+ "client_info": None,
+ }
# 记录新会话的创建
+ logger.info(f"创建新会话: {session_id}")
# 更新会话的最后活动时间
+ self.sessions[session_id]["last_activity"] = time.time()
# 增加消息计数
+ self.sessions[session_id]["message_count"] += 1
# 返回会话ID
+ return session_id
# 处理工具列表请求
+ def handle_tools_list(self, params=None, session_id: str = None):
+ """处理工具列表请求"""
# 如果有会话ID,则获取或创建会话
+ if session_id:
+ self._get_or_create_session(session_id)
# 返回工具列表
+ return {"tools": tools}
# 处理初始化请求(实际生效的方法,覆盖上面的方法)
+ def handle_initialize(self, params, session_id: str = None):
+ """处理初始化请求"""
# 获取客户端信息
+ client_info = params.get("clientInfo", {})
# 获取或创建会话ID
+ session_id = self._get_or_create_session(session_id)
# 存储客户端信息到会话
+ self.sessions[session_id]["client_info"] = client_info
# 记录客户端初始化信息
+ logger.info(f"客户端初始化: {client_info}, 会话ID: {session_id}")
# 返回初始化结果,包括协议版本、能力、服务器信息和会话ID
+ return {
+ "protocolVersion": self.protocol_version, # 协议版本
+ "capabilities": self.capabilities, # 能力
+ "serverInfo": self.server_info, # 服务器信息
+ "sessionId": session_id, # 返回会话ID给客户端
+ }
# 获取OAuth 2.0 Protected Resource Metadata (RFC9728)
def get_resource_metadata(self):
"""获取资源服务器元数据"""
# 返回资源元数据字典
return {
"resource": "http://localhost:8000",
"authorization_servers": [
{
"issuer": self.oauth_config["issuer"], # 发行者
"authorization_endpoint": self.oauth_config["authorization_endpoint"], # 授权端点
"token_endpoint": self.oauth_config["token_endpoint"], # 令牌端点
"registration_endpoint": self.oauth_config["registration_endpoint"], # 注册端点
"scopes_supported": self.oauth_config["scopes_supported"], # 支持的作用域
"response_types_supported": self.oauth_config["response_types_supported"], # 支持的响应类型
"grant_types_supported": self.oauth_config["grant_types_supported"], # 支持的授权类型
"token_endpoint_auth_methods_supported": self.oauth_config["token_endpoint_auth_methods_supported"], # 令牌端点支持的认证方法
"code_challenge_methods_supported": self.oauth_config["code_challenge_methods_supported"], # 支持的PKCE方法
}
],
}
# 定义HTTP请求处理类
class MCPHTTPHandler(BaseHTTPRequestHandler):
# 初始化方法,接收mcp_server参数
def __init__(self, *args, mcp_server=None, **kwargs):
# 保存MCP服务器实例
self.mcp_server = mcp_server
# 调用父类的初始化方法
super().__init__(*args, **kwargs)
# 处理OAuth资源元数据端点
def _handle_resource_metadata(self):
try:
# 获取资源元数据
metadata = self.mcp_server.get_resource_metadata()
# 发送200响应
self.send_response(200)
# 设置响应头为application/json
self.send_header("Content-Type", "application/json")
# 结束响应头
self.end_headers()
# 写入JSON数据
self.wfile.write(json.dumps(metadata).encode("utf-8"))
except Exception as e:
# 记录错误日志
logger.error(f"处理资源元数据错误: {e}")
# 返回500错误
self.send_error(500, "Internal Server Error")
# 处理GET请求
def do_GET(self):
try:
# 如果请求路径为OAuth资源元数据端点
if self.path == "/.well-known/oauth-protected-resource":
self._handle_resource_metadata()
return
# 如果请求根路径,返回欢迎信息
if self.path == "/":
self.send_response(200)
self.send_header("Content-Type", "text/html")
self.end_headers()
self.wfile.write(b"Welcome to MCP server!")
return
except Exception as e:
# 处理GET请求出错,记录日志
logger.error(f"GET处理错误: {e}")
# 返回500错误
self.send_error(500, "Internal Server Error")
# 发送未授权响应
+ def _send_unauthorized_response(self):
+ """发送未授权响应,包含WWW-Authenticate头"""
# 发送401未授权响应
+ self.send_response(401)
# 设置WWW-Authenticate响应头
+ self.send_header(
+ "WWW-Authenticate", 'Bearer realm="MCP Server", error="invalid_token"'
+ )
# 设置Content-Type为application/json
+ self.send_header("Content-Type", "application/json")
# 结束响应头
+ self.end_headers()
# 构造错误响应体
+ error_response = {
+ "error": "invalid_token",
+ "error_description": "The access token is invalid or expired",
+ }
# 写入错误响应体
+ self.wfile.write(json.dumps(error_response).encode("utf-8"))
# 验证授权
+ def _validate_authorization(self):
+ """验证访问令牌"""
# 获取Authorization头
+ auth_header = self.headers.get("Authorization")
# 检查Authorization头是否存在且格式正确
+ if not auth_header or not auth_header.startswith("Bearer "):
# 记录警告
+ logger.warning("缺少Authorization头或格式不正确")
# 发送未授权响应
+ self._send_unauthorized_response()
+ return False
# 去掉 "Bearer " 前缀,获取token
+ token = auth_header[7:]
# 记录正在验证访问令牌
+ logger.info(f"正在验证访问令牌: {token[:16]}...")
# 调用MCPServer的validate_access_token方法验证token
+ payload = self.mcp_server.validate_access_token(token)
# 如果payload为None,说明验证失败
+ if not payload:
# 记录警告
+ logger.warning("访问令牌验证失败")
# 发送未授权响应
+ self._send_unauthorized_response()
+ return False
# 记录访问令牌验证成功
+ logger.info(f"访问令牌验证成功,客户端ID: {payload.get('client_id')}")
+ return True
# 处理MCP消息
+ def _handle_mcp_message(self, message, session_id: str = None):
+ """处理MCP消息"""
# 获取方法名
+ method = message.get("method")
# 获取参数
+ params = message.get("params", {})
# 获取消息ID
+ msg_id = message.get("id")
# 判断方法类型
+ if method == "initialize":
# 调用服务器的handle_initialize方法
+ result = self.mcp_server.handle_initialize(params, session_id)
# 如果有消息ID,返回带ID的结果
+ return {"id": msg_id, "result": result} if msg_id else None
+ elif method == "tools/list":
# 处理工具列表请求
+ result = self.mcp_server.handle_tools_list(params, session_id)
+ return {"id": msg_id, "result": result} if msg_id else None
+ else:
# 如果方法不存在,返回错误
+ if msg_id:
+ return {
+ "id": msg_id,
+ "error": {"code": -32601, "message": f"Method not found: {method}"},
+ }
# 没有ID则返回None
+ return None
# 处理POST请求(发送消息到服务器)
+ def do_POST(self):
+ """处理POST请求(发送消息到服务器)"""
+ try:
# 检查请求路径是否为/mcp
+ if self.path != "/mcp":
+ self.send_error(404, "MCP endpoint not found")
+ return
# 检查Accept头是否包含application/json或text/event-stream
+ accept_header = self.headers.get("Accept", "")
+ if (
+ "application/json" not in accept_header
+ and "text/event-stream" not in accept_header
+ ):
+ self.send_error(400, "Missing required Accept header")
+ return
# 验证访问令牌(除了初始化请求)
+ if self.path == "/mcp":
# 先读取请求体进行令牌验证
+ content_length = int(self.headers.get("Content-Length", 0))
+ body = self.rfile.read(content_length)
+ try:
# 解码请求体为JSON
+ message = json.loads(body.decode("utf-8"))
# 如果不是初始化请求,则验证令牌
+ if message and message.get("method") != "initialize":
+ if not self._validate_authorization():
+ return
+ except json.JSONDecodeError:
# 请求体不是有效的JSON,返回400
+ self.send_error(400, "Invalid JSON")
+ return
# 检查MCP-Protocol-Version头是否匹配
+ protocol_version = self.headers.get("MCP-Protocol-Version")
+ if (
+ protocol_version
+ and protocol_version != self.mcp_server.protocol_version
+ ):
# 协议版本不支持,返回400
+ self.send_error(
+ 400, f"Unsupported protocol version: {protocol_version}"
+ )
+ return
# 获取会话ID
+ session_id = self.headers.get("Mcp-Session-Id")
# 请求体已经在上面读取过了,直接使用
# 处理MCP消息
+ response = self._handle_mcp_message(message, session_id)
# 发送响应
+ if response:
# 发送200响应码
+ self.send_response(200)
# 设置响应头Content-Type为application/json
+ self.send_header("Content-Type", "application/json")
# 设置协议版本头
+ self.send_header(
+ "MCP-Protocol-Version", self.mcp_server.protocol_version
+ )
# 如果响应中包含会话ID,则添加到响应头
+ if "result" in response and "sessionId" in response["result"]:
+ self.send_header("Mcp-Session-Id", response["result"]["sessionId"])
# 结束响应头
+ self.end_headers()
# 写入JSON响应体
+ self.wfile.write(json.dumps(response).encode("utf-8"))
+ else:
# 没有响应内容,返回202 Accepted
+ self.send_response(202) # Accepted
+ self.end_headers()
+ except Exception as e:
# 记录错误日志
+ logger.error(f"POST处理错误: {e}")
# 返回500内部服务器错误
+ self.send_error(500, "Internal Server Error")
# 定义运行HTTP服务器的函数
def run_http_server(mcp_server: MCPServer, host: str = "localhost", port: int = 8000):
"""运行HTTP服务器"""
# 定义处理器工厂函数,用于传递mcp_server实例
def handler_factory(*args, **kwargs):
return MCPHTTPHandler(*args, mcp_server=mcp_server, **kwargs)
# 创建HTTPServer实例,绑定主机和端口
server = HTTPServer((host, port), handler_factory)
# 记录服务器启动信息
logger.info(f"HTTP服务器运行在 http://{host}:{port}/mcp")
try:
# 启动服务器,进入循环监听请求
server.serve_forever()
except KeyboardInterrupt:
# 捕获Ctrl+C,记录服务器停止
logger.info("HTTP服务器已停止")
finally:
# 关闭服务器
server.server_close()
# 定义主函数
def main():
"""主函数"""
# 导入argparse模块,用于解析命令行参数
import argparse
# 创建ArgumentParser对象,设置描述信息
parser = argparse.ArgumentParser(description="MCP HTTP服务器")
# 添加--host参数,指定服务器主机,默认localhost
parser.add_argument(
"--host", default="localhost", help="HTTP服务器主机 (默认: localhost)"
)
# 添加--port参数,指定服务器端口,默认8000
parser.add_argument(
"--port", type=int, default=8000, help="HTTP服务器端口 (默认: 8000)"
)
# 解析命令行参数
args = parser.parse_args()
# 创建MCP服务器实例
mcp_server = MCPServer()
# 运行HTTP服务器
run_http_server(mcp_server, args.host, args.port)
# 判断是否为主程序入口
if __name__ == "__main__":
# 调用主函数
main()
步骤7: 调用calculate工具 #
7.1. client.py #
client.py
# 导入requests库,用于发送HTTP请求
import requests
# 导入secrets库,用于生成安全的随机字符串
import secrets
# 从urllib.parse导入urlencode、parse_qs、urlparse,用于URL参数编码和解析
from urllib.parse import urlencode, parse_qs, urlparse
# 导入hashlib库,用于哈希运算
import hashlib
# 导入base64库,用于base64编码
import base64
# 定义OAuthClient类,用于与MCP服务器和OAuth服务器交互
class OAuthClient:
# 构造函数,初始化MCP服务器和OAuth服务器的URL
def __init__(
self,
mcp_server_url="http://localhost:8000",
oauth_server_url="http://localhost:8001",
):
# 去除MCP服务器URL末尾的斜杠
self.mcp_server_url = mcp_server_url.rstrip("/")
# 去除OAuth服务器URL末尾的斜杠
self.oauth_server_url = oauth_server_url.rstrip("/")
# 生成PKCE挑战码的方法
def generate_pkce_challenge(self):
# 生成高强度随机字符串作为code_verifier
code_verifier = secrets.token_urlsafe(32)
# 对code_verifier进行SHA256哈希,然后base64-url编码,去除末尾的等号
code_challenge = (
base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest())
.decode()
.rstrip("=")
)
# 返回code_verifier和code_challenge
return code_verifier, code_challenge
# 模拟授权过程
def simulate_authorization(self, auth_flow):
# 打印模拟授权过程的提示信息
print("\n🎭 步骤4: 模拟授权过程")
print("=" * 50)
try:
# 从auth_flow中获取授权URL
auth_url = auth_flow["auth_url"]
# 发送GET请求到授权端点,不自动重定向
response = requests.get(auth_url, allow_redirects=False)
# 如果返回302重定向
if response.status_code == 302:
# 获取重定向的Location头
redirect_url = response.headers.get("Location")
if redirect_url:
# 解析重定向URL
parsed_url = urlparse(redirect_url)
# 解析URL中的查询参数
query_params = parse_qs(parsed_url.query)
# 获取授权码
authorization_code = query_params.get("code", [None])[0]
# 获取state参数
state = query_params.get("state", [None])[0]
# 检查授权码和state是否匹配
if authorization_code and state == auth_flow["state"]:
# 打印模拟授权成功信息
print(" 模拟授权成功")
print(f" 授权码: {authorization_code[:16]}...")
print(f" 状态: {state}")
# 返回授权码
return authorization_code
else:
# 授权码获取失败
print(" 授权码获取失败")
return None
else:
# 未找到重定向URL
print(" 未找到重定向URL")
return None
else:
# 授权请求失败,打印状态码
print(f" 授权请求失败: {response.status_code}")
return None
# 捕获异常并打印错误信息
except Exception as e:
print(f" 模拟授权失败: {e}")
return None
# 步骤6: 测试使用访问令牌访问MCP服务器
def mcp_server_access(self):
# 打印步骤信息
print("\n 步骤6: 测试MCP服务器访问")
print("=" * 50)
try:
# 检查是否已获取访问令牌
if not self.access_token:
print(" 请先获取访问令牌")
return False
# 设置授权头
headers = {
"Authorization": f"Bearer {self.access_token}", # 授权头
"Accept": "application/json", # 接受类型
"MCP-Protocol-Version": "2025-06-18", # MCP协议版本
}
# 打印发送MCP初始化请求信息
print("📤 发送MCP初始化请求")
print(f" 授权头: Bearer {self.access_token[:32]}...")
# 构造MCP初始化请求体
init_request = {
"jsonrpc": "2.0", # JSON-RPC版本
"id": "1", # 请求ID
"method": "initialize", # 方法
"params": {
"protocolVersion": "2025-06-18", # 协议版本
"capabilities": {}, # 能力
"clientInfo": {"name": "OAuth客户端", "version": "1.0.0"}, # 客户端信息
},
}
# 发送POST请求到MCP服务器
response = requests.post(
f"{self.mcp_server_url}/mcp", json=init_request, headers=headers
)
# 判断初始化是否成功
if response.status_code == 200:
# 解析初始化响应
init_response = response.json()
print(" MCP初始化成功")
print(f" 协议版本: {init_response['result']['protocolVersion']}")
print(f" 会话ID: {init_response['result']['sessionId']}")
# 打印工具列表请求信息
print("\n 测试工具列表请求")
# 构造工具列表请求体
tools_request = {
"jsonrpc": "2.0", # JSON-RPC版本
"id": "2", # 请求ID
"method": "tools/list", # 方法
"params": {},
}
# 发送POST请求获取工具列表
tools_response = requests.post(
f"{self.mcp_server_url}/mcp", json=tools_request, headers=headers
)
# 判断工具列表获取是否成功
if tools_response.status_code == 200:
# 解析工具列表响应
tools_result = tools_response.json()
tools_list = tools_result["result"]["tools"]
print(" 工具列表获取成功")
print(f" 可用工具数量: {len(tools_list)}")
# 遍历并打印每个工具信息
for tool in tools_list:
print(f" - {tool['name']}: {tool['description']}")
else:
# 工具列表获取失败
print(f" 工具列表获取失败: {tools_response.status_code}")
return True
else:
# MCP初始化失败
print(f" MCP初始化失败: {response.status_code}")
print(f" 错误信息: {response.text}")
return False
# 捕获异常并打印错误信息
except Exception as e:
print(f" 测试MCP服务器访问失败: {e}")
return False
# 使用授权码交换访问令牌
def exchange_authorization_code(self, authorization_code, code_verifier):
# 打印步骤信息
print("\n🔄 步骤5: 交换访问令牌")
print("=" * 50)
try:
# 获取令牌端点
token_endpoint = self.authorization_server_metadata["token_endpoint"]
# 构造令牌请求参数
token_request = {
"grant_type": "authorization_code", # 授权类型
"code": authorization_code, # 授权码
"redirect_uri": "http://localhost:8080/callback", # 重定向URI
"client_id": self.client_id, # 客户端ID
"code_verifier": code_verifier, # PKCE挑战码
"resource": self.mcp_server_url, # RFC8707资源指示器
}
# 打印令牌请求参数
print("📤 令牌请求参数:")
print(f" 授权类型: {token_request['grant_type']}")
print(f" 授权码: {token_request['code'][:16]}...")
print(f" 重定向URI: {token_request['redirect_uri']}")
print(f" 客户端ID: {token_request['client_id']}")
print(f" 代码验证器: {token_request['code_verifier'][:16]}...")
print(f" 资源指示器: {token_request['resource']}")
# 使用客户端凭据进行认证
auth = (self.client_id, self.client_secret)
# 发送POST请求到令牌端点
response = requests.post(token_endpoint, data=token_request, auth=auth)
# 判断响应状态码是否为200
if response.status_code == 200:
# 解析返回的JSON数据
token_response = response.json()
# 保存访问令牌
self.access_token = token_response["access_token"]
# 打印成功获取访问令牌的信息
print(" 成功获取访问令牌")
print(f" 访问令牌: {self.access_token[:32]}...")
print(f" 令牌类型: {token_response['token_type']}")
print(f" 过期时间: {token_response['expires_in']}秒")
print(f" 作用域: {token_response['scope']}")
return True
else:
# 获取访问令牌失败,打印错误信息
print(f" 获取访问令牌失败: {response.status_code}")
print(f" 错误信息: {response.text}")
return False
# 捕获异常并打印错误信息
except Exception as e:
print(f" 交换访问令牌失败: {e}")
return False
# 开始OAuth授权流程的方法
def start_authorization_flow(self, scope="mcp:tools"):
# 打印流程提示信息
print("\n 步骤3: 开始OAuth授权流程")
print("=" * 50)
try:
# 检查是否已发现授权服务器
if not self.authorization_server_metadata:
print(" 请先发现授权服务器")
return None
# 获取授权端点URL
authorization_endpoint = self.authorization_server_metadata[
"authorization_endpoint"
]
# 生成PKCE挑战码
code_verifier, code_challenge = self.generate_pkce_challenge()
# 构造授权请求参数
auth_params = {
"response_type": "code", # 响应类型
"client_id": self.client_id, # 客户端ID
"redirect_uri": "http://localhost:8080/callback", # 重定向URI
"scope": scope, # 作用域
"state": secrets.token_urlsafe(16), # 状态
"code_challenge": code_challenge, # PKCE挑战码
"code_challenge_method": "S256", # PKCE挑战方法
"resource": self.mcp_server_url, # RFC8707资源指示器
}
# 构造授权URL
auth_url = f"{authorization_endpoint}?{urlencode(auth_params)}"
# 打印授权请求参数
print(" OAuth授权请求参数:")
print(f" 响应类型: {auth_params['response_type']}")
print(f" 客户端ID: {auth_params['client_id']}")
print(f" 重定向URI: {auth_params['redirect_uri']}")
print(f" 作用域: {auth_params['scope']}")
print(f" 状态: {auth_params['state']}")
print(f" 代码挑战: {auth_params['code_challenge'][:16]}...")
print(f" 代码挑战方法: {auth_params['code_challenge_method']}")
print(f" 资源指示器: {auth_params['resource']}")
# 打印授权URL
print(f"\n🔗 授权URL:")
print(f" {auth_url}")
# 提示用户在浏览器中访问授权URL
print(f"\n 请在浏览器中访问上述URL完成授权")
print(f" 注意: 实际应用中需要用户交互")
# 返回授权流程相关信息
return {
"auth_url": auth_url,
"code_verifier": code_verifier,
"state": auth_params["state"],
"auth_params": auth_params,
}
# 捕获异常并打印错误信息
except Exception as e:
print(f" 开始授权流程失败: {e}")
return None
# 动态客户端注册方法
def register_client(self, client_name="MCP客户端"):
# 打印步骤提示
print("\n📝 步骤2: 动态客户端注册")
print("=" * 50)
try:
# 检查是否已发现授权服务器
if not self.authorization_server_metadata:
print(" 请先发现授权服务器")
return False
# 获取注册端点
registration_endpoint = self.authorization_server_metadata[
"registration_endpoint"
]
# 构造客户端注册请求体
client_metadata = {
"client_name": client_name, # 客户端名称
"redirect_uris": ["http://localhost:8080/callback"], # 重定向URI
"grant_types": ["authorization_code"], # 授权类型
"response_types": ["code"], # 响应类型
"token_endpoint_auth_method": "client_secret_basic", # 令牌端点认证方法
"scope": "mcp:tools mcp:resources", # 作用域
"application_type": "web", # 应用类型
}
# 打印注册请求相关信息
print(f"📤 发送客户端注册请求到: {registration_endpoint}")
print(f" 客户端名称: {client_name}")
print(f" 重定向URI: {client_metadata['redirect_uris']}")
print(f" 授权类型: {client_metadata['grant_types']}")
print(f" 作用域: {client_metadata['scope']}")
# 发送POST请求进行客户端注册
response = requests.post(registration_endpoint, json=client_metadata)
# 判断注册是否成功
if response.status_code == 201:
# 解析返回的客户端信息
client_info = response.json()
# 保存客户端ID
self.client_id = client_info["client_id"]
# 保存客户端密钥
self.client_secret = client_info["client_secret"]
# 打印注册成功信息
print(" 客户端注册成功")
print(f" 客户端ID: {self.client_id}")
print(f" 客户端密钥: {self.client_secret[:8]}...")
return True
else:
# 注册失败,打印错误信息
print(f" 客户端注册失败: {response.status_code}")
print(f" 错误信息: {response.text}")
return False
# 捕获异常并打印错误信息
except Exception as e:
print(f" 客户端注册失败: {e}")
return False
# 发现授权服务器的方法
def discover_authorization_server(self):
try:
# 发送GET请求获取资源服务器的元数据
response = requests.get(
f"{self.mcp_server_url}/.well-known/oauth-protected-resource"
)
# 判断响应状态码是否为200
if response.status_code == 200:
# 解析响应的JSON数据
resource_metadata = response.json()
# 打印获取资源元数据成功的信息
print(f" 获取资源元数据成功")
# 打印资源标识符
print(f" 资源标识符: {resource_metadata.get('resource')}")
# 获取授权服务器列表
auth_servers = resource_metadata.get("authorization_servers", [])
# 判断是否存在授权服务器
if auth_servers:
# 取第一个授权服务器
auth_server = auth_servers[0]
# 保存授权服务器的元数据
self.authorization_server_metadata = auth_server
# 获取发行者信息
issuer = auth_server.get("issuer")
# 获取注册端点
registration_endpoint = auth_server.get("registration_endpoint")
# 获取授权端点
authorization_endpoint = auth_server.get("authorization_endpoint")
# 获取令牌端点
token_endpoint = auth_server.get("token_endpoint")
# 获取支持的作用域
scopes_supported = auth_server.get("scopes_supported")
# 获取支持的PKCE方法
code_challenge_methods_supported = auth_server.get(
"code_challenge_methods_supported"
)
# 打印发现授权服务器成功的信息
print(f" 发现授权服务器")
print(f" 发行者: {issuer}")
print(f" 注册端点: {registration_endpoint}")
print(f" 授权端点: {authorization_endpoint}")
print(f" 令牌端点: {token_endpoint}")
print(f" 支持的作用域: {', '.join(scopes_supported)}")
print(f" PKCE支持: {', '.join(code_challenge_methods_supported)}")
# 返回True,表示发现成功
return True
else:
# 未找到授权服务器,打印错误信息
print(" 未找到授权服务器")
return False
else:
# 获取资源元数据失败,打印状态码
print(f" 获取资源元数据失败: {response.status_code}")
return False
# 捕获异常并打印错误信息
except Exception as e:
print(f" 发现授权服务器失败: {e}")
return False
# 定义调用calculate工具的方法
+ def call_calculate_tool(self):
# 步骤7: 调用calculate工具
+ print("\n🧮 步骤7: 调用calculate工具")
# 打印分隔线
+ print("=" * 50)
+ try:
# 检查是否已获取访问令牌
+ if not self.access_token:
+ print(" 请先获取访问令牌")
+ return False
# 设置HTTP请求头,包括授权、返回类型和协议版本
+ headers = {
+ "Authorization": f"Bearer {self.access_token}", # 授权头
+ "Accept": "application/json", # 接受类型
+ "MCP-Protocol-Version": "2025-06-18", # MCP协议版本
+ }
# 打印发送请求提示
+ print("📤 发送calculate工具调用请求")
# 构造calculate工具的请求体
+ calculate_request = {
+ "jsonrpc": "2.0", # JSON-RPC版本
+ "id": "3", # 请求ID
+ "method": "tools/call", # 方法
+ "params": {
+ "calls": [
+ {
+ "name": "calculate", # 工具名称
+ "arguments": {"expression": "10 * 5 + 20 / 4"}, # 参数
+ }
+ ]
+ },
+ }
# 打印本次计算的表达式
+ print("🔢 计算表达式: 10 * 5 + 20 / 4")
# 打印预期结果
+ print(" 预期结果: 10 * 5 + 20 / 4 = 50 + 5 = 55")
# 发送POST请求到MCP服务器
+ response = requests.post(
+ f"{self.mcp_server_url}/mcp", json=calculate_request, headers=headers
+ )
# 判断请求是否成功
+ if response.status_code == 200:
# 解析响应结果为JSON
+ calculate_result = response.json()
+ print(" calculate工具调用成功")
# 判断响应中是否包含result和calls字段
+ if (
+ "result" in calculate_result
+ and "calls" in calculate_result["result"]
+ ):
# 获取calls列表
+ calls = calculate_result["result"]["calls"]
# 判断calls是否非空
+ if calls and len(calls) > 0:
# 取第一个调用结果
+ call_result = calls[0]
# 判断是否有content字段且非空
+ if "content" in call_result and len(call_result["content"]) > 0:
# 获取content内容
+ content = call_result["content"][0]
# 判断content中是否有text字段
+ if "text" in content:
+ print(f"📊 计算结果: {content['text']}")
+ else:
+ print(f"📊 结果内容: {content}")
# 判断是否有错误
+ elif "isError" in call_result and call_result["isError"]:
# 获取错误信息
+ error_msg = call_result.get("error", {}).get(
+ "message", "未知错误"
+ )
+ print(f" 计算错误: {error_msg}")
+ else:
+ print(f"📊 调用结果: {call_result}")
+ else:
+ print(" 未找到调用结果")
+ else:
# 响应格式不符,打印完整响应
+ print(f"📊 完整响应: {calculate_result}")
# 测试更多计算表达式
+ print("\n🧮 测试更多计算表达式:")
# 定义测试表达式列表
+ test_expressions = [
+ "2 + 2 * 3",
+ "sin(45) + cos(30)",
+ "sqrt(16) + pow(2, 3)",
+ "100 / 4 - 5",
+ ]
# 遍历每个测试表达式,i从4开始编号
+ for i, expr in enumerate(test_expressions, 4):
# 构造每个测试表达式的请求体
+ test_request = {
+ "jsonrpc": "2.0",
+ "id": str(i),
+ "method": "tools/call",
+ "params": {
+ "calls": [
+ {"name": "calculate", "arguments": {"expression": expr}}
+ ]
+ },
+ }
# 打印当前测试表达式
+ print(f" 📝 测试 {i}: {expr}")
# 发送POST请求
+ test_response = requests.post(
+ f"{self.mcp_server_url}/mcp", json=test_request, headers=headers
+ )
# 判断请求是否成功
+ if test_response.status_code == 200:
# 解析响应
+ test_result = test_response.json()
# 判断响应格式
+ if "result" in test_result and "calls" in test_result["result"]:
# 获取calls列表
+ test_calls = test_result["result"]["calls"]
# 判断calls是否非空
+ if test_calls and len(test_calls) > 0:
# 取第一个调用结果
+ test_call = test_calls[0]
# 判断是否有content且非空
+ if (
+ "content" in test_call
+ and len(test_call["content"]) > 0
+ ):
# 获取content内容
+ test_content = test_call["content"][0]
# 判断是否有text字段
+ if "text" in test_content:
+ print(f" 结果: {test_content['text']}")
+ else:
+ print(f" 📊 结果: {test_content}")
# 判断是否有错误
+ elif "isError" in test_call and test_call["isError"]:
# 获取错误信息
+ error_msg = test_call.get("error", {}).get(
+ "message", "未知错误"
+ )
+ print(f" 错误: {error_msg}")
+ else:
+ print(f" 📊 结果: {test_call}")
+ else:
+ print(f" 无结果")
+ else:
+ print(f" 响应格式错误")
+ else:
# 请求失败,打印状态码
+ print(f" 请求失败: {test_response.status_code}")
# 所有测试完成,返回True
+ return True
+ else:
# calculate工具调用失败,打印状态码和错误信息
+ print(f" calculate工具调用失败: {response.status_code}")
+ print(f" 错误信息: {response.text}")
+ return False
# 捕获异常并打印错误信息
+ except Exception as e:
+ print(f" 调用calculate工具失败: {e}")
+ return False
# 运行演示流程的方法
def run(self):
# 步骤1: 发现授权服务器
if not self.discover_authorization_server():
# 发现授权服务器失败
print(" 演示失败:无法发现授权服务器")
return False
# 步骤2: 注册客户端
if not self.register_client():
# 客户端注册失败
print(" 演示失败:客户端注册失败")
return False
# 步骤3: 开始授权流程
auth_flow = self.start_authorization_flow()
if not auth_flow:
# 无法开始授权流程
print(" 演示失败:无法开始授权流程")
return False
# 步骤4: 模拟授权
authorization_code = self.simulate_authorization(auth_flow)
if not authorization_code:
# 授权失败
print(" 演示失败:授权失败")
return False
# 步骤5: 交换访问令牌
if not self.exchange_authorization_code(
authorization_code, auth_flow["code_verifier"]
):
# 无法获取访问令牌
print(" 演示失败:无法获取访问令牌")
return False
# 步骤6: 测试MCP服务器访问
if not self.mcp_server_access():
print(" 演示失败:无法访问MCP服务器")
return False
# 步骤7: 调用calculate工具
+ if not self.call_calculate_tool():
+ print(" 演示失败:无法调用calculate工具")
+ return False
# 主函数,程序入口
def main():
try:
# 创建OAuthClient实例
client = OAuthClient()
# 运行演示流程
client.run()
# 捕获用户中断异常
except KeyboardInterrupt:
# 打印用户中断信息
print("\n\n演示被用户中断")
# 捕获其他异常并打印错误信息
except Exception as e:
# 打印错误信息
print(f"\n演示过程中发生错误: {e}")
# 判断是否为主程序入口
if __name__ == "__main__":
# 调用主函数
main()
7.2. server.py #
server.py
# 导入日志模块
import logging
# 导入时间模块
import time
# 从http.server模块导入HTTPServer和BaseHTTPRequestHandler类
from http.server import HTTPServer, BaseHTTPRequestHandler
# 导入jwt模块
import jwt
# 导入json模块
import json
# 从datetime模块导入datetime和timezone
from datetime import datetime, timezone
# 导入uuid模块
import uuid
# 从tools模块导入tools对象
from tools import tools
# 配置日志记录级别为INFO
logging.basicConfig(level=logging.INFO)
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
# 定义MCPServer类,作为MCP服务器的核心类
class MCPServer:
"""MCP服务器核心类,支持OAuth 2.1授权"""
# 初始化方法
def __init__(self):
# 协议版本号
self.protocol_version = "2025-06-18"
# 服务器信息,包括名称和版本
self.server_info = {"name": "MCP服务器", "version": "1.0.0"}
# 服务器能力,包括工具、资源和提示
self.capabilities = {"tools": {}, "resources": {}, "prompts": {}}
# 会话管理字典
self.sessions = {}
# 生成服务器密钥(实际应用中应该使用安全的密钥管理)
self.server_secret = "mcp_server_secret_key_2025"
# OAuth 2.1授权相关配置
self.oauth_config = {
# 授权端点
"authorization_endpoint": "http://localhost:8001/oauth/authorize",
# 令牌端点
"token_endpoint": "http://localhost:8001/oauth/token",
# 注册端点
"registration_endpoint": "http://localhost:8001/oauth/register",
# 发行者
"issuer": "http://localhost:8001",
# 支持的作用域 工具、资源、提示
"scopes_supported": ["mcp:tools", "mcp:resources", "mcp:prompts"],
# 支持的响应类型 授权码
"response_types_supported": ["code"],
# 支持的授权类型 授权码和刷新令牌
"grant_types_supported": ["authorization_code", "refresh_token"],
# 令牌端点支持的认证方法 客户端密钥基本认证和客户端密钥POST认证
"token_endpoint_auth_methods_supported": [
"client_secret_basic", # 客户端密钥基本认证
"client_secret_post", # 客户端密钥POST认证
],
# 支持的PKCE方法
"code_challenge_methods_supported": ["S256"], # PKCE支持
}
# 处理工具调用请求
+ def handle_tools_call(self, params, session_id: str = None):
+ """处理工具调用请求"""
# 如果有会话ID,则获取或创建会话
+ if session_id:
+ self._get_or_create_session(session_id)
# 获取调用列表
+ calls = params.get("calls", [])
# 初始化结果列表
+ results = []
# 遍历每个调用
+ for call in calls:
# 获取工具名称
+ tool_name = call.get("name")
# 获取参数
+ arguments = call.get("arguments", {})
# 如果工具名称为"calculate"
+ if tool_name == "calculate":
# 获取表达式
+ expression = arguments.get("expression", "")
+ try:
# 计算表达式结果
+ result = eval(expression)
# 添加成功结果到结果列表
+ results.append(
+ {
+ "name": tool_name,
+ "content": [
+ {
+ "type": "text",
+ "text": f"计算结果: {expression} = {result}",
+ }
+ ],
+ }
+ )
+ except Exception as e:
# 计算出错,添加错误信息
+ results.append(
+ {
+ "name": tool_name,
+ "isError": True,
+ "error": {"message": f"计算错误: {str(e)}"},
+ }
+ )
+ else:
# 未知工具,添加错误信息
+ results.append(
+ {
+ "name": tool_name,
+ "isError": True,
+ "error": {"message": f"未知工具: {tool_name}"},
+ }
+ )
# 返回所有调用的结果
+ return {"calls": results}
# 验证访问令牌
def validate_access_token(self, token: str) -> dict:
"""验证访问令牌"""
try:
# 记录开始验证访问令牌
logger.info(f"开始验证访问令牌: {token[:16]}...")
# 首先尝试使用服务器密钥验证(自签名令牌)
try:
# 使用服务器密钥解码JWT
payload = jwt.decode(
token, # 令牌
self.server_secret, # 服务器密钥
algorithms=["HS256"], # 算法
audience="http://localhost:8000", # 受众
)
# 记录服务器密钥验证成功
logger.info("使用服务器密钥验证成功")
except jwt.InvalidTokenError:
# 记录服务器密钥验证失败,尝试OAuth授权服务器密钥
logger.info("服务器密钥验证失败,尝试OAuth授权服务器密钥")
# 如果失败,尝试使用OAuth授权服务器的密钥验证
# 这里我们使用一个共享的密钥
oauth_server_secret = "oauth_server_secret_key_2025" # 与授权服务器共享
try:
# 使用OAuth服务器密钥解码JWT
payload = jwt.decode(
token, # 令牌
oauth_server_secret, # OAuth服务器密钥
algorithms=["HS256"], # 算法
audience="http://localhost:8000", # 受众
)
# 记录OAuth授权服务器密钥验证成功
logger.info("使用OAuth授权服务器密钥验证成功")
except jwt.InvalidTokenError:
# 记录OAuth授权服务器密钥验证也失败
logger.error("OAuth授权服务器密钥验证也失败")
return None
# 检查令牌是否过期 如果过期则返回None
if (
"exp" in payload
and datetime.now(timezone.utc).timestamp() > payload["exp"]
):
# 记录令牌已过期
logger.error("令牌已过期")
return None
# 检查令牌是否针对此服务器(audience已经在JWT解码时验证过了) 但我们可以再次确认
if "aud" in payload and payload["aud"] != "http://localhost:8000":
# 记录令牌audience不匹配
logger.error(
f"令牌audience不匹配: {payload['aud']} != http://localhost:8000"
)
return None
# 记录令牌验证成功,包含scope和client_id 包含scope和client_id
logger.info(
f"令牌验证成功,包含scope: {payload.get('scope')}, client_id: {payload.get('client_id')}"
)
# 返回payload
return payload
except Exception as e:
# 记录令牌验证错误
logger.error(f"令牌验证错误: {e}")
return None
# 生成唯一的会话ID
def _generate_session_id(self) -> str:
"""生成唯一的会话ID"""
# 使用uuid4生成唯一ID
return str(uuid.uuid4())
# 获取或创建会话
def _get_or_create_session(self, session_id: str = None):
"""获取或创建会话"""
# 如果没有传入会话ID,则生成一个新的
if not session_id:
session_id = self._generate_session_id()
# 如果会话ID不存在于sessions字典中,则新建一个会话
if session_id not in self.sessions:
self.sessions[session_id] = {
"created_at": time.time(),
"last_activity": time.time(),
"message_count": 0,
"client_info": None,
}
# 记录新会话的创建
logger.info(f"创建新会话: {session_id}")
# 更新会话的最后活动时间
self.sessions[session_id]["last_activity"] = time.time()
# 增加消息计数
self.sessions[session_id]["message_count"] += 1
# 返回会话ID
return session_id
# 处理工具列表请求
def handle_tools_list(self, params=None, session_id: str = None):
"""处理工具列表请求"""
# 如果有会话ID,则获取或创建会话
if session_id:
self._get_or_create_session(session_id)
# 返回工具列表
return {"tools": tools}
# 处理初始化请求(实际生效的方法,覆盖上面的方法)
def handle_initialize(self, params, session_id: str = None):
"""处理初始化请求"""
# 获取客户端信息
client_info = params.get("clientInfo", {})
# 获取或创建会话ID
session_id = self._get_or_create_session(session_id)
# 存储客户端信息到会话
self.sessions[session_id]["client_info"] = client_info
# 记录客户端初始化信息
logger.info(f"客户端初始化: {client_info}, 会话ID: {session_id}")
# 返回初始化结果,包括协议版本、能力、服务器信息和会话ID
return {
"protocolVersion": self.protocol_version, # 协议版本
"capabilities": self.capabilities, # 能力
"serverInfo": self.server_info, # 服务器信息
"sessionId": session_id, # 返回会话ID给客户端
}
# 获取OAuth 2.0 Protected Resource Metadata (RFC9728)
def get_resource_metadata(self):
"""获取资源服务器元数据"""
# 返回资源元数据字典
return {
"resource": "http://localhost:8000",
"authorization_servers": [
{
"issuer": self.oauth_config["issuer"], # 发行者
"authorization_endpoint": self.oauth_config["authorization_endpoint"], # 授权端点
"token_endpoint": self.oauth_config["token_endpoint"], # 令牌端点
"registration_endpoint": self.oauth_config["registration_endpoint"], # 注册端点
"scopes_supported": self.oauth_config["scopes_supported"], # 支持的作用域
"response_types_supported": self.oauth_config["response_types_supported"], # 支持的响应类型
"grant_types_supported": self.oauth_config["grant_types_supported"], # 支持的授权类型
"token_endpoint_auth_methods_supported": self.oauth_config["token_endpoint_auth_methods_supported"], # 令牌端点支持的认证方法
"code_challenge_methods_supported": self.oauth_config["code_challenge_methods_supported"], # 支持的PKCE方法
}
],
}
# 定义HTTP请求处理类
class MCPHTTPHandler(BaseHTTPRequestHandler):
# 初始化方法,接收mcp_server参数
def __init__(self, *args, mcp_server=None, **kwargs):
# 保存MCP服务器实例
self.mcp_server = mcp_server
# 调用父类的初始化方法
super().__init__(*args, **kwargs)
# 处理OAuth资源元数据端点
def _handle_resource_metadata(self):
try:
# 获取资源元数据
metadata = self.mcp_server.get_resource_metadata()
# 发送200响应
self.send_response(200)
# 设置响应头为application/json
self.send_header("Content-Type", "application/json")
# 结束响应头
self.end_headers()
# 写入JSON数据
self.wfile.write(json.dumps(metadata).encode("utf-8"))
except Exception as e:
# 记录错误日志
logger.error(f"处理资源元数据错误: {e}")
# 返回500错误
self.send_error(500, "Internal Server Error")
# 处理GET请求
def do_GET(self):
try:
# 如果请求路径为OAuth资源元数据端点
if self.path == "/.well-known/oauth-protected-resource":
self._handle_resource_metadata()
return
# 如果请求根路径,返回欢迎信息
if self.path == "/":
self.send_response(200)
self.send_header("Content-Type", "text/html")
self.end_headers()
self.wfile.write(b"Welcome to MCP server!")
return
except Exception as e:
# 处理GET请求出错,记录日志
logger.error(f"GET处理错误: {e}")
# 返回500错误
self.send_error(500, "Internal Server Error")
# 发送未授权响应
def _send_unauthorized_response(self):
"""发送未授权响应,包含WWW-Authenticate头"""
# 发送401未授权响应
self.send_response(401)
# 设置WWW-Authenticate响应头
self.send_header(
"WWW-Authenticate", 'Bearer realm="MCP Server", error="invalid_token"'
)
# 设置Content-Type为application/json
self.send_header("Content-Type", "application/json")
# 结束响应头
self.end_headers()
# 构造错误响应体
error_response = {
"error": "invalid_token",
"error_description": "The access token is invalid or expired",
}
# 写入错误响应体
self.wfile.write(json.dumps(error_response).encode("utf-8"))
# 验证授权
def _validate_authorization(self):
"""验证访问令牌"""
# 获取Authorization头
auth_header = self.headers.get("Authorization")
# 检查Authorization头是否存在且格式正确
if not auth_header or not auth_header.startswith("Bearer "):
# 记录警告
logger.warning("缺少Authorization头或格式不正确")
# 发送未授权响应
self._send_unauthorized_response()
return False
# 去掉 "Bearer " 前缀,获取token
token = auth_header[7:]
# 记录正在验证访问令牌
logger.info(f"正在验证访问令牌: {token[:16]}...")
# 调用MCPServer的validate_access_token方法验证token
payload = self.mcp_server.validate_access_token(token)
# 如果payload为None,说明验证失败
if not payload:
# 记录警告
logger.warning("访问令牌验证失败")
# 发送未授权响应
self._send_unauthorized_response()
return False
# 记录访问令牌验证成功
logger.info(f"访问令牌验证成功,客户端ID: {payload.get('client_id')}")
return True
# 处理MCP消息
def _handle_mcp_message(self, message, session_id: str = None):
"""处理MCP消息"""
# 获取方法名
method = message.get("method")
# 获取参数
params = message.get("params", {})
# 获取消息ID
msg_id = message.get("id")
# 判断方法类型
if method == "initialize":
# 调用服务器的handle_initialize方法
result = self.mcp_server.handle_initialize(params, session_id)
# 如果有消息ID,返回带ID的结果
return {"id": msg_id, "result": result} if msg_id else None
elif method == "tools/list":
# 处理工具列表请求
result = self.mcp_server.handle_tools_list(params, session_id)
return {"id": msg_id, "result": result} if msg_id else None
+ elif method == "tools/call":
# 处理工具调用请求
+ result = self.mcp_server.handle_tools_call(params, session_id)
+ return {"id": msg_id, "result": result} if msg_id else None
else:
# 如果方法不存在,返回错误
if msg_id:
return {
"id": msg_id,
"error": {"code": -32601, "message": f"Method not found: {method}"},
}
# 没有ID则返回None
return None
# 处理POST请求(发送消息到服务器)
def do_POST(self):
"""处理POST请求(发送消息到服务器)"""
try:
# 检查请求路径是否为/mcp
if self.path != "/mcp":
self.send_error(404, "MCP endpoint not found")
return
# 检查Accept头是否包含application/json或text/event-stream
accept_header = self.headers.get("Accept", "")
if (
"application/json" not in accept_header
and "text/event-stream" not in accept_header
):
self.send_error(400, "Missing required Accept header")
return
# 验证访问令牌(除了初始化请求)
if self.path == "/mcp":
# 先读取请求体进行令牌验证
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length)
try:
# 解码请求体为JSON
message = json.loads(body.decode("utf-8"))
# 如果不是初始化请求,则验证令牌
if message and message.get("method") != "initialize":
if not self._validate_authorization():
return
except json.JSONDecodeError:
# 请求体不是有效的JSON,返回400
self.send_error(400, "Invalid JSON")
return
# 检查MCP-Protocol-Version头是否匹配
protocol_version = self.headers.get("MCP-Protocol-Version")
if (
protocol_version
and protocol_version != self.mcp_server.protocol_version
):
# 协议版本不支持,返回400
self.send_error(
400, f"Unsupported protocol version: {protocol_version}"
)
return
# 获取会话ID
session_id = self.headers.get("Mcp-Session-Id")
# 请求体已经在上面读取过了,直接使用
# 处理MCP消息
response = self._handle_mcp_message(message, session_id)
# 发送响应
if response:
# 发送200响应码
self.send_response(200)
# 设置响应头Content-Type为application/json
self.send_header("Content-Type", "application/json")
# 设置协议版本头
self.send_header(
"MCP-Protocol-Version", self.mcp_server.protocol_version
)
# 如果响应中包含会话ID,则添加到响应头
if "result" in response and "sessionId" in response["result"]:
self.send_header("Mcp-Session-Id", response["result"]["sessionId"])
# 结束响应头
self.end_headers()
# 写入JSON响应体
self.wfile.write(json.dumps(response).encode("utf-8"))
else:
# 没有响应内容,返回202 Accepted
self.send_response(202) # Accepted
self.end_headers()
except Exception as e:
# 记录错误日志
logger.error(f"POST处理错误: {e}")
# 返回500内部服务器错误
self.send_error(500, "Internal Server Error")
# 定义运行HTTP服务器的函数
def run_http_server(mcp_server: MCPServer, host: str = "localhost", port: int = 8000):
"""运行HTTP服务器"""
# 定义处理器工厂函数,用于传递mcp_server实例
def handler_factory(*args, **kwargs):
return MCPHTTPHandler(*args, mcp_server=mcp_server, **kwargs)
# 创建HTTPServer实例,绑定主机和端口
server = HTTPServer((host, port), handler_factory)
# 记录服务器启动信息
logger.info(f"HTTP服务器运行在 http://{host}:{port}/mcp")
try:
# 启动服务器,进入循环监听请求
server.serve_forever()
except KeyboardInterrupt:
# 捕获Ctrl+C,记录服务器停止
logger.info("HTTP服务器已停止")
finally:
# 关闭服务器
server.server_close()
# 定义主函数
def main():
"""主函数"""
# 导入argparse模块,用于解析命令行参数
import argparse
# 创建ArgumentParser对象,设置描述信息
parser = argparse.ArgumentParser(description="MCP HTTP服务器")
# 添加--host参数,指定服务器主机,默认localhost
parser.add_argument(
"--host", default="localhost", help="HTTP服务器主机 (默认: localhost)"
)
# 添加--port参数,指定服务器端口,默认8000
parser.add_argument(
"--port", type=int, default=8000, help="HTTP服务器端口 (默认: 8000)"
)
# 解析命令行参数
args = parser.parse_args()
# 创建MCP服务器实例
mcp_server = MCPServer()
# 运行HTTP服务器
run_http_server(mcp_server, args.host, args.port)
# 判断是否为主程序入口
if __name__ == "__main__":
# 调用主函数
main()