本文介绍: 兔老大高质量设计第二篇

之前文章

兔老大的系统设计(一)健康度系统

一、背景

延迟队列应用场景非常广泛,如客户主动操作

系统内部操作

二、需求分析

场景多种多样,我们尽量做出一个通用的,功能完备的,能满足大部分场景系统

可以以顾客预约场景为例进行设计假设会量大、量不稳定、存储时间长(比如几个月后执行),这样设计出来的系统就普遍适用。

三、目标明确

3.1功能

普通队列元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理

从某种意义上来讲,延迟队列结构并不像一个队列,而更像是一种以时间为权重的有序结构

实现的功能如下:

1)可以设置定时任务

2)可以修改未到执行时间任务属性(包括执行时间

3)可以查询任务情况,人工干预

3.2设计重点

大部分系统的重点一般在:性能可用性安全三方面。

3.2.1性能

及时:时间到了立刻执行,不能延迟太久。(如crontab分钟粒度就太粗了)

3.2.2可用

可靠:保证任务不重不漏的执行,不能丢任务、不能重复执行

高可用可扩展服务尽量不挂、可抗住突发的大量请求

恢复系统挂了或者任务失败/丢失等等,可以恢复

3.2.3其它

可撤回/修改:如果定时任务还没到执行时间,可以修改执行时间和其他内容,也可取消

时间长:有些场景甚至要保存一年以上,比如用户办理年卡后,要有一些策略诱导消费。

四、一些探索

本章不局限于实现所有的目标,提出一些业内常见的实现方案,供大家增长知识面,和最终方案可以有个对比。

请注意看每种方案下的分析

4.0 数据库

在小型项目中,通过一个线程定时数据库,通过执行时间字段判断是否到时,然后进行操作

优点:简单支持集群操作

缺点:   (1)对服务器内存消耗大

(2)存在延迟,执行时间粒度和mysql本身的速度都会影响

(3)假设你的订单有几千万条,每隔几分钟这样扫描一次数据库损耗极大

4.1 DelayQueue 延时队列

4.1.1 介绍

JDK 中提供了一组实现延迟队列的 API,位于Java.util.concurrent包下的 DelayQueue

DelayQueue一个 BlockingQueue,本质就是封装一个 PriorityQueue(优先队列),内部用堆来实现队列元素排序,向 DelayQueue 队列中添加元素时,会给元素一个 Delay(延迟时间)作为排序条件,队列中最小元素优先放在队首。队列中的元素只有到了 Delay 时间才允许从队列中取出

4.1.2简单实现

1)实现 Delayed 接口接口里只有一个 getDelay 方法用于设置延期时间。

2)Order 类中compareTo()负责对队列中的元素进行排序

public class Order implements Delayed {
    //延迟时间
    @JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
    private long time;
    String name;

    public Order(String name, long time, TimeUnit unit) {
        this.name = name;
        this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return time - System.currentTimeMillis();
    }
    @Override
    public int compareTo(Delayed o) {
        Order Order = (Order) o;
        long diff = this.time - Order.time;
        if (diff <= 0) {
            return -1;
        } else {
            return 1;
        }
    }
}

DelayQueue 的 put 方法线程安全的,内部使用了ReentrantLock进行线程同步

上边只是简单的实现,实际开发中会有专门的线程负责消息的入队与消费。

4.1.3 分析

分析:事实上,如无必要,我们应该尽可能使用语言自带的库,而非过度设计。从这方面考虑,DelayQueue无疑是一个简单优秀的实现,但是在大型项目中,本地存储的方案确实不太适用。

无论如何,我们仍然可以对它大根堆和线程控制方法进行学习和借鉴。

4.2 RabbitMQ

4.2.1死信队列

4.2.2消息生存时间 TTL

  • 一种方式是直接在创建队列的时候设置整个队列的 TTL 过期时间,所有进入队列的消息,都被设置成了统一的过期时间,一旦消息过期,马上就会被丢弃,进入死信队列,在延迟队列的延迟时间为固定值的时候,比较适合使用这种方式:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
  • 另一种方式是针对单条消息设置,参考代码如下,该消息被设置了 6 秒的过期时间:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg content".getBytes());

4.2.4存在的bug

 4.2.5插件实现原理

上面使用 DLX + TTL 的模式,消息首先会路由到一个正常的队列,根据设置的 TTL 进入死信队列,与之不同的是通过 x-delayed-message 声明交换机,它的消息在发布之后不会立即进入队列,先将消息保存至 Mnesia。

这个插件将会尝试确认消息是否过期,首先要确保消息的延迟范围是 Delay > 0, Delay =<ERL_MAX_T(在 Erlang 中可以被设置的范围为 (2^32)-1 毫秒),如果消息过期通过 x-delayed-type 类型标记交换投递至目标队列,整个消息的投递过程也就完成了。

4.2.6 分析

作为网上流传非常广的一种方案,它似乎真的是一种不错的实现。

尤其是,由于 RabbitMQ 本身的消息可靠发送、消息可靠投递、死信队列等特性,可以保障消息至少被消费一次以及未被正确处理的消息不会被丢弃,让消息的可靠性有了保障。

但是这方案有如下缺点:

1、为了解决一个问题,又引入了队列交换机+mq+私信队列交换机+私信队列+插件,我们并不希望引入如此复杂不可控的架构

2、配置麻烦,额外增加死信交换机和死信队列等配置,不好维护

3、不可靠,实际测试环境延时插件有时收不到消息,不是很稳定。配置错误生产者消费者连接的队列错误和其他未知因素都有可能造成延迟失效。

4、真实消费原因不唯一:消息被拒绝、消息过期、消息超长等等原因都会进入死信队列,这种不唯一也是我们无法忍受的。,我们无法知道死信队列中是否都是过期消息。

如上图所示,时间轮是一个存储延迟消息的环形队列,其底层采用数组实现,可以高效循环遍历。这个环形队列中的每个元素对应一个延迟任务列表,这个列表是一个双向环形链表链表中每一项都代表一个需要执行的延迟任务。时间轮会有表盘指针,表示时间轮当前所指时间,随着时间推移,该指针会不断前进,并处理对应位置上的延迟任务列表

4.3.2添加延迟任务

4.3.3多层时间轮

4.3.4具体实现

从实际上说:相比其它MQ,kafka在我的认知里是最优秀的,事实上在我的十万级压测中,它是唯一性能达标的MQ(有些MQ已经接近挂了它还很健康)。同时kafka也有一定的持久化方案。

但是这种方案依旧有一些问题:

1、正如我开头提到的,需求很可能是保存一个月甚至更长时间,超过了默认log.retention.hours(168)的大小。

2、我们希望执行时间视可修改的,但是kafka的消息一旦由生产者发送,则不可变。关于这方面讨论我贴了一个链接感兴趣的可以看看stackoverflow问题https://stackoverflow.com/questions/60046428/what-is-kafkamessage-tweaking

其实探索到想用众多MQ来实现延迟队列时,我越来越清晰的有一种感觉:非逼着众多MQ(比如kafka)做不擅长的事情本身就有问题,人家的定位就是消息队列,而不是替你保存动辄一个月才执行的消息然后精准执行。

4.4 一些其他方案

这里我不准备继续分析所有方案的优缺点了,因为这是很无聊的(而且影响接下来方案叙述的节奏),如Quartz、ActiveMQ、RocketMQ、nsq、pulsar等等,原因无非是性能不达标、时间粒度不够、存储时间不够等等,在这里放一张MQ的对比图:(如果满足你的要求,当然也可以用)

 4.5 Redis ZSet

基于4.3结尾的考虑,首先要有地方做持久化,redis作为nosql的老大,呼之欲出。

4.5.1过期回调

只是提一嘴,这种歪门邪道的实现就不要想了,事实上容易出大问题,有兴趣可以了解。

4.5.2 正解介绍

  1. 入队操作:我们将需要处理的任务,按其需要延迟处理时间作为 Score 加入到 ZSet 中。Redis 的 ZAdd 的时间复杂度是O(logN)N是ZSet 中元素个数,因此我们能相对比较高效的进行入队操作。
  2. 起一个进程定时(比如每隔一秒)查询 ZSet 中 Score 最小的元素,查询结果有两种情况:
  • 查询出的分数小于等于当前时间戳,说明到这个任务需要执行的时间了,则去异步处理该任务;
  • 查询出的分数大于当前时间戳,说明 ZSet 中所有的任务都还没有到需要执行的时间,则休眠一秒后继续查询

4.5.3 分析

但是redis同样有缺点(但是被解决了):

1、定位问题:和上文提到的众多mq一样,redis定位并不是延迟队列。

(经验告诉我们,如果硬要用与需求定位不符的东西,就是容易出问题)

但是由于这种方式实在过于简单好用,在业界确实非常太有市场,我记得redis作者本人都曾经谈过这个问题,告诉大家最好不要把redis用作消息队列之类的,只不过貌似没人听。

2、持久化不是百分百可靠:redis持久化两种方式我就不讲了,最高级的持久化配置就是每次操作都记录,但是由于性能问题,基本不可能这样配(事实上大公司有明确规定不能这样配)。

3、 真实案例,如果qps过高,虽然redis扛得住(和kafka一样,真男人啊),但是我们的服务扛不住。

五、方案

5.1 思考

读者紧接着4.5.3的想法不要断,我们想用redis,但是有三个问题,如何解决呢?

问题2:没的说,上mysql万事大吉。

问题3:redis接MQ,完美解决问题,这不就是mq天天吹的其中一大作用吗。

问题1:我们只是用来排个序,消息队列和持久化都不是redis做了,符合定位

5.2 整体架构

基于这些考虑,最终我们的架构是这样的:

一条

1)将延迟的消息任务通过 hash 算法路由至不同的 Redis Key 上

2)每个 Redis Key 都对应建立一个处理进程,称为 Event ,轮询,查询是否有待处理的延迟消息

3) Event 进程负责业务基本负责分发消息,具体的业务逻辑通过kafka解耦,消费者实现。

4)我们规定消费者一定要上报执行结果以便我们决定是否重复请求

第二条:

1)将消息写入DB(或更改

2)event进程扫过期一段时间的任务(可配置

3)主动请求消费者执行

5.3 细节补充

1)mysql

结构

2)redis

如觉得写mysql这条链路也太麻烦,并且没有存储很久的需求,可以用redis自身的持久化功能,同时开启RDB和AOF,AOF设置everysec,即每秒异步刷盘一次。极端情况下,可能会丢失一秒的数据

高可用使用的是redis的主从复制模式服务高可用方面,在实现过程中考虑了服务节点的横向扩展,Timer、Cleaner等对同一个redis队列的操作都加了分布式锁。每个服务节点都是无状态的,不需要进行元数据同步等操作,少数服务节点宕机不影响整个服务可用性

3)监控

对于消息堆积,以及消息超过重试次数被丢弃等场景说明消费端服务异常,没有正常消费及ack,需要及时上报并通知业务方及服务提供方,方便快速发现并排查问题。

4)mq

mq的选择上文有对比图,详细分析以后补充吧。

六、总结和QA

分析架构

可能很多人会有些疑惑:你抛出来看起来这么复杂的图,实现起来是不是很麻烦?它真的很好用吗?

下面首先回忆第三章,看是否实现目标。

性能出发,整条链路看:

1)redis key可以增加,不用担心量大影响性能

2)event定时任务每秒轮询,基本没延迟

3)event业务逻辑,校验+转发个消息很快

3)消息队列选用的性能最强的kafka

实际测试也符合要求

从可用出发,整条链路看:

1)将延迟的消息任务通过 hash 算法路由至不同的 Redis Key 上,这样做有两大好处:

  • 避免了当一个 KEY 在存储了较多的延时消息后,入队操作以及查询操作速度变慢的问题(两个操作的时间复杂度均为O(logN))。
  • 系统具有更好的横向可扩展性,当数据量激增时,我们可以通过增加Redis Key 的数量来快速扩展整个系统

但是会存在一个问题,因为增加key的数量,必然涉及到hash算法范围的调整,那么原先集合中的元素就不能通过新的hash算法路由到,所以需要采用一致性hash算法。

2)所有的 Event 进程负责分发消息,具体的业务逻辑通过MQ解耦,由消费者异步处理,这么做的好处也是显而易见的:

  • Event 进程只负责分发消息,那么其处理消息的速度就会非常快,就不太会出现因为业务逻辑复杂而导致消息堆积的情况。
  • 采用一个额外的消息队列后,消息处理的可扩展性也会更好,我们可以通过增加消费者进程数量来扩展整个系统的消息处理能力。
  • Event 是多机多进程模型,保证整个系统的高可用性。采用 Zookeeper 选主的方式,保证同一时间只会有一个进程去处理消息,一旦 Zookeeper 的 leader 主机宕机,会自动选择新的 leader 来处理。

从其他功能看:

要求可恢复、可撤回、可修改、保存时间超长。

我们用mysql解决了大部分问题,修改时,记得把redis也改了(这也是比用kafka好的点,可修改)

到底如何设计?

这是本人一点粗浅的理解

回看我们的方案,第一,无论是基于死信队列还是数据先存储(mysql/redis)后投递(kafka),亦或是redis超时时间,本质上都是将延迟待发送的消息数据与正常订阅的队列分开存储,这么做一方面降低耦合度,另一方面也是为了降低数据不可控的时间。

这也是我想说的,经验告诉我们,大量数据存在不可操作、不可见的地方是一件很糟糕的事(本文提的很多方案有这个毛病),服务不是写完就完了,还要维护,所以我们尽量不要这么做。

第二,既然选择数据分离,整条链路的存储组件和队列组件的选择,按需选择,十分重要。

本方案就是mysql/redis+kafka

第三、无论是检查队头消息TTL还是调度存储的数据,本质上都是通过定时任务来完成的,定时任务的触发策略也是决定你方案优劣的决定性因素:你是crontab配置,还是主备选举策略、还是大家一起抢分布式锁,也值得根据具体情况具体分析

还是觉得太复杂,能否简化一点?

可以,我的建议是,如果qps不高的话,去掉kafka会是一个简单方案。

多线程如何处理?

如果你指的是,高并发场景下存在同一条消息被多次消费的情况,你可以使用分布式锁,如zookpeerredis的红锁、自己做一个等等。

本方案目前不存在这类问题

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注