ai
  • index
  • 1.首页
  • 2.介绍
  • 3.架构概览
  • 4.服务器概念
  • 5.客户端概念
  • 6.版本控制
  • 7.连接到远程MCP服务器
  • 8.连接到本地MCP服务器
  • json_rpc
  • 9.构建一个MCP服务器
  • 10.检查员
  • 11.构建一个MCP客户端
  • 14.架构
  • 15.基础协议概述
  • 16.生命周期
  • 17.传输
  • 18.授权
  • 19.安全最佳实践
  • 20.取消
  • 21.Ping
  • 22.进展
  • 23.Roots
  • 24.采样
  • 25.启发
  • 26.服务器特性
  • 27.提示词
  • 28.资源
  • 29.工具
  • 30.完成
  • 31.日志记录
  • 32.分页
  • 33.架构参考
  • URI模板
  • 12.实现
  • http.server
  • 动态客户端注册协议
  • 受保护资源元数据
  • 授权服务器元数据
  • JWKS
  • PKCE
  • PyJWT
  • secrets
  • watchfiles
  • 实现authorization
  • 实现cancel
  • 实现completion
  • 实现logging
  • 实现pagination
  • 实现process
  • 实现transport
  • psutil
  • pytz
  • zoneinfo
  • contextlib
  • Starlette
  • mcp.1.starter
  • mcp.2.Resource
  • mcp.3.structured_output
  • mcp.4.prompts
  • mcp.5.context
  • mcp.6.streamable
  • mcp.7.lowlevel
  • mcp.8.Completion
  • mcp.9.Elicitation
  • mcp.10.oauth
  • mcp.11.integration
  • mcp.12.best
  • mysql-mcp
  • databases
  • uvicorn
  • asynccontextmanager
  • AsyncExitStack
  • streamable
  • aiohttp
  • publish
  • email
  • schedule
  • twine
  • 1.教学文档总览
  • 2.教师使用指南
  • 3.教学系统快速参考
  • 4.新生入门指南
  • 5.学生使用指南
  • 1. AsyncExitStack是什么?
  • 2. 为什么要用它?它解决了什么问题?
  • 3. 核心概念和工作原理
    • 3.1 主要方法:
  • 4. 基本使用模式
    • 4.1 模式一:使用 async with(推荐)
    • 4.2 模式二:手动管理
  • 5. 实用示例
    • 5.1 示例1:管理多个动态资源
    • 5.2 示例2:条件性资源管理
    • 5.3 示例3:处理资源获取失败的情况
  • 6. 高级用法:回调函数管理
  • 7. 与同步版本的区别
    • 7.1 同步版本示例对比
    • 7.2 混合使用场景
  • 8. 最佳实践
    • 8.1 核心最佳实践
    • 8.2 常见陷阱和解决方案
  • 9. 总结

1. AsyncExitStack是什么? #

AsyncExitStack 是 Python 标准库 contextlib 模块中的一个类,专门用于异步环境。它是一个后进先出(LIFO)的栈结构,用于管理多个异步上下文管理器的退出操作。

简单来说,它允许你:

  • 动态地添加多个异步上下文管理器
  • 按顺序(后进先出)确保所有资源都被正确清理
  • 灵活地处理不确定数量的资源

2. 为什么要用它?它解决了什么问题? #

在异步编程中,你经常会遇到需要同时管理多个资源的情况:

async def process_data():
    async with get_db_connection() as conn:
        async with get_file_lock() as lock:
            async with get_http_session() as session:
                # 处理数据...
                pass

这种嵌套的 async with 语句有几个问题:

  1. 代码嵌套深:资源越多,嵌套越深,代码可读性差
  2. 不够灵活:需要管理的资源数量在编写代码时就必须确定
  3. 难以处理动态资源:无法根据运行时条件动态添加资源

AsyncExitStack 完美解决了这些问题。

3. 核心概念和工作原理 #

AsyncExitStack 的核心思想是:先获取所有需要的资源,然后统一管理它们的退出。

3.1 主要方法: #

  1. enter_async_context(cm):

    • 进入一个异步上下文管理器,并将其退出回调压入栈中
    • 返回上下文管理器 __aenter__ 方法的结果
    • 相当于 async with cm as result: 中的 result
  2. push_async_exit(callback):

    • 直接将一个异步退出回调函数压入栈中
    • 回调函数应该接受异常信息并返回 awaitable
  3. `push_async_callback(callback, *args, kwds)`**:

    • 将一个普通函数包装成异步退出回调压入栈中
  4. aclose():

    • 异步关闭整个栈,执行所有退出回调(后进先出顺序)
    • 通常用 async with 来自动调用

4. 基本使用模式 #

4.1 模式一:使用 async with(推荐) #

这个模式使用 async with 语句来自动管理 AsyncExitStack 的生命周期,是最推荐的使用方式。

# 导入必要的模块
import asyncio
from contextlib import AsyncExitStack

# 模拟异步上下文管理器
class MockDatabaseConnection:
    """模拟数据库连接类"""
    def __init__(self, name):
        self.name = name

    async def __aenter__(self):
        # 模拟连接数据库
        print(f"连接到数据库: {self.name}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 模拟关闭数据库连接
        print(f"关闭数据库连接: {self.name}")

class MockFileLock:
    """模拟文件锁类"""
    def __init__(self, filename):
        self.filename = filename

    async def __aenter__(self):
        # 模拟获取文件锁
        print(f"获取文件锁: {self.filename}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 模拟释放文件锁
        print(f"释放文件锁: {self.filename}")

class MockHttpSession:
    """模拟HTTP会话类"""
    def __init__(self, url):
        self.url = url

    async def __aenter__(self):
        # 模拟创建HTTP会话
        print(f"创建HTTP会话: {self.url}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 模拟关闭HTTP会话
        print(f"关闭HTTP会话: {self.url}")

# 模拟获取资源的函数
async def get_db_connection():
    """获取数据库连接"""
    return MockDatabaseConnection("main_db")

async def get_file_lock():
    """获取文件锁"""
    return MockFileLock("data.txt")

async def get_http_session():
    """获取HTTP会话"""
    return MockHttpSession("https://api.example.com")

async def process_data(conn, lock, session):
    """处理数据的函数"""
    print(f"使用资源处理数据: {conn.name}, {lock.filename}, {session.url}")
    await asyncio.sleep(0.1)  # 模拟数据处理
    return "数据处理完成"

async def complex_operation():
    """使用AsyncExitStack的复杂操作示例"""
    # 使用async with自动管理AsyncExitStack
    async with AsyncExitStack() as stack:
        # 动态添加数据库连接资源
        conn = await stack.enter_async_context(get_db_connection())
        # 动态添加文件锁资源
        lock = await stack.enter_async_context(get_file_lock())
        # 动态添加HTTP会话资源
        session = await stack.enter_async_context(get_http_session())

        # 使用所有资源处理数据
        result = await process_data(conn, lock, session)

    # 退出时会自动清理所有资源(后进先出顺序)
    return result

# 主函数
async def main():
    """主函数,运行示例"""
    print("=== AsyncExitStack 示例 ===")
    result = await complex_operation()
    print(f"操作结果: {result}")

# 运行示例(如果在Windows命令行中运行)
if __name__ == "__main__":
    asyncio.run(main())

4.2 模式二:手动管理 #

这个模式手动创建和管理AsyncExitStack,适用于需要更精细控制的情况。

# 导入必要的模块
import asyncio
from contextlib import AsyncExitStack

# 模拟异步上下文管理器
class MockResource:
    """模拟资源类"""
    def __init__(self, name):
        self.name = name

    async def __aenter__(self):
        # 模拟获取资源
        print(f"获取资源: {self.name}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 模拟释放资源
        print(f"释放资源: {self.name}")

# 模拟获取资源的函数
async def get_resource(name):
    """获取资源"""
    return MockResource(name)

async def do_something(resource1, resource2):
    """使用资源执行某些操作"""
    print(f"使用资源执行操作: {resource1.name} 和 {resource2.name}")
    await asyncio.sleep(0.1)  # 模拟操作执行
    return "操作执行完成"

async def complex_operation_manual():
    """手动管理AsyncExitStack的复杂操作示例"""
    # 手动创建AsyncExitStack实例
    stack = AsyncExitStack()
    try:
        # 获取第一个资源
        resource1 = await stack.enter_async_context(get_resource("数据库连接"))
        # 获取第二个资源
        resource2 = await stack.enter_async_context(get_resource("文件锁"))

        # 使用资源执行业务逻辑
        result = await do_something(resource1, resource2)
        return result
    finally:
        # 手动关闭栈,确保所有资源都被清理
        await stack.aclose()

# 主函数
async def main_manual():
    """主函数,运行手动管理示例"""
    print("=== 手动管理 AsyncExitStack 示例 ===")
    result = await complex_operation_manual()
    print(f"操作结果: {result}")

# 运行示例(如果在Windows命令行中运行)
if __name__ == "__main__":
    asyncio.run(main_manual())

5. 实用示例 #

5.1 示例1:管理多个动态资源 #

这个示例展示了如何为多个用户动态创建数据库连接和锁,展示了AsyncExitStack处理动态数量资源的强大能力。

# 导入必要的模块
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

# 使用装饰器创建异步上下文管理器
@asynccontextmanager
async def get_database_connection(db_name):
    """模拟获取数据库连接的异步上下文管理器"""
    # 模拟连接数据库
    print(f"连接到数据库: {db_name}...")
    # 返回连接对象
    yield f"connection_{db_name}"
    # 模拟断开数据库连接
    print(f"断开数据库连接: {db_name}")

@asynccontextmanager
async def acquire_lock(lock_name):
    """模拟获取锁的异步上下文管理器"""
    # 模拟获取锁
    print(f"获取锁: {lock_name}...")
    # 返回锁对象
    yield f"lock_{lock_name}"
    # 模拟释放锁
    print(f"释放锁: {lock_name}")

async def process_user_data(user_ids):
    """为多个用户动态创建数据库连接和锁"""
    # 存储处理结果
    results = []

    # 使用AsyncExitStack自动管理资源
    async with AsyncExitStack() as stack:
        # 存储所有获取的资源
        resources = {}

        # 为每个用户动态创建资源
        for user_id in user_ids:
            # 为每个用户创建独立的数据库连接
            conn = await stack.enter_async_context(
                get_database_connection(f"user_db_{user_id}")
            )
            # 为每个用户创建独立的锁
            lock = await stack.enter_async_context(
                acquire_lock(f"user_lock_{user_id}")
            )
            # 将资源存储到字典中
            resources[user_id] = (conn, lock)

        # 使用所有资源进行处理
        for user_id, (conn, lock) in resources.items():
            # 模拟处理用户数据
            print(f"处理用户 {user_id},使用 {conn} 和 {lock}")
            await asyncio.sleep(0.1)  # 模拟处理时间
            results.append(f"processed_{user_id}")

    # 退出时会自动关闭所有资源(按后进先出顺序)
    return results

# 主函数
async def main_example1():
    """运行示例1的主函数"""
    print("=== 示例1:管理多个动态资源 ===")
    # 定义要处理的用户ID列表
    user_ids = [1, 2, 3, 4]
    # 处理用户数据
    results = await process_user_data(user_ids)
    print("处理结果:", results)

# 运行示例(如果在Windows命令行中运行)
if __name__ == "__main__":
    asyncio.run(main_example1())

5.2 示例2:条件性资源管理 #

这个示例展示了如何根据运行时参数动态决定是否启用某些功能,展示了AsyncExitStack的灵活性。

# 导入必要的模块
import asyncio
from contextlib import AsyncExitStack

# 模拟数据库连接类
class MockDatabaseConnection:
    """模拟数据库连接"""
    def __init__(self, name):
        self.name = name

    async def __aenter__(self):
        print(f"连接到数据库: {self.name}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"断开数据库连接: {self.name}")

# 模拟压缩器类
class MockCompressor:
    """模拟压缩器"""
    def __init__(self):
        self.name = "压缩器"

    async def __aenter__(self):
        print(f"启动{self.name}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"关闭{self.name}")

# 模拟加密器类
class MockEncryptor:
    """模拟加密器"""
    def __init__(self):
        self.name = "加密器"

    async def __aenter__(self):
        print(f"启动{self.name}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"关闭{self.name}")

# 模拟获取资源的函数
async def get_database_connection():
    """获取数据库连接"""
    return MockDatabaseConnection("backup_db")

async def get_compressor():
    """获取压缩器"""
    return MockCompressor()

async def get_encryptor():
    """获取加密器"""
    return MockEncryptor()

async def perform_backup(db_conn, compressor, encryptor):
    """执行备份操作"""
    print("开始执行备份...")
    # 模拟备份过程
    await asyncio.sleep(0.2)
    print("备份完成")

async def backup_database(enable_compression=True, enable_encryption=False):
    """备份数据库,根据参数动态启用不同功能"""

    # 使用AsyncExitStack管理资源
    async with AsyncExitStack() as stack:
        # 必须的资源:数据库连接
        db_conn = await stack.enter_async_context(get_database_connection())

        # 条件性资源:压缩器(根据参数决定是否启用)
        compressor = None
        if enable_compression:
            # 如果启用压缩,获取压缩器资源
            compressor = await stack.enter_async_context(get_compressor())
            print("压缩功能已启用")

        # 条件性资源:加密器(根据参数决定是否启用)
        encryptor = None
        if enable_encryption:
            # 如果启用加密,获取加密器资源
            encryptor = await stack.enter_async_context(get_encryptor())
            print("加密功能已启用")

        # 执行备份操作
        print("开始备份...")
        await perform_backup(db_conn, compressor, encryptor)
        print("备份完成")

    # 无论条件如何,所有已获取的资源都会被正确清理

# 主函数
async def main_example2():
    """运行示例2的主函数"""
    print("=== 示例2:条件性资源管理 ===")

    print("\n--- 启用压缩,不启用加密 ---")
    await backup_database(enable_compression=True, enable_encryption=False)

    print("\n--- 启用压缩和加密 ---")
    await backup_database(enable_compression=True, enable_encryption=True)

    print("\n--- 不启用压缩和加密 ---")
    await backup_database(enable_compression=False, enable_encryption=False)

# 运行示例(如果在Windows命令行中运行)
if __name__ == "__main__":
    asyncio.run(main_example2())

5.3 示例3:处理资源获取失败的情况 #

这个示例展示了如何健壮地处理资源获取失败的情况,确保即使部分资源获取失败,已获取的资源也能被正确清理。

# 导入必要的模块
import asyncio
from contextlib import AsyncExitStack

# 模拟数据库连接类
class MockDatabaseConnection:
    """模拟数据库连接"""
    def __init__(self, name):
        self.name = name

    async def __aenter__(self):
        print(f"连接到数据库: {self.name}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"断开数据库连接: {self.name}")

# 模拟网络会话类
class MockNetworkSession:
    """模拟网络会话"""
    def __init__(self, url):
        self.url = url

    async def __aenter__(self):
        print(f"创建网络会话: {self.url}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"关闭网络会话: {self.url}")

# 模拟文件处理器类
class MockFileHandler:
    """模拟文件处理器"""
    def __init__(self, filename):
        self.filename = filename

    async def __aenter__(self):
        print(f"打开文件: {self.filename}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"关闭文件: {self.filename}")

# 模拟获取资源的函数
async def get_db_connection():
    """获取数据库连接"""
    return MockDatabaseConnection("main_db")

async def get_network_session():
    """获取网络会话"""
    return MockNetworkSession("https://api.example.com")

async def get_file_handler():
    """获取文件处理器"""
    return MockFileHandler("data.txt")

async def process_data(resources):
    """使用资源处理数据"""
    print(f"使用 {len(resources)} 个资源处理数据")
    await asyncio.sleep(0.1)  # 模拟处理时间
    return "数据处理完成"

async def robust_data_processing():
    """健壮的数据处理,即使部分资源获取失败也能正确清理"""
    # 创建AsyncExitStack实例
    stack = AsyncExitStack()
    # 存储成功获取的资源
    acquired_resources = []

    try:
        # 定义要尝试获取的资源列表
        resources_to_acquire = [
            get_db_connection(),
            get_network_session(),
            get_file_handler()
        ]

        # 尝试获取每个资源
        for resource_cm in resources_to_acquire:
            try:
                # 尝试获取资源
                resource = await stack.enter_async_context(resource_cm)
                # 如果成功,添加到已获取资源列表
                acquired_resources.append(resource)
            except Exception as e:
                # 如果某个资源获取失败,打印错误信息并继续尝试其他的
                print(f"获取资源失败: {e}")
                continue

        # 检查是否获取了足够的资源
        if len(acquired_resources) < 2:
            raise RuntimeError("获取的资源不足,至少需要2个资源")

        # 使用成功获取的资源处理数据
        return await process_data(acquired_resources)

    finally:
        # 确保所有已获取的资源都被清理
        await stack.aclose()

# 主函数
async def main_example3():
    """运行示例3的主函数"""
    print("=== 示例3:处理资源获取失败的情况 ===")

    try:
        # 执行健壮的数据处理
        result = await robust_data_processing()
        print(f"处理结果: {result}")
    except Exception as e:
        print(f"处理过程中出现错误: {e}")

# 运行示例(如果在Windows命令行中运行)
if __name__ == "__main__":
    asyncio.run(main_example3())

6. 高级用法:回调函数管理 #

这个部分展示了如何使用AsyncExitStack的高级功能,包括直接管理回调函数和自定义清理逻辑。

# 导入必要的模块
import asyncio
from contextlib import AsyncExitStack

# 模拟异步清理函数
async def cleanup_async():
    """异步清理函数"""
    print("执行异步清理任务...")
    await asyncio.sleep(0.1)  # 模拟异步清理操作
    print("异步清理完成")

# 模拟同步清理函数
def cleanup_sync(arg1, arg2):
    """同步清理函数"""
    print(f"执行同步清理任务,参数: {arg1}, {arg2}")
    print("同步清理完成")

# 模拟特定的清理任务
async def specific_cleanup_task():
    """特定的清理任务"""
    print("执行特定的清理任务...")
    await asyncio.sleep(0.1)  # 模拟清理时间
    print("特定清理任务完成")

# 模拟工作函数
async def do_work():
    """执行主要工作"""
    print("开始执行主要工作...")
    await asyncio.sleep(0.2)  # 模拟工作时间
    print("主要工作完成")
    return "工作结果"

async def complex_operation_with_callbacks():
    """使用AsyncExitStack管理回调函数的复杂操作示例"""
    # 使用AsyncExitStack自动管理资源
    async with AsyncExitStack() as stack:
        # 添加异步退出回调函数
        await stack.push_async_exit(lambda *args: cleanup_async())

        # 添加同步回调函数(会自动包装成异步)
        # 传递位置参数和关键字参数
        stack.push_async_callback(cleanup_sync, "参数1", arg2="参数2")

        # 添加自定义的异步退出回调函数
        async def custom_cleanup(exc_type, exc_val, exc_tb):
            """自定义清理函数,接收异常信息"""
            if exc_type is not None:
                print(f"清理时发现异常: {exc_val}")
            else:
                print("正常清理,无异常")
            # 执行特定的清理任务
            await specific_cleanup_task()

        # 将自定义清理函数添加到栈中
        stack.push_async_exit(custom_cleanup)

        # 执行业务逻辑
        result = await do_work()
        return result

# 主函数
async def main_example6():
    """运行示例6的主函数"""
    print("=== 示例6:高级用法 - 回调函数管理 ===")

    try:
        # 执行复杂操作
        result = await complex_operation_with_callbacks()
        print(f"操作结果: {result}")
    except Exception as e:
        print(f"操作过程中出现错误: {e}")

# 运行示例(如果在Windows命令行中运行)
if __name__ == "__main__":
    asyncio.run(main_example6())

7. 与同步版本的区别 #

这个部分详细说明了AsyncExitStack与同步版本ExitStack的区别,帮助开发者理解何时使用哪个版本。

特性 AsyncExitStack ExitStack (同步)
方法前缀 enter_async_context() enter_context()
关闭方法 aclose() close()
回调推送 push_async_exit() push()
使用方式 async with with
适用场景 异步函数和协程 同步函数
资源类型 异步上下文管理器 同步上下文管理器

7.1 同步版本示例对比 #

为了更好地理解区别,这里提供一个同步版本的对比示例:

# 导入必要的模块
from contextlib import ExitStack

# 模拟同步上下文管理器
class MockFile:
    """模拟文件类"""
    def __init__(self, filename):
        self.filename = filename

    def __enter__(self):
        print(f"打开文件: {self.filename}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print(f"关闭文件: {self.filename}")

class MockLock:
    """模拟锁类"""
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        print(f"获取锁: {self.name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print(f"释放锁: {self.name}")

# 模拟获取同步资源的函数
def get_file(filename):
    """获取文件对象"""
    return MockFile(filename)

def get_lock(name):
    """获取锁对象"""
    return MockLock(name)

def process_data_sync(file_obj, lock_obj):
    """同步处理数据"""
    print(f"使用文件 {file_obj.filename} 和锁 {lock_obj.name} 处理数据")
    return "同步处理完成"

def sync_operation():
    """使用同步ExitStack的操作示例"""
    # 使用同步ExitStack管理资源
    with ExitStack() as stack:
        # 获取文件资源
        file_obj = stack.enter_context(get_file("data.txt"))
        # 获取锁资源
        lock_obj = stack.enter_context(get_lock("data_lock"))

        # 处理数据
        result = process_data_sync(file_obj, lock_obj)

    # 退出时自动清理所有资源
    return result

# 主函数
def main_sync_example():
    """运行同步示例的主函数"""
    print("=== 同步 ExitStack 示例 ===")
    result = sync_operation()
    print(f"操作结果: {result}")

# 运行同步示例
if __name__ == "__main__":
    main_sync_example()

7.2 混合使用场景 #

在某些情况下,你可能需要在异步代码中同时管理同步和异步资源:

# 导入必要的模块
import asyncio
from contextlib import AsyncExitStack, ExitStack

# 模拟异步资源
class AsyncResource:
    """异步资源"""
    def __init__(self, name):
        self.name = name

    async def __aenter__(self):
        print(f"异步获取资源: {self.name}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"异步释放资源: {self.name}")

# 模拟同步资源
class SyncResource:
    """同步资源"""
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        print(f"同步获取资源: {self.name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print(f"同步释放资源: {self.name}")

async def mixed_resource_management():
    """混合管理同步和异步资源的示例"""
    # 使用AsyncExitStack管理异步资源
    async with AsyncExitStack() as async_stack:
        # 获取异步资源
        async_resource = await async_stack.enter_async_context(AsyncResource("数据库连接"))

        # 在异步上下文中使用同步ExitStack管理同步资源
        with ExitStack() as sync_stack:
            # 获取同步资源
            sync_resource = sync_stack.enter_context(SyncResource("文件锁"))

            # 使用所有资源
            print(f"使用异步资源: {async_resource.name}")
            print(f"使用同步资源: {sync_resource.name}")

            # 模拟工作
            await asyncio.sleep(0.1)
            print("混合资源管理完成")

# 主函数
async def main_mixed_example():
    """运行混合示例的主函数"""
    print("=== 混合资源管理示例 ===")
    await mixed_resource_management()

# 运行混合示例(如果在Windows命令行中运行)
if __name__ == "__main__":
    asyncio.run(main_mixed_example())

8. 最佳实践 #

这个部分总结了使用AsyncExitStack的最佳实践,帮助开发者避免常见陷阱并写出更健壮的代码。

8.1 核心最佳实践 #

# 导入必要的模块
import asyncio
from contextlib import AsyncExitStack

# 模拟资源类
class MockResource:
    """模拟资源类"""
    def __init__(self, name, should_fail=False):
        self.name = name
        self.should_fail = should_fail

    async def __aenter__(self):
        if self.should_fail:
            raise RuntimeError(f"资源 {self.name} 获取失败")
        print(f"成功获取资源: {self.name}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"释放资源: {self.name}")

# 模拟获取资源的函数
async def get_resource(name, should_fail=False):
    """获取指定名称的资源"""
    return MockResource(name, should_fail)

async def process_data(resources):
    """处理数据"""
    print(f"使用 {len(resources)} 个资源处理数据")
    await asyncio.sleep(0.1)
    return "数据处理完成"

async def best_practice_example():
    """展示最佳实践的示例"""

    # 最佳实践1:优先使用 async with 模式
    async with AsyncExitStack() as stack:
        acquired_resources = []

        # 最佳实践2:处理获取异常
        resources_to_acquire = [
            ("数据库连接", False),      # 正常获取
            ("文件锁", False),          # 正常获取
            ("网络连接", True),         # 模拟获取失败
            ("缓存连接", False)         # 正常获取
        ]

        for resource_name, should_fail in resources_to_acquire:
            try:
                # 尝试获取资源
                resource = await stack.enter_async_context(
                    get_resource(resource_name, should_fail)
                )
                acquired_resources.append(resource)
                print(f"成功获取资源: {resource_name}")
            except Exception as e:
                print(f"获取资源 {resource_name} 失败: {e}")
                # 继续尝试获取其他资源
                continue

        # 最佳实践3:注意执行顺序(后进先出)
        print(f"资源获取顺序: {[r.name for r in acquired_resources]}")
        print("退出时资源释放顺序将是: ", end="")
        print([r.name for r in reversed(acquired_resources)])

        # 最佳实践4:资源验证
        if len(acquired_resources) < 2:
            raise RuntimeError("获取的资源不足,至少需要2个资源")

        # 使用资源处理数据
        result = await process_data(acquired_resources)
        return result

# 主函数
async def main_best_practice():
    """运行最佳实践示例的主函数"""
    print("=== 最佳实践示例 ===")

    try:
        result = await best_practice_example()
        print(f"操作结果: {result}")
    except Exception as e:
        print(f"操作失败: {e}")

# 运行最佳实践示例(如果在Windows命令行中运行)
if __name__ == "__main__":
    asyncio.run(main_best_practice())

8.2 常见陷阱和解决方案 #

# 导入必要的模块
import asyncio
from contextlib import AsyncExitStack

# 模拟资源类
class MockResource:
    """模拟资源类"""
    def __init__(self, name):
        self.name = name

    async def __aenter__(self):
        print(f"获取资源: {self.name}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"释放资源: {self.name}")

async def get_resource(name):
    """获取资源"""
    return MockResource(name)

async def work_with_resource(resource):
    """使用资源工作"""
    print(f"使用资源 {resource.name} 工作")
    await asyncio.sleep(0.1)

# 陷阱1:忘记调用 aclose()
async def trap1_forget_aclose():
    """陷阱1:忘记调用 aclose()"""
    print("=== 陷阱1:忘记调用 aclose() ===")

    # 错误示例:忘记调用 aclose()
    stack = AsyncExitStack()
    try:
        resource = await stack.enter_async_context(get_resource("资源1"))
        await work_with_resource(resource)
        # 忘记调用 await stack.aclose()
        return "操作完成"
    except Exception as e:
        print(f"错误: {e}")
        # 即使在这里也忘记调用 aclose()
        raise

# 解决方案1:使用 async with
async def solution1_use_async_with():
    """解决方案1:使用 async with"""
    print("=== 解决方案1:使用 async with ===")

    async with AsyncExitStack() as stack:
        resource = await stack.enter_async_context(get_resource("资源1"))
        await work_with_resource(resource)
        return "操作完成"

# 陷阱2:异常处理不当
async def trap2_bad_exception_handling():
    """陷阱2:异常处理不当"""
    print("=== 陷阱2:异常处理不当 ===")

    stack = AsyncExitStack()
    try:
        # 获取第一个资源
        resource1 = await stack.enter_async_context(get_resource("资源1"))

        # 模拟异常
        raise RuntimeError("模拟的业务逻辑异常")

    except Exception as e:
        print(f"捕获到异常: {e}")
        # 错误:没有调用 aclose(),资源可能泄漏
        raise
    finally:
        # 正确:在 finally 中调用 aclose()
        await stack.aclose()

# 解决方案2:正确的异常处理
async def solution2_proper_exception_handling():
    """解决方案2:正确的异常处理"""
    print("=== 解决方案2:正确的异常处理 ===")

    stack = AsyncExitStack()
    try:
        resource1 = await stack.enter_async_context(get_resource("资源1"))
        raise RuntimeError("模拟的业务逻辑异常")
    except Exception as e:
        print(f"捕获到异常: {e}")
        raise
    finally:
        # 确保资源被清理
        await stack.aclose()

# 主函数
async def main_traps_and_solutions():
    """运行陷阱和解决方案示例的主函数"""
    print("=== 常见陷阱和解决方案 ===")

    # 演示陷阱1
    try:
        await trap1_forget_aclose()
    except Exception as e:
        print(f"陷阱1演示完成: {e}")

    print("\n" + "="*50 + "\n")

    # 演示解决方案1
    try:
        result = await solution1_use_async_with()
        print(f"解决方案1结果: {result}")
    except Exception as e:
        print(f"解决方案1错误: {e}")

    print("\n" + "="*50 + "\n")

    # 演示陷阱2
    try:
        await trap2_bad_exception_handling()
    except Exception as e:
        print(f"陷阱2演示完成: {e}")

    print("\n" + "="*50 + "\n")

    # 演示解决方案2
    try:
        await solution2_proper_exception_handling()
    except Exception as e:
        print(f"解决方案2演示完成: {e}")

# 运行陷阱和解决方案示例(如果在Windows命令行中运行)
if __name__ == "__main__":
    asyncio.run(main_traps_and_solutions())

9. 总结 #

AsyncExitStack 是 Python 异步编程中管理多个资源的强大工具。通过本文档的学习,你应该能够:

  1. 理解核心概念:AsyncExitStack 是一个后进先出的栈结构,用于管理异步上下文管理器
  2. 掌握基本用法:学会使用 async with 和手动管理两种模式
  3. 应用实用技巧:处理动态资源、条件性资源管理和异常情况
  4. 避免常见陷阱:正确使用 aclose() 和异常处理
  5. 遵循最佳实践:写出健壮、可维护的异步代码

记住,AsyncExitStack 的核心价值在于:

  • 灵活性:可以动态添加不确定数量的资源
  • 安全性:确保所有资源都能被正确清理
  • 可读性:避免深层嵌套的 async with 语句
  • 健壮性:即使在异常情况下也能正确清理资源

在实际开发中,当你需要管理多个异步资源时,AsyncExitStack 通常是比嵌套 async with 语句更好的选择。

访问验证

请输入访问令牌

Token不正确,请重新输入