08_Redis分布式锁
本笔记来源于:尚硅谷Redis零基础到进阶,最强redis7教程,阳哥亲自带练(附redis面试题)
b站视频
文章来自:
https://github.com/Romantic-Lei/Learning-in-practice
面试总结
Redis除了拿来做缓存,还有基于Redis的其他用法吗
数据共享,分布式Session
分布式锁
全局ID
计算器、点赞
位统计
购物车
轻量级消息队列
list -> stream
抽奖
点赞、签到、打卡
交集差集并集,用户关注,可能认识的人
热点新闻、热搜排行
Redis做分布式的时候需要注意什么问题?
你们公司自己实现的分布式锁是否用的setnx命令实现?这个是最合适的吗?你如何考虑分布式锁的可重入问题?
如果Redis是单点部署的,会带来什么问题?准备怎么解决单点问题呢?
Redis集群模式下,比如主从模式下,CAP方面有没有什么问题?
Redis集群是AP,高可用;Redis单机是C,数据一致性
86592695
简单介绍一下RedLock,谈谈Redisson
Redis分布式锁如何续期,看门狗是什么?
锁的种类
单机版同一个虚拟机内,synchronized或者Lock接口。
分布式多个不同jvm虚拟机,单机的线程锁机制不再起作用,资源类在不同的服务器之间共享了。
一个靠谱的分布式锁需要具备的条件和刚需
独占性
任何时刻有且只能有一个线程持有
高可用
若为redis集群环境下,不能因为某一个节点挂了而出现获取锁和释放锁失败的情况
高并发请求下,依旧性能OK好使
防死锁
杜绝死锁,必须有超时控制机制或者撤销操作,有个兜底终止跳出方案
不乱抢
防止张冠李戴,不能私底下unlock别人的锁,只能自己加锁自己释放
重入性
同一个节点的同一个线程如果获得锁之后,它也可以再次获取这个锁
分布式锁
setnx key value
setnx+expire不安全,两条命令为非原子性命令
set key value[EX seconds] [PX milliseconds] [NX|XX]
重点
JUC中AQS锁的规范落地参考+可重入锁+Lua脚本+Redis命令一步步实现分布式锁
Base案例(boot+redis)
使用场景:多个服务间保证同一时间段内同一用户只能有一个请求(防止关键业务出现并发攻击)
建Module
redis_distributed_lock2
redis_distributed_lock3
写POM
1 |
|
改YML
1 |
|
主启动
业务类
InventoryService
1 |
|
InventoryController
1 |
|
手写分布式锁思路分析2023
上述笔记代码的不足
初始化版本简单添加
业务类,将上面7777的业务逻辑代码原样拷贝到8888,加上synchronized或者Lock
nginx分布式微服务架构
v2.0版本代码分布式部署后,单机锁还是出现超卖现象,需要分布式锁
Nginx配置负载均衡
命令地址+配置地址
命令地址:/usr/local/nginx/sbin
配置地址:/usr/local/nginx/conf
启动
/usr/local/nginx/sbin 执行./nginx
启动Nginx并测试通过,浏览器看到Nginx欢迎welcome页面
/usr/local/nginx/conf目录下修改配置文件nginx.conf新增反向代理和负载均衡配置
关闭
/usr/local/nginx/sbin 执行./nginx -s stop
指定配置启动
在/usr/local/nginx/sbin 路径下执行命令 ./nginx -c nginx.conf全路径
重启
在/usr/local/nginx/sbin 执行./nginx -s reload
v2.0版本代码修改+启动两个微服务
通过Nginx访问,自己Linux服务器IP地址,反向代理+负载均衡
可以点击查看效果,可以看到一边一个,通过配置的权重来轮询
上面纯手工点击验证OK,下面高并发模拟
使用jmeter进行压测
创建线程组Redis
创建一个http请求
jmeter压测
查看报告
执行绿色执行按钮,开始并发请求程序
发现商品出现超卖
Bug?why?
synchronized和ReentrantLock是单机锁,只能管住当前自己的jvm。
在单机环境下,可以使用synchronized或Lock来实现
但是在分布式系统中,因为竞争的线程可能不在同一个节点上(同一个jvm中),所以需要一个让所有进程都能访问到的锁来实现(比如redis或者zookeeper来构建)
不同进程ivm层面的锁就不管用了,那么可以利用第三方的一个组件,来获取锁,未获取到锁,则阻塞当前想要运行的线程
分布式锁出现
跨进程+跨服务
解决超卖
防止缓存击穿
解决
上Redis分布式锁setnx;Redis具有极高的性能,且其命令对分布式锁支持友好,借助SET 命令即可实现加锁处理。
redis分布式锁
代码改造为3.1版
1 |
|
通过递归的方式来完成重试,不断获取锁
但是依旧有问题:手工设置5000个线程来抢占锁,压测OK,但是容易导致StackOverflowError,不推荐,需要进一步完善
修改为3.2版
多线程判断想想JUC里面说过的虚假唤醒,用while替代if,用自旋锁代替递归重试
1 |
|
宕机防止死锁
需要防止服务器突然宕机导致程序死锁
V3.2代码存在的问题
部署了微服务的Java程序机器挂了,代码层面根本没有走到finally这块,没办法保证解锁(无过期时间该key一直存在),这个key没有被删除,需要加入一个过期时间限定key
解决
修改代码为v4.1版
1
2
3
4
5
6
7while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue)) {
// 线程休眠20毫秒,进行递归重试
try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
}
// 设置过期时间
stringRedisTemplate.expire(key, 30L, TimeUnit.SECONDS);4.1版本结论
不可行,设置的key+过期时间分开了,必须要合并成一行使其具备原子性
修改为4.2版
1
2
3
4while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue, 30L, TimeUnit.SECONDS)) {
// 线程休眠20毫秒,进行递归重试
try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
}4.2版本结论
加锁和过期时间设置必须同一行,保证原子性
防止误删key问题
v4.2版本代码问题
实际业务中,如果处理时间超过了设置的key的过期时间,那删除key的时候岂不是张冠李戴,删除了别人的锁
解决
只能自己删除自己的,不允许删除别人的说,修改代码为5.0版本
1 |
|
Lua保证原子性
v5.0依旧存在问题
finally代码块的判断+del删除不是原子性的
启用Lua脚本编写redis分布式锁判断+删除判断代码
lua脚本
官网解释:https://redis.io/docs/manual/patterns/distributed-locks/
Lua脚本浅谈
Lua脚本初识
Redis调用Lua脚本通过eval命令保证代码执行的原子性,直接用return返回脚本执行后的结果值
eval luascript numkeys [key [key …]] [arg [arg …]]
Lua脚本进一步
Redis分布式锁Lua脚本官网练习
1
eval "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end" 1 luojiaRedisLock 1111
条件语法判断
条件判断案例
1
if KEYS[1] > KEYS[2] then return ARGV[1] elseif KEYS[1] < KEYS[2] then return ARGV[2] else return ARGV[3] end
通过Lua脚本解决v5.0版本Bug
1 |
|
可重入锁
可重入锁又名递归锁
说明:
1 |
|
可重入锁这四个字分开来解释:
可:可以
重:再次
入:进入
锁:同步锁
进入什么:进入同步域(即同步代码块/方法或显示锁锁定的代码)
一句话:一个线程中的多个流程可以获取同一把锁,持有这把同步锁可以再次进入。自己可以获取自己的内部锁
可重入锁种类
隐式锁(即synchronized关键字使用的锁) 默认是可重入锁
1
2指的是可重复可递归调用的锁,在外层使用锁之后。在内层仍然可以使用,并目不发生死锁,这样的锁就叫做可重入锁。简单的来说就是:在一个synchronied修饰的方法或代码块的内部调用本类的其他synchronized修饰的方法或代码块时,是永远可以得到锁的。
与可重入锁相反,不可重入锁不可递归调用,递归调用就发生死锁。Synchronized的重入的实现机理
1
2
3
4
5
6每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针。
当执行monitorenter时,如果目标锁对象的计数器为零,那么说明它没有被其他线程所持有,Java虚拟机会将该锁对象的持有线程设置为当前线程,并且将其计数器加1。
在目标锁对象的计数器不为零的情况下,如果锁对象的持有线程是当前线程,那么 Java 虚拟机可以将其计数器加1,否则需要等待直至持有线程释放该锁。
当执行monitorexit时,Java虚拟机则需将锁对象的计数器减1。计数器为零代表锁已被释放。显式锁(即Lock)也有ReentrantLock这样的可重入锁
1
一个Synchronized修饰的方法或代码块的内部调用本类的其他Synchronized修饰的方法或代码块时,是永远可以得到锁
lock/unlock配合可重入锁进行AQS源码分析讲解
切记,一般而言,你Lock了几次就要unlock几次;
可以分析new ReentrantLock();查看
可重入锁的redis代替
上述可重入锁的计数问题,redis中哪个数据类型可以替代
K K V 数据结构
1 |
|
Map<String, Map<Object, Object>>
小总结
setnx只能解决有无的问题,够用但是不完美
hset,不但解决有无,还能解决可重入问题
设计重点(一横一纵)
目前两个分支,目的是保证同一个时候只能有一个线程持有锁进去redis做扣减库存动作
一个分支保证加锁/解锁,lock/unlock;
另一个分支扣减库存,redis命令的原子性
Lua脚本
加锁Lua脚本lock
先判断redis分布式锁这个key是否存在
EXISTS key:
返回零说明不存在,hset新建当前线程属于自己的锁,field key格式为UUID:ThreadID,value为加锁次数;
返回壹说明已经有锁,需进一步判断是不是当前线程自己的 ->
HEXISTS key uuid:ThreadID:返回0说明不是自己的锁;返回1说明是自己的锁,自增1次表示重入
HINCRBY key UUID:ThreadID 1
上述设计修改为Lua脚本
V1版本
1
2
3
4
5
6
7
8
9
10
11
12// 加锁的Lua脚本,对标我们的lock方法
if redis.call('exists', 'key') == 0 then
redis.call('hset', 'key', 'uuid:threadid', 1)
redis.call('expire', 'key', 50)
return 1
elseif redis.call('hexists', 'key', 'uuid:threadid') == 1 then
redis.call('hincrby', 'key', 'uuid:threadid', 1)
redis.call('expire', 'key', 50)
return 1
else
return 0
endV2版本
当key不存在的时候,hincrby可以自动创建这个key并且自增
1
2
3
4
5
6
7
8// V2 合并相同的代码,用hincrby替代hset,精简代码
if redis.call('exists', 'key') == 0 or redis.call('hexists', 'key', 'uuid:threadid') == 1 then
redis.call('hincrby', 'key', 'uuid:threadid', 1)
redis.call('expire', 'key', 50)
return 1
else
return 0
endV3版本
1
2
3
4
5
6
7
8// V3 脚本OK,换上参数来替代写死的key,value
if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then
redis.call('hincrby', KEYS[1], ARGV[1], 1)
redis.call('expire', KEYS[1], ARGV[2])
return 1
else
return 0
end测试
1
2
3
4-- 已完成验证
if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then redis.call('hincrby', KEYS[1], ARGV[1], 1) redis.call('expire', KEYS[1], ARGV[2]) return 1 else return 0 end
eval "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then redis.call('hincrby', KEYS[1], ARGV[1], 1) redis.call('expire', KEYS[1], ARGV[2]) return 1 else return 0 end" 1 luojiaRedisLock 001122:1 50
解锁LUA脚本unlock
设计思路:有锁且还是自己的锁 -> HEXISTS key uuid:ThreadID
返回零,说明根本没有锁,程序块返回nil
不是零,说明有锁且是自己的锁,直接调用HINCRBY负一,表示每次减一,解锁一次直到它变成零表示可以删除该锁Key,del锁
上述设计修改为Lua脚本
V1版本
1
2
3
4
5
6
7
8// 解锁的Lua脚本,对标我们的lock方法
if redis.call('hexists', 'key', uuid:threadid) == 0 then
return nil
elseif redis.call('hincrby', key, uuid:threadid, -1) == 0 then
return redis.call('del', key)
else
return 0
endV2版本
1
2
3
4
5
6
7if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then
return nil
elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then
return redis.call('del', KEYS[1])
else
return 0
end测试验证
1
2
3if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then return nil elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then return redis.call('del', KEYS[1]) else return 0 end
eval "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then return nil elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then return redis.call('del', KEYS[1]) else return 0 end " 1 luojiaRedisLock 001122:1
Lua整合java程序
步骤一:复原程序为初识无锁版本
1 |
|
步骤二:新建RedisDistributedLock类实现JUC里面的Lock接口
步骤三:满足JUC里面AQS对Lock锁的接口规范定义来进行实现落地代码
步骤四:结合设计模式开发属于自己的Redis分布式锁工具类
lua脚本加锁
1 |
|
lua脚本解锁
1 |
|
工厂设计模式引入
通过实现JUC里面的Lock接口,实现Redis分布式锁RedisDistributedLock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105package com.luojia.redislock.mylock;
import cn.hutool.core.util.IdUtil;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 自研的分布式锁,实现了Lock接口
*/
public class RedisDistributedLock implements Lock {
private StringRedisTemplate stringRedisTemplate;
private String lockName; // KEYS[1]
private String uuidValule; // ARGV[1]
private long expireTime; // ARGV[2]
public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockName = lockName;
this.uuidValule = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
this.expireTime = 50L;
}
@Override
public void lock() {
tryLock();
}
@Override
public boolean tryLock() {
try {
tryLock(-1L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (-1 == time) {
String script =
"if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
"redis.call('expire', KEYS[1], ARGV[2]) " +
"return 1 " +
"else " +
"return 0 " +
"end";
System.out.println("lockName:" + lockName + "\t" + "uuidValue:" + uuidValule);
// 加锁失败需要自旋一直获取锁
while (!stringRedisTemplate.execute(
new DefaultRedisScript<>(script, Boolean.class),
Arrays.asList(lockName),
uuidValule,
String.valueOf(expireTime))) {
// 休眠60毫秒再来重试
try {TimeUnit.MILLISECONDS.sleep(60);} catch (InterruptedException e) {e.printStackTrace();}
}
return true;
}
return false;
}
@Override
public void unlock() {
String script = "" +
"if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then " +
"return nil " +
"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then " +
"return redis.call('del', KEYS[1]) " +
"else " +
"return 0 " +
"end";
System.out.println("lockName:" + lockName + "\t" + "uuidValue:" + uuidValule);
// LUA脚本由C语言编写,nil -> false; 0 -> false; 1 -> true;
// 所以此处DefaultRedisScript构造函数返回值不能是Boolean,Boolean没有nil
Long flag = stringRedisTemplate.execute(
new DefaultRedisScript<>(script, Long.class),
Arrays.asList(lockName),
uuidValule);
if (null == flag) {
throw new RuntimeException("this lock does not exists.");
}
}
// 下面两个暂时用不到,不用重写
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public Condition newCondition() {
return null;
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24// v7.0 使用自研的lock/unlock+LUA脚本自研的Redis分布式锁
Lock redisDistributedLock = new RedisDistributedLock(stringRedisTemplate, "luojiaRedisLock");
public String sale() {
String resMessgae = "";
redisDistributedLock.lock();
try {
// 1 抢锁成功,查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory01");
// 2 判断库存书否足够
Integer inventoryNum = result == null ? 0 : Integer.parseInt(result);
// 3 扣减库存,每次减少一个库存
if (inventoryNum > 0) {
stringRedisTemplate.opsForValue().set("inventory01", String.valueOf(--inventoryNum));
resMessgae = "成功卖出一个商品,库存剩余:" + inventoryNum + "\t" + ",服务端口号:" + port;
log.info(resMessgae);
} else {
resMessgae = "商品已售罄。" + "\t" + ",服务端口号:" + port;
log.info(resMessgae);
}
} finally {
redisDistributedLock.unlock();
}
return resMessgae;
}如果直接使用上面的代码设计,会有什么问题
需要考虑扩展,本次是Redis实现分布式锁,以后zookeeper、MySQL实现呢?
引入工厂模式改造7.1版code
DistributedLockFactory
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30package com.luojia.redislock.mylock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.locks.Lock;
@Component
public class DistributedLockFactory {
@Autowired
private StringRedisTemplate stringRedisTemplate;
public Lock getDistributedLock(String lockType, String lockName) {
if (lockType == null) {
return null;
}
if ("REDIS".equalsIgnoreCase(lockType)) {
return new RedisDistributedLock(stringRedisTemplate, lockName);
} else if ("ZOOKEEPER".equalsIgnoreCase(lockType)) {
// 后面存在就返回对应的分布式锁
} else if ("MYSQL".equalsIgnoreCase(lockType)) {
// 后面存在就返回对应的分布式锁
}
return null;
}
}InventoryService使用工厂模式版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26// v7.0 使用自研的lock/unlock+LUA脚本自研的Redis分布式锁
@Autowired
private DistributedLockFactory distributedLockFactory;
public String sale() {
String resMessgae = "";
Lock redisLock = distributedLockFactory.getDistributedLock("REDIS", "luojiaRedisLock");
redisLock.lock();
try {
// 1 抢锁成功,查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory01");
// 2 判断库存书否足够
Integer inventoryNum = result == null ? 0 : Integer.parseInt(result);
// 3 扣减库存,每次减少一个库存
if (inventoryNum > 0) {
stringRedisTemplate.opsForValue().set("inventory01", String.valueOf(--inventoryNum));
resMessgae = "成功卖出一个商品,库存剩余:" + inventoryNum + "\t" + ",服务端口号:" + port;
log.info(resMessgae);
} else {
resMessgae = "商品已售罄。" + "\t" + ",服务端口号:" + port;
log.info(resMessgae);
}
} finally {
redisLock.unlock();
}
return resMessgae;
}单机+并发通过测试
可重入测试+设计模式
可重入测试
InventoryService类新增可重入测试方法
1 |
|
结果:无法重入
testReEntry方法中,会创建一个新的锁Key,因为ThreadID一致了,但是UUID不OK
引入工厂模式改造7.2版本code
DistributedLockFactory -> 新增一个无参构造函数
1 |
|
RedisDistributedLock -> 修改构造方法
1 |
|
使用@Autowired创建的工厂类是一个单例的,在spring进行注入的时候已经初始化好了,所以所有线程产生的UUID都是一样的
自动续费
确保RedisLock过期时间大于业务执行时间的问题
时间到了,业务没有执行完需要自动续期
CAP
Redis集群是AP
redis异步复制造成的锁丢失,比如:主节点没来的及把刚刚set进来的这条数据给从节点,master就挂了,从机上位但是从机上无数据
Zookeeper集群是CP
故障:
Eureka集群是AP
Nacos集群是AP
自动续期的LUA脚本
1 |
|
测试验证
1 |
|
8.0新增自动续期功能
1 |
|
总结
synchronized单机版OK; -> v1.0
Nginx分布式微服务,轮询多台服务器,单机锁不行;-> v2.0
取消单机锁,上redis分布式锁setnx,中小企业使用没问题;-> v3.1
只是加锁了,没有释放锁,出异常的话,可能无法释放锁,必须要在代码层面finally释放锁 -> v3.2
如果服务宕机,部署了微服务代码层面根本就没有走到finally这块,没办法保证解锁,这个Key没有被删除,需要对锁设置过期时间 -> v3.2
为redis的分布式锁key增加过期时间,还必须要保证setnx+过期时间在同一行,保证原子性 -> v4.1
程序由于执行超过锁的过期时间,所以在finally中必须规定只能自己删除自己的锁,不能把别人的锁删除了,防止张冠李戴 -> v5.0
将Lock、unlock变成LUA脚本保证原子性; -> v6.0
保证锁的可重入性,hset替代setnx+Lock变成LUA脚本,保障可重入性; -> v7.0
锁的自动续期 -> v8.0