1. 什么是 databases #
databases 是一个 异步 的 SQL 数据库客户端库,为 Python 的异步生态(如 FastAPI、Starlette)提供了简单而强大的数据库访问能力。
databases 库的设计理念是提供一个轻量级、高性能的异步数据库访问接口。它不包含 ORM 功能,而是专注于提供底层的 SQL 查询能力,这使得它非常适合需要精确控制 SQL 语句的场景。库的核心优势在于其异步特性,能够有效处理高并发请求,是现代异步 Web 应用的理想选择。
2. 核心特性 #
- 完全异步:基于
async/await语法,支持高并发 - SQL 查询支持:直接执行原始 SQL 语句
- 连接池管理:自动管理数据库连接池
- 多数据库支持:PostgreSQL、MySQL、SQLite
- 类型安全:支持记录查询和类型注解
这些特性使得 databases 特别适合构建高性能的异步应用。异步特性意味着可以同时处理多个数据库请求而不阻塞,连接池管理确保了资源的有效利用,而多数据库支持则提供了部署的灵活性。
3. 基本用法 #
3.1 创建数据库连接 #
3.2 连接字符串格式说明 #
不同的数据库使用不同的连接字符串格式。PostgreSQL 使用 postgresql:// 前缀,MySQL 使用 mysql:// 前缀,而 SQLite 使用 sqlite:/// 前缀。这些连接字符串包含了连接数据库所需的所有信息,如用户名、密码、主机地址、端口和数据库名称。
# 导入必要的库
from databases import Database
import asyncio
# 定义不同数据库的连接字符串
# PostgreSQL 连接字符串格式:postgresql://用户名:密码@主机:端口/数据库名
postgresql_url = 'postgresql://user:password@localhost:5432/mydb'
# MySQL 连接字符串格式:mysql://用户名:密码@主机:端口/数据库名
mysql_url = 'mysql://user:password@localhost:3306/mydb'
# SQLite 连接字符串格式:sqlite:///./数据库文件路径(相对路径)
sqlite_url = 'sqlite:///./test.db'
# 创建数据库连接实例
database = Database(sqlite_url)
# 测试连接函数
async def test_connection():
try:
# 建立数据库连接
await database.connect()
print("数据库连接成功!")
# 执行一个简单的测试查询
result = await database.fetch_one("SELECT 1 as test")
print(f"测试查询结果: {result}")
except Exception as e:
print(f"连接失败: {e}")
finally:
# 断开数据库连接
await database.disconnect()
print("数据库连接已断开")
# 运行测试
if __name__ == "__main__":
asyncio.run(test_connection())3.3 连接和断开连接 #
3.3.1 连接管理说明 #
正确的连接管理对于数据库应用至关重要。databases 提供了 connect() 和 disconnect() 方法来管理连接的生命周期。建议使用 try-finally 结构确保即使发生异常,连接也能被正确关闭,避免资源泄漏。
# 导入必要的库
import asyncio
from databases import Database
# 定义主函数
async def main():
# 创建数据库连接实例
database = Database('sqlite:///./example.db')
try:
# 建立数据库连接
await database.connect()
print("数据库连接已建立")
# 执行数据库操作
# 这里可以添加各种数据库查询操作
print("正在执行数据库操作...")
# 模拟一些数据库操作
await asyncio.sleep(1)
print("数据库操作完成")
except Exception as e:
# 异常处理
print(f"操作过程中发生错误: {e}")
finally:
# 确保在函数结束时断开连接
await database.disconnect()
print("数据库连接已断开")
# 程序入口点
if __name__ == "__main__":
# 运行异步主函数
asyncio.run(main())3.4 执行查询操作 #
3.4.1 数据库操作概述 #
databases 提供了多种查询方法:execute() 用于执行 INSERT、UPDATE、DELETE 等操作,fetch_one() 用于获取单条记录,fetch_all() 用于获取多条记录。这些方法都支持参数化查询,可以有效防止 SQL 注入攻击。
3.4.2 创建表和数据插入 #
3.4.2.1 表结构设计说明 #
这个示例展示了如何创建一个用户表,包含基本的用户信息字段。使用 CREATE TABLE IF NOT EXISTS 语句可以避免重复创建表的错误,AUTOINCREMENT 关键字确保 ID 字段自动递增,UNIQUE 约束确保邮箱地址的唯一性。
# 导入必要的库
import asyncio
from databases import Database
# 数据库设置函数
async def setup_database():
# 创建数据库连接实例
database = Database('sqlite:///./example.db')
try:
# 建立数据库连接
await database.connect()
print("数据库连接已建立")
# 创建用户表的SQL语句
# 使用 IF NOT EXISTS 避免重复创建表的错误
create_table_query = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT, -- 主键,自动递增
name TEXT NOT NULL, -- 用户名,不能为空
email TEXT NOT NULL UNIQUE -- 邮箱,不能为空且唯一
)
"""
# 执行创建表的SQL语句
await database.execute(create_table_query)
print("用户表创建成功")
# 插入用户数据的SQL语句
# 使用参数化查询防止SQL注入
insert_query = "INSERT INTO users (name, email) VALUES (:name, :email)"
# 定义要插入的用户数据
values = {"name": "Alice", "email": "alice@example.com"}
# 执行插入操作并获取新插入记录的ID
user_id = await database.execute(insert_query, values)
print(f"成功插入用户,ID: {user_id}")
# 插入更多用户数据
more_users = [
{"name": "Bob", "email": "bob@example.com"},
{"name": "Charlie", "email": "charlie@example.com"}
]
for user_data in more_users:
user_id = await database.execute(insert_query, user_data)
print(f"成功插入用户 {user_data['name']},ID: {user_id}")
print("数据库初始化完成")
except Exception as e:
# 异常处理
print(f"设置数据库时发生错误: {e}")
finally:
# 断开数据库连接
await database.disconnect()
print("数据库连接已断开")
# 程序入口点
if __name__ == "__main__":
# 运行数据库设置函数
asyncio.run(setup_database())3.5 查询数据 #
3.5.1 数据查询说明 #
这个示例展示了如何从数据库中查询数据。fetch_all() 方法返回所有匹配的记录,每条记录都是一个类似字典的对象,可以通过键名访问字段值。这种方法适合需要处理多条记录的场景。
# 导入必要的库
import asyncio
from databases import Database
# 查询所有用户的函数
async def query_users():
# 创建数据库连接实例
database = Database('sqlite:///./example.db')
try:
# 建立数据库连接
await database.connect()
print("数据库连接已建立")
# 查询所有用户的SQL语句
query = "SELECT * FROM users"
# 执行查询并获取所有结果
users = await database.fetch_all(query)
# 检查是否找到用户
if not users:
print("数据库中没有找到用户")
return
# 遍历并显示所有用户信息
print(f"找到 {len(users)} 个用户:")
print("-" * 50)
for user in users:
# 每条记录都是一个类似字典的对象
print(f"ID: {user['id']}")
print(f"姓名: {user['name']}")
print(f"邮箱: {user['email']}")
print("-" * 50)
except Exception as e:
# 异常处理
print(f"查询用户时发生错误: {e}")
finally:
# 断开数据库连接
await database.disconnect()
print("数据库连接已断开")
# 程序入口点
if __name__ == "__main__":
# 运行查询函数
asyncio.run(query_users())3.6 带参数的查询 #
3.6.1 参数化查询说明 #
参数化查询是数据库操作的最佳实践,它不仅可以防止 SQL 注入攻击,还能提高查询性能。通过使用命名参数(如 :id),我们可以安全地将用户输入传递给 SQL 语句,数据库会自动处理参数的类型转换和转义。
# 导入必要的库
import asyncio
from databases import Database
# 根据ID查询特定用户的函数
async def query_user_by_id(user_id: int):
# 创建数据库连接实例
database = Database('sqlite:///./example.db')
try:
# 建立数据库连接
await database.connect()
print("数据库连接已建立")
# 使用参数化查询的SQL语句
# :id 是一个命名参数,会被 values 字典中的值替换
query = "SELECT * FROM users WHERE id = :id"
# 定义查询参数
values = {"id": user_id}
# 执行查询并获取单条结果
user = await database.fetch_one(query, values=values)
# 检查是否找到用户
if user:
print(f"找到用户:")
print(f" ID: {user['id']}")
print(f" 姓名: {user['name']}")
print(f" 邮箱: {user['email']}")
else:
print(f"未找到ID为 {user_id} 的用户")
except Exception as e:
# 异常处理
print(f"查询用户时发生错误: {e}")
finally:
# 断开数据库连接
await database.disconnect()
print("数据库连接已断开")
# 程序入口点
if __name__ == "__main__":
# 测试查询不同ID的用户
test_ids = [1, 2, 999] # 包括存在的和不存在的ID
for test_id in test_ids:
print(f"\n正在查询ID为 {test_id} 的用户...")
asyncio.run(query_user_by_id(test_id))3.7 使用连接上下文管理器(推荐) #
3.7.1 上下文管理器优势说明 #
使用 async with 语句可以自动管理数据库连接的生命周期,无需手动调用 connect() 和 disconnect() 方法。这种方式更加简洁、安全,能够确保即使发生异常,连接也会被正确关闭,是生产环境中的推荐做法。
# 导入必要的库
import asyncio
from databases import Database
# 使用上下文管理器的函数
async def using_context_manager():
# 使用 async with 自动管理数据库连接
# 进入 with 块时自动连接,退出时自动断开
async with Database('sqlite:///./example.db') as database:
print("数据库连接已建立(通过上下文管理器)")
try:
# 查询所有用户
users = await database.fetch_all("SELECT * FROM users")
# 检查是否找到用户
if not users:
print("数据库中没有找到用户")
return
# 显示用户信息
print(f"找到 {len(users)} 个用户:")
for user in users:
print(f" {user['name']} ({user['email']})")
except Exception as e:
# 异常处理
print(f"查询过程中发生错误: {e}")
# 不需要手动断开连接,上下文管理器会自动处理
print("即将退出上下文管理器,连接将自动断开")
# 程序入口点
if __name__ == "__main__":
# 运行使用上下文管理器的函数
asyncio.run(using_context_manager())4. 高级用法 #
4.1 事务管理 #
4.1.1 事务概念说明 #
事务是数据库操作的重要概念,它确保一组相关的操作要么全部成功,要么全部失败。在 databases 中,使用 database.transaction() 上下文管理器来管理事务。事务特别适用于需要保证数据一致性的场景,如银行转账、库存管理等。
# 导入必要的库
import asyncio
from databases import Database
# 资金转账函数
async def transfer_funds(from_id: int, to_id: int, amount: float):
# 创建数据库连接实例
database = Database('sqlite:///./example.db')
try:
# 建立数据库连接
await database.connect()
print("数据库连接已建立")
# 首先创建账户表(如果不存在)
create_accounts_table = """
CREATE TABLE IF NOT EXISTS accounts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
balance REAL NOT NULL DEFAULT 0.0
)
"""
await database.execute(create_accounts_table)
# 插入一些测试账户数据
await database.execute(
"INSERT OR IGNORE INTO accounts (id, name, balance) VALUES (1, 'Alice', 1000.0)"
)
await database.execute(
"INSERT OR IGNORE INTO accounts (id, name, balance) VALUES (2, 'Bob', 500.0)"
)
print(f"转账前账户余额:")
alice = await database.fetch_one("SELECT * FROM accounts WHERE id = :id", {"id": from_id})
bob = await database.fetch_one("SELECT * FROM accounts WHERE id = :id", {"id": to_id})
if alice and bob:
print(f"Alice (ID: {from_id}): ${alice['balance']:.2f}")
print(f"Bob (ID: {to_id}): ${bob['balance']:.2f}")
# 使用事务进行资金转账
async with database.transaction():
print(f"\n开始转账 ${amount:.2f} 从账户 {from_id} 到账户 {to_id}")
# 扣款操作
await database.execute(
"UPDATE accounts SET balance = balance - :amount WHERE id = :id",
{"amount": amount, "id": from_id}
)
print(f"已从账户 {from_id} 扣除 ${amount:.2f}")
# 存款操作
await database.execute(
"UPDATE accounts SET balance = balance + :amount WHERE id = :id",
{"amount": amount, "id": to_id}
)
print(f"已向账户 {to_id} 转入 ${amount:.2f}")
# 验证转账结果
alice_after = await database.fetch_one("SELECT * FROM accounts WHERE id = :id", {"id": from_id})
bob_after = await database.fetch_one("SELECT * FROM accounts WHERE id = :id", {"id": to_id})
print(f"\n转账后账户余额:")
print(f"Alice (ID: {from_id}): ${alice_after['balance']:.2f}")
print(f"Bob (ID: {to_id}): ${bob_after['balance']:.2f}")
print("转账成功完成!")
except Exception as e:
# 异常处理
print(f"转账过程中发生错误: {e}")
print("由于使用了事务,所有更改都已回滚")
finally:
# 断开数据库连接
await database.disconnect()
print("数据库连接已断开")
# 程序入口点
if __name__ == "__main__":
# 执行转账操作
asyncio.run(transfer_funds(1, 2, 200.0))4.2 使用 Pydantic 模型(类型安全) #
4.2.1 类型安全说明 #
Pydantic 是一个强大的数据验证库,它与 databases 结合使用可以提供类型安全的数据库操作。通过定义 Pydantic 模型,我们可以在运行时验证数据的类型和格式,确保数据的完整性和一致性。这种方式特别适合构建 API 和需要严格数据验证的应用。
# 导入必要的库
import asyncio
from databases import Database
from pydantic import BaseModel, EmailStr
from typing import List, Optional
# 定义用户数据模型
class User(BaseModel):
id: int
name: str
email: str
age: Optional[int] = None # 可选字段,默认值为None
# 定义创建用户的请求模型
class CreateUserRequest(BaseModel):
name: str
email: str
age: Optional[int] = None
# 定义用户列表响应模型
class UserListResponse(BaseModel):
users: List[User]
total_count: int
# 获取所有用户并转换为Pydantic模型的函数
async def get_users_as_models() -> UserListResponse:
# 创建数据库连接实例
database = Database('sqlite:///./example.db')
try:
# 建立数据库连接
await database.connect()
print("数据库连接已建立")
# 首先确保用户表存在并包含age字段
create_table_query = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT NOT NULL UNIQUE,
age INTEGER
)
"""
await database.execute(create_table_query)
# 插入一些测试数据(如果表为空)
count_query = "SELECT COUNT(*) as count FROM users"
result = await database.fetch_one(count_query)
if result['count'] == 0:
print("插入测试用户数据...")
test_users = [
{"name": "Alice", "email": "alice@example.com", "age": 25},
{"name": "Bob", "email": "bob@example.com", "age": 30},
{"name": "Charlie", "email": "charlie@example.com", "age": None}
]
for user_data in test_users:
await database.execute(
"INSERT INTO users (name, email, age) VALUES (:name, :email, :age)",
user_data
)
print("测试数据插入完成")
# 查询所有用户
query = "SELECT id, name, email, age FROM users"
rows = await database.fetch_all(query)
# 将数据库行转换为Pydantic模型
users = []
for row in rows:
# 使用dict(row)将数据库行转换为字典,然后传递给Pydantic模型
user = User(**dict(row))
users.append(user)
print(f"验证用户数据: {user}")
# 创建响应模型
response = UserListResponse(users=users, total_count=len(users))
print(f"\n成功获取 {len(users)} 个用户")
return response
except Exception as e:
# 异常处理
print(f"获取用户时发生错误: {e}")
raise
finally:
# 断开数据库连接
await database.disconnect()
print("数据库连接已断开")
# 创建新用户的函数
async def create_user(user_request: CreateUserRequest) -> User:
# 创建数据库连接实例
database = Database('sqlite:///./example.db')
try:
# 建立数据库连接
await database.connect()
print("数据库连接已建立")
# 插入新用户
insert_query = "INSERT INTO users (name, email, age) VALUES (:name, :email, :age) RETURNING id"
user_id = await database.execute(insert_query, user_request.dict())
# 创建并返回用户模型
user = User(id=user_id, **user_request.dict())
print(f"成功创建用户: {user}")
return user
except Exception as e:
# 异常处理
print(f"创建用户时发生错误: {e}")
raise
finally:
# 断开数据库连接
await database.disconnect()
print("数据库连接已断开")
# 程序入口点
if __name__ == "__main__":
# 获取所有用户
print("=== 获取所有用户 ===")
users_response = asyncio.run(get_users_as_models())
print(f"\n=== 创建新用户 ===")
# 创建新用户请求
new_user_request = CreateUserRequest(
name="David",
email="david@example.com",
age=28
)
# 创建新用户
new_user = asyncio.run(create_user(new_user_request))
print(f"新用户创建成功: {new_user}")4.3 分页查询 #
4.3.1 分页查询说明 #
分页查询是处理大量数据的常用技术,它通过 LIMIT 和 OFFSET 子句将结果集分割成多个页面。这种方式可以显著提高应用性能,减少内存使用,并改善用户体验。在实际应用中,分页查询通常与排序和过滤功能结合使用。
# 导入必要的库
import asyncio
from databases import Database
from typing import List, Dict, Any
# 分页查询结果模型
class PaginatedResult:
def __init__(self, data: List[Dict[Any, Any]], total_count: int, page: int, page_size: int):
self.data = data
self.total_count = total_count
self.page = page
self.page_size = page_size
self.total_pages = (total_count + page_size - 1) // page_size
self.has_next = page < self.total_pages
self.has_prev = page > 1
def __str__(self):
return f"第 {self.page} 页,共 {self.total_pages} 页,每页 {self.page_size} 条,总计 {self.total_count} 条记录"
# 分页查询用户函数
async def get_users_paginated(page: int = 1, page_size: int = 10, search_name: str = None):
# 创建数据库连接实例
database = Database('sqlite:///./example.db')
try:
# 建立数据库连接
await database.connect()
print("数据库连接已建立")
# 构建查询条件
where_clause = ""
query_params = {}
if search_name:
where_clause = "WHERE name LIKE :search_name"
query_params["search_name"] = f"%{search_name}%"
# 计算偏移量
offset = (page - 1) * page_size
# 查询总记录数
count_query = f"SELECT COUNT(*) as count FROM users {where_clause}"
count_result = await database.fetch_one(count_query, values=query_params)
total_count = count_result['count']
# 分页查询用户数据
query = f"""
SELECT id, name, email, age
FROM users
{where_clause}
ORDER BY id
LIMIT :limit OFFSET :offset
"""
# 添加分页参数
query_params.update({
"limit": page_size,
"offset": offset
})
# 执行查询
users = await database.fetch_all(query, values=query_params)
# 创建分页结果对象
result = PaginatedResult(users, total_count, page, page_size)
print(f"分页查询结果: {result}")
# 显示当前页的用户信息
if users:
print(f"\n第 {page} 页用户列表:")
print("-" * 60)
for user in users:
age_str = f", 年龄: {user['age']}" if user['age'] else ""
print(f"ID: {user['id']}, 姓名: {user['name']}, 邮箱: {user['email']}{age_str}")
else:
print("当前页没有找到用户")
return result
except Exception as e:
# 异常处理
print(f"分页查询时发生错误: {e}")
raise
finally:
# 断开数据库连接
await database.disconnect()
print("数据库连接已断开")
# 程序入口点
if __name__ == "__main__":
# 测试分页查询
print("=== 分页查询测试 ===")
# 查询第一页
print("\n--- 第一页 ---")
asyncio.run(get_users_paginated(page=1, page_size=2))
# 查询第二页
print("\n--- 第二页 ---")
asyncio.run(get_users_paginated(page=2, page_size=2))
# 带搜索条件的分页查询
print("\n--- 搜索包含 'a' 的用户,第一页 ---")
asyncio.run(get_users_paginated(page=1, page_size=3, search_name="a"))5. 与 FastAPI 集成示例 #
5.1 FastAPI 集成说明 #
FastAPI 是一个现代、快速的 Web 框架,它与 databases 库完美配合,可以构建高性能的异步 API。通过依赖注入系统,我们可以轻松地在不同的路由之间共享数据库连接,而连接池管理确保了高并发场景下的性能表现。
# 导入必要的库
from fastapi import FastAPI, Depends, HTTPException, status
from databases import Database
from pydantic import BaseModel
from typing import List, Optional
import uvicorn
# 创建 FastAPI 应用实例
app = FastAPI(title="用户管理API", description="使用 databases 和 FastAPI 构建的用户管理系统")
# 定义用户数据模型
class UserBase(BaseModel):
name: str
email: str
age: Optional[int] = None
class UserCreate(UserBase):
pass
class User(UserBase):
id: int
class Config:
from_attributes = True
# 数据库连接配置
DATABASE_URL = "sqlite:///./fastapi_users.db"
# 数据库依赖注入函数
async def get_database():
# 创建数据库连接实例
database = Database(DATABASE_URL)
try:
# 建立数据库连接
await database.connect()
print("数据库连接已建立")
# 确保用户表存在
create_table_query = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT NOT NULL UNIQUE,
age INTEGER
)
"""
await database.execute(create_table_query)
# 返回数据库连接
yield database
except Exception as e:
# 异常处理
print(f"数据库连接失败: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="数据库连接失败"
)
finally:
# 断开数据库连接
await database.disconnect()
print("数据库连接已断开")
# 获取所有用户的路由
@app.get("/users", response_model=List[User], summary="获取所有用户")
async def get_users(db: Database = Depends(get_database)):
"""
获取数据库中的所有用户
- **db**: 数据库连接依赖
返回所有用户的列表
"""
try:
# 查询所有用户
query = "SELECT id, name, email, age FROM users ORDER BY id"
users = await db.fetch_all(query)
# 转换为 Pydantic 模型
user_list = []
for user_data in users:
user = User(
id=user_data['id'],
name=user_data['name'],
email=user_data['email'],
age=user_data['age']
)
user_list.append(user)
return user_list
except Exception as e:
# 异常处理
print(f"获取用户列表时发生错误: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="获取用户列表失败"
)
# 根据ID获取特定用户的路由
@app.get("/users/{user_id}", response_model=User, summary="根据ID获取用户")
async def get_user(user_id: int, db: Database = Depends(get_database)):
"""
根据用户ID获取特定用户信息
- **user_id**: 用户ID
- **db**: 数据库连接依赖
返回指定用户的信息
"""
try:
# 查询指定ID的用户
query = "SELECT id, name, email, age FROM users WHERE id = :id"
user_data = await db.fetch_one(query, values={"id": user_id})
# 检查用户是否存在
if not user_data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"ID为 {user_id} 的用户不存在"
)
# 转换为 Pydantic 模型
user = User(
id=user_data['id'],
name=user_data['name'],
email=user_data['email'],
age=user_data['age']
)
return user
except HTTPException:
# 重新抛出 HTTP 异常
raise
except Exception as e:
# 其他异常处理
print(f"获取用户时发生错误: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="获取用户信息失败"
)
# 创建新用户的路由
@app.post("/users", response_model=User, status_code=status.HTTP_201_CREATED, summary="创建新用户")
async def create_user(user: UserCreate, db: Database = Depends(get_database)):
"""
创建新用户
- **user**: 用户创建请求数据
- **db**: 数据库连接依赖
返回新创建的用户信息
"""
try:
# 检查邮箱是否已存在
check_query = "SELECT id FROM users WHERE email = :email"
existing_user = await db.fetch_one(check_query, values={"email": user.email})
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="该邮箱地址已被注册"
)
# 插入新用户
insert_query = "INSERT INTO users (name, email, age) VALUES (:name, :email, :age) RETURNING id"
user_id = await db.execute(insert_query, values=user.dict())
# 创建返回的用户对象
created_user = User(
id=user_id,
name=user.name,
email=user.email,
age=user.age
)
print(f"成功创建用户: {created_user}")
return created_user
except HTTPException:
# 重新抛出 HTTP 异常
raise
except Exception as e:
# 其他异常处理
print(f"创建用户时发生错误: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="创建用户失败"
)
# 根路径路由
@app.get("/", summary="API 首页")
async def root():
"""
API 首页,提供基本信息
返回 API 的基本信息
"""
return {
"message": "欢迎使用用户管理 API",
"version": "1.0.0",
"framework": "FastAPI",
"database": "databases (SQLite)",
"endpoints": {
"获取所有用户": "GET /users",
"获取特定用户": "GET /users/{user_id}",
"创建新用户": "POST /users"
}
}
# 程序入口点
if __name__ == "__main__":
# 启动 FastAPI 应用
print("正在启动用户管理 API...")
print("访问 http://localhost:8000/docs 查看 API 文档")
print("按 Ctrl+C 停止服务")
# 使用 uvicorn 启动服务
uvicorn.run(
"main:app", # 假设文件名为 main.py
host="0.0.0.0",
port=8000,
reload=True, # 开发模式下启用热重载
log_level="info"
)6. 最佳实践 #
6.1 连接管理最佳实践 #
连接池管理说明 #
databases 库自动管理连接池,但了解其工作原理有助于优化应用性能。连接池维护一组可重用的数据库连接,减少建立新连接的开销。在高并发场景下,合理配置连接池大小可以显著提升性能。
# 导入必要的库
import asyncio
from databases import Database
from contextlib import asynccontextmanager
# 数据库配置
DATABASE_URL = "sqlite:///./best_practices.db"
MAX_CONNECTIONS = 10
MIN_CONNECTIONS = 2
# 创建数据库连接实例(带连接池配置)
database = Database(
DATABASE_URL,
min_size=MIN_CONNECTIONS, # 最小连接数
max_size=MAX_CONNECTIONS, # 最大连接数
force_rollback=True # 强制回滚未提交的事务
)
# 异步上下文管理器装饰器
@asynccontextmanager
async def get_db_connection():
"""
数据库连接上下文管理器
自动管理连接的建立和断开,确保资源正确释放
"""
try:
# 建立数据库连接
await database.connect()
print("数据库连接已建立")
yield database
except Exception as e:
# 异常处理
print(f"数据库连接失败: {e}")
raise
finally:
# 断开数据库连接
await database.disconnect()
print("数据库连接已断开")
# 使用最佳实践的数据库操作函数
async def best_practice_operations():
"""
展示最佳实践的数据库操作
包括:
1. 使用上下文管理器
2. 参数化查询
3. 异常处理
4. 事务管理
"""
async with get_db_connection() as db:
try:
# 创建示例表
create_table_query = """
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
price REAL NOT NULL CHECK (price > 0),
stock INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
await db.execute(create_table_query)
print("产品表创建成功")
# 使用事务进行批量操作
async with db.transaction():
# 插入产品数据(参数化查询)
products = [
{"name": "笔记本电脑", "price": 5999.99, "stock": 10},
{"name": "智能手机", "price": 2999.99, "stock": 20},
{"name": "无线耳机", "price": 299.99, "stock": 50}
]
insert_query = """
INSERT INTO products (name, price, stock)
VALUES (:name, :price, :stock)
"""
for product in products:
await db.execute(insert_query, product)
print(f"插入产品: {product['name']}")
print("批量插入完成")
# 查询产品数据
query = "SELECT * FROM products WHERE price > :min_price ORDER BY price DESC"
expensive_products = await db.fetch_all(query, values={"min_price": 1000.0})
print(f"\n价格超过1000元的产品:")
for product in expensive_products:
print(f" {product['name']}: ¥{product['price']:.2f} (库存: {product['stock']})")
except Exception as e:
# 异常处理
print(f"操作过程中发生错误: {e}")
raise
# 程序入口点
if __name__ == "__main__":
# 运行最佳实践示例
asyncio.run(best_practice_operations())6.3 核心最佳实践总结 #
最佳实践要点说明 #
遵循这些最佳实践可以确保数据库操作的安全性、性能和可维护性。这些实践基于实际项目经验,能够帮助开发者避免常见陷阱并构建高质量的数据库应用。
- 使用连接池:
databases自动管理连接池,但可以配置参数优化性能 - 总是使用参数化查询:防止 SQL 注入攻击,提高查询性能
- 使用上下文管理器:确保连接正确关闭,避免资源泄漏
- 处理异常:添加适当的错误处理和日志记录
- 使用类型注解:提高代码可读性和安全性
- 事务管理:对于需要原子性的操作使用事务
- 批量操作:合理使用批量插入和更新提高性能
- 查询优化:避免 N+1 查询问题,合理使用索引
`
6.2 性能优化建议 #
性能优化说明 #
数据库性能优化是一个复杂的话题,涉及查询优化、索引设计、连接池配置等多个方面。databases 库本身已经提供了良好的性能基础,但合理的应用层设计可以进一步提升性能。
# 导入必要的库
import asyncio
import time
from databases import Database
from typing import List, Dict, Any
# 数据库配置
DATABASE_URL = "sqlite:///./performance_test.db"
# 性能测试函数
async def performance_test():
"""
数据库性能测试函数
测试不同查询方式的性能差异
"""
database = Database(DATABASE_URL)
try:
await database.connect()
print("数据库连接已建立")
# 创建测试表
create_table_query = """
CREATE TABLE IF NOT EXISTS performance_test (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
value INTEGER NOT NULL,
category TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
await database.execute(create_table_query)
# 插入测试数据
print("正在插入测试数据...")
test_data = []
for i in range(1000):
test_data.append({
"name": f"Item_{i}",
"value": i % 100,
"category": f"Category_{i % 10}"
})
# 批量插入(性能优化)
insert_query = "INSERT INTO performance_test (name, value, category) VALUES (:name, :value, :category)"
start_time = time.time()
for data in test_data:
await database.execute(insert_query, data)
insert_time = time.time() - start_time
print(f"插入1000条记录耗时: {insert_time:.4f}秒")
# 测试不同查询方式的性能
print("\n=== 性能测试结果 ===")
# 测试1: 简单查询
start_time = time.time()
result1 = await database.fetch_all("SELECT * FROM performance_test LIMIT 100")
time1 = time.time() - start_time
print(f"简单查询100条记录: {time1:.4f}秒")
# 测试2: 条件查询
start_time = time.time()
result2 = await database.fetch_all(
"SELECT * FROM performance_test WHERE category = :category LIMIT 100",
values={"category": "Category_1"}
)
time2 = time.time() - start_time
print(f"条件查询100条记录: {time2:.4f}秒")
# 测试3: 聚合查询
start_time = time.time()
result3 = await database.fetch_all(
"SELECT category, COUNT(*) as count, AVG(value) as avg_value FROM performance_test GROUP BY category"
)
time3 = time.time() - start_time
print(f"聚合查询: {time3:.4f}秒")
# 测试4: 分页查询
start_time = time.time()
result4 = await database.fetch_all(
"SELECT * FROM performance_test ORDER BY id LIMIT :limit OFFSET :offset",
values={"limit": 50, "offset": 100}
)
time4 = time.time() - start_time
print(f"分页查询50条记录: {time4:.4f}秒")
print(f"\n总计查询记录数: {len(result1) + len(result2) + len(result3) + len(result4)}")
except Exception as e:
print(f"性能测试过程中发生错误: {e}")
finally:
await database.disconnect()
print("数据库连接已断开")
# 程序入口点
if __name__ == "__main__":
# 运行性能测试
asyncio.run(performance_test())7. 与其他库的比较 #
7.1 技术选型分析 #
库选择考虑因素 #
选择合适的数据库库需要考虑多个因素,包括项目需求、团队技能、性能要求、维护成本等。databases 库在某些场景下是理想选择,但在其他场景下可能不是最佳方案。
# 导入必要的库
import asyncio
from databases import Database
import sqlite3
import time
# 比较不同数据库访问方式的性能
async def library_comparison():
"""
比较 databases 与其他数据库访问方式的性能
包括:
1. databases (异步)
2. sqlite3 (同步)
3. 性能对比分析
"""
print("=== 数据库库性能对比分析 ===\n")
# 测试数据
test_records = 1000
# 1. 使用 databases (异步)
print("1. 测试 databases (异步)")
database = Database('sqlite:///./comparison_test.db')
try:
await database.connect()
# 创建测试表
await database.execute("""
CREATE TABLE IF NOT EXISTS comparison_test (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
value INTEGER NOT NULL
)
""")
# 清空表
await database.execute("DELETE FROM comparison_test")
# 插入测试数据
start_time = time.time()
for i in range(test_records):
await database.execute(
"INSERT INTO comparison_test (name, value) VALUES (:name, :value)",
{"name": f"Item_{i}", "value": i}
)
insert_time_async = time.time() - start_time
# 查询测试数据
start_time = time.time()
results = await database.fetch_all("SELECT * FROM performance_test LIMIT 100")
query_time_async = time.time() - start_time
print(f" 插入 {test_records} 条记录: {insert_time_async:.4f}秒")
print(f" 查询100条记录: {query_time_async:.4f}秒")
finally:
await database.disconnect()
# 2. 使用 sqlite3 (同步)
print("\n2. 测试 sqlite3 (同步)")
conn = sqlite3.connect('./comparison_test_sync.db')
cursor = conn.cursor()
try:
# 创建测试表
cursor.execute("""
CREATE TABLE IF NOT EXISTS comparison_test (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
value INTEGER NOT NULL
)
""")
# 清空表
cursor.execute("DELETE FROM comparison_test")
# 插入测试数据
start_time = time.time()
for i in range(test_records):
cursor.execute(
"INSERT INTO comparison_test (name, value) VALUES (?, ?)",
(f"Item_{i}", i)
)
conn.commit()
insert_time_sync = time.time() - start_time
# 查询测试数据
start_time = time.time()
cursor.execute("SELECT * FROM comparison_test LIMIT 100")
results = cursor.fetchall()
query_time_sync = time.time() - start_time
print(f" 插入 {test_records} 条记录: {insert_time_sync:.4f}秒")
print(f" 查询100条记录: {query_time_sync:.4f}秒")
finally:
conn.close()
# 3. 性能对比分析
print("\n3. 性能对比分析")
print("-" * 50)
# 插入性能对比
if insert_time_async < insert_time_sync:
faster_insert = "databases (异步)"
insert_improvement = ((insert_time_sync - insert_time_async) / insert_time_sync) * 100
print(f"插入性能: {faster_insert} 更快,提升 {insert_improvement:.1f}%")
else:
faster_insert = "sqlite3 (同步)"
insert_improvement = ((insert_time_async - insert_time_sync) / insert_time_sync) * 100
print(f"插入性能: {faster_insert} 更快,提升 {insert_improvement:.1f}%")
# 查询性能对比
if query_time_async < query_time_sync:
faster_query = "databases (异步)"
query_improvement = ((query_time_sync - query_time_async) / query_time_sync) * 100
print(f"查询性能: {faster_query} 更快,提升 {query_improvement:.1f}%")
else:
faster_query = "sqlite3 (同步)"
query_improvement = ((query_time_async - query_time_sync) / query_time_sync) * 100
print(f"查询性能: {faster_query} 更快,提升 {query_improvement:.1f}%")
# 4. 选择建议
print("\n4. 技术选型建议")
print("-" * 50)
print("选择 databases 库的场景:")
print(" ✓ 构建异步 Web 应用 (FastAPI, Starlette)")
print(" ✓ 需要高并发处理能力")
print(" ✓ 团队熟悉 async/await 语法")
print(" ✓ 需要连接池管理")
print(" ✓ 多数据库支持需求")
print("\n选择其他库的场景:")
print(" ✓ 简单的同步应用")
print(" ✓ 团队不熟悉异步编程")
print(" ✓ 需要 ORM 功能")
print(" ✓ 需要复杂的查询构建器")
# 程序入口点
if __name__ == "__main__":
# 运行库对比分析
asyncio.run(library_comparison())7.2 总结 #
技术选型总结 #
databases 库是一个优秀的异步数据库客户端库,特别适合现代异步 Web 应用。它提供了简单而强大的 API,支持多种数据库,并具有优秀的性能表现。在选择数据库库时,应该根据项目需求、团队技能和性能要求来做出决策。
主要优势:
- 完全异步,支持高并发
- 轻量级,学习成本低
- 支持多种数据库
- 自动连接池管理
- 与 FastAPI 等现代框架完美集成
适用场景:
- 异步 Web 应用
- 高并发 API 服务
- 需要精确控制 SQL 的场景
- 微服务架构
注意事项:
- 需要团队具备异步编程经验
- 不提供 ORM 功能
- 需要手动编写 SQL 语句