Fork me on GitHub

基于Redis的分布式锁之ReentrantLock

1. 原理

利用Redis的setnx命令,此命令是原子性操作。

2. 加锁

设置锁和设置过期时间应该是一个原子性操作,防止出现锁设置成功,但是执行设置锁过期时间失败导致的死锁现象。

伪代码:

setnx(key,value,expire)

key的取值:一般是业务id,比如商品编号等。

value的取值:机器id + 线程id 能唯一标识一个线程即可。

expire的取值:过期时间的取值大多根据时间业务场景,预判执行时间。

注意:

这里有个问题,正常情况下可以预测执行时间来设置过期时间。

但是如果线程卡住了,比如突然gc,可能导致所有线程停止工作,或者sql执行慢等等导致程序还没执行完,Redis中的key就已经过期了。

此时,别的线程可能拿到这把锁。导致同一个时间,两个线程拿到同一把锁,分布式锁也就失效了。

3. 锁的续期

续期必要性:

上面有提到,如果业务代码还没执行完,自己的锁就过期了。别人就可能得到锁,导致锁失效。

此时要引入续期的概念,当程序还没执行完,又到了锁过期的时间,就会再次加锁,相当于延长了锁的时间,直到程序执行完毕,手动删除锁。

实现思路:

守护线程:在线程获得锁的时候,新启一个守护线程。

假设设置锁的过期时间为30秒,如果到了15秒,程序还未执行完,守护线程开始执行,给锁再延期30秒,然后每15秒再执行一次。

当执行业务逻辑的线程执行完毕后关掉守护线程。具体参考Redisson实现方式。

注: 15秒是随便取的,后面讲Redisson的时候会有具体取值算法。

4. 解锁

看到网上有不少文章写道,解锁时需要判断该锁是否为自己线程加的,可以根据Redis中value的值来判断。

但是其实从理论上来说,只有在自身线程加的锁到期了,但是业务还未执行完毕的情况下,才可能出现别的线程拿到锁。但是这种情况本身就已经是锁失效了,是绝不允许出现的。需要使用续期的逻辑严格保证不会出现这种情况。

所以个人认为,解锁只需要删除锁即可,当然,如果你一定要判断该锁是自己的,也是可以的。

5. 使用Redisson实现分布式锁

有轮子可用,就不要自己手撸了。当然,要知道怎么去使用轮子,也要理解其背后的原理。

以下代码使用的是 Reentrant Lock (可重入锁)。

目前官方文档上英文版没有写 Reentrant Lock 而是直接写的 Lock,但是中文文档依然显示的是可重入锁(Reentrant Lock)

5.1 API使用

5.1.1 创建配置对象

Redisson支持redis单实例、redis哨兵、master-slave、redis cluster等模式。

单机模式

1
2
3
4
5
private static Config getSignalRLock() {
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.1.117:6379").setDatabase(13);
return config;
}

哨兵模式

1
2
3
4
5
6
private static Config getSentinelRLock() {
Config config = new Config();
// TODO 本人虚拟机上搭建的是cluster集群,所以没有哨兵,没法测试
config.useSentinelServers().addSentinelAddress("").setDatabase(13);
return config;
}

主从

1
2
3
4
5
private static Config getMasterSlaveRLock() {
Config config = new Config();
config.useMasterSlaveServers().setMasterAddress("").addSlaveAddress("").setDatabase(13);
return config;
}

cluster集群模式

1
2
3
4
5
6
7
8
private static Config getClusterRLock() {
Config config = new Config();
config.useClusterServers()
.addNodeAddress("redis://192.168.1.117:7000", "redis://192.168.1.117:7001",
"redis://192.168.1.153:7002", "redis://192.168.1.153:7003",
"redis://192.168.1.164:7004", "redis://192.168.1.164:7005");
return config;
}

当然,生产环境可以通过读取yml文件来配置Redis相关参数,官网上也给出了相应的示例:

1
config = Config.fromYAML(new File("config-file.yaml"));

也可以结合 SpringBoot 的配置文件。

5.1.2 加锁测试

在创建配置对象之后,通过配置对象创建RedissonClient对象,然后获取锁对象。

我是使用的单机模式获取Config对象的(手头正好没有cluster集群)。

具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private static void lock() throws InterruptedException {
Config config = getSignalRLock();
RedissonClient redissonClient = Redisson.create(config);
RLock rLock = redissonClient.getLock("redis_lock_test");
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
rLock.lock();
System.err.println("thread1获得锁");
try {
Thread.sleep(60000L);// 模拟业务代码执行时间
} catch (InterruptedException e) {
e.printStackTrace();
}
rLock.unlock();
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
rLock.lock();
System.err.println("thread2获得锁");
try {
Thread.sleep(60000L);// 模拟业务代码执行时间
} catch (InterruptedException e) {
e.printStackTrace();
}
rLock.unlock();
}
});
thread1.start();
thread2.start();
}

新建两个线程模拟争抢锁,利用Thread.sleep(60000L)模拟业务代码执行过程中耗时过长,导致锁已到期业务程序还未执行完毕的情况。

执行结果:

thread2获得锁

12:46:30.520 [Thread-0] DEBUG org.redisson.command.CommandAsyncService - acquired connection for command (EVAL) and params [if (redis.call(‘exists’, KEYS[1]) ==

12:46:40.584 [pool-1-thread-1] DEBUG org.redisson.command.CommandAsyncService - acquired connection for command (EVAL) and params [if (redis.call(‘hexists’

12:46:50.689 [pool-1-thread-1] DEBUG org.redisson.command.CommandAsyncService - acquired connection for command (EVAL) and params [if (redis.call(‘hexists’,

12:47:00.500 [Thread-0] DEBUG org.redisson.command.CommandAsyncService - acquired connection for command (EVAL) and params [if (redis.call(‘exists’, KEYS[1]) ==

……

12:47:30.537 [redisson-netty-2-13] DEBUG org.redisson.command.CommandAsyncService - connection released for command (EVAL) and params [if (redis.call(‘hexists’,

thread1获得锁

查看日志可以发现,首先是thread2拿到了锁,然后每隔10秒钟会有一个单独的线程去续期。

thread2是从46:30拿到锁的,一直到47:30才释放锁。thread2释放锁之后,thread1才拿到锁。

有两个疑问:锁默认的过期时间是多少?从程序的执行可以看出10秒续期一次,为什么是10秒呢?

仔细看上面的代码会发现,我是没有给锁设置过期时间的,那么 Redisson 默认的过期时间是多少?先看官网上的一段话。

大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

第一句说的锁死状态我到现在还不明白(难道是宕机之后锁无法删除,只能到期之后自动被Redis删除?这个叫锁死吗?),有理解的大佬可以给我提issue。

这句话可以跳过去,不影响我们看后面的。简而言之就是引入了看门狗的概念,一个单独的线程去定时续期。刚刚每隔十秒续期的操作就是它完成的。

那为什么是10秒续期呢?接下来我们来看看源码。

5.2 Redisson锁源码分析

从入口 rLock.lock() 开始分析,进入该实现类,一步步debug。首先进入 RedissonLock 类的 lock 方法。

1
2
3
4
5
6
7
8
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}

在没有设置锁的过期时间时,lock 方法的 leaseTime 默认传-1,然后再进入另外一个 lock 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}

RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);

try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}

// waiting for message
if (ttl >= 0) {
try {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
getEntry(threadId).getLatch().acquire();
} else {
getEntry(threadId).getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}

这个 lock 方法进入了 tryAcquire 方法,然后调用 tryAcquireAsync 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}

// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}

tryAcquireAsync 方法里面有个判断,如果 leaseTime != -1 ,直接执行 tryLockInnerAsync 方法之后返回;否则的话,除了执行 tryLockInnerAsync ,后续还要执行 scheduleExpirationRenewal 。

而在 scheduleExpirationRenewal 方法中有一个 renewExpiration 方法。这个方法就是看门狗的执行方法。

所以我们会发现一个问题,如果我们加锁的时候不是使用的 lock()无参方法,而是自定义了锁过期时间,那么 leaseTime 值就不为-1,就不会执行到 scheduleExpirationRenewal 。

我自己测试了一下, 使用rLock.lock(10 ,TimeUnit.SECONDS) ,是不执行看门狗程序的。有兴趣可以试试。

小结下:只有我们使用 lock 无参方法进行加锁时,看门狗才会执行。

接着往下看,因为我们这里是使用的 lock 无参方法,所以会执行这一行代码。

1
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);

所以进入 tryLockInnerAsync 方法时,传入的 leaseTime 值,我们取的是

1
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()

其实就是 lockWatchdogTimeout 的值。而 lockWatchdogTimeout 的值为 30秒,这个是 Config 类的默认值。

1
private long lockWatchdogTimeout = 30 * 1000;

进入到 tryLockInnerAsync 方法中 ,先给 internalLockLeaseTime 赋值。

internalLockLeaseTime = unit.toMillis(leaseTime);

将 lockWatchdogTimeout 的值赋值给了 internalLockLeaseTime(锁的过期时间),现在我们清楚默认的锁的过期时间是30秒了。

接下来我们来看看 tryLockInnerAsync 方法,这个方法是真正执行 Redis 加锁的方法的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);

return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

可以看到,这个方法其实就是执行了 Lua 脚本去给 Redis 赋值。这里简单说一下为什么要使用Lua脚本,因为很多的业务逻辑可以封装为一个脚本,执行时保证原子性

接下来我们分析一下这个Lua脚本。

先看第一个 if 语句。

KEYS[1] 表示加锁的 key 值,我这里使用的是 redis_lock_test 。这句话的意思是如果这个 key 不存在,就执行下面的。

ARGV[2] 代表的是加锁的客户端的ID,我自己测试的是 671d5dc5-f172-4179-96d2-2d0a4c7a93e9: 45 。

这行语句就是把 key 以 Set 的形式写入 Redis 中。经测试,此时的数据结构为:

1
2
3
4
5
{
"redis_lock_test":{
"671d5dc5-f172-4179-96d2-2d0a4c7a93e9: 45": 1
}
}

ARGV[1] 代表的是默认的过期时间,这里是30秒,这行语句是给 key 设置过期时间。加锁就算是完成了。

注意: 我们用的是可重入锁,顾名思义,可以重复设置。如果我们在程序中加锁两次会怎么样?

我改了下 thread1 的代码,试了下:

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public void run() {
rLock.lock();
rLock.lock();
System.err.println("thread1获得锁");
try {
Thread.sleep(60000L);// 模拟业务代码执行时间
} catch (InterruptedException e) {
e.printStackTrace();
}
rLock.unlock();
}

执行之后发现效果如下:

1
2
3
4
5
{
"redis_lock_test":{
"671d5dc5-f172-4179-96d2-2d0a4c7a93e9: 45": 2
}
}

我们发现 value 值从1变成了2。也就是说,允许同一个线程对同一个 key 加锁多次。

我们分析下第二个 if 语句。

redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1

如果是同一个线程第二次加锁,是会进入这个 if 语句的。

因为 KEYS[1] 的值还是 redis_lock_test ,ARGV[2] 的值还是 671d5dc5-f172-4179-96d2-2d0a4c7a93e9: 45 。

而如果换一个线程,ARGV[2] 的值就变了。就不会进入这个 if 语句了。

1
'hincrby', KEYS[1], ARGV[2], 1

这行语句就是用 incrby 把 set 中的 value 值加1 ,也就是1变为2。

下一句同样是设置过期时间为默认的30秒。

锁的互斥机制

如果这时候有另外一个线程来给同样的 key 加锁会怎么办?

首先,第一个 if 语句肯定进不去,因为 这个 key 已经存在了;

其次,第二个 if 语句也进不去,因为不同线程 ARGV[2] 值也不一样;

这个时候,只能执行最后一行语句:

1
"return redis.call('pttl', KEYS[1]);",

就是返回 key 的剩余过期时间。线程2在拿到过期时间后,会进入一个死循环,不停的循环尝试加锁。知道线程1删除 key 后,线程2才能获得锁。

下面,我们再看看看门狗的执行, scheduleExpirationRenewal 方法。

1
2
3
4
5
6
7
8
9
10
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}

最终进入 renewExpiration 方法。这个方法其实就是一个递归死循环,值得注意的一点是 internalLockLeaseTime / 3, TimeUnit.MILLISECONDS 。这个方法每隔 internalLockLeaseTime / 3 执行一次。

这里我们终于明白为什么看门狗每隔10秒执行一次了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}

Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}

RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}

if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

ee.setTimeout(task);
}

5.3 lock和tryLock的区别

看门狗是否执行续期取决于使用 lock 或者 tryLock 方法时是否用的无参方法,如果使用有参方法,传入了 leaseTime ,看门狗就不执行了。而 lock 和 tryLock 的区别在于 tryLock 在尝试加锁失败后会返回 false ,而 lock 尝试加锁没成功的情况下会进入while中阻塞。

6. Q&A

如果业务程序在lock之后还没来得及unlock就宕机了,怎么办?

看门狗的线程和业务线程在同一台机器上,如果宕机,看门狗不会再续期,到期之后,Redis自动清理掉。

如果使用Redis集群,在一个节点上加了锁,还没有同步到其他节点,该节点就宕机了,又有另外一个线程拿着同一把锁进来也可以加锁成功,这不就是锁失效了吗?

这个问题是无法通过 可重入锁(Reentrant Lock)解决的。这也是 Redis 作为分布式锁的一个痛点。Redis 集群之间的同步是异步的,是 AP 模型,并不能保证完全的数据一致性。但是 Redis 的作者使用 Red Lock 来解决这个问题,下一节再来介绍。

如果我们想自定义锁过期时间,又想让看门狗执行续期,怎么办?

通过 Config 类 或者配置文件中自定义 lockWatchdogTimeout 。比如在本次的测试中

1
config.setLockWatchdogTimeout(20 * 1000);

设置 lockWatchdogTimeout 为20秒。再次执行发现,每隔大约7秒钟看门狗会续期一次。这个我测试过,确实如此。

7. 总结一下,画个图

参考资料

官方GitHub:https://github.com/redisson/redisson

官方文档:https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers

本文标题:基于Redis的分布式锁之ReentrantLock

原始链接:https://zhaoxiaofa.com/2019/08/30/基于Redis的分布式锁之ReentrantLock/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。