本文介绍: 1、定义Redis:REmote DIctionary Server(远程字典服务器),是完全开源免费的,用C语言编写的,遵守BSD协议,是一个高性能的(keyvalue分布式内存数据库基于内存运行支持持久化的NoSql数据库,是当前最热门的NoSQL数据库之一,也被人们称之为数据库结构服务器。Redis支持数据化的持久化,可以将内存中的数据保存磁盘中,重启时候可以再次加载进行使用

一、Redis入门介绍

1、入门概述

1、定义

Redis:REmote DIctionary Server(远程字典服务器),是完全开源免费的,用C语言编写的,遵守BSD协议,是一个高性能的(keyvalue分布式内存数据库基于内存运行支持持久化的NoSql数据库,是当前最热门的NoSQL数据库之一,也被人们称之为数据库结构服务器。

Redis与其他keyvalue缓存产品有以下三个特点:

2、能干嘛

3、去哪下

下载地址

4、怎么玩

数据类型基本操作配置

持久化和复制、RDB/AOF

事务控制

主从复制

5、Rides与MemCached区别

Redis是单线程+多路IO复用技术

(与Memcache三点不同: 支持数据类型支持持久化,单线程+多路IO复用

image-20220725163004260

2、Redis的安装

Linux安装

tar -zxvf redis-6.2.7.tar.gz
cd redis-6.2.7
修改配置文件redis.conf

daemonize no修改daemonize yes //这是开启后端运行

设置连接端口
redis-cli -h 127.0.0.1 -p 6379  

禁用bind 127.0.0.1或者将bind 127.0.0.1改为0.0.0.0

将protected-mode yes 改为 protected-mode no

cd /usr/local/bin
ps -ef|grep redis
 redis-server /opt/redis-6.2.7/redis.conf 
redis-cli shutdown

3、Redis启动后杂项基础知识讲解

二、Redis数据类型

1、Redis五大数据类型介绍

image-20220725172110710

如图所示内部为当前字符串实际分配空间capacity一般要高于实际字符串长度len。当字符串长度小 于1M时,扩容都是加倍现有空间,如果超过1M,扩容一次只会多扩1M的空间。

2、哪里去获得redis常见数据类型操作命令

Redis命令参考

3、Redis 键(key

常用命令

4、Redis字符串(String)

常用命令

5、Redis列表(List)【单键多value】

性能总结:
他是一个字符串链表,leftright都可以插入添加;果键不存在创建新的链 表;如果键已存在新增内容;如果值全部移除对应的键也就消失了。
链表的操作无论是头还是效率都极高,但假如对中间元素操作,效率就很惨淡了。

6、Redis集合(Set)【单键多value】

sismember 判断集合key是否含有该value值


- ```markdown
scard <key>  获取集合里面元素的个数

7、Redis哈希(Hash)【KV模式不变,但V是一个键值对】

hmset … 批量设置hash的值
hmget
hgetall
hedl


- ```shell
hlen
  
- ```markdown
  hsetnx <key><field><value>   将Hash表中key中的域field的值设置为value,当且仅当域field不存在

8、Redis有序集合Zset(sorted set)

在set基础上,加一score值。之前set是k1 v1 v2 v3,现在zset是k1 score1 v1 score2 v2

三、解析配置文件redis.conf

1、他在哪

默认安装目录在:/opt/redis-6.2.7

拷贝到:/myredis

2、#########Units单位#########

配置大小单位开头定义了一些基本的度量单位,只支持bytes,不支持bit

大小写敏感

3、########INCLUDE包含#######

redis.conf可以包含其他配置文件

4、####网络相关配置####
5、######GENERAL#######:
6、########SNAPSHOTTING快照########
save 秒钟  写操作的次数

RDB是整个内存的压缩过的Snapshot,RDB的数据结构,可以配置复合快照触发条件

默认:

是一分钟内改了1万次

或5分钟内改了10万次

或15分钟内改了1次

禁用:如果想禁用RDB持久化的策略,只要不设置任何save指令,或者给save传入一个空字符串参数也可以

Stop-writes-on-bgsave-error

默认值为yes,如果配置no表示不在乎数据不一致或者有其他的手段发现控制

rdbcompression

rdbcompression:对于存rdb储到磁盘中的快照,可以设置是否进行压缩存储。如果是的话,redis会采用LZF算法进行压缩。如果你不想消耗CPU来进行压缩的话,可以设置为关闭功能。【推荐压缩

rdbchecksum

默认为yes;在存储快照后,还可以让redis使用CRC64算法来进行数校验,但是这样做会增加大约10%的性能消耗,如果希望获取最大性能提升,可以关闭功能。【推荐yes】

7、PEPLICATION复制
8、############SECURITY安全33############

访问密码的查看、设置和取消

​ 在命令中设置密码,只是临时的。重启redis服务器,密码就还原了。

​ 永久设置,需要再配置文件中进行设置。

127.0.0.1:6379> config get requirepass
1) "requirepass"
2) ""
设置密码验证
127.0.0.1:6379> config set requirepass "123456"
127.0.0.1:6379> ping
(error) NOAUTH Authentication required.
127.0.0.1:6379> auth 123456
OK
127.0.0.1:6379> config set requirepass ""
OK
9、######LIMITS限制########
10、#######APPEND ONLY MODE追加######
11、常见配置redis.conf介绍
1.Redis默认不是以守护进程方式运行,可以通过配置修改,使用yes启用守护进程
deanonize no
2.当Redis以守护进程方式运行时,Redis默认会把pid写入/var/run/redis.pid文件,可以通过pidfile指定。
pidfile /var/run/redis.pid
3.指定Redis监听端口,默认端口为6379,作者自己一篇博文中揭示了为什么选用6379作为默认端口,因为6379在手机上按键上MERZ对应的号码,而NERZ取自意大利女Alessia Merz名字
port 6379
4.绑定主机地址
bind 127.0.0.1
5.当前客户端闲置多长时间后关闭连接,如果指定为0,表示关闭功能
timeout 300
6.指定日志记录级别,Redis总共支持四个级别:debug、verbose、noticewarning,默认为verbose
loglevel verbose
7.日志记录方式,默认为表准输出,如果配置Redis为守护进程方式运行,而这里有配置为日志记录方式为标准输出,则日志将会发送给/dev/null
logfile stdout
8.设置数据库的数量,默认数据库为0,可以使用SELECT<dbid>命令在连接上指定数据库id
database 16
9.指定在多长时间内,有多少次更新操作,将数据同步数据文件,可以多个条件配合
save<seconds><changes>
Redis默认配置文件中提供了三个条件save 900 1
	save 300 10
	save 60 10000
	分别表示900秒(15分钟)内有1个更改,300秒(5分钟)内有10个更改以及60秒内有10000个更改。
	注:每当FLUSHALL或者SHUTDOWN时,dump.rdb都会被清空,所以在SHUTDOWN前备份一下			dump.rdb文件别的电脑。如果没到规定时间也想保存,可以使用save命令。
10.指定存储本地数据库时是否压缩数据,默认为yes,Redis采用LZF压缩,如果为了节省CPU时间,可以关闭选项,但会导致数据文件变的巨大
rdbcompression yes
11.指定本地数据库文件名,默认值dump.rdb
dbfilename dump.rdb
12.指定本地数据库存放目录
dir ./
13.设置当本机为slav服务时,设置master服务的IP地址及端口,在Redis启动时,他会自动master行数同步
slaveof <masterip> <masterport>
14.当master服务设置了密码保护时,slav服务连接master的密码
masterauth <master-password>
15.设置Redis连接密码,如果配置了连接密码,客户端在连接Redis是需要通过AUTH<password>命令提供密码,默认关闭
requirepass foobared
16.设置同一时间最大客户连接数,默认无限制,Redis可以同时打开客户连接数为Redis进程可以打开的最大文件描述符数,如果设置maxclients 0,表示不作限制。当客户连接数到大限制时,Redis会关闭新的连接并向客户端返回max number of client reached错误信息
maxclients 128
17.指定Redis最大内存限制,Redis在启动时会把数据加载到内存文件中,达到最大内存后,Redis会显尝试清除已到期或即将到期的Key,当此方法处理后,仍然到大最大内存设置,将无法再进行写入操作,但仍然可以进行读取操作。Redis新的vm机制,会把Key存放内存,Value会存放swap区
maxmemory <bytes>
18.指定是否在每次更新操作进行日志记录,Redis在默认情况下是异步的把数据写入磁盘,如果不开启可能会在断电时导致一段时间内的数据丢失。因为redis本身同步数据文件是按上面的save条件同步的,所以有的数据会在一段时间内只存在于内存中。默认为no
19.指定更新日志文件名,默认为appendonly.aof
appendfilename appendonly.aof
20.指定更新日志文件,共有3个可选值
	no:表示等操作系统行数缓存同步磁盘(快)
	always:表示每次更新操作后手动调用fsync()将数据写到磁盘(慢,安全)
	everysec:表示每秒同步一次(这种是默认值)
appendfsync everysec
21. 指定是否用虚拟内存机制默认值为no,简单介绍一下,VM机制将数据分页存放,由Redis将访问量较少的页面即冷数据swap到磁盘上,访问多的页面磁盘自动换出到内存中
vm-enable no
22.虚拟内存文件路径默认值为/tmp/redis.swap,不可多个Redis实例共享
vm-swap-file /tmp/redis.swap
23.将所有大于vm-max-memory的数据存入虚拟内存,无论vm-max-memory设置多小,所有索引数据都是内存存储的(Redis的索引数据,就是keys)也就是说,当vm-max-memory设置为0的时候,其实是所有value都存在于磁盘。默认值为0
vm-max-memory 0
24.Redis.swap文件分成了很多page,一个对象可以保存在多个page上面,但一个page上不能被多个对象共享,vm-page-size是要根据存储的数据大小来设定的,作者建议如果存储很多小对象,page大小最好设置为32或者64bytes;如果存储很大对象,则可以使用更大的page,如果不确定,就是用默认值
vm-page-size 32
25.设置swap文件中的page数量,由于页表(一种表示页面空闲或使用的bitmap)是存放在内存中的,在磁盘上每8个pages将消耗1byte的内存。
vm-page 134217728
26.设置访问swap文件的线程数最好不要超过机器的核数,如果设置为0,那么所有对swap文件的操作都是串行的,可能会造成比较长时间的延迟默认值为4
vm-max-threads 4
27.设置在向客户端应答时,是否把较小的包合并一个包发送,默认开启
glueoutputbuf yes
28.指定在超过一定数量或者最大的元素超过某一临界值时,采用一种特殊哈希算法
hash-max-zipmap-entries 64
hash-max-zipmap-value 512
29.指定是否激活重置哈希,默认为开启
activerehashing yes
30.指定包含其他的配置文件,可以在同一主机上多个Redis实例之间使用同一份配置文件,而同时各个实例有拥有自己的特定配置文件
include /path/to/local.conf

四、Redis的持久化

1、RDB(Redis DataBase)

1、是什么

在指定的时间间隔内将内存中的数据集快照写入磁盘,,也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里。

2、备份如何执行的

Redis会单独创建fork)一个子进程来进行持久化,会将数据入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的,这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失

image-20220727191339895

3、Fork

Fork的作用是复制一个与当前进程一样的进程。新进程的所有数据(变量环境变量程序计数器等)数值都和原进程一致,但是是一个全新的进程,并作为原进程的子进程。

4、dump.rdb

RDB保存的是dump.rdb文件,文件名默认为dump.rdb

image-20220727194209659

5、配置文件位置

默认为Redis启动时命令行所在的目录

/usr/local/bin

6、如何触发RDB快照

image-20220727195023782

对于存储到磁盘中的快照,可以设置是否进行压缩存储。如果是的话,redis会采用LZF算法进行压缩。

如果你不想消耗CPU来进行压缩的话,可以设置为关闭此功能推荐yes.

7、如何恢复RDB快照
8、优势&amp;劣势
9、如何停止

动态所有停止RDB保存规则方法:redis-cli config set save “ ”

10、总结
  • RDB是一个非常紧凑的文件
  • RDB在保存RDB文件时父进程唯一要做的就是fork出一个子进程,接下来工作全部由子进程来做,父进程不需要再做其他IO操作,所以RDB持久化方式可以最大化redis的性能
  • 与AOF相比,在恢复大的数据集的时候,RDB方式会更快一些
  • 数据丢失风险
  • RDB需要经常fork子进程来保存数据集到硬盘上,当数据集比较大的时候fork过程是非常耗时的,可能会导致Redis在一些毫秒级不能响应客户端请求

2、AOF(Append Only FIle)

1、是什么

以日志的形式来记录每个写操作(增量保存),将Redis执行过的所有写指令记录下来(读操作不记录),只许追加文件但不可以改写文件,redis启动之初会读取改文件重新构建数据,换言之,redis重启的话就跟据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作

2、AOF保存的是appendonly.aof文件
3、配置位置

/usr/local/bin

4、AOF持久化流程

(1)客户端的请求写命令会被append追加到AOF缓冲区内;

(2)AOF缓冲区根据AOF持久化策略[always、everysec、no]将操作sync同步到磁盘的AOF文件中;

(3)AOF文件大小超过重写策略手动重写时,会对AOF文件进行rewrite重写,压缩AOF文件容量

(4)Redis服务重启时,会重新load加载AOF文件中的写操作达到数据恢复的目的;

image-20220727204852793

5、AOF和RDB同时开启,redis听谁的?

AOF和RDB同时开启系统默认读取AOF的数据(数据不会存在丢失

6、AOF启动/修复/恢复
7、AOF同步频率设置

appendfsync always
始终同步,每次Redis的写入都会立刻计入日志;性能较差但数据完整性比较好

appendfsync everysec(默认)
每秒同步,每秒计入日志一次,如果宕机,本秒的数据可能丢失

appendfsync no
redis不主动进行同步,把同步时机交给操作系统

8、Rewrite压缩
9、重写流程

(1)bgrewriteaof触发重写,判断是否当前有bgsave或bgrewriteaof在运行,如果有,则等待该命令结束后再继续执行。

(2)主进程fork出子进程执行重写操作,保证主进程不会阻塞。

(3)子进程遍历redis内存中数据到临时文件,客户端的写请求同时写入aof_buf缓冲区和aof_rewrite_buf重写缓冲区保证原AOF文件完整以及新AOF文件生成期间的新的数据修改动作不会丢失

(4)1).子进程写完新的AOF文件后,向主进程发信号,父进程更新统计信息。2).主进程把aof_rewrite_buf中的数据写入到新的AOF文件。

(5)使用新的AOF文件覆盖旧的AOF文件,完成AOF重写。

image-20220727210752664

10、优势/劣势

7、总结

  • AOF文件是一个只进行追加的日志文件
  • Redis可以在AOF文件体积变得过大时,自动的在后台对AOF进行重写
  • AOF文件有序的保存了对数据库执行的所有写入操作,这些写入操作以Redis协议格式保存,因此AOF文件的内容非常容易被人读懂,对文件进行分析也很轻松
  • 对于相同的数据来说,AOF文件的体积通常要大于RDB文件的体积
  • 根据所使用的的fsync策略,AOF的速度可能会慢于RDB

Redis默认先加载appendonly.aop文件,如果appendonly.aop文件报错了,则去加载dump.rdb文件;如果appendonly.aof没有报错加载appendonly.aof文件

11、Which One

RDB持久化方式能够在指定的时间间隔能对你的数据进行快照存储。

AOF持久化方式记录每次对服务器写的操作,当服务器重启的时候回重新执行这些命令来恢复原始的数据,AOF命令以redis协议追加保存每次写的操作到文件末尾。Redis还能对AOF文件进行后台重写,使得AOF文件的体积不至于过大。

如果对数据不敏感,可以选单独用RDB。

不建议单独用 AOF,因为可能会出现Bug

只做缓存:如果你只希望你的数据在服务器运行的时候存在,可以都不用。

官网建议:同时开启两种持久化方式

  • 在通常情况下,当redis重启的时候会优先加载AOF文件来恢复原始的数据,因为在通常情况下AOF文件保存的数据集要比RDB文件保存的数据集要完整
  • RDB的数据不实时,同时使用两者时服务器重启也只会找AOF文件。那要不要只是用AOF呢?作者建议不要,因为RDB更适合用于备份数据库(AOF在不断变化不好备份),快速重启,而且不会有AOF可能潜在bug,留着作为一个万一的手段。

性能建议:

  • 因为RDB文件只用作后备用途,建议只在Slave上持久化RDB文件,而且只要15分钟备份一次就够了,只保留save 900 1 这条规则
  • 如果Enable AOF ,好处是在最恶劣情况下也只会丢失不超过两秒的数据,启动脚本简单只load自己的AOF文件就可以了,代价一是带来了持续的IO,而是AOF rewrite的最后将rewrite过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。只硬盘许可应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上,默认超过原大小100%大小时重写可以改到适当的数值。
  • 如果不Enable AOF,仅靠Master-Slave Replication 实现高可用性也可以。能生一大笔IO也减少了rewrite时带来的系统波动。代价是如果Master/Slave同时倒掉,会丢失十几分钟的数据启动脚本也要比较两个Master/Slave中的RDB文件,载入较新的那个。比如新浪微博就选用这种架构

五、Redis的事务

1、定义

​ Redis事务可以一次执行多个命令,本质一组命令的集合。

两个重要保证:

  • Redis事务是一个单的隔离操作:事务中的所有命令都会序列化,按顺序串行化执行而不会被其他命令插入,不许加塞。
  • 事务是一个原子操作:事务中的命令要么全部被执行,要么全部都不执行。

一个事物从开始到执行会经历三个阶段:开始事务—>命令入队—>执行事务

image-20220726145527955

输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行。

组队过程中可以通过discard来放弃组队

2、Redis事务命令:

命令 描述
DISCARD 取消事务,放弃执行事务快内的所有命令。
EXEC 执行所有事务快内的命令。
MULTI 标记一个事务块的开始。
UNWATCH 取消WATCH命令对所有key的监事。
WATCH key [key…] 监视一个(或多个)key,如果在事务执行前这个(或这些)key被其他命令所改动,那么事务将会被打断。

补充:
WATCH 用于为 Redis 事务提供检查和设置 (CAS) 行为监视 WATCHed 键以检测针对它们的更改。如果在执行 EXEC 命令之前至少修改了一个监视键,则整个事务中止,并且 EXEC 返回 Null 回复通知事务失败

3、事务的错误处理

①:组队阶段出现错误,执行时整个队列内所有的命令都被取消

image-20220726145858842

②:执行阶段某个命令出现错误,则仅报错的命令不被执行,其他命令全被执行,不会回滚

image-20220726150107011

综上,exec报错所有命令都不执行;exec报错,仅报错的命令不被执行。

4、事务冲突问题

例子

一个请求想给金额减8000

一个请求想给金额减5000

一个请求想给金额减1000

image-20220726173239049

悲观锁:

image-20220726173320302

悲观锁,顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统关系型数据库就用到了很多这种锁机制,如:行锁、表锁、读锁、写锁等,都在在操作前先上锁。

乐观锁:

image-20220726174251000

乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有更新个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis就是利用这种checkand-set机制实现事务的。

5、Redis事务的三大特性

6、模拟秒杀案例

代码

indexi.jsp页面

<body>
	<h1>iPhone 13 Pro !!! 一元抢购!!!</h1>
	<form id="msform" action="SeckillController" method="POST">
		<input type="hidden" name="prodid" value="0101">
		<input type="button" id="ms_btn" value="点我秒杀">
	</form>
</body>
<script type="text/javascript" src="js/jquery-3.1.1.js"></script>
<script type="text/javascript">
	$(function() {
		$("#ms_btn").click(function() {
			var url = $("#msform").attr("action");
			$.post(url,$("#msform").serialize(),function(data){
				if(data == "false"){
					alert("抢光了");
					$("#ms_btn").attr("disable",true);
				}
			})
		})
	})
</script>

SeckillController.calss控制

@WebServlet("/SeckillController")
public class SeckillController extends HttpServlet {
	private static final long serialVersionUID = 1L;
	
	protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
		 String userId = new Random().nextInt(50000) + "";
		 String prodid = request.getParameter("prodid");
		 boolean sk = Seckill_redis.doSeckill(userId, prodid);
		 response.getWriter().print(sk);;
	}

	protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
		doGet(request, response);
	}

}

Seckill_redis.class逻辑实现类

public class Seckill_redis {
	public static void main(String[] args) {
		
	}
	
	public static boolean doSeckill(String uid,String prodid) {
		
		//1、uidprodid非空判断
		if(null == uid || null == prodid) {
			return false;
		}
		//2、连接redis
		Jedis jedis = new Jedis("192.168.2.138",6379);
		//3、拼接key  1)库存key  2)秒杀成功用户key
		String kcKey = "sk:"+prodid+":qt";
		String userKey = "sk:"+prodid+":user";
		//4、获取库存,如果库存null秒杀还未开始
		String kc = jedis.get(kcKey);
		if(null == kc) {
			System.out.println("秒杀还未开始,请等待");
			jedis.quit();
			return false;
		}
		//5、判断用户是否重复秒杀操作
		if(jedis.sismember(userKey, uid)) {
			System.out.println("你已秒杀成功,不能重复秒杀");
			jedis.quit();
			return false;
		}
		//6、判断如果商品库存数量小于1,秒杀结束
		int i = Integer.parseInt(kc);
		if(i<=0) {
			System.out.println("商品已被秒杀完");
			jedis.quit();
			return false;
		}
;		//7、秒杀过程   1)库存-1   2)把秒杀成功用户添加清单里面
		jedis.decr(kcKey);
		jedis.sadd(userKey, uid);
		System.out.println("秒杀成功");
		jedis.quit();
		return true;
	}
	
}
模拟秒杀并发

使用工具ab模拟测试,CentOS6 默认安装,CentOS7需要手动安装。

联网下载ab工具

yum install httpd-tools

网络安装ab工具:

进入cd  /run/media/root/CentOS 7 x86_64/Packages(路径centos6不同)
apr-1.4.8-3.el7.x86_64.rpm
apr-util-1.5.2-6.el7.x86_64.rpm
httpd-tools-2.4.6-67.el7.centos.x86_64.rpm 
③通过ab工具测试

创建文件postfile内容:prodid=0101&amp;

ab -n 1000 -c 300 -k -p ~/postfile -T application/x-wwwformurlencoded http://192.168.2.2:8080/Seckill/SeckillController

④:连接超时问题

​ 使用连接池解决

⑤:超卖问题

image-20220727144740035

修改后的Seckill_redis.calss

public static boolean doSeckill(String uid,String prodid) {
		
		//1、uid和prodid非空判断
		if(null == uid || null == prodid) {
			return false;
		}
		//2、连接redis
		JedisPool jedisPool = JedisPoolUtil.getJedisPoolInstance();
		Jedis jedis = jedisPool.getResource();
		//3、拼接key  1)库存key  2)秒杀成功用户key
		String kcKey = "sk:"+prodid+":qt";
		String userKey = "sk:"+prodid+":user";
		//监事事务
		jedis.watch(kcKey);
		//4、获取库存,如果库存为null,秒杀还未开始
		String kc = jedis.get(kcKey);
		if(null == kc) {
			System.out.println("秒杀还未开始,请等待");
			jedis.quit();
			return false;
		}
		//5、判断用户是否重复秒杀操作
		if(jedis.sismember(userKey, uid)) {
			System.out.println("你已秒杀成功,不能重复秒杀");
			jedis.quit();
			return false;
		}
		//6、判断如果商品库存数量小于1,秒杀结束
		int i = Integer.parseInt(kc);
		if(i<=0) {
			System.out.println("商品已被秒杀完");
			jedis.quit();
			return false;
		}
		//开启事务
		Transaction multi = jedis.multi();
;		//7、秒杀过程   1)库存-1   2)把秒杀成功用户添加清单里面
		multi.decr(kcKey);
		multi.sadd(userKey, uid);
		//执行事务
		List<Object> results = multi.exec();
		if(results == null || results.size() == 0) {
			System.out.println("秒杀失败");
			jedis.quit();
			return false;
		}
		System.out.println("秒杀成功");
		jedis.quit();
		return true;
	}
⑥:库存遗留问题

由乐观锁造成。由于乐观锁的存在导致修改数据时版本号不一致。所以修改失败

解决

Lua脚本语言

通过lua脚本淘汰用户解决争抢问题,实际上是redis 利用其单线程特性,用任务队列的方式解决多任务并发问题

package com.atcpl.controller;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import com.atcpl.utils.JedisPoolUtil;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;



public class SecKill_redisByScript {
	
	
	static String secKillScript ="local userid=KEYS[1];rn" + 
			"local prodid=KEYS[2];rn" + 
			"local qtkey='Seckill:'..prodid..":kc";rn" + 
			"local usersKey='Seckill:'..prodid..":user";rn" + 
			"local userExists=redis.call("sismember",usersKey,userid);rn" + 
			"if tonumber(userExists)==1 then rn" + 
			"   return 2;rn" + 
			"endrn" + 
			"local num= redis.call("get" ,qtkey);rn" + 
			"if tonumber(num)<=0 then rn" + 
			"   return 0;rn" + 
			"else rn" + 
			"   redis.call("decr",qtkey);rn" + 
			"   redis.call("sadd",usersKey,userid);rn" + 
			"endrn" + 
			"return 1" ;
			 
	
	public static boolean doSecKill(String uid,String prodid) throws IOException {

				JedisPool jedisPool = JedisPoolUtil.getJedisPoolInstance();
				
				Jedis jedis = jedisPool.getResource();
		 	 	
		 		String sha1=  jedis.scriptLoad(secKillScript);
		 		 
		 		Object result= jedis.evalsha(sha1, 2, uid,prodid);  
 
 
			 	  String reString=String.valueOf(result);
		 		if ("0".equals( reString )  ) {
 					System.err.println("已抢空!!");
				}else if("1".equals( reString )  )  {
 					System.out.println("抢购成功!!!!");
				}else if("2".equals( reString )  )  {
 					System.err.println("该用户已抢过!!");
				}else{
 					System.err.println("抢购异常!!");
				}
		 		  
 
 	 		
		 		jedis.close();
		
		return true;
			 
	}
	
	
 
}

六、Redis的订阅发布

1、定义

​ Redis发布订阅pub/sub)是一种消息通信模式发布者(pub)发送消息订阅者(sub)接收消息

下图展示频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间关系

当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:

2、Redis发布订阅命令

命令 描述
PSUBSCRIBE pattern [patter…] 订阅一个或多个符合给定模式频道
PUBSUB subcommand [argument [argument…]] 查看订阅发布系统状态
PUBLISH channel message 信息发送到指定的频道
PUNSUBSCRIBE [pattern [pattern…]] 退订所有给定模式频道
SUBSCRIBE channel [channel…] 订阅给定的一个或多个频道信息
UNSUBSCRIBE [channel [channel…]] 指退订给定的频道

3、案例

先订阅后发布后才能收到消息

1、可以一次性订阅多个,SUBSCRIBE c1 c2 c3

2、消息发布,PUBLISH c2 hello-redis

3、订阅多个,通配符*,PSUBSCRIBE new *

4、收取消息,PUBLISH new1 redis2015

七、Redis的主从复制

1、是什么

​ 也是就我们所说的主从复制主机数据跟新后根据配置和策略,自动同步到备机的master/slave机制,Master以写为主,Slave以读为主。

image-20220727211831373

2、怎么玩

​ 1、配从(库)不配主(主)

​ 2、从库配置:

slaveof  <主机IP>  <主机端口>

info replication 查看主从复制相关信息

​ 3、修改配置文件细节操作

拷贝多个redis.conf文件

image-20220724113436961

​ 开启deamonize yes

​ Pid文件名字

​ 指定端口

​ Log文件名字

​ Dump.rdb名字

image-20220727213402906

​ 4、常用3招

​ -一主二从

image-20220724120621238

​ -薪火相传

image-20220724121401462

主机上set k9 v9 在从机81上也能同步到k9

​ -反客为主

image-20220724122606541

SLAVEOF no one 是当前数据库停止与其他数据库的同步,将从机变为主机

3、复制原理

Slave启动成功连接到Master后会发送一个sync(同步)命令,Master接收到命令启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕后,master将传送整个数据文件到slave,以完成一次完全同步。

  • 全量复制:而slave服务器在接收到主数据库文件数据后,将其存盘并加载到内存中。
  • 增量复制:Master继续将新的所有收集到的修改命令一次传给slave,完成同步

但是只要是重新连接Master,一次完全同步(全量复制)将被自动执行。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HH0WhhpJ-1687078790855)(C:Users蜡笔小新AppDataRoamingTyporatypora-user-imagesimage-20220727220215088.png)]

4、哨兵模式sentinel

image-20220727223814099

5、复制的缺点

复制延迟

由于所有的写操作都是在Master上,然后同步更新到Slave上,所以从Master同步到Slave机器有一定的延迟,当系统繁忙的时候,延迟问题会更加严重,Slave机器数量的增加也会使这个问题更加严重。

八、Redis的Java客户端Jedis

1、测试Jedis的联通性

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.1.0</version>
</dependency>
public class TestPing {
	public static void main(String[] args) {
		Jedis jedis = new Jedis("192.168.2.138",6379);
		System.out.println(jedis.ping());
	}
}

image-20220725100438094

控制台打印,则说明连接通。

2、Jedis常用API

package com.redis;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import redis.clients.jedis.Jedis;

public class TestPing {
	public static void main(String[] args) {
		Jedis jedis = new Jedis("192.168.2.138",6379);
		//String 
//		jedis.set("k1", "v1");
//		jedis.set("k2", "v2");
//		jedis.set("k3", "v3");
		
		System.out.println(jedis.get("k1"));
		jedis.set("k4","v4");
		System.out.println("============================");
		System.out.println(jedis.mset("k5","v5","k6","v6"));
		System.out.println(jedis.mget("k5","k6"));
		
		//key
		System.out.println("------------------------------------------------key");
		Set<String> keys = jedis.keys("*");
		System.out.println(keys.size());
		for (Iterator iterator = keys.iterator(); iterator.hasNext();) {
			String key = (String) iterator.next();
			System.out.println(key);
		}
		
		System.out.println("jedis.exists===>"+jedis.exists("k2"));
		System.out.println(jedis.ttl("k1"));
		
		//List
		System.out.println("------------------------------------------------list");
//		jedis.lpush("mylist", "v1","v2","v3","v4","v5");//将一个或多个值插入到列表头
		List<String> list = jedis.lrange("mylist", 0, -1);//获取列表指定范围内的元素
		System.out.println(list);
		for(String element : list) {
			System.out.println(element);
		}
		
		
		System.out.println("-----------------------------------------------set");
		jedis.sadd("ords", "set01");//将一个或多个成员元素加入到集合中,已经存在于集合的成员元素将被忽略
		jedis.sadd("ords", "set02");
		jedis.sadd("ords", "set03");
		Set<String> smembers = jedis.smembers("ords");//返回集合中的所有的成员。 不存在的集合 key 被视为空集合。
		System.out.println(smembers);
		for(Iterator<String> iterator = smembers.iterator();iterator.hasNext();) {
			String element = iterator.next();
			System.out.println(element);
		}
		
		jedis.srem("ords", "set02");//移除集合 key 中的一个或多个 member 元素,不存在的 member 元素会被忽略
		System.out.println(jedis.smembers("ords").size());
		
		System.out.println("-----------------------------------------------hash");
		jedis.hset("hash1","userName", "lisi");// 将哈希表 key 中的字段 field 的值设为 value 。此命令会覆盖哈希表中已存在的字段。如果哈希表不存在,会创建一个空哈希表,并执行 HMSET 操作。
		String hget = jedis.hget("hash1","userName");
		System.out.println(hget);
		HashMap<String,String> map = new HashMap<String,String>();
		map.put("phone","110");
		map.put("addr","中国");
		map.put("email","123@gmail.com");
		jedis.hmset("hash2",map);//同时将多个 field-value (字段-值)对设置到哈希表中
		List<String> hmget = jedis.hmget("hash2", "phone","addr","email");//返回哈希表中,一个或多个给定字段的值。
		System.out.println(hmget);
		for(String element : hmget) {
			System.out.println(element);
		}
		
		System.out.println("------------------------------------------------zset");
		jedis.zadd("zset01", 60d,"v1");//向有序集合添加一个或多个成员,或者更新已存在成员的分数
		jedis.zadd("zset01", 70d,"v2");
		jedis.zadd("zset01", 80d,"v3");
		jedis.zadd("zset01", 90d,"v4");
		
		Set<String> zrange = jedis.zrange("zset01", 0, -1);//通过索引区间返回有序集合成指定区间内的成员
		System.out.println(zrange);
	}
}

3、Jedis事务

package com.redis;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

public class TestTx {
	/**
	 * 通俗点讲,watch命令就是标记一个键,如果标记的该建在提交事务之前被别人修改过,那事务就会失败,这种情况通常可以在程序中重新在尝试一次。
	 * 首先标记了键balance,然后检查余额是否足够,不足就取消标记,并不作扣减
	 * 足够的话,就启动事务进行更新操作,
	 * 如果在此期间键balance被其他人修改,那在提交事务(执行exec)时就会报错,
	 * 程序中通常可以捕获这类错误再重新执行一次,直到成功
	 * @return
	 * @throws InterruptedException
	 */
	private boolean transMethod() throws InterruptedException {
		Jedis jedis = new Jedis("192.168.2.138",6379);
		int balance ;//可用余额
		int debt ;//欠额
		int amtToSubtract = 10 ;//实刷额度
		
		jedis.watch("balance");
		Thread.sleep(7000);//用线程模拟加塞
		//jedis.set("balance","5");//此句不该出现,讲课方便。模拟其他程序已经修改了该条目
		balance = Integer.parseInt(jedis.get("balance"));
		if(balance < amtToSubtract) {
			jedis.unwatch();//放弃监控
			System.out.println("余额不足");
			return false;
		}else {
			Transaction transaction = jedis.multi();//开启事务
			transaction.decrBy("balance", amtToSubtract);
			transaction.incrBy("debt",amtToSubtract);
			transaction.exec();//执行事务
			balance = Integer.parseInt(jedis.get("balance"));
			debt = Integer.parseInt(jedis.get("debt"));
			System.out.println("可用余额:"+balance);
			System.out.println("支出:"+debt);
			return true;
		}
	}
	
	public static void main(String[] args) throws InterruptedException {
		TestTx test = new TestTx();
		boolean retValue = test.transMethod();
		System.out.println(retValue);
	}
	
}

正常情况执行结果

image-20220725123538323

事务在未执行前(exec),其他线程改动了balance导致事务不能正常执行,支出为0,结果如图

image-20220725123500343

重新再执行一次可以发现事务正常执行了(其他线程没有操作balance)。

image-20220725124741125

4、Jedis主从复制

package com.redis;

import java.io.Serializable;

import redis.clients.jedis.Jedis;

public class TestMS {
	public static void main(String[] args) {
		Jedis jedis_M = new Jedis("192.168.2.138",6379);//主机    写
		Jedis jedis_S = new Jedis("192.168.2.138",6380);//从机
		jedis_S.slaveof("192.168.2.138", 6379);//配置从机
        
		jedis_M.set("master", "this is comment");
		//内存中读写太快,防止读在写之前先完成而出现null的情况,这里做一下延迟
        Thread.sleep(2000);
		String val = jedis_S.get("master");
		System.out.println(val);
	}
}

第一次控制台打印的是null,再次执行即可原因是内存数据库读写太快,读在写之前文成了,可以手动延迟一下。

image-20220725131826127

5、JedisPool连接池

获取Jedis实例需要从JedisPool中获取,用完Jedis实例需要返还给JedisPool,如果Jedis在使用过程中出错,则也许需要还给JedisPool。

JedisPoolUtil.class

package com.redis;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class JedisPoolUtil {
	private static volatile JedisPool jedispool = null;
	
	private JedisPoolUtil() {}
	
	public static JedisPool getJedisPoolInstance() {
		if(null == jedispool) {
			synchronized (JedisPoolUtil.class) {
				if(null == jedispool) {
					JedisPoolConfig poolConfig = new JedisPoolConfig();
					poolConfig.setMaxActive(1000);
					poolConfig.setMaxIdle(32);
					poolConfig.setMaxWait(100 * 1000);
					poolConfig.setTestOnBorrow(true);;
					jedispool = new JedisPool(poolConfig,"192.168.2.138",6379);
					
				}
			}
		}
		return jedispool;
	}
	
	public static void release(JedisPool jedisPool,Jedis jedis) {
		if(null != jedis) {
			jedisPool.returnBrokenResource(jedis);
		}
	}
	
}

TestPool.class

package com.redis;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class TestPool {
	public static void main(String[] args) {
		JedisPool jedisPool = JedisPoolUtil.getJedisPoolInstance();
		
		Jedis jedis = null;
		try {
			jedis = jedisPool.getResource();
			jedis.set("aa","bb");
			System.out.println(jedis.get("aa"));
		} catch (Exception e) {
			 e.printStackTrace();
		}finally {
			JedisPoolUtil.release(jedisPool, jedis);
		}
	}
}

配置总结

JedisPool的配置参数大部分是由JedisPoolConfig的对应项来赋值的。

    maxActive:控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取;如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted。
    maxIdle:控制一个pool最多有多少个状态为idle(空闲)的jedis实例;
    whenExhaustedAction:表示当pool中的jedis实例都被allocated完时,pool要采取的操作;默认有三种。
        WHEN_EXHAUSTED_FAIL --> 表示无jedis实例时,直接抛出NoSuchElementException;
        WHEN_EXHAUSTED_BLOCK --> 则表示阻塞住,或者达到maxWait时抛出JedisConnectionException;
        WHEN_EXHAUSTED_GROW --> 则表示新建一个jedis实例,也就说设置的maxActive无用;
    maxWait:表示当borrow一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛JedisConnectionException;
    testOnBorrow:获得一个jedis实例的时候是否检查连接可用性ping());如果为true,则得到的jedis实例均是可用的;
    testOnReturn:return 一个jedis实例给pool时,是否检查连接可用性ping());
    testWhileIdle:如果为true,表示有一个idle object evitor线程对idle object进行扫描,如果validate失败,此object会被从pool中drop掉;这一项只有在timeBetweenEvictionRunsMillis大于0时才有意义;
    timeBetweenEvictionRunsMillis:表示idle object evitor两次扫描之间sleep毫秒数;
    numTestsPerEvictionRun:表示idle object evitor每次扫描的最多的对象数;minEvictableIdleTimeMillis
    minEvictableIdleTimeMillis:表示一个对象至少停留在idle状态的最短时间,然后才能被idle object evitor扫描并驱逐;这一项只有在timeBetweenEvictionRunsMillis大于0时才有意义;
    softMinEvictableIdleTimeMillis:在minEvictableIdleTimeMillis基础上,加入了至少minIdle个对象已经在pool里面了。如果为-1,evicted不会根据idle time驱逐任何对象。如果minEvictableIdleTimeMillis>0,则此项设置无意义,且只有在timeBetweenEvictionRunsMillis大于0时才有意义;
    lifo:borrowObject返回对象时,是采用DEFAULT_LIFO(last in first out,即类似cache的最频繁使用队列),如果为False,则表示FIFO队列;

其中JedisPoolConfig对一些参数的默认设置如下:

    testWhileIdle=true
    minEvictableIdleTimeMills=60000
    timeBetweenEvictionRunsMillis=30000
    numTestsPerEvictionRun=-1

6、Jedis实例-手机验证码

要求:

​ 1、输入手机号点击发送后随机生成6位数字码,2分钟有效

​ 2、输入验证码点击验证,返回成功或失败

​ 3、每个手机号每天只能输入3次

分析

image-20220726105249925

package com.redis;

import java.util.Random;

import redis.clients.jedis.Jedis;

public class PhoneCode {
	public static void main(String[] args) {
		chickCode("15661311412");
//		getCheckCode("852113");
	}
	
	public static void getCheckCode(String code) {
		Jedis jedis = new Jedis("192.168.2.138",6379);
		String getCode = jedis.get("code");
		if(getCode.equals(code)) {
			System.out.println("成功!");
		}else {
			System.out.println("失败!");
		}
		jedis.quit();
	}
	
	//每个手机每天只能发送三次,验证码写到redis中,设置过期时间
	public static void chickCode(String phone) {
		Jedis jedis = new Jedis("192.168.2.138",6379);
		
		String countKey = "count"; //发送手机号次数
		
		String codeKey = "code"; //存取验证码的key
		
		//从redis读取发送手机号的次数
		String count = jedis.get(countKey);
		System.out.println(count);
		//判断手机号发送次数
		if(null == count) {
			//没有发送,第一次发送,redis中没有这个数据,就相当于set一个key
			jedis.setex(countKey, 24*60*60,"1");
		}else if(Integer.parseInt(count) <= 2) {
			//发送次数+1
			jedis.incr(countKey);
		}else if(Integer.parseInt(count) > 2){
			//发送了三次,禁止再发送
			System.out.println("发送次数已超过3次");
			jedis.quit();
			return;
		}
		
		//调用生成验证码的方法
		String code = getCode();
		//将验证码写到redis中
		jedis.setex(codeKey,120,code);
		jedis.quit();
	}
	
	//生成随机6位验证码
	public static String getCode() {
		Random random_code = new Random();
		String code = "";
		for(int i = 0;i<6;i++) {
			code += random_code.nextInt(10);
		}
		return code;
	}
}

九、Spring Boot整合Redis

Spring Boot整合Redis非常简单,只需要按如下步骤整合即可

整合步骤

1、 添加缓存依赖

<!-- redis -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- spring2.X集成redis所需common-pool2-->
<dependency>
	<groupId>org.apache.commons</groupId>
	<artifactId>commons-pool2</artifactId>
	<version>2.6.0</version>
</dependency>

2、 配置application.properties

#Redis服务器地址
spring.redis.host=192.168.140.136
#Redis服务器连接端口
spring.redis.port=6379
#Redis数据库索引(默认为0)
spring.redis.database= 0
#连接超时时间(毫秒spring.redis.timeout=1800000
#连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待时间(负数表示没限制)
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=5
#连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0

3、 添加配置类

@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setConnectionFactory(factory);
//key序列化方式
        template.setKeySerializer(redisSerializer);
//value序列化
        template.setValueSerializer(jackson2JsonRedisSerializer);
//value hashmap序列化
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        return template;
    }

    @Bean
    public CacheManager cacheManager(RedisConnectionFactory factory) {
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
//解决查询缓存转换异常的问题
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化(解决乱码的问题),过期时间600秒
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofSeconds(600))
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
                .disableCachingNullValues();
        RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
                .cacheDefaults(config)
                .build();
        return cacheManager;
    }
}

十、Redis集群

1、问题

容量不够,redis如何进行扩容

并发写操作,redis如何分摊

另外,主从模式,薪火相传模式,主机宕机,导致ip地址发生变化,应用程序中配置需要修改对应的主机地址、端口等信息

通过代理主机来解决,但是redis3.0中提供了解决方案。就是无中心化集群配置。

代理主机:

image-20220728090954551

中心集群

image-20220728091648320

2、什么是集群

Redis 集群实现了对Redis的水平扩容,即启动N个redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N。

Redis 集群通过分区partition)来提供一定程度的可用性(availability): 即使集群中有一部分节点失效或者无法进行通讯集群也可以继续处理命令请求。

3、Redis集群搭建

  1. 删除持久化数据
    将rdb,aof文件都删除掉

  2. 配置6实例
    6379,6380,6381,6389,6390,6391

  3. 配置基本信息
    开启daemonize yes
    Pid文件名字
    指定端口
    Log文件名字
    Dump.rdb名字
    Appendonly 关掉或者换名字

  4. 配置集群文件

    include /home/bigdata/redis.conf
    port 6379
    pidfile "/var/run/redis_6379.pid"
    dbfilename "dump6379.rdb"
    # dir "/home/bigdata/redis_cluster"
    # logfile "/home/bigdata/redis_cluster/redis_err_6379.log"
    cluster-enabled yes
    cluster-config-file nodes-6379.conf
    cluster-node-timeout 15000
    
  5. 复制多个redis.conf文件

    image-20220728094637648

  6. 使用查找替换**:%s/6379/6380**修改复制的redis.conf文件

  7. 启动6个redis服务

    image-20220728095747902

  8. 将六个结点合成一个集群

    组合之前,请确保所有redis实例启动后,nodes-xxxx.conf文件都生成正常。

    image-20220728100005323

    • 合体

    cd /opt/redis-6.2.7/src (下面的命令必须在该目录下)

    redis-cli --cluster create --cluster-replicas 1 192.168.2.138:6379 192.168.2.138:6380 192.168.2.138:6381 192.168.2.138:6389 192.168.2.138:6390 192.168.2.138:6391
    

    此处不要用127.0.0.1,请用真实IP地址

    replicas 1 采用最简单的方式配置集群,一台主机,一台从机,正好三组。

    image-20220728101553571 image-20220728101734017

  9. redis cluster 如何分配这六个结点

    一个集群至少要有三个节点。

    选项 –cluster-replicas 1 表示我们希望为集群中的每个结点创建一个从节点。
    分配原则尽量保证每个主数据库运行在不同的IP地址,每个从库和主库不在同一个IP地址

4、什么是slots

image-20220728110439373

一个Redis集群包含16384个插槽(hash slot),数据库中的每个键都属于这个16384个插槽的其中一个。

集群使用公式CRC16(key) % 16384来计算键key属于哪个槽,其中CRC16(key)语句用于计算键key的CRC16校验和。

集群中的每个节点负责处理部分插槽。如:如果一个集群可以有主节点,其中:
节点A负责处理0~5460号插槽。

节点B负责处理5641~10922号插槽。

节点C负责处理10923~16383号插槽。

image-20220728111233997

image-20220728112341224

5、在集群中录入

在redis-cli每次录入查询键值,redis都会计算出该key应该送往的插槽,如果不是该客户端对应服务器的插槽,redis会报错,并告知应前往的redis实例地址和端口。

redis-cli客户端提供了 -c 参数实现自动重定向
如:redis-cli -c -p 6379登录后,在录入查询键值对可以自动重定向

不在一个slot下的键值,是不能使用mget、mset等多键操作

image-20220728112923597

可以通过{}来定义组的概念,从而使key中{}内相同内容的键值对放到一个slot中去。

image-20220728113124425

image-20220728113427443

6、查询集群中的值

CLUSTER GETKEYSINSLOT 返回 count 个 slot 槽中的键。

image-20220728113653616

7、故障恢复

如果主节点下线?从节点能否自动升级为主节点?注意15秒超时

image-20220728114015870

主节点恢复后,主从关系会如何?主节点回来变成从机

image-20220728114038097

如果所有某一段插槽的主节点都宕掉,redis服务是否还能继续?

如果某一段插槽的主从都挂掉,而cluster-require-full-coverage为yes,那么,整个集群都挂掉
如果某一段的插槽主从都挂掉,而cluster-require-full-coverage为no,那么,插槽数据全都不能使用,也无法存储。

redis.conf中的参数 cluster-require-full-coverage

8、集群的Jedis开发

即使连接的不是主机,集群会自动切换主机存储。主机写,从机读。

中心化主从集群。无论从哪台主机写的数据,其他主机上都能读到数据。

public class JedisClusterTest {
  public static void main(String[] args) { 
     Set<HostAndPort>set =new HashSet<HostAndPort>();
     set.add(new HostAndPort("192.168.31.211",6379));
     JedisCluster jedisCluster=new JedisCluster(set);
     jedisCluster.set("k1", "v1");
     System.out.println(jedisCluster.get("k1"));
  }
}

9、集群的优势&amp;劣势

十一、Redis应用题解

1、缓存穿透

1、问题描述

image-20220728175752159

当用户在查询一条数据的时候,而此时数据库和缓存却没有关于条数据的任何记录,而这条数据在缓存中没找到就会向数据库请求获取数据。用户拿不到数据时,就会一直发请求,查询数据库,这样会对数据库的访问造成很大的压力。

如:用一个不存在的用户id获取用户信息,不论缓存还是数据库都没有,当信息没有返回时,会一直向数据库查询从而给当前数据库的造成很大的访问压力。

缓存穿透的发生一般是受到 “黑客攻击” 所导致的,所以应该进行监控,如果真的是黑客攻击,及时添加黑名单

2、解决方案

**(1)对控制缓存:**缓存空值 会让redis多了很多没用的键,会占用空间 可以设置一个过期时间 expiretime 去自动清除

**(2)设置白名单:**通过使用Bitmaps,将允许访问的id按偏移量存储到Bitmaps)每次访问和Bitmaps中的id进行比较,保证了安全性,牺牲了性能。

**(3)采用布隆过滤器:**它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。)

将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被 这个bitmaps拦截掉,从而避免了对底层存储系统的查询压力。

(4)实时监控:当发现Redis的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务

2、缓存击穿

1、问题描述

key中对应数据存在,当key中对应的数据在缓存中过期,而此时又有大量请求访问该数据,缓存中过期了,请求会直接访问数据库并回设到缓存中,高并发访问数据库会导致数据库崩溃

image-20220728175824137

2、解决方案

(1)**预先设置热门数据:**在redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长

(2)实时调整现场监控哪些数据热门,实时调整key的过期时长

(3)使用锁:

  • 就是在缓存失效的时候(判断拿出来的值为空),不是立即去load db。
  • 先使用缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX)去set一个mutex key
  • 当操作返回成功时,再进行load db的操作,并回设缓存,最后删除mutex key
  • 当操作返回失败,证明有线程在load db,当前线程睡眠一段时间再重试整个get缓存的方法

3、缓存雪崩

1、问题描述

key中对应数据存在,在某一时刻,缓存中大量key过期,而此时大量高并发请求访问,会直接访问后端数据库,导致数据库奔溃。

注意:缓存击穿针对一个key对应在缓存中数据过期,缓存雪崩是大部分key对应缓存数据过期

正常访问:

image-20220728180951417

缓存失效瞬间:

image-20220728181035827

2、解决方案

(1) **构建多级缓存架构:**nginx缓存 + redis缓存 +其他缓存(ehcache等)

(2) 使用锁或队列:

加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况

(3) 设置过期标志更新缓存:

记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际key的缓存。

(4) 将缓存失效时间分散开:

比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效事件

4、分布式

1、描述

​ 随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

image-20220728182920870

分布式锁主流的实现方案

  1. 基于数据库实现分布式

  2. 基于缓存(Redis等)

  3. 基于Zookeeper

每一种分布式锁解决方案都有各自的优缺点

  1. 性能:redis最高

  2. 可靠性zookeeper最高

这里,我们就基于redis实现分布式锁。

2、使用redis实现分布式锁

使用setnx 命令上锁,通过del 释放锁。

image-20220728183540529

如果锁一直没有释放呢?设置key过期时间,自动释放。

image-20220728183844539

如果上锁时突然异常,来不及设置过期时间呢?上锁的同时设置过期时间(原子性操作)

使用命令: set nx ex

image-20220728184624395

3、Java代码实现Redis分布式锁

redis中设置key num “0”

@GetMapping("testLock")
public void testLock(){
    //1获取锁,setnx
    Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
    //2获取锁成功、查询num的值    
 if(lock){
        Object value = redisTemplate.opsForValue().get("num");
        //2.1判断num为空return
        if(StringUtils.isEmpty(value)){
            return;
        }
        //2.2有值就转成成int
        int num = Integer.parseInt(value+"");
        //2.3把redis的num加1
        redisTemplate.opsForValue().set("num", ++num);
        //2.4释放锁,del
        redisTemplate.delete("lock");

    }else{
        //3获取锁失败、每隔0.1秒再获取
        try {
            Thread.sleep(100);
            testLock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

使用ab工具进行压力测试程序正常执行。(基本实现)

问题:setnx刚好获取到锁,业务逻辑出现异常,导致锁无法释放

解决:设置过期时间,自动释放锁。

image-20220728185916991

image-20220728185927644

问题:可能释放其他服务器的锁(如图描述)

image-20220728191342439

解决:setnx获取锁的同时,设置一个指定唯一值(uuid);释放前获取这个值,判断是否是自己的锁。

set uuid nx ex

优化之UUID防误删

image-20220728192000180

image-20220728192016161

问题:删除缺乏原子性(问题描述如图

image-20220728193328968

优化之LUA脚本保证删除的原子性

@GetMapping("testLockLua")
public void testLockLua() {
    //1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
    String uuid = UUID.randomUUID().toString();
    //2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
    String skuId = "25"; // 访问skuId 为25号的商品 100008348542
    String locKey = "lock:" + skuId; // 锁住的是每个商品的数据
  // 3 获取锁
    Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);

    // 第一种: lock 与过期时间中间不写任何的代码
    // redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间
    // 如果true
    if (lock) {
        // 执行的业务逻辑开始
        // 获取缓存中的num 数据
        Object value = redisTemplate.opsForValue().get("num");
        // 如果是空直接返回
        if (StringUtils.isEmpty(value)) {
            return;
        }
        // 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在!
        int num = Integer.parseInt(value + "");
        // 使num 每次+1 放入缓存
        redisTemplate.opsForValue().set("num", String.valueOf(++num));
        /*使用lua脚本来锁*/
        // 定义lua 脚本
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        // 使用redis执行lua执行
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptText(script);
        // 设置一下返回值类型 为Long
        // 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
        // 那么返回字符串与0 会有发生错误。
        redisScript.setResultType(Long.class);
        // 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
        redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
    } else {
        // 其他线程等待
        try {
            // 睡眠
            Thread.sleep(1000);
            // 睡醒了之后,调用方法
            testLockLua();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Lua脚本详解

image-20220728194528146

项目正确使用:

1. 定义key,key应该是为每个sku定义的,也就是每个sku有一把锁。
String locKey ="lock:"+skuId; // 锁住的是每个商品的数据
Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid,3,TimeUnit.SECONDS);

image-20220728194612252

总结

(1)加锁

// 1. 从redis中获取锁,set k1 v1 px 20000 nx
String uuid = UUID.randomUUID().toString();
Boolean lock = this.redisTemplate.opsForValue()
      .setIfAbsent("lock", uuid, 2, TimeUnit.SECONDS);

(2)使用Lua释放锁:

// 2. 释放锁 del
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 设置lua脚本返回的数据类型
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
// 设置lua脚本返回类型为Long
redisScript.setResultType(Long.class);
redisScript.setScriptText(script);
redisTemplate.execute(redisScript, Arrays.asList("lock"),uuid);

(3)重试

Thread.sleep(500);
testLock();

为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件

互斥。在任意时刻,只有一个客户端能持有锁。

不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁

解铃还须系铃人加锁解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

redisTemplate.opsForValue().get(“num”);
// 如果是空直接返回
if (StringUtils.isEmpty(value)) {
return;
}
// 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在!
int num = Integer.parseInt(value + “”);
// 使num 每次+1 放入缓存
redisTemplate.opsForValue().set(“num”, String.valueOf(++num));
/使用lua脚本来锁/
// 定义lua 脚本
String script = “if redis.call(‘get’, KEYS[1]) == ARGV[1] then return redis.call(‘del’, KEYS[1]) else return 0 end”;
// 使用redis执行lua执行
DefaultRedisScript redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
// 设置一下返回值类型 为Long
// 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
// 那么返回字符串与0 会有发生错误。
redisScript.setResultType(Long.class);
// 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
} else {
// 其他线程等待
try {
// 睡眠
Thread.sleep(1000);
// 睡醒了之后,调用方法。
testLockLua();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


Lua脚本详解:

[外链图片转存中...(img-AMGne8nP-1687078790901)]

项目正确使用:

```java
1. 定义key,key应该是为每个sku定义的,也就是每个sku有一把锁。
String locKey ="lock:"+skuId; // 锁住的是每个商品的数据
Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid,3,TimeUnit.SECONDS);

[外链图片转存中…(img-NeV4ub46-1687078790902)]

总结

(1)加锁

// 1. 从redis中获取锁,set k1 v1 px 20000 nx
String uuid = UUID.randomUUID().toString();
Boolean lock = this.redisTemplate.opsForValue()
      .setIfAbsent("lock", uuid, 2, TimeUnit.SECONDS);

(2)使用Lua释放锁:

// 2. 释放锁 del
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 设置lua脚本返回的数据类型
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
// 设置lua脚本返回类型为Long
redisScript.setResultType(Long.class);
redisScript.setScriptText(script);
redisTemplate.execute(redisScript, Arrays.asList("lock"),uuid);

(3)重试

Thread.sleep(500);
testLock();

为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件

互斥。在任意时刻,只有一个客户端能持有锁。

不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。

解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

加锁和解锁必须具有原子性

原文地址:https://blog.csdn.net/weixin_44900183/article/details/131273550

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_43654.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注