背景: 在使用Django框架开发web项目时,很多时候需要设置定时任务或让用户手动页面设置定时任务。

一、使用django-crontab插件来实现定时任务

1.0、 基本介绍

要使用django-crontab插件,只需要下载一个django-crontab包就可以使用cron表达式在Django框架中设置定时任务。这种方法支持windows系统功能也相对简单

1.1、安装插件

pip install django-crontab

1.2、注册app

(1)在settings.py中INSTALLED_APPS引入app,完成子应用注册

INSTALLED_APPS = [
    ...
    'django_crontab'
]

(2)在settings.py中配置定时任务,在settings.py最后加一代码


# 定时任务
'''
*    *    *    *    * :分别表示 分(0-59)、时(0-23)、天(1 - 31)、月(1 - 12) 、周(星期中星期几 (0 - 7) (0 7 均为周天))
crontab范例:
每五分钟执行    */5 * * * *
每小时执行     0 * * * *
每天执行       0 0 * * *
每周一执行       0 0 * * 1
每月执行       0 0 1 * *
每天23点执行   0 23 * * *
'''
CRONJOBS = [
    ('*/1 * * * *', 'base.crontabs.confdict_handle', ' >> /tmp/logs/confdict_handle.log'), # 注意:/tmp/base_api 目录手动创建
]

或者:

CRONJOBS = [
    ('*/5 * * * *', 'appname.cron.test','>>/home/test.log')
]

'''
‘/5 * * *’ 遵循的是crontab 语法。
‘appname.cron.test’,这个appname就是你开发加入settings中的那个。因为你的cron.py文件就在这个下面,否则找不到路径cron 就是你自己起的任务文件名字test就是执行的函数中的内容。
‘>>/home/test.log’,通常会输出信息一个文件中,就使用这个方法,注意的是‘>>’表示追加写入,’>’表示覆盖写入。
'''

一、附件部分(Linux 中的定时任务crontab语法如下

*  *  *  *  * command
分钟(0-59) 小时(0-23) 每个月的哪一天(1-31) 月份(1-12) 周几(0-6) shell脚本或者命令

核心语法

CRONJOBS = [('*/5 * * * *', '任务路径.任务函数名','>>/home/book.log')]

参数说明
‘*/5 * * *’ 表示五分钟一次,而django-crontab调用Linuxcrontab.

常见的参数

  1. "*"表示可选的所有取值范围内的数字
  2. "/"表示'每',比如若一个参数为/5,就是五分钟一次,例如:*/5就是每5个单位
  3. - 代表从某个数字到某个数
  4. , 分开几个离散的数字

    quad

应用示例

  • 两个小时 0 */2 * * *
  • 晚上11点到早上8点之间每两个小时,早上8点 0 23-7,8 * * *
  • 每个月的4号和每个礼拜的礼拜一到礼拜三的早上11点 0 11 4 * 1-3
  • 1月1日早上4点 0 4 1 1 *

    quad

有兴趣的小伙伴可以深入研究下 Linuxcrontab定时任务。参考链接django 定时任务 django-crontab 的使用

在执行脚本中:
0 6 * * * commands >> /tmp/test.log # 每天早上6点执行, 并将信息追加test.log中
0 */2 * * * commands # 每隔2小时执行一次

1.3、编写定时任务方法

在本例中是在apps/base/crontabs.py(一般,执行add new file under appname, named cron.py)中增加的定时任务

from .models import ConfDict # base内的一个model,定时任务多数用来操作数据库,因此给一个示例
import datetime

# 定时任务 
def confdict_handle():
    try:
    	objs = CondDict.objects.all()
    	print(obj)
        loca_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print('本地时间:'+str(loca_time))
    except Exception as e:
        print('发生错误错误信息为:', e)

1.4、如何使用&运行

(1)开启定时器

python manage.py crontab add

(2)将任务添加生效查看开启定时器

python manage.py crontab show
python manage.py crontab remove

(3)重启django服务,执行

corntab -e

(4)查看定时任务

crontab -l

此时,应该可以看到系统创建了该定时任务。

1.5、django-crontab插件优缺点

二、使用django-apscheduler插件实现定时任务

2.0、基本介绍

django-apscheduler支持三种调度任务:固定时间间隔固定时间点(日期),Crontab 命令。同时,它还支持异步执行、后台执行调度任务 配置简单、功能齐全、使用灵活、支持windowslinux,适合中小型项目。django-apscheduler中相关概念python的定时任务框架apscheduler中的概念是一样的

APScheduler的使用场景

redis持久存储时,使用APScheduler,使数据同步
用户下单后使用,规定30min内必须支付,否则取消订单

APScheduler 与 crontab 同为定时任务工具,有什么区别

(1)crontab:

(2)APScheduler:

2.1、安装插件

pip install django-apscheduler

或者

pip install apscheduler

2.2、使用插件

修改settings.py增加以下代码

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'django_apscheduler',     # 新加入的定时任务插件django-apscheduler
    'UserManger.apps.UsermangerConfig',
]

2.3、迁移数据库

因为django-apscheduler会创建表来存储定时任务的一些信息,所以将app加入之后需要迁移数据

python manage.py migrate

数据库中看一看,生成两个表格,大部分都顾名思义。

  1. django_apscheduler_djangojob——用于存储任务的表格
    在这里插入图片描述
  2. django_apscheduler_djangojobexecution——用于存储任务执行状态表格
    在这里插入图片描述

参数说明:

Note:
两个用来管理你所需要的定时任务,然后就开始在任一view.py下写你需要实现的任务:

2.4、完整示例views.py中增加你的定时任务代码

注意:如果在其他文件中添加代码没有效果

from apscheduler.schedulers.background import BackgroundScheduler # 使用它可以使你的定时任务在后台运行
from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job
import time
'''
date:在您希望在某个特定时间运行一次作业时使用
interval:当您要以固定时间间隔运行作业时使用
cron:以crontab的方式运行定时任务
minutes:设置以分钟为单位定时器
seconds:设置以秒为单位定时器
'''

try:
    scheduler = BackgroundScheduler() # 创建定时任务的调度对象——实例调度
    # 调度器使用DjangoJobStore()
    scheduler.add_jobstore(DjangoJobStore(), "default")
	# 'cron'方式循环,周一到周五,每天9:30:10执行,id工作ID作为标记  
    # ('scheduler',"interval", seconds=1)  #用interval方式循环,每一秒执行一次  
    @register_job(scheduler, 'cron', day_of_week='mon-fri', hour='9', minute='30', second='10',id='task_time')  
    #@register_job(scheduler, "interval", seconds=5)
    
    def my_job(param1, param2):  # 定义定时任务
        # 定时每5秒执行一次
        #t_now = time.localtime()
        print(time.strftime('%Y-%m-%d %H:%M:%S'))
    
    # 监控任务 
    register_events(scheduler)
    
	# 向调度器中添加定时任务
	scheduler.add_job(my_job, 'date', args=[100, 'python'])
    # 启动定时任务调度工作——调度器开始 
    scheduler.start()
except Exception as e:
    print('定时任务异常:%s' % str(e))

2.6、如何使用&运行

apscheduler定时任务会跟随django项目一起运行因此直接启动django即可。

python manage.py runserver

2.7、django-apscheduler插件优缺点

二、附件部分(django-apscheduler功能详解)

创建任务:

quad


有两种创建任务的方法装饰器和add_job函数

1. 装饰
在任意view.py中实现代码(我习惯新开一个app专门实现定时任务):

典型范例:

import time
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_job, register_events

print('django-apscheduler')

def job2(name):
    # 具体要执行的代码
    print('{} 任务运行成功!{}'.format(name,time.strftime("%Y-%m-%d %H:%M:%S")))

# 实例化调度器
scheduler = BackgroundScheduler()
# 调度器使用DjangoJobStore()
scheduler.add_jobstore(DjangoJobStore(), "default")
# 添加任务1
# 每隔5s执行这个任务
@register_job(scheduler,"interval", seconds=5,args=['王路'],id='job1')
# 每天8点半执行这个任务
#@register_job(scheduler, 'cron', id='test', hour=8, minute=30,args=['test'])
def job1(name):
    # 具体要执行的代码
    print('{} 任务运行成功!{}'.format(name,time.strftime("%Y-%m-%d %H:%M:%S")))

scheduler.add_job(job2,"interval",seconds=10,args=['王飞'],id="job2")

# 监控任务——注册定时任务
register_events(scheduler)
# 调度器开始运行
scheduler.start()

启动服务 python manage.py runserver 这个任务就会被存储django_apscheduler_djangojob表中,并按照设置定时的执行程序

装饰器@register_job()参数说明

  1. scheduler: 指定调度器
  2. trigger: 任务执行的方式,共有三种: ‘date’、‘interval’、‘cron’。
    • ‘date’ + ‘run_date’ 的参数组合, 能实现单次任务。
      例子: 2019-07-07 22:49:00 执行任务
      @register_job(scheduler, ‘date’, id=‘test’, run_date=‘2019-07-07 22:49:00’)
      注:在亲测时,执行完任务会报错,原因时执行完任务后会去mysql删除djangojob表中的任务。但是djangojobexecution表记录着执行结果,有外键关联着djangojob表,所以删除显示外键约束错误。但是任务会正常执行,执行之后也会正常删除。
    • ‘interval’ + ‘hours’ + ‘minutes’ + … 的参数组合,能实现间隔性任务。
      例子:每隔3个半小时执行任务
      @register_job(scheduler, ‘interval’, id=‘test’, hours=3, minutes=30)
      还有seconds,days参数可以选择
      注:如果任务需要执行10秒,而间隔设置为1秒,它是不会给你开10个线程同时去执行10个任务的。它会错过其他任务直到当前任务完成。
    • ‘cron’ + ‘hour’ + ‘minute’+…的参数组合,能实现cron类的任务。
      例子:每天的8点半执行任务
      @register_job(scheduler, ‘cron’, id=‘test’, hour=8, minute=30)
      还有day,second,month等参数可以选择。
  3. id: 任务的名字,不传的话会自动生成。不过为了之后对任务进行暂停、开启、删除等操作建议给一个名字。并且是唯一的,如果多个任务取一个名字,之前的任务就会被覆盖
  4. args: list类型。执行代码所需要的参数。
  5. next_run_time:datetime类型。开始执行时间。如果你现在创建一个定时任务,想3天后凌晨三点半自动给你女朋友发微信,那就需要这个参数了。

还有些其他的参数感兴趣同学可以查看源代码来了解。

2. add_job函数

装饰器的方法适合于写代码的人自己创建任务,如果想让用户通过页面输入参数,并提交手动创建定时任务,就需要使用add_job函数

下面这个小例子前端传递json数据给后端,触发test_add_task函数,来添加任务:

import json
from django.http import JsonResponse
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job


scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), 'default')

# 与前端接口
def test_add_task(request):
    if request.method == 'POST':
        content = json.loads(request.body.decode())  # 接收参数
        try:
            start_time = content['start_time']  # 用户输入的任务开始时间, '10:00:00'
            start_time = start_time.split(':')
            hour = int(start_time)[0]
            minute = int(start_time)[1]
            second = int(start_time)[2]
            s = content['s']  # 接收执行任务的各种参数
            # 创建任务
            scheduler.add_job(test, 'cron', hour=hour, minute=minute, second=second, args=[s])
            code = '200'
            message = 'success'
        except Exception as e:
            code = '400'
            message = e
            
        back = {
            'code': code,
            'message': message
        }
        return JsonResponse(json.dumps(data, ensure_ascii=False), safe=False)
        
# 具体要执行的代码
def test(s):
    pass
    

register_events(scheduler)
scheduler.start()

这样就可以由前端用户来手动设置定时任务了。

add_job函数参数说明
和装饰器的参数大同小异,只是第一个参数不同。
如果具体要执行的函数和调用它的函数在一个文件中,那么只需要传递这个函数名就可以了(如上面的例子)。
但是我习惯将具体的业务代码写到另外一个文件中,view.py中只写前后端交互接口函数,这种情况下传递的参数为一个字符串,格式为: package.module:some.object’,即 包名.模块:函数名

参考链接详解django-apscheduler的使用方法

基础组件

quad


APScheduler 有四种组件,分别是:调度器(scheduler),作业存储(job store),触发器(trigger),执行器(executor)

schedulers(调度器):
它提供 7 种调度器,能够满足我们各种场景的需要。例如:后台执行某个操作异步执行操作等。调度器分别是:

triggers(触发器):

它提供 3种内建的 trigger:

  1. date 触发器 作业任务只会执行一次。它表示特定的时间点触发。它的参数如下
参数 说明
run_date (datetime 或 str) 作业的运行日期或时间
timezone (datetime.tzinfo 或 str) 指定时区

典型范例1:

# 在 2017-12-13 时刻运行一次 job_func 方法
scheduler .add_job(job_func, 'date', run_date=date(2017, 12, 13), args=['text'])
# 在 2017-12-13 14:00:00 时刻运行一次 job_func 方法
scheduler .add_job(job_func, 'date', run_date=datetime(2017, 12, 13, 14, 0, 0), args=['text'])
# 在 2020-12-13 14:00:01 时刻运行一次 job_func 方法
scheduler.add_job(job3,"date",run_date='2020-12-13 14:00:01',args=['王飞'],id="job3")

典型范例2:

 # 每天0点执行函数的代码,0点的话,hour可以不用写
app.scheduler.add_job(函数名, "cron", hour=0, args=[函数需要传的参数]) 

#每天凌晨3点执行代码
app.scheduler.add_job(函数名, "cron", hour=3, args=[app])

#如果date后面没有参数的话,就是立刻执行代码,一般测试的时候用
app.scheduler.add_job(函数名, "date", args=[app])
  1. interval 触发器 固定时间间隔触发。interval 间隔调度,参数如下
参数 说明
weeks (int) 间隔几周
days (int) 间隔几天
hours (int) 间隔几小时
minutes (int) 间隔几分钟
seconds (int) 间隔多少秒
start_date (datetime 或 str) 开始日期
end_date (datetime 或 str) 结束日期
timezone (datetime.tzinfo 或str) 时区

典型范例:

# 每隔两分钟执行一次 job_func 方法
scheduler .add_job(job_func, 'interval', minutes=2)
# 在 2017-12-13 14:00:01 ~ 2017-12-13 14:00:10 之间, 每隔两分钟执行一次 job_func 方法
scheduler .add_job(job_func, 'interval', minutes=2, start_date='2017-12-13 14:00:01' , end_date='2017-12-13 14:00:10')
  1. cron 触发器 在特定时间周期性地触发,和Linux crontab格式兼容
参数 说明
year (int 或 str) 年,4位数
month (int 或 str) 月 (范围1-12)
day (int 或 str) 日 (范围1-31
week (int 或 str) 周 (范围1-53)
day_of_week (int 或 str) 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
hour (int 或 str) 时 (范围0-23)
minute (int 或 str) 分 (范围0-59)
second (int 或 str) 秒 (范围0-59)
start_date (datetime 或 str) 最早开始日期(包含)
end_date (datetime 或 str) 最晚结束时间(包含)
timezone (datetime.tzinfo 或str) 指定时区

这些参数是支持算数表达式取值格式如下

在这里插入图片描述

典型范例:

# 在每天的2点35分36分37分 执行 job_func 任务
scheduler.add_job(job4,"cron",hour='2', minute='35-37',args=['王涛'],id="job4")

作业存储(job store):

quad


有两种添加方法,其中一种是 add_job(), 另一种则是@register_job()修饰修饰函数。

quad


这个两种办法的区别是:第一种方法返回一个 Job 的实例,可以用来改变或者移除 job。第二种方法只适用于应用运行期间不会改变的 job。

典型范例:

@register_job(scheduler, 'cron', day_of_week='mon-fri', hour='9', minute='30', second='10',id='task_time')·
        def test_job():
           t_now = time.localtime()
           print(t_now)

其他功能:

django-apscheduler框架还提供了很多操作定时任务的函数。比如

  • 删除任务:scheduler.remove_job(job_name)
  • 暂停任务:scheduler.pause_job(job_name)
  • 开启任务:scheduler.resume_job(job_name)
  • 获取所有任务:scheduler.get_jobs()
  • 修改任务:scheduler.modify_job(job_name)

注:修改任务只能修改参数,如果要修改执行时间的话,有3种方法
第一就把任务删了重新创建,
第二直接操作数据库
第三用到下面重设任务。

可以在页面上做一个这样的表格,再加上简单的前后端交互就可以让用户自行管理定时任务:
在这里插入图片描述
其他的还有一些辅助功能(包括显示所有任务,显示任务的执行时间等),同学们可以自行查看

  • 重设任务:scheduler.reschedule_job(job_name)
scheduler.reschedule_job(job_id="job1", trigger='interval', minutes=1)

执行器(executor):

quad


执行调度任务的模块。最常用的 executor 是 ThreadPoolExecutor,存储路径是在Django数据库中。

执行器 executors 可以使用线程池(ThreadPoolExecutor)和进程池(ProcessPoolExecutor)

  1. 线程池:
from apscheduler.executors.pool import ThreadPoolExecutor
ThreadPoolExecutor(max_workers)  
ThreadPoolExecutor(20) # 最多20个线程同时执行

使用方法:

executors = {
      'default': ThreadPoolExecutor(20)
  }
  scheduler = BackgroundScheduler(executors=executors)
  1. 进程池:
from apscheduler.executors.pool import ProcessPoolExecutor
ProcessPoolExecutor(max_workers)
ProcessPoolExecutor(5) # 最多5个进程同时执行

使用方法:

 executors = {
      'default': ProcessPoolExecutor(3)
  }
  scheduler = BackgroundScheduler(executors=executors)

参考链接定时任务APScheduler,随时可以保持数据同步

总结
django-apscheduler使用起来十分方便。提供了基于日期固定时间间隔以及crontab 类型的任务,我们可以在主程序的运行过程快速增加新作业或删除旧作业,并且作业会存储数据库中,当调度器重启时,不必重新添加作业,作业会恢复状态继续执行。django-apscheduler可以当作一个跨平台的调度工具来使用,可以做为 linux 系统crontab 工具windows 计划任务程序的替换。需要注意的是,apscheduler不是一个守护进程或服务,它自身不带有任何命令行工具。它主要是要在现有的应用程序中运行。

对job的操作

  • add_job():会返回一个apscheduler.job.Job的实例,可以用来改变或者移除job。
  • scheduler_job():只适应于应用运行期间不会改变的job

移除job:

参考链接Django高级特性:django-apscheduler定时任务

【以下内容未经验证,选择性参考】:摘自定时任务APScheduler,随时可以保持数据同步

scheduler.start()
  • 停止APScheduler运行(如果报错,调度器就立即停止执行):
scheduler.shutdown()

参考范例【未验证】01:

在任意一个app内的views.py中写好定时任务

from apscheduler.scheduler import Scheduler
from time import sleep


def task_Fun():
    '''
    这里写定时任务
    '''
    sleep(1)



sched = Scheduler()


@sched.interval_schedule(seconds=6)
def my_task1():
    print('定时任务1开始n')
    task_Fun()
    print('定时任务1结束n')

@sched.interval_schedule(hours=4)
def my_task2():
    print('定时任务2开始n')
    sleep(1)
    print('定时任务2结束n')


sched.start()

ok启动django 项目,定时任务就会在你设定的时间执行了

参考链接Django使用apscheduler完成定时任务

参考范例【未验证】02:

from django.utils import timezone
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
#from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
 
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}

#异步式的
scheduler = BackgroundScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults,
    timezone=timezone.get_current_timezone())
 
#阻塞的,适用于scheduler是独立服务的场景
#scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=timezone.get_current_timezone())
#scheduler = BlockingScheduler(executors=executors, job_defaults=job_defaults, timezone=timezone.get_current_timezone())
 
def myjob():
    pass
 
try:
    scheduler.start()
    # 5s后执行myjob
    # 传入时间去除毫秒
    deadline = datetime.datetime.now().replace(microsecond=0) + datetime.timedelta(seconds=5)
    scheduler.add_job(myjob, 'date', run_date=deadline)
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()

参考链接django apscheduler在特定时间执行一次任务(run at a specify time only once)

三、使用Celery插件实现定时任务

3.0、基本介绍

Celery分布式任务队列。侧重实时操作,可用于生产系统处理数以百万计的任务,都用于大型项目配置和使用较为复杂。Django的分布式主要由Celery框架实现,这是python开发的分布式任务队列。由于它本身不支持消息存储服务,所以需要第三方消息服务来传递任务,一般使用Redis。

3.1、安装依赖
本例建立在认为你已经知道如何使用Celery实现异步任务的基础上,需要学习的请移步 Django使用Celery

本例使用redis作为Borker和backend

pip install -U "celery[redis]"

3.2、使用插件

修改settings.py 增加以下代码

....
# Celery配置
from kombu import Exchange, Queue
# 设置任务接受的类型默认是{'json'}
CELERY_ACCEPT_CONTENT = ['application/json']
# 设置task任务序列列化为json
CELERY_TASK_SERIALIZER = 'json'
# 请任务接受后存储时的类型
CELERY_RESULT_SERIALIZER = 'json'
# 时间格式化中国时间
CELERY_TIMEZONE = 'Asia/Shanghai'
# 是否使用UTC时间
CELERY_ENABLE_UTC = False
# 指定borker为redis 如果指定rabbitmq CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
# 指定存储结果的地方,支持使用rpc数据库redis等等,具体可参考文档 # CELERY_RESULT_BACKEND = 'db+mysql://scott:tiger@localhost/foo' # mysql 作为后端数据库
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
# 设置任务过期时间 默认是一天,为None或0 表示永不过期
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
# 设置worker并发数,默认cpu核心数
# CELERYD_CONCURRENCY = 12
# 设置每个worker最大任务数
CELERYD_MAX_TASKS_PER_CHILD = 100
# 指定任务的位置
CELERY_IMPORTS = (
    'base.tasks',
)
# 使用beat启动Celery定时任务
# schedule时间的具体设定参考:https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html
CELERYBEAT_SCHEDULE = {
    'add-every-10-seconds': {
        'task': 'base.tasks.cheduler_task',
        'schedule': 10,
        'args': ('hello', )
    },
}
...

3.3、编写定时任务代码

在本例中是在apps/base/tasks.py中增加的定时任务

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time

@shared_task
def cheduler_task(name):
    print(name)
    print(time.strftime('%X'))

3.4、如何使用&运行

启动定时任务beat

celery -A dase_django_api beat -l info

启动Celery worker 用来执行定时任务

celery -A dase_django_api worker -l -l info 

注意: 这里的 dase_django_api换成你的Celery APP的名称

3.5、Celery插件的优缺点

四、自建代码实现定时任务

4.1、创建定时任务
apps/base下创建一个文件名为schedules.py;键入一下内容

import os, sys, time, datetime
import threading
import django
base_apth = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# print(base_apth)
# 将项目路径加入系统path中,这样在导入模型模块时就不会报模块找不到了
sys.path.append(base_apth)
os.environ['DJANGO_SETTINGS_MODULE'] ='base_django_api.settings' # 注意:base_django_api 是我的模块名,你在使用时需要跟换为你的模块
django.setup()
from base.models import ConfDict

def confdict_handle():
    while True:
        try:
            loca_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print('本地时间:'+str(loca_time))
            time.sleep(10)
        except Exception as e:
            print('发生错误错误信息为:', e)
            continue


def main():
    '''
    主函数,用于启动所有定时任务,因为当前定时任务是手动实现,因此可以自由发挥
    '''
    try:
        # 启动定时任务,多个任务时,使用多线程
        task1 = threading.Thread(target=confdict_handle)
        task1.start()
    except Exception as e:
        print('发生异常:%s' % str(e))

if __name__ == '__main__':
    main()

4.2、如何使用&运行

直接运行脚本即可

python apps/base/schedules.py

4.3、自建代码的优缺点
优点:
自定义
高度自由
缺点:
过于简单
当任务过多时,占用资源也会增加

参考链接Django用定时任务比较不同姿势,使用,的,多种,对比
Crontab和APScheduler
django 定时任务 django-crontab & APScheduler 使用

Python 定时任务(schedule, Apscheduler, celery, python-crontab)-爱代码爱编程

定时任务简介

定时任务, linux 自带的 crontab ,windows 自带的任务计划,都可以实现守时任务。没错,操作系统基本都会提供定时任务的实现,但是如果你想要更加精细化的控制,或者说任务程序需要跨平台运行,最好还是自己实现定时任务框架,Python 的 apscheduler 提供了非常丰富而且方便易用的定时任务接口。本文两种方式实现定时任务。可以直接参考目录三Django中使用django-apscheduler,目录内容测试已通过,相关数据库配置暂时无空尝试

APScheduler简介

APscheduler全称Advanced Python Scheduler :作用为在指定的时间规则执行指定的作业。

apscheduler的四大组件,分别是Triggers,Job stores,Executors,Schedulers

Scheduler添加job流程

在这里插入图片描述
Scheduler调度流程

在这里插入图片描述

触发器(triggers):触发器包含调度逻辑描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发。每个作业都有它自己的触发器,除了初始配置之外,触发器是完全无状态的。

执行器(executors):执行器是将指定的作业(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件

作业(任务)存储器(job stores):作业存储器指定了作业被存放位置默认情况下作业保存内存,也可将作业保存在各种数据库中,当作业被存放数据库中时,它会被序列化,当被重新加载时会反序列化。作业存储器充当保存、加载更新查找作业的中间商。在调度器之间不能共享作业存储。

注意:一个任务储存器不要共享多个调度器,否则会导致状态混乱

调度器(schedulers)任务调度器,属于控制角色,通过它配置作业存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行应用程序中,开发人员通常不需要直接处理作业存储器、执行器或触发器,配置作业存储器和执行器是通过调度器来完成的。

一个调度器由上方三个组件构成,一般来说,一个程序只要有一个调度器就可以了。开发者也不必直接操作任务储存器、执行器以及触发器,因为调度器提供了统一的接口,通过调度器就可以操作组件,比如任务的增删改查

调度器工作流程】:

在这里插入图片描述

调度器组件详解:

根据开发需求选择相应的组件,下面是不同的调度器组件:

  1. BlockingScheduler 阻塞式调度器:适用于只跑调度器的程序。

  2. BackgroundScheduler 后台调度器:适用于非阻塞的情况,调度器会在后台独立运行。

  3. AsyncIOScheduler AsyncIO调度器,适用于应用使用AsnycIO的情况。

  4. GeventScheduler Gevent调度器,适用于应用通过Gevent的情况。

  5. TornadoScheduler Tornado调度器,适用于构建Tornado应用。

  6. TwistedScheduler Twisted调度器,适用于构建Twisted应用。

  7. QtScheduler Qt调度器,适用于构建Qt应用。

(1)任务储存器的选择:

要看任务是否需要持久化。如果你运行的任务是无状态的,选择默认任务储存器MemoryJobStore就可以应付。但是,如果你需要在程序关闭重启时,保存任务的状态,那么就要选择持久化的任务储存器。如果,作者推荐使用SQLAlchemyJobStore并搭配PostgreSQL作为后台数据库。这个方案可以提供强大的数据整合保护功能。

(2)执行器的选择:

同样要看你的实际需求默认的ThreadPoolExecutor线程池执行器方案可以满足大部分需求。如果,你的程序是计算密集型的,那么最好用ProcessPoolExecutor进程池执行器方案来充分利用多核算力。也可以将ProcessPoolExecutor作为第二执行器,混合使用两种不同的执行器。

配置一个任务,就要设置一个任务触发器。触发器可以设定任务运行的周期、次数和时间。

(3)APScheduler有三种内置的触发器:

  • date 日期:触发任务运行的具体日期

  • interval 间隔:触发任务运行的时间间隔

  • cron 周期:触发任务运行的周期

  • calendarinterval:当您想要在一天中的特定时间以日历为基础的间隔运行任务时使用

一个任务也可以设定多种触发器,比如,可以设定同时满足所有触发器条件而触发,或者满足一项即触发。

触发器代码示例

date 是最基本的一种调度,作业任务只会执行一次。它表示特定的时间点触发。它的参数如下

【参数说明

  • run_date(datetime or str) 任务运行的日期或者时间
  • timezone(datetime.tzinfo or str) 指定时区
from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

def my_job(text):
    print(text)

# 在2019年4月15日执行
scheduler.add_job(my_job, 'date', run_date=date(2019, 4, 15), args=['测试任务'])

scheduler.start()

###########################################################################################
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

def my_job(text):
    print(text)
    
# datetime类型(用于精确时间)
scheduler.add_job(my_job, 'date', run_date=datetime(2019, 4, 15, 17, 30, 5), args=['测试任务'])

scheduler.start()

注意:run_date参数可以是date类型、datetime类型或文本类型。

scheduler.add_job(my_job, 'date', run_date='2009-11-06 16:30:05', args=['测试任务'])

【更多示例代码,参考链接】:python 定时任务APScheduler 使用介绍

针对上述4个模块的详细介绍,参考博文Python定时库APScheduler&Django使用django-apscheduler实现定时任务

有价值的实战项目参考

teprunner测试平台定时任务这次终于稳了

本文开发内容
作为测试平台而言,定时任务算是必备要素了,只有跑起来的自动化,才能算是真正的自动化。本文将给测试计划添加定时任务功能,具体如下:

  1. 前端添加测试计划的定时任务开关
  2. 采用crontab表达式设置计划时间
  3. 后端集成django-apschedule,在数据库中记录任务明细和执行详情。
  4. 定时清理执行记录

前端效果图
在这里插入图片描述

Django – 定时任务模块设计与实践

页面静态化介绍

1. 页面静态化介绍、

1.为什么要做页面静态化

2.什么是页面静态化

2. 首页页面静态化实现

1.首页页面静态化实现步骤

2.首页页面静态化实现

import os
import time
from django.conf import settings
from django.template import loader
from apps.contents.models import ContentCategory
from apps.contents.utils import get_categories

def generate_static_index_html():
    """
    生成静态的主页html文件
    """
    print('%s: generate_static_index_html' % time.ctime())

    # 获取商品频道和分类
    categories = get_categories()

    # 广告内容
    contents = {}
    content_categories = ContentCategory.objects.all()
    for cat in content_categories:
        contents[cat.key] = cat.content_set.filter(status=True).order_by('sequence')

    # 渲染模板
    context = {
        'categories': categories,
        'contents': contents
    }

    # 获取首页模板文件
    template = loader.get_template('index.html')
    # 渲染首页html字符串
    html_text = template.render(context)
    # 将首页html字符写入到指定目录,命名'index.html'
    file_path = os.path.join(settings.STATICFILES_DIRS[0], 'index.html')
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(html_text)

3. 定时任务crontab静态化首页

对于首页的静态化,考虑到页面的数据可能由多名运营人员维护,并且经常变动,所以将其做成定时任务,即定时执行静态化。

在Django执行定时任务,可以通过 django-crontab扩展来实现。

参考链接定时器任务django-crontab的使用【静态化高频率页面,增加用户体验】【系统的定时器,独立于项目执行】【刘新宇】

APScheduler常见错误

报错No module named ‘apscheduler.scheduler’

BlockingScheduler算是会实行block阻塞程序运行(会阻塞主线程的运行)

APScheduler中两种调度器的区别及使用过程中要注意的问题

apscheduelr整体实现方式和扩展方式解读

参考链接:【Django】定时任务APScheduler

原文地址:https://blog.csdn.net/weixin_42782150/article/details/123212604

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_17123.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注