SpringCloud--分布式锁实现
一、简介
分布式锁其实就是控制分布式系统中不同进程共同访问共享资源的一种锁的实现。在分布式系统中各个微服务都是独立部署在不同的服务器上,如果多个服务同时操作同一个共享资源的话,就不能像单体服务那样通过synchronized或者Lock等同步机制保证一个代码块在同一时间只能由一个线程访问来实现共享资源的安全性。因为分布式系统中的不同服务已经不在是多线程之间的并发访问了,而是属于多进程之间的并发访问,所以就需要一种更加高级的锁机制,来处理这种跨JVM进程之间的线程安全问题。
二、主要特征
- 互斥性:在任意时刻,只有一个客户端能持有锁。
- 不会发生死锁:即使持有锁的客户端发生故障,也能保证锁最终会被释放。
- 具有容错性:只要大部分的 Redis 节点正常运行,客户端就可以加锁和解锁。
- 不乱解锁:加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
三、实现方案
在 Java 中,实现分布式锁的方案有多种,常见的3中方案如下:
- 基于数据库实现:可以通过数据库的乐观锁或悲观锁实现分布式锁,但是由于数据库的 IO 操作比较慢,不适合高并发场景,而且可能存在死锁和超时等问题。
- 基于 Redis 实现:Redis 是一个高性能的内存数据库,支持分布式部署,可以通过Redis的原子操作实现分布式锁,支持key的过期时间设置,不容易发生死锁,而且具有高性能和高可用性。
- 基于 Lock4j 实现:Lock4j 是一个分布式锁组件,它提供了多种不同的支持以满足不同性能和环境的需求,基于Spring AOP的声明式和编程式分布式锁,支持RedisTemplate、Redisson、Zookeeper。
- 基于 ZooKeeper 实现:ZooKeeper 是一个高可用性的分布式协调服务,可以通过它来实现分布式锁。但是使用 ZooKeeper 需要部署额外的服务,增加了系统复杂度。
3.1 基于 Redis 实现
-
实现方式有以下几种:
(1)setnx + expire:这种方式加锁操作和设置超时时间是分开的。如果在执行完setnx加锁后,正要执行expire设置过期时间时,进程挂掉了,那这个锁就永远不会过期了。
(2)set的扩展命令:通过set(String key, String value, String nxxx, String expx, int time) 加锁的同时设置过期时间,再通过del(key)删除key。这种方式可能导致锁被别的线程误删,假设A获取锁后,由于业务还没执行完就过期释放了,然后立即就被B获取该锁执行业务逻辑,此时A执行完成后就会去释放这个锁,但此时这个锁已经被B占用了,也就是说A此时把B的锁给释放掉了。
(3)set的扩展命令+唯一值校验:通过set(String key, String value, String nxxx, String expx, int time) 加锁的同时设置过期时间,再通过Lua 脚本去根据唯一值删除key。这种方式可以解决误删除别人的锁问题,但是还是存在锁过期释放了,业务还没执行完的问题。 -
添加redis依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
- 创建一个redis配置类,用来设置redis连接信息,并创建JedisPool和RedisTemplate的实例。
@Configuration
public class RedisConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private int port;
@Value("${spring.redis.timeout}")
private int timeout;
@Value("${spring.redis.password}")
private String password;
@Value("${spring.redis.pool.maxTotal}")
private int maxTotal;
@Value("${spring.redis.pool.maxWait}")
private int maxWait;
@Value("${spring.redis.pool.maxIdle}")
private int maxIdle;
@Value("${spring.redis.pool.minIdle}")
private int minIdle;
@Value("${spring.redis.blockWhenExhausted}")
private Boolean blockWhenExhausted;
@Value("${spring.redis.JmxEnabled}")
private Boolean JmxEnabled;
/**
* 创建JedisPool实例
*
* @return JedisPool
*/
@RefreshScope
@Bean
public JedisPool jedisPoolFactory() {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(maxTotal);
jedisPoolConfig.setMaxIdle(maxIdle);
jedisPoolConfig.setMinIdle(minIdle);
jedisPoolConfig.setMaxWaitMillis(maxWait);
// 连接耗尽时是否阻塞, false报异常,true阻塞直到超时, 默认true
jedisPoolConfig.setBlockWhenExhausted(blockWhenExhausted);
// 是否启用pool的jmx管理功能, 默认true
jedisPoolConfig.setJmxEnabled(JmxEnabled);
return new JedisPool(jedisPoolConfig, host, port, timeout, password);
}
/**
* redisTemplate 序列化使用的jdkSerializeable, 存储二进制字节码, 所以自定义序列化类
*
* @return redisTemplate
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 使用Jackson2JsonRedisSerialize 替换默认序列化
@SuppressWarnings({ "rawtypes", "unchecked" })
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// 设置value的序列化规则和 key的序列化规则
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
- 编写redis的工具类,使用(set的扩展命令+唯一值校验)的方式创建获取和释放分布式锁相应的方法。
@Slf4j
@Component
public class RedisUtil {
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Resource
private StringRedisTemplate stringRedisTemplate;
private static final String LOCK_SUCCESS = "OK";
/**
* NX: 仅在键不存在时设置键
* XX: 只有在键已存在时才设置
*/
private static final String SET_IF_NOT_EXIST = "NX";
/**
* 过期时间单位
* EX: seconds
* PX: milliseconds
*/
private static final String SET_WITH_EXPIRE_TIME = "EX";
private static final Long RELEASE_SUCCESS = 1L;
/**
* 尝试获取分布式锁
*
* @param lockKey 分布式锁的key,想要获取锁时,判断这个key是否存在于redis中,存在则说明获取分布式锁失败,否则成功获取锁
* @param requestId 每个请求的全局唯一id,用于释放锁时只能释放自己持有的锁
* @param expireTime 超期时间,单位:秒
* @return boolean 是否获取成功
*/
public boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
if (jedis == null) {
return false;
}
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
log.info("========================================获取分布式锁成功, lockKey is:{}, requestId is:{}", lockKey, requestId);
return true;
}
log.info("========================================获取分布式锁失败, lockKey is:{}, requestId is:{}", lockKey, requestId);
return false;
}
/**
* 释放分布式锁
*
* @param jedis Redis客户端
* @param lockKey 分布式锁的key
* @param requestId 每个请求的全局唯一id
* @return 是否释放成功
*/
public boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
if (jedis == null) {
log.info("========================================分布式锁释放失败,Jedis为空, lockKey is:{}, requestId is:{}", lockKey, requestId);
return false;
}
// 通过Lua 脚本保证只释放requestId对应的lockKey
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
log.info("========================================分布式锁释放成功, lockKey is:{}, requestId is:{}", lockKey, requestId);
return true;
}
log.info("========================================分布式锁释放失败, lockKey is:{}, requestId is:{}", lockKey, requestId);
return false;
}
}
- 案例实现
/**
* 通过分布式锁来生成全局订单唯一的id
*
* @param type 订单类型
* @return String 订单唯一的id
*/
@Override
public String generate(String type) {
String orderId = null;
log.info("========================================要生成的订单类型为:{}", type);
if (StringUtils.isBlank(type)) {
return null;
}
// 开始获取分布式锁
Jedis jedis = jedisPool.getResource();
String lockKey = redisUtils.getRedisKey(RedisTemplateConstant.DISTRIBUTED_LOCK_KEY_TYPE, type);
String requestId = CommonUtil.getUUID();
try {
if (redisUtils.tryGetDistributedLock(jedis, lockKey, requestId, expireTime)) {
// 生成订单id
orderId = getOrderId(type);
}
} catch (Exception e) {
log.info("========================================组装id出错================================");
} finally {
// 释放分布式锁
redisUtils.releaseDistributedLock(jedis, lockKey, requestId);
if (jedis != null) {
jedis.close();
}
}
return orderId;
}
3.2 基于 Redisson + RedLock
- Redisson:
对于可能存在锁过期释放,业务没执行完的问题。Redisson的处理逻辑是:只要客户端一加锁成功,就会启动一个watch dog看门狗,它是一个后台线程,会每隔10秒检查一下,如果当前线程还持有锁,那么就会不断的延长锁key的生存时间。因此,Redisson就是使用 watch dog 解决了锁过期释放,业务没执行完问题。 - RedLock:
由于Redis一般都是集群部署,如果客户端A在Redis的master节点上拿到了锁,但是加锁的key还没同步到slave节点。恰好这时,master节点发生故障,一个slave节点就会升级为master节点。客户端B就可以顺理成章获取同个key的锁了,但客户端A早在故障的master节点已经拿到这个锁了,从而导致分布式锁在这种情况下失效了。因此,可通过 redLock 红锁,一种相对安全的分布式锁实现方式,采用主节点过半机制,当客户端申请分布式锁的时候,需要向所有的redis实例发出申请,只有超过半数的redis实例报告获取锁成功,才能算真正获取到锁。也因此redis集群部署的节点数一般都为奇数。 - RedLock 已经在最新本的 Redisson 中被弃用了:
其实红锁其实也并不能解决根本问题,只是降低问题发生的概率。因为完全相互独立的redis,每一台至少也要保证高可用,还是会有主从节点。既然有主从节点,在持续的高并发下,master还是可能会宕机,从节点可能还没来得及同步锁的数据。在实际场景中,红锁是很少使用的。这是因为使用了红锁后会影响高并发环境下的性能,使得程序的体验更差。同时,使用红锁后,当加锁成功的RLock个数不超过总数的一半时,会返回加锁失败,即使在业务层面任务加锁成功了,但是红锁也会返回加锁失败的结果。另外实现红锁需要提供多套Redis的主从部署架构,并且这多套Redis主从架构中的Master节点必须都是独立的,相互之间没有任何数据交互。 - 引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.20.0</version>
</dependency>
- 编写分布式锁工具类
@Slf4j
@Component
public class RedissonUtil {
@Resource
private Redisson redisson;
/**
* 加锁
*
* @param key 分布式锁的 key
* @param timeout 超时时间
* @param unit 时间单位
* @return
*/
public boolean tryLock(String key, long timeout, TimeUnit unit) {
RLock lock = redisson.getLock(key);
try {
return lock.tryLock(timeout, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
/**
* 释放分布式锁
*
* @param key 分布式锁的 key
*/
public void unlock(String key) {
RLock lock = redisson.getLock(key);
lock.unlock();
}
}
3.3 基于 Lock4j 实现
- 引入依赖
<!-- 若使用redisTemplate作为分布式锁底层,则需要引入 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>lock4j-redis-template-spring-boot-starter</artifactId>
<version>2.2.4</version>
</dependency>
<!-- 若使用redisson作为分布式锁底层,则需要引入 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>lock4j-redisson-spring-boot-starter</artifactId>
<version>2.2.4</version>
</dependency>
- 简单使用实例
@Slf4j
@RestController
@RequestMapping("/redisLock")
public class RedisLockController {
@Autowired
private LockTemplate lockTemplate;
/**
* 使用 lock4j 注解加锁
*/
@Lock4j(keys = {"#key"}, acquireTimeout = 1000, expire = 10000)
@GetMapping("/testAnnotate")
public R<String> testAnnotate(String key) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return R.ok(key);
}
/**
* 使用LockTemplate模板加锁
*/
@GetMapping("/testLock4jLockTemplate")
public R<String> testLock4jLockTemplate(String key) {
final LockInfo lockInfo = lockTemplate.lock(key, 30000L, 5000L, RedissonLockExecutor.class);
if (null == lockInfo) {
throw new RuntimeException("业务繁忙,请稍后再试!");
}
// 获取锁成功,处理业务
try {
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
//
}
System.out.println("当前线程:" + Thread.currentThread().getName());
} finally {
//释放锁
lockTemplate.releaseLock(lockInfo);
}
return R.ok(key);
}
}
3.4 基于 ZooKeeper 实现
- 引入依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>latest</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>latest</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>latest</version>
</dependency>
- 配置 ZooKeeper 连接
spring:
zookeeper:
connect-string: localhost:8081
namespace: test
- 编写分布式锁工具类
@Component
public class ZkLockUtil {
@Autowired
private CuratorFramework curatorFramework;
/**
* 获取分布式锁
*
* @param lockPath 锁路径
* @param waitTime 等待时间
* @param leaseTime 锁持有时间
* @param timeUnit 时间单位
* @return 锁对象
* @throws Exception 获取锁异常
*/
public InterProcessMutex acquire(String lockPath, long waitTime, long leaseTime, TimeUnit timeUnit) throws Exception {
InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath);
if (!lock.acquire(waitTime, timeUnit)) {
throw new RuntimeException("获取分布式锁失败");
}
if (leaseTime > 0) {
lock.acquire(leaseTime, timeUnit);
}
return lock;
}
/**
* 释放分布式锁
*
* @param lock 锁对象
* @throws Exception 释放锁异常
*/
public void release(InterProcessMutex lock) throws Exception {
if (lock != null) {
lock.release();
}
}
}