延迟队列实现
基于监听key过期实现的延迟队列实现,这里需要继承KeyspaceEventMessageListener类来实现监听redis键过期
public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener implements
ApplicationEventPublisherAware {
private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");
private @Nullable ApplicationEventPublisher publisher;
/**
* Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.
*
* @param listenerContainer must not be {@literal null}.
*/
public KeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/*
* (non-Javadoc)
* @see org.springframework.data.redis.listener.KeyspaceEventMessageListener#doRegister(org.springframework.data.redis.listener.RedisMessageListenerContainer)
*/
@Override
protected void doRegister(RedisMessageListenerContainer listenerContainer) {
listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);
}
/*
* (non-Javadoc)
* @see org.springframework.data.redis.listener.KeyspaceEventMessageListener#doHandleMessage(org.springframework.data.redis.connection.Message)
*/
@Override
protected void doHandleMessage(Message message) {
publishEvent(new RedisKeyExpiredEvent(message.getBody()));
}
/**
* Publish the event in case an {@link ApplicationEventPublisher} is set.
*
* @param event can be {@literal null}.
*/
protected void publishEvent(RedisKeyExpiredEvent event) {
if (publisher != null) {
this.publisher.publishEvent(event);
}
}
/*
* (non-Javadoc)
* @see org.springframework.context.ApplicationEventPublisherAware#setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher)
*/
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
}
KeyExpirationEventMessageListener
KeyExpirationEventMessageListener
是Spring Data Redis中的监听器,用于监听Redis中Key的过期事件。它的原理和处理流程如下:
原理:
-
Redis使用一种称为“过期事件(expiry event)”的机制来处理键的过期。当一个键的过期时间到达时,Redis会生成一个事件,通知订阅了该事件的客户端。
-
Spring Data Redis的
KeyExpirationEventMessageListener
就是一个订阅了Redis过期事件的客户端。它使用Redis的PSUBSCRIBE
命令来订阅一个特殊的频道(channel),通常是__keyevent@0__:expired
,其中0
表示所有数据库,expired
表示过期事件。 -
当有Redis键过期时,Redis会将事件消息发送到订阅了
__keyevent@0__:expired
频道的客户端,其中就包括了KeyExpirationEventMessageListener
。
@Component
public class MyKeyExpirationListener extends KeyExpirationEventMessageListener {
public MyKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] pattern) {
// 获取过期的键名
String expiredKey = message.toString();
// 在这里编写处理过期键的逻辑,例如打印日志
System.out.println("Key expired: " + expiredKey);
}
}
上述示例中,MyKeyExpirationListener
继承自KeyExpirationEventMessageListener
,并重写了onMessage
方法。在onMessage
方法中,可以获取过期的键名并编写处理逻辑。
需要注意的是,Redis的过期事件是以异步方式发送的,因此处理过期事件时需要考虑并发和线程安全性。
这就是KeyExpirationEventMessageListener
的原理和处理流程,它允许您在Redis中监听键的过期事件,并在过期时执行自定义的处理逻辑。
当KeyExpirationEventMessageListener收到Redis发布的过期Key的消息的时候,会发布RedisKeyExpiredEvent事件
@Override
protected void doHandleMessage(Message message) {
publishEvent(new RedisKeyExpiredEvent(message.getBody()));
}
private @Nullable ApplicationEventPublisher publisher;
protected void publishEvent(RedisKeyExpiredEvent event) {
if (publisher != null) {
this.publisher.publishEvent(event);
}
}
缺点:
使用Redis的key过期事件来处理延迟任务是一种常见的方法,但它也有一些缺点和限制:
精度问题: Redis的过期事件是以秒为单位的,这意味着你不能精确地指定毫秒级的延迟。如果你需要更精确的延迟,例如毫秒级的延迟,Redis的过期事件机制可能不够用,因为它无法提供毫秒级的精度。
不适合大规模任务: 当需要管理大量延迟任务时,Redis的key过期事件可能会导致性能问题。因为Redis是单线程的,当有大量的key过期事件发生时,Redis可能会阻塞其他操作,导致性能下降。
无法处理任务失败和重试: Redis的过期事件机制不提供自动重试失败的任务的功能。如果一个任务在执行时失败,你需要自己实现重试逻辑。
无法查看延迟任务队列: 使用Redis的key过期事件,你无法轻松查看当前所有的延迟任务队列,因为Redis不提供直接的命令来列出所有即将过期的key。
无法修改延迟时间: 一旦你将key设置为过期,你不能轻松地修改它的过期时间。如果需要更改任务的延迟时间,你需要执行复杂的操作,例如将任务重新添加到队列并删除旧的key。
缺乏监控和管理工具: Redis本身并没有提供用于监控和管理延迟任务的工具。你需要自行开发或使用第三方工具来监视和管理任务。
可靠性问题: Redis的过期事件机制是基于内存的,如果Redis服务器发生故障或重启,已经设置的过期事件可能会丢失,导致任务无法正常执行。
虽然Redis的过期事件可以用于一些简单的延迟任务场景,但对于更复杂、大规模、高可用性和高可靠性的延迟任务处理,通常需要考虑使用专门的延迟队列或任务调度系统。这些系统提供了更强大的功能,例如精确的延迟控制、重试机制、监控和管理工具,以及高可用性的保证。选择适合你需求的解决方案是根据具体情况而定。
Redis 使用 Sorted Set实现延迟队列
现在有这样一个场景,我们在下单的时候,订单如何没有支付会有一个延迟表示如果订单在10分钟内没有被支付,我们就会取消这个订单(举个例子十分钟),
创建订单的时候 我们将订单ID 以及订单没有支付即将视作取消的超时时间分别作为Sorted Set的member和score添加到订单队列Sorted Set中
public void createOrder(String orderId, int timeoutMinutes) {
// 将订单ID和超时时间添加到Sorted Set中
// double score = System.currentTimeMillis() + (timeoutMinutes * 60 * 1000);
//测试改为10s
double score = System.currentTimeMillis() + (10 * 1000);
redisTemplate.opsForZSet().add(ORDER_QUEUE_KEY, orderId, score);
}
通过Sorted Set的命令ZREVRANGEBYSCORE返回指定分数区间内的所有订单ID进行处理
public void cancelExpiredOrders() {
// 获取当前时间
double currentTime = System.currentTimeMillis();
// 获取超时的订单ID列表
Set<String> expiredOrders = redisTemplate.opsForZSet().rangeByScore(ORDER_QUEUE_KEY, Double.MIN_VALUE, currentTime);
// 取消超时订单的逻辑
for (String orderId : expiredOrders) {
LOGGER.info("订单:{} 即将取消", orderId);
// 取消订单的业务逻辑
// ...
// 取消后,从Sorted Set中移除订单
redisTemplate.opsForZSet().remove(ORDER_QUEUE_KEY, orderId);
}
}
缺点:
使用
ZSET
(有序集合,Sorted Set)来实现延迟任务调度(如订单超时取消)是一种有效的方法,但它也有一些缺点和限制:
内存消耗:
ZSET
在Redis中是一个有序集合,它需要占用一定的内存来存储成员和分数。如果你需要存储大量的延迟任务,可能会导致内存消耗较大。这可能会对Redis服务器的性能和成本产生影响,特别是在大规模应用中。不适用于大规模延迟任务:
ZSET
可以处理相对较小数量的延迟任务,但当需要管理大规模延迟任务队列时,可能会导致性能下降。在这种情况下,需要考虑更高效的延迟队列解决方案,例如使用分布式消息队列。无法动态修改延迟时间: 一旦将任务添加到
ZSET
中,你不能轻松地修改任务的延迟时间。如果需要在任务已经添加后更改延迟时间,可能需要复杂的操作。没有重试机制:
ZSET
只能用于一次性延迟任务,无法自动处理任务失败后的重试。如果任务在执行时失败,你需要自己实现重试逻辑。没有持久化: Redis是内存数据库,如果Redis服务器重启或发生故障,已添加的延迟任务数据将丢失。虽然可以通过Redis持久化机制来部分解决这个问题,但仍然存在一定风险。
复杂性增加: 使用
ZSET
来管理延迟任务队列需要编写复杂的代码来处理任务的添加、检索和删除。这可能增加应用程序的复杂性。虽然
ZSET
在某些场景下是一个有效的延迟任务管理工具,但它并不适用于所有情况。对于需要更多控制、更大规模、更高可用性和更高可靠性的延迟任务处理,可能需要考虑使用专门的延迟队列解决方案,或者结合使用Redis与其他工具来实现延迟任务调度。这可以根据具体需求来选择适合的解决方案。
Redisson实现延迟队列
使用 Redisson 创建延迟队列。Redisson 提供了 RDelayedQueue
接口和 RQueue
接口来实现延迟队列。
首先,你需要创建一个基本的队列,然后将它包装在一个延迟队列中。将延迟任务添加到延迟队列中,指定任务的内容和延迟时间(以毫秒为单位)。
延迟队列会自动处理过期的任务并将它们移动到基本队列中。你可以从基本队列中获取任务并进行处理。
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.0</version>
</dependency>
@Bean
public RedissonClient redissonClient(RedisProperties redisProperties) {
// 此为单机模式
Config config = new Config();
String address = "redis://" + redisProperties.getHost() + ":" + redisProperties.getPort();
config.useSingleServer()
.setAddress(address)
.setPassword(redisProperties.getPassword())
.setDatabase(redisProperties.getDatabase())
.setRetryAttempts(3);
return Redisson.*create*(config);
}
@Bean
public RDelayedQueue delayedQueue(RedissonClient redissonClient) {
// 创建基本队列
RQueue<String> queue = redissonClient.getQueue("redisson_delay_Queue");
// 创建延迟队列并关联到基本队列
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(queue);
return delayedQueue;
}
添加延迟任务到延迟队列中
// 添加延迟任务
delayedQueue.offer("Task 1", 5000); // 5秒延迟
delayedQueue.offer("Task 2", 10000); // 10秒延迟
处理延迟任务
延迟队列会自动处理过期的任务并将它们移动到基本队列中。你可以从基本队列中获取任务并进行处理。
// 获取并处理任务
String task = queue.poll(); // 从基本队列中取出任务
if (task != null) {
System.out.println("Processing task: " + task);
// 处理任务的业务逻辑
}
关闭 Redisson 客户端,当你的应用程序结束时,记得关闭 Redisson 客户端来释放资源
redisson.shutdown();
使用 Redisson 实现延迟队列具有许多优点,但也有一些潜在的限制和缺点。以下是使用 Redisson 延迟队列的优点和缺点的详细说明:
优点:
-
简化任务调度: Redisson 延迟队列使任务调度变得容易,你可以将任务添加到队列中,并指定它们的延迟时间,而不必担心手动处理任务的触发和移动。
-
可靠性: Redisson 延迟队列基于 Redis,具有高可用性和可靠性。Redis的持久性和复制机制确保了任务不会因应用程序或服务器故障而丢失。
-
高性能: Redis是一个高性能的内存数据库,能够快速处理延迟队列中的任务。这对于需要快速响应和处理大量任务的应用程序非常重要。
-
分布式支持: Redisson 支持分布式部署,因此你可以在多个节点上使用相同的延迟队列。这对于需要横向扩展和高可用性的应用程序非常有用。
缺点:
-
依赖于Redis: Redisson 延迟队列依赖于 Redis 服务器,因此你需要运行和维护一个 Redis 集群。这可能会增加部署和维护的复杂性。
-
不适用于高频率任务: 如果你需要处理高频率的任务,Redisson 延迟队列可能会导致性能问题。Redis是单线程的,当有大量任务需要触发时,可能会出现性能瓶颈。
-
精度问题: Redisson 延迟队列的精度受到Redis的时间精度限制,通常是毫秒级。如果需要更高精度的延迟任务,可能需要考虑其他解决方案。
-
无法处理任务失败和重试: Redisson 延迟队列提供了任务的延迟和触发机制,但无法处理任务失败后的重试。你需要自行实现重试逻辑。
-
无法查看延迟任务队列: Redisson 延迟队列没有提供直接的命令来列出当前所有的延迟任务。你需要自己编写代码来监视队列。
-
复杂性增加: 尽管 Redisson 简化了任务调度的部分,但仍需要一些额外的配置和代码来处理延迟任务,这可能增加应用程序的复杂性。
总的来说,Redisson 延迟队列是一个强大的工具,适用于许多延迟任务调度的场景。然而,根据具体的应用需求,你需要权衡其优点和缺点,可能需要考虑其他延迟队列解决方案,特别是在高频率任务处理、更高精度要求和任务重试方面。选择合适的工具取决于你的应用程序的要求和架构。
原文地址:https://blog.csdn.net/huanglu0314/article/details/132677543
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_41104.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!