消息队列,顾名思义,就是一个存放消息的队列。最简单的消息队列包含3个角色
- 生产者:将消息存入队列中
- 队列:存放和管理消息
- 消费者: 将消息从队列中取出来并做业务处理
R
L
结构、
P
u
S
u
b
、
S
结构
textcolor{red}{Redis 提供了三种实现消息队列的方式,基于List结构、PubSub、Stream结构}
1 基于List 结构实现消息队列
Redis的List是一个双向列表。可以从两端存入数据或者取出数据。
利用list 结构实现的消息队列主要是依据阻塞取指令 BLPOP/RPOP 来模拟消费者监听队列,直到队列中有消失时获得该数据
优点:
实现简单,且可以持久化
缺点:
只能有一个消费者来消费数据,且只能消费一次,无法避免消息的丢失
2 基于PubSub(发布/订阅)
PubSub 是一个基于点对点的消息模型,消费者可以订阅一个或者多个chanel,当生产者向队列发送了消息时,消费者只要订阅了频道就可以收到并处理消息
- PUBLISH channel message
将信息 message 发送到指定的频道 channel
- SUBSCRIBE channel [channel …]
订阅一个或多个频道
- PSUBSCRIBE pattren
订阅与通配符匹配的chanel
在使用PSUBSCRIBE pattren 时,支持多种通配符
1
2
3
多生产、多消费
多生产、多消费的模式,不过PubSub不支持数据的持久化,相较于List,它本身就不是一个数据结构无法利用Redis持久化数据。并且无法避免消息的丢失,如生产者向无人订阅的频道发消息时,数据会丢失。另外还会出现由于消费者的缓存空间有效,超时缓存上限时,将会出现消息的丢失。由于这些缺点,redis的PUBSUB模式,无法满足对可靠性要求较高的服务。
3 基于Stream 数据结构
Stream 是redis5.0 及之后针对消息队列场景设计的
数据结构
数据结构,因此数据的安全性得到了保障,因为可以持久化。相较于List 数据结构实现的消息队列的方式,有更多针对消息队列的单独命令,可以实现一个功能更加完善的消息队列
发送消息
- XADD
[
N
O
M
K
S
T
R
E
A
M
]
textcolor{blue}{[NOMKSTREAM] }
[
<
M
A
X
L
E
N
∣
M
I
N
I
D
>
[
=
∣
]
t
e
s
l
[
L
I
M
I
T
u
n
t
]
]
textcolor{green}{[<MAXLEN | MINID> [= | ~] threshold [LIMIT count]]}
<
∗
∣
i
d
>
F
i
e
l
d
a
l
u
e
[
F
i
e
l
d
v
a
l
u
e
.
.
.
]
textcolor{purple}{Field value [Field value …]}
K
e
[
N
O
M
K
S
T
R
E
A
M
]
textcolor{blue}{[NOMKSTREAM] }
[NOMKSTREAM] :可选参数,是否在队列不存在时,创建队列。默认是创建的
[
<
M
A
X
L
E
N
∣
M
I
N
I
D
>
[
=
∣
]
t
r
e
s
o
l
d
[
L
I
M
I
T
c
o
u
n
t
]
]
textcolor{green}{[<MAXLEN | MINID> [= | ~] threshold [LIMIT count]]}
[<MAXLEN∣MINID>[=∣ ]threshold[LIMITcount]] :可选参数,设置消息队列的最大消息数,默认是设上限的
<
∗
∣
i
d
>
<∗∣id> :消息的唯一id,* 表示有redis自动生成,格式是时间戳_递增值 如 1526919030474-0。Id值也可以自定义。
F
i
e
l
d
v
a
l
u
e
[
F
i
e
l
d
v
a
l
u
e
.
.
.
]
textcolor{purple}{Field value [Field value …]}
Fieldvalue[Fieldvalue…] 消息体
读取消息
- XREAD
[
C
O
U
N
T
c
o
u
n
t
]
textcolor{red}{[COUNT count] }
[
B
L
O
C
K
m
i
l
l
i
s
e
c
o
n
d
s
]
textcolor{blue}{[BLOCK milliseconds]}
S
T
R
E
A
M
S
k
e
[
k
e
.
.
.
]
textcolor{green}{STREAMS key [key …] }
i
d
[
i
d
.
.
.
]
[
C
O
U
N
T
c
o
u
n
t
]
textcolor{red}{[COUNT count] }
[
B
L
O
C
K
m
i
l
l
i
s
e
c
o
n
d
s
]
textcolor{blue}{[BLOCK milliseconds]}
[BLOCKmilliseconds] 当没有消息时,读取队列消息的阻塞时长,当设置为0时,永久等待,直到读取到队列中消息
S
T
R
E
A
M
S
k
e
y
[
k
e
y
.
.
.
]
textcolor{green}{STREAMS key [key …] }
STREAMSkey[key…] 需要读取的队列的key名字,可以从多个队列中读取数据
i
d
[
i
d
.
.
.
]
在读取消息时,可以通过while(true) 循环 调用xread block 0 streams key $ 去永久的监听队列去获得消息。不过这种模式下会出现一个问题,在获得消息并处理消息这个时间间隙中,可能生产者又往队列中增加了好几条消息,由于Id 为$ 只会读取最新的一条消息,那么可能会出现消息的漏读。这里可以采用基于消费者组去读取消息
3.1 基于消费者组去消费消息
可以将多个消费者划分到一个组中,其中每个组消费消息时都会维护一个最后消费消息的标识
L
a
s
t
d
e
l
i
v
e
r
e
d
i
d
textcolor{red}{Last delivered id}
Lastdeliveredid,当宕机重启后,直接从该标识id之后的消息消费。意味者不会重复消费消息。
在消费者组中还维护了一个 Pending_ids集合,该集合中存放了未确认【ACK】消费数据的消息Id,
机器出现宕机后重启,可继续确认未处理的消息。可以通过
X
A
C
K
textcolor{red}{XACK}
XACK来确认客户端确认已经消费了消息,之后从Pending_ids集合中移除。
基于消费者组消费消息时,最大程度的保证了消息的安全消费、不重复消费。
创建消费者组
XGROUP
C
R
E
A
T
E
textcolor{red}{CREATE}
CREATE
K
E
Y
textcolor{green}{KEY }
KEY
G
R
O
U
P
N
A
M
E
textcolor{blue}{GROUPNAME }
GROUPNAME
I
D
textcolor{orange}{ID}
ID
[
M
K
S
T
R
E
A
M
]
textcolor{purple}{ [MKSTREAM]}
[MKSTREAM]
C
R
E
A
T
E
textcolor{red}{CREATE}
CREATE :创建组
K
E
Y
textcolor{green}{KEY }
KEY :基于哪个队列去创建组
G
R
O
U
P
N
A
M
E
textcolor{blue}{GROUPNAME }
GROUPNAME :创建的消费者组名称
I
D
textcolor{orange}{ID}
[
M
K
S
T
R
E
A
M
]
从消费者组中消费消息
XGROUPREAD GROUP
g
r
o
u
p
textcolor{red}{group }
group
c
o
n
s
u
m
e
r
textcolor{green}{consumer }
consumer
[
C
O
U
N
T
c
o
u
n
t
]
textcolor{blue}{[COUNT count] }
[COUNTcount]
[
B
l
o
c
k
m
i
l
l
i
s
e
c
o
n
d
s
]
textcolor{orange}{[Block milliseconds] }
[
N
O
A
C
K
]
textcolor{purple}{ [NOACK] }
[NOACK]
S
T
R
E
A
M
S
K
E
Y
[
k
e
y
.
.
.
]
textcolor{red}{STREAMS KEY [key …]}
STREAMSKEY[key…]
I
D
[
I
D
.
.
.
.
.
]
textcolor{green}{ ID [ID…..] }
ID[ID…..]
g
r
o
u
p
textcolor{red}{group }
c
o
n
s
u
m
e
r
textcolor{green}{consumer }
[
C
O
U
N
T
c
o
u
n
t
]
textcolor{blue}{[COUNT count] }
[COUNTcount] :消费数量
[
B
l
o
c
k
m
i
l
l
i
s
e
c
o
n
d
s
]
textcolor{orange}{[Block milliseconds] }
[Blockmilliseconds] :可选参数,阻塞时长【单位ms】,不设置时为非阻塞消费。
[
N
O
A
C
K
]
textcolor{purple}{ [NOACK] }
[NOACK] :可选参数,是否自动确认。true时消息不会进入pending_ids[] 集合中,可能会有未消费的消息。所以为了安全性,无需设置。
S
T
R
E
A
M
S
K
E
Y
[
k
e
y
.
.
.
]
textcolor{red}{STREAMS KEY [key …]}
STREAMSKEY[key…] : 监听的队列的名字
I
D
[
I
D
.
.
.
.
.
]
textcolor{green}{ ID [ID…..] }
ID[ID…..] :获得消息的起始ID 。
设置成 ">" :从下一个未消费的消息开始消费。
设置成其他:均是从pending-list中获得已消费但是未确认的消息,如0 ,从pending-list中第一个消息开始。
根据实际情况可设置不同的ID 去消费消息。正常读取设置> 异常读取未确认的消息
确认消息
XACK
k
e
y
textcolor{red}{key }
key
g
r
o
u
p
textcolor{green}{group }
group
I
D
[
I
D
.
.
.
]
textcolor{blue}{ ID [ID…] }
ID[ID…]
k
e
y
textcolor{red}{key }
key :队列名称
g
r
o
u
p
textcolor{green}{group }
group :组名称
I
D
[
I
D
.
.
.
]
textcolor{blue}{ ID [ID…] }
ID[ID…] :待确认的消息Id
3.2 数据测试
xadd order * voucherId 9 userId 150 orderId 79297921056506055
xadd order * voucherId 9 userId 129 orderId 79297921056506083
xadd order * voucherId 9 userId 111 orderId 79297921056506108
xadd order * voucherId 9 userId 111 orderId 79297921056506101
## 消费者组从头开始消费数据
XGROUP CREATE order group_1 0
(3) 从消费者组中消费消息
## 消费最新的未消费的消息,采用阻塞式获取,最长等待2000ms
XREADGROUP GROUP group_1 consumer_1 COUNT 1 BLOCK 2000 STRAEAMS order >
第五次消费时,阻塞等待后返回空。队列中的消息全部消费,此时都处于为确认状态,全部存入了penging-list中。
此时需要手动确认这些消息确实已经被成功的消费了,需要手动确认将其从pending-list 集合中移除
(4) 手动确认已经消费的消息
XACK order group_1 1691146911471-0 1691148054821-0 1691148657217-0 1691202770386-0
原文地址:https://blog.csdn.net/contact97/article/details/132105711
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_31844.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!