1. aiohttp是什么? #
aiohttp 是一个基于 asyncio 的异步 HTTP 客户端/服务器框架。它提供了:
- HTTP 客户端:用于发起异步 HTTP 请求
- HTTP 服务器:用于构建异步 Web 应用和 API
- WebSocket 支持:客户端和服务端的 WebSocket 支持
- Session 管理:连接池和 cookie 管理
2. 为什么要用它?传统同步库的问题 #
传统的同步 HTTP 库(如 requests)在发起网络请求时会阻塞整个线程,直到收到响应。在高并发场景下,这会导致:
- 性能瓶颈:每个请求都需要一个线程,线程创建和切换开销大
- 资源浪费:线程在等待网络响应时处于空闲状态
- 扩展性差:难以处理大量并发连接
aiohttp 利用 asyncio 的异步特性,单线程就能处理成千上万的并发连接,在等待网络响应时不会阻塞,可以处理其他任务。
3. 安装 #
这个部分说明了如何安装aiohttp及其相关依赖库。
# 安装aiohttp核心库
pip install aiohttp
# 安装用于提升性能的额外库
pip install cchardet aiodns
# 如果使用Windows系统,可能还需要安装
pip install aiofiles4. 核心概念:客户端 (Client) #
这个部分介绍了aiohttp客户端的核心概念和基本使用方法,展示了如何发起异步HTTP请求。
基本请求示例 #
这个示例展示了如何使用aiohttp发起最基本的GET请求,包括创建会话、发送请求和处理响应。
# 导入必要的模块
import aiohttp
import asyncio
async def fetch_url(url):
"""异步获取URL内容的函数"""
# 创建异步HTTP会话
async with aiohttp.ClientSession() as session:
# 使用会话发送GET请求
async with session.get(url) as response:
# 打印响应状态码
print(f"状态码: {response.status}")
# 打印响应内容类型
print(f"内容类型: {response.headers['content-type']}")
# 获取响应的HTML文本内容
html = await response.text()
return html
async def main():
"""主函数"""
# 设置要请求的URL(使用httpbin.org作为测试服务)
url = "https://httpbin.org/json"
# 获取URL内容
html = await fetch_url(url)
# 打印内容长度
print(f"内容长度: {len(html)}")
# 运行主函数(如果在Windows命令行中运行)
if __name__ == "__main__":
asyncio.run(main())4.1 使用 Session 的重要性 #
这个部分解释了为什么必须使用ClientSession,以及Session提供的各种重要功能。
一定要使用 ClientSession!它提供了:
- 连接池:复用 TCP 连接,大幅提升性能
- Cookie 共享:自动管理 cookies
- 默认配置:统一设置 headers、超时等
错误用法:
# 错误!每次请求都创建新 session,无法享受连接池好处
async with aiohttp.request('GET', url) as response:
pass正确用法示例:
# 导入必要的模块
import aiohttp
import asyncio
async def demonstrate_session_importance():
"""演示Session重要性的示例"""
# 创建多个URL进行测试
urls = [
"https://httpbin.org/json",
"https://httpbin.org/html",
"https://httpbin.org/xml"
]
print("=== 使用Session的正确方式 ===")
# 创建一个Session,复用连接
async with aiohttp.ClientSession() as session:
for url in urls:
# 使用同一个session发送请求
async with session.get(url) as response:
content = await response.text()
print(f"{url}: 状态码 {response.status}, 长度 {len(content)}")
print("\n=== 错误方式:每次都创建新连接 ===")
# 错误方式:每次都创建新的连接
for url in urls:
async with aiohttp.ClientSession() as session: # 每次都创建新session
async with session.get(url) as response:
content = await response.text()
print(f"{url}: 状态码 {response.status}, 长度 {len(content)}")
async def main_session_demo():
"""运行Session演示的主函数"""
await demonstrate_session_importance()
# 运行示例(如果在Windows命令行中运行)
if __name__ == "__main__":
asyncio.run(main_session_demo())5. 高级客户端功能 #
这个部分展示了aiohttp客户端的高级功能,包括多种响应内容获取方式、请求参数设置和不同类型的POST请求。
5.1 多种响应内容获取方式 #
这个示例展示了如何以不同方式获取HTTP响应的内容,包括文本、二进制、JSON和流式读取。
# 导入必要的模块
import aiohttp
import asyncio
import json
async def demonstrate_response_methods():
"""演示多种响应内容获取方式"""
# 创建异步HTTP会话
async with aiohttp.ClientSession() as session:
# 发送GET请求到测试API
async with session.get('https://httpbin.org/json') as response:
print(f"状态码: {response.status}")
print(f"内容类型: {response.headers.get('content-type', '未知')}")
print("\n=== 方法1: 获取文本内容(自动检测编码) ===")
# 获取文本内容,aiohttp会自动检测编码
text = await response.text()
print(f"文本内容长度: {len(text)}")
print(f"文本内容前100字符: {text[:100]}...")
# 重新发送请求(因为响应内容已经被读取)
async with session.get('https://httpbin.org/json') as response2:
print("\n=== 方法2: 获取二进制内容 ===")
# 获取原始二进制数据
data = await response2.read()
print(f"二进制数据长度: {len(data)} 字节")
print(f"二进制数据前50字节: {data[:50]}")
# 重新发送请求
async with session.get('https://httpbin.org/json') as response3:
print("\n=== 方法3: 获取JSON(自动解析) ===")
# 自动解析JSON响应
json_data = await response3.json()
print(f"JSON数据类型: {type(json_data)}")
print(f"JSON内容: {json_data}")
# 重新发送请求
async with session.get('https://httpbin.org/json') as response4:
print("\n=== 方法4: 流式读取(适合大文件) ===")
# 流式读取响应内容
chunk_size = 1024
total_size = 0
chunks = []
# 逐块读取响应内容
async for chunk in response4.content.iter_chunked(chunk_size):
chunks.append(chunk)
total_size += len(chunk)
print(f"读取块: {len(chunk)} 字节")
print(f"总读取大小: {total_size} 字节")
print(f"读取块数: {len(chunks)}")
async def main_response_methods():
"""运行响应方法演示的主函数"""
print("=== aiohttp 多种响应内容获取方式演示 ===")
await demonstrate_response_methods()
# 运行示例(如果在Windows命令行中运行)
if __name__ == "__main__":
asyncio.run(main_response_methods())5.2 请求参数设置 #
这个示例展示了如何在HTTP请求中设置各种参数,包括查询参数、请求头、超时、代理和SSL设置。
# 导入必要的模块
import aiohttp
import asyncio
import time
async def demonstrate_request_parameters():
"""演示请求参数设置的示例"""
# 创建自定义超时配置
timeout = aiohttp.ClientTimeout(
total=30, # 整个请求超时时间(秒)
connect=10, # 连接建立超时时间(秒)
sock_connect=10, # socket连接超时时间(秒)
sock_read=30 # socket读取超时时间(秒)
)
# 创建自定义连接器配置
connector = aiohttp.TCPConnector(
limit=100, # 最大连接数
limit_per_host=10, # 每主机最大连接数
ssl=False, # 禁用SSL验证(仅用于测试)
force_close=True # 强制关闭空闲连接
)
# 创建自定义请求头
headers = {
'User-Agent': 'MyApp/1.0 (aiohttp demo)',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Authorization': 'Bearer demo_token_12345'
}
# 创建HTTP会话,应用自定义配置
async with aiohttp.ClientSession(
connector=connector,
timeout=timeout,
headers=headers,
trust_env=True # 使用系统代理设置
) as session:
print("=== 请求参数设置演示 ===")
# 示例1: 带查询参数的GET请求
print("\n--- 示例1: 带查询参数的GET请求 ---")
params = {
'key': 'value',
'page': 1,
'size': 10,
'sort': 'name'
}
async with session.get(
'https://httpbin.org/get',
params=params
) as response:
result = await response.json()
print(f"查询参数: {params}")
print(f"实际请求URL: {result['url']}")
print(f"接收到的参数: {result['args']}")
# 示例2: 带自定义请求头的请求
print("\n--- 示例2: 带自定义请求头的请求 ---")
custom_headers = {
'X-Custom-Header': 'custom_value',
'X-Request-ID': f'req_{int(time.time())}'
}
async with session.get(
'https://httpbin.org/headers',
headers=custom_headers
) as response:
result = await response.json()
print(f"发送的请求头: {custom_headers}")
print(f"服务器接收到的请求头: {result['headers']}")
# 示例3: 超时设置测试
print("\n--- 示例3: 超时设置测试 ---")
try:
# 设置很短的超时时间来测试超时功能
short_timeout = aiohttp.ClientTimeout(total=1)
async with session.get(
'https://httpbin.org/delay/5', # 延迟5秒响应
timeout=short_timeout
) as response:
await response.text()
except asyncio.TimeoutError:
print("请求超时(这是预期的行为)")
except Exception as e:
print(f"其他错误: {e}")
async def main_request_params():
"""运行请求参数演示的主函数"""
await demonstrate_request_parameters()
# 运行示例(如果在Windows命令行中运行)
if __name__ == "__main__":
asyncio.run(main_request_params())5.3 POST 请求和数据发送 #
这个示例展示了如何发送不同类型的POST请求,包括JSON数据、表单数据、文件上传和多部分表单。
# 导入必要的模块
import aiohttp
import asyncio
import os
async def demonstrate_post_requests():
"""演示不同类型的POST请求"""
# 创建HTTP会话
async with aiohttp.ClientSession() as session:
print("=== POST请求演示 ===")
# 示例1: 发送JSON数据
print("\n--- 示例1: 发送JSON数据 ---")
json_data = {
'name': '张三',
'age': 25,
'email': 'zhangsan@example.com',
'skills': ['Python', 'JavaScript', 'SQL']
}
async with session.post(
'https://httpbin.org/post',
json=json_data
) as response:
result = await response.json()
print(f"发送的JSON数据: {json_data}")
print(f"服务器接收到的数据: {result['json']}")
print(f"响应状态码: {response.status}")
# 示例2: 发送表单数据
print("\n--- 示例2: 发送表单数据 ---")
form_data = {
'username': 'testuser',
'password': 'testpass123',
'remember': 'true'
}
async with session.post(
'https://httpbin.org/post',
data=form_data
) as response:
result = await response.json()
print(f"发送的表单数据: {form_data}")
print(f"服务器接收到的表单数据: {result['form']}")
# 示例3: 文件上传
print("\n--- 示例3: 文件上传 ---")
# 创建临时文件内容
file_content = b"This is a test file content for demonstration purposes."
# 创建FormData对象
data = aiohttp.FormData()
data.add_field('file', file_content,
filename='test.txt',
content_type='text/plain')
data.add_field('description', '这是一个测试文件')
data.add_field('category', 'demo')
async with session.post(
'https://httpbin.org/post',
data=data
) as response:
result = await response.json()
print(f"上传的文件内容: {file_content}")
print(f"服务器接收到的文件信息: {result['files']}")
print(f"服务器接收到的表单数据: {result['form']}")
# 示例4: 多部分表单(包含文件和文本)
print("\n--- 示例4: 多部分表单 ---")
# 创建更复杂的FormData
complex_data = aiohttp.FormData()
# 添加文本字段
complex_data.add_field('user_id', '12345', content_type='text/plain')
complex_data.add_field('user_name', '李四', content_type='text/plain')
# 添加文件字段
complex_data.add_field('profile_picture',
b'fake_image_data_here',
filename='profile.jpg',
content_type='image/jpeg')
# 添加JSON字段
complex_data.add_field('metadata',
'{"role": "admin", "department": "IT"}',
content_type='application/json')
async with session.post(
'https://httpbin.org/post',
data=complex_data
) as response:
result = await response.json()
print(f"多部分表单数据已发送")
print(f"服务器接收到的文件: {result['files']}")
print(f"服务器接收到的表单数据: {result['form']}")
async def main_post_demo():
"""运行POST请求演示的主函数"""
await demonstrate_post_requests()
# 运行示例(如果在Windows命令行中运行)
if __name__ == "__main__":
asyncio.run(main_post_demo())6. 并发请求处理 #
这个部分展示了如何使用aiohttp处理并发请求,包括使用asyncio.gather和Semaphore来控制并发数量,提高请求效率。
6.1 使用 asyncio.gather #
这个示例展示了如何使用asyncio.gather来并发处理多个HTTP请求,提高整体性能。
# 导入必要的模块
import aiohttp
import asyncio
import time
async def fetch_single_url(session, url):
"""获取单个URL的内容"""
try:
# 发送GET请求
async with session.get(url) as response:
# 获取响应文本
content = await response.text()
return {
'url': url,
'status': response.status,
'content_length': len(content),
'content_type': response.headers.get('content-type', '未知')
}
except Exception as e:
# 如果请求失败,返回错误信息
return {
'url': url,
'error': str(e),
'status': None,
'content_length': 0
}
async def fetch_multiple_urls(urls):
"""并发获取多个URL的内容"""
# 创建HTTP会话
async with aiohttp.ClientSession() as session:
# 创建任务列表
tasks = []
for url in urls:
# 为每个URL创建一个异步任务
task = asyncio.create_task(fetch_single_url(session, url))
tasks.append(task)
print(f"开始并发请求 {len(urls)} 个URL...")
start_time = time.time()
# 等待所有任务完成,return_exceptions=True 确保即使有任务失败也不会中断
results = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"所有请求完成,耗时: {elapsed_time:.2f} 秒")
return results
async def main_gather():
"""运行并发请求示例的主函数"""
# 定义要请求的URL列表
urls = [
'https://httpbin.org/json',
'https://httpbin.org/html',
'https://httpbin.org/xml',
'https://httpbin.org/robots.txt',
'https://httpbin.org/user-agent'
]
print("=== 使用 asyncio.gather 并发请求演示 ===")
print(f"要请求的URL数量: {len(urls)}")
# 执行并发请求
results = await fetch_multiple_urls(urls)
# 显示结果
print("\n=== 请求结果 ===")
for i, result in enumerate(results, 1):
if isinstance(result, Exception):
print(f"{i}. URL请求异常: {result}")
else:
if 'error' in result:
print(f"{i}. {result['url']}: 错误 - {result['error']}")
else:
print(f"{i}. {result['url']}: 状态码 {result['status']}, "
f"内容长度 {result['content_length']} 字节, "
f"类型 {result['content_type']}")
# 运行示例(如果在Windows命令行中运行)
if __name__ == "__main__":
asyncio.run(main_gather())6.2 使用 Semaphore 控制并发数 #
这个示例展示了如何使用Semaphore来控制同时进行的请求数量,避免对服务器造成过大压力。
# 导入必要的模块
import aiohttp
import asyncio
import time
async def bounded_fetch(session, url, semaphore):
"""使用信号量控制并发数的请求函数"""
# 获取信号量许可
async with semaphore:
try:
# 记录开始时间
start_time = time.time()
# 发送GET请求
async with session.get(url) as response:
# 获取响应内容
content = await response.text()
# 计算请求耗时
elapsed_time = time.time() - start_time
return {
'url': url,
'status': response.status,
'content_length': len(content),
'elapsed_time': elapsed_time,
'success': True
}
except Exception as e:
# 如果请求失败,返回错误信息
elapsed_time = time.time() - start_time
return {
'url': url,
'error': str(e),
'elapsed_time': elapsed_time,
'success': False
}
async def controlled_concurrent_fetch(urls, max_concurrency=3):
"""控制并发数的并发请求函数"""
# 创建信号量,限制最大并发数
semaphore = asyncio.Semaphore(max_concurrency)
print(f"=== 使用信号量控制并发数演示 ===")
print(f"最大并发数: {max_concurrency}")
print(f"要请求的URL数量: {len(urls)}")
# 创建HTTP会话
async with aiohttp.ClientSession() as session:
# 创建任务列表
tasks = [
bounded_fetch(session, url, semaphore)
for url in urls
]
print(f"开始并发请求,最大并发数: {max_concurrency}...")
start_time = time.time()
# 等待所有任务完成
results = await asyncio.gather(*tasks)
total_time = time.time() - start_time
print(f"所有请求完成,总耗时: {total_time:.2f} 秒")
return results
async def demonstrate_concurrency_control():
"""演示不同并发数对性能的影响"""
# 定义测试URL列表
urls = [
'https://httpbin.org/delay/1', # 延迟1秒
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/1'
]
# 测试不同的并发数
concurrency_levels = [1, 2, 4, 8]
for concurrency in concurrency_levels:
print(f"\n{'='*50}")
print(f"测试并发数: {concurrency}")
print(f"{'='*50}")
# 执行并发请求
results = await controlled_concurrent_fetch(urls, concurrency)
# 统计结果
successful_requests = sum(1 for r in results if r['success'])
failed_requests = len(results) - successful_requests
print(f"成功请求: {successful_requests}")
print(f"失败请求: {failed_requests}")
if successful_requests > 0:
avg_time = sum(r['elapsed_time'] for r in results if r['success']) / successful_requests
print(f"平均请求时间: {avg_time:.2f} 秒")
async def main_semaphore():
"""运行信号量控制并发示例的主函数"""
await demonstrate_concurrency_control()
# 运行示例(如果在Windows命令行中运行)
if __name__ == "__main__":
asyncio.run(main_semaphore())6.3 高级并发控制:连接池和超时 #
这个示例展示了如何结合连接池配置和超时设置来优化并发请求性能。
# 导入必要的模块
import aiohttp
import asyncio
import time
from aiohttp import TCPConnector, ClientTimeout
async def advanced_concurrent_fetch(urls, max_concurrency=10, max_connections=50):
"""高级并发请求函数,包含连接池和超时控制"""
# 创建自定义连接器
connector = TCPConnector(
limit=max_connections, # 最大连接数
limit_per_host=max_concurrency, # 每主机最大连接数
ssl=False, # 禁用SSL验证(仅用于测试)
force_close=False, # 保持连接复用
enable_cleanup_closed=True # 自动清理关闭的连接
)
# 创建超时配置
timeout = ClientTimeout(
total=30, # 整个请求超时
connect=10, # 连接建立超时
sock_read=20 # 读取超时
)
# 创建信号量控制并发数
semaphore = asyncio.Semaphore(max_concurrency)
print(f"=== 高级并发控制演示 ===")
print(f"最大并发数: {max_concurrency}")
print(f"最大连接数: {max_connections}")
print(f"要请求的URL数量: {len(urls)}")
async def controlled_request(session, url):
"""受控的请求函数"""
async with semaphore:
try:
start_time = time.time()
async with session.get(url) as response:
content = await response.text()
elapsed = time.time() - start_time
return {
'url': url,
'status': response.status,
'content_length': len(content),
'elapsed_time': elapsed,
'success': True
}
except Exception as e:
elapsed = time.time() - start_time
return {
'url': url,
'error': str(e),
'elapsed_time': elapsed,
'success': False
}
# 使用自定义配置创建会话
async with aiohttp.ClientSession(
connector=connector,
timeout=timeout,
headers={'User-Agent': 'AdvancedConcurrentClient/1.0'}
) as session:
print("开始高级并发请求...")
start_time = time.time()
# 创建所有请求任务
tasks = [controlled_request(session, url) for url in urls]
# 等待所有任务完成
results = await asyncio.gather(*tasks, return_exceptions=True)
total_time = time.time() - start_time
# 处理结果
successful = []
failed = []
for result in results:
if isinstance(result, Exception):
failed.append({'error': str(result)})
elif result['success']:
successful.append(result)
else:
failed.append(result)
print(f"\n=== 请求完成 ===")
print(f"总耗时: {total_time:.2f} 秒")
print(f"成功请求: {len(successful)}")
print(f"失败请求: {len(failed)}")
if successful:
avg_time = sum(r['elapsed_time'] for r in successful) / len(successful)
print(f"平均成功请求时间: {avg_time:.2f} 秒")
return results
async def main_advanced_concurrent():
"""运行高级并发控制示例的主函数"""
# 创建测试URL列表(包含不同类型的端点)
urls = [
'https://httpbin.org/json',
'https://httpbin.org/html',
'https://httpbin.org/xml',
'https://httpbin.org/robots.txt',
'https://httpbin.org/user-agent',
'https://httpbin.org/headers',
'https://httpbin.org/ip',
'https://httpbin.org/uuid'
]
# 测试不同的并发配置
configs = [
(5, 20), # 并发数5,连接数20
(10, 30), # 并发数10,连接数30
(15, 50) # 并发数15,连接数50
]
for max_concurrency, max_connections in configs:
print(f"\n{'='*60}")
print(f"配置: 最大并发数={max_concurrency}, 最大连接数={max_connections}")
print(f"{'='*60}")
await advanced_concurrent_fetch(urls, max_concurrency, max_connections)
# 运行示例(如果在Windows命令行中运行)
if __name__ == "__main__":
asyncio.run(main_advanced_concurrent())7. 服务器端 (Server) #
这个部分展示了如何使用aiohttp构建异步Web服务器,包括基本的路由处理、中间件和WebSocket支持。
7.1 基本 Web 服务器 #
这个示例展示了如何创建一个基本的aiohttp Web服务器,包括GET和POST请求的处理。
# 导入必要的模块
from aiohttp import web
import json
import time
async def handle_index(request):
"""处理根路径的请求"""
# 获取查询参数中的name,如果没有则使用默认值"World"
name = request.query.get('name', "World")
# 返回问候语
text = f"Hello, {name}!"
return web.Response(text=text, content_type='text/html; charset=utf-8')
async def handle_greeting(request):
"""处理带路径参数的问候请求"""
# 从URL路径中获取name参数
name = request.match_info.get('name', "World")
# 返回问候语
text = f"Hello, {name}! 欢迎使用aiohttp!"
return web.Response(text=text, content_type='text/html; charset=utf-8')
async def handle_json(request):
"""处理JSON POST请求"""
try:
# 解析请求体中的JSON数据
data = await request.json()
# 返回接收到的数据
return web.json_response({
'status': 'success',
'message': '数据接收成功',
'received_data': data,
'timestamp': time.time()
})
except json.JSONDecodeError:
# 如果JSON解析失败,返回错误信息
return web.json_response({
'status': 'error',
'message': '无效的JSON数据'
}, status=400)
async def handle_form(request):
"""处理表单POST请求"""
try:
# 解析表单数据
data = await request.post()
# 将表单数据转换为字典
form_dict = dict(data)
# 返回接收到的表单数据
return web.json_response({
'status': 'success',
'message': '表单数据接收成功',
'received_data': form_dict,
'timestamp': time.time()
})
except Exception as e:
# 如果处理失败,返回错误信息
return web.json_response({
'status': 'error',
'message': f'表单数据处理失败: {str(e)}'
}, status=400)
async def handle_status(request):
"""处理状态查询请求"""
# 返回服务器状态信息
return web.json_response({
'server': 'aiohttp',
'status': 'running',
'timestamp': time.time(),
'version': '1.0.0'
})
def create_app():
"""创建Web应用"""
# 创建Web应用实例
app = web.Application()
# 添加路由
app.router.add_get('/', handle_index) # 根路径
app.router.add_get('/greet/{name}', handle_greeting) # 带参数的路径
app.router.add_post('/api/json', handle_json) # JSON API端点
app.router.add_post('/api/form', handle_form) # 表单API端点
app.router.add_get('/status', handle_status) # 状态查询端点
return app
async def main_server():
"""运行Web服务器的主函数"""
print("=== aiohttp Web服务器演示 ===")
print("服务器启动中...")
# 创建Web应用
app = create_app()
# 启动服务器
runner = web.AppRunner(app)
await runner.setup()
# 创建站点
site = web.TCPSite(runner, '127.0.0.1', 8080)
print("服务器已启动,访问地址:")
print(" - 主页: http://127.0.0.1:8080/")
print(" - 问候: http://127.0.0.1:8080/greet/张三")
print(" - 状态: http://127.0.0.1:8080/status")
print(" - JSON API: POST http://127.0.0.1:8080/api/json")
print(" - 表单API: POST http://127.0.0.1:8080/api/form")
print("\n按 Ctrl+C 停止服务器")
# 启动站点
await site.start()
# 保持服务器运行
try:
await asyncio.Future() # 无限等待
except KeyboardInterrupt:
print("\n服务器正在关闭...")
finally:
await runner.cleanup()
# 运行服务器(如果在Windows命令行中运行)
if __name__ == '__main__':
import asyncio
asyncio.run(main_server())7.2 中间件 (Middleware) #
这个示例展示了如何使用中间件来添加认证、日志记录等功能,增强Web服务器的功能。
# 导入必要的模块
from aiohttp import web
import time
import json
import hashlib
async def auth_middleware(app, handler):
"""认证中间件"""
async def middleware_handler(request):
# 检查请求头中是否包含Authorization
auth_header = request.headers.get('Authorization')
if not auth_header:
# 如果没有认证头,返回401未授权
return web.json_response({
'error': '未授权访问',
'message': '请提供有效的认证信息'
}, status=401)
# 简单的token验证(实际应用中应该使用更安全的方法)
if not auth_header.startswith('Bearer '):
return web.json_response({
'error': '无效的认证格式',
'message': '认证头格式应为: Bearer <token>'
}, status=401)
token = auth_header[7:] # 去掉"Bearer "前缀
# 简单的token验证(这里使用固定的演示token)
if token != 'demo_token_12345':
return web.json_response({
'error': '无效的认证token',
'message': '请提供有效的认证token'
}, status=401)
# 认证通过,继续处理请求
return await handler(request)
return middleware_handler
async def logging_middleware(app, handler):
"""日志记录中间件"""
async def middleware_handler(request):
# 记录请求开始时间
start_time = time.time()
# 记录请求信息
print(f"请求开始: {request.method} {request.path}")
print(f"客户端IP: {request.remote}")
print(f"用户代理: {request.headers.get('User-Agent', '未知')}")
try:
# 处理请求
response = await handler(request)
# 计算处理时间
elapsed = time.time() - start_time
# 记录响应信息
print(f"响应状态: {response.status}")
print(f"处理时间: {elapsed:.3f}秒")
print(f"请求完成: {request.method} {request.path}")
return response
except Exception as e:
# 记录错误信息
elapsed = time.time() - start_time
print(f"请求错误: {e}")
print(f"错误处理时间: {elapsed:.3f}秒")
raise
return middleware_handler
async def cors_middleware(app, handler):
"""CORS跨域中间件"""
async def middleware_handler(request):
# 处理预检请求
if request.method == 'OPTIONS':
response = web.Response()
else:
response = await handler(request)
# 添加CORS头
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE, OPTIONS'
response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
return response
return middleware_handler
# 受保护的API端点
async def protected_api(request):
"""需要认证的API端点"""
# 获取当前用户信息(这里简化处理)
user_info = {
'user_id': '12345',
'username': 'demo_user',
'role': 'admin',
'timestamp': time.time()
}
return web.json_response({
'status': 'success',
'message': '认证成功',
'user_info': user_info
})
async def public_api(request):
"""公开的API端点"""
return web.json_response({
'status': 'success',
'message': '这是公开的API端点',
'timestamp': time.time()
})
async def health_check(request):
"""健康检查端点"""
return web.json_response({
'status': 'healthy',
'service': 'aiohttp_demo',
'timestamp': time.time()
})
def create_app_with_middleware():
"""创建带中间件的Web应用"""
# 创建Web应用实例
app = web.Application()
# 添加中间件(注意顺序很重要)
app.middlewares.extend([
logging_middleware, # 日志记录中间件
cors_middleware, # CORS中间件
auth_middleware # 认证中间件
])
# 添加路由
app.router.add_get('/health', health_check) # 健康检查(无需认证)
app.router.add_get('/api/public', public_api) # 公开API(无需认证)
app.router.add_get('/api/protected', protected_api) # 受保护API(需要认证)
return app
async def main_middleware():
"""运行带中间件的Web服务器的主函数"""
print("=== aiohttp 中间件演示 ===")
print("服务器启动中...")
# 创建Web应用
app = create_app_with_middleware()
# 启动服务器
runner = web.AppRunner(app)
await runner.setup()
# 创建站点
site = web.TCPSite(runner, '127.0.0.1', 8081)
print("服务器已启动,访问地址:")
print(" - 健康检查: http://127.0.0.1:8081/health")
print(" - 公开API: http://127.0.0.1:8081/api/public")
print(" - 受保护API: http://127.0.0.1:8081/api/protected")
print("\n测试受保护API时,请在请求头中添加:")
print(" Authorization: Bearer demo_token_12345")
print("\n按 Ctrl+C 停止服务器")
# 启动站点
await site.start()
# 保持服务器运行
try:
await asyncio.Future() # 无限等待
except KeyboardInterrupt:
print("\n服务器正在关闭...")
finally:
await runner.cleanup()
# 运行带中间件的服务器(如果在Windows命令行中运行)
if __name__ == '__main__':
import asyncio
asyncio.run(main_middleware())8. WebSocket 支持 #
这个部分展示了如何使用aiohttp实现WebSocket通信,包括客户端和服务端的实现。
8.1 WebSocket 客户端 #
这个示例展示了如何创建WebSocket客户端,连接到WebSocket服务器并进行双向通信。
# 导入必要的模块
import aiohttp
import asyncio
import json
import time
async def websocket_client():
"""WebSocket客户端示例"""
# 创建HTTP会话
async with aiohttp.ClientSession() as session:
print("=== WebSocket客户端演示 ===")
print("正在连接到WebSocket服务器...")
try:
# 连接到WebSocket服务器
async with session.ws_connect('ws://echo.websocket.org') as ws:
print("已连接到WebSocket服务器")
# 发送文本消息
message = "Hello WebSocket! 来自aiohttp客户端"
print(f"发送消息: {message}")
await ws.send_str(message)
# 发送JSON消息
json_message = {
'type': 'greeting',
'content': '你好,WebSocket服务器!',
'timestamp': time.time(),
'client': 'aiohttp_demo'
}
print(f"发送JSON消息: {json_message}")
await ws.send_str(json.dumps(json_message, ensure_ascii=False))
# 接收服务器响应
print("\n等待服务器响应...")
message_count = 0
# 设置接收超时
try:
async for msg in ws:
message_count += 1
if msg.type == aiohttp.WSMsgType.TEXT:
# 处理文本消息
print(f"收到文本消息 {message_count}: {msg.data}")
# 尝试解析JSON
try:
json_data = json.loads(msg.data)
print(f" JSON解析结果: {json_data}")
except json.JSONDecodeError:
print(f" 非JSON文本消息")
elif msg.type == aiohttp.WSMsgType.BINARY:
# 处理二进制消息
print(f"收到二进制消息 {message_count}: {len(msg.data)} 字节")
elif msg.type == aiohttp.WSMsgType.ERROR:
# 处理错误消息
print(f"WebSocket错误: {ws.exception()}")
break
elif msg.type == aiohttp.WSMsgType.CLOSE:
# 处理关闭消息
print("WebSocket连接已关闭")
break
# 接收几条消息后主动关闭
if message_count >= 3:
print("已接收足够消息,主动关闭连接")
await ws.close()
break
except asyncio.TimeoutError:
print("接收消息超时")
print("WebSocket客户端演示完成")
except Exception as e:
print(f"WebSocket连接失败: {e}")
async def websocket_client_with_custom_server():
"""连接到自定义WebSocket服务器的客户端"""
# 创建HTTP会话
async with aiohttp.ClientSession() as session:
print("\n=== 连接到自定义WebSocket服务器 ===")
print("注意:需要先启动WebSocket服务器")
try:
# 连接到本地WebSocket服务器
async with session.ws_connect('ws://127.0.0.1:8082/ws') as ws:
print("已连接到本地WebSocket服务器")
# 发送不同类型的消息
messages = [
"Hello, 自定义服务器!",
"这是一条测试消息",
"close" # 特殊消息,告诉服务器关闭连接
]
for i, message in enumerate(messages, 1):
print(f"发送消息 {i}: {message}")
await ws.send_str(message)
# 等待服务器响应
try:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(f"服务器响应: {msg.data}")
# 如果收到关闭消息,退出循环
if msg.data == 'close':
print("服务器请求关闭连接")
break
# 只接收一条响应就继续发送下一条
break
elif msg.type == aiohttp.WSMsgType.ERROR:
print(f"WebSocket错误: {ws.exception()}")
break
elif msg.type == aiohttp.WSMsgType.CLOSE:
print("WebSocket连接已关闭")
break
except Exception as e:
print(f"接收消息时出错: {e}")
break
# 关闭连接
await ws.close()
print("WebSocket连接已关闭")
except Exception as e:
print(f"连接自定义WebSocket服务器失败: {e}")
print("请确保WebSocket服务器正在运行")
async def main_websocket_client():
"""运行WebSocket客户端示例的主函数"""
# 首先连接到公共测试服务器
await websocket_client()
# 然后尝试连接到自定义服务器
await websocket_client_with_custom_server()
# 运行WebSocket客户端示例(如果在Windows命令行中运行)
if __name__ == "__main__":
asyncio.run(main_websocket_client())8.2 WebSocket 服务器 #
这个示例展示了如何创建WebSocket服务器,处理客户端连接和消息。
# 导入必要的模块
from aiohttp import web
import asyncio
import json
import time
# 存储所有活跃的WebSocket连接
websocket_connections = set()
async def websocket_handler(request):
"""WebSocket处理器"""
# 创建WebSocket响应
ws = web.WebSocketResponse()
# 准备WebSocket连接
await ws.prepare(request)
# 将连接添加到活跃连接集合
websocket_connections.add(ws)
client_id = id(ws)
print(f"WebSocket客户端 {client_id} 已连接")
print(f"当前活跃连接数: {len(websocket_connections)}")
# 发送欢迎消息
welcome_message = {
'type': 'welcome',
'message': f'欢迎连接!您的客户端ID是: {client_id}',
'timestamp': time.time(),
'active_connections': len(websocket_connections)
}
await ws.send_str(json.dumps(welcome_message, ensure_ascii=False))
try:
# 处理WebSocket消息
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
# 处理文本消息
print(f"收到来自客户端 {client_id} 的文本消息: {msg.data}")
# 检查是否是关闭命令
if msg.data == 'close':
# 发送确认消息
await ws.send_str('收到关闭请求,正在关闭连接')
await ws.close()
break
# 尝试解析JSON消息
try:
data = json.loads(msg.data)
message_type = data.get('type', 'unknown')
if message_type == 'greeting':
# 处理问候消息
response = {
'type': 'response',
'message': f'你好,{data.get("content", "客户端")}!',
'timestamp': time.time(),
'echo': data
}
await ws.send_str(json.dumps(response, ensure_ascii=False))
elif message_type == 'broadcast':
# 处理广播消息
broadcast_msg = {
'type': 'broadcast',
'from': client_id,
'content': data.get('content', ''),
'timestamp': time.time()
}
# 向所有其他客户端广播消息
for conn in websocket_connections:
if conn != ws and not conn.closed:
try:
await conn.send_str(json.dumps(broadcast_msg, ensure_ascii=False))
except Exception as e:
print(f"广播消息失败: {e}")
# 发送确认消息
await ws.send_str(json.dumps({
'type': 'broadcast_sent',
'message': '广播消息已发送',
'timestamp': time.time()
}, ensure_ascii=False))
else:
# 处理其他类型的消息
echo_response = {
'type': 'echo',
'message': '消息已收到',
'original_data': data,
'timestamp': time.time()
}
await ws.send_str(json.dumps(echo_response, ensure_ascii=False))
except json.JSONDecodeError:
# 如果不是JSON,发送回显消息
echo_response = {
'type': 'echo',
'message': '收到文本消息',
'original_text': msg.data,
'timestamp': time.time()
}
await ws.send_str(json.dumps(echo_response, ensure_ascii=False))
elif msg.type == aiohttp.WSMsgType.BINARY:
# 处理二进制消息
print(f"收到来自客户端 {client_id} 的二进制消息: {len(msg.data)} 字节")
# 发送确认消息
response = {
'type': 'binary_received',
'message': f'收到二进制数据: {len(msg.data)} 字节',
'timestamp': time.time()
}
await ws.send_str(json.dumps(response, ensure_ascii=False))
elif msg.type == aiohttp.WSMsgType.ERROR:
# 处理错误
print(f"WebSocket客户端 {client_id} 发生错误: {ws.exception()}")
break
elif msg.type == aiohttp.WSMsgType.CLOSE:
# 处理关闭消息
print(f"WebSocket客户端 {client_id} 请求关闭连接")
break
except Exception as e:
print(f"处理WebSocket消息时出错: {e}")
finally:
# 清理连接
websocket_connections.discard(ws)
print(f"WebSocket客户端 {client_id} 已断开连接")
print(f"当前活跃连接数: {len(websocket_connections)}")
return ws
async def websocket_status(request):
"""WebSocket状态查询端点"""
status_info = {
'active_connections': len(websocket_connections),
'timestamp': time.time(),
'server': 'aiohttp_websocket_demo'
}
return web.json_response(status_info)
async def broadcast_message(request):
"""广播消息端点(用于测试)"""
try:
# 获取POST数据
data = await request.json()
message = data.get('message', '测试广播消息')
# 创建广播消息
broadcast_msg = {
'type': 'server_broadcast',
'content': message,
'timestamp': time.time(),
'source': 'server'
}
# 向所有连接的客户端广播
disconnected_clients = set()
for ws in websocket_connections:
if not ws.closed:
try:
await ws.send_str(json.dumps(broadcast_msg, ensure_ascii=False))
except Exception as e:
print(f"向客户端广播失败: {e}")
disconnected_clients.add(ws)
else:
disconnected_clients.add(ws)
# 清理断开的连接
websocket_connections.difference_update(disconnected_clients)
return web.json_response({
'status': 'success',
'message': f'广播消息已发送给 {len(websocket_connections)} 个客户端',
'timestamp': time.time()
})
except Exception as e:
return web.json_response({
'status': 'error',
'message': f'广播失败: {str(e)}'
}, status=400)
def create_websocket_app():
"""创建WebSocket应用"""
# 创建Web应用实例
app = web.Application()
# 添加路由
app.router.add_get('/ws', websocket_handler) # WebSocket端点
app.router.add_get('/status', websocket_status) # 状态查询
app.router.add_post('/broadcast', broadcast_message) # 广播消息端点
return app
async def main_websocket_server():
"""运行WebSocket服务器的主函数"""
print("=== aiohttp WebSocket服务器演示 ===")
print("服务器启动中...")
# 创建Web应用
app = create_websocket_app()
# 启动服务器
runner = web.AppRunner(app)
await runner.setup()
# 创建站点
site = web.TCPSite(runner, '127.0.0.1', 8082)
print("WebSocket服务器已启动:")
print(" - WebSocket端点: ws://127.0.0.1:8082/ws")
print(" - 状态查询: http://127.0.0.1:8082/status")
print(" - 广播消息: POST http://127.0.0.1:8082/broadcast")
print("\n可以使用WebSocket客户端连接到 ws://127.0.0.1:8082/ws")
print("按 Ctrl+C 停止服务器")
# 启动站点
await site.start()
# 保持服务器运行
try:
await asyncio.Future() # 无限等待
except KeyboardInterrupt:
print("\n服务器正在关闭...")
finally:
await runner.cleanup()
# 运行WebSocket服务器(如果在Windows命令行中运行)
if __name__ == '__main__':
asyncio.run(main_websocket_server())9. 高级配置和最佳实践 #
这个部分展示了如何配置aiohttp的高级选项,以及在实际开发中应该遵循的最佳实践。
9.1 Session 配置 #
这个示例展示了如何创建自定义配置的ClientSession,包括连接器、超时、请求头等配置。
# 导入必要的模块
import aiohttp
import asyncio
import time
from aiohttp import ClientSession, TCPConnector
async def demonstrate_session_configuration():
"""演示Session配置的示例"""
print("=== aiohttp Session配置演示 ===")
# 创建自定义连接器配置
connector = TCPConnector(
limit=100, # 最大连接数
limit_per_host=10, # 每主机最大连接数
ssl=False, # 禁用SSL验证(仅用于测试)
force_close=True, # 强制关闭空闲连接
enable_cleanup_closed=True, # 自动清理关闭的连接
ttl_dns_cache=300, # DNS缓存TTL(秒)
use_dns_cache=True, # 启用DNS缓存
keepalive_timeout=30 # 保持连接超时(秒)
)
# 创建超时配置
timeout = aiohttp.ClientTimeout(
total=60, # 整个请求超时
connect=10, # 连接建立超时
sock_connect=10,# socket 连接超时
sock_read=30 # socket 读取超时
)
# 创建自定义请求头
headers = {
'User-Agent': 'MyApp/1.0 (Advanced Config Demo)',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive'
}
# 创建Cookie jar(用于管理cookies)
cookie_jar = aiohttp.CookieJar()
print("创建自定义配置的Session...")
print(f"最大连接数: {connector.limit}")
print(f"每主机最大连接数: {connector.limit_per_host}")
print(f"总超时: {timeout.total}秒")
print(f"连接超时: {timeout.connect}秒")
# 使用自定义配置创建会话
async with ClientSession(
connector=connector,
timeout=timeout,
headers=headers,
cookie_jar=cookie_jar,
trust_env=True, # 使用系统代理设置
raise_for_status=False # 不自动抛出HTTP错误
) as session:
print("\n=== 测试自定义配置 ===")
# 测试1: 基本请求
print("\n--- 测试1: 基本请求 ---")
try:
async with session.get('https://httpbin.org/get') as response:
print(f"状态码: {response.status}")
print(f"响应头: {dict(response.headers)}")
if response.status == 200:
data = await response.json()
print(f"响应数据: {data}")
else:
print(f"请求失败,状态码: {response.status}")
except Exception as e:
print(f"请求异常: {e}")
# 测试2: 测试连接池
print("\n--- 测试2: 连接池测试 ---")
urls = [
'https://httpbin.org/json',
'https://httpbin.org/html',
'https://httpbin.org/xml'
]
start_time = time.time()
tasks = []
for url in urls:
task = asyncio.create_task(session.get(url))
tasks.append(task)
# 并发请求
responses = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.time()
print(f"并发请求完成,耗时: {end_time - start_time:.2f}秒")
for i, response in enumerate(responses):
if isinstance(response, Exception):
print(f"请求 {i+1} 失败: {response}")
else:
print(f"请求 {i+1} 成功: 状态码 {response.status}")
# 测试3: Cookie管理
print("\n--- 测试3: Cookie管理 ---")
try:
# 发送请求到会设置cookie的端点
async with session.get('https://httpbin.org/cookies/set?name=test&value=value') as response:
print(f"Cookie设置请求状态码: {response.status}")
# 检查cookie是否被设置
cookies = list(cookie_jar)
print(f"当前cookies: {cookies}")
# 发送另一个请求,验证cookie是否被发送
async with session.get('https://httpbin.org/cookies') as response2:
if response2.status == 200:
cookie_data = await response2.json()
print(f"服务器接收到的cookies: {cookie_data}")
except Exception as e:
print(f"Cookie测试失败: {e}")
async def main_session_config():
"""运行Session配置演示的主函数"""
await demonstrate_session_configuration()
# 运行Session配置示例(如果在Windows命令行中运行)
if __name__ == "__main__":
asyncio.run(main_session_config())9.2 错误处理 #
这个示例展示了如何健壮地处理aiohttp请求中可能出现的各种错误。
# 导入必要的模块
import aiohttp
import asyncio
import time
async def robust_request(session, url, max_retries=3):
"""健壮的请求函数,包含重试和错误处理"""
for attempt in range(max_retries):
try:
print(f"尝试请求 {url} (第 {attempt + 1} 次)")
# 发送请求
async with session.get(url) as response:
# 检查HTTP状态码
response.raise_for_status()
# 尝试解析JSON响应
try:
return await response.json()
except aiohttp.ContentTypeError:
# 如果不是JSON,返回文本
return await response.text()
except aiohttp.ClientError as e:
# 网络相关错误
print(f"网络错误 (尝试 {attempt + 1}/{max_retries}): {e}")
if attempt == max_retries - 1:
return {'error': f'网络错误: {e}', 'url': url}
except aiohttp.ClientResponseError as e:
# HTTP错误
print(f"HTTP错误 {e.status} (尝试 {attempt + 1}/{max_retries}): {e.message}")
if attempt == max_retries - 1:
return {'error': f'HTTP错误 {e.status}: {e.message}', 'url': url}
except asyncio.TimeoutError:
# 超时错误
print(f"请求超时 (尝试 {attempt + 1}/{max_retries})")
if attempt == max_retries - 1:
return {'error': '请求超时', 'url': url}
except Exception as e:
# 其他未预期的错误
print(f"未预期的错误 (尝试 {attempt + 1}/{max_retries}): {e}")
if attempt == max_retries - 1:
return {'error': f'未预期的错误: {e}', 'url': url}
# 如果不是最后一次尝试,等待一段时间后重试
if attempt < max_retries - 1:
wait_time = 2 ** attempt # 指数退避
print(f"等待 {wait_time} 秒后重试...")
await asyncio.sleep(wait_time)
return {'error': '所有重试都失败了', 'url': url}
async def demonstrate_error_handling():
"""演示错误处理的示例"""
print("=== aiohttp 错误处理演示 ===")
# 创建HTTP会话
timeout = aiohttp.ClientTimeout(total=10, connect=5)
async with aiohttp.ClientSession(timeout=timeout) as session:
# 测试不同类型的URL
test_urls = [
'https://httpbin.org/json', # 正常URL
'https://httpbin.org/status/404', # 404错误
'https://httpbin.org/status/500', # 500错误
'https://httpbin.org/delay/15', # 超时
'https://invalid-domain-that-does-not-exist.com', # 无效域名
'https://httpbin.org/status/200' # 正常状态
]
print(f"测试 {len(test_urls)} 个URL的错误处理...")
# 并发处理所有请求
tasks = [robust_request(session, url) for url in test_urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 分析结果
print("\n=== 错误处理结果分析 ===")
successful = 0
failed = 0
for i, (url, result) in enumerate(zip(test_urls, results)):
print(f"\n{i+1}. {url}")
if isinstance(result, Exception):
print(f" 异常: {result}")
failed += 1
elif isinstance(result, dict) and 'error' in result:
print(f" 错误: {result['error']}")
failed += 1
else:
print(f" 成功: 状态码 200")
successful += 1
print(f"\n总结: 成功 {successful} 个,失败 {failed} 个")
async def main_error_handling():
"""运行错误处理演示的主函数"""
await demonstrate_error_handling()
# 运行错误处理示例(如果在Windows命令行中运行)
if __name__ == "__main__":
asyncio.run(main_error_handling())10. 性能优化技巧 #
这个部分总结了使用aiohttp时的性能优化技巧,帮助开发者写出高效的异步HTTP代码。
10.1 性能优化实践 #
这个示例展示了各种性能优化技巧的实际应用。
# 导入必要的模块
import aiohttp
import asyncio
import time
from aiohttp import TCPConnector, ClientTimeout
async def performance_optimization_demo():
"""性能优化技巧演示"""
print("=== aiohttp 性能优化技巧演示 ===")
# 技巧1: 复用Session
print("\n--- 技巧1: 复用Session ---")
# 创建优化的连接器
connector = TCPConnector(
limit=200, # 增加最大连接数
limit_per_host=20, # 增加每主机连接数
ssl=False, # 禁用SSL验证(仅用于测试)
force_close=False, # 保持连接复用
enable_cleanup_closed=True, # 自动清理
ttl_dns_cache=300, # DNS缓存
use_dns_cache=True # 启用DNS缓存
)
# 创建优化的超时配置
timeout = ClientTimeout(
total=30,
connect=5,
sock_read=20
)
# 创建优化的Session
async with aiohttp.ClientSession(
connector=connector,
timeout=timeout,
headers={'User-Agent': 'PerformanceDemo/1.0'},
trust_env=True
) as session:
# 技巧2: 批量处理请求
print("\n--- 技巧2: 批量处理请求 ---")
# 创建大量测试URL
base_url = "https://httpbin.org"
urls = [
f"{base_url}/json",
f"{base_url}/html",
f"{base_url}/xml",
f"{base_url}/robots.txt",
f"{base_url}/user-agent",
f"{base_url}/headers",
f"{base_url}/ip",
f"{base_url}/uuid"
] * 3 # 重复3次,总共24个URL
print(f"准备并发请求 {len(urls)} 个URL...")
# 技巧3: 使用信号量控制并发数
semaphore = asyncio.Semaphore(15) # 限制最大并发数
async def optimized_request(url):
"""优化的请求函数"""
async with semaphore:
try:
start_time = time.time()
async with session.get(url) as response:
content = await response.text()
elapsed = time.time() - start_time
return {
'url': url,
'status': response.status,
'elapsed': elapsed,
'success': True
}
except Exception as e:
elapsed = time.time() - start_time
return {
'url': url,
'error': str(e),
'elapsed': elapsed,
'success': False
}
# 技巧4: 使用asyncio.gather进行并发
print("开始并发请求...")
start_time = time.time()
tasks = [optimized_request(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
total_time = time.time() - start_time
# 分析结果
successful = sum(1 for r in results if isinstance(r, dict) and r.get('success'))
failed = len(results) - successful
if successful > 0:
avg_time = sum(r['elapsed'] for r in results if isinstance(r, dict) and r.get('success')) / successful
print(f"成功请求: {successful}, 失败: {failed}")
print(f"平均请求时间: {avg_time:.3f}秒")
print(f"总耗时: {total_time:.2f}秒")
print(f"吞吐量: {successful/total_time:.2f} 请求/秒")
# 技巧5: 流式处理大响应
print("\n--- 技巧5: 流式处理大响应 ---")
try:
async with session.get('https://httpbin.org/stream/10') as response:
print("开始流式读取响应...")
chunk_count = 0
total_size = 0
# 流式读取响应内容
async for chunk in response.content.iter_chunked(1024):
chunk_count += 1
total_size += len(chunk)
if chunk_count <= 3: # 只显示前3个块的信息
print(f"块 {chunk_count}: {len(chunk)} 字节")
print(f"总共读取 {chunk_count} 个块,总大小 {total_size} 字节")
except Exception as e:
print(f"流式读取失败: {e}")
async def main_performance():
"""运行性能优化演示的主函数"""
await performance_optimization_demo()
# 运行性能优化示例(如果在Windows命令行中运行)
if __name__ == "__main__":
asyncio.run(main_performance())10.2 性能监控和分析 #
这个示例展示了如何监控和分析aiohttp应用的性能。
# 导入必要的模块
import aiohttp
import asyncio
import time
import statistics
from collections import defaultdict
class PerformanceMonitor:
"""性能监控器"""
def __init__(self):
self.request_times = defaultdict(list)
self.error_counts = defaultdict(int)
self.total_requests = 0
self.start_time = None
def start_monitoring(self):
"""开始监控"""
self.start_time = time.time()
print("性能监控已启动")
def record_request(self, url, elapsed_time, success=True):
"""记录请求信息"""
self.total_requests += 1
self.request_times[url].append(elapsed_time)
if not success:
self.error_counts[url] += 1
def get_statistics(self):
"""获取统计信息"""
if not self.start_time:
return {}
total_time = time.time() - self.start_time
stats = {
'total_requests': self.total_requests,
'total_time': total_time,
'requests_per_second': self.total_requests / total_time if total_time > 0 else 0,
'urls_tested': len(self.request_times),
'total_errors': sum(self.error_counts.values())
}
# 计算每个URL的统计信息
url_stats = {}
for url, times in self.request_times.items():
if times:
url_stats[url] = {
'request_count': len(times),
'avg_time': statistics.mean(times),
'min_time': min(times),
'max_time': max(times),
'error_count': self.error_counts[url]
}
stats['url_details'] = url_stats
return stats
def print_report(self):
"""打印性能报告"""
stats = self.get_statistics()
print("\n" + "="*60)
print("性能监控报告")
print("="*60)
print(f"总请求数: {stats['total_requests']}")
print(f"总耗时: {stats['total_time']:.2f}秒")
print(f"请求速率: {stats['requests_per_second']:.2f} 请求/秒")
print(f"测试URL数: {stats['urls_tested']}")
print(f"总错误数: {stats['total_errors']}")
print(f"\n详细统计:")
for url, url_stat in stats['url_details'].items():
print(f"\n{url}:")
print(f" 请求数: {url_stat['request_count']}")
print(f" 平均时间: {url_stat['avg_time']:.3f}秒")
print(f" 最快时间: {url_stat['min_time']:.3f}秒")
print(f" 最慢时间: {url_stat['max_time']:.3f}秒")
print(f" 错误数: {url_stat['error_count']}")
async def monitored_request(session, url, monitor, semaphore):
"""带监控的请求函数"""
async with semaphore:
try:
start_time = time.time()
async with session.get(url) as response:
content = await response.text()
elapsed = time.time() - start_time
# 记录成功请求
monitor.record_request(url, elapsed, success=True)
return {
'url': url,
'status': response.status,
'elapsed': elapsed,
'success': True
}
except Exception as e:
elapsed = time.time() - start_time
# 记录失败请求
monitor.record_request(url, elapsed, success=False)
return {
'url': url,
'error': str(e),
'elapsed': elapsed,
'success': False
}
async def performance_monitoring_demo():
"""性能监控演示"""
print("=== aiohttp 性能监控演示 ===")
# 创建性能监控器
monitor = PerformanceMonitor()
monitor.start_monitoring()
# 创建优化的Session
connector = TCPConnector(
limit=100,
limit_per_host=10,
ssl=False
)
timeout = ClientTimeout(total=30, connect=10)
async with aiohttp.ClientSession(
connector=connector,
timeout=timeout
) as session:
# 测试URL列表
urls = [
'https://httpbin.org/json',
'https://httpbin.org/html',
'https://httpbin.org/xml',
'https://httpbin.org/robots.txt',
'https://httpbin.org/user-agent'
] * 4 # 重复4次,总共20个请求
print(f"开始监控 {len(urls)} 个请求的性能...")
# 使用信号量控制并发
semaphore = asyncio.Semaphore(8)
# 创建所有请求任务
tasks = [
monitored_request(session, url, monitor, semaphore)
for url in urls
]
# 执行所有请求
start_time = time.time()
results = await asyncio.gather(*tasks, return_exceptions=True)
total_time = time.time() - start_time
print(f"所有请求完成,总耗时: {total_time:.2f}秒")
# 生成性能报告
monitor.print_report()
async def main_monitoring():
"""运行性能监控演示的主函数"""
await performance_monitoring_demo()
# 运行性能监控示例(如果在Windows命令行中运行)
if __name__ == "__main__":
asyncio.run(main_monitoring())