分布式锁出现的原因
在传统单体应用单机部署的情况下,可以使用并发处相关的功能(如java并发处理相关的API:ReentrantLock或者syncchronized等)进行互斥控制来解决。但随着业务发展,系统架构也会逐步优化升级,原本单体单机部署的系统被演化为分布式集群系统。由于分布式系统多线程,多进程并且分布在多个不同机器上,这将使原来单机部署情况下的并发控制锁策略无法满足,并不能提供分布式锁的能力。为了解决这个问题就需一个跨机器的互斥机制来控制共享资源的访问,这就是分布式锁的解决的难题。
分布式锁应用的场景
分布式锁需要具备的特性
特性 | 说明 |
---|---|
排他性 | 同一时间只会有一个客户端能获取到锁,其它客户端无法同时获取 |
避免死锁 | 锁在一段时间内有效,超过这个时间后会被释放(正常释放或异常释放) |
高可用 | 获取或释放锁的机制必须高可用且性能不能过差 |
首先我们可以使用Redis实现初步具有锁能力的代码
@Test
public void testDistributedLockRedis() {
String LOCK_KEY = "goods_001";
String lockThreadFlag = UUID.randomUUID().toString();
Boolean lockResult = stringRedisTemplate.opsForValue().setIfAbsent(LOCK_KEY, lockThreadFlag, 30, TimeUnit.SECONDS);
try {
if (Boolean.TRUE.equals(lockResult)) {
// 执行业务代码
doBusinessCode();
// ....
}
} finally {
if (Boolean.TRUE.equals(lockResult) && lockThreadFlag.equals(stringRedisTemplate.opsForValue().get(LOCK_KEY))) {
stringRedisTemplate.delete(LOCK_KEY);
}
}
}
问题:①如果业务代码执行的时间超过锁过期时间那么资源锁被释放了,就会有并发问题。如果时间设置过久,程序宕机没有释放锁,会导致锁时间过长。②重入问题没有考虑
为解决以上问题,需要获得锁的线程开启一个守护线程,用来给快要过期的锁”续航”。例如每过10s检查,如果业务代码没执行完则重设锁时长为30。由于业务线程和守护线程在同一个进程,业务线程执行完成或者终止,守护线程也会停下。这把锁到了超时的时候,没人给它续命,也就自动释放了。但是编写这些代码在实际生产过程种可能要考虑更多问题,此时我们可以用Redisson
框架的封装完善的锁。
redission加锁和解锁采用的是Lua脚本,需要先研究一下加锁和解锁脚本都做了什么
加锁脚本
-- 若锁不存在:则新增锁,并设置锁重入计数为1、设置锁过期时间
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 若锁存在,且唯一标识也匹配:则表明当前加锁请求为锁重入请求,故锁重入计数+1,并再次设置锁过期时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 若锁存在,但唯一标识不匹配:表明锁是被其他线程占用,当前线程无权解他人的锁,直接返回锁剩余过期时间
return redis.call('pttl', KEYS[1]);
加锁脚本流程解读
解锁脚本
-- 若锁不存在:则直接广播解锁消息,并返回1
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
-- 若锁存在,但唯一标识不匹配:则表明锁被其他线程占用,当前线程不允许解锁其他线程持有的锁
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
-- 若锁存在,且唯一标识匹配:则先将锁重入计数减1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
-- 锁重入计数减1后还大于0:表明当前线程持有的锁还有重入,不能进行锁删除操作,但可以友好地帮忙设置下过期时期
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
-- 锁重入计数已为0:间接表明锁已释放了。直接删除掉锁,并广播解锁消息,去唤醒那些争抢过锁但还处于阻塞中的线程
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
解锁脚本流程解读
广播解锁消息的作用:通知其它争抢锁阻塞住的线程,从阻塞中解除,并再次去争抢锁
加锁和解锁总流程图
/**
* 普通非公平重入锁
*/
@Test
public void testRedissionLock() {
String LOCK_KEY = "goods_001";
//获取分布式锁,只要锁的名字一样,就是同一把锁
RLock lock = redissonClient.getLock(LOCK_KEY);
//加锁(阻塞等待),默认过期时间是30秒,实现的JDK的LOCK接口,也可使用tryLock尝试加锁
lock.lock();
try {
//如果业务执行过长,Redisson会自动给锁续期
doBusinessCode();
} finally {
//解锁,如果业务执行完成,就不会继续续期,即使没有手动释放锁,在30秒过后,也会释放锁
lock.unlock();
}
}
/**
* 公平锁
*/
@Test
public void testRedissionFairLock() {
String LOCK_KEY = "goods_001";
//获取分布式锁,只要锁的名字一样,就是同一把锁
RLock lock = redissonClient.getFairLock(LOCK_KEY);
//加锁(阻塞等待)
lock.lock();
try {
//如果业务执行过长,Redisson会自动给锁续期
doBusinessCode();
} finally {
//解锁,如果业务执行完成,就不会继续续期,即使没有手动释放锁,在30秒过后,也会释放锁
lock.unlock();
}
}
/**
* 读写锁
*/
@Test
public void testRedissionReadWriteLock() {
String LOCK_KEY = "goods_001";
//获取分布式锁,只要锁的名字一样,就是同一把锁
RReadWriteLock lock = redissonClient.getReadWriteLock(LOCK_KEY);
//加锁(阻塞等待)
RLock readLock = lock.readLock();
try {
//如果业务执行过长,Redisson会自动给锁续期
doBusinessCode();
} finally {
//解锁,如果业务执行完成,就不会继续续期,即使没有手动释放锁,在30秒过后,也会释放锁
readLock.unlock();
}
}
/**
* 批量连锁
*/
@Test
public void testRedissionMultiLock() {
RLock lock1 = redissonClient.getLock("goods_001");
RLock lock2 = redissonClient.getLock("goods_002");
RLock lock3 = redissonClient.getLock("goods_003");
// 同时对几个资源一起加锁
RLock lock = redissonClient.getMultiLock(lock1, lock2, lock3);
//加锁(阻塞等待)
lock.lock();
try {
//如果业务执行过长,Redisson会自动给锁续期
doBusinessCode();
} finally {
//解锁,如果业务执行完成,就不会继续续期,即使没有手动释放锁,在30秒过后,也会释放锁
lock.unlock();
}
}
/**
* CountDownLatch
*/
@Test
public void testRedissionCountDownLatch() {
String LOCK_KEY = "TEST_COUNT_DOWN_LATCH";
// 获取countDownLatch,其它地方有个设置countDown的数量 countDownLatch.trySetCount(10);
RCountDownLatch countDownLatch = redissonClient.getCountDownLatch(LOCK_KEY);
countDownLatch.countDown();
doBusinessCode();
}
/**
* Semaphore
*/
@Test
public void testRedissionSemaphoreKey() throws InterruptedException {
String SEMAPHORE_KEY = "TEST_SEMAPHORE";
RSemaphore semaphore = redissonClient.getSemaphore(SEMAPHORE_KEY);
semaphore.acquire(2);
doBusinessCode();
semaphore.release(2);
}
因为Redis是AP架构,主从之间是异步复制。极端情况下如果master节点挂掉,但是slave节点还未同步到master数据,这时候锁会失效。为了避免这种极端情况可以使用RedLock【其实不推荐使用,RedLock本身也存在一些问题,达到CP的效果不如直接使用zookeeper或者etcd这种本就是CP的架构】
算法大概逻辑:部署多台与master节点同等级别的其他节点,这几个Redis不参与其他的业务。每一个线程在向master节点请求锁的同时,也向这几个同等级别的节点发送加锁请求,只有当超过一半的节点数加锁成功,此时的分布式锁才算真正的成功。
缺点:
zookeeper实现分布式锁的原理就是多个节点同时在一个指定的节点下面创建临时会话顺序节点,谁创建的节点序号最小,谁就获得了锁。并且其他节点就会监听序号比自己小的节点【利用zookeeper的Watcher机制】,一旦序号比自己小的节点被删除了,其他节点就会得到相应的事件,然后查看自己是否为序号最小的节点,如果是,则获取锁。
zookeeper实现的分布式锁是CP的,性能没redis的高。但是不用担心redis主从架构,主节点挂掉从节点还没同步数据造成的锁失效。
可重入是利用JDK线程ThreadId是否相同判断的
public class ZkLock implements Lock {
//......
final AtomicInteger lockCount = new AtomicInteger(0);
//......
@Override
public boolean lock() {
//可重入,确保同一线程,可以重复加锁
synchronized (this) {
if (lockCount.get() == 0) {
thread = Thread.currentThread();
lockCount.incrementAndGet();
} else {
if (!thread.equals(Thread.currentThread())) {
return false;
}
lockCount.incrementAndGet();
return true;
}
//......
}
}
}
Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和分布式锁等。
使用Spring Boot初始化Curator的前置代码
@Configuration
public class CuratorConfig {
@Bean(destroyMethod = "close", initMethod = "start")
public CuratorFramework curatorFramework() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
return CuratorFrameworkFactory.builder()
.connectString("192.168.153.133:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.namespace("zookeeper-lock")
.retryPolicy(retryPolicy)
.build();
}
}
@Test
public void testZkMutex() throws Exception {
String LOCK_KEY = "goods_001";
InterProcessMutex zkMutex = new InterProcessMutex(curatorFramework, "/"+ LOCK_KEY);
// 阻塞死等
zkMutex.acquire();
try {
doBusinessCode();
} finally {
zkMutex.release();
}
}
@Test
public void testZkReadLock() throws Exception {
String LOCK_KEY = "goods_001";
InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(curatorFramework, "/" + LOCK_KEY);
// 阻塞死等
InterProcessMutex readLock = readWriteLock.readLock();
try {
doBusinessCode();
} finally {
readLock.release();
}
}
@Test
public void testZkMultiLock() throws Exception {
final InterProcessLock lock1 = new InterProcessMutex(curatorFramework, "/lock_good01");
final InterProcessLock lock2 = new InterProcessMutex(curatorFramework, "/lock_good02");
InterProcessMultiLock interProcessMultiLock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));
// 阻塞死等
interProcessMultiLock.acquire();
try {
doBusinessCode();
} finally {
interProcessMultiLock.release();
}
}
@Test
public void testZkSemaphore() throws Exception {
InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(curatorFramework, "/test_semaphore", 10);
// 阻塞死等
Collection<Lease> acquireLease = semaphore.acquire(8);
try {
doBusinessCode();
} finally {
semaphore.returnAll(acquireLease);
}
}