一、调度中心

首先下载XXL-JOB

GitHubhttps://github.com/xuxueli/xxljob
GitEE:https://gitee.com/xuxueli0323/xxl-job 项目使用2.3.1版本:
https://github.com/xuxueli/xxl-job/releases/tag/2.3.1

使用IDEA打开项目

xxl-job-admin调度中心 xxl-job-core公共依赖
xxj-job-executorsamples执行器Sample示例
xxl-job-executorsamplespringboot:SpringBoot版本,通过SpringBoot管理执行
xxl-job-executorsampleframeless:无框架版本

根据数据库脚本创建数据库修改数据库连接信息端口启动xxl-job-admin访问http://192.168.101.65:8088/xxl-job-admin/ 账号密码admin/123456
在这里插入图片描述

二、执行

下面配置执行器,执行负责调度中心通信接收调度中心发起的任务调度请求

1.首先在mediaservice工程添加依赖(父工程完成版本控制这里的版本是2.3.1)

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
</dependency>

2. 在nacos下的mediaservicedev.yaml配置xxl-job

       注意这里配置的appname是执行器的应用名,稍后会在调度中心配置执行器的时候使用
xxl:
  job:
    admin:
      addresses: http://192.168.101.65:8088/xxl-job-admin/
    executor:
      appname: media-process-service
      address:
      ip:
      port: 9999
      logpath: /data/applogs/xxl-job-jobhandler
      logretentiondays: 30
    accessToken: default_token

3.配置xxl-job的执行器

  将示例工程下的配置类拷贝media-service工程config路径下,该类中的属性就是获取配置文件中的配置得到的,同时提供了一个执行器的Bean
@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;
    @Value("${xxl.job.accessToken}")
    private String accessToken;
    @Value("${xxl.job.executor.appname}")
    private String appname;
    @Value("${xxl.job.executor.address}")
    private String address;
    @Value("${xxl.job.executor.ip}")
    private String ip;
    @Value("${xxl.job.executor.port}")
    private int port;
    @Value("${xxl.job.executor.logpath}")
    private String logPath;
    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }
    /**
    * 针对网卡容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
    *
    *      1、引入依赖:
    *          <dependency>
    *             <groupId>org.springframework.cloud</groupId>
    *             <artifactId>spring-cloud-commons</artifactId>
    *             <version>${version}</version>
    *         </dependency>
    *
    *      2、配置文件或者容器启动变量
    *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
    *
    *      3、获取IP
    *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
    */
    
}
    

在这里插入图片描述

4. 进入调度中心添加执行器

在这里插入图片描述

5. 重启媒资管理服务模块可以看到执行器在调入中心注册成功

在这里插入图片描述

三、执行任务

media-service新建com.xxxx.media.service.jobhandler,在该包下定义我们的任务类(创建任务类,编写任务方法

package com.xxxx.media.jobhandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
 * @author layman
 * @version 1.0
 * @description TODO
 * @date 2023/6/1 16:57
 */
@Slf4j
@Component
public class SimpleJob {
    @XxlJob("testJob")
    public void testJob() {
        log.debug("开始执行.......");
    }
}

然后进入调度中心添加任务,进入任务管理,新增任务信息
在这里插入图片描述

其中JobHandler中填写@XxlJob注解中的名称
随后重新启动media服务,并在任务管理中启动任务
在这里插入图片描述
控制台可以看到执行器的方法执行
在这里插入图片描述

四、分片广播

前面我们了解了一下xxl-job的基本使用,下面思考如何进行分布式任务处理呢?
我们需要启动多个执行器组成一个集群,去执行任务
在这里插入图片描述

执行器在集群部署下调度中心有哪些调度策略呢?
查看xxl-job官方文档

我们这里重点要说的是SHARDING_BROADCAST(分片广播),分片是指调度中心将集群汇总的执行器标上序号:0、1、2、3…,广播是指每次调度会向集群中的所有执行器发送调度请求,请求中携带分片参数
在这里插入图片描述
每个执行器收到调度请求,根据分片参数自行决定是否执行任务;另外xxl-job还支持动态分片,当执行器数量有变更时,调度中心会动态修改分片的数量

作业分片适用于哪些场景呢?

所以广播分片方式不仅可以充分发每个执行器的能力,并且根据分片参数可以控制任务是否执行,最终灵活控制了执行器集群的分布式处理任务。

使用说明
分片广播和普通任务开发流程一致,不同之处在于可以获取分片参数进行分片业务处理
获取分片参数方式参考示例代码

/**
 * 2、分片广播任务
 */
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {

    // 分片参数
    int shardIndex = XxlJobHelper.getShardIndex();
    int shardTotal = XxlJobHelper.getShardTotal();

    XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);

    // 业务逻辑
    for (int i = 0; i < shardTotal; i++) {
        if (i == shardIndex) {
            XxlJobHelper.log("第 {} 片, 命中分片开始处理", i);
        } else {
            XxlJobHelper.log("第 {} 片, 忽略", i);
        }
    }

}

下面测试作业分片

@XxlJob("shardingJobHandler")
public void shardingJob() {
    int shardIndex = XxlJobHelper.getShardIndex();
    int shardTotal = XxlJobHelper.getShardTotal();
    log.debug("shardIndex:{}, shardTotal:{}", shardIndex, shardTotal);
}

在调度中心添加任务,注意路由策略选择分片广播
在这里插入图片描述

高级配置说明
子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度,通过子任务可以实现一个任务执行完成去执行另一个任务。

调度过期策略忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间
立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间

阻塞处理策略:调度过于密集执行器来不及处理时的处理策略
单机串行默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行
丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记失败
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列然后运行本地调度任务;

任务超时时间支持自定义任务超时时间,任务运行超时将会主动中断任务;

失败重试次数支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试

下面我们需要启动两个执行器实例,观察每个实例的执行情况

spring:
  cloud:
   config:
    override-none: true

将media-service启动两个实例添加vm选项就是本地配置覆盖nacos中的配置,主要是修改端口号和xxl执行器端口
具体操作参见:https://blog.csdn.net/weixin_54514751/article/details/131002436?spm=1001.2014.3001.5501

两个服务启动,观察任务调度中心,可以看到有两个执行器
在这里插入图片描述
控制台查看运行情况
在这里插入图片描述
在这里插入图片描述

到此作业分片任务调试完成。

原文地址:https://blog.csdn.net/weixin_54514751/article/details/130992453

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_48442.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注