# schedule
import schedule | |
import time | |
def job(): | |
# 这里是你要执行的任务代码 | |
print("执行任务") | |
# 设定每天的特定时间,例如每天 23:59:59 | |
schedule.every().day.at("23:59:59").do(job) | |
---------------------------------------------------------- | |
# 每小时执行一次 | |
# schedule.every().hour.at(":00").do(job) | |
---------------------------------------------------------- | |
# 每小时的 30 分钟,执行一次 | |
# schedule.every().hour.at(":30").do(job) | |
# 循环调度任务 | |
while True: | |
schedule.run_pending() | |
time.sleep(1) |
# BlockingScheduler(推荐)
from apscheduler.schedulers.blocking import BlockingScheduler | |
def job(): | |
print("定时任务执行了!") | |
shanghai = 'Asia/Shanghai' | |
# 创建调度器 | |
scheduler = BlockingScheduler() | |
scheduler.configure(timezone=shanghai) | |
# 添加定时任务,每 2 小时执行一次 | |
scheduler.add_job(job, 'interval', hours=2) | |
# 添加定时任务,每月 1 号凌晨 0 点执行 | |
scheduler.add_job(job, 'cron', month='1', day='1', hour='0', minute='0', second='0') | |
# 启动调度器 | |
scheduler.start() |
# 参数说明
weeks (int) – 隔多少周执行一次任务
days (int) – 隔多少天执行一次任务
hours (int) – 隔多少个小时执行一次任务
minutes (int) – 隔几分钟执行一次任务
seconds (int) – 隔几秒执行一次任务
start_date - 指定开始时间 (datetime|str) – starting point for the interval calculation
- 可以是 datetime 对象、date 对象、"% Y-% m-% d % H:% M:% S" 的字符串
end_date - 指定结束时间
- 可以是 datetime 对象、date 对象、"% Y-% m-% d % H:% M:% S" 的字符串
timezone - 时区,中国就用 'Asia/Shanghai' 就行
jitter 时间偏移量,单位秒
在预定运行时间的基础上,加上一个随机的秒数
例如 预定运行时间为 10:20:20 偏移量为 10 则真实运行时间期间为 [10:20:20-10:20:30]
# 形参说明
如果要执行的任务需要参数,可以通过 scheduler.add_job (args=(a,b)) 进行添加
def job(a, b): | |
print("定时任务执行了!", a, b) | |
.... | |
scheduler.add_job(job, 'interval', arghours=2) | |
.... |