300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > python任务调度框架_Python任务调度模块APScheduler

python任务调度框架_Python任务调度模块APScheduler

时间:2020-06-20 01:51:46

相关推荐

python任务调度框架_Python任务调度模块APScheduler

一、APScheduler 是什么&APScheduler四种组成部分?

APScheduler全程为Advanced Python Scheduler,是一款轻量级的Python任务调度框架。它允许你像Cron那样安排定期执行的任务,并且支持Python函数或任意可调用的对象。

1、调度器(scheduler)

调度器(scheduler)是其他的组成部分。你通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。

对于不同的不场景,可以选择的调度器:

BlockingScheduler : 当调度器是你应用中唯一要运行的东西时。

BackgroundScheduler : 当你没有运行任何其他框架并希望调度器在你应用的后台执行时使用(常用)。

AsyncIOScheduler : 当你的程序使用了asyncio(一个异步框架)的时候使用。

GeventScheduler : 当你的程序使用了gevent(高性能的Python并发框架)的时候使用。

TornadoScheduler : 当你的程序基于Tornado(一个web框架)的时候使用。

TwistedScheduler : 当你的程序使用了Twisted(一个异步框架)的时候使用

QtScheduler : 如果你的应用是一个Qt应用的时候可以使用。

# BackgroundScheduler: 调度器在后台线程中运行,不会阻塞当前线程。

from apscheduler.schedulers.background import BackgroundScheduler

import time

scheduler = BackgroundScheduler()

def job1():

print "%s: 执行任务" % time.asctime()

scheduler.add_job(job1, 'interval', seconds=3)

scheduler.start()

while True:

pass

2、作业存储(job store)

作业存储(job store)主要用来存储被调度的作业,默认的作业存储是简单地把作业保存在内存中。

jobstore提供对scheduler中job的增删改查接口,根据存储方式的不同,分以下几种:

MemoryJobStore:没有序列化,jobs就存在内存里,增删改查也都是在内存中操作

SQLAlchemyJobStore:所有sqlalchemy支持的数据库都可以做为backend,增删改查操作转化为对应backend的sql语句

MongoDBJobStore:用mongodb作backend

RedisJobStore:用redis作backend

RethinkDBJobStore:用rethinkdb作backend

ZooKeeperJobStore:用ZooKeeper做backend

3、执行器(executor)

执行器(executor)主要处理任务的运行,主要是把定时任务中的可调用对象(function)提交给一个一个线程或者进程来进行。当任务完成时,执行器将会通知调度器。

最常用的 执行器(executor) 有两种:

ProcessPoolExecutor(进程池)

ThreadPoolExecutor(线程池,max:10)

4、触发器(triggers)

当调度一个任务时,需要为它设置一个触发器(triggers)。触发器决定在什么日期/时间、用什么样的形式来执行执行这个定时任务。

APScheduler 有三种内建的 trigger:

date: 指定某个确定的时间点,job仅执行一次。

interval: 指定时间间隔(fixed intervals)周期性执行。

cron: 使用cron风格表达式周期性执行,用于(在指定时间内)定期运行job的场景。使用同linux下crontab的方式。

4.1、date 定时调度(作业只会执行一次)

参数如下:

run_date (datetime|str) – 作业的运行日期或时间

timezone (datetime.tzinfo|str) – 指定时区

sched.add_job(job_function, 'date', run_date=date(, 12, 12), args=['text'])

sched.add_job(job_function, 'date', run_date=datetime(, 12, 12, 12, 0, 0), args=['text'])

4.2、interval: 每隔一段时间执行一次

weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0, start_date=None, end_date=None, timezone=None

weeks (int) – 间隔几周

days (int) – 间隔几天

hours (int) – 间隔几小时

minutes (int) – 间隔几分钟

seconds (int) – 间隔多少秒

start_date (datetime|str) – 开始日期

end_date (datetime|str) – 结束日期

timezone (datetime.tzinfo|str) – 时区

scheduler.add_job(my_job, 'interval', hours=2)

scheduler.add_job(my_job, 'interval', hours=2, start_date='-9-8 21:30:00', end_date='-06-15 21:30:00)

@scheduler.scheduled_job('interval', id='my_job_id', hours=2)

def my_job():

print("Hello World")

4.3、cron: 使用同linux下crontab的方式

(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)

除了week和 day_of_week,它们的默认值是 *

例如 day=1, minute=20 ,这就等于

year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0

工作将在每个月的第一天以每小时20分钟的时间执行

表达式

参数类型

描述

*

所有

通配符。例: minutes=* 即每分钟触发

*/a

所有

可被a整除的通配符。

a-b

所有

范围a-b触发

a-b/c

所有

范围a-b,且可被c整除时触发

xth y

第几个星期几触发。x为第几个,y为星期几

last x

一个月中,最后个星期几触发

last

一个月最后一天触发

x,y,z

所有

组合表达式,可以组合确定值或上方的表达式

year (int|str) – 年,4位数字

month (int|str) – 月 (范围1-12)

day (int|str) – 日 (范围1-31)

week (int|str) – 周 (范围1-53)

day_of_week (int|str) – 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)

hour (int|str) – 时 (范围0-23)

minute (int|str) – 分 (范围0-59)

second (int|str) – 秒 (范围0-59)

start_date (datetime|str) – 最早开始日期(包含)

end_date (datetime|str) – 最晚结束时间(包含)

timezone (datetime.tzinfo|str) – 指定时区

sched.add_job(my_job, 'cron', hour=3, minute=30)

sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='-10-30')

@sched.scheduled_job('cron', id='my_job_id', day='last sun')

def some_decorated_task():

print("I am printed at 00:00:00 on the last Sunday of every month!")

二、 How:APSched 怎么用?

安装

pip 安装

pip install apscheduler

python setup.py install

快速上手

# first.py

from apscheduler.schedulers.blocking import BlockingScheduler

import time

# 实例化一个调度器

scheduler = BlockingScheduler()

def job1():

print "%s: 执行任务" % time.asctime()

# 添加任务并设置触发方式为3s一次

scheduler.add_job(job1, 'interval', seconds=3)

# 开始运行调度器

scheduler.start()

执行输出:

> python first.py

Fri Sep 8 20:41:55 : 执行任务

Fri Sep 8 20:41:58 : 执行任务

...

任务操作

1、添加任务

方法一:调用add_job()方法

调用add_job()方法返回一个apscheduler.job.Job 的实例,可以用来改变或者移除 job

job = scheduler.add_job(myfunc, 'interval', minutes=2)

方法二:使用装饰器scheduled_job()

适用于应用运行期间不会改变的 job

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

# 装饰器

@sched.scheduled_job('interval', id='my_job_id', seconds=5)

def job_function():

print("Hello World")

# 开始

sched.start()

2、删除任务

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')

scheduler.remove_job('my_job_id')

job = scheduler.add_job(myfunc, 'interval', minutes=2)

job.remove()

3、暂停&继续任务

可以通过Job实例或调度程序本身轻松暂停和恢复作业。 当作业暂停时,下一个运行时间将被清除,直到作业恢复,不会再计算运行时间。 要暂停作业,请使用以下任一方法:

job = scheduler.add_job(myfunc, 'interval', minutes=2)

job.pause()

job.resume()

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')

scheduler.pause_job('my_job_id')

scheduler.resume_job('my_job_id')

4、修改任务属性

job.modify(max_instances=6, name='Alternate name')

scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

5、获得job列表

使用get_jobs()来获取所有的job实例。

使用print_jobs()来输出所有格式化的作业列表。

使用get_job(任务ID)获取指定任务的作业列表。

apscheduler.get_jobs()

6、开始&关闭任务

scheduler.start() #开启

scheduler.shotdown(wait=True|False) #关闭 False 无论任务是否执行,强制关闭

三、一些定时任务脚本

1、定时任务运行脚本每日凌晨00:30:30执行

import datetime

from apscheduler.schedulers.blocking import BlockingScheduler

from app.untils.log_builder import sys_logging

scheduler = BlockingScheduler() # 后台运行

# 设置为每日凌晨00:30:30时执行一次调度程序

@scheduler.scheduled_job("cron", day_of_week='*', hour='1', minute='30', second='30')

def rebate():

print "schedule execute"

sys_logging.debug("statistic scheduler execute success" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

if __name__ == '__main__':

try:

scheduler.start()

sys_logging.debug("statistic scheduler start success")

except (KeyboardInterrupt, SystemExit):

scheduler.shutdown()

sys_logging.debug("statistic scheduler start-up fail")

2、每天晚上0点 - 早上8点期间,每5秒执行一次任务。

# 每天晚上0点 - 早上8点期间,每5秒执行一次任务。

scheduler.add_job(my_job, 'cron',day_of_week='*',hour = '0-8',second = '*/5')

3、在0、10、20、30、40、50分时执行任务。

scheduler.add_job(my_job, 'cron',day_of_week='*',minute = '*/10')

4、直到-05-30,每周从周一到周五的早上5:30都执行一次定时任务

sched.add_job(my_job(),'cron', day_of_week='mon-fri', hour=5, minute=30,end_date='-05-30')

sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='-12-31')

5、在6,7,8,11,12月的第3个周五的1,2,3点执行定时任务

sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

6、每5秒执行该程序一次

sched.add_job(my_job,'cron',second='*/5')

参考资料

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。