分布式锁

1. 背景

在 Java 开发过程中,锁是非常常见的,如 synchronized、ReentrantLock等,一般我们用其在多线程环境中控制对资源的并发访问。在一个单机环境中实现锁比较简单,因为多线程之间可以共享内存,因此可以简单的采取内存作为标记存储位置。而在分布式环境中,多线程变成了多进程,我们原本布置的锁将在这种场景下失效。因此,分布式锁就应运而生了。

2. 分布式锁概述

2.1 为什么需要分布式锁?

Martin Kleppmann 是英国剑桥大学的分布式系统的研究员,之前和 Redis 之父 Antirez 进行过关于 RedLock (红锁,后续有讲到)是否安全的激烈讨论。Martin 认为一般我们使用分布式锁有两个场景:

  • 效率:使用分布式锁可以避免不同节点重复相同的工作,这些工作会浪费资源。比如用户付了钱之后有可能不同节点会发出多封短信。
  • 正确性:加分布式锁同样可以避免破坏正确性的发生,如果两个节点在同一条数据上面操作,比如多个节点机器对同一个订单操作不同的流程有可能会导致该笔订单最后状态出现错误,造成损失。

2.2 我们需要怎样的分布式锁?

  • 互斥性:和我们本地锁一样互斥性是最基本,但是分布式锁需要保证在不同节点的不同线程的互斥,即同一个方法在同一时间只能被一台机器上的一个线程执行;
  • 可重入性:同一个节点上的同一个线程如果获取了锁之后那么也可以再次获取这个锁(避免死锁);
  • 锁超时:和本地锁一样支持锁超时,防止死锁;
  • 高效,高可用:加锁和解锁需要高效,同时也需要保证高可用性,防止分布式锁失效,可以增加降级;
  • 支持阻塞和非阻塞:和 ReentrantLock 一样支持 lock 和 trylock 以及 tryLock(long timeOut)。
  • 支持公平锁和非公平锁(可选):公平锁的意思是按照请求加锁的顺序获得锁,非公平锁就相反是无序的。这个一般来说实现的比较少。

2.3 分布式锁的实现

分布式的 CAP 理论告诉我们:任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。所以,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要是在用户可以接受的范围内即可。

一般来说,分布式锁的实现有以下几个方式:

  • 基于数据库(MySQL)实现;
  • 基于缓存(Redis)实现;
  • 基于 Zookeeper 实现。

这三种方案没有绝对的好坏之分,需要根据场景选择最适合的实现方式。

3. 基于数据库实现分布式锁

数据库在日常开发中十分常见,因此利用 MySQL 实现的分布式锁相对也比较简单、易理解。首先要建立一张这样的数据库表:

1
2
3
4
5
6
7
8
CREATE TABLE `methodLock` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
`desc` varchar(1024) NOT NULL DEFAULT '备注信息',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';

3.1 基于数据库表

当我们要锁住某个方法时,执行以下 SQL:

1
insert into methodLock(method_name,desc) values (‘method_name’,‘desc’)

因为我们对 method_name 做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。

方法执行完以后,释放锁的时候执行以下 SQL:

1
delete from methodLock where method_name ='method_name'

上面的实现有几个问题:

  • 这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
  • 这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
  • 这把锁只能是非阻塞的,因为数据的 insert 操作一旦失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
  • 这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。

3.2 基于数据库排他锁

在 MySQL 的 InnoDB 引擎下,我们利用数据库排他锁实现分布式锁。

3.2.1 lock()

lock() 方法是阻塞式的获取锁,意思就是不获取到锁誓不罢休,那么我们可以写一个死循环来执行其操作:

1
2
3
4
5
6
7
8
9
public void lock(){
while(true){
if(mysqlLock.lock(method)){
return;
}
// 休眠3ms后重试
LockSupport.parkNanos(1000 * 1000 * 3);
}
}

mysqlLock.lock() 方法内部包含一个 SQL 语句,为了达到可重入锁的效果,应该先查询是否有对应的方法名字,如果有,就需要比较 desc 是否一致,这里的 desc 可以使用机器 IP 或线程名字等来表示,如果一致就增加可重入锁 counter 的值,如果不一致就返回;如果没有这个方法就插入一条新的数据。伪代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Transcation
public boolean lock(String methodName){
if(select * from methodLock where method_name = methodName for update){
// 是否有该方法
if(currentDesc == resultDesc){
// 节点信息是否一致
update methodLock set counter = counter + 1 where method_name = methodName;
return true;
} else{
return false;
}
} else{
insert into method_name ...;
return true;
}
}

这个代码必须加上事务,必须要保证这一系列操作的原子性。

在查询语句后面增加 for update,数据库会在查询过程中给数据库表增加排他锁(由于 InnoDB 引擎的性质,需要为检索的字段加上唯一索引),当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。

当然,假如我们要加锁的资源已经在数据库中了,那么也不需要建表了,直接利用 for update 加锁。

3.2.2 tryLock() 和 tryLock(long timeout)

相比较 lock(),trylock() 是非阻塞获取锁,如果获取不到那么就会马上返回,代码如下:

1
2
3
public boolean trylock(){
return mysqlLock.lock();
}

tryLock(long timeout) 实现如下:

1
2
3
4
5
6
7
8
9
10
11
public boolean trylock(long timeout){
long endTime = System.currentTimeMillis() + timeout;
while(true){
if(mysqlLock.lock()){
return true;
}
if(endTime >= System.currentTimeMillis()){
return false;
}
}
}

这里的 mysqlLock.lock() 实现和上面一样,但是要注意的是 for update 这个是阻塞获取行锁,如果同一个资源并发量较大还是有可能会退化成阻塞的获取锁。

这里还可能存在另外一个问题,虽然我们对 method_name 使用了唯一索引,并且显示使用 for update 来使用行级锁。但是,MySQL 会对查询进行优化,即便在条件中使用了索引字段,但是否使用索引来检索数据是由 MySQL 通过判断不同执行计划的代价来决定的,如果 MySQL 认为全表扫效率更高,比如对一些很小的表,它就不会使用索引,这种情况下 InnoDB 将使用表锁,而不是行锁。

还有一个问题,就是我们要使用排他锁来进行分布式锁的 lock(),那么一个排他锁长时间不提交,就会占用数据库连接。一旦类似的连接变得多了,就可能把数据库连接池撑爆。

3.2.3 unlock()

这里的解锁就是按照前面可重入锁的 counter 来实现的,伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Transcation
public boolean unlock(){
// 表中有数据
if(select * from methodLock){
if(currentDesc == resultDesc){
if(counter > 1){
counter--;
} else{
delete;
}
} else{
return false;
}
} else{
return false;
}
}

3.3 锁超时

有可能集群中的一台机器挂了,分布式锁也就无法释放了。解决办法是启动一个定时任务,通过我们对于一个锁的生命周期的预期(可以计算处理任务的平局时间,再在这个时间基础上略微扩大一些),确定定时任务的周期,超过一定时间,我们就认为是机器宕机,直接将锁释放。

3.4 小结

  • 优点:理解起来简单,不需要维护第三方的中间件。
  • 缺点:实现起来比较繁琐,即使如上考虑了可重入、阻塞与非阻塞、事务以及锁超时等等,还是存在不少问题,在解决问题的过程中会使整个方案变得越来越复杂;性能收到了数据库局限;不适合高并发场景。

4. 基于 Redis 实现分布式锁

相对于基于数据库实现分布式锁的方案来说,基于缓存来实现在性能方面会表现的更好一点,存取速度快很多。而且很多缓存是可以集群部署的,可以解决单点问题。基于缓存的锁有好几种,本文下面主要讲解基于 Redis 的分布式实现。

4.1 原生实现

首先是一个最基础的版本,利用 Redis 的 setnx 命令来进行加锁:

1
SET lockKey value NX

解锁也很简单,使用 del 命令即可。

这样的实现有一个问题:由于某种原因,执行加锁的机器 A 挂了,这个锁就会一直存在,不能释放,其他机器永远也获得不到锁了。

问题的解决思路也很简单:为锁设置一个过期时间。类似的 Redis 命令如下所示:

1
SET lockKey value NX EX 30

这里特别要注意一点,设置锁和过期时间的两个操作要保证原子性,否则机器挂了,锁还是没办法释放。(Redis 2.8 之前可以使用 Lua 脚本实现,2.8 之后支持 nx 和 ex 属于原子操作)

又有一个问题:如何保证在这个过期时间内业务可以执行完成?一个简单的解决思路是统计业务执行的最长耗时,在其基础上增加一段缓冲的时间,保证锁在任务执行完之前不会过期。但是这并没有真正解决问题。

再考虑一个场景:

  • 客户端 A 获取锁成功,过期设置为时间30秒,但它在某个操作上阻塞了50秒;
  • 30秒时间到了,锁将会自动释放;
  • 客户端 B 获取到了对应同一个资源的锁,开始执行自己的业务;
  • 客户端 A 从阻塞中恢复过来,释放掉了客户端 B 持有的锁。

也就是说,如何保证加锁和解锁是同一个客户端完成的呢(锁不会被误删除)?

这时,可以为每个客户端生成一个随机字符串 value,删除之前首先比较 value 的值,确认当前的锁是自己加的,那么才可以删除锁,这样就防止了误删。这个解锁操作也需要 Lua 脚本保证原子性。

4.2 Redisson

Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid),它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务。

有关 Redisson 的分布式锁的实现,可以先看看这里的 Wiki 文档。下面分析一下 RedissonLock 的实现。

首先是 lock() 的基本用法,代码和注释如下:

1
2
3
4
5
6
7
8
9
10
11
// 1.通过RedissonClient的getLock()方法获得RLock实例 
RLock lock = redissonClient.getLock("foobar");
// 2.获取分布式锁
lock.lock();
try {
// 3.若成功获得则继续往下执行,否则等待锁的释放再尝试获取
...
} finally {
// 4.释放锁
lock.unlock();
}

getLock() 的方法定义如下:

1
2
3
4
@Override
public RLock getLock(String name) {
return new RedissonLock(commandExecutor, name, id);
}

其中的 commandExecutor 可以与 Redis 节点进行通信并发送指令,具体是通过 eval 命令,来执行 Lua 脚本(要求 Redis 版本不低于 2.6);name 是锁的全局名称;id 是客户端的唯一标识,用 UUID 表示。

再来看一看 tryLock() 方法:

  1. 使用 Lua 脚本尝试加锁,保证这一操作的原子性。它使用了 Hash 结构,我们的每一个需要锁定的资源都可以看做是一个 HashMap,锁定资源的节点信息是 key,锁定次数是 value。
1
2
3
4
5
6
7
8
9
10
11
if (redis.call('exists', KEYS[1]) == 0) then 
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);

(1) 如果通过 exists 命令发现当前 key 不存在,即锁没被占用,则执行 hset 写入 Hash 类型数据 key:全局锁名称(例如共享资源ID), field:锁实例名称(Redisson 客户端ID:线程ID), value:1,并执行 pexpire 对该 key 设置失效时间,返回空值 nil,至此获取锁成功。

(2) 如果通过 hexists 命令发现 Redis 中已经存在当前 key 和 field 的 Hash 数据,说明当前线程之前已经获取到锁,因为这里的锁是可重入的,则执行 hincrby 对当前 key field 的值加一,并重新设置失效时间,返回空值,至此重入获取锁成功。

(3) 最后是锁已被占用的情况,即当前 key 已经存在,但是 Hash 中的 Field 与当前值不同,则执行 pttl 获取锁的剩余存活时间并返回,至此获取锁失败。

  1. 如果获得锁,则结束流程,回去执行业务逻辑。

  2. 如果没有获得锁,则需等待锁被释放,并通过 Redis 的 channel 订阅锁释放的消息。

  3. 订阅锁的释放消息成功后,进入一个不断重试获取锁的循环,循环中每次都先试着获取锁,并得到已存在的锁的剩余存活时间。 如果在重试中拿到了锁,则结束循环,跳过第 5 步。

  4. 如果锁当前是被占用的,那么等待释放锁的消息,具体实现使用了 JDK 并发的信号量工具 Semaphore 来阻塞线程,当锁释放并发布释放锁的消息后,信号量的 release() 方法会被调用,此时被信号量阻塞的等待队列中的一个线程就可以继续尝试获取锁了。

  5. 在成功获得锁后,就没必要继续订阅锁的释放消息了,因此要取消对 Redis 上相应 channel 的订阅。

解锁的 unlock() 同样需要 Lua 脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;

  1. 利用 Lua 脚本来释放锁:

(1) key 不存在,说明锁已释放,直接执行 publish 命令发布释放锁消息并返回 1

(2) key 存在,但是 field 在 Hash 中不存在,说明自己不是锁持有者,无权释放锁,返回 nil

(3) 因为锁可重入,所以释放锁时不能把所有已获取的锁全都释放掉,一次只能释放一把锁,因此执行 hincrby 对锁的值减一

(4) 释放一把锁后,如果还有剩余的锁,则刷新锁的失效时间并返回 0;如果刚才释放的已经是最后一把锁,则执行 del 命令删除锁的 key,并发布锁释放消息,返回 1

  1. 上面执行结果返回 nil 时,因为自己不是锁的持有者,不允许释放别人的锁,故抛出异常。

  2. 执行结果返回 1 的情况,该锁的所有实例都已全部释放,所以不需要再刷新锁的失效时间。

4.3 RedLock

想象一个的场景:当机器 A 申请到一把锁之后,如果 Redis 主机宕机了,由于主从复制是异步的,这个时候从机并没有同步到这一把锁,那么机器 B 就会再次申请到这把锁。为了解决这个问题,就有了 RedLock。RedLock 的核心思想就是,同时使用多个 Redis Master 来冗余,且这些节点都是完全的独立的,也不需要对这些节点之间的数据进行同步。

假设我们有 N 个 Redis 节点,N 应该是一个大于2的奇数。RedLock 的实现步骤如下:

  1. 取得当前时间;
  2. 依次获取 N 个节点的 Redis 锁;
  3. 如果获取到的锁的数量大于 (N/2+1)个,且获取的时间小于锁的有效时间(lock validity time)就认为获取到了一个有效的锁。锁自动释放时间就是最初的锁释放时间减去之前获取锁所消耗的时间。
  4. 如果获取锁的数量小于 (N/2+1),或者在锁的有效时间(lock validity time)内没有获取到足够的锁,就认为获取锁失败。这个时候需要向所有节点发送释放锁的消息。

对于释放锁的实现就很简单了。向所有的 Redis 节点发起释放的操作,无论之前是否获取锁成功。

可以看见 RedLock 基本原理是利用多个 Redis 集群,用多数的集群加锁成功,减少 Redis 某个集群出故障,造成分布式锁出现问题的概率。

5. 基于 Zookeeper 实现分布式锁

5.1 基本实现

假设我们现在有一个父节点 /lock,实现一个分布式锁的流程大致如下:

  1. 客户端连接 Zookeeper,并在 /lock 下创建临时的有序的子节点,第一个客户端对应的子节点为 /lock/lock-0000000000,第二个为 /lock/lock-0000000001,以此类推;
  2. 客户端获取 /lock 下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子节点,如果是则认为获得锁,否则监听刚好在自己之前一位的子节点删除消息,获得子节点变更通知后重复此步骤直至获得锁(这里有对羊群效应的优化);
  3. 执行业务代码;
  4. 完成业务流程后,删除对应的子节点释放锁。

由于创建的是临时节点,所以当某台服务器宕机时,Zookeeper 会判断与客户端的会话失败,删除结点,所以不会有锁无法解除的问题。

可能有人会有一个疑问:假设客户端 A 有对应子节点 lock-1,客户端 B 有对应子节点 lock-2,当客户端 B 获取子节点列表时发现自己不是最小的节点,准备要订阅删除消息前,客户端 A 正好删除了 lock-1,那么客户端 B 岂不是要一直等待下去了?Zookeeper 保证了不会出现这个问题,因为它的 API 提供的设置监听器的操作与读操作是原子执行的,在获取列表并判断的同时设置监听器,中间不会有其他操作,保证不会丢失事件。

使用Zookeeper也可以有效的解决不可重入的问题,客户端在创建节点的时候,把当前客户端的主机信息和线程信息直接写入到节点中,下次想要获取锁的时候和当前最小的节点中的数据比对一下就可以了。如果和自己的信息一样,那么自己直接获取到锁,如果不一样就再创建一个临时的顺序节点,参与排队。

5.2 Curator

Curator 封装了 Zookeeper 底层的 API,我们可以更加方便地对 Zookeeper 进行操作,并且它封装了分布式锁的功能,这样我们就不需要再自己实现了。框架的基本使用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws Exception {
//创建zookeeper的客户端
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("10.21.41.181:2181,10.21.42.47:2181,10.21.49.252:2181", retryPolicy);
client.start();

//创建分布式锁, 锁空间的根节点路径为/curator/lock
InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
try{
mutex.acquire();
//获得了锁, 进行业务流程
System.out.println("Enter mutex");
} finally {
//完成业务流程, 释放锁
mutex.release();
}
//关闭客户端
client.close();
}

6. 总结

本文主要讲解了几种分布式锁的实现和它们各自的问题、优缺点等,我们在实际生产过程中要根据具体场景,通过分析找到最适合的解决方案。

参考资料