分布式锁
基于数据库
- 基于数据库表记录做唯一约束(表中记录方法名称)
- 基于数据库表做悲观锁(InnoDB,for update)
- 基于数据库表做乐观锁,用于分布式锁。(version)
基于数据库表数据记录做唯一约束(表中记录方法名称)
- 要实现分布式锁,最简单的方式可能就是直接创建一张锁表,然后通过操作该表中的数据来实现了。
- 当我们要锁住某个方法或资源时,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录。 创建这样一张数据库表:
- 当我们想要锁住某个方法时,执行以下SQL:
因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功(原子性),那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。
当方法执行完毕之后,想要释放锁的话,需要执行以下Sql:
上面这种简单的实现有以下几个问题:
- 这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
- 这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
- 这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
- 这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。
当然,我们也可以有其他方式解决上面的问题。
- 数据库是单点?搞两个数据库,数据之前双向同步。一旦挂掉快速切换到备库上。
- 没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。
- 非阻塞的?搞一个while循环,直到insert成功再返回成功。
- 非重入的?在数据库表中加两个字段,一个记录当前获得锁的机器的主机信息和线程信息,另一个是count值,用于记录重入的次数,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了,并把count加1。在释放锁的时候把count值减1,当count值为0时候,删除记录即可。
基于数据库表做悲观锁(InnoDB引擎,for update语句)
除了可以通过增删操作数据表中的记录以外,其实还可以借助数据中自带的锁来实现分布式的锁。 我们还用刚刚创建的那张数据库表。可以通过数据库的排他锁来实现分布式锁。 基于MySql的InnoDB引擎,可以使用以下方法来实现加锁操作:
在查询语句后面增加for update,数据库会在查询过程中给数据库表增加排他锁(这里再多提一句,InnoDB引擎在加锁的时候,只有通过索引进行检索的时候才会使用行级锁,否则会使用表级锁。这里我们希望使用行级锁,就要给method_name添加索引,值得注意的是,这个索引一定要创建成唯一索引,否则会出现多个重载方法之间无法同时被访问的问题。重载方法的话建议把参数类型也加上)。
当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。 我们可以认为获得排它锁的线程即可获得分布式锁,当获取到锁之后,可以执行方法的业务逻辑,执行完方法之后,再通过以下方法解锁:
1 | public void lock(){ |
通过 connection.commit() 操作来释放锁。
这种方法可以有效的解决上面提到的无法释放锁和阻塞锁的问题。
- 阻塞锁? for update语句会在执行成功后立即返回,在执行失败时一直处于阻塞状态,直到成功。
- 锁定之后服务宕机,无法释放?使用这种方式,服务宕机之后数据库会自己把锁释放掉
但是还是无法直接解决数据库单点和可重入问题。
这里还可能存在另外一个问题,虽然我们对method_name 使用了唯一索引,并且显示使用for update来使用行级锁。但是,MySql会对查询进行优化,即便在条件中使用了索引字段,但是否使用索引来检索数据是由 MySQL 通过判断不同执行计划的代价来决定的,如果 MySQL 认为全表扫效率更高,比如对一些很小的表,它就不会使用索引,这种情况下 InnoDB 将使用表锁,而不是行锁。如果发生这种情况就悲剧了。。。
还有一个问题,就是我们要使用排他锁来进行分布式锁的lock,那么一个排他锁长时间不提交,就会占用数据库连接。一旦类似的连接变得多了,就可能把数据库连接池撑爆
基于数据库资源表做乐观锁,用于分布式锁:
首先说明乐观锁的含义:
大多数是基于数据版本(VERSION)的记录机制实现的。何谓数据版本号?即为数据增加一个版本标识,
在基于数据库表的版本解决方案中,一般是通过为数据库表添加一个“VERSION”字段来实现读取出数据时
,将此版本号一同读出,之后更新时,对此版本号加1。在更新过程中,会对版本号进行比较,如果是一致的,没有发生改变,则会成功执行本次操作;
如果版本号不一致,则会更新失败。
复制代码
- 对乐观锁的含义有了一定的了解后,结合具体的例子,我们来推演下我们应该怎么处理:
假设我们有一张资源表,如下图所示: T_RESOURCE , 其中有6个字段ID, RESOOURCE, STATE, ADD_TIME, UPDATE_TIME, VERSION,分别表示表主键、资源、分配状态(1未分配 2已分配)、资源创建时间、资源更新时间、资源数据版本号。
假设我们现在我们对ID=5780这条数据进行分配,那么非分布式场景的情况下,我们一般先查询出来STATE=1(未分配)的数据,然后从其中选取一条数据可以通过以下语句进行,如果可以更新成功,那么就说明已经占用了这个资源 UPDATE T_RESOURCE SET STATE=2 WHERE STATE=1 AND ID=5780。
如果在分布式场景中,由于数据库的UPDATE操作是原子是原子的,其实上边这条语句理论上也没有问题,但是这条语句如果在典型的“ABA”情况下,我们是无法感知的。有人可能会问什么是“ABA”问题呢?大家可以网上搜索一下,这里我说简单一点就是,如果在你第一次SELECT和第二次UPDATE过程中,由于两次操作是非原子的,所以这过程中,如果有一个线程,先是占用了资源(STATE=2),然后又释放了资源(STATE=1),实际上最后你执行UPDATE操作的时候,是无法知道这个资源发生过变化的。也许你会说这个在你说的场景中应该也还好吧,但是在实际的使用过程中,比如银行账户存款或者扣款的过程中,这种情况是比较恐怖的。
那么如果使用乐观锁我们如何解决上边的问题呢?
A. 先执行SELECT操作查询当前数据的数据版本号,比如当前数据版本号是26:
SELECT ID, RESOURCE, STATE,VERSION FROM T_RESOURCE WHERE STATE=1 AND ID=5780;
B. 执行更新操作:
UPDATE T_RESOURE SET STATE=2, VERSION=27, UPDATE_TIME=NOW() WHERE RESOURCE=XXXXXX AND
STATE=1 AND VERSION=26
C. 如果上述UPDATE语句真正更新影响到了一行数据,那就说明占位成功。如果没有更新影响到一行数据
,则说明这个资源已经被别人占位了。
基于数据库表做乐观锁的一些缺点:
- 这种操作方式,使原本一次的UPDATE操作,必须变为2次操作: SELECT版本号一次;UPDATE一次。增加了数据库操作的次数。
- 如果业务场景中的一次业务流程中,多个资源都需要用保证数据一致性,那么如果全部使用基于数据库资源表的乐观锁,就要让每个资源都有一张资源表,这个在实际使用场景中肯定是无法满足的。而且这些都基于数据库操作,在高并发的要求下,对数据库连接的开销一定是无法忍受的。
- 乐观锁机制往往基于系统中的数据存储逻辑,因此可能会造成脏数据被更新到数据库中。在系统设计阶段,我们应该充分考虑到这些情况出现的可能性,并进行相应调整,如将乐观锁策略在数据库存储过程中实现,对外只开放基于此存储过程的数据更新途径,而不是将数据库表直接对外公开。
讲了乐观锁的实现方式和缺点,是不是会觉得不敢使用乐观锁了呢???当然不是,在文章开头我自己的业务场景中,场景1和场景2的一部分都使用了基于数据库资源表的乐观锁,已经很好的解决了线上问题。所以大家要根据的具体业务场景选择技术方案,并不是随便找一个足够复杂、足够新潮的技术方案来解决业务问题就是好方案?!比如,如果在我的场景一中,我使用zookeeper做锁,可以这么做,但是真的有必要吗???答案觉得是没有必要的!!!
总结一下使用数据库来实现分布式锁的方式,这两种方式都是依赖数据库的一张表,一种是通过表中的记录的存在情况确定当前是否有锁存在,另外一种是通过数据库的排他锁来实现分布式锁。
数据库实现分布式锁的优点
直接借助数据库,容易理解。
数据库实现分布式锁的缺点
会有各种各样的问题,在解决问题的过程中会使整个方案变得越来越复杂。
操作数据库需要一定的开销,性能问题需要考虑。
使用数据库的行级锁并不一定靠谱,尤其是当我们的锁表并不大的时候。
基于缓存
- 使用redis的setnx()用于分布式锁。(setNx,直接设置值为当前时间+超时时间,保持操作原子性)
使用memcached的add()方法,用于分布式锁。使用Tair的put()方法,用于分布式锁。
基于缓存实现分布式锁 Redis
使用redis的setnx()用于分布式锁。(原子性)
SETNX是将 key 的值设为 value,当且仅当 key 不存在。若给定的 key 已经存在,则 SETNX 不做任何动作。
- 返回1,说明该进程获得锁,SETNX将键 lock.id 的值设置为锁的超时时间,当前时间 +加上锁的有效时间。
- 返回0,说明其他进程已经获得了锁,进程不能进入临界区。进程可以在一个循环中不断地尝试 SETNX 操作,以获得锁。
存在死锁的问题
SETNX实现分布式锁,可能会存在死锁的情况。与单机模式下的锁相比,分布式环境下不仅需要保证进程可见,还需要考虑进程与锁之间的网络问题。某个线程获取了锁之后,断开了与Redis 的连接,锁没有及时释放,竞争该锁的其他线程都会hung,产生死锁的情况。所以在这种情况下需要对获取的锁进行超时时间设置,即setExpire,超时自动释放锁
Redission的分布式锁
1 | Redission是redis官网推荐的一个redis客户端,除了基于redis的基础的CURD命令以外,重要的是就是Redission提供了方便好用的分布式锁API |
一、 基本用法
1 | RedissonClient redissonClient = RedissonTool.getInstance(); |
代码流程
- 通过 redissonClient 获取 RLock 实例
- tryLock 获取尝试获取锁,第一个是等待时间,第二个是锁的超时时间,第三个是时间单位
- 执行完业务逻辑后,最终释放锁
二、 具体实现
我们通过tryLock来分析redission分布式的实现,lock方法跟tryLock差不多,只不过没有最长等待时间的设置,会自旋循环等待锁的释放,直到获取锁为止
1 | long time = unit.toMillis(waitTime); |
代码分析
- 首先 tryAcquire 尝试获取锁,若返回ttl为null,说明获取到锁了
- 判断等待时间是否过期,如果过期,直接返回获取锁失败
- 通过 Redis 的 Channel 订阅监听队列,subscribe 内部通过信号量 semaphore,再通过await方法阻塞,内部其实是用 CountDownLatch 来实现阻塞,获取 subscribe 异步执行的结果,来保证订阅成功,再判断是否到了等待时间
- 再次尝试申请锁和等待时间的判断,循环阻塞在这里等待锁释放的消息 RedissonLockEntry 也维护了一个semaphore 的信号量
- 无论是否释放锁,最终都要取消订阅这个队列消息
- redission 内部的 getEntryName 是客户端实例ID + 锁名称 来保证多个实例下的锁可重入
tryAcquire获取锁
redisssion获取锁的核心代码,内部其实是异步调用,但是用get方法阻塞了
1 | private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { |
tryLockInnerAsync方法内部是基于Lua脚本来获取锁的
- 先判断KEYS[1](锁名称)对应的key是否存在,不存在获取到锁,hset设置key的value,pexpire设置过期时间,返回null表示获取到锁
- 存在的话,锁被占,hexists判断是否是当前线程的锁,若是的话,hincrby增加重入次数,重新设置过期时间,不是当前线程的锁,返回当前锁的过期时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}Redission避免死锁的解决方案:
Redission为了避免锁未被释放,采用了一个特殊的解决方案,若未设置过期时间的话,redission默认的过期时间是30s,同时未避免锁在业务未处理完成之前被提前释放,Redisson在获取到锁且默认过期时间的时候,会在当前客户端内部启动一个定时任务,每隔internalLockLeaseTime/3的时间去刷新key的过期时间,这样既避免了锁提前释放,同时如果客户端宕机的话,这个锁最多存活30s的时间就会自动释放(刷新过期时间的定时任务进程也宕机)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45// lock acquired,获取到锁的时候设置定期更新时间的任务
if (ttlRemaining) {
scheduleExpirationRenewal(threadId);
}
//expirationRenewalMap的并发安全MAP记录设置过的缓存,避免并发情况下重复设置任务,internalLockLeaseTime / 3的时间后重新设置过期时间
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
unlock解锁
1 | protected RFuture<Boolean> unlockInnerAsync(long threadId) { |
Redission的unlock解锁也是基于Lua脚本实现的,内部逻辑是先判断锁是否存在,不存在说明已经被释放了,发布锁释放消息后返回,锁存在再判断当前线程是否锁拥有者,不是的话,无权释放返回,解锁的话,会减去重入的次数,重新更新过期时间,若重入数捡完,删除当前key,发布锁释放消息
基于Zookeeper
- 每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个唯一的瞬时有序节点。 判断是否获取锁只需要判断有序节点中序号最小的一个。 当释放锁的时候,只需将这个瞬时节点删除即可。
基于Zookeeper实现分布式锁
基于 zookeeper 临时有序节点可以实现的分布式锁。
大致思想即为:
每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个唯一的临时有
序节点。 判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。
当释放锁的时候,只需将这个临时节点删除即可。同时,排队的节点需要监听排在自己之前的节点,这样能
在节点释放时候接收到回调通知,让其获得锁。zk的session由客户端管理,其可以避免服务宕机导致的锁无
法释放,而产生的死锁问题,不需要关注锁超时。
复制代码
来看下Zookeeper能不能解决前面提到的问题。
- 锁无法释放?使用Zookeeper可以有效的解决锁无法释放的问题,因为在创建锁的时候,客户端会在ZK中创建一个临时节点,一旦客户端获取到锁之后突然挂掉(Session连接断开),那么这个临时节点就会自动删除掉。其他客户端就可以再次获得锁。
- 非阻塞锁?使用Zookeeper可以实现阻塞的锁,客户端可以通过在ZK中创建顺序节点,并且在节点上绑定监听器,一旦节点有变化,Zookeeper会通知客户端,客户端可以检查自己创建的节点是不是当前所有节点中序号最小的,如果是,那么自己就获取到锁,便可以执行业务逻辑了。
- 不可重入?使用Zookeeper也可以有效的解决不可重入的问题,客户端在创建节点的时候,把当前客户端的主机信息和线程信息直接写入到节点中,下次想要获取锁的时候和当前最小的节点中的数据比对一下就可以了。如果和自己的信息一样,那么自己直接获取到锁,如果不一样就再创建一个临时的顺序节点,参与排队。
- 单点问题?使用Zookeeper可以有效的解决单点问题,ZK是集群部署的,只要集群中有半数以上的机器存活,就可以对外提供服务。
可以直接使用zookeeper第三方库Curator客户端,这个客户端中封装了一个可重入的锁服务。
Curator提供的InterProcessMutex是分布式锁的实现。acquire方法用户获取锁,release方法用于释放锁。
使用ZK实现的分布式锁好像完全符合了本文开头我们对一个分布式锁的所有期望。但是,其实并不是,Zookeeper实现的分布式锁其实存在一个缺点,那就是性能上可能并没有缓存服务那么高。因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同步到所有的Follower机器上。
其实,使用Zookeeper也有可能带来并发问题,只是并不常见而已。考虑这样的情况,由于网络抖动,客户端到ZK集群的session连接断了,那么zk以为客户端挂了,就会删除临时节点,这时候其他客户端就可以获取到分布式锁了。就可能产生并发问题。这个问题不常见是因为zk有重试机制,一旦zk集群检测不到客户端的心跳,就会重试,Curator客户端支持多种重试策略。多次重试之后还不行的话才会删除临时节点。(所以,选择一个合适的重试策略也比较重要,要在锁的粒度和并发之间找一个平衡。)
基于ZK的方案的总结
使用Zookeeper实现分布式锁的优点
有效的解决单点问题,不可重入问题,非阻塞问题以及锁无法释放的问题。实现起来较为简单。
使用Zookeeper实现分布式锁的缺点
性能上不如使用缓存实现分布式锁。 需要对ZK的原理有所了解。
三种方案的比较
上面几种方式,哪种方式都无法做到完美。就像CAP一样,在复杂性、可靠性、性能等方面无法同时满足,所以,根据不同的应用场景选择最适合自己的才是王道。
从理解的难易程度角度(从低到高)
数据库 > 缓存 > Zookeeper
从实现的复杂性角度(从低到高)
Zookeeper >= 缓存 > 数据库
从性能角度(从高到低)
缓存 > Zookeeper >= 数据库
从可靠性角度(从高到低)
Zookeeper > 缓存 > 数据库