Redis 分布式锁原理与实现
Java 应用在多线程环境下,我们可以通过 Java 内存模型实现同步,比如 Lock,synchronized 等, 但是在分布式环境下,特别是现在微服务盛行的时代,服务为了高可用会做集群。在这样的情况下每个服务都有自己独立进程,当高并发的情况下,会存在同步问题,本文主要记录自己学习Redis分布式锁的过程。从浅到深一步步通过代码去分析。
1. 什么场景下用分布式锁
比如,有一个账户服务对用户账户进行金额操作并且做了集群。此时有A,B两个请求同时对账户金额进行操作,操作步骤如下:
- 从数据库读取账户金额
- 对账户金额进行操作
- 保存账户金额到数据库
假如不做互斥,A得到账户金额,还没等A先修改完保存,B也从数据库中得到账户金额,那么等A处理完金额保存后,B随后的保存就将覆盖A的保存。
2. 场景分析和解决方案
我们通过代码演示不同的场景下,我们会出现什么样的问题,并且怎么样去解决。
2.1 不做任何同步
可以通过代码来解释这种情况,由于本地学习,用 Redis 模拟数据库存储账户金额。用多线程来模拟分布式环境,同时对账户进行操作。
UserAccount.java
//用户账户Model
public class UserAccount {
//用户ID
private String userId;
//账户内金额
private int amount;
public UserAccount(String userId, int amount) {
this.userId = userId;
this.amount = amount;
}
public UserAccount() {
}
//添加账户金额
public void addAmount(int amount) {
this.amount = this.amount + amount;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
}
AccountOperationThread.java
线程,模拟分布式环境
/**
* 账户操作
* 多线程方式模拟分布式环境
*/
public class AccountOperationThread implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(AccountOperationThread.class);
//操作金额
private int amount;
private String userId;
private RedisTemplate redisTemplate;
public AccountOperationThread(String userId, int amount) {
this.amount = amount;
this.userId = userId;
redisTemplate = (RedisTemplate) SpringBeanUtil.getBeanByName("redisTemplate");
}
@Override
public void run() {
noLock();
}
/**
* 不做任何同步(锁)
*/
private void noLock() {
try {
Random random = new Random();
// 为了更好测试,模拟线程先后进入,每个线程随机休眠 1-100毫秒再进行业务操作
TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//模拟数据库中获取用户账号
UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
//设置金额
userAccount.setAmount(userAccount.getAmount() + amount);
logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
//模拟存回数据库
redisTemplate.opsForValue().set(userId, userAccount);
}
}
为了方便测试,我用的是SpringBoot
,RedisTemplate
配置了自定义序列化方式,TestController
来进行测试。
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// 设置value的序列化规则和 key的序列化规则
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
@RestController
public class TestController {
private final static Logger logger = LoggerFactory.getLogger(TestController.class);
private static ExecutorService executorService = Executors.newCachedThreadPool();
@Autowired
private RedisTemplate redisTemplate;
@GetMapping("/test")
public String test() throws InterruptedException {
//设置用户user_001到 Redis,初始化账户金额为0.
redisTemplate.opsForValue().setIfAbsent("user_001",new UserAccount("user_001",0));
//开启10个线程进行同步测试,每个线程为账户增加1元
for (int i = 0; i < 10; i++) {
executorService.execute(new AccountOperationThread("user_001",1));
}
//休眠1秒等待线程跑完
TimeUnit.MILLISECONDS.sleep(1000);
//获得Redis中的user_001账户
UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get("user_001");
logger.info("user id : " + userAccount.getUserId() + " amount : " + userAccount.getAmount());
return "success";
}
}
# 运行结果如下 看到thread-22,thread-24,thread-25 这3个线程出现了同步,导致修改金额都未成功。
# 正常最终输出应该是10,结果只有7
: pool-1-thread-22 : user id : user_001 amount : 1
: pool-1-thread-24 : user id : user_001 amount : 1
: pool-1-thread-25 : user id : user_001 amount : 1
: pool-1-thread-26 : user id : user_001 amount : 2
: pool-1-thread-21 : user id : user_001 amount : 3
: pool-1-thread-29 : user id : user_001 amount : 4
: pool-1-thread-27 : user id : user_001 amount : 5
: pool-1-thread-28 : user id : user_001 amount : 5
: pool-1-thread-23 : user id : user_001 amount : 6
: pool-1-thread-30 : user id : user_001 amount : 7
: user id : user_001 amount : 7
2.2 Redis 同步锁
如何用Redis解决呢
正常的思路下,我们在user_001
这个用户账户的资源上加个锁,同时只能有一个线程访问,这个其实和线程锁概念一样,但是线程锁是在同一个JVM中生效,在分布式环境下,我们可以用Redis缓存锁解决这个问题。
解决思路:当第一个线程进入程序前,先判断锁是否存在,如果存在则等待资源释放,不存在则抢占资源并且加锁。代码如下:
/**
* 1.抢占资源时判断是否被锁。
* 2.如未锁则抢占成功且加锁,否则等待锁释放。
* 3.业务完成后释放锁,让给其它线程。
*/
private void redisLock(){
Random random = new Random();
try {
//为了更好测试,模拟线程进入间隔,每个线程随机休眠 1-1000毫秒再进行业务操作
TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (true) {
Object lock = redisTemplate.opsForValue().get(userId + ":syn");
if(lock == null ){
// 获得锁 -> 加锁 -> 跳出循环
logger.info(Thread.currentThread().getName() + ":获得锁");
redisTemplate.opsForValue().set(userId + ":syn","lock");
break;
}
try {
// 等待500毫秒重试获得锁
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try{
//模拟数据库中获取用户账号
UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
//设置金额
userAccount.setAmount(userAccount.getAmount() + amount);
logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
//模拟存回数据库
redisTemplate.opsForValue().set(userId, userAccount);
}finally {
//释放锁
redisTemplate.delete(userId + ":syn");
logger.info(Thread.currentThread().getName() + ":释放锁");
}
}
# 输出
: pool-1-thread-1:获得锁
: pool-1-thread-1 : user id : user_001 amount : 1
: pool-1-thread-1:释放锁
: pool-1-thread-4:获得锁
: pool-1-thread-4 : user id : user_001 amount : 2
: pool-1-thread-4:释放锁
: pool-1-thread-9:获得锁
: pool-1-thread-9 : user id : user_001 amount : 3
: pool-1-thread-9:释放锁
: pool-1-thread-8:获得锁
: pool-1-thread-8 : user id : user_001 amount : 4
: pool-1-thread-8:释放锁
: pool-1-thread-5:获得锁
: pool-1-thread-5 : user id : user_001 amount : 5
: pool-1-thread-5:释放锁
: pool-1-thread-6:获得锁
: pool-1-thread-6 : user id : user_001 amount : 6
: pool-1-thread-6:释放锁
: pool-1-thread-10:获得锁
: pool-1-thread-10 : user id : user_001 amount : 7
: pool-1-thread-10:释放锁
: pool-1-thread-2:获得锁
: pool-1-thread-2 : user id : user_001 amount : 8
: pool-1-thread-2:释放锁
: pool-1-thread-3:获得锁
: pool-1-thread-3 : user id : user_001 amount : 9
: pool-1-thread-3:释放锁
: pool-1-thread-7:获得锁
: pool-1-thread-7 : user id : user_001 amount : 10
: pool-1-thread-7:释放锁
: user id : user_001 amount : 10
看似好了,但是其实并没解决问题,假设把下面这行代码的休眠时间变小,同步问题就马上到来,由于线程获得锁和加锁的过程,并非原子性操作,可能会导致线程A 获得锁,还未加锁时,线程B也获得了锁。他们还是存在同步问题。其实并不需要修改休眠时间,多执行几次,也会看见类似同步问题。
//为了更好测试,模拟线程进入间隔,每个线程随机休眠 1-50毫秒再进行业务操作
TimeUnit.MILLISECONDS.sleep(random.nextInt(50) + 1);
# 输出会出现如下情况
: pool-1-thread-16:获得锁
: pool-1-thread-16 : user id : user_001 amount : 38
: pool-1-thread-16:释放锁
: pool-1-thread-18:获得锁
: pool-1-thread-19:获得锁
: pool-1-thread-19 : user id : user_001 amount : 39
: pool-1-thread-18 : user id : user_001 amount : 39
: pool-1-thread-18:释放锁
: pool-1-thread-19:释放锁
2.3 Redis 原子性同步锁
那么只要解决获得锁和加锁的原子性,是否能解决问题,Spring-boot-data-redis 提供了原子性操作的 API
//该方法其实是使用了 redis 的指令 SET key value`NX
// 1.key 不存在,设置成功返回 value setIfAbsent返回true
// 2.key 存在,则设置失败返回 nul,setIfAbsent返回false
// 3.原子性操作
Boolean setIfAbsent(K var1, V var2);`
/**
* 1.原子操作加锁
* 2.竞争线程循环重试获得锁
* 3.业务完成释放锁
*/
private void atomicityRedisLock() {
//Spring data redis 支持的原子性操作
while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", "lock")) {
try {
// 等待100毫秒重试获得锁
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
logger.info(Thread.currentThread().getName() + ":获得锁");
try {
//模拟数据库中获取用户账号
UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
//设置金额
userAccount.setAmount(userAccount.getAmount() + amount);
logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
//模拟存回数据库
redisTemplate.opsForValue().set(userId, userAccount);
} finally {
//释放锁
redisTemplate.delete(userId + ":syn");
logger.info(Thread.currentThread().getName() + ":释放锁");
}
}
# 输出
: pool-1-thread-1:获得锁
: pool-1-thread-1 : user id : user_001 amount : 1
: pool-1-thread-1:释放锁
: pool-1-thread-4:获得锁
: pool-1-thread-4 : user id : user_001 amount : 2
: pool-1-thread-4:释放锁
: pool-1-thread-9:获得锁
: pool-1-thread-9 : user id : user_001 amount : 3
: pool-1-thread-9:释放锁
: pool-1-thread-8:获得锁
: pool-1-thread-8 : user id : user_001 amount : 4
: pool-1-thread-8:释放锁
: pool-1-thread-5:获得锁
: pool-1-thread-5 : user id : user_001 amount : 5
: pool-1-thread-5:释放锁
: pool-1-thread-6:获得锁
: pool-1-thread-6 : user id : user_001 amount : 6
: pool-1-thread-6:释放锁
: pool-1-thread-10:获得锁
: pool-1-thread-10 : user id : user_001 amount : 7
: pool-1-thread-10:释放锁
: pool-1-thread-2:获得锁
: pool-1-thread-2 : user id : user_001 amount : 8
: pool-1-thread-2:释放锁
: pool-1-thread-3:获得锁
: pool-1-thread-3 : user id : user_001 amount : 9
: pool-1-thread-3:释放锁
: pool-1-thread-7:获得锁
: pool-1-thread-7 : user id : user_001 amount : 10
: pool-1-thread-7:释放锁
: user id : user_001 amount : 10
2.4 设置超时时间解决死锁
但是以上代码也会有问题,假设A线程抢占到了资源,但是很不幸在业务处理到一半时,宕机了。finally 代码块无法执行,就无法释放锁,线程B会死锁,永远无法获得这个锁。
/**
* 1. 线程1 拿到锁
* 2. 业务执行到一半宕机
* 3. 无法正常释放锁
* 4. 其它线程竞争资源导致死锁
*/
private void deadLock() {
//Spring data redis 支持的原子性操作
while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", "lock")) {
try {
// 等待1000毫秒重试获得锁
logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
logger.info(Thread.currentThread().getName() + ":获得锁");
try {
// 应用在这里宕机,进程退出,无法执行 finally;
Thread.currentThread().interrupt();
// 业务逻辑...
} finally {
//释放锁
if (!Thread.currentThread().isInterrupted()) {
redisTemplate.delete(userId + ":syn");
logger.info(Thread.currentThread().getName() + ":释放锁");
}
}
}
为了更清晰查看测试结果,我们只启动2个线程
public String test() throws InterruptedException {
//设置用户user_001到 Redis,初始化账户金额为0.
redisTemplate.opsForValue().setIfAbsent("user_001",new UserAccount("user_001",0));
//开启10个线程进行同步测试,每个线程为账户增加1元
for (int i = 0; i < 2; i++) {
executorService.execute(new AccountOperationThread("user_001",1));
}
//休眠1秒等待线程跑完
TimeUnit.MILLISECONDS.sleep(1000);
//获得Redis中的user_001账户
UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get("user_001");
logger.info("user id : " + userAccount.getUserId() + " amount : " + userAccount.getAmount());
return "success";
}
# 输出
pool-1-thread-1:尝试循环获取锁
pool-1-thread-2:获得锁
pool-1-thread-1:尝试循环获取锁
user id : user_001 amount : 0
pool-1-thread-1:尝试循环获取锁
pool-1-thread-1:尝试循环获取锁
pool-1-thread-1:尝试循环获取锁
pool-1-thread-1:尝试循环获取锁
pool-1-thread-1:尝试循环获取锁
pool-1-thread-1:尝试循环获取锁
pool-1-thread-2
获得锁,未释放锁,导致pool-1-thread-1
死锁。
解决方案,通过Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5);
重载方法设定锁的过期时间:
/**
* 1. 原子操作,获得锁并设置5秒过期时间
* 2. 业务执行到一半宕机
* 3. 无法正常释放锁
* 4. 其它线程等待过期时间获得锁
*/
private void atomicityAndExRedisLock() {
try {
//Spring data redis 支持的原子性操作,并设置5秒过期时间
while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) {
// 等待100毫秒重试获得锁
logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
TimeUnit.MILLISECONDS.sleep(1000);
}
logger.info(Thread.currentThread().getName() + ":获得锁");
// 应用在这里宕机,进程退出,无法执行 finally;
Thread.currentThread().interrupt();
// 业务逻辑...
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放锁
if (!Thread.currentThread().isInterrupted()) {
redisTemplate.delete(userId + ":syn");
logger.info(Thread.currentThread().getName() + ":释放锁");
}
}
}
# 输出 5秒后,线程 pool-1-thread-1 正常获取锁
pool-1-thread-2:获得锁
pool-1-thread-1:尝试循环获取锁
pool-1-thread-1:尝试循环获取锁
pool-1-thread-1:尝试循环获取锁
pool-1-thread-1:尝试循环获取锁
pool-1-thread-1:尝试循环获取锁
pool-1-thread-1:获得锁
2.5 守护线程
这时候程序其实还是有问题的,假如,锁的生效时间是5秒,在绝大多数情况下,服务5秒内足够执行完毕正常释放锁,但是假如服务由于某些原因(比如网络问题)执行过程阻塞了,时间远超过5秒了,这时候锁失效了,其它服务抢占了锁。这种情况下会有2个问题:
1、A服务阻塞锁超时释放,B服务抢占到锁,执行业务。这样情况下,A服务和B服务的操作还是会存在同步问题,并不互斥。
2、A服务阻塞锁超时释放,B服务抢占到锁,A服务恢复执行完毕后,释放锁的过程会把B服务的锁误删。
很多的主流文章都提到了,解决问题2的方法可以为每个锁设置有个唯一Value比如 UUID:ThreadId
,在释放锁的时候判断 Value 值。如果相同则删除不同则不删,防止误删。但其实根源在问题1这种情况上,服务还没执行完,怎么可以释放锁呢?同步问题或者服务互斥问题还是无法解决的。要么服务宕机整个执行失败,超时后释放锁才安全。否则服务还在运行,释放锁是有风险的。其实很容易理解,解决了问题1,问题2自然就不会存在。我们既然要求对于资源的访问必须互斥,那么就决不允许同一时间有2个服务共享资源。
最好方式是通过守护线程给锁续命,比如5秒失效,我们在4秒时判断服务是否还在运行,如果还在运行,则给锁超时时间延长5秒。保证要么服务运行结束 finally
块里释放锁,要么就宕机,时间过了释放锁。当然设置唯一value
也是有好处的,我们就一起通过代码看看怎么解决。
DaemonThread
守护线程
/**
* 守护线程
*/
public class DaemonThread implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(DaemonThread.class);
// 是否需要守护 主线程关闭则结束守护线程
private boolean daemon = true;
// 守护锁
private String lockKey;
private RedisTemplate redisTemplate;
public DaemonThread(String lockKey) {
this.lockKey = lockKey;
redisTemplate = (RedisTemplate) SpringBeanUtil.getBeanByName("redisTemplate");
}
@Override
public void run() {
try {
while (daemon) {
long time = redisTemplate.getExpire(lockKey,TimeUnit.MILLISECONDS);
// 剩余有效期小于1秒则续命
if(time < 1000) {
logger.info("守护进程: " + Thread.currentThread().getName() + " 延长锁时间 5000 毫秒" );
redisTemplate.expire(lockKey , 5000, TimeUnit.MILLISECONDS);
}
TimeUnit.MILLISECONDS.sleep(300);
}
logger.info(" 守护进程: " + Thread.currentThread().getName() + "关闭 ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 主线程主动调用结束
public void stop() {
daemon = false;
}
}
//守护线程
private DaemonThread daemonThread;
/**
* 1. 原子操作,获得锁并设置5秒过期时间
* 2. 获取锁并且开启守护线程
* 3. 业务执行时间超过5秒
* 4. 守护线程判断时间,快超时时为锁续命5秒
* 5. 主线程执行完毕,释放锁,释放守护线程
*/
private void deamonRedisLock() {
//Spring data redis 支持的原子性操作,并设置5秒过期时间
String uuid = UUID.randomUUID().toString();
String value = Thread.currentThread().getId() + ":" + uuid;
try {
while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", value, 5000, TimeUnit.MILLISECONDS)) {
// 等待100毫秒重试获得锁
logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
TimeUnit.MILLISECONDS.sleep(1000);
}
logger.info(Thread.currentThread().getName() + ":获得锁");
//开启守护线程
daemonThread = new DaemonThread(userId + ":syn");
Thread thread = new Thread(daemonThread);
thread.start();
// 业务逻辑执行10秒...
TimeUnit.MILLISECONDS.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放锁 这里也需要原子操作,今后通过 Redis + Lua 讲
String result = (String)redisTemplate.opsForValue().get(userId + ":syn");
if(value.equals(uuid)) {
redisTemplate.delete(userId + ":syn");
logger.info(Thread.currentThread().getName() + ":释放锁");
}
//关闭守护线程
daemonThread.stop();
}
}
: pool-1-thread-6:尝试循环获取锁
: pool-1-thread-5:获得锁
: pool-1-thread-6:尝试循环获取锁
: pool-1-thread-6:尝试循环获取锁
: pool-1-thread-6:尝试循环获取锁
: pool-1-thread-6:尝试循环获取锁
: 守护进程: Thread-17 延长锁时间 5000 毫秒
: pool-1-thread-6:尝试循环获取锁
: pool-1-thread-6:尝试循环获取锁
: pool-1-thread-6:尝试循环获取锁
: pool-1-thread-6:尝试循环获取锁
: 守护进程: Thread-17 延长锁时间 5000 毫秒
: pool-1-thread-6:尝试循环获取锁
: pool-1-thread-5:释放锁
: 守护进程: Thread-17关闭
: pool-1-thread-6:获得锁
: 守护进程: Thread-18 延长锁时间 5000 毫秒
: 守护进程: Thread-18 延长锁时间 5000 毫秒
: pool-1-thread-6:释放锁
: 守护进程: Thread-18关闭
线程5获得锁后,守护线程17开始执行,并且在锁快失效时延长5秒。等到线程5完全执行完毕,释放锁,关闭17守护线程后,线程6才获得锁,并且开启自己的守护线程。
3. 问题
以上Redis分布式锁只是通过简单代码做原理的演示,在释放锁以及判断锁并且续命的操作,最好是原子性的,Redis 2.6后支持Lua脚本,Lua脚本可以实现复杂的Redis原子性操作。这个今后会写另一篇学习下Lua脚本实现Redis分布式锁。代码在 https://github.com/jinzhihong/spring-boot-starter-redis-lock.git 有兴趣的同学可以跑一下看看。