1. asynccontextmanager是什么? #
contextlib.asynccontextmanager 是 Python 标准库 contextlib 模块中的一个装饰器。它的作用是让你能够轻松地为自己编写的异步生成器函数创建一个异步上下文管理器(Asynchronous Context Manager)。
简单来说,它把:
- 一个内部包含
yield语句的async def函数(即异步生成器) - 转换成一个能用于
async with语句的对象。
2. 为什么要用它?它解决了什么问题? #
在异步编程中,我们经常需要管理资源,比如数据库连接、网络连接、文件锁等。这些资源的获取和释放通常是异步操作(例如 async with aiohttp.ClientSession() as session:)。
有时候,我们需要创建自定义的、复杂的资源管理逻辑。手动实现 __aenter__ 和 __aexit__ 方法来创建一个异步上下文管理器会比较繁琐和容易出错。
@asynccontextmanager 装饰器通过一个更简洁、更直观的生成器模式来解决这个问题。你只需要关注“在 yield 之前设置资源,在 yield 之后清理资源”,装饰器会自动帮你处理好上下文管理器的协议。
3. 核心概念和工作原理 #
- 装饰的函数:必须是一个异步生成器函数(
async def并且函数体内包含yield)。 yield语句:这是整个模式的核心。yield之前的代码相当于__aenter__方法,用于获取和设置资源。这部分代码会在进入async with块时执行。yield语句本身提供一个值,这个值会被赋给async with语句中as后面的变量。yield之后的代码相当于__aexit__方法,用于清理和释放资源。无论async with块内部是正常结束还是发生了异常,这部分代码都保证会执行。
3.1 执行流程 #
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def my_async_context():
# 1. __aenter__ 部分:进入 async with 时执行
print("Entering context")
resource = await acquire_async_resource() # 模拟异步获取资源
try:
# 2. yield:将资源提供给 async with 块使用
yield resource
# 4. 如果 async with 块正常结束,代码会从这里继续
print("Exiting normally")
except Exception as e:
# 5. 如果 async with 块发生异常,代码会从这里继续
print(f"Exiting with exception: {e}")
# 可以选择是否处理异常
# 如果不想抑制异常,需要重新抛出
raise
finally:
# 3. __aexit__ 部分:无论如何都会执行清理
print("Cleaning up")
await release_async_resource(resource) # 模拟异步释放资源
async def main():
# 使用上下文管理器
async with my_async_context() as r:
# 3. 这里正在使用资源 ‘r’
print(f"Using resource: {r}")
# 可以在这里进行各种异步操作
# 如果这里发生异常,会被上面的 except 捕获
asyncio.run(main())输出结果:
Entering context
Using resource: <the resource object>
Exiting normally
Cleaning up如果 async with 块内发生异常,输出可能为:
Entering context
Using resource: <the resource object>
Exiting with exception: Something went wrong!
Cleaning up
Traceback (most recent call last):
... # 异常信息依然会打印4. 一个更实用的示例:模拟数据库连接池 #
假设我们有一个简单的异步数据库连接池。
import asyncio
from contextlib import asynccontextmanager
# 模拟一个异步连接池
class AsyncConnectionPool:
def __init__(self, size):
self.size = size
self.connections = []
async def warmup(self):
"""预热连接池"""
print("Warming up connection pool...")
for i in range(self.size):
# 模拟异步创建连接
self.connections.append(f"Connection-{i}")
print("Pool is ready!")
async def close(self):
"""关闭所有连接"""
print("Closing all connections...")
self.connections.clear()
print("Pool is closed.")
async def get_connection(self):
"""从池中获取一个连接(模拟)"""
if not self.connections:
raise Exception("No available connections!")
# 模拟异步获取(例如等待一个可用的信号量)
await asyncio.sleep(0.1)
return self.connections.pop()
async def return_connection(self, conn):
"""将连接归还给池(模拟)"""
# 模拟异步归还
await asyncio.sleep(0.1)
self.connections.append(conn)
# 创建全局连接池
pool = AsyncConnectionPool(2)
@asynccontextmanager
async def get_db_connection():
"""
一个异步上下文管理器,用于从全局池中获取和归还数据库连接。
"""
conn = None
try:
# 1. __aenter__: 获取连接
conn = await pool.get_connection()
print(f"Acquired connection: {conn}")
# 2. yield: 将连接提供给 with 块使用
yield conn
# 4. 如果 with 块成功,归还连接
print("Operation succeeded, returning connection.")
await pool.return_connection(conn)
except Exception as e:
# 5. 如果 with 块失败,也要归还连接,但可能记录日志等
print(f"Operation failed: {e}. Returning connection.")
if conn:
await pool.return_connection(conn)
# 重新抛出异常,不抑制它
raise
# finally 块在这里被 asynccontextmanager 隐式处理了
async def perform_database_operation(query):
"""
使用上面定义的上下文管理器来执行数据库操作
"""
async with get_db_connection() as connection:
# 在这里使用连接执行操作
print(f"Executing query '{query}' with {connection}")
# 模拟一个可能成功也可能失败的异步操作
await asyncio.sleep(0.5)
if "fail" in query:
raise RuntimeError("Database operation failed!")
return "SUCCESS"
async def main():
# 首先预热连接池
await pool.warmup()
# 执行一些成功的操作
try:
result = await perform_database_operation("SELECT * FROM users")
print(f"Result: {result}")
except Exception as e:
print(f"Caught error: {e}")
print("----")
# 执行一个会失败的操作
try:
result = await perform_database_operation("INSERT INTO users FAIL")
print(f"Result: {result}")
except Exception as e:
print(f"Caught error: {e}")
# 最后关闭连接池
await pool.close()
asyncio.run(main())输出结果:
Warming up connection pool...
Pool is ready!
Acquired connection: Connection-0
Executing query 'SELECT * FROM users' with Connection-0
Operation succeeded, returning connection.
Result: SUCCESS
----
Acquired connection: Connection-0
Executing query 'INSERT INTO users FAIL' with Connection-0
Operation failed: Database operation failed!. Returning connection.
Caught error: Database operation failed!
Closing all connections...
Pool is closed.5. 关键要点和最佳实践 #
- 异常处理:
yield语句被包裹在try...except...finally结构中至关重要。这确保了即使在async with块内发生异常,清理代码也能被执行。 - 不要抑制异常:默认情况下,如果你在
except块中只是打印日志而没有重新抛出(raise)异常,上下文管理器会抑制这个异常。大多数情况下,你应该重新抛出异常,让调用者知道发生了什么错误。 - 资源安全:
yield只能发生一次。这个装饰器的设计初衷就是管理单个资源的进入和退出。 - 它是异步的:与同步版本的
@contextmanager不同,@asynccontextmanager装饰的函数和其中所有的await调用都是异步的,必须在异步环境中使用。