本文介绍: 因为我的项目是多数据库配置的,在配置settings.py的INSTALLED_APPS代码后,要在DATABASE_APPS_MAPPING中也要配置一下。同步数据库,数据库中将生成2个表,django_apscheduler_djangojobdjango_apscheduler_djangojobexecutiondjango_apscheduler_djangojobdjango_apscheduler_djangojobexecution表。应该映射到哪个数据库生成表。成功生成以下数据表

先说一下djangoapscheduler定时器使用过程

djangoapscheduler基本使用

1.安装djangoapscheduler代码如下(示例):

pip install django-apscheduler

2.配置settings.py的INSTALLED_APPS代码如下(示例):

INSTALLED_APPS = (
    # ...
    "django_apscheduler",
)

3.通过命令生成定时记录

使用pythonmanage命令我们应该cd项目目录下,就是manage.py文件所在的目录


我们应该使用python manage.py makemigrationspython manage.py migrate同步数据库,数据库中将生成2个表,django_apscheduler_djangojobdjango_apscheduler_djangojobexecution。

这里问题!!!migrate后,死活没有在MySQL中创建django_apscheduler_djangojobdjango_apscheduler_djangojobexecution表

问题原因:

        因为我的项目是多数据库配置的,在配置settings.py的INSTALLED_APPS代码后,要在DATABASE_APPS_MAPPING中也要配置一下django_apscheduler应该映射到哪个数据库生成表。

解决办法

DATABASE_APPS_MAPPING = {
    ........这里的代码应该有default数据库映射的代码了
    'django_apscheduler': 'default',  ##主要是就是这行代码
}

完事再来一次

python manage.py makemigrations
python manage.py migrate

成功生成以下数据表! 

4.创建runapscheduler.py文件

runapscheduler.py正是通过自定义创建manange命令的py文件,可以通过python manage.py runapscheduler 启动定时程序

import logging

from django.conf import settings

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJobExecution
from django_apscheduler import util

logger = logging.getLogger(__name__)


def my_job():
  # Your job processing logic here...
  print(123)
  pass

def delete_old_job_executions(max_age=604_800):
  """
  此作业数据库中删除早于“max_age”的APScheduler作业执行条目。
  它有助于防止数据库中塞满不再有用的旧历史记录。
  最长7天
  """
  DjangoJobExecution.objects.delete_old_job_executions(max_age)


class Command(BaseCommand):
  help = "Runs APScheduler."

  def handle(self, *args, **options):
    scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
    scheduler.add_jobstore(DjangoJobStore(), "default")

    scheduler.add_job(
      my_job,
      trigger=CronTrigger(second="*/10"),  # Every 10 seconds
      id="my_job",  # The `id` assigned to each job MUST be unique
      max_instances=1,
      replace_existing=True,
    )
    logger.info("Added job 'my_job'.")

    scheduler.add_job(
      delete_old_job_executions,
      trigger=CronTrigger(
        day_of_week="mon", hour="00", minute="00"
      ),  # Midnight on Monday, before start of the next work week.
      id="delete_old_job_executions",
      max_instances=1,
      replace_existing=True,
    )
    logger.info(
      "Added weekly job: 'delete_old_job_executions'."
    )

    try:
      logger.info("Starting scheduler...")
      scheduler.start()
    except KeyboardInterrupt:
      logger.info("Stopping scheduler...")
      scheduler.shutdown()
      logger.info("Scheduler shut down successfully!")

1234这4步是django-apscheduler官网的使用步骤,经过测试,定时程序已经正常运行了。

现在,数据库中已经有了相关定时任务记录

原文地址:https://blog.csdn.net/qq_38680405/article/details/134584705

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_13875.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注