1. schedule #
内容讲解: schedule 是一个轻量级的Python任务调度库,专门用于定期执行任务。它提供了简单直观的API来安排重复性任务,特别适合简单的定时任务需求。相比其他复杂的调度系统,schedule库的优势在于学习成本低、使用简单,是快速实现定时任务的理想选择。
schedule 是一个轻量级的Python任务调度库,用于定期执行任务。它提供了简单直观的API来安排重复性任务。
2. 安装 #
内容讲解: 安装schedule库非常简单,只需要使用pip包管理器即可。这个库是纯Python实现,没有复杂的依赖关系,安装后即可立即使用。
pip install schedule3. 基本用法 #
3.1 基本调度模式 #
内容讲解: 基本调度模式展示了schedule库最常用的功能,包括按时间间隔执行、按具体时间执行、按星期执行等。这些是日常开发中最常用的调度方式,掌握这些基本用法就能满足大部分定时任务需求。
# 导入必要的模块
import schedule
import time
def job():
"""定义要执行的任务函数"""
print("任务执行中...")
# 每10分钟执行一次任务
schedule.every(10).minutes.do(job)
# 每小时执行一次任务
schedule.every().hour.do(job)
# 每天10:30执行任务
schedule.every().day.at("10:30").do(job)
# 每周一执行任务
schedule.every().monday.do(job)
# 每周三13:15执行任务
schedule.every().wednesday.at("13:15").do(job)
# 主循环:持续运行调度器
while True:
# 运行所有到期的任务
schedule.run_pending()
# 等待1秒后再次检查
time.sleep(1)3.2 时间单位 #
内容讲解: schedule库支持多种时间单位,从秒到周都有覆盖。了解这些时间单位可以帮助你精确地安排任务执行时间。每种时间单位都有其适用场景,选择合适的单位可以提高调度的精确性和效率。
# 导入必要的模块
import schedule
import time
def job():
"""定义要执行的任务函数"""
print("执行任务")
# 各种时间单位的调度示例
schedule.every(10).seconds.do(job) # 每10秒执行一次
schedule.every(5).minutes.do(job) # 每5分钟执行一次
schedule.every().minute.do(job) # 每分钟执行一次
schedule.every(2).hours.do(job) # 每2小时执行一次
schedule.every().hour.do(job) # 每小时执行一次
schedule.every().day.do(job) # 每天执行一次
schedule.every().day.at("10:30").do(job) # 每天10:30执行
schedule.every().monday.do(job) # 每周一执行
schedule.every().wednesday.at("13:15").do(job) # 每周三13:15执行
schedule.every().week.do(job) # 每周执行一次
schedule.every(2).weeks.do(job) # 每2周执行一次
# 主循环:持续运行调度器
while True:
# 运行所有到期的任务
schedule.run_pending()
# 等待1秒后再次检查
time.sleep(1)4. 高级功能 #
4.1 带参数的任务 #
内容讲解: 带参数的任务功能允许你向任务函数传递参数,这使得任务更加灵活和可复用。你可以创建通用的任务函数,然后通过参数来定制不同的行为,这是实际开发中非常实用的功能。
# 导入必要的模块
import schedule
import time
def greet(name):
"""带参数的问候任务"""
print(f"你好, {name}! 现在时间是: {time.strftime('%H:%M:%S')}")
def send_email(to, subject, body):
"""带参数的邮件发送任务"""
print(f"发送邮件给 {to}: {subject} - {body}")
# 安排带参数的任务
schedule.every(2).seconds.do(greet, name="张三")
schedule.every().minute.do(send_email, "user@example.com", "测试邮件", "这是邮件内容")
# 主循环:持续运行调度器
while True:
# 运行所有到期的任务
schedule.run_pending()
# 等待1秒后再次检查
time.sleep(1)4.2 使用装饰器 #
内容讲解: 装饰器语法提供了一种更优雅的方式来注册定时任务。使用装饰器可以让代码更加清晰,任务的定义和调度安排在同一个地方,提高了代码的可读性和维护性。这是Python风格的编程方式。
# 导入必要的模块
import schedule
import time
# 使用装饰器注册任务,每10秒执行一次
@schedule.repeat(schedule.every(10).seconds)
def job1():
"""任务1:每10秒执行"""
print("任务1执行")
# 使用装饰器注册任务,每分钟执行一次
@schedule.repeat(schedule.every().minute)
def job2():
"""任务2:每分钟执行"""
print("任务2执行")
# 使用装饰器注册任务,每天12:00执行
@schedule.repeat(schedule.every().day.at("12:00"))
def daily_report():
"""每日报告:每天12:00执行"""
print("生成每日报告")
# 主循环:持续运行调度器
while True:
# 运行所有到期的任务
schedule.run_pending()
# 等待1秒后再次检查
time.sleep(1)4.3 任务标签和管理 #
内容讲解: 任务标签和管理功能允许你为任务添加标签,然后按标签进行分组管理。这对于管理大量任务非常有用,你可以按功能、优先级或其他标准对任务进行分类,然后批量操作特定类型的任务。
# 导入必要的模块
import schedule
import time
def job_a():
"""任务A:监控任务"""
print("任务A")
def job_b():
"""任务B:报告任务"""
print("任务B")
def job_c():
"""任务C:清理任务"""
print("任务C")
# 添加带标签的任务
schedule.every(5).seconds.do(job_a).tag('monitoring')
schedule.every(10).seconds.do(job_b).tag('reporting')
schedule.every(15).seconds.do(job_c).tag('monitoring', 'cleanup')
# 获取所有任务并打印
print("所有任务:")
for job in schedule.get_jobs():
print(f" - {job}")
# 按标签获取任务
print("\n监控任务:")
for job in schedule.get_jobs('monitoring'):
print(f" - {job}")
# 清除特定标签的任务
schedule.clear('cleanup')
print(f"\n清理后剩余任务数: {len(schedule.get_jobs())}")
# 清除所有任务
schedule.clear()
print(f"清除所有任务后: {len(schedule.get_jobs())}")
# 主循环:持续运行调度器
while True:
# 运行所有到期的任务
schedule.run_pending()
# 等待1秒后再次检查
time.sleep(1)4.4 任务取消和重新安排 #
内容讲解: 任务取消和重新安排功能提供了动态管理任务的能力。你可以在运行时取消正在执行的任务,或者重新安排任务的执行时间。这对于需要根据条件动态调整任务执行计划的场景非常有用。
# 导入必要的模块
import schedule
import time
def job():
"""定义要执行的任务函数"""
print("执行任务")
# 安排任务并保存引用
scheduled_job = schedule.every(5).seconds.do(job)
# 运行几次后取消任务
counter = 0
while True:
# 运行所有到期的任务
schedule.run_pending()
# 等待1秒后再次检查
time.sleep(1)
counter += 1
# 运行10次后取消任务
if counter == 10:
print("取消任务")
schedule.cancel_job(scheduled_job)
break
# 重新安排任务
print("重新安排任务")
scheduled_job = schedule.every(2).seconds.do(job)
# 继续运行重新安排的任务
while True:
# 运行所有到期的任务
schedule.run_pending()
# 等待1秒后再次检查
time.sleep(1)5. 错误处理 #
5.1 任务异常处理 #
内容讲解: 任务异常处理是生产环境中非常重要的功能。在实际应用中,任务可能会因为各种原因失败,如网络问题、资源不足、外部服务不可用等。良好的异常处理可以确保单个任务的失败不会影响整个调度系统的运行,同时提供详细的错误日志用于问题诊断。
# 导入必要的模块
import schedule
import time
import logging
# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def risky_job():
"""模拟可能失败的任务"""
try:
# 模拟可能失败的任务(基于时间随机失败)
if time.time() % 2 > 1:
raise ValueError("随机错误")
print("任务成功执行")
except Exception as e:
# 记录错误但不中断调度器
logging.error(f"任务执行失败: {e}")
# 这里可以选择重试、发送通知或记录错误
def safe_job():
"""安全的任务,不会失败"""
print("安全任务执行")
# 安排任务
schedule.every(3).seconds.do(risky_job)
schedule.every(5).seconds.do(safe_job)
# 主循环:持续运行调度器
while True:
try:
# 运行所有到期的任务
schedule.run_pending()
except Exception as e:
# 捕获调度器本身的错误
logging.error(f"调度器错误: {e}")
# 等待1秒后再次检查
time.sleep(1)5.2 使用重试机制 #
内容讲解: 重试机制是处理临时性故障的有效方法。当任务因为网络抖动、服务暂时不可用等原因失败时,自动重试可以提高任务的成功率。retrying库提供了灵活的重试策略,包括重试次数、重试间隔、重试条件等配置选项。
# 导入必要的模块
import schedule
import time
from retrying import retry
# 注意:需要先安装retrying库
# pip install retrying
@retry(stop_max_attempt_number=3, wait_fixed=2000)
def unreliable_job():
"""使用重试机制的不可靠任务"""
print("尝试执行不可靠任务...")
# 模拟可能失败的任务(基于时间随机失败)
if time.time() % 2 > 1.5:
raise ConnectionError("连接失败")
print("任务成功!")
# 安排任务
schedule.every(10).seconds.do(unreliable_job)
# 主循环:持续运行调度器
while True:
# 运行所有到期的任务
schedule.run_pending()
# 等待1秒后再次检查
time.sleep(1)6. 实际应用示例 #
6.1 示例1:系统监控任务 #
内容讲解: 系统监控是schedule库的典型应用场景。这个示例展示了如何创建一个完整的系统监控调度器,包括CPU、内存、磁盘使用率监控,服务状态检查,以及定期维护任务。这种监控系统可以及时发现系统问题,确保服务的稳定运行。
# 导入必要的模块
import schedule
import time
import psutil # 需要安装: pip install psutil
import logging
# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def monitor_system():
"""监控系统资源使用情况"""
try:
# 获取CPU使用率
cpu_percent = psutil.cpu_percent()
# 获取内存使用情况
memory = psutil.virtual_memory()
# 获取磁盘使用情况
disk = psutil.disk_usage('/')
# 记录系统资源信息
logging.info(f"CPU使用率: {cpu_percent}%")
logging.info(f"内存使用: {memory.percent}%")
logging.info(f"磁盘使用: {disk.percent}%")
# 如果资源使用过高,发送警告
if cpu_percent > 80:
logging.warning("CPU使用率过高!")
if memory.percent > 85:
logging.warning("内存使用率过高!")
except Exception as e:
logging.error(f"系统监控失败: {e}")
def check_services():
"""检查重要服务状态"""
services = ['nginx', 'mysql', 'redis']
for service in services:
try:
# 这里简化处理,实际中可以使用subprocess或psutil检查服务状态
logging.info(f"检查服务: {service} - 运行中")
except Exception as e:
logging.error(f"服务 {service} 异常: {e}")
def cleanup_temp_files():
"""清理临时文件"""
try:
logging.info("执行临时文件清理")
# 这里可以添加实际的清理逻辑
# 例如:删除超过7天的临时文件
except Exception as e:
logging.error(f"清理临时文件失败: {e}")
# 安排监控任务
schedule.every(5).minutes.do(monitor_system).tag('monitoring')
schedule.every().hour.do(check_services).tag('monitoring')
schedule.every().day.at("02:00").do(cleanup_temp_files).tag('maintenance')
# 记录启动信息
logging.info("系统监控调度器启动...")
# 主循环:持续运行调度器
while True:
try:
# 运行所有到期的任务
schedule.run_pending()
# 每分钟检查一次
time.sleep(60)
except KeyboardInterrupt:
# 处理Ctrl+C中断
logging.info("监控调度器停止")
break
except Exception as e:
# 捕获其他异常
logging.error(f"调度器错误: {e}")
time.sleep(60)6.2 示例2:数据备份任务 #
内容讲解: 数据备份是另一个重要的应用场景。这个示例展示了如何创建一个自动化的数据备份系统,包括数据库备份、文件清理、云存储同步等功能。通过合理的调度安排,可以确保数据的安全性和一致性,同时避免备份任务影响正常业务运行。
# 导入必要的模块
import schedule
import time
import subprocess
import datetime
import os
import logging
# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def backup_database():
"""备份数据库"""
try:
# 生成带时间戳的备份文件名
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = f"backup_{timestamp}.sql"
# 使用mysqldump备份(示例命令)
# 注意:实际使用时需要替换为真实的数据库连接信息
cmd = f"mysqldump -u username -p password database > {backup_file}"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.returncode == 0:
logging.info(f"数据库备份成功: {backup_file}")
# 清理7天前的备份
cleanup_old_backups(7)
else:
logging.error(f"数据库备份失败: {result.stderr}")
except Exception as e:
logging.error(f"备份过程中出错: {e}")
def cleanup_old_backups(days=7):
"""清理旧备份文件"""
try:
# 计算截止时间
cutoff_time = time.time() - (days * 24 * 60 * 60)
# 遍历当前目录
for filename in os.listdir('.'):
# 检查是否为备份文件
if filename.startswith('backup_') and filename.endswith('.sql'):
file_time = os.path.getctime(filename)
# 如果文件时间早于截止时间,则删除
if file_time < cutoff_time:
os.remove(filename)
logging.info(f"删除旧备份: {filename}")
except Exception as e:
logging.error(f"清理旧备份失败: {e}")
def sync_to_cloud():
"""同步到云存储"""
try:
# 这里可以集成各种云存储SDK
logging.info("开始同步到云存储...")
# 模拟同步过程
time.sleep(2)
logging.info("云存储同步完成")
except Exception as e:
logging.error(f"云同步失败: {e}")
# 安排备份任务
schedule.every().day.at("01:00").do(backup_database).tag('backup')
schedule.every().day.at("02:00").do(sync_to_cloud).tag('backup', 'cloud')
# 记录启动信息
logging.info("数据备份调度器启动...")
# 主循环:持续运行调度器
while True:
try:
# 运行所有到期的任务
schedule.run_pending()
# 每分钟检查一次
time.sleep(60)
except KeyboardInterrupt:
# 处理Ctrl+C中断
logging.info("备份调度器停止")
break
except Exception as e:
# 捕获其他异常
logging.error(f"调度器错误: {e}")
time.sleep(60)6.3 示例3:Web API定时调用 #
内容讲解: Web API定时调用是schedule库在微服务架构中的重要应用。这个示例展示了如何创建一个API监控系统,定期检查API的健康状态、收集统计数据、生成报告等。通过合理的调度安排,可以实现对多个API端点的持续监控,及时发现服务异常并收集业务数据。
# 导入必要的模块
import schedule
import time
import requests
import json
import logging
# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class APIMonitor:
"""API监控类"""
def __init__(self):
"""初始化API监控器"""
# 定义要监控的API端点
self.endpoints = {
'health': 'https://api.example.com/health',
'users': 'https://api.example.com/users/stats',
'orders': 'https://api.example.com/orders/stats'
}
def check_api_health(self):
"""检查API健康状态"""
url = self.endpoints['health']
try:
# 发送GET请求检查API健康状态
response = requests.get(url, timeout=10)
if response.status_code == 200:
logging.info("API健康检查: 正常")
return True
else:
logging.warning(f"API健康检查: 异常状态码 {response.status_code}")
return False
except Exception as e:
logging.error(f"API健康检查失败: {e}")
return False
def collect_user_stats(self):
"""收集用户统计信息"""
url = self.endpoints['users']
try:
# 发送GET请求获取用户统计数据
response = requests.get(url, timeout=15)
# 解析JSON响应
data = response.json()
logging.info(f"用户统计: {json.dumps(data, indent=2)}")
return data
except Exception as e:
logging.error(f"收集用户统计失败: {e}")
return None
def collect_order_stats(self):
"""收集订单统计信息"""
url = self.endpoints['orders']
try:
# 发送GET请求获取订单统计数据
response = requests.get(url, timeout=15)
# 解析JSON响应
data = response.json()
logging.info(f"订单统计: {json.dumps(data, indent=2)}")
return data
except Exception as e:
logging.error(f"收集订单统计失败: {e}")
return None
# 创建监控实例
monitor = APIMonitor()
# 安排API监控任务
schedule.every(5).minutes.do(monitor.check_api_health).tag('api', 'health')
schedule.every().hour.do(monitor.collect_user_stats).tag('api', 'stats')
schedule.every().hour.do(monitor.collect_order_stats).tag('api', 'stats')
# 每天生成报告
schedule.every().day.at("23:59").do(
lambda: logging.info("生成每日API监控报告")
).tag('api', 'report')
# 记录启动信息
logging.info("API监控调度器启动...")
# 主循环:持续运行调度器
while True:
try:
# 运行所有到期的任务
schedule.run_pending()
# 每分钟检查一次
time.sleep(60)
except KeyboardInterrupt:
# 处理Ctrl+C中断
logging.info("API监控调度器停止")
break
except Exception as e:
# 捕获其他异常
logging.error(f"调度器错误: {e}")
time.sleep(60)7. 最佳实践 #
7.1 使用类组织任务 #
内容讲解: 使用类来组织任务是一种良好的编程实践,特别是当你有多个相关的任务时。类提供了更好的代码组织结构,可以将相关的任务方法组织在一起,同时提供共享的状态和配置。这种方式使代码更易维护和扩展。
# 导入必要的模块
import schedule
import time
class TaskScheduler:
"""任务调度器类"""
def __init__(self):
"""初始化调度器"""
# 设置任务调度
self.setup_schedules()
def setup_schedules(self):
"""设置任务调度"""
# 每30分钟执行任务1
schedule.every(30).minutes.do(self.task1)
# 每小时执行任务2
schedule.every().hour.do(self.task2)
# 每天9:00执行每日任务
schedule.every().day.at("09:00").do(self.daily_task)
def task1(self):
"""任务1:定期执行的任务"""
print("执行任务1")
def task2(self):
"""任务2:每小时执行的任务"""
print("执行任务2")
def daily_task(self):
"""每日任务:每天执行一次"""
print("执行每日任务")
def run(self):
"""运行调度器"""
print("调度器启动...")
while True:
# 运行所有到期的任务
schedule.run_pending()
# 等待1秒后再次检查
time.sleep(1)
# 使用示例
if __name__ == "__main__":
# 创建调度器实例
scheduler = TaskScheduler()
# 运行调度器
scheduler.run()7.2 配置化调度 #
内容讲解: 配置化调度允许你将任务配置从代码中分离出来,使用配置文件来定义任务的执行计划。这种方式提供了更大的灵活性,可以在不修改代码的情况下调整任务调度,特别适合需要频繁调整执行时间的生产环境。
# 导入必要的模块
import schedule
import time
import yaml
import logging
# 配置日志记录
logging.basicConfig(level=logging.INFO)
def setup_from_config():
"""根据配置文件设置调度"""
try:
# 加载配置文件
with open('schedule_config.yaml', 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
# 遍历配置中的任务
for task_config in config['schedules']:
# 从全局命名空间获取任务函数
task_func = globals().get(task_config['function'])
if task_func:
# 获取时间间隔配置
interval = task_config['interval']
# 根据时间单位设置调度
if interval['unit'] == 'seconds':
schedule.every(interval['value']).seconds.do(task_func)
elif interval['unit'] == 'minutes':
schedule.every(interval['value']).minutes.do(task_func)
elif interval['unit'] == 'hours':
schedule.every(interval['value']).hours.do(task_func)
elif interval['unit'] == 'days':
schedule.every(interval['value']).days.do(task_func)
elif interval['unit'] == 'at':
schedule.every().day.at(interval['value']).do(task_func)
# 记录任务安排信息
logging.info(f"安排任务: {task_config['function']} - 每{interval['value']}{interval['unit']}")
else:
logging.warning(f"未找到任务函数: {task_config['function']}")
except FileNotFoundError:
logging.error("配置文件 schedule_config.yaml 不存在")
except Exception as e:
logging.error(f"加载配置文件失败: {e}")
# 示例任务函数
def monitor_task():
"""监控任务"""
print("执行监控任务")
def report_task():
"""报告任务"""
print("执行报告任务")
def cleanup_task():
"""清理任务"""
print("执行清理任务")
# 示例配置文件 schedule_config.yaml:
# schedules:
# - function: "monitor_task"
# interval:
# unit: "minutes"
# value: 5
# - function: "report_task"
# interval:
# unit: "hours"
# value: 1
# - function: "cleanup_task"
# interval:
# unit: "at"
# value: "02:00"
# 使用示例
if __name__ == "__main__":
# 从配置文件设置调度
setup_from_config()
# 运行调度器
while True:
schedule.run_pending()
time.sleep(1)7.3 使用多线程(高级) #
内容讲解: 多线程调度器适用于需要同时执行多个长时间运行任务的场景。通过将调度器运行在单独的线程中,主线程可以继续处理其他业务逻辑,而不会阻塞任务的执行。这种方式特别适合需要用户交互或处理其他并发任务的应用程序。
# 导入必要的模块
import schedule
import time
import threading
import logging
# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def run_scheduler():
"""运行调度器的线程函数"""
logging.info("调度器线程启动")
while True:
try:
# 运行所有到期的任务
schedule.run_pending()
# 等待1秒后再次检查
time.sleep(1)
except Exception as e:
logging.error(f"调度器线程错误: {e}")
time.sleep(5) # 出错后等待5秒再继续
def long_running_task():
"""长时间运行的任务"""
logging.info("开始长时间任务...")
# 模拟长时间运行(实际中可能是文件处理、网络请求等)
time.sleep(10)
logging.info("长时间任务完成")
# 安排任务(注意长时间任务可能会阻塞调度)
schedule.every(5).minutes.do(long_running_task)
# 在单独的线程中运行调度器
scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
scheduler_thread.start()
logging.info("调度器在后台线程中运行...")
# 主线程可以做其他事情
try:
while True:
# 主线程的业务逻辑
time.sleep(1)
except KeyboardInterrupt:
logging.info("程序退出")8. 注意事项 #
内容讲解: 这部分内容总结了使用schedule库时需要注意的重要事项。了解这些限制和注意事项可以帮助你更好地设计任务调度系统,避免常见的问题,并在需要时选择更合适的替代方案。
- 精度问题:
schedule不是实时系统,精度约为秒级 - 阻塞问题:长时间运行的任务会阻塞其他任务的执行
- 时区处理:默认使用系统时区,需要时区支持时考虑使用
pytz - 持久化:
schedule不提供任务持久化,重启后需要重新安排 - 分布式:不适合分布式环境,考虑使用 Celery 或 APScheduler
9. 替代方案 #
内容讲解: 当schedule库无法满足你的需求时,了解其他替代方案很重要。不同的调度系统有不同的特点和适用场景,选择合适的技术栈可以大大提高系统的可靠性和性能。
对于更复杂的需求,可以考虑:
- APScheduler: 功能更丰富的调度库,支持多种后端存储
- Celery: 分布式任务队列,适合大规模分布式系统
- Airflow: 工作流调度平台,适合复杂的数据管道
- Prefect: 现代工作流 orchestration 平台,提供更好的监控和调试功能
总结 #
内容讲解: schedule 模块是Python中处理简单定时任务的理想选择。它的简单性和易用性使得快速开发定时任务变得非常方便,特别适合原型开发和小型项目。通过本教程的学习,你应该能够熟练使用schedule库来创建各种定时任务系统。
schedule 模块非常适合简单的定时任务需求,它的简单性和易用性使得快速开发定时任务变得非常方便。
主要特点:
- 简单易用的API设计
- 支持多种时间单位和调度模式
- 提供任务标签和管理功能
- 支持带参数的任务和装饰器语法
- 轻量级,无复杂依赖
适用场景:
- 简单的定时任务需求
- 原型开发和快速验证
- 单机环境下的任务调度
- 学习和教学用途
- 小型项目的自动化需求
使用建议:
- 对于简单的定时任务,schedule是很好的选择
- 当需要持久化、分布式或高精度时,考虑使用替代方案
- 在生产环境中使用前,充分测试任务的稳定性和错误处理
- 合理设计任务执行时间,避免长时间运行的任务阻塞调度器
通过本教程的学习,你应该能够熟练使用Python的schedule模块来处理各种定时任务需求,并能够根据实际需求选择合适的调度解决方案。