经验分享:用一张表解决并发冲突!数据库事务锁的核心实现逻辑
背景
对于一些内部使用的管理系统来说,可能没有引入Redis
,又想基于现有的基础设施处理并发问题,而数据库是每个应用都避不开的基础设施之一,因此分享个我曾经维护过的一个系统中,使用数据库表来实现事务锁的方式。
之前在文章Java业务功能并发问题处理中实现了使用
MySQL
行锁、Redis
分布式锁来处理业务并发问题,这次来填坑了,如果想了解其他并发问题处理方式和区别,可以看看文章Java业务功能并发问题处理哈。
业务流程说明
方案分析
适用场景
- 应用服务有多个实例,但是数据库是单实例;
- 没有用上
Redis
的应用服务,想通过现有的基础设施解决并发数据问题
待改进措施
- 设置超时机制:当出现锁无法及时释放时需要手动删除表数据,可以设置逻辑删除字段或者定时器删除过期数据;
- 重试获取锁机制:设置一定的循环次数,当获取不到锁时休眠200毫秒再次获取,直到循环次数用尽后再返回失败;
- 锁重入支持:通过增加
加锁次数
字段让当同一个线程可以重复获取锁
程序实现过程
框架及工具说明
- 技术框架:
SpringBoot
、MyBatis
、Maven
- 数据库:
MySQL
- 测试工具:
Apifox
- 表设计及代码说明:
- 唯一索引:需要有一个用于判断唯一的字段,在数据库表中通过指定唯一索引来实现;
- 加锁的线程号:避免A线程加的锁,被B线程删除;
- 锁的可见性要单独事务:添加事务锁的逻辑应在我们执行业务逻辑的事务之前,且不能跟业务逻辑的事务在一块,否则在事务提交前其他线程根本看不到这个锁,也就达不到我们锁的目的了;
- 为了我们的锁更方便使用,也可以将加锁逻辑抽到注解中实现,注解的实现流程:
- 在pom文件中引入
spring-boot-starter-aop
- 编写自定义注解
ConcurrencyLock
- 实现切面类(
Aspect
)逻辑
- 在pom文件中引入
代码展示
为了能让大家更关注加解锁逻辑,本文只保留主要代码,参考链接处会放置
码云(gitee)
的源码地址(或者点击此处跳转);
另外,本文就不展示注解方式的使用了,以免占用篇幅。
代码结构图
实体类
/**
* 并发锁实体类
*/
public class ConcurrencyLockBean {
/**
* 数据库主键
*/
private Long id;
/**
* 操作节点
*/
private String businessNode;
/**
* 订单唯一编号
*/
private String businessUniqueNo;
/**
* 线程ID
*/
private Long threadId;
/**
* 创建日期
*/
private Date creationDate;
}
/**
* 订单实体类
*/
@Setter
@Getter
@ToString
public class OrderInfoBean {
/**
* 自增长主键
*/
private int id;
/**
* 订单号
*/
private String orderNo;
/**
* 物料数量
*/
private Integer itemQty;
}
ConcurrencyLockServiceImpl.java
@Slf4j
@Service
public class ConcurrencyLockServiceImpl implements ConcurrencyLockService {
ConcurrencyLockMapper mapper;
/**
* service类注入
*/
@Autowired
ConcurrencyLockServiceImpl(ConcurrencyLockMapper mapper) {
this.mapper = mapper;
}
@Override
public Boolean tryLock(String businessNode, String businessUniqueNo) {
long threadId = Thread.currentThread().getId();
ConcurrencyLockBean concurrencyLock = mapper.selectConcurrencyLock(businessNode, businessUniqueNo);
if (concurrencyLock != null) {
log.info("{}数据正在操作中,请稍后", threadId);
return false;
}
ConcurrencyLockBean lock = new ConcurrencyLockBean();
lock.setBusinessNode(businessNode);
lock.setBusinessUniqueNo(businessUniqueNo);
lock.setThreadId(threadId);
try {
int insertCount = mapper.insertConcurrencyLock(lock);
if (insertCount == 0) {
log.info("{}获取锁失败,请稍后重试", threadId);
return false;
}
} catch (Exception e) {
log.info("{}获取锁异常,请稍后重试", threadId);
return false;
}
log.info("{}完成锁表插入", threadId);
return true;
}
@Override
public void unLock(String businessNode, String businessUniqueNo) {
ConcurrencyLockBean lock = new ConcurrencyLockBean();
long threadId = Thread.currentThread().getId();
lock.setThreadId(threadId);
lock.setBusinessNode(businessNode);
lock.setBusinessUniqueNo(businessUniqueNo);
mapper.deleteConcurrencyLock(lock);
log.info("{}执行解锁完毕", threadId);
}
}
ConcurrencyLockMapper.java
import org.apache.ibatis.annotations.Param;
public interface ConcurrencyLockMapper {
/**
* 根据业务节点和唯一业务号查询锁
*/
ConcurrencyLockBean selectConcurrencyLock(@Param("businessNode") String businessNode, @Param("businessUniqueNo") String businessUniqueNo);
/**
* 插入锁
*/
int insertConcurrencyLock(ConcurrencyLockBean lock);
/**
* 删除锁
*/
int deleteConcurrencyLock(ConcurrencyLockBean lock);
}
ConcurrencyLockMapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.runningrookie.mapper.ConcurrencyLockMapper">
<select id="selectConcurrencyLock" resultType="com.runningrookie.domain.ConcurrencyLockBean">
SELECT
THREAD_ID,
BUSINESS_NODE,
BUSINESS_UNIQUE_NO,
CREATION_DATE
FROM concurrency_lock
WHERE BUSINESS_UNIQUE_NO = #{businessUniqueNo}
AND BUSINESS_NODE = #{businessNode}
</select>
<insert id="insertConcurrencyLock" useGeneratedKeys="true" keyProperty="id">
INSERT INTO concurrency_lock (
THREAD_ID,
BUSINESS_NODE,
BUSINESS_UNIQUE_NO,
CREATION_DATE)
VALUES
(#{threadId}, #{businessNode}, #{businessUniqueNo}, NOW());
</insert>
<delete id="deleteConcurrencyLock">
DELETE FROM concurrency_lock
WHERE THREAD_ID = #{threadId}
and BUSINESS_NODE = #{businessNode}
and BUSINESS_UNIQUE_NO = #{businessUniqueNo}
</delete>
</mapper>
ConcurrencyLock.java注解
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ConcurrencyLock {
String businessNode();
String businessUniqueNoKey();
}
ConcurrencyLockAspect.java注解类
@Aspect
@Component
@Slf4j
public class ConcurrencyLockAspect {
ConcurrencyLockService concurrencyLockService;
@Autowired
ConcurrencyLockAspect(ConcurrencyLockService concurrencyLockService) {
this.concurrencyLockService = concurrencyLockService;
}
// 环绕切面
@Around("@annotation(concurrencyLock)")
public Object around(ProceedingJoinPoint joinPoint, ConcurrencyLock concurrencyLock) throws Throwable {
long threadId = Thread.currentThread().getId();
Object[] args = joinPoint.getArgs();
if (args.length == 0) {
return joinPoint.proceed();
}
// 通过反射获取值
String invokeMethodName = "get" + concurrencyLock.businessUniqueNoKey().substring(0, 1).toUpperCase() + concurrencyLock.businessUniqueNoKey().substring(1);
// 获取Order类的Class对象
Class<?> clazz = args[0].getClass();
// 获取getOrderNo方法的Method对象
Method method = clazz.getMethod(invokeMethodName);
// 调用getOrderNo方法并获取返回值
String businessUniqueNo = method.invoke(args[0]).toString();
Boolean isSuccessLock = concurrencyLockService.tryLock(concurrencyLock.businessNode(), businessUniqueNo);
if (!isSuccessLock) {
log.info("{}加锁失败,请稍后重试", threadId);
// 生成与切点方法相同的返回对象
return AjaxResult.error("加锁失败,请稍后重试");
}
try {
log.info("{}开始执行业务逻辑", threadId);
joinPoint.proceed();
} finally {
concurrencyLockService.unLock(concurrencyLock.businessNode(), businessUniqueNo);
}
return joinPoint.proceed();
}
}
OrderInfoController.java
@RestController
@RequestMapping("/orderInfo")
public class OrderInfoController {
OrderInfoService orderInfoService;
@Autowired
private OrderInfoController(OrderInfoService orderInfoService) {
this.orderInfoService = orderInfoService;
}
@PostMapping
public AjaxResult saveOrderInfo(@RequestBody OrderInfoBean bean) {
return orderInfoService.saveOrderInfo(bean);
}
}
OrderServiceImpl.java
/**
* 订单逻辑代码
*/
@Slf4j
@Service
public class OrderInfoServiceImpl implements OrderInfoService {
ConcurrencyLockService concurrencyLockService;
/**
* service类注入
*/
@Autowired
OrderInfoServiceImpl(ConcurrencyLockService concurrencyLockService) {
this.concurrencyLockService = concurrencyLockService;
}
@Override
public AjaxResult saveOrderInfo(OrderInfoBean bean) {
long threadId = Thread.currentThread().getId();
final String businessNode = "插入";
Boolean isSuccessLock = concurrencyLockService.tryLock(businessNode, bean.getOrderNo());
if (!isSuccessLock) {
return AjaxResult.error("加锁失败,请稍后重试");
}
try {
log.info("{}开始执行业务逻辑", threadId);
// TODO:模拟业务逻辑耗时
Thread.sleep(1500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
concurrencyLockService.unLock(businessNode, bean.getOrderNo());
}
return AjaxResult.success();
}
@Override
@ConcurrencyLock(businessNode = "插入", businessUniqueNoKey = "orderNo")
@Transactional
public AjaxResult saveOrderInfoByAnnotation(OrderInfoBean bean) {
// TODO:模拟业务逻辑耗时
Thread.sleep(1500);
return AjaxResult.success();
}
}
pom.xml相关依赖
在dependencies
中添加下列依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.4</version>
</dependency>
<!-- Mysql驱动包 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
事务处理表的表结构
CREATE TABLE `concurrency_lock` (
`ID` int NOT NULL AUTO_INCREMENT COMMENT '主键',
`THREAD_ID` int DEFAULT NULL COMMENT '线程号',
`BUSINESS_NODE` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '操作节点',
`BUSINESS_UNIQUE_NO` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '单据号',
`CREATION_DATE` datetime DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`ID`),
UNIQUE KEY `uni_business_no` (`BUSINESS_UNIQUE_NO`,`BUSINESS_NODE`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
测试输出结果
使用Apifox
并发发送5次请求,可以看到实际成功获取到锁并执行的只有一个线程
17:08:00.449 [http-nio-8080-exec-1] c.r.service.impl.ConcurrencyLockServiceImpl - 40完成锁表插入
17:08:00.462 [http-nio-8080-exec-1] c.runningrookie.service.impl.OrderInfoServiceImpl - 40开始执行业务逻辑
17:08:00.573 [http-nio-8080-exec-5] c.r.service.impl.ConcurrencyLockServiceImpl - 44获取锁异常,请稍后重试
17:08:00.573 [http-nio-8080-exec-4] c.r.service.impl.ConcurrencyLockServiceImpl - 43获取锁异常,请稍后重试
17:08:00.573 [http-nio-8080-exec-3] c.r.service.impl.ConcurrencyLockServiceImpl - 42获取锁异常,请稍后重试
17:08:00.573 [http-nio-8080-exec-2] c.r.service.impl.ConcurrencyLockServiceImpl - 41获取锁异常,请稍后重试
17:08:00.574 [http-nio-8080-exec-1] c.r.service.impl.ConcurrencyLockServiceImpl - 40执行解锁完毕
参考链接
gitee代码仓库地址:数据库并发锁