您的当前位置:首页>全部文章>文章详情

python定时任务框架(APScheduler)

发表于:2019-10-15 11:28:45浏览:56次TAG: #python

介绍

Advanced Python Scheduler(APScheduler)是一个Python库,可让您安排Python代码稍后执行,一次或定期执行。您可以根据需要随时添加或删除旧作业。如果将作业存储在数据库中,它们还将在调度程序重新启动并保持其状态的过程中幸免。重新启动调度程序后,它将运行脱机1时应运行的所有作业。

安装

pip install apscheduler

如果您未安装pip,可以通过下载并运行 get-pin.py 进行安装

# 下载安装脚本
curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py

# 运行安装脚本 (默认python版本)
sudo python get-pip.py

# 指定python版本安装
sudo python3 get-pip.py

注意: 一般情况 pip 对应的是 Python 2.7,pip3 对应的是 Python 3.x。

基本概念

  1. 模块索引

  2. triggers - 触发器

  3. job stores - 作业存储器

  4. executors - 执行器

  5. schedulers - 调度器

1. 触发器

  • date:在您希望在某个特定时间仅运行一次作业时使用

    • run_date(datetime | str) - 作业的运行日期或时间

    • timezone(datetime.tzinfo | str) - 指定时区

        # 2019-01-01 运行一次 方法 job_function
        scheduler.add_job(job_function, 'date', run_date = date(2019, 1, 1))
        # 2019-01-01 00:00:01 运行一次 方法 job_function
        scheduler.add_job(job_function, 'date', run_date = datetime(2019, 1, 1, 0, 0, 1))
      
  • interval:当您要以固定的时间间隔运行作业时使用

    • weeks - int / 间隔几周

    • daysint / 间隔几天

    • hoursint / 间隔几小时

    • minutes - int / 间隔几分钟

    • seconds - int / 间隔多少秒

    • start_date - datetime | str / 开始日期

    • end_date - datetime | str / 结束日期

    • timezone - datetime.tzinfo | str / 时区

      # 每两个小时调一下job_function
      scheduler.add_job(job_function, 'interval', hours = 2)
      
      # 每三天十二个小时调一下job_function
      scheduler.add_job(job_function, 'interval', days = 3, hours = 12)
      
  • cron:当您想在一天中的特定时间定期运行作业时使用

    • year - int | str / 年,4位数字

    • month - int | str / 月 (范围1-12)

    • day - int | str / 日 (范围1-31)

    • week - int | str / 周 (范围1-53)

    • day_of_week - int | str / 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)

    • hour - int | str / 时 (范围0-23)

    • minute - int | str / 分 (范围0-59)

    • second - int | str / 秒 (范围0-59)

    • start_date - datetime | str / 最早开始日期(包含)

    • end_date - datetime | str / 最晚结束时间(包含)

    • timezone - datetime.tzinfo | str / 指定时区

      # job_function将会在6,7,8,11,12月的第3个周五的1,2,3点运行
      scheduler.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
      
      # 截止到2019-12-30 00:00:00,每周一到周五早上五点半运行job_function
      scheduler.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2019-12-31')
      
  • calendarinterval:在您要在一天的特定时间以日历为基础的间隔运行作业时使用

2. 调度器

  1. 官方文档
    https://apscheduler.readthedocs.io/en/latest/modules/schedulers/base.html

  2. 所有调度程序的抽象基类。

    from apscheduler.schedulers.blocking import BlockingScheduler
    scheduler = BlockingScheduler()
    
  3. 启动已配置的执行程序和作业存储,并开始处理计划的作业。
    scheduler.start(paused=False) 
    # paused(bool)–如果True,在resume()被调用之前不要开始作业处理
    
  4. 关闭调度程序及其执行程序和作业存储。
    scheduler.shutdown(wait = True)
    # wait(bool)– True等待所有当前正在执行的作业完成
    
  5. 暂停调度程序中的作业处理。
    scheduler.pause()
    
  6. 在调度程序中恢复作业处理。
    scheduler.resume()
    
  7. 将给定的作业添加到作业列表中,如果调度程序已经在运行,则将其唤醒。
    scheduler.add_job(func, trigger=None, args=None, kwargs=None, id=None, name=None, misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined, next_run_time=undefined, jobstore='default', executor='default', replace_existing=False, **trigger_args)
    # func –在给定时间运行的可调用(或对一个文本的引用)
    # trigger(str[date|interval|cron] | apscheduler.triggers.base.BaseTrigger)–确定何时func调用的触发器
    # args(list | tuple)–用来调用func的位置参数列表
    # kwargs(dict)–使用以下参数调用func的关键字参数的字典
    # id(str | unicode)–作业的显式标识符(稍后进行修改)
    # name(str | unicode)–工作的文字描述
    # misfire_grace_time(int)–在指定的运行时之后仍允许运行作业的秒数
    # coalesce(布尔)–如果调度程序确定该作业应连续运行多次,则运行一次而不是多次
    # max_instances(int)–此作业允许的并发运行实例的最大数量
    # next_run_time(datetime)–何时首次运行作业,而不考虑触发器(通过 None将作业添加为暂停状态)
    # jobstore(str | unicode)–作业存储的别名,用于将作业存储在
    # executor(str | unicode)–运行作业的执行者的别名
    # replace_existing(bool)– True用相同的作业替换现有作业id (但保留现有作业的运行次数)
    
  8. 修改单个作业的属性。
    scheduler.modify_job(job_id, jobstore=None, **changes)
    # job_id(str | unicode)–作业的标识符
    # jobstore(str | unicode)–包含作业的作业存储库的别名
    
  9. 使给定作业在明确恢复之前不执行。
    scheduler.pause_job((job_id, jobstore=None)
    # job_id(str | unicode)–作业的标识符
    # jobstore(str | unicode)–包含作业的作业存储库的别名
    
  10. 恢复给定作业的计划,如果计划已完成,则将其删除。
    scheduler.resume_job((job_id, jobstore=None)
    # job_id(str | unicode)–作业的标识符
    # jobstore(str | unicode)–包含作业的作业存储库的别名
    
  11. 返回与给定匹配的Job job_id。
    scheduler.get_job(job_id, jobstore=None)
    # job_id(str | unicode)–作业的标识符
    # jobstore(str | unicode)–最有可能包含作业的作业存储库的别名
    
  12. 从特定的作业存储或所有作业中返回挂起作业(如果尚未启动调度程序)和计划作业的列表。
    scheduler.get_jobs(jobstore=None, pending=None)
    # jobstore(str | unicode)–作业存储的别名
    
  13. 删除作业,使其无法再运行。
    scheduler.remove_job(job_id, jobstore=None)
    # job_id(str | unicode)–作业的标识符
    # jobstore(str | unicode)–包含作业的作业存储库的别名
    
  14. 从指定的作业存储中删除所有作业,或者如果没有给出任何作业,则删除所有作业存储。
    scheduler.remove_all_jobs(jobstore=None)
    # jobstore(str | unicode)–作业存储的别名
    
  15. 将执行程序添加到此调度程序。
    scheduler.add_executor(executor, alias ='default', ** executor_opts)
    # executor (str|unicode|apscheduler.executors.base.BaseExecutor) – 执行程序实例或执行程序插件的名称
    # alias (str|unicode) – 调度程序的别名
    
  16. 从此调度程序中删除具有给定别名的执行程序。
    scheduler.remove_executor(alias, shutdown=True)
    # alias (str|unicode) - 执行者的别名
    # shutdown (bool) – True删除执行程序后将其关闭
    
  17. 添加调度程序事件的侦听器。
    scheduler.add_listener(callback,mask = EVENT_ALL)
    # callback -带有一个参数的任何可调用对象
    # mask(int)–位掩码,指示应侦听哪些事件
    
  18. 删除以前添加的事件侦听器。
    scheduler.remove_listener()
    

3. 作业存储器

github代码示例
https://github.com/agronholm/apscheduler/tree/master/examples/jobstores

默认值 - Memory

from datetime import datetime
import os

from apscheduler.schedulers.blocking import BlockingScheduler

def tick():
    print('Tick! The time is: %s' % datetime.now())

if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.add_job(tick, 'interval', seconds=3)
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass

SQLAlchemy

from datetime import datetime, timedelta
import sys
import os

from apscheduler.schedulers.blocking import BlockingScheduler

def alarm(time):
    print('Alarm! This alarm was scheduled at %s.' % time)

if __name__ == '__main__':
    scheduler = BlockingScheduler()
    url = sys.argv[1] if len(sys.argv) > 1 else 'sqlite:///example.sqlite'
    scheduler.add_jobstore('sqlalchemy', url=url)
    alarm_time = datetime.now() + timedelta(seconds=10)
    scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()])
    print('To clear the alarms, delete the example.sqlite file.')
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass

MongoDB

from datetime import datetime, timedelta
import sys
import os

from apscheduler.schedulers.blocking import BlockingScheduler

def alarm(time):
    print('Alarm! This alarm was scheduled at %s.' % time)

if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.add_jobstore('mongodb', collection='example_jobs')
    if len(sys.argv) > 1 and sys.argv[1] == '--clear':
        scheduler.remove_all_jobs()

    alarm_time = datetime.now() + timedelta(seconds=10)
    scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()])
    print('To clear the alarms, run this example with the --clear argument.')
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass

Redis

from datetime import datetime, timedelta
import sys
import os

from apscheduler.schedulers.blocking import BlockingScheduler

def alarm(time):
    print('Alarm! This alarm was scheduled at %s.' % time)

if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.add_jobstore('redis', jobs_key='example.jobs', run_times_key='example.run_times')
    if len(sys.argv) > 1 and sys.argv[1] == '--clear':
        scheduler.remove_all_jobs()

    alarm_time = datetime.now() + timedelta(seconds=10)
    scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()])
    print('To clear the alarms, run this example with the --clear argument.')
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass

3. 执行器

from datetime import datetime
import os

from apscheduler.schedulers.blocking import BlockingScheduler


def tick():
    print('Tick! The time is: %s' % datetime.now())


if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.add_executor('processpool')
    scheduler.add_job(tick, 'interval', seconds=3)
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass
栏目分类全部>
腾讯云采购季云服务器一折促销