Redis定时任务的核心在于”@Schedule“注解,Redis Zset,List数据结构,Redis管道技术
1.前端用户发起定时任务创建定时任务任务,像定时任务模块发起定时任务请求并且携带必要参数
首先我们在定时任务服务中中判断当前任务执行时间是否小于当前时间,
public long addTask(Taskinfo taskinfo) {
if (null == taskinfo.getExecuteTime()){
//保存消息到数据库中
Calendar executeTime = Calendar.getInstance();
executeTime.add(Calendar.MINUTE, 5);
Date futureTime = executeTime.getTime();
taskinfo.setExecuteTime(futureTime);
}
taskInfoMapper.insert(taskinfo);
//判断当前时间与执行时间的关系
if (new Date().compareTo(taskinfo.getExecuteTime()) > 0){
String key = taskinfo.getTaskType() + "_" + taskinfo.getPriority();
/**
* 执行时间超过当前日期直接加入到未来任务执行队列当中
*/
cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(taskinfo),taskinfo.getExecuteTime().getTime());
}
TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
BeanUtils.copyProperties(taskinfo, taskinfoLogs);
taskinfoLogs.setVersion(1);
taskinfoLogs.setStatus(ScheduleConstants.TaskStatus.INIT);
//保存到日志表中
taskInfoLogsService.save(taskinfoLogs);
return taskinfo.getTaskId();
}
如果小于当前系统时间(任务补偿机制。这种情况有可能出现在于由于微服务架构中一般定时任务被单独抽取成为一个模块,模块中通信可能基于消息队列,对于消息阻塞的情况可能会出现定时任务时间小于当前时间),直接存储到Redis的Zset集合中,并设置Score为当前时间戳(只占用4个字节 时间从 1970-01-01 到 2060年),否则则存入数据库中。
2.数据库表结构,对于定时任务我设计了两张表,一张表为taskInfo,另外一张为taskInfoLog表。日志表的作用在于记录任务是否完成,用于排查异常并且对未执行的任务进行补偿机制。
taskInfoLog表
3.回到代码层面,这个时候我们使用Java提供的定时任务每5分钟从数据库中查询[当前时间,当前时间+5min]区间内的任务,这里推荐大家使用日历类Calendar来添加时间,Date类中的方法已经过时。
注意这里必须使用分布式锁,由于我们项目是多中心部署,每个中心都有16台主机。如果不加分布式锁可能会出现多台主机同时拉取任务的情况没必要且浪费性能(当然不用担心拉取重复任务的情况,Zset结构自动去重不会出现相同的任务)
/**
* 五分钟执行一次从数据库中得到数据存到redis中
*/
@Scheduled(cron = "0 0/5 * * * ?")
public void schedule(){
//尝试获取分布式锁
String lock = cacheService.tryLock(ScheduleConstants.LOCK_NAME, 3000);
if (Strings.isBlank(lock)){
log.info("当前主机没有抢占到redis分布式锁,故不执行刷新定时任务");
return;
}
log.info("-----------从数据库中查询任务存入redis中-----------");
Calendar future = Calendar.getInstance();
future.add(Calendar.MINUTE, 5);
LambdaQueryWrapper<Taskinfo> taskinfoLambdaQueryWrapper = new LambdaQueryWrapper<>();
//时间区间为:[当前时间,当前时间+5分钟]
LambdaQueryWrapper<Taskinfo> queryWrapper = taskinfoLambdaQueryWrapper.le(Taskinfo::getExecuteTime, future.getTime())
.ge(Taskinfo::getExecuteTime, new Date());
List<Taskinfo> taskinfos = taskInfoMapper.selectList(queryWrapper);
if (!taskinfos.isEmpty()){
//找出了任务
//存入redis的zset集合中去
log.info("从DB中查询出来的需要放入redis中未来队列的数据有:{}",taskinfos);
for (Taskinfo taskinfo : taskinfos) {
//存入zset集合中去
String key = taskinfo.getTaskType() + "_" + taskinfo.getPriority();
cacheService.zAdd(ScheduleConstants.FUTURE + key,JSON.toJSONString(taskinfo),System.currentTimeMillis());
}
}
log.info("-----------从数据库中查询任务存入redis中结束-----------");
}
4.任务加入到future队列后等待被执行,我们每分钟到redis里面查询需要执行的任务,使用redis的zRangeOfScore方法可以保证任务执行时间误差控制在1分钟内,使用这个方法可以获取当前时间到未来一分钟内需要执行的任务
将得到的任务通过redis管道存入List执行队列中,并且从原先zset队列中删除原有的任务,redis的管道效率实在太高辣!!!!!
大家可以自己取测试一下,我做过一个测试向list中加入相同1000条数据的情况下,使用管道技术效率高了接近20倍
@Scheduled(cron = "0 0/1 * * * ? ")
public void refreshRedisTask(){
log.info("---------------刷新future队列中的任务开始------------");
//模糊查询所有未来任务队列
Set<String> likeKey = cacheService.scan(ScheduleConstants.FUTURE + "*");
likeKey.forEach(key ->{
//从futureKey中得到所有
Set<String> taskKeys = cacheService.zRange(key, System.currentTimeMillis(), System.currentTimeMillis() + 1 * 60 * 1000);
if (taskKeys.isEmpty()){
log.info("当前未来任务队列中没有任务");
return;
}
String consumerKey = ScheduleConstants.CONSUMER + key.substring(key.indexOf("_") + 1);
String futureKey = ScheduleConstants.FUTURE + key.substring(key.indexOf("_") + 1);
log.info("取出{}条任务转存到消费队列中去:{}", taskKeys.size(),taskKeys);
cacheService.refreshWithPipeline(futureKey,consumerKey,taskKeys);
});
}
后续就简单了在需要消费任务的地方使用监听器监听list队列消费任务即可
原文地址:https://blog.csdn.net/m0_57334678/article/details/131840276
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_20370.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!