延迟队列实现

基于监听key过期实现延迟队列实现这里需要继承KeyspaceEventMessageListener类来实现监听redis过期

public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener implements
		ApplicationEventPublisherAware {

	private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");

	private @Nullable ApplicationEventPublisher publisher;

	/**
	 * Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.
	 *
	 * @param listenerContainer must not be {@literal null}.
	 */
	public KeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer) {
		super(listenerContainer);
	}

	/*
	 * (non-Javadoc)
	 * @see org.springframework.data.redis.listener.KeyspaceEventMessageListener#doRegister(org.springframework.data.redis.listener.RedisMessageListenerContainer)
	 */
	@Override
	protected void doRegister(RedisMessageListenerContainer listenerContainer) {
		listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);
	}

	/*
	 * (non-Javadoc)
	 * @see org.springframework.data.redis.listener.KeyspaceEventMessageListener#doHandleMessage(org.springframework.data.redis.connection.Message)
	 */
	@Override
	protected void doHandleMessage(Message message) {
		publishEvent(new RedisKeyExpiredEvent(message.getBody()));
	}

	/**
	 * Publish the event in case an {@link ApplicationEventPublisher} is set.
	 *
	 * @param event can be {@literal null}.
	 */
	protected void publishEvent(RedisKeyExpiredEvent event) {

		if (publisher != null) {
			this.publisher.publishEvent(event);
		}
	}

	/*
	 * (non-Javadoc)
	 * @see org.springframework.context.ApplicationEventPublisherAware#setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher)
	 */
	@Override
	public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
		this.publisher = applicationEventPublisher;
	}
}

KeyExpirationEventMessageListener

KeyExpirationEventMessageListener是Spring Data Redis中的监听器用于监听Redis中Key的过期事件。它的原理处理流程如下

原理

  1. Redis使用一种称为“过期事件expiry event)”的机制处理键的过期。当一个键的过期时间到达时,Redis会生成一个事件通知订阅了该事件客户端

  2. Spring Data Redis的KeyExpirationEventMessageListener就是一个订阅了Redis过期事件客户端。它使用Redis的PSUBSCRIBE命令订阅一个特殊频道channel),通常是__keyevent@0__:expired,其中0表示所有数据库expired表示过期事件

  3. 当有Redis键过期时,Redis会将事件消息发送订阅__keyevent@0__:expired频道客户端,其中就包括了KeyExpirationEventMessageListener

处理流程

  1. KeyExpirationEventMessageListener收到过期事件消息后,会调用回调方法进行处理默认情况下,回调方法onMessage,您可以根据需要自定义回调方法

  2. 回调方法中,您可以处理键过期后的逻辑例如执行一些清理操作发送通知等。具体的处理逻辑由您自己编写

以下是一个简单示例

@Component
public class MyKeyExpirationListener extends KeyExpirationEventMessageListener {

    public MyKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 获取过期的键名
        String expiredKey = message.toString();

        // 在这里编写处理过期键的逻辑例如打印日志
        System.out.println("Key expired: " + expiredKey);
    }
}

上述示例中,MyKeyExpirationListener继承KeyExpirationEventMessageListener,并重写onMessage方法。在onMessage方法中,可以获取过期的键名并编写处理逻辑

需要注意的是,Redis的过期事件是以异步方式发送的,因此处理过期事件时需要考虑并发线程安全性

这就是KeyExpirationEventMessageListener原理和处理流程,它允许您在Redis中监听键的过期事件,并在过期时执行自定义的处理逻辑

当KeyExpirationEventMessageListener收到Redis发布的过期Key的消息时候,会发布RedisKeyExpiredEvent事件

	@Override
	protected void doHandleMessage(Message message) {
		publishEvent(new RedisKeyExpiredEvent(message.getBody()));
	}

	private @Nullable ApplicationEventPublisher publisher;


    protected void publishEvent(RedisKeyExpiredEvent event) {

		if (publisher != null) {
			this.publisher.publishEvent(event);
		}
	}

缺点:

使用Redis的key过期事件来处理延迟任务是一种常见的方法,但它也有一些缺点和限制

  1. 精度问题 Redis的过期事件是以秒为单位的,这意味着你不能精确指定毫秒级的延迟。如果你需要精确延迟例如毫秒级的延迟,Redis的过期事件机制可能不够用,因为它无法提供毫秒级的精度

  2. 不适合大规模任务需要管理大量延迟任务时,Redis的key过期事件可能会导致性能问题。因为Redis是单线程的,当有大量的key过期事件发生时,Redis可能阻塞其他操作,导致性能下降。

  3. 无法处理任务失败重试 Redis的过期事件机制不提供自动重试失败任务功能。如果一个任务在执行失败,你需要自己实现重试逻辑

  4. 无法查看延迟任务队列: 使用Redis的key过期事件,你无法轻松查看当前所有的延迟任务队列,因为Redis不提供直接的命令来列出所有即将过期的key。

  5. 无法修改延迟时间 一旦你将key设置为过期,你不能轻松地修改它的过期时间。如果需要更改任务的延迟时间,你需要执行复杂操作例如将任务重新添加到队列并删除旧的key。

  6. 缺乏监控管理工具 Redis本身并没有提供用于监控管理延迟任务的工具。你需要自行开发使用第三方工具来监视管理任务。

  7. 可靠性问题: Redis的过期事件机制基于内存的,如果Redis服务器发生故障重启,已经设置的过期事件可能丢失,导致任务无法正常执行

虽然Redis的过期事件可以用于一些简单的延迟任务场景,但对于更复杂、大规模、高可用性和高可靠性的延迟任务处理,通常需要考虑使用专门的延迟队列或任务调度系统。这些系统提供了更强大的功能例如精确的延迟控制、重试机制、监控管理工具,以及高可用性的保证。选择适合你需求解决方案是根据具体情况而定。

Redis 使用 Sorted Set实现延迟队列

现在有这样一个场景我们下单时候订单如何没有支付会有一个延迟表示如果订单在10分钟没有被支付,我们就会取消这个订单(举个例子分钟),

创建订单时候 我们将订单ID 以及订单没有支付即将视作取消超时时间分别作为Sorted Set的memberscore添加到订单队列Sorted Set中

    public void createOrder(String orderId, int timeoutMinutes) {
        // 将订单ID和超时时间添加到Sorted Set中
//        double score = System.currentTimeMillis() + (timeoutMinutes * 60 * 1000);
        //测试改为10s
        double score = System.currentTimeMillis() + (10 * 1000);
        redisTemplate.opsForZSet().add(ORDER_QUEUE_KEY, orderId, score);
    }

通过Sorted Set的命令ZREVRANGEBYSCORE返回指定分数区间内的所有订单ID进行处理

  public void cancelExpiredOrders() {
        // 获取当前时间
        double currentTime = System.currentTimeMillis();

        // 获取超时的订单ID列表
        Set<String> expiredOrders = redisTemplate.opsForZSet().rangeByScore(ORDER_QUEUE_KEY, Double.MIN_VALUE, currentTime);

        // 取消超时订单的逻辑
        for (String orderId : expiredOrders) {
            LOGGER.info("订单:{} 即将取消", orderId);
            // 取消订单的业务逻辑
            // ...
            // 取消后,从Sorted Set中移除订单
            redisTemplate.opsForZSet().remove(ORDER_QUEUE_KEY, orderId);
        }
    }

缺点:

使用ZSET有序集合,Sorted Set)来实现延迟任务调度(如订单超时取消)是一种有效的方法,但它也有一些缺点和限制

  1. 内存消耗: ZSET 在Redis中是一个有序集合,它需要占用一定的内存来存储成员分数。如果你需要存储大量的延迟任务,可能会导致内存消耗较大。这可能会对Redis服务器的性能和成本产生影响,特别是在大规模应用中。

  2. 不适用于大规模延迟任务: ZSET 可以处理相对小数量的延迟任务,但当需要管理大规模延迟任务队列时,可能会导致性能下降。在这种情况下,需要考虑更高效的延迟队列解决方案例如使用分布式消息队列。

  3. 无法动态修改延迟时间: 一旦将任务添加ZSET中,你不能轻松地修改任务的延迟时间。如果需要在任务已经添加更改延迟时间,可能需要复杂的操作。

  4. 没有重试机制: ZSET 只能用于一次性延迟任务,无法自动处理任务失败后的重试。如果任务在执行时失败,你需要自己实现重试逻辑。

  5. 没有持久化: Redis是内存数据库,如果Redis服务重启或发生故障,已添加的延迟任务数据丢失。虽然可以通过Redis持久化机制来部分解决这个问题,但仍然存在一定风险

  6. 复杂性增加: 使用ZSET管理延迟任务队列需要编写复杂代码来处理任务的添加检索删除。这可能增加应用程序复杂性。

虽然ZSET在某些场景下是一个有效的延迟任务管理工具,但它并不适用于所有情况。对于需要更多控制、更大规模、更高可用性和更高可靠性的延迟任务处理,可能需要考虑使用专门的延迟队列解决方案,或者结合使用Redis与其他工具来实现延迟任务调度。这可以根据具体需求选择适合的解决方案

Redisson实现延迟队列

使用 Redisson 创建延迟队列。Redisson 提供了 RDelayedQueue 接口RQueue 接口来实现延迟队列。

首先,你需要创建一个基本的队列,然后将它包装在一个延迟队列中。将延迟任务添加到延迟队列中指定任务的内容和延迟时间(以毫秒为单位)。

延迟队列会自动处理过期的任务并将它们移动到基本队列中。你可以从基本队列中获取任务并进行处理。

<dependency>
 <groupId>org.redisson</groupId>
 <artifactId>redisson</artifactId>
 <version>3.16.0</version> 
</dependency>

配置

@Bean
public RedissonClient redissonClient(RedisProperties redisProperties) {
 // 此为单机模式
 Config config = new Config();
 String address = "redis://" + redisProperties.getHost() + ":" + redisProperties.getPort();
 config.useSingleServer()
         .setAddress(address)
         .setPassword(redisProperties.getPassword())
         .setDatabase(redisProperties.getDatabase())
         .setRetryAttempts(3);
 return Redisson.*create*(config);
}

@Bean
public RDelayedQueue delayedQueue(RedissonClient redissonClient) {
 // 创建基本队列
 RQueue<String> queue = redissonClient.getQueue("redisson_delay_Queue");

​    // 创建延迟队列并关联到基本队列
​    RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(queue);
​    return delayedQueue;
}

添加延迟任务到延迟队列中

// 添加延迟任务
delayedQueue.offer("Task 1", 5000); // 5秒延迟
delayedQueue.offer("Task 2", 10000); // 10秒延迟

处理延迟任务

延迟队列会自动处理过期的任务并将它们移动到基本队列中。你可以从基本队列中获取任务并进行处理。

// 获取并处理任务
String task = queue.poll(); // 从基本队列中取出任务
if (task != null) {
 System.out.println("Processing task: " + task);
 // 处理任务的业务逻辑
}

关闭 Redisson 客户端,当你的应用程序结束时,记得关闭 Redisson 客户端来释放资源

redisson.shutdown();

使用 Redisson 实现延迟队列具有许多优点,但也有一些潜在的限制和缺点。以下是使用 Redisson 延迟队列的优点和缺点的详细说明

优点:

  1. 简化任务调度 Redisson 延迟队列使任务调度变得容易,你可以将任务添加到队列中,并指定它们的延迟时间,而不必担心手动处理任务的触发和移动。

  2. 可靠性: Redisson 延迟队列基于 Redis,具有可用性和可靠性。Redis的持久性复制机制确保了任务不会因应用程序服务器故障而丢失

  3. 高性能 Redis是一个高性能的内存数据库能够快速处理延迟队列中的任务。这对于需要快速响应和处理大量任务的应用程序非常重要。

  4. 分布式支持 Redisson 支持分布式部署,因此你可以在多个节点上使用相同的延迟队列。这对于需要横向扩展和高可用性应用程序非常有用。

  5. 语言支持 Redisson 提供了多种语言的客户端,因此你可以在不同编程语言中使用相同的延迟队列。

缺点:

  1. 依赖于Redis: Redisson 延迟队列依赖于 Redis 服务器,因此你需要运行维护一个 Redis 集群。这可能会增加部署维护复杂性。

  2. 不适用于高频率任务: 如果你需要处理高频率的任务,Redisson 延迟队列可能会导致性能问题。Redis是单线程的,当有大量任务需要触发时,可能会出现性能瓶颈。

  3. 精度问题: Redisson 延迟队列的精度受到Redis的时间精度限制,通常是毫秒级。如果需要更高精度的延迟任务,可能需要考虑其他解决方案

  4. 无法处理任务失败和重试: Redisson 延迟队列提供了任务的延迟和触发机制,但无法处理任务失败后的重试。你需要自行实现重试逻辑。

  5. 无法查看延迟任务队列: Redisson 延迟队列没有提供直接的命令来列出当前所有的延迟任务。你需要自己编写代码监视队列。

  6. 复杂性增加: 尽管 Redisson 简化任务调度部分,但仍需要一些额外配置代码来处理延迟任务,这可能增加应用程序复杂性。

总的来说,Redisson 延迟队列是一个强大的工具,适用于许多延迟任务调度场景。然而,根据具体的应用需求,你需要权衡其优点和缺点,可能需要考虑其他延迟队列解决方案,特别是在高频率任务处理、更高精度要求和任务重试方面。选择合适的工具取决于你的应用程序的要求和架构

原文地址:https://blog.csdn.net/huanglu0314/article/details/132677543

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_41104.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注