python定时任务框架(APScheduler)
介绍
Advanced Python Scheduler(APScheduler)是一个Python库,可让您安排Python代码稍后执行,一次或定期执行。您可以根据需要随时添加或删除旧作业。如果将作业存储在数据库中,它们还将在调度程序重新启动并保持其状态的过程中幸免。重新启动调度程序后,它将运行脱机1时应运行的所有作业。
安装
pip install apscheduler
如果您未安装pip,可以通过下载并运行 get-pin.py 进行安装
# 下载安装脚本
curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
# 运行安装脚本 (默认python版本)
sudo python get-pip.py
# 指定python版本安装
sudo python3 get-pip.py
注意: 一般情况 pip 对应的是 Python 2.7,pip3 对应的是 Python 3.x。
基本概念
triggers- 触发器job stores- 作业存储器executors- 执行器schedulers- 调度器
1. 触发器
date:在您希望在某个特定时间仅运行一次作业时使用run_date(datetime | str)- 作业的运行日期或时间timezone(datetime.tzinfo | str)- 指定时区# 2019-01-01 运行一次 方法 job_function scheduler.add_job(job_function, 'date', run_date = date(2019, 1, 1)) # 2019-01-01 00:00:01 运行一次 方法 job_function scheduler.add_job(job_function, 'date', run_date = datetime(2019, 1, 1, 0, 0, 1))
interval:当您要以固定的时间间隔运行作业时使用weeks-int/ 间隔几周days–int/ 间隔几天hours–int/ 间隔几小时minutes-int/ 间隔几分钟seconds-int/ 间隔多少秒start_date-datetime | str/ 开始日期end_date-datetime | str/ 结束日期timezone-datetime.tzinfo | str/ 时区# 每两个小时调一下job_function scheduler.add_job(job_function, 'interval', hours = 2) # 每三天十二个小时调一下job_function scheduler.add_job(job_function, 'interval', days = 3, hours = 12)
cron:当您想在一天中的特定时间定期运行作业时使用year-int | str/ 年,4位数字month-int | str/ 月 (范围1-12)day-int | str/ 日 (范围1-31)week-int | str/ 周 (范围1-53)day_of_week-int | str/ 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)hour-int | str/ 时 (范围0-23)minute-int | str/ 分 (范围0-59)second-int | str/ 秒 (范围0-59)start_date-datetime | str/ 最早开始日期(包含)end_date-datetime | str/ 最晚结束时间(包含)timezone-datetime.tzinfo | str/ 指定时区# job_function将会在6,7,8,11,12月的第3个周五的1,2,3点运行 scheduler.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') # 截止到2019-12-30 00:00:00,每周一到周五早上五点半运行job_function scheduler.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2019-12-31')
calendarinterval:在您要在一天的特定时间以日历为基础的间隔运行作业时使用
2. 调度器
官方文档
https://apscheduler.readthedocs.io/en/latest/modules/schedulers/base.html所有调度程序的抽象基类。
from apscheduler.schedulers.blocking import BlockingScheduler scheduler = BlockingScheduler()- 启动已配置的执行程序和作业存储,并开始处理计划的作业。
scheduler.start(paused=False) # paused(bool)–如果True,在resume()被调用之前不要开始作业处理 - 关闭调度程序及其执行程序和作业存储。
scheduler.shutdown(wait = True) # wait(bool)– True等待所有当前正在执行的作业完成 - 暂停调度程序中的作业处理。
scheduler.pause() - 在调度程序中恢复作业处理。
scheduler.resume() - 将给定的作业添加到作业列表中,如果调度程序已经在运行,则将其唤醒。
scheduler.add_job(func, trigger=None, args=None, kwargs=None, id=None, name=None, misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined, next_run_time=undefined, jobstore='default', executor='default', replace_existing=False, **trigger_args) # func –在给定时间运行的可调用(或对一个文本的引用) # trigger(str[date|interval|cron] | apscheduler.triggers.base.BaseTrigger)–确定何时func调用的触发器 # args(list | tuple)–用来调用func的位置参数列表 # kwargs(dict)–使用以下参数调用func的关键字参数的字典 # id(str | unicode)–作业的显式标识符(稍后进行修改) # name(str | unicode)–工作的文字描述 # misfire_grace_time(int)–在指定的运行时之后仍允许运行作业的秒数 # coalesce(布尔)–如果调度程序确定该作业应连续运行多次,则运行一次而不是多次 # max_instances(int)–此作业允许的并发运行实例的最大数量 # next_run_time(datetime)–何时首次运行作业,而不考虑触发器(通过 None将作业添加为暂停状态) # jobstore(str | unicode)–作业存储的别名,用于将作业存储在 # executor(str | unicode)–运行作业的执行者的别名 # replace_existing(bool)– True用相同的作业替换现有作业id (但保留现有作业的运行次数) - 修改单个作业的属性。
scheduler.modify_job(job_id, jobstore=None, **changes) # job_id(str | unicode)–作业的标识符 # jobstore(str | unicode)–包含作业的作业存储库的别名 - 使给定作业在明确恢复之前不执行。
scheduler.pause_job((job_id, jobstore=None) # job_id(str | unicode)–作业的标识符 # jobstore(str | unicode)–包含作业的作业存储库的别名 - 恢复给定作业的计划,如果计划已完成,则将其删除。
scheduler.resume_job((job_id, jobstore=None) # job_id(str | unicode)–作业的标识符 # jobstore(str | unicode)–包含作业的作业存储库的别名 - 返回与给定匹配的Job job_id。
scheduler.get_job(job_id, jobstore=None) # job_id(str | unicode)–作业的标识符 # jobstore(str | unicode)–最有可能包含作业的作业存储库的别名 - 从特定的作业存储或所有作业中返回挂起作业(如果尚未启动调度程序)和计划作业的列表。
scheduler.get_jobs(jobstore=None, pending=None) # jobstore(str | unicode)–作业存储的别名 - 删除作业,使其无法再运行。
scheduler.remove_job(job_id, jobstore=None) # job_id(str | unicode)–作业的标识符 # jobstore(str | unicode)–包含作业的作业存储库的别名 - 从指定的作业存储中删除所有作业,或者如果没有给出任何作业,则删除所有作业存储。
scheduler.remove_all_jobs(jobstore=None) # jobstore(str | unicode)–作业存储的别名 - 将执行程序添加到此调度程序。
scheduler.add_executor(executor, alias ='default', ** executor_opts) # executor (str|unicode|apscheduler.executors.base.BaseExecutor) – 执行程序实例或执行程序插件的名称 # alias (str|unicode) – 调度程序的别名 - 从此调度程序中删除具有给定别名的执行程序。
scheduler.remove_executor(alias, shutdown=True) # alias (str|unicode) - 执行者的别名 # shutdown (bool) – True删除执行程序后将其关闭 - 添加调度程序事件的侦听器。
scheduler.add_listener(callback,mask = EVENT_ALL) # callback -带有一个参数的任何可调用对象 # mask(int)–位掩码,指示应侦听哪些事件 - 删除以前添加的事件侦听器。
scheduler.remove_listener()
3. 作业存储器
github代码示例
https://github.com/agronholm/apscheduler/tree/master/examples/jobstores
默认值 - Memory
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler
def tick():
print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_job(tick, 'interval', seconds=3)
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
SQLAlchemy
from datetime import datetime, timedelta
import sys
import os
from apscheduler.schedulers.blocking import BlockingScheduler
def alarm(time):
print('Alarm! This alarm was scheduled at %s.' % time)
if __name__ == '__main__':
scheduler = BlockingScheduler()
url = sys.argv[1] if len(sys.argv) > 1 else 'sqlite:///example.sqlite'
scheduler.add_jobstore('sqlalchemy', url=url)
alarm_time = datetime.now() + timedelta(seconds=10)
scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()])
print('To clear the alarms, delete the example.sqlite file.')
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
MongoDB
from datetime import datetime, timedelta
import sys
import os
from apscheduler.schedulers.blocking import BlockingScheduler
def alarm(time):
print('Alarm! This alarm was scheduled at %s.' % time)
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_jobstore('mongodb', collection='example_jobs')
if len(sys.argv) > 1 and sys.argv[1] == '--clear':
scheduler.remove_all_jobs()
alarm_time = datetime.now() + timedelta(seconds=10)
scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()])
print('To clear the alarms, run this example with the --clear argument.')
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
Redis
from datetime import datetime, timedelta
import sys
import os
from apscheduler.schedulers.blocking import BlockingScheduler
def alarm(time):
print('Alarm! This alarm was scheduled at %s.' % time)
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_jobstore('redis', jobs_key='example.jobs', run_times_key='example.run_times')
if len(sys.argv) > 1 and sys.argv[1] == '--clear':
scheduler.remove_all_jobs()
alarm_time = datetime.now() + timedelta(seconds=10)
scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()])
print('To clear the alarms, run this example with the --clear argument.')
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
3. 执行器
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler
def tick():
print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_executor('processpool')
scheduler.add_job(tick, 'interval', seconds=3)
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass

