消息队列,顾名思义,就是一个存放消息的队列。最简单的消息队列包含3个角色

1 基于List 结构实现消息队列

Redis的List是一个双向列表可以从两端存入数据或者取出数据

利用list 结构实现的消息队列主要是依据阻塞指令 BLPOP/RPOP 来模拟消费者监听队列,直到队列中消失时获得该数
优点: 实现简单,且可以持久
缺点: 只能有一个消费者来消费数据,且只能消费一次,无法避免消息的丢失

2 基于PubSub(发布/订阅

PubSub 是一个基于点对点的消息模型消费者可以订阅一个或者多个chanel,当生产者向队列发送了消息时,消费者只要订阅了频道就可以收到处理消息
在这里插入图片描述

使用PSUBSCRIBE pattren 时,支持多种通配符

1

textcolor{blue}{1}

1 ?:匹配个字符

2

textcolor{blue}{2}

2 * :匹配个字符或多个字符

3

textcolor{blue}{3}

3 [] :选择匹配匹配[]中定义字符hell[ae]o 可以匹配 hellohellao

使用PubSub 实现的消费队列时,支持

多生产、多消费

textcolor{red}{多生产、多消费}

多生产、多消费模式,不过PubSub不支持数据持久化,相较于List,它本身就不是一个数据结构无法利用Redis持久数据。并且无法避免消息的丢失,如生产者无人订阅的频道发消息时,数据丢失。另外还会出现由于消费者缓存空间有效,超时缓存上限时,将会出现消息的丢失。由于这些缺点,redis的PUBSUB模式,无法满足对可靠性要求较高的服务

3 基于Stream 数据结构

Stream 是redis5.0 及之后针对消息队列场景设计

数据结构

textcolor{red}{数据结构}

数据结构,因此数据的安全性得到了保障,因为可以持久化。相较于List 数据结构实现的消息队列的方式,有更多针对消息队列的单独命令可以实现一个功能更加完善的消息队列

发送消息

参数说明

K

e

y

textcolor{red}{Key}

Key : 存储消息的队列的名字

[

N

O

M

K

S

T

R

E

A

M

]

textcolor{blue}{[NOMKSTREAM] }

[NOMKSTREAM] :可选参数是否在队列不存在时,创建队列。默认创建

[

<

M

A

X

L

E

N

M

I

N

I

D

>

[

=

 

]

t

h

r

e

s

h

o

l

d

[

L

I

M

I

T

c

o

u

n

t

]

]

textcolor{green}{[<MAXLEN | MINID> [= | ~] threshold [LIMIT count]]}

[<MAXLENMINID>[= ]threshold[LIMITcount]] :可选参数设置消息队列的最大消息数,默认是设上限的

<

i

d

>

textcolor{orange}{<* | id>}

<id> :消息的唯一id,* 表示redis自动生成格式时间_递增值 如 1526919030474-0。Id值也可以自定义

F

i

e

l

d

v

a

l

u

e

[

F

i

e

l

d

v

a

l

u

e

.

.

.

]

textcolor{purple}{Field value [Field value …]}

Fieldvalue[Fieldvalue] 消息体

读取消息

  • XREAD

    [

    C

    O

    U

    N

    T

    c

    o

    u

    n

    t

    ]

    textcolor{red}{[COUNT count] }

    [COUNTcount]

    [

    B

    L

    O

    C

    K

    m

    i

    l

    l

    i

    s

    e

    c

    o

    n

    d

    s

    ]

    textcolor{blue}{[BLOCK milliseconds]}

    [BLOCKmilliseconds]

    S

    T

    R

    E

    A

    M

    S

    k

    e

    y

    [

    k

    e

    y

    .

    .

    .

    ]

    textcolor{green}{STREAMS key [key …] }

    STREAMSkey[key]

    i

    d

    [

    i

    d

    .

    .

    .

    ]

    textcolor{orange}{id [id …]}

    id[id]

参数说明

[

C

O

U

N

T

c

o

u

n

t

]

textcolor{red}{[COUNT count] }

[COUNTcount] 可选参数, 指定读取消息的条数

[

B

L

O

C

K

m

i

l

l

i

s

e

c

o

n

d

s

]

textcolor{blue}{[BLOCK milliseconds]}

[BLOCKmilliseconds]没有消息时,读取队列消息的阻塞时长,当设置为0时,永久等待,直到读取到队列中消息

S

T

R

E

A

M

S

k

e

y

[

k

e

y

.

.

.

]

textcolor{green}{STREAMS key [key …] }

STREAMSkey[key] 需要读取的队列的key名字可以多个列中读取数据

i

d

[

i

d

.

.

.

]

textcolor{orange}{id [id …]}

id[id]取消息的起始Id 。有两个特殊的id,0 表示第一个消息读起,$ 表示读取最新一条消息

在读取消息时,可以通过while(true) 循环 调用xread block 0 streams key $ 去永久的监听队列去获得消息。不过这种模式下会出现一个问题,在获得消息并处理消息这个时间间隙中,可能生产者又往队列中增加了好几条消息,由于Id 为$ 只会读取最新一条消息,那么可能会出现消息的漏读。这里可以采用基于消费者组去读取消

3.1 基于消费者组去消费消息

可以将多个消费者划分到一个组中,其中每个组消费消息时都会维护一个最后消费消息的标识

L

a

s

t

d

e

l

i

v

e

r

e

d

i

d

textcolor{red}{Last delivered id}

Lastdeliveredid,当宕机重启后,直接从该标识id之后的消息消费。意味者不会重复消费消息。
在消费者组中还维护了一个 Pending_ids集合,该集合存放了未确认【ACK】消费数据的消息Id,
机器出现宕机后重启,可继续确认处理的消息。可以通过

X

A

C

K

textcolor{red}{XACK}

XACK确认客户端确认已经消费了消息,之后从Pending_ids集合移除

基于消费者组消费消息时,最大程度的保证了消息的安全消费、不重复消费。
在这里插入图片描述

创建消费者组
XGROUP

C

R

E

A

T

E

textcolor{red}{CREATE}

CREATE

K

E

Y

textcolor{green}{KEY }

KEY

G

R

O

U

P

N

A

M

E

textcolor{blue}{GROUPNAME }

GROUPNAME

I

D

textcolor{orange}{ID}

ID

[

M

K

S

T

R

E

A

M

]

textcolor{purple}{ [MKSTREAM]}

[MKSTREAM]

C

R

E

A

T

E

textcolor{red}{CREATE}

CREATE创建

K

E

Y

textcolor{green}{KEY }

KEY :基于哪个队列去创建组

G

R

O

U

P

N

A

M

E

textcolor{blue}{GROUPNAME }

GROUPNAME :创建的消费者组名称

I

D

textcolor{orange}{ID}

ID 消息的标识id。0 从头消费 $ 消费最新的消息

[

M

K

S

T

R

E

A

M

]

textcolor{purple}{ [MKSTREAM]}

[MKSTREAM] : 可选参数,当队列不存在时,是否创建队列,默认是创建

从消费者组中消费消息

XGROUPREAD GROUP

g

r

o

u

p

textcolor{red}{group }

group

c

o

n

s

u

m

e

r

textcolor{green}{consumer }

consumer

[

C

O

U

N

T

c

o

u

n

t

]

textcolor{blue}{[COUNT count] }

[COUNTcount]

[

B

l

o

c

k

m

i

l

l

i

s

e

c

o

n

d

s

]

textcolor{orange}{[Block milliseconds] }

[Blockmilliseconds]

[

N

O

A

C

K

]

textcolor{purple}{ [NOACK] }

[NOACK]

S

T

R

E

A

M

S

K

E

Y

[

k

e

y

.

.

.

]

textcolor{red}{STREAMS KEY [key …]}

STREAMSKEY[key]

I

D

[

I

D

.

.

.

.

.

]

textcolor{green}{ ID [ID…..] }

ID[ID…..]

g

r

o

u

p

textcolor{red}{group }

group : 组的名字定义从哪个消费者组消费消息

c

o

n

s

u

m

e

r

textcolor{green}{consumer }

consumer :消费者名字,如果不存在,自动创建

[

C

O

U

N

T

c

o

u

n

t

]

textcolor{blue}{[COUNT count] }

[COUNTcount] :消费数量

[

B

l

o

c

k

m

i

l

l

i

s

e

c

o

n

d

s

]

textcolor{orange}{[Block milliseconds] }

[Blockmilliseconds] :可选参数,阻塞时长【单位ms】,不设置时为非阻塞消费。

[

N

O

A

C

K

]

textcolor{purple}{ [NOACK] }

[NOACK] :可选参数,是否自动确认true时消息不会进入pending_ids[] 集合中,可能会有未消费的消息。所以为了安全性,无需设置

S

T

R

E

A

M

S

K

E

Y

[

k

e

y

.

.

.

]

textcolor{red}{STREAMS KEY [key …]}

STREAMSKEY[key] : 监听的队列的名字

I

D

[

I

D

.

.

.

.

.

]

textcolor{green}{ ID [ID…..] }

ID[ID…..] :获得消息的起始ID 。
设置成 ">" :从下一个未消费的消息开始消费。
设置成其他:均是从pending-list中获得已消费但是未确认的消息,如0 ,从pending-list第一个消息开始。
根据实际情况可设置不同的ID 去消费消息。正常读取设置> 异常读取未确认的消息

确认消息
XACK

k

e

y

textcolor{red}{key }

key

g

r

o

u

p

textcolor{green}{group }

group

I

D

[

I

D

.

.

.

]

textcolor{blue}{ ID [ID…] }

ID[ID]

k

e

y

textcolor{red}{key }

key :队列名

g

r

o

u

p

textcolor{green}{group }

group :组名称

I

D

[

I

D

.

.

.

]

textcolor{blue}{ ID [ID…] }

ID[ID] :待确认的消息Id

3.2 数据测试

(1) 向order列中添加4条消息

xadd order * voucherId 9 userId 150 orderId 79297921056506055
xadd order * voucherId 9 userId 129 orderId 79297921056506083
xadd order * voucherId 9 userId 111 orderId 79297921056506108
xadd order * voucherId 9 userId 111 orderId 79297921056506101

在这里插入图片描述
(2) 向order队列创建消费者组group_1

## 消费者组从头开始消费数据
XGROUP CREATE order  group_1 0 

(3) 从消费者组中消费消息

## 消费最新的未消费的消息,采用阻塞式获取,最长等待2000ms
XREADGROUP GROUP group_1 consumer_1 COUNT 1 BLOCK 2000 STRAEAMS order >

在这里插入图片描述
第五次消费时,阻塞等待返回空。队列中的消息全部消费,此时都处于为确认状态,全部存入了penging-list中。
此时需要手动确认这些消息确实已经被成功的消费了,需要手动确认将其从pending-list 集合中移除

(4) 手动确认已经消费的消息

 XACK order group_1 1691146911471-0  1691148054821-0 1691148657217-0  1691202770386-0

原文地址:https://blog.csdn.net/contact97/article/details/132105711

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_31844.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注