ai
  • index
  • 1.首页
  • 2.介绍
  • 3.架构概览
  • 4.服务器概念
  • 5.客户端概念
  • 6.版本控制
  • 7.连接到远程MCP服务器
  • 8.连接到本地MCP服务器
  • json_rpc
  • 9.构建一个MCP服务器
  • 10.检查员
  • 11.构建一个MCP客户端
  • 14.架构
  • 15.基础协议概述
  • 16.生命周期
  • 17.传输
  • 18.授权
  • 19.安全最佳实践
  • 20.取消
  • 21.Ping
  • 22.进展
  • 23.Roots
  • 24.采样
  • 25.启发
  • 26.服务器特性
  • 27.提示词
  • 28.资源
  • 29.工具
  • 30.完成
  • 31.日志记录
  • 32.分页
  • 33.架构参考
  • URI模板
  • 12.实现
  • http.server
  • 动态客户端注册协议
  • 受保护资源元数据
  • 授权服务器元数据
  • JWKS
  • PKCE
  • PyJWT
  • secrets
  • watchfiles
  • 实现authorization
  • 实现cancel
  • 实现completion
  • 实现logging
  • 实现pagination
  • 实现process
  • 实现transport
  • psutil
  • pytz
  • zoneinfo
  • contextlib
  • Starlette
  • mcp.1.starter
  • mcp.2.Resource
  • mcp.3.structured_output
  • mcp.4.prompts
  • mcp.5.context
  • mcp.6.streamable
  • mcp.7.lowlevel
  • mcp.8.Completion
  • mcp.9.Elicitation
  • mcp.10.oauth
  • mcp.11.integration
  • mcp.12.best
  • mysql-mcp
  • databases
  • uvicorn
  • asynccontextmanager
  • AsyncExitStack
  • streamable
  • aiohttp
  • publish
  • email
  • schedule
  • twine
  • 1.教学文档总览
  • 2.教师使用指南
  • 3.教学系统快速参考
  • 4.新生入门指南
  • 5.学生使用指南
  • 1. 什么是 databases
  • 2. 核心特性
  • 3. 基本用法
    • 3.1 创建数据库连接
    • 3.2 连接字符串格式说明
    • 3.3 连接和断开连接
      • 3.3.1 连接管理说明
    • 3.4 执行查询操作
      • 3.4.1 数据库操作概述
    • 3.4.2 创建表和数据插入
      • 3.4.2.1 表结构设计说明
    • 3.5 查询数据
      • 3.5.1 数据查询说明
    • 3.6 带参数的查询
      • 3.6.1 参数化查询说明
    • 3.7 使用连接上下文管理器(推荐)
      • 3.7.1 上下文管理器优势说明
  • 4. 高级用法
    • 4.1 事务管理
      • 4.1.1 事务概念说明
    • 4.2 使用 Pydantic 模型(类型安全)
      • 4.2.1 类型安全说明
    • 4.3 分页查询
      • 4.3.1 分页查询说明
  • 5. 与 FastAPI 集成示例
    • 5.1 FastAPI 集成说明
  • 6. 最佳实践
    • 6.1 连接管理最佳实践
      • 连接池管理说明
    • 6.3 核心最佳实践总结
      • 最佳实践要点说明
    • 6.2 性能优化建议
      • 性能优化说明
  • 7. 与其他库的比较
    • 7.1 技术选型分析
      • 库选择考虑因素
    • 7.2 总结
      • 技术选型总结

1. 什么是 databases #

databases 是一个 异步 的 SQL 数据库客户端库,为 Python 的异步生态(如 FastAPI、Starlette)提供了简单而强大的数据库访问能力。

databases 库的设计理念是提供一个轻量级、高性能的异步数据库访问接口。它不包含 ORM 功能,而是专注于提供底层的 SQL 查询能力,这使得它非常适合需要精确控制 SQL 语句的场景。库的核心优势在于其异步特性,能够有效处理高并发请求,是现代异步 Web 应用的理想选择。

2. 核心特性 #

  1. 完全异步:基于 async/await 语法,支持高并发
  2. SQL 查询支持:直接执行原始 SQL 语句
  3. 连接池管理:自动管理数据库连接池
  4. 多数据库支持:PostgreSQL、MySQL、SQLite
  5. 类型安全:支持记录查询和类型注解

这些特性使得 databases 特别适合构建高性能的异步应用。异步特性意味着可以同时处理多个数据库请求而不阻塞,连接池管理确保了资源的有效利用,而多数据库支持则提供了部署的灵活性。

3. 基本用法 #

3.1 创建数据库连接 #

3.2 连接字符串格式说明 #

不同的数据库使用不同的连接字符串格式。PostgreSQL 使用 postgresql:// 前缀,MySQL 使用 mysql:// 前缀,而 SQLite 使用 sqlite:/// 前缀。这些连接字符串包含了连接数据库所需的所有信息,如用户名、密码、主机地址、端口和数据库名称。

# 导入必要的库
from databases import Database
import asyncio

# 定义不同数据库的连接字符串
# PostgreSQL 连接字符串格式:postgresql://用户名:密码@主机:端口/数据库名
postgresql_url = 'postgresql://user:password@localhost:5432/mydb'

# MySQL 连接字符串格式:mysql://用户名:密码@主机:端口/数据库名
mysql_url = 'mysql://user:password@localhost:3306/mydb'

# SQLite 连接字符串格式:sqlite:///./数据库文件路径(相对路径)
sqlite_url = 'sqlite:///./test.db'

# 创建数据库连接实例
database = Database(sqlite_url)

# 测试连接函数
async def test_connection():
    try:
        # 建立数据库连接
        await database.connect()
        print("数据库连接成功!")

        # 执行一个简单的测试查询
        result = await database.fetch_one("SELECT 1 as test")
        print(f"测试查询结果: {result}")

    except Exception as e:
        print(f"连接失败: {e}")
    finally:
        # 断开数据库连接
        await database.disconnect()
        print("数据库连接已断开")

# 运行测试
if __name__ == "__main__":
    asyncio.run(test_connection())

3.3 连接和断开连接 #

3.3.1 连接管理说明 #

正确的连接管理对于数据库应用至关重要。databases 提供了 connect() 和 disconnect() 方法来管理连接的生命周期。建议使用 try-finally 结构确保即使发生异常,连接也能被正确关闭,避免资源泄漏。

# 导入必要的库
import asyncio
from databases import Database

# 定义主函数
async def main():
    # 创建数据库连接实例
    database = Database('sqlite:///./example.db')

    try:
        # 建立数据库连接
        await database.connect()
        print("数据库连接已建立")

        # 执行数据库操作
        # 这里可以添加各种数据库查询操作
        print("正在执行数据库操作...")

        # 模拟一些数据库操作
        await asyncio.sleep(1)
        print("数据库操作完成")

    except Exception as e:
        # 异常处理
        print(f"操作过程中发生错误: {e}")
    finally:
        # 确保在函数结束时断开连接
        await database.disconnect()
        print("数据库连接已断开")

# 程序入口点
if __name__ == "__main__":
    # 运行异步主函数
    asyncio.run(main())

3.4 执行查询操作 #

3.4.1 数据库操作概述 #

databases 提供了多种查询方法:execute() 用于执行 INSERT、UPDATE、DELETE 等操作,fetch_one() 用于获取单条记录,fetch_all() 用于获取多条记录。这些方法都支持参数化查询,可以有效防止 SQL 注入攻击。

3.4.2 创建表和数据插入 #

3.4.2.1 表结构设计说明 #

这个示例展示了如何创建一个用户表,包含基本的用户信息字段。使用 CREATE TABLE IF NOT EXISTS 语句可以避免重复创建表的错误,AUTOINCREMENT 关键字确保 ID 字段自动递增,UNIQUE 约束确保邮箱地址的唯一性。

# 导入必要的库
import asyncio
from databases import Database

# 数据库设置函数
async def setup_database():
    # 创建数据库连接实例
    database = Database('sqlite:///./example.db')

    try:
        # 建立数据库连接
        await database.connect()
        print("数据库连接已建立")

        # 创建用户表的SQL语句
        # 使用 IF NOT EXISTS 避免重复创建表的错误
        create_table_query = """
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,  -- 主键,自动递增
            name TEXT NOT NULL,                    -- 用户名,不能为空
            email TEXT NOT NULL UNIQUE            -- 邮箱,不能为空且唯一
        )
        """

        # 执行创建表的SQL语句
        await database.execute(create_table_query)
        print("用户表创建成功")

        # 插入用户数据的SQL语句
        # 使用参数化查询防止SQL注入
        insert_query = "INSERT INTO users (name, email) VALUES (:name, :email)"

        # 定义要插入的用户数据
        values = {"name": "Alice", "email": "alice@example.com"}

        # 执行插入操作并获取新插入记录的ID
        user_id = await database.execute(insert_query, values)
        print(f"成功插入用户,ID: {user_id}")

        # 插入更多用户数据
        more_users = [
            {"name": "Bob", "email": "bob@example.com"},
            {"name": "Charlie", "email": "charlie@example.com"}
        ]

        for user_data in more_users:
            user_id = await database.execute(insert_query, user_data)
            print(f"成功插入用户 {user_data['name']},ID: {user_id}")

        print("数据库初始化完成")

    except Exception as e:
        # 异常处理
        print(f"设置数据库时发生错误: {e}")
    finally:
        # 断开数据库连接
        await database.disconnect()
        print("数据库连接已断开")

# 程序入口点
if __name__ == "__main__":
    # 运行数据库设置函数
    asyncio.run(setup_database())

3.5 查询数据 #

3.5.1 数据查询说明 #

这个示例展示了如何从数据库中查询数据。fetch_all() 方法返回所有匹配的记录,每条记录都是一个类似字典的对象,可以通过键名访问字段值。这种方法适合需要处理多条记录的场景。

# 导入必要的库
import asyncio
from databases import Database

# 查询所有用户的函数
async def query_users():
    # 创建数据库连接实例
    database = Database('sqlite:///./example.db')

    try:
        # 建立数据库连接
        await database.connect()
        print("数据库连接已建立")

        # 查询所有用户的SQL语句
        query = "SELECT * FROM users"

        # 执行查询并获取所有结果
        users = await database.fetch_all(query)

        # 检查是否找到用户
        if not users:
            print("数据库中没有找到用户")
            return

        # 遍历并显示所有用户信息
        print(f"找到 {len(users)} 个用户:")
        print("-" * 50)

        for user in users:
            # 每条记录都是一个类似字典的对象
            print(f"ID: {user['id']}")
            print(f"姓名: {user['name']}")
            print(f"邮箱: {user['email']}")
            print("-" * 50)

    except Exception as e:
        # 异常处理
        print(f"查询用户时发生错误: {e}")
    finally:
        # 断开数据库连接
        await database.disconnect()
        print("数据库连接已断开")

# 程序入口点
if __name__ == "__main__":
    # 运行查询函数
    asyncio.run(query_users())

3.6 带参数的查询 #

3.6.1 参数化查询说明 #

参数化查询是数据库操作的最佳实践,它不仅可以防止 SQL 注入攻击,还能提高查询性能。通过使用命名参数(如 :id),我们可以安全地将用户输入传递给 SQL 语句,数据库会自动处理参数的类型转换和转义。

# 导入必要的库
import asyncio
from databases import Database

# 根据ID查询特定用户的函数
async def query_user_by_id(user_id: int):
    # 创建数据库连接实例
    database = Database('sqlite:///./example.db')

    try:
        # 建立数据库连接
        await database.connect()
        print("数据库连接已建立")

        # 使用参数化查询的SQL语句
        # :id 是一个命名参数,会被 values 字典中的值替换
        query = "SELECT * FROM users WHERE id = :id"

        # 定义查询参数
        values = {"id": user_id}

        # 执行查询并获取单条结果
        user = await database.fetch_one(query, values=values)

        # 检查是否找到用户
        if user:
            print(f"找到用户:")
            print(f"  ID: {user['id']}")
            print(f"  姓名: {user['name']}")
            print(f"  邮箱: {user['email']}")
        else:
            print(f"未找到ID为 {user_id} 的用户")

    except Exception as e:
        # 异常处理
        print(f"查询用户时发生错误: {e}")
    finally:
        # 断开数据库连接
        await database.disconnect()
        print("数据库连接已断开")

# 程序入口点
if __name__ == "__main__":
    # 测试查询不同ID的用户
    test_ids = [1, 2, 999]  # 包括存在的和不存在的ID

    for test_id in test_ids:
        print(f"\n正在查询ID为 {test_id} 的用户...")
        asyncio.run(query_user_by_id(test_id))

3.7 使用连接上下文管理器(推荐) #

3.7.1 上下文管理器优势说明 #

使用 async with 语句可以自动管理数据库连接的生命周期,无需手动调用 connect() 和 disconnect() 方法。这种方式更加简洁、安全,能够确保即使发生异常,连接也会被正确关闭,是生产环境中的推荐做法。

# 导入必要的库
import asyncio
from databases import Database

# 使用上下文管理器的函数
async def using_context_manager():
    # 使用 async with 自动管理数据库连接
    # 进入 with 块时自动连接,退出时自动断开
    async with Database('sqlite:///./example.db') as database:
        print("数据库连接已建立(通过上下文管理器)")

        try:
            # 查询所有用户
            users = await database.fetch_all("SELECT * FROM users")

            # 检查是否找到用户
            if not users:
                print("数据库中没有找到用户")
                return

            # 显示用户信息
            print(f"找到 {len(users)} 个用户:")
            for user in users:
                print(f"  {user['name']} ({user['email']})")

        except Exception as e:
            # 异常处理
            print(f"查询过程中发生错误: {e}")

        # 不需要手动断开连接,上下文管理器会自动处理
        print("即将退出上下文管理器,连接将自动断开")

# 程序入口点
if __name__ == "__main__":
    # 运行使用上下文管理器的函数
    asyncio.run(using_context_manager())

4. 高级用法 #

4.1 事务管理 #

4.1.1 事务概念说明 #

事务是数据库操作的重要概念,它确保一组相关的操作要么全部成功,要么全部失败。在 databases 中,使用 database.transaction() 上下文管理器来管理事务。事务特别适用于需要保证数据一致性的场景,如银行转账、库存管理等。

# 导入必要的库
import asyncio
from databases import Database

# 资金转账函数
async def transfer_funds(from_id: int, to_id: int, amount: float):
    # 创建数据库连接实例
    database = Database('sqlite:///./example.db')

    try:
        # 建立数据库连接
        await database.connect()
        print("数据库连接已建立")

        # 首先创建账户表(如果不存在)
        create_accounts_table = """
        CREATE TABLE IF NOT EXISTS accounts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            balance REAL NOT NULL DEFAULT 0.0
        )
        """
        await database.execute(create_accounts_table)

        # 插入一些测试账户数据
        await database.execute(
            "INSERT OR IGNORE INTO accounts (id, name, balance) VALUES (1, 'Alice', 1000.0)"
        )
        await database.execute(
            "INSERT OR IGNORE INTO accounts (id, name, balance) VALUES (2, 'Bob', 500.0)"
        )

        print(f"转账前账户余额:")
        alice = await database.fetch_one("SELECT * FROM accounts WHERE id = :id", {"id": from_id})
        bob = await database.fetch_one("SELECT * FROM accounts WHERE id = :id", {"id": to_id})

        if alice and bob:
            print(f"Alice (ID: {from_id}): ${alice['balance']:.2f}")
            print(f"Bob (ID: {to_id}): ${bob['balance']:.2f}")

        # 使用事务进行资金转账
        async with database.transaction():
            print(f"\n开始转账 ${amount:.2f} 从账户 {from_id} 到账户 {to_id}")

            # 扣款操作
            await database.execute(
                "UPDATE accounts SET balance = balance - :amount WHERE id = :id",
                {"amount": amount, "id": from_id}
            )
            print(f"已从账户 {from_id} 扣除 ${amount:.2f}")

            # 存款操作
            await database.execute(
                "UPDATE accounts SET balance = balance + :amount WHERE id = :id", 
                {"amount": amount, "id": to_id}
            )
            print(f"已向账户 {to_id} 转入 ${amount:.2f}")

            # 验证转账结果
            alice_after = await database.fetch_one("SELECT * FROM accounts WHERE id = :id", {"id": from_id})
            bob_after = await database.fetch_one("SELECT * FROM accounts WHERE id = :id", {"id": to_id})

            print(f"\n转账后账户余额:")
            print(f"Alice (ID: {from_id}): ${alice_after['balance']:.2f}")
            print(f"Bob (ID: {to_id}): ${bob_after['balance']:.2f}")

            print("转账成功完成!")

    except Exception as e:
        # 异常处理
        print(f"转账过程中发生错误: {e}")
        print("由于使用了事务,所有更改都已回滚")
    finally:
        # 断开数据库连接
        await database.disconnect()
        print("数据库连接已断开")

# 程序入口点
if __name__ == "__main__":
    # 执行转账操作
    asyncio.run(transfer_funds(1, 2, 200.0))

4.2 使用 Pydantic 模型(类型安全) #

4.2.1 类型安全说明 #

Pydantic 是一个强大的数据验证库,它与 databases 结合使用可以提供类型安全的数据库操作。通过定义 Pydantic 模型,我们可以在运行时验证数据的类型和格式,确保数据的完整性和一致性。这种方式特别适合构建 API 和需要严格数据验证的应用。

# 导入必要的库
import asyncio
from databases import Database
from pydantic import BaseModel, EmailStr
from typing import List, Optional

# 定义用户数据模型
class User(BaseModel):
    id: int
    name: str
    email: str
    age: Optional[int] = None  # 可选字段,默认值为None

# 定义创建用户的请求模型
class CreateUserRequest(BaseModel):
    name: str
    email: str
    age: Optional[int] = None

# 定义用户列表响应模型
class UserListResponse(BaseModel):
    users: List[User]
    total_count: int

# 获取所有用户并转换为Pydantic模型的函数
async def get_users_as_models() -> UserListResponse:
    # 创建数据库连接实例
    database = Database('sqlite:///./example.db')

    try:
        # 建立数据库连接
        await database.connect()
        print("数据库连接已建立")

        # 首先确保用户表存在并包含age字段
        create_table_query = """
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT NOT NULL UNIQUE,
            age INTEGER
        )
        """
        await database.execute(create_table_query)

        # 插入一些测试数据(如果表为空)
        count_query = "SELECT COUNT(*) as count FROM users"
        result = await database.fetch_one(count_query)

        if result['count'] == 0:
            print("插入测试用户数据...")
            test_users = [
                {"name": "Alice", "email": "alice@example.com", "age": 25},
                {"name": "Bob", "email": "bob@example.com", "age": 30},
                {"name": "Charlie", "email": "charlie@example.com", "age": None}
            ]

            for user_data in test_users:
                await database.execute(
                    "INSERT INTO users (name, email, age) VALUES (:name, :email, :age)",
                    user_data
                )
            print("测试数据插入完成")

        # 查询所有用户
        query = "SELECT id, name, email, age FROM users"
        rows = await database.fetch_all(query)

        # 将数据库行转换为Pydantic模型
        users = []
        for row in rows:
            # 使用dict(row)将数据库行转换为字典,然后传递给Pydantic模型
            user = User(**dict(row))
            users.append(user)
            print(f"验证用户数据: {user}")

        # 创建响应模型
        response = UserListResponse(users=users, total_count=len(users))

        print(f"\n成功获取 {len(users)} 个用户")
        return response

    except Exception as e:
        # 异常处理
        print(f"获取用户时发生错误: {e}")
        raise
    finally:
        # 断开数据库连接
        await database.disconnect()
        print("数据库连接已断开")

# 创建新用户的函数
async def create_user(user_request: CreateUserRequest) -> User:
    # 创建数据库连接实例
    database = Database('sqlite:///./example.db')

    try:
        # 建立数据库连接
        await database.connect()
        print("数据库连接已建立")

        # 插入新用户
        insert_query = "INSERT INTO users (name, email, age) VALUES (:name, :email, :age) RETURNING id"
        user_id = await database.execute(insert_query, user_request.dict())

        # 创建并返回用户模型
        user = User(id=user_id, **user_request.dict())
        print(f"成功创建用户: {user}")
        return user

    except Exception as e:
        # 异常处理
        print(f"创建用户时发生错误: {e}")
        raise
    finally:
        # 断开数据库连接
        await database.disconnect()
        print("数据库连接已断开")

# 程序入口点
if __name__ == "__main__":
    # 获取所有用户
    print("=== 获取所有用户 ===")
    users_response = asyncio.run(get_users_as_models())

    print(f"\n=== 创建新用户 ===")
    # 创建新用户请求
    new_user_request = CreateUserRequest(
        name="David",
        email="david@example.com",
        age=28
    )

    # 创建新用户
    new_user = asyncio.run(create_user(new_user_request))
    print(f"新用户创建成功: {new_user}")

4.3 分页查询 #

4.3.1 分页查询说明 #

分页查询是处理大量数据的常用技术,它通过 LIMIT 和 OFFSET 子句将结果集分割成多个页面。这种方式可以显著提高应用性能,减少内存使用,并改善用户体验。在实际应用中,分页查询通常与排序和过滤功能结合使用。

# 导入必要的库
import asyncio
from databases import Database
from typing import List, Dict, Any

# 分页查询结果模型
class PaginatedResult:
    def __init__(self, data: List[Dict[Any, Any]], total_count: int, page: int, page_size: int):
        self.data = data
        self.total_count = total_count
        self.page = page
        self.page_size = page_size
        self.total_pages = (total_count + page_size - 1) // page_size
        self.has_next = page < self.total_pages
        self.has_prev = page > 1

    def __str__(self):
        return f"第 {self.page} 页,共 {self.total_pages} 页,每页 {self.page_size} 条,总计 {self.total_count} 条记录"

# 分页查询用户函数
async def get_users_paginated(page: int = 1, page_size: int = 10, search_name: str = None):
    # 创建数据库连接实例
    database = Database('sqlite:///./example.db')

    try:
        # 建立数据库连接
        await database.connect()
        print("数据库连接已建立")

        # 构建查询条件
        where_clause = ""
        query_params = {}

        if search_name:
            where_clause = "WHERE name LIKE :search_name"
            query_params["search_name"] = f"%{search_name}%"

        # 计算偏移量
        offset = (page - 1) * page_size

        # 查询总记录数
        count_query = f"SELECT COUNT(*) as count FROM users {where_clause}"
        count_result = await database.fetch_one(count_query, values=query_params)
        total_count = count_result['count']

        # 分页查询用户数据
        query = f"""
        SELECT id, name, email, age 
        FROM users 
        {where_clause}
        ORDER BY id 
        LIMIT :limit OFFSET :offset
        """

        # 添加分页参数
        query_params.update({
            "limit": page_size,
            "offset": offset
        })

        # 执行查询
        users = await database.fetch_all(query, values=query_params)

        # 创建分页结果对象
        result = PaginatedResult(users, total_count, page, page_size)

        print(f"分页查询结果: {result}")

        # 显示当前页的用户信息
        if users:
            print(f"\n第 {page} 页用户列表:")
            print("-" * 60)
            for user in users:
                age_str = f", 年龄: {user['age']}" if user['age'] else ""
                print(f"ID: {user['id']}, 姓名: {user['name']}, 邮箱: {user['email']}{age_str}")
        else:
            print("当前页没有找到用户")

        return result

    except Exception as e:
        # 异常处理
        print(f"分页查询时发生错误: {e}")
        raise
    finally:
        # 断开数据库连接
        await database.disconnect()
        print("数据库连接已断开")

# 程序入口点
if __name__ == "__main__":
    # 测试分页查询
    print("=== 分页查询测试 ===")

    # 查询第一页
    print("\n--- 第一页 ---")
    asyncio.run(get_users_paginated(page=1, page_size=2))

    # 查询第二页
    print("\n--- 第二页 ---")
    asyncio.run(get_users_paginated(page=2, page_size=2))

    # 带搜索条件的分页查询
    print("\n--- 搜索包含 'a' 的用户,第一页 ---")
    asyncio.run(get_users_paginated(page=1, page_size=3, search_name="a"))

5. 与 FastAPI 集成示例 #

5.1 FastAPI 集成说明 #

FastAPI 是一个现代、快速的 Web 框架,它与 databases 库完美配合,可以构建高性能的异步 API。通过依赖注入系统,我们可以轻松地在不同的路由之间共享数据库连接,而连接池管理确保了高并发场景下的性能表现。

# 导入必要的库
from fastapi import FastAPI, Depends, HTTPException, status
from databases import Database
from pydantic import BaseModel
from typing import List, Optional
import uvicorn

# 创建 FastAPI 应用实例
app = FastAPI(title="用户管理API", description="使用 databases 和 FastAPI 构建的用户管理系统")

# 定义用户数据模型
class UserBase(BaseModel):
    name: str
    email: str
    age: Optional[int] = None

class UserCreate(UserBase):
    pass

class User(UserBase):
    id: int

    class Config:
        from_attributes = True

# 数据库连接配置
DATABASE_URL = "sqlite:///./fastapi_users.db"

# 数据库依赖注入函数
async def get_database():
    # 创建数据库连接实例
    database = Database(DATABASE_URL)

    try:
        # 建立数据库连接
        await database.connect()
        print("数据库连接已建立")

        # 确保用户表存在
        create_table_query = """
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT NOT NULL UNIQUE,
            age INTEGER
        )
        """
        await database.execute(create_table_query)

        # 返回数据库连接
        yield database

    except Exception as e:
        # 异常处理
        print(f"数据库连接失败: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="数据库连接失败"
        )
    finally:
        # 断开数据库连接
        await database.disconnect()
        print("数据库连接已断开")

# 获取所有用户的路由
@app.get("/users", response_model=List[User], summary="获取所有用户")
async def get_users(db: Database = Depends(get_database)):
    """
    获取数据库中的所有用户

    - **db**: 数据库连接依赖

    返回所有用户的列表
    """
    try:
        # 查询所有用户
        query = "SELECT id, name, email, age FROM users ORDER BY id"
        users = await db.fetch_all(query)

        # 转换为 Pydantic 模型
        user_list = []
        for user_data in users:
            user = User(
                id=user_data['id'],
                name=user_data['name'],
                email=user_data['email'],
                age=user_data['age']
            )
            user_list.append(user)

        return user_list

    except Exception as e:
        # 异常处理
        print(f"获取用户列表时发生错误: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="获取用户列表失败"
        )

# 根据ID获取特定用户的路由
@app.get("/users/{user_id}", response_model=User, summary="根据ID获取用户")
async def get_user(user_id: int, db: Database = Depends(get_database)):
    """
    根据用户ID获取特定用户信息

    - **user_id**: 用户ID
    - **db**: 数据库连接依赖

    返回指定用户的信息
    """
    try:
        # 查询指定ID的用户
        query = "SELECT id, name, email, age FROM users WHERE id = :id"
        user_data = await db.fetch_one(query, values={"id": user_id})

        # 检查用户是否存在
        if not user_data:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"ID为 {user_id} 的用户不存在"
            )

        # 转换为 Pydantic 模型
        user = User(
            id=user_data['id'],
            name=user_data['name'],
            email=user_data['email'],
            age=user_data['age']
        )

        return user

    except HTTPException:
        # 重新抛出 HTTP 异常
        raise
    except Exception as e:
        # 其他异常处理
        print(f"获取用户时发生错误: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="获取用户信息失败"
        )

# 创建新用户的路由
@app.post("/users", response_model=User, status_code=status.HTTP_201_CREATED, summary="创建新用户")
async def create_user(user: UserCreate, db: Database = Depends(get_database)):
    """
    创建新用户

    - **user**: 用户创建请求数据
    - **db**: 数据库连接依赖

    返回新创建的用户信息
    """
    try:
        # 检查邮箱是否已存在
        check_query = "SELECT id FROM users WHERE email = :email"
        existing_user = await db.fetch_one(check_query, values={"email": user.email})

        if existing_user:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="该邮箱地址已被注册"
            )

        # 插入新用户
        insert_query = "INSERT INTO users (name, email, age) VALUES (:name, :email, :age) RETURNING id"
        user_id = await db.execute(insert_query, values=user.dict())

        # 创建返回的用户对象
        created_user = User(
            id=user_id,
            name=user.name,
            email=user.email,
            age=user.age
        )

        print(f"成功创建用户: {created_user}")
        return created_user

    except HTTPException:
        # 重新抛出 HTTP 异常
        raise
    except Exception as e:
        # 其他异常处理
        print(f"创建用户时发生错误: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="创建用户失败"
        )

# 根路径路由
@app.get("/", summary="API 首页")
async def root():
    """
    API 首页,提供基本信息

    返回 API 的基本信息
    """
    return {
        "message": "欢迎使用用户管理 API",
        "version": "1.0.0",
        "framework": "FastAPI",
        "database": "databases (SQLite)",
        "endpoints": {
            "获取所有用户": "GET /users",
            "获取特定用户": "GET /users/{user_id}",
            "创建新用户": "POST /users"
        }
    }

# 程序入口点
if __name__ == "__main__":
    # 启动 FastAPI 应用
    print("正在启动用户管理 API...")
    print("访问 http://localhost:8000/docs 查看 API 文档")
    print("按 Ctrl+C 停止服务")

    # 使用 uvicorn 启动服务
    uvicorn.run(
        "main:app",  # 假设文件名为 main.py
        host="0.0.0.0",
        port=8000,
        reload=True,  # 开发模式下启用热重载
        log_level="info"
    )

6. 最佳实践 #

6.1 连接管理最佳实践 #

连接池管理说明 #

databases 库自动管理连接池,但了解其工作原理有助于优化应用性能。连接池维护一组可重用的数据库连接,减少建立新连接的开销。在高并发场景下,合理配置连接池大小可以显著提升性能。

# 导入必要的库
import asyncio
from databases import Database
from contextlib import asynccontextmanager

# 数据库配置
DATABASE_URL = "sqlite:///./best_practices.db"
MAX_CONNECTIONS = 10
MIN_CONNECTIONS = 2

# 创建数据库连接实例(带连接池配置)
database = Database(
    DATABASE_URL,
    min_size=MIN_CONNECTIONS,      # 最小连接数
    max_size=MAX_CONNECTIONS,      # 最大连接数
    force_rollback=True            # 强制回滚未提交的事务
)

# 异步上下文管理器装饰器
@asynccontextmanager
async def get_db_connection():
    """
    数据库连接上下文管理器

    自动管理连接的建立和断开,确保资源正确释放
    """
    try:
        # 建立数据库连接
        await database.connect()
        print("数据库连接已建立")
        yield database
    except Exception as e:
        # 异常处理
        print(f"数据库连接失败: {e}")
        raise
    finally:
        # 断开数据库连接
        await database.disconnect()
        print("数据库连接已断开")

# 使用最佳实践的数据库操作函数
async def best_practice_operations():
    """
    展示最佳实践的数据库操作

    包括:
    1. 使用上下文管理器
    2. 参数化查询
    3. 异常处理
    4. 事务管理
    """
    async with get_db_connection() as db:
        try:
            # 创建示例表
            create_table_query = """
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                price REAL NOT NULL CHECK (price > 0),
                stock INTEGER NOT NULL DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """
            await db.execute(create_table_query)
            print("产品表创建成功")

            # 使用事务进行批量操作
            async with db.transaction():
                # 插入产品数据(参数化查询)
                products = [
                    {"name": "笔记本电脑", "price": 5999.99, "stock": 10},
                    {"name": "智能手机", "price": 2999.99, "stock": 20},
                    {"name": "无线耳机", "price": 299.99, "stock": 50}
                ]

                insert_query = """
                INSERT INTO products (name, price, stock) 
                VALUES (:name, :price, :stock)
                """

                for product in products:
                    await db.execute(insert_query, product)
                    print(f"插入产品: {product['name']}")

                print("批量插入完成")

            # 查询产品数据
            query = "SELECT * FROM products WHERE price > :min_price ORDER BY price DESC"
            expensive_products = await db.fetch_all(query, values={"min_price": 1000.0})

            print(f"\n价格超过1000元的产品:")
            for product in expensive_products:
                print(f"  {product['name']}: ¥{product['price']:.2f} (库存: {product['stock']})")

        except Exception as e:
            # 异常处理
            print(f"操作过程中发生错误: {e}")
            raise

# 程序入口点
if __name__ == "__main__":
    # 运行最佳实践示例
    asyncio.run(best_practice_operations())

6.3 核心最佳实践总结 #

最佳实践要点说明 #

遵循这些最佳实践可以确保数据库操作的安全性、性能和可维护性。这些实践基于实际项目经验,能够帮助开发者避免常见陷阱并构建高质量的数据库应用。

  1. 使用连接池:databases 自动管理连接池,但可以配置参数优化性能
  2. 总是使用参数化查询:防止 SQL 注入攻击,提高查询性能
  3. 使用上下文管理器:确保连接正确关闭,避免资源泄漏
  4. 处理异常:添加适当的错误处理和日志记录
  5. 使用类型注解:提高代码可读性和安全性
  6. 事务管理:对于需要原子性的操作使用事务
  7. 批量操作:合理使用批量插入和更新提高性能
  8. 查询优化:避免 N+1 查询问题,合理使用索引 `

6.2 性能优化建议 #

性能优化说明 #

数据库性能优化是一个复杂的话题,涉及查询优化、索引设计、连接池配置等多个方面。databases 库本身已经提供了良好的性能基础,但合理的应用层设计可以进一步提升性能。

# 导入必要的库
import asyncio
import time
from databases import Database
from typing import List, Dict, Any

# 数据库配置
DATABASE_URL = "sqlite:///./performance_test.db"

# 性能测试函数
async def performance_test():
    """
    数据库性能测试函数

    测试不同查询方式的性能差异
    """
    database = Database(DATABASE_URL)

    try:
        await database.connect()
        print("数据库连接已建立")

        # 创建测试表
        create_table_query = """
        CREATE TABLE IF NOT EXISTS performance_test (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            value INTEGER NOT NULL,
            category TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """
        await database.execute(create_table_query)

        # 插入测试数据
        print("正在插入测试数据...")
        test_data = []
        for i in range(1000):
            test_data.append({
                "name": f"Item_{i}",
                "value": i % 100,
                "category": f"Category_{i % 10}"
            })

        # 批量插入(性能优化)
        insert_query = "INSERT INTO performance_test (name, value, category) VALUES (:name, :value, :category)"

        start_time = time.time()
        for data in test_data:
            await database.execute(insert_query, data)
        insert_time = time.time() - start_time

        print(f"插入1000条记录耗时: {insert_time:.4f}秒")

        # 测试不同查询方式的性能
        print("\n=== 性能测试结果 ===")

        # 测试1: 简单查询
        start_time = time.time()
        result1 = await database.fetch_all("SELECT * FROM performance_test LIMIT 100")
        time1 = time.time() - start_time
        print(f"简单查询100条记录: {time1:.4f}秒")

        # 测试2: 条件查询
        start_time = time.time()
        result2 = await database.fetch_all(
            "SELECT * FROM performance_test WHERE category = :category LIMIT 100",
            values={"category": "Category_1"}
        )
        time2 = time.time() - start_time
        print(f"条件查询100条记录: {time2:.4f}秒")

        # 测试3: 聚合查询
        start_time = time.time()
        result3 = await database.fetch_all(
            "SELECT category, COUNT(*) as count, AVG(value) as avg_value FROM performance_test GROUP BY category"
        )
        time3 = time.time() - start_time
        print(f"聚合查询: {time3:.4f}秒")

        # 测试4: 分页查询
        start_time = time.time()
        result4 = await database.fetch_all(
            "SELECT * FROM performance_test ORDER BY id LIMIT :limit OFFSET :offset",
            values={"limit": 50, "offset": 100}
        )
        time4 = time.time() - start_time
        print(f"分页查询50条记录: {time4:.4f}秒")

        print(f"\n总计查询记录数: {len(result1) + len(result2) + len(result3) + len(result4)}")

    except Exception as e:
        print(f"性能测试过程中发生错误: {e}")
    finally:
        await database.disconnect()
        print("数据库连接已断开")

# 程序入口点
if __name__ == "__main__":
    # 运行性能测试
    asyncio.run(performance_test())

7. 与其他库的比较 #

7.1 技术选型分析 #

库选择考虑因素 #

选择合适的数据库库需要考虑多个因素,包括项目需求、团队技能、性能要求、维护成本等。databases 库在某些场景下是理想选择,但在其他场景下可能不是最佳方案。

# 导入必要的库
import asyncio
from databases import Database
import sqlite3
import time

# 比较不同数据库访问方式的性能
async def library_comparison():
    """
    比较 databases 与其他数据库访问方式的性能

    包括:
    1. databases (异步)
    2. sqlite3 (同步)
    3. 性能对比分析
    """
    print("=== 数据库库性能对比分析 ===\n")

    # 测试数据
    test_records = 1000

    # 1. 使用 databases (异步)
    print("1. 测试 databases (异步)")
    database = Database('sqlite:///./comparison_test.db')

    try:
        await database.connect()

        # 创建测试表
        await database.execute("""
            CREATE TABLE IF NOT EXISTS comparison_test (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                value INTEGER NOT NULL
            )
        """)

        # 清空表
        await database.execute("DELETE FROM comparison_test")

        # 插入测试数据
        start_time = time.time()
        for i in range(test_records):
            await database.execute(
                "INSERT INTO comparison_test (name, value) VALUES (:name, :value)",
                {"name": f"Item_{i}", "value": i}
            )
        insert_time_async = time.time() - start_time

        # 查询测试数据
        start_time = time.time()
        results = await database.fetch_all("SELECT * FROM performance_test LIMIT 100")
        query_time_async = time.time() - start_time

        print(f"  插入 {test_records} 条记录: {insert_time_async:.4f}秒")
        print(f"  查询100条记录: {query_time_async:.4f}秒")

    finally:
        await database.disconnect()

    # 2. 使用 sqlite3 (同步)
    print("\n2. 测试 sqlite3 (同步)")
    conn = sqlite3.connect('./comparison_test_sync.db')
    cursor = conn.cursor()

    try:
        # 创建测试表
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS comparison_test (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                value INTEGER NOT NULL
            )
        """)

        # 清空表
        cursor.execute("DELETE FROM comparison_test")

        # 插入测试数据
        start_time = time.time()
        for i in range(test_records):
            cursor.execute(
                "INSERT INTO comparison_test (name, value) VALUES (?, ?)",
                (f"Item_{i}", i)
            )
        conn.commit()
        insert_time_sync = time.time() - start_time

        # 查询测试数据
        start_time = time.time()
        cursor.execute("SELECT * FROM comparison_test LIMIT 100")
        results = cursor.fetchall()
        query_time_sync = time.time() - start_time

        print(f"  插入 {test_records} 条记录: {insert_time_sync:.4f}秒")
        print(f"  查询100条记录: {query_time_sync:.4f}秒")

    finally:
        conn.close()

    # 3. 性能对比分析
    print("\n3. 性能对比分析")
    print("-" * 50)

    # 插入性能对比
    if insert_time_async < insert_time_sync:
        faster_insert = "databases (异步)"
        insert_improvement = ((insert_time_sync - insert_time_async) / insert_time_sync) * 100
        print(f"插入性能: {faster_insert} 更快,提升 {insert_improvement:.1f}%")
    else:
        faster_insert = "sqlite3 (同步)"
        insert_improvement = ((insert_time_async - insert_time_sync) / insert_time_sync) * 100
        print(f"插入性能: {faster_insert} 更快,提升 {insert_improvement:.1f}%")

    # 查询性能对比
    if query_time_async < query_time_sync:
        faster_query = "databases (异步)"
        query_improvement = ((query_time_sync - query_time_async) / query_time_sync) * 100
        print(f"查询性能: {faster_query} 更快,提升 {query_improvement:.1f}%")
    else:
        faster_query = "sqlite3 (同步)"
        query_improvement = ((query_time_async - query_time_sync) / query_time_sync) * 100
        print(f"查询性能: {faster_query} 更快,提升 {query_improvement:.1f}%")

    # 4. 选择建议
    print("\n4. 技术选型建议")
    print("-" * 50)
    print("选择 databases 库的场景:")
    print("  ✓ 构建异步 Web 应用 (FastAPI, Starlette)")
    print("  ✓ 需要高并发处理能力")
    print("  ✓ 团队熟悉 async/await 语法")
    print("  ✓ 需要连接池管理")
    print("  ✓ 多数据库支持需求")

    print("\n选择其他库的场景:")
    print("  ✓ 简单的同步应用")
    print("  ✓ 团队不熟悉异步编程")
    print("  ✓ 需要 ORM 功能")
    print("  ✓ 需要复杂的查询构建器")

# 程序入口点
if __name__ == "__main__":
    # 运行库对比分析
    asyncio.run(library_comparison())

7.2 总结 #

技术选型总结 #

databases 库是一个优秀的异步数据库客户端库,特别适合现代异步 Web 应用。它提供了简单而强大的 API,支持多种数据库,并具有优秀的性能表现。在选择数据库库时,应该根据项目需求、团队技能和性能要求来做出决策。

主要优势:

  • 完全异步,支持高并发
  • 轻量级,学习成本低
  • 支持多种数据库
  • 自动连接池管理
  • 与 FastAPI 等现代框架完美集成

适用场景:

  • 异步 Web 应用
  • 高并发 API 服务
  • 需要精确控制 SQL 的场景
  • 微服务架构

注意事项:

  • 需要团队具备异步编程经验
  • 不提供 ORM 功能
  • 需要手动编写 SQL 语句

访问验证

请输入访问令牌

Token不正确,请重新输入