深入浅出:Redis 分布式锁原理与实现(一)

深入浅出:Redis 分布式锁原理与实现(一)

Posted by 金志宏 on August 12, 2019

Redis 分布式锁原理与实现

Java 应用在多线程环境下,我们可以通过 Java 内存模型实现同步,比如 Lock,synchronized 等, 但是在分布式环境下,特别是现在微服务盛行的时代,服务为了高可用会做集群。在这样的情况下每个服务都有自己独立进程,当高并发的情况下,会存在同步问题,本文主要记录自己学习Redis分布式锁的过程。从浅到深一步步通过代码去分析。

1. 什么场景下用分布式锁

​ 比如,有一个账户服务对用户账户进行金额操作并且做了集群。此时有A,B两个请求同时对账户金额进行操作,操作步骤如下:

  1. 从数据库读取账户金额
  2. 对账户金额进行操作
  3. 保存账户金额到数据库

假如不做互斥,A得到账户金额,还没等A先修改完保存,B也从数据库中得到账户金额,那么等A处理完金额保存后,B随后的保存就将覆盖A的保存。

2. 场景分析和解决方案

​ 我们通过代码演示不同的场景下,我们会出现什么样的问题,并且怎么样去解决。

2.1 不做任何同步

可以通过代码来解释这种情况,由于本地学习,用 Redis 模拟数据库存储账户金额。用多线程来模拟分布式环境,同时对账户进行操作。

UserAccount.java

//用户账户Model
public class UserAccount {

    //用户ID
    private String userId;
    //账户内金额
    private int amount;

    public UserAccount(String userId, int amount) {
        this.userId = userId;
        this.amount = amount;
    }

    public UserAccount() {
    }

    //添加账户金额
    public void addAmount(int amount) {
        this.amount = this.amount + amount;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }
}

AccountOperationThread.java 线程,模拟分布式环境

/**
 * 账户操作
 * 多线程方式模拟分布式环境
 */
public class AccountOperationThread implements Runnable {

    private final static Logger logger = LoggerFactory.getLogger(AccountOperationThread.class);

    //操作金额
    private int amount;

    private String userId;


    private RedisTemplate redisTemplate;

    public AccountOperationThread(String userId, int amount) {
        this.amount = amount;
        this.userId = userId;
        redisTemplate = (RedisTemplate) SpringBeanUtil.getBeanByName("redisTemplate");
    }

    @Override
    public void run() {
        noLock();
    }


    /**
     * 不做任何同步(锁)
     */
    private void noLock() {
      	try {
          	Random random = new Random();
            // 为了更好测试,模拟线程先后进入,每个线程随机休眠 1-100毫秒再进行业务操作
            TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //模拟数据库中获取用户账号
        UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
        //设置金额
        userAccount.setAmount(userAccount.getAmount() + amount);
        logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
        //模拟存回数据库
        redisTemplate.opsForValue().set(userId, userAccount);
    }
}

为了方便测试,我用的是SpringBootRedisTemplate配置了自定义序列化方式,TestController来进行测试。

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        // 设置value的序列化规则和 key的序列化规则
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.afterPropertiesSet();

        return redisTemplate;
    }
}
@RestController
public class TestController {

    private final static Logger logger = LoggerFactory.getLogger(TestController.class);

    private static ExecutorService executorService = Executors.newCachedThreadPool();

    @Autowired
    private RedisTemplate redisTemplate;

    @GetMapping("/test")
    public String test() throws InterruptedException {
        //设置用户user_001到 Redis,初始化账户金额为0.
        redisTemplate.opsForValue().setIfAbsent("user_001",new UserAccount("user_001",0));
        //开启10个线程进行同步测试,每个线程为账户增加1元
        for (int i = 0; i < 10; i++) {
            executorService.execute(new AccountOperationThread("user_001",1));
        }
        //休眠1秒等待线程跑完
        TimeUnit.MILLISECONDS.sleep(1000);
        //获得Redis中的user_001账户
        UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get("user_001");
        logger.info("user id : " + userAccount.getUserId()  + " amount : " + userAccount.getAmount());
        return "success";
    }

}
# 运行结果如下 看到thread-22,thread-24,thread-25 这3个线程出现了同步,导致修改金额都未成功。
# 正常最终输出应该是10,结果只有7
: pool-1-thread-22 : user id : user_001 amount : 1
: pool-1-thread-24 : user id : user_001 amount : 1
: pool-1-thread-25 : user id : user_001 amount : 1
: pool-1-thread-26 : user id : user_001 amount : 2
: pool-1-thread-21 : user id : user_001 amount : 3
: pool-1-thread-29 : user id : user_001 amount : 4
: pool-1-thread-27 : user id : user_001 amount : 5
: pool-1-thread-28 : user id : user_001 amount : 5
: pool-1-thread-23 : user id : user_001 amount : 6
: pool-1-thread-30 : user id : user_001 amount : 7
: user id : user_001 amount : 7

2.2 Redis 同步锁

如何用Redis解决呢

正常的思路下,我们在user_001这个用户账户的资源上加个锁,同时只能有一个线程访问,这个其实和线程锁概念一样,但是线程锁是在同一个JVM中生效,在分布式环境下,我们可以用Redis缓存锁解决这个问题。

解决思路:当第一个线程进入程序前,先判断锁是否存在,如果存在则等待资源释放,不存在则抢占资源并且加锁。代码如下:

		/**
     * 1.抢占资源时判断是否被锁。
     * 2.如未锁则抢占成功且加锁,否则等待锁释放。
     * 3.业务完成后释放锁,让给其它线程。
     */
    private void redisLock(){
        Random random = new Random();
        try {
            //为了更好测试,模拟线程进入间隔,每个线程随机休眠 1-1000毫秒再进行业务操作
            TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        while (true) {
            Object lock = redisTemplate.opsForValue().get(userId + ":syn");
            if(lock == null ){
                // 获得锁 -> 加锁 -> 跳出循环
                logger.info(Thread.currentThread().getName() + ":获得锁");
                redisTemplate.opsForValue().set(userId + ":syn","lock");
                break;
            }
            try {
                // 等待500毫秒重试获得锁
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        try{
            //模拟数据库中获取用户账号
            UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
            //设置金额
            userAccount.setAmount(userAccount.getAmount() + amount);
            logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
            //模拟存回数据库
            redisTemplate.opsForValue().set(userId, userAccount);
        }finally {
            //释放锁
            redisTemplate.delete(userId + ":syn");
            logger.info(Thread.currentThread().getName() + ":释放锁");
        }
    }
# 输出 
: pool-1-thread-1:获得锁                              
: pool-1-thread-1 : user id : user_001 amount : 1  
: pool-1-thread-1:释放锁                              
: pool-1-thread-4:获得锁                              
: pool-1-thread-4 : user id : user_001 amount : 2  
: pool-1-thread-4:释放锁                              
: pool-1-thread-9:获得锁                              
: pool-1-thread-9 : user id : user_001 amount : 3  
: pool-1-thread-9:释放锁                              
: pool-1-thread-8:获得锁                              
: pool-1-thread-8 : user id : user_001 amount : 4  
: pool-1-thread-8:释放锁                              
: pool-1-thread-5:获得锁                              
: pool-1-thread-5 : user id : user_001 amount : 5  
: pool-1-thread-5:释放锁                              
: pool-1-thread-6:获得锁                              
: pool-1-thread-6 : user id : user_001 amount : 6  
: pool-1-thread-6:释放锁                              
: pool-1-thread-10:获得锁                             
: pool-1-thread-10 : user id : user_001 amount : 7 
: pool-1-thread-10:释放锁                             
: pool-1-thread-2:获得锁                              
: pool-1-thread-2 : user id : user_001 amount : 8  
: pool-1-thread-2:释放锁                              
: pool-1-thread-3:获得锁                              
: pool-1-thread-3 : user id : user_001 amount : 9  
: pool-1-thread-3:释放锁                              
: pool-1-thread-7:获得锁                              
: pool-1-thread-7 : user id : user_001 amount : 10 
: pool-1-thread-7:释放锁                              
: user id : user_001 amount : 10                   

看似好了,但是其实并没解决问题,假设把下面这行代码的休眠时间变小,同步问题就马上到来,由于线程获得锁和加锁的过程,并非原子性操作,可能会导致线程A 获得锁,还未加锁时,线程B也获得了锁。他们还是存在同步问题。其实并不需要修改休眠时间,多执行几次,也会看见类似同步问题。

 						//为了更好测试,模拟线程进入间隔,每个线程随机休眠 1-50毫秒再进行业务操作
            TimeUnit.MILLISECONDS.sleep(random.nextInt(50) + 1);
# 输出会出现如下情况
: pool-1-thread-16:获得锁
: pool-1-thread-16 : user id : user_001 amount : 38
: pool-1-thread-16:释放锁
: pool-1-thread-18:获得锁
: pool-1-thread-19:获得锁
: pool-1-thread-19 : user id : user_001 amount : 39
: pool-1-thread-18 : user id : user_001 amount : 39
: pool-1-thread-18:释放锁
: pool-1-thread-19:释放锁

2.3 Redis 原子性同步锁

那么只要解决获得锁和加锁的原子性,是否能解决问题,Spring-boot-data-redis 提供了原子性操作的 API

//该方法其实是使用了 redis 的指令 SET key value`NX 
// 1.key 不存在,设置成功返回 value setIfAbsent返回true
// 2.key 存在,则设置失败返回 nul,setIfAbsent返回false
// 3.原子性操作
Boolean setIfAbsent(K var1, V var2);`
	/**
     * 1.原子操作加锁
     * 2.竞争线程循环重试获得锁
     * 3.业务完成释放锁
     */    
private void atomicityRedisLock() {
        //Spring data redis 支持的原子性操作
        while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", "lock")) {
            try {
                // 等待100毫秒重试获得锁
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        logger.info(Thread.currentThread().getName() + ":获得锁");
        try {
            //模拟数据库中获取用户账号
            UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
            //设置金额
            userAccount.setAmount(userAccount.getAmount() + amount);
            logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
            //模拟存回数据库
            redisTemplate.opsForValue().set(userId, userAccount);
        } finally {
            //释放锁
            redisTemplate.delete(userId + ":syn");
            logger.info(Thread.currentThread().getName() + ":释放锁");
        }
    }
# 输出 
: pool-1-thread-1:获得锁                              
: pool-1-thread-1 : user id : user_001 amount : 1  
: pool-1-thread-1:释放锁                              
: pool-1-thread-4:获得锁                              
: pool-1-thread-4 : user id : user_001 amount : 2  
: pool-1-thread-4:释放锁                              
: pool-1-thread-9:获得锁                              
: pool-1-thread-9 : user id : user_001 amount : 3  
: pool-1-thread-9:释放锁                              
: pool-1-thread-8:获得锁                              
: pool-1-thread-8 : user id : user_001 amount : 4  
: pool-1-thread-8:释放锁                              
: pool-1-thread-5:获得锁                              
: pool-1-thread-5 : user id : user_001 amount : 5  
: pool-1-thread-5:释放锁                              
: pool-1-thread-6:获得锁                              
: pool-1-thread-6 : user id : user_001 amount : 6  
: pool-1-thread-6:释放锁                              
: pool-1-thread-10:获得锁                             
: pool-1-thread-10 : user id : user_001 amount : 7 
: pool-1-thread-10:释放锁                             
: pool-1-thread-2:获得锁                              
: pool-1-thread-2 : user id : user_001 amount : 8  
: pool-1-thread-2:释放锁                              
: pool-1-thread-3:获得锁                              
: pool-1-thread-3 : user id : user_001 amount : 9  
: pool-1-thread-3:释放锁                              
: pool-1-thread-7:获得锁                              
: pool-1-thread-7 : user id : user_001 amount : 10 
: pool-1-thread-7:释放锁                              
: user id : user_001 amount : 10 

2.4 设置超时时间解决死锁

但是以上代码也会有问题,假设A线程抢占到了资源,但是很不幸在业务处理到一半时,宕机了。finally 代码块无法执行,就无法释放锁,线程B会死锁,永远无法获得这个锁。

/**
     * 1. 线程1 拿到锁
     * 2. 业务执行到一半宕机
     * 3. 无法正常释放锁
     * 4. 其它线程竞争资源导致死锁
     */
    private void deadLock() {
        //Spring data redis 支持的原子性操作
        while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", "lock")) {
            try {
                // 等待1000毫秒重试获得锁
                logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
                TimeUnit.MILLISECONDS.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        logger.info(Thread.currentThread().getName() + ":获得锁");
        try {
            // 应用在这里宕机,进程退出,无法执行 finally;
            Thread.currentThread().interrupt();
            // 业务逻辑...
        } finally {
            //释放锁
            if (!Thread.currentThread().isInterrupted()) {
                redisTemplate.delete(userId + ":syn");
                logger.info(Thread.currentThread().getName() + ":释放锁");
            }
        }
    }

为了更清晰查看测试结果,我们只启动2个线程

public String test() throws InterruptedException {
        //设置用户user_001到 Redis,初始化账户金额为0.
        redisTemplate.opsForValue().setIfAbsent("user_001",new UserAccount("user_001",0));
        //开启10个线程进行同步测试,每个线程为账户增加1元
        for (int i = 0; i < 2; i++) {
            executorService.execute(new AccountOperationThread("user_001",1));
        }
        //休眠1秒等待线程跑完
        TimeUnit.MILLISECONDS.sleep(1000);
        //获得Redis中的user_001账户
        UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get("user_001");
        logger.info("user id : " + userAccount.getUserId()  + " amount : " + userAccount.getAmount());
        return "success";
    }
# 输出
pool-1-thread-1:尝试循环获取锁
pool-1-thread-2:获得锁
pool-1-thread-1:尝试循环获取锁
user id : user_001 amount : 0
pool-1-thread-1:尝试循环获取锁
pool-1-thread-1:尝试循环获取锁
pool-1-thread-1:尝试循环获取锁
pool-1-thread-1:尝试循环获取锁
pool-1-thread-1:尝试循环获取锁
pool-1-thread-1:尝试循环获取锁

pool-1-thread-2 获得锁,未释放锁,导致pool-1-thread-1死锁。

解决方案,通过Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5);重载方法设定锁的过期时间:

    /**
     * 1. 原子操作,获得锁并设置5秒过期时间
     * 2. 业务执行到一半宕机
     * 3. 无法正常释放锁
     * 4. 其它线程等待过期时间获得锁
     */
    private void atomicityAndExRedisLock() {
         try {
            //Spring data redis 支持的原子性操作,并设置5秒过期时间
            while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) {
                // 等待100毫秒重试获得锁
                logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
                TimeUnit.MILLISECONDS.sleep(1000);
            }
            logger.info(Thread.currentThread().getName() + ":获得锁");
            // 应用在这里宕机,进程退出,无法执行 finally;
            Thread.currentThread().interrupt();
            // 业务逻辑...
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //释放锁
            if (!Thread.currentThread().isInterrupted()) {
                redisTemplate.delete(userId + ":syn");
                logger.info(Thread.currentThread().getName() + ":释放锁");
            }
        }
    }
# 输出 5秒后,线程 pool-1-thread-1 正常获取锁
pool-1-thread-2:获得锁
pool-1-thread-1:尝试循环获取锁
pool-1-thread-1:尝试循环获取锁
pool-1-thread-1:尝试循环获取锁
pool-1-thread-1:尝试循环获取锁
pool-1-thread-1:尝试循环获取锁
pool-1-thread-1:获得锁

2.5 守护线程

这时候程序其实还是有问题的,假如,锁的生效时间是5秒,在绝大多数情况下,服务5秒内足够执行完毕正常释放锁,但是假如服务由于某些原因(比如网络问题)执行过程阻塞了,时间远超过5秒了,这时候锁失效了,其它服务抢占了锁。这种情况下会有2个问题:

1、A服务阻塞锁超时释放,B服务抢占到锁,执行业务。这样情况下,A服务和B服务的操作还是会存在同步问题,并不互斥。

2、A服务阻塞锁超时释放,B服务抢占到锁,A服务恢复执行完毕后,释放锁的过程会把B服务的锁误删。

很多的主流文章都提到了,解决问题2的方法可以为每个锁设置有个唯一Value比如 UUID:ThreadId,在释放锁的时候判断 Value 值。如果相同则删除不同则不删,防止误删。但其实根源在问题1这种情况上,服务还没执行完,怎么可以释放锁呢?同步问题或者服务互斥问题还是无法解决的。要么服务宕机整个执行失败,超时后释放锁才安全。否则服务还在运行,释放锁是有风险的。其实很容易理解,解决了问题1,问题2自然就不会存在。我们既然要求对于资源的访问必须互斥,那么就决不允许同一时间有2个服务共享资源。

最好方式是通过守护线程给锁续命,比如5秒失效,我们在4秒时判断服务是否还在运行,如果还在运行,则给锁超时时间延长5秒。保证要么服务运行结束 finally 块里释放锁,要么就宕机,时间过了释放锁。当然设置唯一value也是有好处的,我们就一起通过代码看看怎么解决。

DaemonThread 守护线程

/**
 * 守护线程
 */
public class DaemonThread implements Runnable {
    private final static Logger logger = LoggerFactory.getLogger(DaemonThread.class);

  	// 是否需要守护 主线程关闭则结束守护线程
    private boolean daemon = true;
		// 守护锁
    private String lockKey;

    private RedisTemplate redisTemplate;


    public DaemonThread(String lockKey) {
        this.lockKey = lockKey;
        redisTemplate = (RedisTemplate) SpringBeanUtil.getBeanByName("redisTemplate");
    }


    @Override
    public void run() {
        try {
            while (daemon) {
                long time = redisTemplate.getExpire(lockKey,TimeUnit.MILLISECONDS);
                // 剩余有效期小于1秒则续命
                if(time < 1000) {
                    logger.info("守护进程: " + Thread.currentThread().getName() + " 延长锁时间 5000 毫秒" );
                    redisTemplate.expire(lockKey , 5000, TimeUnit.MILLISECONDS);
                }
                TimeUnit.MILLISECONDS.sleep(300);
            }
            logger.info(" 守护进程: " + Thread.currentThread().getName() + "关闭 ");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
		// 主线程主动调用结束
    public void stop() {
        daemon = false;
    }
}

		//守护线程
    private DaemonThread daemonThread;

		/**
     * 1. 原子操作,获得锁并设置5秒过期时间
     * 2. 获取锁并且开启守护线程
     * 3. 业务执行时间超过5秒
     * 4. 守护线程判断时间,快超时时为锁续命5秒
     * 5. 主线程执行完毕,释放锁,释放守护线程
     */
    private void deamonRedisLock() {
        //Spring data redis 支持的原子性操作,并设置5秒过期时间
        String uuid = UUID.randomUUID().toString();
        String value = Thread.currentThread().getId() + ":" + uuid;
        try {
            while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", value, 5000, TimeUnit.MILLISECONDS)) {
                // 等待100毫秒重试获得锁
                logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
                TimeUnit.MILLISECONDS.sleep(1000);
            }
            logger.info(Thread.currentThread().getName() + ":获得锁");
          	//开启守护线程
            daemonThread = new DaemonThread(userId + ":syn");
            Thread thread = new Thread(daemonThread);
            thread.start();
            // 业务逻辑执行10秒...
            TimeUnit.MILLISECONDS.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //释放锁 这里也需要原子操作,今后通过 Redis + Lua 讲
            String result = (String)redisTemplate.opsForValue().get(userId + ":syn");
            if(value.equals(uuid)) {
                redisTemplate.delete(userId + ":syn");
                logger.info(Thread.currentThread().getName() + ":释放锁");
            }
            //关闭守护线程
            daemonThread.stop();
        }
    }
: pool-1-thread-6:尝试循环获取锁       
: pool-1-thread-5:获得锁           
: pool-1-thread-6:尝试循环获取锁       
: pool-1-thread-6:尝试循环获取锁       
: pool-1-thread-6:尝试循环获取锁       
: pool-1-thread-6:尝试循环获取锁       
: 守护进程: Thread-17 延长锁时间 5000 毫秒 
: pool-1-thread-6:尝试循环获取锁       
: pool-1-thread-6:尝试循环获取锁       
: pool-1-thread-6:尝试循环获取锁       
: pool-1-thread-6:尝试循环获取锁       
: 守护进程: Thread-17 延长锁时间 5000 毫秒 
: pool-1-thread-6:尝试循环获取锁       
: pool-1-thread-5:释放锁           
:  守护进程: Thread-17关闭            
: pool-1-thread-6:获得锁           
: 守护进程: Thread-18 延长锁时间 5000 毫秒 
: 守护进程: Thread-18 延长锁时间 5000 毫秒 
: pool-1-thread-6:释放锁           
:  守护进程: Thread-18关闭            

线程5获得锁后,守护线程17开始执行,并且在锁快失效时延长5秒。等到线程5完全执行完毕,释放锁,关闭17守护线程后,线程6才获得锁,并且开启自己的守护线程。

3. 问题

以上Redis分布式锁只是通过简单代码做原理的演示,在释放锁以及判断锁并且续命的操作,最好是原子性的,Redis 2.6后支持Lua脚本,Lua脚本可以实现复杂的Redis原子性操作。这个今后会写另一篇学习下Lua脚本实现Redis分布式锁。代码在 https://github.com/jinzhihong/spring-boot-starter-redis-lock.git 有兴趣的同学可以跑一下看看。