APScheduler 用户指南

2018-12-12
9分钟阅读时长

本文为笔者学习 Python 常用调度器所写

全文译自 APScheduler 官方文档 原文地址:https://apscheduler.readthedocs.io/en/latest/userguide.html

安装

首选安装方式为通过 pip 安装

pip install apscheduler

你可以下载并运行 get-pip.py 来轻易的安装 pip

某些原因下,pip 不能工作,你可以从 PypI 手动下载 APScheduler 包文件,然后完成安装

python setup.py install

示例代码

包文件中包含了名为 example 的文件夹,可以在该文件夹下找到 APScheduler 的多种使用样例。样例同样也可以 在线浏览

基本概念

APSchedule 有四种基本的组件

  • 触发器(triggers)
  • 作业存储器(job stores)
  • 执行器(executors)
  • 调度器(schedulers)

触发器 包含调度逻辑。每一个任务有自己的调度器来确定何时运行作业。除了初始配置之外,触发器是完全无状态的(stateless)

作业存储器 存储被调度的作业。默认的任务存储器只将作业保存在内存中,其他的任务存储器可以将作业保存在多种数据库中。作业的数据在保存到持久性的数据存储器中时都会被序列化,从中加载时将会被反序列化。除了默认的存储器外,作业存储器不会将作业存储在内存中,而是在扮演一个中间人从后端进行保存,加载,更新,查询等工作。绝对不能在调度器之间共享任务存储器

执行器 处理作业的运行。通常通过将作业中可调用对象提交给线程或者进程池来完成此操作。作业完成后执行器将会通知调度器,调度器随后发出相应的事件

调度器 是剩余的部分。通常来说你只有一个调度器运行在一个应用实例中。开发者一般不会直接处理触发器、作业存储器和执行器,而是通过调度器来提供相应的接口来处理。配置作业存储器和执行器通过调度器来完成,比如添加,修改,删除作业

选择合适的调度器、作业存储器、执行器和触发器

选择的调度器主要取决于您的开发环境和使用 APScheduler 的目的,以下是选择调度器的快速说明:

为了选择合适的作业存储器,你需要确定是否需要持久化作业,如果程序启动时作业总是被重新创建,你可以使用默认值(MemoryJobStore)。但是如果你需要你的作业在应用重启或者崩溃时保留作业调度程序,那么您可以选择您工作环境中最常用的工具。由于 PostgreSQL 强大的数据完整性保护,使用 <code>SQLAlchemyJobStore</code>PostgreSQL 是比较好的选择

如果你使用调度器选择部分所述框架之一,执行器通常会给你选定。否则的话,默认的 <code>ThreadPoolExecutor</code> 应该足以应付大多数情况。如果你的工作负载涉及 CPU 密集操作。则应该考虑 <code>ProcessPoolExecutor</code> 来使用多个 CPU 核心。你也可以同时使用这两者。将 ProcessPoolExecutor 添加为第二执行器

当你调度一个作业时,你需要为它选择一个触发器。触发器决定了作业触发的时间逻辑。APScheduler 有三种内建的触发器类型:

也可以将多个触发器组合成一个触发器,该触发器将在所有参与触发器的约定时间内出发。也可以在任何触发器触发时触发。更多文档请看 <code>combining triggers</code>

你可以在各自的 API 文档页面上找到每个作业存储,执行器,触发器类型的插件名称。

配置调度器

APScheduler 提供了多种方式来配置调度器。你可以使用配置字典,也可以将选项作为关键在参数传递,也可以先实例化调度器,然后再添加作业并配置调度器。这样你就可以在任何环境下的最大灵活性

可以在 <code>BaseScheduler</code> 类的 API 参考上找到调度器级别配置选线的完整列表。调度器子类还包含额外的选项,在相关的 API 文档中已经给出。每个作业存储器和执行器的配置选项也可以在各自相关的 API 文档中找到

假设你希望在你的应用中运行一个默认的作业存储器和执行器的 BackgroundScheduler

from apscheduler.schedulers.background import BackgroundScheduler


scheduler = BackgroundScheduler()

# Initialize the rest of the application here, or before the scheduler initialization

这将为您提供一个 BackgroundScheduler,其 MemoryJobStore 名为“default”,ThreadPoolExecutor 名为“default”,默认最大线程数为 10。

现在假设你需要两个作业存储器使用两个执行器。并且你希望调整新作业的默认值并且设置不同的时区。以下三个示例完全等效,并且提供:

  • 一个名为 mongo 的 MongoDBJobStore
  • 一个名为 default 的 SQLAlchemyJobStore (使用 SQLite)
  • 一个名为 default 的 ThreadPoolExecutor 最大线程为 20
  • 一个名为 processpool 的 ProcessPoolExecutor 最大线程为 5
  • UTC 作为调度器的时区
  • 默认新作业的 coalescing 关闭
  • 新作业的默认最大实例限制为 3

示例 1

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

示例 2

from apscheduler.schedulers.background import BackgroundScheduler


# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
    'apscheduler.jobstores.mongo': {
         'type': 'mongodb'
    },
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
})

示例 3

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor


jobstores = {
    'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler()

# .. do something else here, maybe add jobs etc.

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

启动调度器

通过调用 <code>start()</code> 可以很简单的启动调度器。对于 BlockingScheduler 以外的调度器,此调用程序将立刻返回,你可以继续应用程序的初始化过程。

对于 BlockingScheduler,你只能在完成所有初始化步骤之后再调用 start()

注意: 在调度器启动后,你不能再更改任何设置

添加作业

有两种方式添加作业到调度器

  1. 调用 <code>add_job()</code>
  2. 使用 <code>scheduled_job()</code> 装饰函数

第一种方式是最常用的方式。第二种方法主要是方便的声明那些在应用运行时不会更改的作业。 add_job() 方法返回一个 <code>apscheduler.job.Job</code> 实例,可以稍后用来修改或删除作业

你可以在任何时间在调度器上安排作业。如果添加作业时调度器尚未运行。作业将会被暂时调度,当调度器启动时,该作业的第一个运行时间(触发器触发的时间点)才会被计算

必须注意到的是,如果你使用序列化的执行器和作业存储器。它会给你的作业添加一下要求:

  1. 可调用的目标必须是可以全局访问的
  2. 对可调用目标的任何参数必须是可序列化的

内置作业存储器中,只有 MemoryJobStore 不会序列化作业;内置执行器中,只有 ProcessPoolExecutor 会序列化作业

重要:在初始化阶段如果你在持久化作业存储器中调度作业,则必须为作业定义显式 ID 并且使用 replace_existing = True 否则每次应用重启都会获得作业的新副本 技巧:要立即运行作业,在添加作业时忽略 trigger 参数

移除作业

当你从调度器移除一个作业时,他将会从关联的作业存储器中移除,并且不会再执行。有两种方式可以删除作业:

  1. 调用 <code>remove_job()</code> 带有作业的 ID 和作业存储器的别名
  2. 通过由 add_job() 返回的作业实例调用 <code>remove()</code>

后一个方法更方便,但是需要你在某处存储添加作业时返回的对象实例,对于通过scheduled_job()调度的作业,仅能通过第一个方法来删除

当某个作业调度结束(例如:它的触发器再也不会触发),该作业将会自动移除

例:

job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()

同样的,使用显式的作业 ID

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

暂停和恢复作业

你可以很轻易的通过一个 Job 实例或调度器来暂停和恢复作业。当一个作业被暂停时,下一次的触发时间将会被清楚,并且不再进行计算之后的触发时间,直到该作业被恢复。要暂停一个作业,可以使用以下方法:

要恢复一个作业:

获取作业调度列表

要获得可处理的计划作业列表,可以使用 <code>get_jobs()</code> 方法。该方法会返回一个 Job 实例列表。如果你只想关心某个特定作业存储器中的作业,可以将作业存储器的别名当作参数传入

方便起见,你可以使用 <code>print_jobs</code> 方法,该方法将打印格式化的作业列表,其触发器和下次运行时间

修改作业

你可以通过调用 <code>apscheduler.job.Job.modify()</code><code>modify_job()</code> 来修改任何一个作业的除了 id 之外的任何属性属性

示例

job.modify(max_instances=6, name='Alternate name')

如果你想要重新调度一个作业,即改变触发器可以使用 <code>apscheduler.job.Job.reschedule()</code><code>reschedule_job()</code>。这两个方法会为作业构造一个新的触发器,并且会根据新的触发器重新计算下一个运行时间

示例

scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

停止调度器

为了停止调度器,可以执行

scheduler.shutdown()

默认情况下,调度器关闭其作业存储器和执行器,并等待所有当前正在执行的作业完成。如果你不想等待你可以:

scheduler.shutdown(wait=False)

这样将会关闭作业存储器和执行器但是不会等到任何运行的作业完成

暂停和恢复作业处理(调度器暂停)

暂停作业处理是可能的。

scheduler.pause()

这会造成调度器在恢复处理之前不在会唤醒

scheduler.resume()

也可以在暂停状态下启用调度器,即没有第一个唤醒调用

scheduler.start(paused=True)

如果你需要删除以一些你不需要的但是有机会运行的作业这会非常有用

限制并发执行的作业实例数量

默认情况下,一个作业只能有一个实例在一个时间点运行。这意味着,如果作业即将运行但是前一个时间点运行的还没有完成,则最新的作业运行时将会视为失败。在添加作业时指定 max_instances 关键字参数,可以是设定调度器将允许并发运行的该作业实例数量

丢失的作业执行器和合并(coalescing)

有些时候,调度器可能不能在计划运行时执行调度作业,最常见的情况是当作业被在序列化作业存储中调用时,调度器在该作业应该执行后关闭并重启。发生这种情况时,该作业被认为是“失火”,调度器将会根据 misfire_grace_time 选项(可按作业设置或者全局调度器选项设置)来检查每个错过的执行时间,以查看是否仍需要出发作业,这可能导致多次连续执行作业

如果你的用例不希望出现这种情况,则可以使用 coalescing 来将所有的错过执行合并为一个。总而言之,如果 coalescing 选项为开启的,并且调度器看到了一个或多个队列中的作业执行,那么触发器将会执行一次

注意:如果由于可用池中没有可用的线程或者进程而延迟作业,则执行器可能会因为它运行得太晚而挑过它(与最初的运行时间相比)。如果您的应用程序中可能发生这种情况,你可能希望增加执行器的线程/进程数,或者将 misfire_grace_time 的值调整的更高

调度器事件

可以将事件监听器附加到调度器上。调度器在某些情况下被触发,并且可能携带该特定事件的详细附加信息。通过给定合适的 mask 参数到 <code>add_listener()</code> 方法或者将不同的常量放在一起,可以只监听某种特定类型的事件。可调用的接收器可以被带着一个 event 对象调用

有关可用事件及其属性的详细信息,请查看关于 <code>events</code> 模块的文档

示例

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

故障排除

如果调度器没有按照预想来工作,提高 apscheduler logger 的日志级别为 DEBUG 会很有帮助

如果尚未开启日志记录,可以执行以下操作:

import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

这应该提供许多关于调度程序内部发生的有用信息。

另外,请确保查看 常见问题 部分,以查看问题是否已解决。

Reporting bugs

A bug tracker is provided by Github.

Getting help

If you have problems or other questions, you can either: