Java面试要点94 - Java分布式锁的实现与应用
文章目录
- 一、引言
- 二、分布式锁的核心要求
- 三、基于Redis的分布式锁实现
- 3.1 Redis分布式锁的基本实现
- 3.2 Redis分布式锁的可重入实现
- 四、基于Zookeeper的分布式锁实现
- 4.1 Zookeeper分布式锁的基本原理
- 4.2 Zookeeper分布式锁的优化实现
- 五、基于数据库的分布式锁实现
- 总结
一、引言
在分布式系统中,多个应用实例对共享资源的并发访问需要一个跨JVM的锁机制来保证互斥。分布式锁通过分布式协调器来实现多个应用实例之间的同步,确保同一时间只有一个应用实例能够访问共享资源。
二、分布式锁的核心要求
分布式锁需要满足互斥性、防死锁、高可用和高性能等基本要求。互斥性确保任意时刻只有一个客户端持有锁,防死锁机制通过超时释放来避免死锁。
下面通过示例代码来说明分布式锁的基本应用:
public class DistributedSystemDemo {
private static int inventory = 100; // 商品库存
private final DistributedLock distributedLock;
private final String lockKey = "inventory_lock";
public DistributedSystemDemo(DistributedLock distributedLock) {
this.distributedLock = distributedLock;
}
public boolean deductInventory() {
try {
// 尝试获取分布式锁
if (distributedLock.tryLock(lockKey, 1000)) {
try {
if (inventory > 0) {
inventory--;
System.out.println("扣减库存成功,当前库存: " + inventory);
return true;
}
return false;
} finally {
// 释放锁
distributedLock.unlock(lockKey);
}
}
return false;
} catch (Exception e) {
// 处理异常
return false;
}
}
}
三、基于Redis的分布式锁实现
3.1 Redis分布式锁的基本实现
Redis分布式锁利用SET命令的原子性特性实现。通过SET NX命令尝试获取锁,并设置过期时间防止死锁。释放锁时需要确保只有锁的持有者才能释放锁,通常使用Lua脚本实现原子性释放。
public class RedisDistributedLock {
private static final String LOCK_PREFIX = "distributed_lock:";
private static final String LOCK_SUCCESS = "OK";
private StringRedisTemplate redisTemplate;
public RedisDistributedLock(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
public boolean tryLock(String lockKey, String requestId, long expireTime) {
String key = LOCK_PREFIX + lockKey;
try {
String result = redisTemplate.execute((RedisCallback<String>) connection -> {
JedisCommands commands = (JedisCommands) connection.getNativeConnection();
return commands.set(key, requestId, "NX", "PX", expireTime);
});
return LOCK_SUCCESS.equals(result);
} catch (Exception e) {
return false;
}
}
public boolean releaseLock(String lockKey, String requestId) {
String key = LOCK_PREFIX + lockKey;
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then "
+ "return redis.call('del', KEYS[1]) "
+ "else return 0 end";
try {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(Long.class);
Long result = redisTemplate.execute(redisScript,
Collections.singletonList(key), requestId);
return Long.valueOf(1).equals(result);
} catch (Exception e) {
return false;
}
}
}
3.2 Redis分布式锁的可重入实现
可重入锁允许同一个线程多次获取同一把锁,实现需要记录锁的持有线程和重入次数。使用ThreadLocal维护线程的锁计数,当计数归零时才真正释放Redis中的锁。
public class ReentrantRedisLock {
private static final String LOCK_PREFIX = "reentrant_lock:";
private StringRedisTemplate redisTemplate;
private ThreadLocal<Map<String, LockInfo>> lockInfo = new ThreadLocal<>();
public static class LockInfo {
String requestId;
int lockCount;
public LockInfo(String requestId) {
this.requestId = requestId;
this.lockCount = 1;
}
}
public ReentrantRedisLock(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
public boolean tryLock(String lockKey, long expireTime) {
String key = LOCK_PREFIX + lockKey;
Map<String, LockInfo> locks = lockInfo.get();
if (locks == null) {
locks = new HashMap<>();
lockInfo.set(locks);
}
LockInfo info = locks.get(key);
if (info != null && info.lockCount > 0) {
info.lockCount++;
return true;
}
String requestId = UUID.randomUUID().toString();
Boolean success = redisTemplate.opsForValue()
.setIfAbsent(key, requestId, expireTime, TimeUnit.MILLISECONDS);
if (Boolean.TRUE.equals(success)) {
locks.put(key, new LockInfo(requestId));
return true;
}
return false;
}
public boolean releaseLock(String lockKey) {
String key = LOCK_PREFIX + lockKey;
Map<String, LockInfo> locks = lockInfo.get();
if (locks == null) {
return false;
}
LockInfo info = locks.get(key);
if (info == null || info.lockCount <= 0) {
return false;
}
info.lockCount--;
if (info.lockCount == 0) {
locks.remove(key);
if (locks.isEmpty()) {
lockInfo.remove();
}
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then "
+ "return redis.call('del', KEYS[1]) "
+ "else return 0 end";
try {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(Long.class);
Long result = redisTemplate.execute(redisScript,
Collections.singletonList(key), info.requestId);
return Long.valueOf(1).equals(result);
} catch (Exception e) {
return false;
}
}
return true;
}
}
四、基于Zookeeper的分布式锁实现
4.1 Zookeeper分布式锁的基本原理
Zookeeper的分布式锁利用临时顺序节点特性,创建节点时会得到一个递增序号,序号最小的节点获得锁。其他节点通过监听前一个节点的删除事件来获取锁。
public class ZookeeperDistributedLock {
private static final String LOCK_ROOT = "/distributed_locks";
private final CuratorFramework client;
private final String lockPath;
private String currentLockPath;
public ZookeeperDistributedLock(CuratorFramework client, String lockName) {
this.client = client;
this.lockPath = LOCK_ROOT + "/" + lockName;
try {
client.createContainers(LOCK_ROOT);
} catch (Exception e) {
throw new RuntimeException("Failed to create lock root", e);
}
}
public boolean tryLock() throws Exception {
currentLockPath = client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(lockPath + "/lock-");
List<String> children = getSortedChildren();
String currentNode = currentLockPath.substring(
currentLockPath.lastIndexOf('/') + 1);
if (currentNode.equals(children.get(0))) {
return true;
}
String previousNode = children.get(children.indexOf(currentNode) - 1);
final CountDownLatch latch = new CountDownLatch(1);
client.getData().usingWatcher(new CuratorWatcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeDeleted) {
latch.countDown();
}
}
}).forPath(lockPath + "/" + previousNode);
return latch.await(30, TimeUnit.SECONDS);
}
private List<String> getSortedChildren() throws Exception {
List<String> children = client.getChildren().forPath(lockPath);
Collections.sort(children);
return children;
}
public void unlock() {
try {
if (currentLockPath != null) {
client.delete().guaranteed().forPath(currentLockPath);
}
} catch (Exception e) {
throw new RuntimeException("Failed to release lock", e);
}
}
}
4.2 Zookeeper分布式锁的优化实现
为了提高可靠性和性能,需要添加重试机制、超时机制和异常处理。重试使用指数退避策略,避免频繁无效请求,同时妥善处理网络闪断等异常情况。
public class OptimizedZookeeperLock {
private static final String LOCK_ROOT = "/distributed_locks";
private final CuratorFramework client;
private final String lockPath;
private String currentLockPath;
private final int maxRetries;
private final long timeoutMs;
public OptimizedZookeeperLock(CuratorFramework client, String lockName,
int maxRetries, long timeoutMs) {
this.client = client;
this.lockPath = LOCK_ROOT + "/" + lockName;
this.maxRetries = maxRetries;
this.timeoutMs = timeoutMs;
try {
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(LOCK_ROOT);
} catch (Exception e) {
throw new RuntimeException("Failed to initialize lock", e);
}
}
public boolean tryLock() throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, maxRetries);
try {
currentLockPath = client.create()
.withProtection()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(lockPath + "/lock-");
return waitForLock(timeoutMs);
} catch (Exception e) {
if (currentLockPath != null) {
try {
client.delete().guaranteed().forPath(currentLockPath);
} catch (Exception ex) {
// 处理删除失败的异常
}
}
throw e;
}
}
private boolean waitForLock(long millisToWait) throws Exception {
long startTime = System.currentTimeMillis();
while ((System.currentTimeMillis() - startTime) < millisToWait) {
List<String> children = getSortedChildren();
String currentNode = currentLockPath.substring(
currentLockPath.lastIndexOf('/') + 1);
if (currentNode.equals(children.get(0))) {
return true;
}
String previousNode = children.get(children.indexOf(currentNode) - 1);
final CountDownLatch latch = new CountDownLatch(1);
Stat stat = client.checkExists()
.usingWatcher(new CuratorWatcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeDeleted) {
latch.countDown();
}
}
}).forPath(lockPath + "/" + previousNode);
if (stat == null) {
continue;
}
if (latch.await(millisToWait - (System.currentTimeMillis() - startTime),
TimeUnit.MILLISECONDS)) {
return true;
}
}
return false;
}
private List<String> getSortedChildren() throws Exception {
List<String> children = client.getChildren().forPath(lockPath);
Collections.sort(children);
return children;
}
public void unlock() {
try {
if (currentLockPath != null) {
client.delete().guaranteed().forPath(currentLockPath);
}
} catch (Exception e) {
throw new RuntimeException("Failed to release lock", e);
}
}
}
五、基于数据库的分布式锁实现
数据库分布式锁利用数据库的行锁机制实现,通过创建锁表并使用事务特性来保证互斥性。这种方式实现简单,但性能相对较低,适合并发量小的场景。
public class DatabaseDistributedLock {
private final DataSource dataSource;
public DatabaseDistributedLock(DataSource dataSource) {
this.dataSource = dataSource;
}
public boolean tryLock(String lockKey, String owner, long expireTime) {
Connection conn = null;
PreparedStatement stmt = conn.prepareStatement(sql);
stmt.setString(1, lockKey);
stmt.setString(2, owner);
stmt.setTimestamp(3, new Timestamp(System.currentTimeMillis() + expireTime));
int affected = stmt.executeUpdate();
conn.commit();
return affected > 0;
} catch (SQLException e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException ex) {
// 处理回滚异常
}
}
return false;
} finally {
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
// 处理异常
}
}
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
// 处理异常
}
}
}
}
}
public class OptimisticLockImplementation {
private final DataSource dataSource;
public OptimisticLockImplementation(DataSource dataSource) {
this.dataSource = dataSource;
}
public boolean tryOptimisticLock(String resourceId, String owner, int version) {
Connection conn = null;
PreparedStatement stmt = null;
try {
conn = dataSource.getConnection();
String sql = "UPDATE resource_table SET version = ?, owner = ? "
+ "WHERE resource_id = ? AND version = ?";
stmt = conn.prepareStatement(sql);
stmt.setInt(1, version + 1);
stmt.setString(2, owner);
stmt.setString(3, resourceId);
stmt.setInt(4, version);
int affected = stmt.executeUpdate();
return affected > 0;
} catch (SQLException e) {
return false;
} finally {
// 关闭资源
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
// 处理异常
}
}
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
// 处理异常
}
}
}
}
}
总结
分布式锁是构建可靠分布式系统的重要组件。通过Redis、Zookeeper和数据库三种实现方案,可以满足不同场景的需求。在实际应用中,需要结合业务特点选择合适的实现方式,同时做好异常处理,确保分布式锁的可靠性。