ai
  • index
  • 1.首页
  • 2.介绍
  • 3.架构概览
  • 4.服务器概念
  • 5.客户端概念
  • 6.版本控制
  • 7.连接到远程MCP服务器
  • 8.连接到本地MCP服务器
  • json_rpc
  • 9.构建一个MCP服务器
  • 10.检查员
  • 11.构建一个MCP客户端
  • 14.架构
  • 15.基础协议概述
  • 16.生命周期
  • 17.传输
  • 18.授权
  • 19.安全最佳实践
  • 20.取消
  • 21.Ping
  • 22.进展
  • 23.Roots
  • 24.采样
  • 25.启发
  • 26.服务器特性
  • 27.提示词
  • 28.资源
  • 29.工具
  • 30.完成
  • 31.日志记录
  • 32.分页
  • 33.架构参考
  • URI模板
  • 12.实现
  • http.server
  • 动态客户端注册协议
  • 受保护资源元数据
  • 授权服务器元数据
  • JWKS
  • PKCE
  • PyJWT
  • secrets
  • watchfiles
  • 实现authorization
  • 实现cancel
  • 实现completion
  • 实现logging
  • 实现pagination
  • 实现process
  • 实现transport
  • psutil
  • pytz
  • zoneinfo
  • contextlib
  • Starlette
  • mcp.1.starter
  • mcp.2.Resource
  • mcp.3.structured_output
  • mcp.4.prompts
  • mcp.5.context
  • mcp.6.streamable
  • mcp.7.lowlevel
  • mcp.8.Completion
  • mcp.9.Elicitation
  • mcp.10.oauth
  • mcp.11.integration
  • mcp.12.best
  • mysql-mcp
  • databases
  • uvicorn
  • asynccontextmanager
  • AsyncExitStack
  • streamable
  • aiohttp
  • publish
  • email
  • schedule
  • twine
  • 1.教学文档总览
  • 2.教师使用指南
  • 3.教学系统快速参考
  • 4.新生入门指南
  • 5.学生使用指南
  • 1.Starlette
  • 2. 核心特性
    • 2.1. 异步支持
    • 2.2. 轻量级和高性能
  • 3. 核心组件
    • 3.1. 应用 (Starlette)
    • 3.2. 路由 (Route, WebSocketRoute)
    • 3.3. 请求和响应
      • 3.3.1 服务器
      • 3.3.2 test_all.html
      • 3.3.3 file_test.html
      • 3.3.4 html_test.html
      • 3.3.5 json_test.html
      • 3.3.6 redirect_test.html
      • 3.3.7 simple_test.html
      • 3.3.8 stream_test.html
      • 3.3.9 text_test.html
  • 4. 中间件系统
    • 4.1. 内置中间件
    • 4.2. 自定义中间件
  • 5. 数据库和后台任务
    • 5.1. 数据库集成
    • 5.2. 后台任务
  • 6. 模板渲染
  • 7. 配置和部署
    • 7.1. 环境配置
    • 7.2. 使用 Uvicorn 部署
  • 8. 最佳实践
  • 9. 与其他框架的关系

1.Starlette #

本节介绍 Starlette 的基本定位。Starlette 是一个轻量级的 ASGI(Asynchronous Server Gateway Interface)框架/工具包,专为构建高性能的异步 Web 服务和应用程序而设计。它提供了构建现代 Web 应用所需的核心组件,包括路由、中间件、WebSocket 支持等,同时保持代码的简洁性和高性能。

2. 核心特性 #

本节概述 Starlette 的主要特性,包括异步支持、轻量级设计和高性能表现。这些特性使得 Starlette 特别适合构建需要高并发处理的现代 Web 应用。

2.1. 异步支持 #

Starlette 完全基于 Python 的 asyncio 构建,支持真正的异步 I/O 操作。这意味着它可以高效地处理大量并发连接,而不会阻塞其他请求的处理。

# 导入 Starlette 应用类
from starlette.applications import Starlette
# 导入 JSON 响应类
from starlette.responses import JSONResponse
# 导入路由类
from starlette.routing import Route
# 导入异步 IO 模块
import asyncio

# 定义异步首页处理函数
async def homepage(request):
    # 模拟异步操作(如数据库查询、外部 API 调用等)
    await asyncio.sleep(0.1)
    # 返回 JSON 响应
    return JSONResponse({"hello": "world"})

# 创建 Starlette 应用实例
app = Starlette(debug=True, routes=[
    # 注册路由,将根路径映射到 homepage 函数
    Route("/", homepage),
])

# 定义主函数
def main():
    # 导入 uvicorn 服务器
    import uvicorn
    # 启动应用服务器
    uvicorn.run(app, host="127.0.0.1", port=8000)

# 脚本入口
if __name__ == "__main__":
    main()

2.2. 轻量级和高性能 #

Starlette 本身非常轻量,但提供了构建完整 Web 应用所需的所有组件。它的设计哲学是"只包含必要的功能",避免不必要的复杂性,同时保持高性能。

3. 核心组件 #

本节详细介绍 Starlette 的核心组件,包括应用实例、路由系统、请求和响应处理等。这些组件是构建 Starlette 应用的基础,理解它们有助于更好地使用框架。

3.1. 应用 (Starlette) #

Starlette 应用是框架的核心,它管理着整个应用的生命周期,包括路由注册、中间件配置、启动和关闭事件等。

# 导入 Starlette 应用类
from starlette.applications import Starlette
# 导入中间件类
from starlette.middleware import Middleware
# 导入 CORS 中间件
from starlette.middleware.cors import CORSMiddleware

# 定义启动事件处理函数
async def startup_event():
    # 应用启动时的初始化逻辑
    print("应用启动中...")

# 定义关闭事件处理函数
async def shutdown_event():
    # 应用关闭时的清理逻辑
    print("应用关闭中...")

# 创建 Starlette 应用实例
app = Starlette(
    # 启用调试模式
    debug=True,
    # 路由列表(稍后定义)
    routes=[],
    # 中间件列表
    middleware=[
        # 配置 CORS 中间件,允许所有来源
        Middleware(CORSMiddleware, allow_origins=["*"])
    ],
    # 启动事件列表
    on_startup=[startup_event],
    # 关闭事件列表
    on_shutdown=[shutdown_event]
)

# 定义主函数
def main():
    # 导入 uvicorn 服务器
    import uvicorn
    # 启动应用服务器
    uvicorn.run(app, host="127.0.0.1", port=8000)

# 脚本入口
if __name__ == "__main__":
    main()

3.2. 路由 (Route, WebSocketRoute) #

Starlette 的路由系统支持多种类型的路由,包括 HTTP 路由、WebSocket 路由和静态文件挂载。路由系统灵活且易于使用,支持路径参数和类型转换。

# 导入路由相关类
from starlette.routing import Route, WebSocketRoute, Mount
# 导入静态文件处理类
from starlette.staticfiles import StaticFiles

# 定义首页处理函数
async def homepage(request):
    # 返回简单的文本响应
    from starlette.responses import PlainTextResponse
    return PlainTextResponse("首页")

# 定义用户详情处理函数
async def user_detail(request):
    # 获取路径参数中的用户 ID
    user_id = request.path_params["user_id"]
    from starlette.responses import JSONResponse
    return JSONResponse({"user_id": user_id})

# 定义 WebSocket 端点
async def websocket_endpoint(websocket):
    # 接受 WebSocket 连接
    await websocket.accept()
    # 发送欢迎消息
    await websocket.send_text("WebSocket 连接成功")

# 定义路由列表
routes = [
    # HTTP 路由:首页
    Route("/", homepage),
    # HTTP 路由:用户详情,支持路径参数和类型转换
    Route("/users/{user_id:int}", user_detail),
    # WebSocket 路由
    WebSocketRoute("/ws", websocket_endpoint),
    # 静态文件挂载
    Mount("/static", app=StaticFiles(directory="static"), name="static")
]

# 定义主函数
def main():
    # 导入必要的模块
    from starlette.applications import Starlette
    import uvicorn

    # 创建应用实例
    app = Starlette(routes=routes)
    # 启动服务器
    uvicorn.run(app, host="127.0.0.1", port=8000)

# 脚本入口
if __name__ == "__main__":
    main()
<!DOCTYPE html>
<html lang="zh-CN">

<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>WebSocket 通信测试</title>
  <style>
    body {
      font-family: 'Microsoft YaHei', Arial, sans-serif;
      max-width: 800px;
      margin: 0 auto;
      padding: 20px;
      background-color: #f5f5f5;
    }

    .container {
      background: white;
      padding: 30px;
      border-radius: 10px;
      box-shadow: 0 2px 10px rgba(0, 0, 0, 0.1);
    }

    h1 {
      color: #333;
      text-align: center;
      margin-bottom: 30px;
    }

    .status {
      padding: 15px;
      margin: 20px 0;
      border-radius: 5px;
      font-weight: bold;
    }

    .connected {
      background-color: #d4edda;
      color: #155724;
      border: 1px solid #c3e6cb;
    }

    .disconnected {
      background-color: #f8d7da;
      color: #721c24;
      border: 1px solid #f5c6cb;
    }

    .message-area {
      margin: 20px 0;
    }

    .message-input {
      width: 100%;
      padding: 10px;
      border: 1px solid #ddd;
      border-radius: 5px;
      margin-bottom: 10px;
      font-size: 16px;
    }

    .send-btn {
      background-color: #007bff;
      color: white;
      border: none;
      padding: 10px 20px;
      border-radius: 5px;
      cursor: pointer;
      font-size: 16px;
    }

    .send-btn:hover {
      background-color: #0056b3;
    }

    .send-btn:disabled {
      background-color: #6c757d;
      cursor: not-allowed;
    }

    .messages {
      background-color: #f8f9fa;
      border: 1px solid #dee2e6;
      border-radius: 5px;
      padding: 15px;
      height: 300px;
      overflow-y: auto;
      margin-top: 20px;
    }

    .message {
      margin: 10px 0;
      padding: 10px;
      border-radius: 5px;
    }

    .message.sent {
      background-color: #d1ecf1;
      border-left: 4px solid #17a2b8;
    }

    .message.received {
      background-color: #d4edda;
      border-left: 4px solid #28a745;
    }

    .timestamp {
      font-size: 12px;
      color: #6c757d;
      margin-top: 5px;
    }

    .controls {
      text-align: center;
      margin: 20px 0;
    }

    .control-btn {
      margin: 0 10px;
      padding: 10px 20px;
      border: none;
      border-radius: 5px;
      cursor: pointer;
      font-size: 16px;
    }

    .connect-btn {
      background-color: #28a745;
      color: white;
    }

    .disconnect-btn {
      background-color: #dc3545;
      color: white;
    }

    .clear-btn {
      background-color: #6c757d;
      color: white;
    }
  </style>
</head>

<body>
  <div class="container">
    <h1>🔌 WebSocket 通信测试</h1>

    <!-- 连接状态显示 -->
    <div id="status" class="status disconnected">
      状态:未连接
    </div>

    <!-- 控制按钮 -->
    <div class="controls">
      <button id="connectBtn" class="control-btn connect-btn">连接</button>
      <button id="disconnectBtn" class="control-btn disconnect-btn" disabled>断开</button>
      <button id="clearBtn" class="control-btn clear-btn">清空消息</button>
    </div>

    <!-- 消息发送区域 -->
    <div class="message-area">
      <input type="text" id="messageInput" class="message-input" placeholder="输入要发送的消息..." disabled>
      <button id="sendBtn" class="send-btn" disabled>发送消息</button>
    </div>

    <!-- 消息显示区域 -->
    <div class="messages" id="messages"></div>
  </div>

  <script>
    class WebSocketClient {
      constructor() {
        this.ws = null;
        this.isConnected = false;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
        this.reconnectDelay = 1000;

        this.initElements();
        this.bindEvents();
      }

      initElements() {
        this.statusEl = document.getElementById('status');
        this.connectBtn = document.getElementById('connectBtn');
        this.disconnectBtn = document.getElementById('disconnectBtn');
        this.clearBtn = document.getElementById('clearBtn');
        this.messageInput = document.getElementById('messageInput');
        this.sendBtn = document.getElementById('sendBtn');
        this.messagesEl = document.getElementById('messages');
      }

      bindEvents() {
        this.connectBtn.addEventListener('click', () => this.connect());
        this.disconnectBtn.addEventListener('click', () => this.disconnect());
        this.clearBtn.addEventListener('click', () => this.clearMessages());
        this.sendBtn.addEventListener('click', () => this.sendMessage());
        this.messageInput.addEventListener('keypress', (e) => {
          if (e.key === 'Enter') {
            this.sendMessage();
          }
        });
      }

      connect() {
        try {
          // 创建 WebSocket 连接,连接到服务器的 /ws 端点
          this.ws = new WebSocket('ws://127.0.0.1:8000/ws');

          this.ws.onopen = () => {
            this.isConnected = true;
            this.updateStatus('已连接', true);
            this.updateControls(true);
            this.addMessage('系统', 'WebSocket 连接成功!', 'received');
            this.reconnectAttempts = 0;
          };

          this.ws.onmessage = (event) => {
            this.addMessage('服务器', event.data, 'received');
          };

          this.ws.onclose = (event) => {
            this.isConnected = false;
            this.updateStatus('连接已断开', false);
            this.updateControls(false);
            this.addMessage('系统', 'WebSocket 连接已断开', 'received');

            // 尝试重连
            if (this.reconnectAttempts < this.maxReconnectAttempts) {
              this.reconnectAttempts++;
              this.addMessage('系统', `尝试重连... (${this.reconnectAttempts}/${this.maxReconnectAttempts})`, 'received');
              setTimeout(() => this.connect(), this.reconnectDelay);
            } else {
              this.addMessage('系统', '重连失败,请手动重新连接', 'received');
            }
          };

          this.ws.onerror = (error) => {
            this.addMessage('系统', '连接错误: ' + error.message, 'received');
          };

        } catch (error) {
          this.addMessage('系统', '连接失败: ' + error.message, 'received');
        }
      }

      disconnect() {
        if (this.ws) {
          this.ws.close();
          this.ws = null;
        }
      }

      sendMessage() {
        const message = this.messageInput.value.trim();
        if (message && this.isConnected) {
          this.ws.send(message);
          this.addMessage('我', message, 'sent');
          this.messageInput.value = '';
        }
      }

      addMessage(sender, content, type) {
        const messageDiv = document.createElement('div');
        messageDiv.className = `message ${type}`;

        const timestamp = new Date().toLocaleTimeString();
        messageDiv.innerHTML = `
                    <strong>${sender}:</strong> ${content}
                    <div class="timestamp">${timestamp}</div>
                `;

        this.messagesEl.appendChild(messageDiv);
        this.messagesEl.scrollTop = this.messagesEl.scrollHeight;
      }

      clearMessages() {
        this.messagesEl.innerHTML = '';
      }

      updateStatus(text, connected) {
        this.statusEl.textContent = `状态:${text}`;
        this.statusEl.className = `status ${connected ? 'connected' : 'disconnected'}`;
      }

      updateControls(connected) {
        this.connectBtn.disabled = connected;
        this.disconnectBtn.disabled = !connected;
        this.messageInput.disabled = !connected;
        this.sendBtn.disabled = !connected;
      }
    }

    // 页面加载完成后初始化 WebSocket 客户端
    document.addEventListener('DOMContentLoaded', () => {
      new WebSocketClient();
    });
  </script>
</body>

</html>
<html lang="zh-CN">

<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>WebSocket 通信测试</title>
  <style>
    body {
      font-family: 'Microsoft YaHei', Arial, sans-serif;
      max-width: 800px;
      margin: 0 auto;
      padding: 20px;
      background-color: #f5f5f5;
    }

    .container {
      background: white;
      padding: 30px;
      border-radius: 10px;
      box-shadow: 0 2px 10px rgba(0, 0, 0, 0.1);
    }

    h1 {
      color: #333;
      text-align: center;
      margin-bottom: 30px;
    }

    .status {
      padding: 15px;
      margin: 20px 0;
      border-radius: 5px;
      font-weight: bold;
    }

    .connected {
      background-color: #d4edda;
      color: #155724;
      border: 1px solid #c3e6cb;
    }

    .disconnected {
      background-color: #f8d7da;
      color: #721c24;
      border: 1px solid #f5c6cb;
    }

    .message-area {
      margin: 20px 0;
    }

    .message-input {
      width: 100%;
      padding: 10px;
      border: 1px solid #ddd;
      border-radius: 5px;
      margin-bottom: 10px;
      font-size: 16px;
    }

    .send-btn {
      background-color: #007bff;
      color: white;
      border: none;
      padding: 10px 20px;
      border-radius: 5px;
      cursor: pointer;
      font-size: 16px;
    }

    .send-btn:hover {
      background-color: #0056b3;
    }

    .send-btn:disabled {
      background-color: #6c757d;
      cursor: not-allowed;
    }

    .messages {
      background-color: #f8f9fa;
      border: 1px solid #dee2e6;
      border-radius: 5px;
      padding: 15px;
      height: 300px;
      overflow-y: auto;
      margin-top: 20px;
    }

    .message {
      margin: 10px 0;
      padding: 10px;
      border-radius: 5px;
    }

    .message.sent {
      background-color: #d1ecf1;
      border-left: 4px solid #17a2b8;
    }

    .message.received {
      background-color: #d4edda;
      border-left: 4px solid #28a745;
    }

    .timestamp {
      font-size: 12px;
      color: #6c757d;
      margin-top: 5px;
    }

    .controls {
      text-align: center;
      margin: 20px 0;
    }

    .control-btn {
      margin: 0 10px;
      padding: 10px 20px;
      border: none;
      border-radius: 5px;
      cursor: pointer;
      font-size: 16px;
    }

    .connect-btn {
      background-color: #28a745;
      color: white;
    }

    .disconnect-btn {
      background-color: #dc3545;
      color: white;
    }

    .clear-btn {
      background-color: #6c757d;
      color: white;
    }
  </style>
</head>

<body>
  <div class="container">
    <h1>🔌 WebSocket 通信测试</h1>

    <!-- 连接状态显示 -->
    <div id="status" class="status disconnected">
      状态:未连接
    </div>

    <!-- 控制按钮 -->
    <div class="controls">
      <button id="connectBtn" class="control-btn connect-btn">连接</button>
      <button id="disconnectBtn" class="control-btn disconnect-btn" disabled>断开</button>
      <button id="clearBtn" class="control-btn clear-btn">清空消息</button>
    </div>

    <!-- 消息发送区域 -->
    <div class="message-area">
      <input type="text" id="messageInput" class="message-input" placeholder="输入要发送的消息..." disabled>
      <button id="sendBtn" class="send-btn" disabled>发送消息</button>
    </div>

    <!-- 消息显示区域 -->
    <div class="messages" id="messages"></div>
  </div>

  <script>
    class WebSocketClient {
      constructor() {
        this.ws = null;
        this.isConnected = false;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
        this.reconnectDelay = 1000;

        this.initElements();
        this.bindEvents();
      }

      initElements() {
        this.statusEl = document.getElementById('status');
        this.connectBtn = document.getElementById('connectBtn');
        this.disconnectBtn = document.getElementById('disconnectBtn');
        this.clearBtn = document.getElementById('clearBtn');
        this.messageInput = document.getElementById('messageInput');
        this.sendBtn = document.getElementById('sendBtn');
        this.messagesEl = document.getElementById('messages');
      }

      bindEvents() {
        this.connectBtn.addEventListener('click', () => this.connect());
        this.disconnectBtn.addEventListener('click', () => this.disconnect());
        this.clearBtn.addEventListener('click', () => this.clearMessages());
        this.sendBtn.addEventListener('click', () => this.sendMessage());
        this.messageInput.addEventListener('keypress', (e) => {
          if (e.key === 'Enter') {
            this.sendMessage();
          }
        });
      }

      connect() {
        try {
          // 创建 WebSocket 连接,连接到服务器的 /ws 端点
          this.ws = new WebSocket('ws://127.0.0.1:8000/ws');

          this.ws.onopen = () => {
            this.isConnected = true;
            this.updateStatus('已连接', true);
            this.updateControls(true);
            this.addMessage('系统', 'WebSocket 连接成功!', 'received');
            this.reconnectAttempts = 0;
          };

          this.ws.onmessage = (event) => {
            this.addMessage('服务器', event.data, 'received');
          };

          this.ws.onclose = (event) => {
            this.isConnected = false;
            this.updateStatus('连接已断开', false);
            this.updateControls(false);
            this.addMessage('系统', 'WebSocket 连接已断开', 'received');

            // 尝试重连
            if (this.reconnectAttempts < this.maxReconnectAttempts) {
              this.reconnectAttempts++;
              this.addMessage('系统', `尝试重连... (${this.reconnectAttempts}/${this.maxReconnectAttempts})`, 'received');
              setTimeout(() => this.connect(), this.reconnectDelay);
            } else {
              this.addMessage('系统', '重连失败,请手动重新连接', 'received');
            }
          };

          this.ws.onerror = (error) => {
            this.addMessage('系统', '连接错误: ' + error.message, 'received');
          };

        } catch (error) {
          this.addMessage('系统', '连接失败: ' + error.message, 'received');
        }
      }

      disconnect() {
        if (this.ws) {
          this.ws.close();
          this.ws = null;
        }
      }

      sendMessage() {
        const message = this.messageInput.value.trim();
        if (message && this.isConnected) {
          this.ws.send(message);
          this.addMessage('我', message, 'sent');
          this.messageInput.value = '';
        }
      }

      addMessage(sender, content, type) {
        const messageDiv = document.createElement('div');
        messageDiv.className = `message ${type}`;

        const timestamp = new Date().toLocaleTimeString();
        messageDiv.innerHTML = `
                    <strong>${sender}:</strong> ${content}
                    <div class="timestamp">${timestamp}</div>
                `;

        this.messagesEl.appendChild(messageDiv);
        this.messagesEl.scrollTop = this.messagesEl.scrollHeight;
      }

      clearMessages() {
        this.messagesEl.innerHTML = '';
      }

      updateStatus(text, connected) {
        this.statusEl.textContent = `状态:${text}`;
        this.statusEl.className = `status ${connected ? 'connected' : 'disconnected'}`;
      }

      updateControls(connected) {
        this.connectBtn.disabled = connected;
        this.disconnectBtn.disabled = !connected;
        this.messageInput.disabled = !connected;
        this.sendBtn.disabled = !connected;
      }
    }

    // 页面加载完成后初始化 WebSocket 客户端
    document.addEventListener('DOMContentLoaded', () => {
      new WebSocketClient();
    });
  </script>
</body>

</html>

3.3. 请求和响应 #

Starlette 提供了丰富的请求和响应类型,支持 JSON、HTML、纯文本、重定向、流式响应和文件下载等多种格式。请求对象提供了便捷的方法来访问请求数据。

3.3.1 服务器 #

# 导入Request请求对象
from starlette.requests import Request

# 导入各种响应类型
from starlette.responses import (
    JSONResponse,  # JSON响应
    HTMLResponse,  # HTML响应
    PlainTextResponse,  # 纯文本响应
    RedirectResponse,  # 重定向响应
    StreamingResponse,  # 流式响应
    FileResponse,  # 文件响应
)

# 导入异步IO模块
import asyncio
import tempfile
import os


# 定义用户详情处理函数,支持GET和POST方法
async def user_detail(request: Request):
    # 从路径参数中获取用户ID
    user_id = request.path_params["user_id"]

    # 打印调试信息:请求方法和用户ID
    print(f"DEBUG: 请求方法 = {request.method}, 用户ID = {user_id}")

    # 判断请求方法是否为GET
    if request.method == "GET":
        # 处理GET请求,返回用户基本信息
        print("DEBUG: 处理GET请求")
        return JSONResponse(
            {"user_id": user_id, "method": "GET", "message": "获取用户信息成功"}
        )
    # 判断请求方法是否为POST
    elif request.method == "POST":
        # 处理POST请求,尝试解析请求体
        print("DEBUG: 处理POST请求")
        try:
            # 尝试解析JSON格式的请求体
            data = await request.json()
            # 打印解析到的JSON数据
            print(f"DEBUG: JSON数据 = {data}")
            # 返回包含数据的JSON响应
            return JSONResponse(
                {
                    "user_id": user_id,
                    "method": "POST",
                    "data": data,
                    "message": "用户数据更新成功",
                }
            )
        except Exception as e:
            # JSON解析失败,打印异常信息
            print(f"DEBUG: JSON解析失败 = {e}")
            try:
                # 尝试解析表单数据
                form_data = await request.form()
                # 打印解析到的表单数据
                print(f"DEBUG: 表单数据 = {dict(form_data)}")
                # 返回包含表单数据的JSON响应
                return JSONResponse(
                    {
                        "user_id": user_id,
                        "method": "POST",
                        "form_data": dict(form_data),
                        "message": "表单数据接收成功",
                    }
                )
            except Exception as e2:
                # 表单解析也失败,打印异常信息
                print(f"DEBUG: 表单解析失败 = {e2}")
                # 返回错误信息,提示无法解析请求数据
                return JSONResponse(
                    {
                        "user_id": user_id,
                        "method": "POST",
                        "error": "无法解析请求数据",
                        "message": "请发送有效的JSON或表单数据",
                    },
                    status_code=400,
                )
    else:
        # 处理不支持的请求方法
        print(f"DEBUG: 不支持的请求方法 = {request.method}")
        # 返回405错误,提示只允许GET和POST
        return JSONResponse(
            {"error": "不支持的请求方法", "allowed_methods": ["GET", "POST"]},
            status_code=405,
        )


# 定义HTML响应函数
async def html_response(request: Request):
    # 构造HTML内容,包含当前时间、请求路径和方法
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>HTML响应测试</title>
    </head>
    <body>
        <h1>这是一个HTML响应</h1>
        <p>当前时间: {}</p>
        <p>请求路径: {}</p>
        <p>请求方法: {}</p>
    </body>
    </html>
    """.format(
        asyncio.get_event_loop().time(), request.url.path, request.method
    )
    # 返回HTML响应
    return HTMLResponse(html_content)


# 定义纯文本响应函数
async def text_response(request: Request):
    # 构造多行纯文本内容,包含时间、路径、方法和用户代理
    text_content = f"""
纯文本响应示例
================

当前时间: {asyncio.get_event_loop().time()}
请求路径: {request.url.path}
请求方法: {request.method}
用户代理: {request.headers.get('user-agent', '未知')}

这是一个多行文本响应,用于测试PlainTextResponse。
    """.strip()
    # 返回纯文本响应
    return PlainTextResponse(text_content)


# 定义重定向响应函数
async def redirect_response(request: Request):
    # 返回重定向响应,跳转到静态文件页面
    return RedirectResponse(url="/static/test_all.html")


# 定义流式数据响应函数
async def stream_data(request: Request):
    # 定义异步生成器函数,用于逐条生成数据
    async def generate():
        # 循环生成10条数据
        for i in range(10):
            # 生成一行数据
            yield f"data: {i}\n\n"
            # 每次等待1秒
            await asyncio.sleep(1)

    # 返回流式响应,媒体类型为text/plain
    return StreamingResponse(generate(), media_type="text/plain")


# 定义文件响应函数
async def file_response(request: Request):
    # 创建临时文件,写入测试内容
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False, encoding="utf-8"
    ) as f:
        # 写入多行内容
        f.write("这是一个测试文件\n")
        f.write("包含多行内容\n")
        f.write("用于测试FileResponse\n")
        f.write(f"创建时间: {asyncio.get_event_loop().time()}\n")
        f.write("文件内容结束\n")
        # 获取临时文件路径
        temp_file_path = f.name

    # 返回文件响应,设置下载文件名和媒体类型
    return FileResponse(
        path=temp_file_path,
        filename="test_file.txt",
        media_type="text/plain",
        headers={"Content-Disposition": "attachment; filename=test_file.txt"},
    )


# 定义表单处理函数
async def form_handler(request: Request):
    # 判断请求方法是否为POST
    if request.method == "POST":
        # 解析表单数据
        form_data = await request.form()
        # 返回表单数据的JSON响应
        return JSONResponse({"message": "表单提交成功", "form_data": dict(form_data)})
    else:
        # 构造HTML表单页面
        html_form = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>表单测试</title>
        </head>
        <body>
            <h1>表单测试</h1>
            <form method="POST">
                <label>姓名: <input type="text" name="name" required></label><br><br>
                <label>邮箱: <input type="email" name="email" required></label><br><br>
                <label>消息: <textarea name="message" rows="4" cols="50"></textarea></label><br><br>
                <button type="submit">提交</button>
            </form>
        </body>
        </html>
        """
        # 返回HTML响应
        return HTMLResponse(html_form)


# 定义主函数,启动Starlette应用
def main():
    # 导入Starlette相关模块
    from starlette.applications import Starlette
    from starlette.routing import Route, Mount
    from starlette.staticfiles import StaticFiles
    import uvicorn

    # 创建Starlette应用实例,配置路由
    app = Starlette(
        routes=[
            # 用户详情接口,支持GET和POST
            Route("/users/{user_id:int}", user_detail, methods=["GET", "POST"]),
            # HTML响应接口
            Route("/html", html_response),
            # 纯文本响应接口
            Route("/text", text_response),
            # 重定向接口
            Route("/redirect", redirect_response),
            # 流式响应接口
            Route("/stream", stream_data),
            # 文件下载接口
            Route("/file", file_response),
            # 表单处理接口
            Route("/form", form_handler),
            # 挂载静态文件目录
            Mount("/static", app=StaticFiles(directory="static"), name="static"),
        ]
    )

    # 打印服务器启动信息和可用端点
    print(" 服务器启动中...")
    print("📱 访问地址: http://127.0.0.1:8000")
    print("🧪 测试页面: http://127.0.0.1:8000/static/test_all.html")
    print(" 可用端点:")
    print("   - GET  /users/{user_id} - 用户详情")
    print("   - GET  /html - HTML响应")
    print("   - GET  /text - 纯文本响应")
    print("   - GET  /redirect - 重定向响应")
    print("   - GET  /stream - 流式响应")
    print("   - GET  /file - 文件下载")
    print("   - GET/POST /form - 表单处理")
    print("   - GET  /static/* - 静态文件")

    # 启动uvicorn服务器,监听127.0.0.1:8000
    uvicorn.run(app, host="127.0.0.1", port=8000)


# 判断是否为主模块,作为脚本入口
if __name__ == "__main__":
    main()

3.3.2 test_all.html #

<!DOCTYPE html>
<html>
<head>
    <title>所有响应类型测试</title>
</head>
<body>
    <h1>Starlette 响应类型测试</h1>
    <ul>
        <li><a href="json_test.html">JSON响应测试</a></li>
        <li><a href="html_test.html">HTML响应测试</a></li>
        <li><a href="text_test.html">纯文本响应测试</a></li>
        <li><a href="redirect_test.html">重定向响应测试</a></li>
        <li><a href="stream_test.html">流式响应测试</a></li>
        <li><a href="file_test.html">文件响应测试</a></li>
    </ul>

    <h2>当前可用的端点</h2>
    <ul>
        <li><code>/users/{user_id}</code> - JSON响应(需要POST请求)</li>
        <li><code>/stream</code> - 流式响应</li>
    </ul>

    <p>注意:某些响应类型需要在4.py中添加对应的路由才能正常工作。</p>
</body>
</html> 

3.3.3 file_test.html #

<!DOCTYPE html>
<html>

<head>
    <title>文件响应测试</title>
</head>

<body>
    <h1>文件响应测试</h1>
    <button onclick="testFileResponse()">测试文件响应</button>
    <div id="result"></div>

    <script>
        async function testFileResponse() {
            try {
                document.getElementById('result').textContent = '正在请求文件...';

                const response = await fetch('/file');
                console.log('文件响应状态:', response.status);
                console.log('响应头:', Object.fromEntries(response.headers.entries()));

                if (response.ok) {
                    const blob = await response.blob();
                    console.log('文件大小:', blob.size, '字节');

                    const url = URL.createObjectURL(blob);

                    const link = document.createElement('a');
                    link.href = url;
                    link.download = 'downloaded_file.txt';
                    link.textContent = '点击下载文件 (test_file.txt)';
                    link.style.display = 'block';
                    link.style.margin = '10px 0';
                    link.style.padding = '10px';
                    link.style.backgroundColor = '#007bff';
                    link.style.color = 'white';
                    link.style.textDecoration = 'none';
                    link.style.borderRadius = '5px';

                    document.getElementById('result').innerHTML = '';
                    document.getElementById('result').appendChild(link);

                    // 显示文件信息
                    const info = document.createElement('div');
                    info.innerHTML = `<br><strong>文件信息:</strong><br>大小: ${blob.size} 字节<br>类型: ${blob.type}`;
                    document.getElementById('result').appendChild(info);
                } else {
                    const errorText = await response.text();
                    document.getElementById('result').textContent = `文件下载失败:\n状态码: ${response.status}\n错误信息: ${errorText}`;
                }
            } catch (error) {
                console.error('文件下载错误:', error);
                document.getElementById('result').textContent = '错误: ' + error.message;
            }
        }
    </script>
</body>

</html>

3.3.4 html_test.html #

<!DOCTYPE html>
<html>
<head>
    <title>HTML响应测试</title>
</head>
<body>
    <h1>HTML响应测试</h1>
    <button onclick="testHtmlResponse()">测试HTML响应</button>
    <div id="result"></div>

    <script>
        async function testHtmlResponse() {
            try {
                const response = await fetch('/html');
                const html = await response.text();
                document.getElementById('result').innerHTML = html;
            } catch (error) {
                document.getElementById('result').textContent = '错误: ' + error.message;
            }
        }
    </script>
</body>
</html> 

3.3.5 json_test.html #

<!DOCTYPE html>
<html>

<head>
    <title>JSON响应测试</title>
</head>

<body>
    <h1>JSON响应测试</h1>

    <h3>GET请求测试</h3>
    <button onclick="testGet()">测试GET请求</button>

    <h3>POST请求测试</h3>
    <form id="jsonForm">
        <label>用户ID: <input type="number" id="userId" value="123"></label><br>
        <label>JSON数据: <textarea id="jsonData">{"name": "测试用户", "age": 25}</textarea></label><br>
        <button type="submit">发送POST请求</button>
    </form>

    <div id="result"></div>

    <script>
        // GET请求测试
        async function testGet() {
            const userId = document.getElementById('userId').value;
            try {
                const response = await fetch(`/users/${userId}`, {
                    method: 'GET'
                });

                // 检查响应状态
                if (!response.ok) {
                    const errorText = await response.text();
                    document.getElementById('result').textContent = `GET请求失败:\n状态码: ${response.status}\n错误信息: ${errorText}`;
                    return;
                }

                const result = await response.json();
                document.getElementById('result').textContent = 'GET请求结果:\n' + JSON.stringify(result, null, 2);
            } catch (error) {
                document.getElementById('result').textContent = 'GET请求错误: ' + error.message;
            }
        }

        // POST请求测试
        document.getElementById('jsonForm').onsubmit = async function (e) {
            e.preventDefault();
            const userId = document.getElementById('userId').value;
            const jsonData = document.getElementById('jsonData').value;

            try {
                const response = await fetch(`/users/${userId}`, {
                    method: 'POST',
                    headers: { 'Content-Type': 'application/json' },
                    body: jsonData
                });

                // 检查响应状态
                if (!response.ok) {
                    const errorText = await response.text();
                    document.getElementById('result').textContent = `POST请求失败:\n状态码: ${response.status}\n错误信息: ${errorText}`;
                    return;
                }

                const result = await response.json();
                document.getElementById('result').textContent = 'POST请求结果:\n' + JSON.stringify(result, null, 2);
            } catch (error) {
                document.getElementById('result').textContent = 'POST请求错误: ' + error.message;
            }
        };
    </script>
</body>

</html>

3.3.6 redirect_test.html #

<!DOCTYPE html>
<html>
<head>
    <title>重定向响应测试</title>
</head>
<body>
    <h1>重定向响应测试</h1>
    <button onclick="testRedirect()">测试重定向</button>
    <div id="result"></div>

    <script>
        async function testRedirect() {
            try {
                const response = await fetch('/redirect');
                if (response.redirected) {
                    document.getElementById('result').textContent = '重定向到: ' + response.url;
                } else {
                    document.getElementById('result').textContent = '没有重定向';
                }
            } catch (error) {
                document.getElementById('result').textContent = '错误: ' + error.message;
            }
        }
    </script>
</body>
</html> 

3.3.7 simple_test.html #

<!DOCTYPE html>
<html>

<head>
  <title>简单测试</title>
</head>

<body>
  <h1>简单测试页面</h1>

  <h2>测试GET请求</h2>
  <button onclick="testGet()">测试GET /users/123</button>

  <h2>测试POST请求</h2>
  <button onclick="testPost()">测试POST /users/123</button>

  <h2>结果</h2>
  <pre id="result"></pre>

  <script>
    async function testGet() {
      try {
        console.log('发送GET请求...');
        const response = await fetch('/users/123', { method: 'GET' });
        console.log('响应状态:', response.status);
        const text = await response.text();
        console.log('响应内容:', text);
        document.getElementById('result').textContent = `GET请求结果:\n状态: ${response.status}\n内容: ${text}`;
      } catch (error) {
        console.error('GET请求错误:', error);
        document.getElementById('result').textContent = `GET请求错误: ${error.message}`;
      }
    }

    async function testPost() {
      try {
        console.log('发送POST请求...');
        const response = await fetch('/users/123', {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ name: '测试用户', age: 25 })
        });
        console.log('响应状态:', response.status);
        const text = await response.text();
        console.log('响应内容:', text);
        document.getElementById('result').textContent = `POST请求结果:\n状态: ${response.status}\n内容: ${text}`;
      } catch (error) {
        console.error('POST请求错误:', error);
        document.getElementById('result').textContent = `POST请求错误: ${error.message}`;
      }
    }
  </script>
</body>

</html>

3.3.8 stream_test.html #

<!DOCTYPE html>
<html>
<head>
    <title>流式响应测试</title>
</head>
<body>
    <h1>流式响应测试</h1>
    <button onclick="testStream()">测试流式响应</button>
    <button onclick="clearResult()">清空结果</button>
    <div id="result"></div>

    <script>
        async function testStream() {
            try {
                const response = await fetch('/stream');
                const reader = response.body.getReader();
                const decoder = new TextDecoder();

                while (true) {
                    const {done, value} = await reader.read();
                    if (done) break;

                    const text = decoder.decode(value);
                    const lines = text.split('\n');

                    for (const line of lines) {
                        if (line.startsWith('data: ')) {
                            const data = line.substring(6);
                            const div = document.createElement('div');
                            div.textContent = data;
                            document.getElementById('result').appendChild(div);
                        }
                    }
                }
            } catch (error) {
                document.getElementById('result').textContent = '错误: ' + error.message;
            }
        }

        function clearResult() {
            document.getElementById('result').innerHTML = '';
        }
    </script>
</body>
</html> 

3.3.9 text_test.html #

<!DOCTYPE html>
<html>

<head>
  <title>纯文本响应测试</title>
</head>

<body>
  <h1>纯文本响应测试</h1>
  <button onclick="testTextResponse()">测试纯文本响应</button>
  <div id="result"></div>

  <script>
    async function testTextResponse() {
      try {
        const response = await fetch('/text');
        const text = await response.text();
        document.getElementById('result').textContent = text;
      } catch (error) {
        document.getElementById('result').textContent = '错误: ' + error.message;
      }
    }
  </script>
</body>

</html>

4. 中间件系统 #

本节介绍 Starlette 的中间件系统,包括内置中间件和自定义中间件的使用方法。中间件是 Starlette 中处理请求和响应的强大工具,可以在请求处理前后添加自定义逻辑。

4.1. 内置中间件 #

Starlette 提供了多种内置中间件,包括 CORS 处理、HTTPS 重定向、可信主机验证等。这些中间件可以快速配置常用的 Web 应用功能。

# 导入中间件类
from starlette.middleware import Middleware
# 导入 CORS 中间件
from starlette.middleware.cors import CORSMiddleware
# 导入 HTTPS 重定向中间件
from starlette.middleware.httpsredirect import HTTPSRedirectMiddleware
# 导入可信主机中间件
from starlette.middleware.trustedhost import TrustedHostMiddleware

# 定义中间件列表
middleware = [
    # CORS 中间件:允许所有来源的跨域请求
    Middleware(CORSMiddleware, allow_origins=["*"]),
    # 可信主机中间件:只允许特定主机访问
    Middleware(TrustedHostMiddleware, allowed_hosts=["example.com"]),
    # HTTPS 重定向中间件:将 HTTP 请求重定向到 HTTPS
    Middleware(HTTPSRedirectMiddleware)
]

# 定义主函数
def main():
    # 导入必要的模块
    from starlette.applications import Starlette
    from starlette.routing import Route
    from starlette.responses import PlainTextResponse
    import uvicorn

    # 定义简单的处理函数
    async def homepage(request):
        return PlainTextResponse("Hello, World!")

    # 创建应用实例
    app = Starlette(
        routes=[Route("/", homepage)],
        middleware=middleware
    )
    # 启动服务器
    uvicorn.run(app, host="127.0.0.1", port=8000)

# 脚本入口
if __name__ == "__main__":
    main()

4.2. 自定义中间件 #

Starlette 允许创建自定义中间件来处理特定的业务逻辑,如请求计时、日志记录、认证等。自定义中间件继承自 BaseHTTPMiddleware 类。

# 导入基础中间件类
from starlette.middleware.base import BaseHTTPMiddleware
# 导入时间模块
import time

# 定义计时中间件类
class TimingMiddleware(BaseHTTPMiddleware):
    # 实现 dispatch 方法来处理请求
    async def dispatch(self, request, call_next):
        # 记录请求开始时间
        start_time = time.time()
        # 调用下一个中间件或路由处理函数
        response = await call_next(request)
        # 计算处理时间
        process_time = time.time() - start_time
        # 在响应头中添加处理时间
        response.headers["X-Process-Time"] = str(process_time)
        # 返回响应
        return response

# 定义主函数
def main():
    # 导入必要的模块
    from starlette.applications import Starlette
    from starlette.routing import Route
    from starlette.responses import PlainTextResponse
    import uvicorn

    # 定义简单的处理函数
    async def homepage(request):
        return PlainTextResponse("Hello, World!")

    # 创建应用实例
    app = Starlette(routes=[Route("/", homepage)])
    # 添加自定义中间件
    app.add_middleware(TimingMiddleware)
    # 启动服务器
    uvicorn.run(app, host="127.0.0.1", port=8000)

# 脚本入口
if __name__ == "__main__":
    main()

5. 数据库和后台任务 #

本节介绍如何在 Starlette 中集成数据库和后台任务。Starlette 本身不包含数据库 ORM,但可以轻松集成第三方数据库库,并支持后台任务处理。

5.1. 数据库集成 #

Starlette 可以与各种数据库库集成,如 databases、SQLAlchemy 等。通过应用的生命周期事件,可以管理数据库连接。

uv add databases[all] sqlalchemy
# 导入数据库库
from databases import Database
# 导入 SQLAlchemy
import sqlalchemy

# 创建数据库连接
database = Database("sqlite:///example.db")
# 创建元数据对象
metadata = sqlalchemy.MetaData()

# 定义启动事件处理函数
async def startup():
    # 启动时连接数据库
    await database.connect()
    print("数据库连接成功")

# 定义关闭事件处理函数
async def shutdown():
    # 关闭时断开数据库连接
    await database.disconnect()
    print("数据库连接已断开")

# 定义主函数
def main():
    # 导入必要的模块
    from starlette.applications import Starlette
    from starlette.routing import Route
    from starlette.responses import PlainTextResponse
    import uvicorn

    # 定义简单的处理函数
    async def homepage(request):
        return PlainTextResponse("数据库集成示例")

    # 创建应用实例
    app = Starlette(
        routes=[Route("/", homepage)],
        on_startup=[startup],
        on_shutdown=[shutdown]
    )
    # 启动服务器
    uvicorn.run(app, host="127.0.0.1", port=8000)

# 脚本入口
if __name__ == "__main__":
    main()

5.2. 后台任务 #

Starlette 支持后台任务,可以在响应返回后继续执行耗时的操作,如发送邮件、处理文件等。这提高了应用的响应性能。

# 导入后台任务类
from starlette.background import BackgroundTasks

# 导入异步 IO 模块
import asyncio


# 定义发送通知的异步函数
async def send_notification(email: str, message: str):
    # 模拟发送邮件(实际项目中替换为真实的邮件发送逻辑)
    print(f"正在发送邮件到 {email}: {message}")
    # 模拟网络延迟
    await asyncio.sleep(1)
    print(f"邮件发送完成: {email}")


# 定义创建用户的处理函数
async def create_user(request):
    # 解析请求数据
    data = await request.json()
    # 创建后台任务对象
    background = BackgroundTasks()
    # 添加后台任务
    background.add_task(send_notification, data["email"], "欢迎加入!")

    # 返回响应,包含后台任务
    from starlette.responses import JSONResponse

    return JSONResponse({"status": "用户创建成功"}, background=background)


# 定义主函数
def main():
    # 导入必要的模块
    from starlette.applications import Starlette
    from starlette.routing import Route
    from starlette.responses import PlainTextResponse, HTMLResponse
    from starlette.staticfiles import StaticFiles
    import uvicorn

    # 定义首页处理函数
    async def homepage(request):
        return HTMLResponse(
            """
        <!DOCTYPE html>
        <html>
        <head>
            <title>后台任务示例</title>
        </head>
        <body>
            <h1>后台任务示例</h1>
            <p><a href="/users">用户管理页面</a></p>
        </body>
        </html>
        """
        )

    # 定义用户页面处理函数
    async def users_page(request):
        with open("static/users.html", "r", encoding="utf-8") as f:
            html_content = f.read()
        return HTMLResponse(html_content)

    # 创建应用实例
    app = Starlette(
        routes=[
            Route("/", homepage),
            Route("/users", users_page),
            Route("/api/users", create_user, methods=["POST"]),
        ]
    )

    # 启动服务器
    uvicorn.run(app, host="127.0.0.1", port=8000)


# 脚本入口
if __name__ == "__main__":
    main()

users.html

<!DOCTYPE html>
<html lang="zh-CN">

<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>用户管理</title>
  <style>
    body {
      font-family: Arial, sans-serif;
      max-width: 500px;
      margin: 50px auto;
      padding: 20px;
    }

    .form-group {
      margin-bottom: 15px;
    }

    label {
      display: block;
      margin-bottom: 5px;
      font-weight: bold;
    }

    input {
      width: 100%;
      padding: 8px;
      border: 1px solid #ddd;
      border-radius: 4px;
    }

    button {
      background: #007bff;
      color: white;
      padding: 10px 20px;
      border: none;
      border-radius: 4px;
      cursor: pointer;
    }

    button:hover {
      background: #0056b3;
    }

    .result {
      margin-top: 20px;
      padding: 10px;
      border-radius: 4px;
    }

    .success {
      background: #d4edda;
      color: #155724;
      border: 1px solid #c3e6cb;
    }

    .error {
      background: #f8d7da;
      color: #721c24;
      border: 1px solid #f5c6cb;
    }
  </style>
</head>

<body>
  <h1>用户管理</h1>

  <form id="userForm">
    <div class="form-group">
      <label for="email">邮箱地址:</label>
      <input type="email" id="email" name="email" required>
    </div>

    <button type="submit">创建用户</button>
  </form>

  <div id="result"></div>

  <script>
    document.getElementById('userForm').addEventListener('submit', async function (e) {
      e.preventDefault();

      const email = document.getElementById('email').value;
      const resultDiv = document.getElementById('result');

      try {
        const response = await fetch('/api/users', {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
          },
          body: JSON.stringify({ email: email })
        });

        const data = await response.json();

        if (response.ok) {
          resultDiv.innerHTML = `<div class="result success">${data.status}</div>`;
          document.getElementById('email').value = '';
        } else {
          resultDiv.innerHTML = `<div class="result error">创建失败: ${data.detail || '未知错误'}</div>`;
        }
      } catch (error) {
        resultDiv.innerHTML = `<div class="result error">请求失败: ${error.message}</div>`;
      }
    });
  </script>
</body>

</html>

6. 模板渲染 #

本节介绍如何在 Starlette 中使用模板引擎。Starlette 支持多种模板引擎,最常用的是 Jinja2。模板渲染使得创建动态 HTML 页面变得简单。

# 导入 Jinja2 模板引擎
from starlette.templating import Jinja2Templates

# 创建模板引擎实例,指定模板目录
templates = Jinja2Templates(directory="templates")


# 定义用户页面处理函数
async def user_page(request):
    # 从路径参数中获取用户 ID
    user_id = request.path_params["user_id"]
    # 渲染模板并返回响应
    return templates.TemplateResponse(
        "user.html", {"request": request, "user_id": user_id}
    )


# 定义主函数
def main():
    # 导入必要的模块
    from starlette.applications import Starlette
    from starlette.routing import Route
    from starlette.responses import PlainTextResponse
    import uvicorn

    # 定义首页处理函数
    async def homepage(request):
        return PlainTextResponse("模板渲染示例")

    # 创建应用实例
    app = Starlette(
        routes=[Route("/", homepage), Route("/users/{user_id:int}", user_page)]
    )
    # 启动服务器
    uvicorn.run(app, host="127.0.0.1", port=8000)


# 脚本入口
if __name__ == "__main__":
    main()
<!DOCTYPE html>
<html lang="zh-CN">

<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>用户页面</title>
  <style>
    body {
      font-family: Arial, sans-serif;
      text-align: center;
      margin-top: 100px;
    }

    h1 {
      color: #333;
    }

    p {
      color: #666;
    }
  </style>
</head>

<body>
  <h1>用户 ID: {{ user_id }}</h1>
  <p>欢迎访问用户页面!</p>
</body>

</html>

7. 配置和部署 #

本节介绍如何配置和部署 Starlette 应用,包括环境配置、使用 Uvicorn 服务器等。正确的配置和部署对于生产环境的应用至关重要。

7.1. 环境配置 #

Starlette 支持从环境变量和配置文件读取配置,这使得应用在不同环境中可以有不同的配置。

# 导入操作系统模块
import os
# 导入 Starlette 配置类
from starlette.config import Config

# 创建配置对象,从 .env 文件读取配置
config = Config(".env")

# 读取配置项,支持类型转换和默认值
DEBUG = config("DEBUG", cast=bool, default=False)
DATABASE_URL = config("DATABASE_URL")
SECRET_KEY = config("SECRET_KEY")

# 定义主函数
def main():
    # 打印配置信息
    print(f"DEBUG 模式: {DEBUG}")
    print(f"数据库 URL: {DATABASE_URL}")
    print(f"密钥: {SECRET_KEY}")

    # 导入必要的模块
    from starlette.applications import Starlette
    from starlette.routing import Route
    from starlette.responses import PlainTextResponse
    import uvicorn

    # 定义简单的处理函数
    async def homepage(request):
        return PlainTextResponse(f"配置示例 - DEBUG: {DEBUG}")

    # 创建应用实例
    app = Starlette(routes=[Route("/", homepage)])
    # 启动服务器
    uvicorn.run(app, host="127.0.0.1", port=8000)

# 脚本入口
if __name__ == "__main__":
    main()

7.2. 使用 Uvicorn 部署 #

Uvicorn 是 Starlette 的推荐 ASGI 服务器,它提供了高性能的异步服务器实现,支持热重载和多工作进程。

# 安装 Uvicorn
pip install uvicorn

# 运行开发服务器(支持热重载)
uvicorn main:app --host 0.0.0.0 --port 8000 --reload

# 生产环境运行(多工作进程)
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

8. 最佳实践 #

本节总结使用 Starlette 开发应用时的一些最佳实践。遵循这些实践可以帮助创建更可靠、更易维护的代码。

  1. 使用类型注解:充分利用 Request 和 Response 的类型提示
  2. 错误处理:合理处理异常,提供有意义的错误信息
  3. 中间件选择:只添加必要的中间件,避免性能开销
  4. 异步操作:确保所有IO操作都是异步的
  5. 资源管理:正确管理数据库连接和其他资源

9. 与其他框架的关系 #

本节介绍 Starlette 与其他 Python Web 框架的关系,帮助理解 Starlette 在整个 Python Web 生态系统中的位置。

  • FastAPI:基于 Starlette 构建,添加了自动文档、数据验证等功能
  • Django Channels:使用 ASGI,可以与 Starlette 组件集成
  • 其他 ASGI 框架:都可以与 Starlette 中间件和组件互操作

访问验证

请输入访问令牌

Token不正确,请重新输入