Django定时任务四种实现方法总结
背景: 在使用Django框架开发web项目时,很多时候需要设置定时任务或让用户手动在页面上设置定时任务。
一、使用django-crontab插件来实现定时任务
要使用django-crontab插件,只需要下载一个django-crontab包就可以使用cron表达式在Django框架中设置定时任务。这种方法不支持windows系统,功能也相对简单。
pip install django-crontab
(1)在settings.py中INSTALLED_APPS引入app,完成子应用注册。
INSTALLED_APPS = [
...
'django_crontab'
]
(2)在settings.py中配置定时任务,在settings.py最后增加一下代码:
# 定时任务
'''
* * * * * :分别表示 分(0-59)、时(0-23)、天(1 - 31)、月(1 - 12) 、周(星期中星期几 (0 - 7) (0 7 均为周天))
crontab范例:
每五分钟执行 */5 * * * *
每小时执行 0 * * * *
每天执行 0 0 * * *
每周一执行 0 0 * * 1
每月执行 0 0 1 * *
每天23点执行 0 23 * * *
'''
CRONJOBS = [
('*/1 * * * *', 'base.crontabs.confdict_handle', ' >> /tmp/logs/confdict_handle.log'), # 注意:/tmp/base_api 目录要手动创建
]
或者:
CRONJOBS = [
('*/5 * * * *', 'appname.cron.test','>>/home/test.log')
]
'''
‘/5 * * *’ 遵循的是crontab 语法。
‘appname.cron.test’,这个appname就是你开发时加入到settings中的那个。因为你的cron.py文件就在这个下面,否则找不到路径。cron 就是你自己起的任务文件的名字。test就是执行的函数中的内容。
‘>>/home/test.log’,通常会输出信息到一个文件中,就使用这个方法,注意的是‘>>’表示追加写入,’>’表示覆盖写入。
'''
一、附件部分(Linux 中的定时任务crontab的语法如下)
* * * * * command
分钟(0-59) 小时(0-23) 每个月的哪一天(1-31) 月份(1-12) 周几(0-6) shell脚本或者命令
CRONJOBS = [('*/5 * * * *', '任务路径.任务函数名','>>/home/book.log')]
参数说明:
‘*/5 * * *’ 表示五分钟一次,而django-crontab是调用Linux的crontab.
常见的参数:
应用示例:
- 每两个小时
0 */2 * * *
- 晚上11点到早上8点之间每两个小时,早上8点
0 23-7,8 * * *
- 每个月的4号和每个礼拜的礼拜一到礼拜三的早上11点
0 11 4 * 1-3
- 1月1日早上4点
0 4 1 1 *
quad
有兴趣的小伙伴可以深入研究下 Linux 的crontab定时任务。参考链接:django 定时任务 django-crontab 的使用
在执行脚本中:
0 6 * * * commands >> /tmp/test.log # 每天早上6点执行, 并将信息追加到test.log中
0 */2 * * * commands # 每隔2小时执行一次
在本例中是在apps/base/crontabs.py
(一般,执行add new file under appname, named cron.py)中增加的定时任务
from .models import ConfDict # base内的一个model,定时任务多数用来操作数据库,因此给一个示例
import datetime
# 定时任务
def confdict_handle():
try:
objs = CondDict.objects.all()
print(obj)
loca_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('本地时间:'+str(loca_time))
except Exception as e:
print('发生错误,错误信息为:', e)
python manage.py crontab add
python manage.py crontab show
- 删除所有定时任务
python manage.py crontab remove
corntab -e
(4)查看定时任务
crontab -l
1.5、django-crontab插件优缺点:
- 优点:
简单、方便、易于管理
和django服务是分离的,不会影响到django对外提供的web服务。 - 缺点:
无法在Windows平台上运行(django_crontab必须在Linux的crontab开启的情况下才可以使用,不然会出现不执行的情况);
就算在Linux系统上,也可能出现运行了没有效果的消息,至今未知原因。
二、使用django-apscheduler插件实现定时任务
django-apscheduler支持三种调度任务:固定时间间隔,固定时间点(日期),Crontab 命令。同时,它还支持异步执行、后台执行调度任务 配置简单、功能齐全、使用灵活、支持windows和linux,适合中小型项目。django-apscheduler中相关的概念和python的定时任务框架apscheduler中的概念是一样的
redis持久化存储时,使用APScheduler,使数据同步。
用户下单后使用,规定30min内必须支付,否则取消订单。
APScheduler 与 crontab 同为定时任务工具,有什么区别?
(1)crontab:
- crontab是Linux系统提供的一个命令,用来完成定时任务;
- 使用django-crontab 扩展 封装了Linux提供的crontab 命令;
- 可以独立于程序之外,不会占用程序资源,耦合性低;
- 但是它不灵活,
比如上面那个订单支付问题,crontab不知道要什么时候执行,所以它做不到
。
2.1、安装插件
pip install django-apscheduler
或者
pip install apscheduler
2.2、使用插件
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django_apscheduler', # 新加入的定时任务插件django-apscheduler
'UserManger.apps.UsermangerConfig',
]
因为django-apscheduler会创建表来存储定时任务的一些信息,所以将app加入之后需要迁移数据
python manage.py migrate
from apscheduler.schedulers.background import BackgroundScheduler # 使用它可以使你的定时任务在后台运行
from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job
import time
'''
date:在您希望在某个特定时间仅运行一次作业时使用
interval:当您要以固定的时间间隔运行作业时使用
cron:以crontab的方式运行定时任务
minutes:设置以分钟为单位的定时器
seconds:设置以秒为单位的定时器
'''
try:
scheduler = BackgroundScheduler() # 创建定时任务的调度器对象——实例化调度器
# 调度器使用DjangoJobStore()
scheduler.add_jobstore(DjangoJobStore(), "default")
# 'cron'方式循环,周一到周五,每天9:30:10执行,id为工作ID作为标记
# ('scheduler',"interval", seconds=1) #用interval方式循环,每一秒执行一次
@register_job(scheduler, 'cron', day_of_week='mon-fri', hour='9', minute='30', second='10',id='task_time')
#@register_job(scheduler, "interval", seconds=5)
def my_job(param1, param2): # 定义定时任务
# 定时每5秒执行一次
#t_now = time.localtime()
print(time.strftime('%Y-%m-%d %H:%M:%S'))
# 监控任务
register_events(scheduler)
# 向调度器中添加定时任务
scheduler.add_job(my_job, 'date', args=[100, 'python'])
# 启动定时任务调度器工作——调度器开始
scheduler.start()
except Exception as e:
print('定时任务异常:%s' % str(e))
apscheduler定时任务会跟随django项目一起运行因此直接启动django即可。
python manage.py runserver
2.7、django-apscheduler插件优缺点:
- 优点:
简单、定时方式丰富
轻量 - 缺点:
定时任务会跟随django项目一起运行,会影响django对外提供的web服务
使用uwsgi线上运行时,很难启动apscheduler定时任务
按照文档上的设置 —enable–threads 依然没有正常启动,具体原因未知,也查了很多方法,都没有解决。并且也不建议将定时任务和uwsgi放在一起运行,这样定时任务在运行过程中可能会影响uwsgi的服务。
二、附件部分(django-apscheduler功能详解)
创建任务:
quad
1. 装饰器
在任意view.py中实现代码
(我习惯新开一个app专门实现定时任务
):
典型范例:
import time
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_job, register_events
print('django-apscheduler')
def job2(name):
# 具体要执行的代码
print('{} 任务运行成功!{}'.format(name,time.strftime("%Y-%m-%d %H:%M:%S")))
# 实例化调度器
scheduler = BackgroundScheduler()
# 调度器使用DjangoJobStore()
scheduler.add_jobstore(DjangoJobStore(), "default")
# 添加任务1
# 每隔5s执行这个任务
@register_job(scheduler,"interval", seconds=5,args=['王路'],id='job1')
# 每天8点半执行这个任务
#@register_job(scheduler, 'cron', id='test', hour=8, minute=30,args=['test'])
def job1(name):
# 具体要执行的代码
print('{} 任务运行成功!{}'.format(name,time.strftime("%Y-%m-%d %H:%M:%S")))
scheduler.add_job(job2,"interval",seconds=10,args=['王飞'],id="job2")
# 监控任务——注册定时任务
register_events(scheduler)
# 调度器开始运行
scheduler.start()
启动服务 python manage.py runserver
这个任务就会被存储到django_apscheduler_djangojob表
中,并按照设置定时的执行程序。
- scheduler: 指定调度器
- trigger: 任务执行的方式,共有三种: ‘date’、‘interval’、‘cron’。
- ‘date’ + ‘run_date’ 的参数组合, 能实现单次任务。
例子: 2019-07-07 22:49:00 执行任务
@register_job(scheduler, ‘date’, id=‘test’, run_date=‘2019-07-07 22:49:00’)
注:在亲测时,执行完任务会报错,原因时执行完任务后会去mysql中删除djangojob表中的任务。但是djangojobexecution表记录着执行结果,有外键关联着djangojob表,所以删除时显示有外键约束错误。但是任务会正常执行,执行之后也会正常删除。 - ‘interval’ + ‘hours’ + ‘minutes’ + … 的参数组合,能实现间隔性任务。
例子:每隔3个半小时执行任务
@register_job(scheduler, ‘interval’, id=‘test’, hours=3, minutes=30)
还有seconds,days参数可以选择
注:如果任务需要执行10秒,而间隔设置为1秒,它是不会给你开10个线程同时去执行10个任务的。它会错过其他任务直到当前任务完成。 - ‘cron’ + ‘hour’ + ‘minute’+…的参数组合,能实现cron类的任务。
例子:每天的8点半执行任务
@register_job(scheduler, ‘cron’, id=‘test’, hour=8, minute=30)
还有day,second,month等参数可以选择。
- ‘date’ + ‘run_date’ 的参数组合, 能实现单次任务。
- id: 任务的名字,不传的话会自动生成。不过为了之后对任务进行暂停、开启、删除等操作,建议给一个名字。并且是唯一的,如果多个任务取一个名字,之前的任务就会被覆盖。
- args: list类型。执行代码所需要的参数。
- next_run_time:datetime类型。开始执行时间。如果你现在创建一个定时任务,想3天后凌晨三点半自动给你女朋友发微信,那就需要这个参数了。
2. add_job函数
装饰器的方法适合于写代码的人自己创建任务,如果想让用户通过页面输入参数,并提交来手动创建定时任务,就需要使用add_job函数
。
下面这个小例子,前端传递json数据给后端,触发test_add_task函数
,来添加任务:
import json
from django.http import JsonResponse
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job
scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), 'default')
# 与前端的接口
def test_add_task(request):
if request.method == 'POST':
content = json.loads(request.body.decode()) # 接收参数
try:
start_time = content['start_time'] # 用户输入的任务开始时间, '10:00:00'
start_time = start_time.split(':')
hour = int(start_time)[0]
minute = int(start_time)[1]
second = int(start_time)[2]
s = content['s'] # 接收执行任务的各种参数
# 创建任务
scheduler.add_job(test, 'cron', hour=hour, minute=minute, second=second, args=[s])
code = '200'
message = 'success'
except Exception as e:
code = '400'
message = e
back = {
'code': code,
'message': message
}
return JsonResponse(json.dumps(data, ensure_ascii=False), safe=False)
# 具体要执行的代码
def test(s):
pass
register_events(scheduler)
scheduler.start()
add_job函数参数说明:
和装饰器的参数大同小异,只是第一个参数不同。
如果具体要执行的函数和调用它的函数在一个文件中,那么只需要传递这个函数名就可以了(如上面的例子)。
但是我习惯将具体的业务代码写到另外一个文件中,view.py中只写前后端交互的接口函数,这种情况下传递的参数为一个字符串,格式为: ‘package.module:some.object’,即 包名.模块:函数名
参考链接:详解django-apscheduler的使用方法
基础组件:
quad
APScheduler 有四种组件
,分别是:调度器(scheduler),作业存储(job store),触发器(trigger),执行器(executor)
。
schedulers(调度器):
它提供 7 种调度器,能够满足我们各种场景的需要。例如:后台执行某个操作,异步执行操作等。调度器分别是:
- BlockingScheduler : 调度器在当前进程的主线程中运行,也就是会阻塞当前线程。
- BackgroundScheduler : 调度器在后台线程中运行,不会阻塞当前线程。(Django框架使用)
- AsyncIOScheduler : 结合 asyncio 模块(一个异步框架)一起使用。
- GeventScheduler : 程序中使用 gevent(高性能的Python并发框架)作为IO模型,和 – GeventExecutor 配合使用。
- TornadoScheduler : 程序中使用 Tornado(一个web框架)的IO模型,用 ioloop.add_timeout 完成定时唤醒。
- TwistedScheduler : 配合 TwistedExecutor,用 reactor.callLater 完成定时唤醒。scrapy爬虫框架
- QtScheduler : 你的应用是一个 Qt 应用,需使用QTimer完成定时唤醒。
它提供 3种内建的 trigger:
参数 | 说明 |
---|---|
run_date (datetime 或 str) | 作业的运行日期或时间 |
timezone (datetime.tzinfo 或 str) | 指定时区 |
典型范例1:
# 在 2017-12-13 时刻运行一次 job_func 方法
scheduler .add_job(job_func, 'date', run_date=date(2017, 12, 13), args=['text'])
# 在 2017-12-13 14:00:00 时刻运行一次 job_func 方法
scheduler .add_job(job_func, 'date', run_date=datetime(2017, 12, 13, 14, 0, 0), args=['text'])
# 在 2020-12-13 14:00:01 时刻运行一次 job_func 方法
scheduler.add_job(job3,"date",run_date='2020-12-13 14:00:01',args=['王飞'],id="job3")
典型范例2:
# 每天0点执行函数的代码,0点的话,hour可以不用写
app.scheduler.add_job(函数名, "cron", hour=0, args=[函数需要传的参数])
#每天凌晨3点执行代码
app.scheduler.add_job(函数名, "cron", hour=3, args=[app])
#如果date后面没有参数的话,就是立刻执行代码,一般测试的时候用
app.scheduler.add_job(函数名, "date", args=[app])
参数 | 说明 |
---|---|
weeks (int) | 间隔几周 |
days (int) | 间隔几天 |
hours (int) | 间隔几小时 |
minutes (int) | 间隔几分钟 |
seconds (int) | 间隔多少秒 |
start_date (datetime 或 str) | 开始日期 |
end_date (datetime 或 str) | 结束日期 |
timezone (datetime.tzinfo 或str) | 时区 |
典型范例:
# 每隔两分钟执行一次 job_func 方法
scheduler .add_job(job_func, 'interval', minutes=2)
# 在 2017-12-13 14:00:01 ~ 2017-12-13 14:00:10 之间, 每隔两分钟执行一次 job_func 方法
scheduler .add_job(job_func, 'interval', minutes=2, start_date='2017-12-13 14:00:01' , end_date='2017-12-13 14:00:10')
参数 | 说明 |
---|---|
year (int 或 str) | 年,4位数字 |
month (int 或 str) | 月 (范围1-12) |
day (int 或 str) | 日 (范围1-31 |
week (int 或 str) | 周 (范围1-53) |
day_of_week (int 或 str) | 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun) |
hour (int 或 str) | 时 (范围0-23) |
minute (int 或 str) | 分 (范围0-59) |
second (int 或 str) | 秒 (范围0-59) |
start_date (datetime 或 str) | 最早开始日期(包含) |
end_date (datetime 或 str) | 最晚结束时间(包含) |
timezone (datetime.tzinfo 或str) | 指定时区 |
典型范例:
# 在每天的2点35分36分37分 执行 job_func 任务
scheduler.add_job(job4,"cron",hour='2', minute='35-37',args=['王涛'],id="job4")
quad
有两种添加方法,其中一种是add_job()
, 另一种则是@register_job()修饰器
来修饰函数。
quad
这个两种办法的区别是:第一种方法返回一个 Job 的实例,可以用来改变或者移除 job。第二种方法只适用于应用运行期间不会改变的 job。
典型范例:
@register_job(scheduler, 'cron', day_of_week='mon-fri', hour='9', minute='30', second='10',id='task_time')·
def test_job():
t_now = time.localtime()
print(t_now)
其他功能:
django-apscheduler框架还提供了很多操作定时任务的函数。比如:
- 删除任务:
scheduler.remove_job(job_name)
- 暂停任务:
scheduler.pause_job(job_name)
- 开启任务:
scheduler.resume_job(job_name)
- 获取所有任务:
scheduler.get_jobs()
- 修改任务:
scheduler.modify_job(job_name)
注:修改任务只能修改参数,如果要修改执行时间的话,有3种方法
第一就把任务删了重新创建,
第二直接操作数据库,
第三用到下面重设任务。
可以在页面上做一个这样的表格,再加上简单的前后端交互就可以让用户自行管理定时任务:
其他的还有一些辅助功能(包括显示所有任务,显示任务的执行时间等),同学们可以自行查看。
- 重设任务:
scheduler.reschedule_job(job_name)
scheduler.reschedule_job(job_id="job1", trigger='interval', minutes=1)
执行器(executor):
quad
执行调度任务的模块。最常用的 executor 是 ThreadPoolExecutor,存储路径是在Django数据库中。
执行器 executors 可以使用线程池(ThreadPoolExecutor)和进程池(ProcessPoolExecutor)
- 线程池:
from apscheduler.executors.pool import ThreadPoolExecutor
ThreadPoolExecutor(max_workers)
ThreadPoolExecutor(20) # 最多20个线程同时执行
使用方法:
executors = {
'default': ThreadPoolExecutor(20)
}
scheduler = BackgroundScheduler(executors=executors)
- 进程池:
from apscheduler.executors.pool import ProcessPoolExecutor
ProcessPoolExecutor(max_workers)
ProcessPoolExecutor(5) # 最多5个进程同时执行
使用方法:
executors = {
'default': ProcessPoolExecutor(3)
}
scheduler = BackgroundScheduler(executors=executors)
参考链接:定时任务APScheduler,随时可以保持数据同步
总结:
django-apscheduler使用起来十分方便。提供了基于日期、固定时间间隔以及crontab 类型的任务,我们可以在主程序的运行过程中快速增加新作业或删除旧作业,并且作业会存储在数据库中,当调度器重启时,不必重新添加作业,作业会恢复原状态继续执行。django-apscheduler可以当作一个跨平台的调度工具来使用,可以做为 linux 系统crontab 工具或 windows 计划任务程序的替换。需要注意的是,apscheduler不是一个守护进程或服务,它自身不带有任何命令行工具。它主要是要在现有的应用程序中运行。
对job的操作:
移除job:
参考链接:Django高级特性:django-apscheduler定时任务
【以下内容未经验证,选择性参考】:摘自定时任务APScheduler,随时可以保持数据同步
scheduler.start()
- 停止APScheduler运行(如果报错,调度器就立即停止执行):
scheduler.shutdown()
参考范例【未验证】01:
from apscheduler.scheduler import Scheduler
from time import sleep
def task_Fun():
'''
这里写定时任务
'''
sleep(1)
sched = Scheduler()
@sched.interval_schedule(seconds=6)
def my_task1():
print('定时任务1开始n')
task_Fun()
print('定时任务1结束n')
@sched.interval_schedule(hours=4)
def my_task2():
print('定时任务2开始n')
sleep(1)
print('定时任务2结束n')
sched.start()
ok。启动django 项目,定时任务就会在你设定的时间执行了
参考链接:Django使用apscheduler完成定时任务
参考范例【未验证】02:
from django.utils import timezone
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
#from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
#异步式的
scheduler = BackgroundScheduler(
jobstores=jobstores,
executors=executors,
job_defaults=job_defaults,
timezone=timezone.get_current_timezone())
#阻塞的,适用于scheduler是独立服务的场景。
#scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=timezone.get_current_timezone())
#scheduler = BlockingScheduler(executors=executors, job_defaults=job_defaults, timezone=timezone.get_current_timezone())
def myjob():
pass
try:
scheduler.start()
# 5s后执行myjob
# 传入时间去除毫秒
deadline = datetime.datetime.now().replace(microsecond=0) + datetime.timedelta(seconds=5)
scheduler.add_job(myjob, 'date', run_date=deadline)
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
参考链接:django apscheduler在特定时间执行一次任务(run at a specify time only once)
三、使用Celery插件实现定时任务
Celery为分布式任务队列
。侧重实时操作,可用于生产系统处理数以百万计的任务,都用于大型项目,配置和使用较为复杂。Django的分布式主要由Celery框架实现,这是python开发的分布式任务队列。由于它本身不支持消息存储服务,所以需要第三方消息服务来传递任务,一般使用Redis。
3.1、安装依赖
本例建立在认为你已经知道如何使用Celery实现异步任务的基础上,需要学习的请移步 Django使用Celery
pip install -U "celery[redis]"
3.2、使用插件
修改settings.py 增加以下代码
....
# Celery配置
from kombu import Exchange, Queue
# 设置任务接受的类型,默认是{'json'}
CELERY_ACCEPT_CONTENT = ['application/json']
# 设置task任务序列列化为json
CELERY_TASK_SERIALIZER = 'json'
# 请任务接受后存储时的类型
CELERY_RESULT_SERIALIZER = 'json'
# 时间格式化为中国时间
CELERY_TIMEZONE = 'Asia/Shanghai'
# 是否使用UTC时间
CELERY_ENABLE_UTC = False
# 指定borker为redis 如果指定rabbitmq CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
# 指定存储结果的地方,支持使用rpc、数据库、redis等等,具体可参考文档 # CELERY_RESULT_BACKEND = 'db+mysql://scott:tiger@localhost/foo' # mysql 作为后端数据库
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
# 设置任务过期时间 默认是一天,为None或0 表示永不过期
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
# 设置worker并发数,默认是cpu核心数
# CELERYD_CONCURRENCY = 12
# 设置每个worker最大任务数
CELERYD_MAX_TASKS_PER_CHILD = 100
# 指定任务的位置
CELERY_IMPORTS = (
'base.tasks',
)
# 使用beat启动Celery定时任务
# schedule时间的具体设定参考:https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html
CELERYBEAT_SCHEDULE = {
'add-every-10-seconds': {
'task': 'base.tasks.cheduler_task',
'schedule': 10,
'args': ('hello', )
},
}
...
3.3、编写定时任务代码
在本例中是在apps/base/tasks.py中增加的定时任务
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time
@shared_task
def cheduler_task(name):
print(name)
print(time.strftime('%X'))
3.4、如何使用&运行
启动定时任务beat
celery -A dase_django_api beat -l info
启动Celery worker 用来执行定时任务
celery -A dase_django_api worker -l -l info
注意: 这里的 dase_django_api 要换成你的Celery APP的名称
3.5、Celery插件的优缺点:
- 优点:
稳定可靠、管理方便
高性能、支持分布式
Celery侧重于实时操作,可用于生产系统每天处理数以百万计的任务,可用于大型项目。
可在分布的机器、进程、线程上执行任务调度。 - 缺点:
实现复杂
部署复杂
非轻量级
配置和使用较为复杂,需要Redis数据库和多个python第三方库。
四、自建代码实现定时任务
4.1、创建定时任务
在apps/base下创建一个文件名为schedules.py;键入一下内容
import os, sys, time, datetime
import threading
import django
base_apth = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# print(base_apth)
# 将项目路径加入到系统path中,这样在导入模型等模块时就不会报模块找不到了
sys.path.append(base_apth)
os.environ['DJANGO_SETTINGS_MODULE'] ='base_django_api.settings' # 注意:base_django_api 是我的模块名,你在使用时需要跟换为你的模块
django.setup()
from base.models import ConfDict
def confdict_handle():
while True:
try:
loca_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('本地时间:'+str(loca_time))
time.sleep(10)
except Exception as e:
print('发生错误,错误信息为:', e)
continue
def main():
'''
主函数,用于启动所有定时任务,因为当前定时任务是手动实现,因此可以自由发挥
'''
try:
# 启动定时任务,多个任务时,使用多线程
task1 = threading.Thread(target=confdict_handle)
task1.start()
except Exception as e:
print('发生异常:%s' % str(e))
if __name__ == '__main__':
main()
4.2、如何使用&运行
直接运行脚本即可
python apps/base/schedules.py
4.3、自建代码的优缺点:
优点:
自定义
高度自由
缺点:
过于简单
当任务过多时,占用资源也会增加
参考链接:Django用定时任务比较不同姿势,使用,的,多种,对比
Crontab和APScheduler
django 定时任务 django-crontab & APScheduler 使用
Python 定时任务(schedule, Apscheduler, celery, python-crontab)-爱代码爱编程
定时任务简介
定时任务, linux 自带的 crontab ,windows 自带的任务计划,都可以实现守时任务。没错,操作系统基本都会提供定时任务的实现,但是如果你想要更加精细化的控制,或者说任务程序需要跨平台运行,最好还是自己实现定时任务框架,Python 的 apscheduler 提供了非常丰富而且方便易用的定时任务接口。本文两种方式实现定时任务。可以直接参考目录三Django中使用django-apscheduler,目录二内容测试已通过,相关数据库配置暂时无空尝试。
APScheduler简介
APscheduler全称Advanced Python Scheduler :作用为在指定的时间规则执行指定的作业。
apscheduler的四大组件,分别是Triggers,Job stores,Executors,Schedulers
- triggers 触发器 可以按照日期、时间间隔或者contab表达式三种方式触发
- job stores 作业存储器 指定作业存放的位置,默认保存在内存,也可以保存在各种数据库中
- executors 执行器 将指定的作业提交到线程池或者进程池中运行
- schedulers 作业调度器 常用的有BackgroundScheduler(后台运行)和BlockingScheduler(阻塞式)
Scheduler调度流程:
触发器(triggers):触发器包含调度逻辑,描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发。每个作业都有它自己的触发器,除了初始配置之外,触发器是完全无状态的。
执行器(executors):执行器是将指定的作业(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。
作业(任务)存储器(job stores):作业存储器指定了作业被存放的位置,默认情况下作业保存在内存,也可将作业保存在各种数据库中,当作业被存放在数据库中时,它会被序列化,当被重新加载时会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储。
调度器(schedulers):任务调度器,属于控制角色,通过它配置作业存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行在应用程序中,开发人员通常不需要直接处理作业存储器、执行器或触发器,配置作业存储器和执行器是通过调度器来完成的。
一个调度器由上方三个组件构成,一般来说,一个程序只要有一个调度器就可以了。开发者也不必直接操作任务储存器、执行器以及触发器,因为调度器提供了统一的接口,通过调度器就可以操作组件,比如任务的增删改查。
调度器组件详解:
根据开发需求选择相应的组件,下面是不同的调度器组件:
-
GeventScheduler Gevent调度器,适用于应用通过Gevent的情况。
-
TornadoScheduler Tornado调度器,适用于构建Tornado应用。
-
TwistedScheduler Twisted调度器,适用于构建Twisted应用。
-
QtScheduler Qt调度器,适用于构建Qt应用。
(1)任务储存器的选择:
要看任务是否需要持久化。如果你运行的任务是无状态的,选择默认任务储存器MemoryJobStore就可以应付。但是,如果你需要在程序关闭或重启时,保存任务的状态,那么就要选择持久化的任务储存器。如果,作者推荐使用SQLAlchemyJobStore并搭配PostgreSQL作为后台数据库。这个方案可以提供强大的数据整合与保护功能。
(2)执行器的选择:
同样要看你的实际需求。默认的ThreadPoolExecutor线程池执行器方案可以满足大部分需求。如果,你的程序是计算密集型的,那么最好用ProcessPoolExecutor进程池执行器方案来充分利用多核算力。也可以将ProcessPoolExecutor作为第二执行器,混合使用两种不同的执行器。
配置一个任务,就要设置一个任务触发器。触发器可以设定任务运行的周期、次数和时间。
(3)APScheduler有三种内置的触发器:
-
date 日期:触发任务运行的具体日期
-
interval 间隔:触发任务运行的时间间隔
-
cron 周期:触发任务运行的周期
一个任务也可以设定多种触发器,比如,可以设定同时满足所有触发器条件而触发,或者满足一项即触发。
触发器代码示例:
date 是最基本的一种调度,作业任务只会执行一次。它表示特定的时间点触发。它的参数如下:
【参数说明】
from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
def my_job(text):
print(text)
# 在2019年4月15日执行
scheduler.add_job(my_job, 'date', run_date=date(2019, 4, 15), args=['测试任务'])
scheduler.start()
###########################################################################################
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
def my_job(text):
print(text)
# datetime类型(用于精确时间)
scheduler.add_job(my_job, 'date', run_date=datetime(2019, 4, 15, 17, 30, 5), args=['测试任务'])
scheduler.start()
注意:run_date参数可以是date类型、datetime类型或文本类型。
scheduler.add_job(my_job, 'date', run_date='2009-11-06 16:30:05', args=['测试任务'])
【更多示例代码,参考链接】:python 定时任务APScheduler 使用介绍
【针对上述4个模块的详细介绍,参考博文】Python定时库APScheduler&Django使用django-apscheduler实现定时任务
有价值的实战项目参考
teprunner测试平台定时任务这次终于稳了
本文开发内容
作为测试平台而言,定时任务算是必备要素了,只有跑起来的自动化,才能算是真正的自动化。本文将给测试计划添加定时任务功能,具体如下:
前端效果图:
Django – 定时任务模块设计与实践
页面静态化介绍
1. 页面静态化介绍、
2.什么是页面静态化
2. 首页页面静态化实现
2.首页页面静态化实现
import os
import time
from django.conf import settings
from django.template import loader
from apps.contents.models import ContentCategory
from apps.contents.utils import get_categories
def generate_static_index_html():
"""
生成静态的主页html文件
"""
print('%s: generate_static_index_html' % time.ctime())
# 获取商品频道和分类
categories = get_categories()
# 广告内容
contents = {}
content_categories = ContentCategory.objects.all()
for cat in content_categories:
contents[cat.key] = cat.content_set.filter(status=True).order_by('sequence')
# 渲染模板
context = {
'categories': categories,
'contents': contents
}
# 获取首页模板文件
template = loader.get_template('index.html')
# 渲染首页html字符串
html_text = template.render(context)
# 将首页html字符串写入到指定目录,命名'index.html'
file_path = os.path.join(settings.STATICFILES_DIRS[0], 'index.html')
with open(file_path, 'w', encoding='utf-8') as f:
f.write(html_text)
3. 定时任务crontab静态化首页
对于首页的静态化,考虑到页面的数据可能由多名运营人员维护,并且经常变动,所以将其做成定时任务,即定时执行静态化。
在Django执行定时任务,可以通过 django-crontab扩展来实现。
参考链接:定时器任务django-crontab的使用【静态化高频率页面,增加用户体验】【系统的定时器,独立于项目执行】【刘新宇】
APScheduler常见错误
报错No module named ‘apscheduler.scheduler’
BlockingScheduler算是会实行block阻塞程序运行(会阻塞主线程的运行)
APScheduler中两种调度器的区别及使用过程中要注意的问题
apscheduelr整体实现方式和扩展方式解读
原文地址:https://blog.csdn.net/weixin_42782150/article/details/123212604
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_17123.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!