1.Starlette #
本节介绍 Starlette 的基本定位。Starlette 是一个轻量级的 ASGI(Asynchronous Server Gateway Interface)框架/工具包,专为构建高性能的异步 Web 服务和应用程序而设计。它提供了构建现代 Web 应用所需的核心组件,包括路由、中间件、WebSocket 支持等,同时保持代码的简洁性和高性能。
2. 核心特性 #
本节概述 Starlette 的主要特性,包括异步支持、轻量级设计和高性能表现。这些特性使得 Starlette 特别适合构建需要高并发处理的现代 Web 应用。
2.1. 异步支持 #
Starlette 完全基于 Python 的 asyncio 构建,支持真正的异步 I/O 操作。这意味着它可以高效地处理大量并发连接,而不会阻塞其他请求的处理。
# 导入 Starlette 应用类
from starlette.applications import Starlette
# 导入 JSON 响应类
from starlette.responses import JSONResponse
# 导入路由类
from starlette.routing import Route
# 导入异步 IO 模块
import asyncio
# 定义异步首页处理函数
async def homepage(request):
# 模拟异步操作(如数据库查询、外部 API 调用等)
await asyncio.sleep(0.1)
# 返回 JSON 响应
return JSONResponse({"hello": "world"})
# 创建 Starlette 应用实例
app = Starlette(debug=True, routes=[
# 注册路由,将根路径映射到 homepage 函数
Route("/", homepage),
])
# 定义主函数
def main():
# 导入 uvicorn 服务器
import uvicorn
# 启动应用服务器
uvicorn.run(app, host="127.0.0.1", port=8000)
# 脚本入口
if __name__ == "__main__":
main()2.2. 轻量级和高性能 #
Starlette 本身非常轻量,但提供了构建完整 Web 应用所需的所有组件。它的设计哲学是"只包含必要的功能",避免不必要的复杂性,同时保持高性能。
3. 核心组件 #
本节详细介绍 Starlette 的核心组件,包括应用实例、路由系统、请求和响应处理等。这些组件是构建 Starlette 应用的基础,理解它们有助于更好地使用框架。
3.1. 应用 (Starlette) #
Starlette 应用是框架的核心,它管理着整个应用的生命周期,包括路由注册、中间件配置、启动和关闭事件等。
# 导入 Starlette 应用类
from starlette.applications import Starlette
# 导入中间件类
from starlette.middleware import Middleware
# 导入 CORS 中间件
from starlette.middleware.cors import CORSMiddleware
# 定义启动事件处理函数
async def startup_event():
# 应用启动时的初始化逻辑
print("应用启动中...")
# 定义关闭事件处理函数
async def shutdown_event():
# 应用关闭时的清理逻辑
print("应用关闭中...")
# 创建 Starlette 应用实例
app = Starlette(
# 启用调试模式
debug=True,
# 路由列表(稍后定义)
routes=[],
# 中间件列表
middleware=[
# 配置 CORS 中间件,允许所有来源
Middleware(CORSMiddleware, allow_origins=["*"])
],
# 启动事件列表
on_startup=[startup_event],
# 关闭事件列表
on_shutdown=[shutdown_event]
)
# 定义主函数
def main():
# 导入 uvicorn 服务器
import uvicorn
# 启动应用服务器
uvicorn.run(app, host="127.0.0.1", port=8000)
# 脚本入口
if __name__ == "__main__":
main()3.2. 路由 (Route, WebSocketRoute) #
Starlette 的路由系统支持多种类型的路由,包括 HTTP 路由、WebSocket 路由和静态文件挂载。路由系统灵活且易于使用,支持路径参数和类型转换。
# 导入路由相关类
from starlette.routing import Route, WebSocketRoute, Mount
# 导入静态文件处理类
from starlette.staticfiles import StaticFiles
# 定义首页处理函数
async def homepage(request):
# 返回简单的文本响应
from starlette.responses import PlainTextResponse
return PlainTextResponse("首页")
# 定义用户详情处理函数
async def user_detail(request):
# 获取路径参数中的用户 ID
user_id = request.path_params["user_id"]
from starlette.responses import JSONResponse
return JSONResponse({"user_id": user_id})
# 定义 WebSocket 端点
async def websocket_endpoint(websocket):
# 接受 WebSocket 连接
await websocket.accept()
# 发送欢迎消息
await websocket.send_text("WebSocket 连接成功")
# 定义路由列表
routes = [
# HTTP 路由:首页
Route("/", homepage),
# HTTP 路由:用户详情,支持路径参数和类型转换
Route("/users/{user_id:int}", user_detail),
# WebSocket 路由
WebSocketRoute("/ws", websocket_endpoint),
# 静态文件挂载
Mount("/static", app=StaticFiles(directory="static"), name="static")
]
# 定义主函数
def main():
# 导入必要的模块
from starlette.applications import Starlette
import uvicorn
# 创建应用实例
app = Starlette(routes=routes)
# 启动服务器
uvicorn.run(app, host="127.0.0.1", port=8000)
# 脚本入口
if __name__ == "__main__":
main()<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebSocket 通信测试</title>
<style>
body {
font-family: 'Microsoft YaHei', Arial, sans-serif;
max-width: 800px;
margin: 0 auto;
padding: 20px;
background-color: #f5f5f5;
}
.container {
background: white;
padding: 30px;
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0, 0, 0, 0.1);
}
h1 {
color: #333;
text-align: center;
margin-bottom: 30px;
}
.status {
padding: 15px;
margin: 20px 0;
border-radius: 5px;
font-weight: bold;
}
.connected {
background-color: #d4edda;
color: #155724;
border: 1px solid #c3e6cb;
}
.disconnected {
background-color: #f8d7da;
color: #721c24;
border: 1px solid #f5c6cb;
}
.message-area {
margin: 20px 0;
}
.message-input {
width: 100%;
padding: 10px;
border: 1px solid #ddd;
border-radius: 5px;
margin-bottom: 10px;
font-size: 16px;
}
.send-btn {
background-color: #007bff;
color: white;
border: none;
padding: 10px 20px;
border-radius: 5px;
cursor: pointer;
font-size: 16px;
}
.send-btn:hover {
background-color: #0056b3;
}
.send-btn:disabled {
background-color: #6c757d;
cursor: not-allowed;
}
.messages {
background-color: #f8f9fa;
border: 1px solid #dee2e6;
border-radius: 5px;
padding: 15px;
height: 300px;
overflow-y: auto;
margin-top: 20px;
}
.message {
margin: 10px 0;
padding: 10px;
border-radius: 5px;
}
.message.sent {
background-color: #d1ecf1;
border-left: 4px solid #17a2b8;
}
.message.received {
background-color: #d4edda;
border-left: 4px solid #28a745;
}
.timestamp {
font-size: 12px;
color: #6c757d;
margin-top: 5px;
}
.controls {
text-align: center;
margin: 20px 0;
}
.control-btn {
margin: 0 10px;
padding: 10px 20px;
border: none;
border-radius: 5px;
cursor: pointer;
font-size: 16px;
}
.connect-btn {
background-color: #28a745;
color: white;
}
.disconnect-btn {
background-color: #dc3545;
color: white;
}
.clear-btn {
background-color: #6c757d;
color: white;
}
</style>
</head>
<body>
<div class="container">
<h1>🔌 WebSocket 通信测试</h1>
<!-- 连接状态显示 -->
<div id="status" class="status disconnected">
状态:未连接
</div>
<!-- 控制按钮 -->
<div class="controls">
<button id="connectBtn" class="control-btn connect-btn">连接</button>
<button id="disconnectBtn" class="control-btn disconnect-btn" disabled>断开</button>
<button id="clearBtn" class="control-btn clear-btn">清空消息</button>
</div>
<!-- 消息发送区域 -->
<div class="message-area">
<input type="text" id="messageInput" class="message-input" placeholder="输入要发送的消息..." disabled>
<button id="sendBtn" class="send-btn" disabled>发送消息</button>
</div>
<!-- 消息显示区域 -->
<div class="messages" id="messages"></div>
</div>
<script>
class WebSocketClient {
constructor() {
this.ws = null;
this.isConnected = false;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.reconnectDelay = 1000;
this.initElements();
this.bindEvents();
}
initElements() {
this.statusEl = document.getElementById('status');
this.connectBtn = document.getElementById('connectBtn');
this.disconnectBtn = document.getElementById('disconnectBtn');
this.clearBtn = document.getElementById('clearBtn');
this.messageInput = document.getElementById('messageInput');
this.sendBtn = document.getElementById('sendBtn');
this.messagesEl = document.getElementById('messages');
}
bindEvents() {
this.connectBtn.addEventListener('click', () => this.connect());
this.disconnectBtn.addEventListener('click', () => this.disconnect());
this.clearBtn.addEventListener('click', () => this.clearMessages());
this.sendBtn.addEventListener('click', () => this.sendMessage());
this.messageInput.addEventListener('keypress', (e) => {
if (e.key === 'Enter') {
this.sendMessage();
}
});
}
connect() {
try {
// 创建 WebSocket 连接,连接到服务器的 /ws 端点
this.ws = new WebSocket('ws://127.0.0.1:8000/ws');
this.ws.onopen = () => {
this.isConnected = true;
this.updateStatus('已连接', true);
this.updateControls(true);
this.addMessage('系统', 'WebSocket 连接成功!', 'received');
this.reconnectAttempts = 0;
};
this.ws.onmessage = (event) => {
this.addMessage('服务器', event.data, 'received');
};
this.ws.onclose = (event) => {
this.isConnected = false;
this.updateStatus('连接已断开', false);
this.updateControls(false);
this.addMessage('系统', 'WebSocket 连接已断开', 'received');
// 尝试重连
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
this.addMessage('系统', `尝试重连... (${this.reconnectAttempts}/${this.maxReconnectAttempts})`, 'received');
setTimeout(() => this.connect(), this.reconnectDelay);
} else {
this.addMessage('系统', '重连失败,请手动重新连接', 'received');
}
};
this.ws.onerror = (error) => {
this.addMessage('系统', '连接错误: ' + error.message, 'received');
};
} catch (error) {
this.addMessage('系统', '连接失败: ' + error.message, 'received');
}
}
disconnect() {
if (this.ws) {
this.ws.close();
this.ws = null;
}
}
sendMessage() {
const message = this.messageInput.value.trim();
if (message && this.isConnected) {
this.ws.send(message);
this.addMessage('我', message, 'sent');
this.messageInput.value = '';
}
}
addMessage(sender, content, type) {
const messageDiv = document.createElement('div');
messageDiv.className = `message ${type}`;
const timestamp = new Date().toLocaleTimeString();
messageDiv.innerHTML = `
<strong>${sender}:</strong> ${content}
<div class="timestamp">${timestamp}</div>
`;
this.messagesEl.appendChild(messageDiv);
this.messagesEl.scrollTop = this.messagesEl.scrollHeight;
}
clearMessages() {
this.messagesEl.innerHTML = '';
}
updateStatus(text, connected) {
this.statusEl.textContent = `状态:${text}`;
this.statusEl.className = `status ${connected ? 'connected' : 'disconnected'}`;
}
updateControls(connected) {
this.connectBtn.disabled = connected;
this.disconnectBtn.disabled = !connected;
this.messageInput.disabled = !connected;
this.sendBtn.disabled = !connected;
}
}
// 页面加载完成后初始化 WebSocket 客户端
document.addEventListener('DOMContentLoaded', () => {
new WebSocketClient();
});
</script>
</body>
</html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebSocket 通信测试</title>
<style>
body {
font-family: 'Microsoft YaHei', Arial, sans-serif;
max-width: 800px;
margin: 0 auto;
padding: 20px;
background-color: #f5f5f5;
}
.container {
background: white;
padding: 30px;
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0, 0, 0, 0.1);
}
h1 {
color: #333;
text-align: center;
margin-bottom: 30px;
}
.status {
padding: 15px;
margin: 20px 0;
border-radius: 5px;
font-weight: bold;
}
.connected {
background-color: #d4edda;
color: #155724;
border: 1px solid #c3e6cb;
}
.disconnected {
background-color: #f8d7da;
color: #721c24;
border: 1px solid #f5c6cb;
}
.message-area {
margin: 20px 0;
}
.message-input {
width: 100%;
padding: 10px;
border: 1px solid #ddd;
border-radius: 5px;
margin-bottom: 10px;
font-size: 16px;
}
.send-btn {
background-color: #007bff;
color: white;
border: none;
padding: 10px 20px;
border-radius: 5px;
cursor: pointer;
font-size: 16px;
}
.send-btn:hover {
background-color: #0056b3;
}
.send-btn:disabled {
background-color: #6c757d;
cursor: not-allowed;
}
.messages {
background-color: #f8f9fa;
border: 1px solid #dee2e6;
border-radius: 5px;
padding: 15px;
height: 300px;
overflow-y: auto;
margin-top: 20px;
}
.message {
margin: 10px 0;
padding: 10px;
border-radius: 5px;
}
.message.sent {
background-color: #d1ecf1;
border-left: 4px solid #17a2b8;
}
.message.received {
background-color: #d4edda;
border-left: 4px solid #28a745;
}
.timestamp {
font-size: 12px;
color: #6c757d;
margin-top: 5px;
}
.controls {
text-align: center;
margin: 20px 0;
}
.control-btn {
margin: 0 10px;
padding: 10px 20px;
border: none;
border-radius: 5px;
cursor: pointer;
font-size: 16px;
}
.connect-btn {
background-color: #28a745;
color: white;
}
.disconnect-btn {
background-color: #dc3545;
color: white;
}
.clear-btn {
background-color: #6c757d;
color: white;
}
</style>
</head>
<body>
<div class="container">
<h1>🔌 WebSocket 通信测试</h1>
<!-- 连接状态显示 -->
<div id="status" class="status disconnected">
状态:未连接
</div>
<!-- 控制按钮 -->
<div class="controls">
<button id="connectBtn" class="control-btn connect-btn">连接</button>
<button id="disconnectBtn" class="control-btn disconnect-btn" disabled>断开</button>
<button id="clearBtn" class="control-btn clear-btn">清空消息</button>
</div>
<!-- 消息发送区域 -->
<div class="message-area">
<input type="text" id="messageInput" class="message-input" placeholder="输入要发送的消息..." disabled>
<button id="sendBtn" class="send-btn" disabled>发送消息</button>
</div>
<!-- 消息显示区域 -->
<div class="messages" id="messages"></div>
</div>
<script>
class WebSocketClient {
constructor() {
this.ws = null;
this.isConnected = false;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.reconnectDelay = 1000;
this.initElements();
this.bindEvents();
}
initElements() {
this.statusEl = document.getElementById('status');
this.connectBtn = document.getElementById('connectBtn');
this.disconnectBtn = document.getElementById('disconnectBtn');
this.clearBtn = document.getElementById('clearBtn');
this.messageInput = document.getElementById('messageInput');
this.sendBtn = document.getElementById('sendBtn');
this.messagesEl = document.getElementById('messages');
}
bindEvents() {
this.connectBtn.addEventListener('click', () => this.connect());
this.disconnectBtn.addEventListener('click', () => this.disconnect());
this.clearBtn.addEventListener('click', () => this.clearMessages());
this.sendBtn.addEventListener('click', () => this.sendMessage());
this.messageInput.addEventListener('keypress', (e) => {
if (e.key === 'Enter') {
this.sendMessage();
}
});
}
connect() {
try {
// 创建 WebSocket 连接,连接到服务器的 /ws 端点
this.ws = new WebSocket('ws://127.0.0.1:8000/ws');
this.ws.onopen = () => {
this.isConnected = true;
this.updateStatus('已连接', true);
this.updateControls(true);
this.addMessage('系统', 'WebSocket 连接成功!', 'received');
this.reconnectAttempts = 0;
};
this.ws.onmessage = (event) => {
this.addMessage('服务器', event.data, 'received');
};
this.ws.onclose = (event) => {
this.isConnected = false;
this.updateStatus('连接已断开', false);
this.updateControls(false);
this.addMessage('系统', 'WebSocket 连接已断开', 'received');
// 尝试重连
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
this.addMessage('系统', `尝试重连... (${this.reconnectAttempts}/${this.maxReconnectAttempts})`, 'received');
setTimeout(() => this.connect(), this.reconnectDelay);
} else {
this.addMessage('系统', '重连失败,请手动重新连接', 'received');
}
};
this.ws.onerror = (error) => {
this.addMessage('系统', '连接错误: ' + error.message, 'received');
};
} catch (error) {
this.addMessage('系统', '连接失败: ' + error.message, 'received');
}
}
disconnect() {
if (this.ws) {
this.ws.close();
this.ws = null;
}
}
sendMessage() {
const message = this.messageInput.value.trim();
if (message && this.isConnected) {
this.ws.send(message);
this.addMessage('我', message, 'sent');
this.messageInput.value = '';
}
}
addMessage(sender, content, type) {
const messageDiv = document.createElement('div');
messageDiv.className = `message ${type}`;
const timestamp = new Date().toLocaleTimeString();
messageDiv.innerHTML = `
<strong>${sender}:</strong> ${content}
<div class="timestamp">${timestamp}</div>
`;
this.messagesEl.appendChild(messageDiv);
this.messagesEl.scrollTop = this.messagesEl.scrollHeight;
}
clearMessages() {
this.messagesEl.innerHTML = '';
}
updateStatus(text, connected) {
this.statusEl.textContent = `状态:${text}`;
this.statusEl.className = `status ${connected ? 'connected' : 'disconnected'}`;
}
updateControls(connected) {
this.connectBtn.disabled = connected;
this.disconnectBtn.disabled = !connected;
this.messageInput.disabled = !connected;
this.sendBtn.disabled = !connected;
}
}
// 页面加载完成后初始化 WebSocket 客户端
document.addEventListener('DOMContentLoaded', () => {
new WebSocketClient();
});
</script>
</body>
</html>3.3. 请求和响应 #
Starlette 提供了丰富的请求和响应类型,支持 JSON、HTML、纯文本、重定向、流式响应和文件下载等多种格式。请求对象提供了便捷的方法来访问请求数据。
3.3.1 服务器 #
# 导入Request请求对象
from starlette.requests import Request
# 导入各种响应类型
from starlette.responses import (
JSONResponse, # JSON响应
HTMLResponse, # HTML响应
PlainTextResponse, # 纯文本响应
RedirectResponse, # 重定向响应
StreamingResponse, # 流式响应
FileResponse, # 文件响应
)
# 导入异步IO模块
import asyncio
import tempfile
import os
# 定义用户详情处理函数,支持GET和POST方法
async def user_detail(request: Request):
# 从路径参数中获取用户ID
user_id = request.path_params["user_id"]
# 打印调试信息:请求方法和用户ID
print(f"DEBUG: 请求方法 = {request.method}, 用户ID = {user_id}")
# 判断请求方法是否为GET
if request.method == "GET":
# 处理GET请求,返回用户基本信息
print("DEBUG: 处理GET请求")
return JSONResponse(
{"user_id": user_id, "method": "GET", "message": "获取用户信息成功"}
)
# 判断请求方法是否为POST
elif request.method == "POST":
# 处理POST请求,尝试解析请求体
print("DEBUG: 处理POST请求")
try:
# 尝试解析JSON格式的请求体
data = await request.json()
# 打印解析到的JSON数据
print(f"DEBUG: JSON数据 = {data}")
# 返回包含数据的JSON响应
return JSONResponse(
{
"user_id": user_id,
"method": "POST",
"data": data,
"message": "用户数据更新成功",
}
)
except Exception as e:
# JSON解析失败,打印异常信息
print(f"DEBUG: JSON解析失败 = {e}")
try:
# 尝试解析表单数据
form_data = await request.form()
# 打印解析到的表单数据
print(f"DEBUG: 表单数据 = {dict(form_data)}")
# 返回包含表单数据的JSON响应
return JSONResponse(
{
"user_id": user_id,
"method": "POST",
"form_data": dict(form_data),
"message": "表单数据接收成功",
}
)
except Exception as e2:
# 表单解析也失败,打印异常信息
print(f"DEBUG: 表单解析失败 = {e2}")
# 返回错误信息,提示无法解析请求数据
return JSONResponse(
{
"user_id": user_id,
"method": "POST",
"error": "无法解析请求数据",
"message": "请发送有效的JSON或表单数据",
},
status_code=400,
)
else:
# 处理不支持的请求方法
print(f"DEBUG: 不支持的请求方法 = {request.method}")
# 返回405错误,提示只允许GET和POST
return JSONResponse(
{"error": "不支持的请求方法", "allowed_methods": ["GET", "POST"]},
status_code=405,
)
# 定义HTML响应函数
async def html_response(request: Request):
# 构造HTML内容,包含当前时间、请求路径和方法
html_content = """
<!DOCTYPE html>
<html>
<head>
<title>HTML响应测试</title>
</head>
<body>
<h1>这是一个HTML响应</h1>
<p>当前时间: {}</p>
<p>请求路径: {}</p>
<p>请求方法: {}</p>
</body>
</html>
""".format(
asyncio.get_event_loop().time(), request.url.path, request.method
)
# 返回HTML响应
return HTMLResponse(html_content)
# 定义纯文本响应函数
async def text_response(request: Request):
# 构造多行纯文本内容,包含时间、路径、方法和用户代理
text_content = f"""
纯文本响应示例
================
当前时间: {asyncio.get_event_loop().time()}
请求路径: {request.url.path}
请求方法: {request.method}
用户代理: {request.headers.get('user-agent', '未知')}
这是一个多行文本响应,用于测试PlainTextResponse。
""".strip()
# 返回纯文本响应
return PlainTextResponse(text_content)
# 定义重定向响应函数
async def redirect_response(request: Request):
# 返回重定向响应,跳转到静态文件页面
return RedirectResponse(url="/static/test_all.html")
# 定义流式数据响应函数
async def stream_data(request: Request):
# 定义异步生成器函数,用于逐条生成数据
async def generate():
# 循环生成10条数据
for i in range(10):
# 生成一行数据
yield f"data: {i}\n\n"
# 每次等待1秒
await asyncio.sleep(1)
# 返回流式响应,媒体类型为text/plain
return StreamingResponse(generate(), media_type="text/plain")
# 定义文件响应函数
async def file_response(request: Request):
# 创建临时文件,写入测试内容
with tempfile.NamedTemporaryFile(
mode="w", suffix=".txt", delete=False, encoding="utf-8"
) as f:
# 写入多行内容
f.write("这是一个测试文件\n")
f.write("包含多行内容\n")
f.write("用于测试FileResponse\n")
f.write(f"创建时间: {asyncio.get_event_loop().time()}\n")
f.write("文件内容结束\n")
# 获取临时文件路径
temp_file_path = f.name
# 返回文件响应,设置下载文件名和媒体类型
return FileResponse(
path=temp_file_path,
filename="test_file.txt",
media_type="text/plain",
headers={"Content-Disposition": "attachment; filename=test_file.txt"},
)
# 定义表单处理函数
async def form_handler(request: Request):
# 判断请求方法是否为POST
if request.method == "POST":
# 解析表单数据
form_data = await request.form()
# 返回表单数据的JSON响应
return JSONResponse({"message": "表单提交成功", "form_data": dict(form_data)})
else:
# 构造HTML表单页面
html_form = """
<!DOCTYPE html>
<html>
<head>
<title>表单测试</title>
</head>
<body>
<h1>表单测试</h1>
<form method="POST">
<label>姓名: <input type="text" name="name" required></label><br><br>
<label>邮箱: <input type="email" name="email" required></label><br><br>
<label>消息: <textarea name="message" rows="4" cols="50"></textarea></label><br><br>
<button type="submit">提交</button>
</form>
</body>
</html>
"""
# 返回HTML响应
return HTMLResponse(html_form)
# 定义主函数,启动Starlette应用
def main():
# 导入Starlette相关模块
from starlette.applications import Starlette
from starlette.routing import Route, Mount
from starlette.staticfiles import StaticFiles
import uvicorn
# 创建Starlette应用实例,配置路由
app = Starlette(
routes=[
# 用户详情接口,支持GET和POST
Route("/users/{user_id:int}", user_detail, methods=["GET", "POST"]),
# HTML响应接口
Route("/html", html_response),
# 纯文本响应接口
Route("/text", text_response),
# 重定向接口
Route("/redirect", redirect_response),
# 流式响应接口
Route("/stream", stream_data),
# 文件下载接口
Route("/file", file_response),
# 表单处理接口
Route("/form", form_handler),
# 挂载静态文件目录
Mount("/static", app=StaticFiles(directory="static"), name="static"),
]
)
# 打印服务器启动信息和可用端点
print(" 服务器启动中...")
print("📱 访问地址: http://127.0.0.1:8000")
print("🧪 测试页面: http://127.0.0.1:8000/static/test_all.html")
print(" 可用端点:")
print(" - GET /users/{user_id} - 用户详情")
print(" - GET /html - HTML响应")
print(" - GET /text - 纯文本响应")
print(" - GET /redirect - 重定向响应")
print(" - GET /stream - 流式响应")
print(" - GET /file - 文件下载")
print(" - GET/POST /form - 表单处理")
print(" - GET /static/* - 静态文件")
# 启动uvicorn服务器,监听127.0.0.1:8000
uvicorn.run(app, host="127.0.0.1", port=8000)
# 判断是否为主模块,作为脚本入口
if __name__ == "__main__":
main()
3.3.2 test_all.html #
<!DOCTYPE html>
<html>
<head>
<title>所有响应类型测试</title>
</head>
<body>
<h1>Starlette 响应类型测试</h1>
<ul>
<li><a href="json_test.html">JSON响应测试</a></li>
<li><a href="html_test.html">HTML响应测试</a></li>
<li><a href="text_test.html">纯文本响应测试</a></li>
<li><a href="redirect_test.html">重定向响应测试</a></li>
<li><a href="stream_test.html">流式响应测试</a></li>
<li><a href="file_test.html">文件响应测试</a></li>
</ul>
<h2>当前可用的端点</h2>
<ul>
<li><code>/users/{user_id}</code> - JSON响应(需要POST请求)</li>
<li><code>/stream</code> - 流式响应</li>
</ul>
<p>注意:某些响应类型需要在4.py中添加对应的路由才能正常工作。</p>
</body>
</html> 3.3.3 file_test.html #
<!DOCTYPE html>
<html>
<head>
<title>文件响应测试</title>
</head>
<body>
<h1>文件响应测试</h1>
<button onclick="testFileResponse()">测试文件响应</button>
<div id="result"></div>
<script>
async function testFileResponse() {
try {
document.getElementById('result').textContent = '正在请求文件...';
const response = await fetch('/file');
console.log('文件响应状态:', response.status);
console.log('响应头:', Object.fromEntries(response.headers.entries()));
if (response.ok) {
const blob = await response.blob();
console.log('文件大小:', blob.size, '字节');
const url = URL.createObjectURL(blob);
const link = document.createElement('a');
link.href = url;
link.download = 'downloaded_file.txt';
link.textContent = '点击下载文件 (test_file.txt)';
link.style.display = 'block';
link.style.margin = '10px 0';
link.style.padding = '10px';
link.style.backgroundColor = '#007bff';
link.style.color = 'white';
link.style.textDecoration = 'none';
link.style.borderRadius = '5px';
document.getElementById('result').innerHTML = '';
document.getElementById('result').appendChild(link);
// 显示文件信息
const info = document.createElement('div');
info.innerHTML = `<br><strong>文件信息:</strong><br>大小: ${blob.size} 字节<br>类型: ${blob.type}`;
document.getElementById('result').appendChild(info);
} else {
const errorText = await response.text();
document.getElementById('result').textContent = `文件下载失败:\n状态码: ${response.status}\n错误信息: ${errorText}`;
}
} catch (error) {
console.error('文件下载错误:', error);
document.getElementById('result').textContent = '错误: ' + error.message;
}
}
</script>
</body>
</html>3.3.4 html_test.html #
<!DOCTYPE html>
<html>
<head>
<title>HTML响应测试</title>
</head>
<body>
<h1>HTML响应测试</h1>
<button onclick="testHtmlResponse()">测试HTML响应</button>
<div id="result"></div>
<script>
async function testHtmlResponse() {
try {
const response = await fetch('/html');
const html = await response.text();
document.getElementById('result').innerHTML = html;
} catch (error) {
document.getElementById('result').textContent = '错误: ' + error.message;
}
}
</script>
</body>
</html> 3.3.5 json_test.html #
<!DOCTYPE html>
<html>
<head>
<title>JSON响应测试</title>
</head>
<body>
<h1>JSON响应测试</h1>
<h3>GET请求测试</h3>
<button onclick="testGet()">测试GET请求</button>
<h3>POST请求测试</h3>
<form id="jsonForm">
<label>用户ID: <input type="number" id="userId" value="123"></label><br>
<label>JSON数据: <textarea id="jsonData">{"name": "测试用户", "age": 25}</textarea></label><br>
<button type="submit">发送POST请求</button>
</form>
<div id="result"></div>
<script>
// GET请求测试
async function testGet() {
const userId = document.getElementById('userId').value;
try {
const response = await fetch(`/users/${userId}`, {
method: 'GET'
});
// 检查响应状态
if (!response.ok) {
const errorText = await response.text();
document.getElementById('result').textContent = `GET请求失败:\n状态码: ${response.status}\n错误信息: ${errorText}`;
return;
}
const result = await response.json();
document.getElementById('result').textContent = 'GET请求结果:\n' + JSON.stringify(result, null, 2);
} catch (error) {
document.getElementById('result').textContent = 'GET请求错误: ' + error.message;
}
}
// POST请求测试
document.getElementById('jsonForm').onsubmit = async function (e) {
e.preventDefault();
const userId = document.getElementById('userId').value;
const jsonData = document.getElementById('jsonData').value;
try {
const response = await fetch(`/users/${userId}`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: jsonData
});
// 检查响应状态
if (!response.ok) {
const errorText = await response.text();
document.getElementById('result').textContent = `POST请求失败:\n状态码: ${response.status}\n错误信息: ${errorText}`;
return;
}
const result = await response.json();
document.getElementById('result').textContent = 'POST请求结果:\n' + JSON.stringify(result, null, 2);
} catch (error) {
document.getElementById('result').textContent = 'POST请求错误: ' + error.message;
}
};
</script>
</body>
</html>3.3.6 redirect_test.html #
<!DOCTYPE html>
<html>
<head>
<title>重定向响应测试</title>
</head>
<body>
<h1>重定向响应测试</h1>
<button onclick="testRedirect()">测试重定向</button>
<div id="result"></div>
<script>
async function testRedirect() {
try {
const response = await fetch('/redirect');
if (response.redirected) {
document.getElementById('result').textContent = '重定向到: ' + response.url;
} else {
document.getElementById('result').textContent = '没有重定向';
}
} catch (error) {
document.getElementById('result').textContent = '错误: ' + error.message;
}
}
</script>
</body>
</html> 3.3.7 simple_test.html #
<!DOCTYPE html>
<html>
<head>
<title>简单测试</title>
</head>
<body>
<h1>简单测试页面</h1>
<h2>测试GET请求</h2>
<button onclick="testGet()">测试GET /users/123</button>
<h2>测试POST请求</h2>
<button onclick="testPost()">测试POST /users/123</button>
<h2>结果</h2>
<pre id="result"></pre>
<script>
async function testGet() {
try {
console.log('发送GET请求...');
const response = await fetch('/users/123', { method: 'GET' });
console.log('响应状态:', response.status);
const text = await response.text();
console.log('响应内容:', text);
document.getElementById('result').textContent = `GET请求结果:\n状态: ${response.status}\n内容: ${text}`;
} catch (error) {
console.error('GET请求错误:', error);
document.getElementById('result').textContent = `GET请求错误: ${error.message}`;
}
}
async function testPost() {
try {
console.log('发送POST请求...');
const response = await fetch('/users/123', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ name: '测试用户', age: 25 })
});
console.log('响应状态:', response.status);
const text = await response.text();
console.log('响应内容:', text);
document.getElementById('result').textContent = `POST请求结果:\n状态: ${response.status}\n内容: ${text}`;
} catch (error) {
console.error('POST请求错误:', error);
document.getElementById('result').textContent = `POST请求错误: ${error.message}`;
}
}
</script>
</body>
</html>3.3.8 stream_test.html #
<!DOCTYPE html>
<html>
<head>
<title>流式响应测试</title>
</head>
<body>
<h1>流式响应测试</h1>
<button onclick="testStream()">测试流式响应</button>
<button onclick="clearResult()">清空结果</button>
<div id="result"></div>
<script>
async function testStream() {
try {
const response = await fetch('/stream');
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const {done, value} = await reader.read();
if (done) break;
const text = decoder.decode(value);
const lines = text.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.substring(6);
const div = document.createElement('div');
div.textContent = data;
document.getElementById('result').appendChild(div);
}
}
}
} catch (error) {
document.getElementById('result').textContent = '错误: ' + error.message;
}
}
function clearResult() {
document.getElementById('result').innerHTML = '';
}
</script>
</body>
</html> 3.3.9 text_test.html #
<!DOCTYPE html>
<html>
<head>
<title>纯文本响应测试</title>
</head>
<body>
<h1>纯文本响应测试</h1>
<button onclick="testTextResponse()">测试纯文本响应</button>
<div id="result"></div>
<script>
async function testTextResponse() {
try {
const response = await fetch('/text');
const text = await response.text();
document.getElementById('result').textContent = text;
} catch (error) {
document.getElementById('result').textContent = '错误: ' + error.message;
}
}
</script>
</body>
</html>4. 中间件系统 #
本节介绍 Starlette 的中间件系统,包括内置中间件和自定义中间件的使用方法。中间件是 Starlette 中处理请求和响应的强大工具,可以在请求处理前后添加自定义逻辑。
4.1. 内置中间件 #
Starlette 提供了多种内置中间件,包括 CORS 处理、HTTPS 重定向、可信主机验证等。这些中间件可以快速配置常用的 Web 应用功能。
# 导入中间件类
from starlette.middleware import Middleware
# 导入 CORS 中间件
from starlette.middleware.cors import CORSMiddleware
# 导入 HTTPS 重定向中间件
from starlette.middleware.httpsredirect import HTTPSRedirectMiddleware
# 导入可信主机中间件
from starlette.middleware.trustedhost import TrustedHostMiddleware
# 定义中间件列表
middleware = [
# CORS 中间件:允许所有来源的跨域请求
Middleware(CORSMiddleware, allow_origins=["*"]),
# 可信主机中间件:只允许特定主机访问
Middleware(TrustedHostMiddleware, allowed_hosts=["example.com"]),
# HTTPS 重定向中间件:将 HTTP 请求重定向到 HTTPS
Middleware(HTTPSRedirectMiddleware)
]
# 定义主函数
def main():
# 导入必要的模块
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import PlainTextResponse
import uvicorn
# 定义简单的处理函数
async def homepage(request):
return PlainTextResponse("Hello, World!")
# 创建应用实例
app = Starlette(
routes=[Route("/", homepage)],
middleware=middleware
)
# 启动服务器
uvicorn.run(app, host="127.0.0.1", port=8000)
# 脚本入口
if __name__ == "__main__":
main()4.2. 自定义中间件 #
Starlette 允许创建自定义中间件来处理特定的业务逻辑,如请求计时、日志记录、认证等。自定义中间件继承自 BaseHTTPMiddleware 类。
# 导入基础中间件类
from starlette.middleware.base import BaseHTTPMiddleware
# 导入时间模块
import time
# 定义计时中间件类
class TimingMiddleware(BaseHTTPMiddleware):
# 实现 dispatch 方法来处理请求
async def dispatch(self, request, call_next):
# 记录请求开始时间
start_time = time.time()
# 调用下一个中间件或路由处理函数
response = await call_next(request)
# 计算处理时间
process_time = time.time() - start_time
# 在响应头中添加处理时间
response.headers["X-Process-Time"] = str(process_time)
# 返回响应
return response
# 定义主函数
def main():
# 导入必要的模块
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import PlainTextResponse
import uvicorn
# 定义简单的处理函数
async def homepage(request):
return PlainTextResponse("Hello, World!")
# 创建应用实例
app = Starlette(routes=[Route("/", homepage)])
# 添加自定义中间件
app.add_middleware(TimingMiddleware)
# 启动服务器
uvicorn.run(app, host="127.0.0.1", port=8000)
# 脚本入口
if __name__ == "__main__":
main()5. 数据库和后台任务 #
本节介绍如何在 Starlette 中集成数据库和后台任务。Starlette 本身不包含数据库 ORM,但可以轻松集成第三方数据库库,并支持后台任务处理。
5.1. 数据库集成 #
Starlette 可以与各种数据库库集成,如 databases、SQLAlchemy 等。通过应用的生命周期事件,可以管理数据库连接。
uv add databases[all] sqlalchemy# 导入数据库库
from databases import Database
# 导入 SQLAlchemy
import sqlalchemy
# 创建数据库连接
database = Database("sqlite:///example.db")
# 创建元数据对象
metadata = sqlalchemy.MetaData()
# 定义启动事件处理函数
async def startup():
# 启动时连接数据库
await database.connect()
print("数据库连接成功")
# 定义关闭事件处理函数
async def shutdown():
# 关闭时断开数据库连接
await database.disconnect()
print("数据库连接已断开")
# 定义主函数
def main():
# 导入必要的模块
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import PlainTextResponse
import uvicorn
# 定义简单的处理函数
async def homepage(request):
return PlainTextResponse("数据库集成示例")
# 创建应用实例
app = Starlette(
routes=[Route("/", homepage)],
on_startup=[startup],
on_shutdown=[shutdown]
)
# 启动服务器
uvicorn.run(app, host="127.0.0.1", port=8000)
# 脚本入口
if __name__ == "__main__":
main()5.2. 后台任务 #
Starlette 支持后台任务,可以在响应返回后继续执行耗时的操作,如发送邮件、处理文件等。这提高了应用的响应性能。
# 导入后台任务类
from starlette.background import BackgroundTasks
# 导入异步 IO 模块
import asyncio
# 定义发送通知的异步函数
async def send_notification(email: str, message: str):
# 模拟发送邮件(实际项目中替换为真实的邮件发送逻辑)
print(f"正在发送邮件到 {email}: {message}")
# 模拟网络延迟
await asyncio.sleep(1)
print(f"邮件发送完成: {email}")
# 定义创建用户的处理函数
async def create_user(request):
# 解析请求数据
data = await request.json()
# 创建后台任务对象
background = BackgroundTasks()
# 添加后台任务
background.add_task(send_notification, data["email"], "欢迎加入!")
# 返回响应,包含后台任务
from starlette.responses import JSONResponse
return JSONResponse({"status": "用户创建成功"}, background=background)
# 定义主函数
def main():
# 导入必要的模块
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import PlainTextResponse, HTMLResponse
from starlette.staticfiles import StaticFiles
import uvicorn
# 定义首页处理函数
async def homepage(request):
return HTMLResponse(
"""
<!DOCTYPE html>
<html>
<head>
<title>后台任务示例</title>
</head>
<body>
<h1>后台任务示例</h1>
<p><a href="/users">用户管理页面</a></p>
</body>
</html>
"""
)
# 定义用户页面处理函数
async def users_page(request):
with open("static/users.html", "r", encoding="utf-8") as f:
html_content = f.read()
return HTMLResponse(html_content)
# 创建应用实例
app = Starlette(
routes=[
Route("/", homepage),
Route("/users", users_page),
Route("/api/users", create_user, methods=["POST"]),
]
)
# 启动服务器
uvicorn.run(app, host="127.0.0.1", port=8000)
# 脚本入口
if __name__ == "__main__":
main()
users.html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>用户管理</title>
<style>
body {
font-family: Arial, sans-serif;
max-width: 500px;
margin: 50px auto;
padding: 20px;
}
.form-group {
margin-bottom: 15px;
}
label {
display: block;
margin-bottom: 5px;
font-weight: bold;
}
input {
width: 100%;
padding: 8px;
border: 1px solid #ddd;
border-radius: 4px;
}
button {
background: #007bff;
color: white;
padding: 10px 20px;
border: none;
border-radius: 4px;
cursor: pointer;
}
button:hover {
background: #0056b3;
}
.result {
margin-top: 20px;
padding: 10px;
border-radius: 4px;
}
.success {
background: #d4edda;
color: #155724;
border: 1px solid #c3e6cb;
}
.error {
background: #f8d7da;
color: #721c24;
border: 1px solid #f5c6cb;
}
</style>
</head>
<body>
<h1>用户管理</h1>
<form id="userForm">
<div class="form-group">
<label for="email">邮箱地址:</label>
<input type="email" id="email" name="email" required>
</div>
<button type="submit">创建用户</button>
</form>
<div id="result"></div>
<script>
document.getElementById('userForm').addEventListener('submit', async function (e) {
e.preventDefault();
const email = document.getElementById('email').value;
const resultDiv = document.getElementById('result');
try {
const response = await fetch('/api/users', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ email: email })
});
const data = await response.json();
if (response.ok) {
resultDiv.innerHTML = `<div class="result success">${data.status}</div>`;
document.getElementById('email').value = '';
} else {
resultDiv.innerHTML = `<div class="result error">创建失败: ${data.detail || '未知错误'}</div>`;
}
} catch (error) {
resultDiv.innerHTML = `<div class="result error">请求失败: ${error.message}</div>`;
}
});
</script>
</body>
</html>6. 模板渲染 #
本节介绍如何在 Starlette 中使用模板引擎。Starlette 支持多种模板引擎,最常用的是 Jinja2。模板渲染使得创建动态 HTML 页面变得简单。
# 导入 Jinja2 模板引擎
from starlette.templating import Jinja2Templates
# 创建模板引擎实例,指定模板目录
templates = Jinja2Templates(directory="templates")
# 定义用户页面处理函数
async def user_page(request):
# 从路径参数中获取用户 ID
user_id = request.path_params["user_id"]
# 渲染模板并返回响应
return templates.TemplateResponse(
"user.html", {"request": request, "user_id": user_id}
)
# 定义主函数
def main():
# 导入必要的模块
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import PlainTextResponse
import uvicorn
# 定义首页处理函数
async def homepage(request):
return PlainTextResponse("模板渲染示例")
# 创建应用实例
app = Starlette(
routes=[Route("/", homepage), Route("/users/{user_id:int}", user_page)]
)
# 启动服务器
uvicorn.run(app, host="127.0.0.1", port=8000)
# 脚本入口
if __name__ == "__main__":
main()
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>用户页面</title>
<style>
body {
font-family: Arial, sans-serif;
text-align: center;
margin-top: 100px;
}
h1 {
color: #333;
}
p {
color: #666;
}
</style>
</head>
<body>
<h1>用户 ID: {{ user_id }}</h1>
<p>欢迎访问用户页面!</p>
</body>
</html>7. 配置和部署 #
本节介绍如何配置和部署 Starlette 应用,包括环境配置、使用 Uvicorn 服务器等。正确的配置和部署对于生产环境的应用至关重要。
7.1. 环境配置 #
Starlette 支持从环境变量和配置文件读取配置,这使得应用在不同环境中可以有不同的配置。
# 导入操作系统模块
import os
# 导入 Starlette 配置类
from starlette.config import Config
# 创建配置对象,从 .env 文件读取配置
config = Config(".env")
# 读取配置项,支持类型转换和默认值
DEBUG = config("DEBUG", cast=bool, default=False)
DATABASE_URL = config("DATABASE_URL")
SECRET_KEY = config("SECRET_KEY")
# 定义主函数
def main():
# 打印配置信息
print(f"DEBUG 模式: {DEBUG}")
print(f"数据库 URL: {DATABASE_URL}")
print(f"密钥: {SECRET_KEY}")
# 导入必要的模块
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import PlainTextResponse
import uvicorn
# 定义简单的处理函数
async def homepage(request):
return PlainTextResponse(f"配置示例 - DEBUG: {DEBUG}")
# 创建应用实例
app = Starlette(routes=[Route("/", homepage)])
# 启动服务器
uvicorn.run(app, host="127.0.0.1", port=8000)
# 脚本入口
if __name__ == "__main__":
main()7.2. 使用 Uvicorn 部署 #
Uvicorn 是 Starlette 的推荐 ASGI 服务器,它提供了高性能的异步服务器实现,支持热重载和多工作进程。
# 安装 Uvicorn
pip install uvicorn
# 运行开发服务器(支持热重载)
uvicorn main:app --host 0.0.0.0 --port 8000 --reload
# 生产环境运行(多工作进程)
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 48. 最佳实践 #
本节总结使用 Starlette 开发应用时的一些最佳实践。遵循这些实践可以帮助创建更可靠、更易维护的代码。
- 使用类型注解:充分利用 Request 和 Response 的类型提示
- 错误处理:合理处理异常,提供有意义的错误信息
- 中间件选择:只添加必要的中间件,避免性能开销
- 异步操作:确保所有IO操作都是异步的
- 资源管理:正确管理数据库连接和其他资源
9. 与其他框架的关系 #
本节介绍 Starlette 与其他 Python Web 框架的关系,帮助理解 Starlette 在整个 Python Web 生态系统中的位置。
- FastAPI:基于 Starlette 构建,添加了自动文档、数据验证等功能
- Django Channels:使用 ASGI,可以与 Starlette 组件集成
- 其他 ASGI 框架:都可以与 Starlette 中间件和组件互操作