场馆预定平台高并发时间段预定实现V1
🎯 本文介绍了一个高效处理高并发场馆预订请求的系统设计方案。通过使用Redis缓存和位图技术,系统能够快速管理场地的可用性和预订状态。采用Lua脚本确保操作的原子性,结合责任链模式进行参数校验,并通过事务保证数据一致性。系统还实现了订单生成、延时关闭订单等功能,确保资源的公平分配和高效利用。整体设计旨在提供稳定、高性能的预订接口,满足高并发场景下的用户需求。
文章目录
- 简介
- 思路
- 数据表设计
- 场馆服务
- Controller
- Service
- 时间段预定
- 缓存回滚
- 库存回滚
- Mapper
- 订单服务
- Controller
- Service
- 复合分片
- MQ
简介
在场馆预订平台的设计中,通常会将一个大型场馆细分为多个功能区,例如篮球区、羽毛球区、乒乓球区等。每个功能区内又包含若干个独立的场地或球台,用户可以根据自己的需求选择特定的区域和设施进行预定。当预定时间窗口开启时,用户通过平台预订场地,并指定使用的时间段——比如9:00-10:00、10:00-11:00等一小时的时段。
特别地,在一些热门场馆或者资源有限的情况下(例如学校内,众多学生争相预订为数不多的运动场地),高并发的预订请求可能会同时涌入系统。为了应对这种场景,平台需要具备高效的处理能力,确保即使在高峰期也能提供流畅的用户体验。因此,实现一个稳定且高性能的时间段预订接口至关重要,它不仅能够准确管理场地的可用性,还能公平有效地分配资源给每一位有需要的用户。
思路
采用传统秒杀思路,使用 Redis缓存 存储具体的剩余库存,使用 Redis位图 来存储场号预订情况,时间段数据提前放入缓存中
- 接口限流
- 接口幂等性
- 验证提交参数
- 参数非空判断
- 预订的时间段id是否正确
- 是否已经到达预订时间(增加时间缓冲,提前1秒开放预订,避免时间同步问题)
- 是否已经过了时间段时间
- 同一用户只能抢一次同一时间段
- 执行 lua 脚本
- 库存是否足够
- 分配空闲场号(使用bitmap存储)
- 开启事务
- 数据库扣减所预定时间段的库存、空闲场号信息
- 调用订单远程服务
- 生成订单
- 订单号生成,基因算法生成分片键
- 发送延迟消息,超时未支付关闭订单
- 监听延时消息关掉订单
- Binlog监听订单状态改变,使用RokcetMQ消费消息,恢复数据库和缓存的库存、空闲场号
- 若订单远程服务调用失败,恢复缓存的库存
- 生成订单
- 关闭事务
数据表设计
DROP TABLE IF EXISTS `time_period`;
CREATE TABLE `time_period`(
`id` bigint NOT NULL COMMENT 'ID',
`create_time` datetime,
`update_time` datetime,
`is_deleted` tinyint default 0 COMMENT '逻辑删除 0:没删除 1:已删除',
`partition_id` bigint NOT NULL COMMENT '场区id',
`price` decimal(10,2) NOT NULL COMMENT '该时间段预订使用价格(元)',
`stock` int NOT NULL COMMENT '库存',
`booked_slots` bigint unsigned NOT NULL DEFAULT 0 COMMENT '已预订的场地(位图表示)',
`period_date` date NOT NULL COMMENT '预定日期',
`begin_time` time NOT NULL COMMENT '时间段开始时间HH:mm(不用填日期)',
`end_time` time NOT NULL COMMENT '时间段结束时间HH:mm(不用填日期)',
PRIMARY KEY (`id`) USING BTREE,
INDEX `idx_partition_id` (`partition_id`)
);
这里使用位图来存储一个分区场号的预定情况,是如何存储的呢?很简单的理解是,一个字节有8个比特,每个比特存储一个 0 或 1 ,那么一个字节其实可以存储 8 个场是否被预订的状态,0表示空闲,1表示预定。在java中一个bigint可以存储64个场的空闲状态,一个int可以存储32个场的空闲状态,一个tinyint可以存储8个场的空闲状态。
使用这种方式存储有哪些优点?
- 节省存储空间:每个场地只需要1个二进制位进行存储,可以显著减少存储空间占用
- 效率高:位图支持高效的位运算,可以快速查询和更新场地的预订状态
- 适合高并发:位图的位操作是原子性的,适合高并发场景下的预订和释放操作
在数据库中,常用的位运算操作如下:
- 预订场地:假设要预订索引为2的场地。
UPDATE bookings
SET BookedSlots = BookedSlots | (1 << 2)
WHERE BookingID = 1;
- 取消预订:假设要取消预订索引为2的场地。
UPDATE bookings
SET BookedSlots = BookedSlots & ~(1 << 2)
WHERE BookingID = 1;
- 检查场地是否已预订:假设要检查索引为2的场地是否已预订。
SELECT (BookedSlots & (1 << 2)) > 0 AS IsBooked
FROM bookings
WHERE BookingID = 1;
场馆服务
Controller
在这里使用了@Idempotent
注解来实现接口的幂等性,实现方式可以参考文章:https://hellodam.blog.csdn.net/article/details/137435495
/**
* 预定时间段
*/
@GetMapping("/v1/reserve")
@Idempotent(
uniqueKeyPrefix = "vrs-venue:lock_reserve:",
// 让用户同时最多只能预定一个时间段,根据用户名来加锁
// key = "T(com.vrs.common.context.UserContext).getUsername()",
// 让用户同时最多只能预定该时间段一次,但是可以同时预定其他时间段,根据用户名+时间段ID来加锁
key = "T(com.vrs.common.context.UserContext).getUsername()+'_'+#timePeriodId",
message = "正在执行场馆预定流程,请勿重复预定...",
scene = IdempotentSceneEnum.RESTAPI
)
@Operation(summary = "预定时间段")
public Result reserve(@RequestParam("timePeriodId") Long timePeriodId) {
OrderDO orderDO = timePeriodService.reserve(timePeriodId);
return Results.success(orderDO);
}
Service
时间段预定
首先使用了一个责任链模式并结合来进行参数校验,需要的校验如下:
- 用户预定的时间段是否存在
- 是否已经到达预订时间(增加时间缓冲,提前1秒开放预订,避免时间同步问题)
- 是否已经过了时间段时间
其实这里的责任链模式用的不是特别好,因为检验过程中,数据具有连续性。即在检验用户预定时间段是否为空时,如果不为空,需要把时间段的相关属性带到预定时间校验中,所以需要进行参数的传递。
执行 lua 脚本获取预定场号,这里要做几件事,使用 lua 脚本可以保证几件事情的原子性
- 校验用户是否已经预定过当前时间段
- 如果用户是第一次预定,检验库存是否大于0
- 如果库存大于0,分配场地
-- 定义脚本参数
local stock_key = KEYS[1]
local free_index_bitmap_key = KEYS[2]
-- 用来存储已购买用户的set
local set_name = KEYS[3]
-- 用户ID
local user_id = ARGV[1]
-- 过期时间 (秒)
local expire_time = tonumber(ARGV[2])
-- 检查用户是否已经购买过
if redis.call("SISMEMBER", set_name, user_id) == 1 then
-- 用户已经购买过,返回 -2 表示失败
return -2
end
-- 如果用户没有购买过,添加用户到set中
redis.call("SADD", set_name, user_id)
-- 设置过期时间
if expire_time > 0 then
redis.call("EXPIRE", set_name, expire_time)
end
-- 获取库存
local current_inventory = tonumber(redis.call('GET', stock_key) or 0)
-- 尝试消耗库存
if current_inventory < 1 then
-- 库存不够了,返回-1,代表分配空场号失败
return -1 -- 失败
end
-- 查找第一个空闲的场地(位图中第一个为 0 的位)
local free_court_bit = redis.call("BITPOS", free_index_bitmap_key, 0)
if not free_court_bit or free_court_bit == -1 then
-- 没有空闲的场号
return -1 -- 失败
end
-- 占用该场地(将对应位设置为 1)
redis.call("SETBIT", free_index_bitmap_key, free_court_bit, 1)
-- 更新库存
redis.call('DECRBY', stock_key, 1)
-- 返回分配的场地索引(注意:位图的位索引从0开始,如果你需要从1开始,这里加1)
return tonumber(free_court_bit)
场地分配成功之后,扣减数据库库存、更新场地预定情况,并创建订单。由于创建订单需要调用远程服务,需要保证业务一致性。这里使用@Transactional
注解,当创建订单失败时,恢复缓存,并不提交数据库的修改。但是这里的实现仍然存在一个问题:如果说由于网络原因,实际上订单已经创建成功了,但是因为超时访问失败,这里库存却回滚了,咋办?这里提出两种解决方案
方案一:在扣减库存成功之后,发送一个消息到消息队列,通知订单服务创建订单,如果消息发送失败,回滚库存。
【优点】
- 库存扣减和订单创建是异步的,给订单服务发送消息之后,即可返回,预订接口吞吐量更高。
【缺点】
- 使用这种方式,库存扣减和订单创建是异步的,无法在接口调用结束时,直接返回订单信息给前端。在订单创建成功之后,需要使用双向通讯技术(如Websocket)通知前端订单创建完成,并给前端发生订单数据。
方案二:使用延时消息兜底,间隔几秒后,去查询订单是否创建成功,如果创建成功,到缓存中检查一下用户是否真的预订了该时间段,如果没有预定该时间段,说明库存发生了回滚,系统将订单进行删除
【优点】
- 通过延时消息和后续的检查机制,能够确保订单和库存的最终一致性。即使因为网络超时导致库存回滚,系统也能通过后续的检查发现不一致并修复。
【缺点】
- 数据一致性有延迟,如果订单创建成功但库存回滚,用户可能会短暂看到订单创建成功,但后续订单被删除,这可能会让用户疑惑。
- 库存扣减和订单创建是同步的,预订接口吞吐量较低。
在这里先提出这个问题,后续在时间段预定V2中使用方案一进行解决
这里还存在一个问题,假如说lua脚本执行完成,缓存中的库存已经扣减,结果突然服务器宕机了,没有执行后续的数据库库存扣减和创建订单流程,那相当于有一个场被占用了,但实际上无人使用,这个问题也会在时间段预定V2中解决
/**
* 传统秒杀架构,使用缓存存储具体的剩余库存,使用 位图 来存储空闲场号
* 下单时首先尝试扣减库存和分配空闲场号,如果可以扣减成功,再执行下单等逻辑
* 存在问题:
* 1、如果缓存扣减成功之后,应用宕机了,没有执行数据库库存扣减和生成订单逻辑,那就会出现缓存、数据库不一致的情况。应用重启之后,需要重新同步缓存和数据库,需要人力管理
* 2、使用同步扣减数据库库存和下单,接口吞吐量不够高
*
* @param timePeriodId
* @return
*/
@Override
// 子方法声明了Transactional,父方法也需要声明,不要会失效
@Transactional(rollbackFor = Throwable.class)
public OrderDO reserve(Long timePeriodId) {
参数校验
// 使用责任链模式校验数据是否正确
//todo 后面对于不对外开放的场馆,还需要校验用户是否属于该机构用户
TimePeriodReserveReqDTO timePeriodReserveReqDTO = new TimePeriodReserveReqDTO(timePeriodId);
chainContext.handler(ChainConstant.RESERVE_CHAIN_NAME, timePeriodReserveReqDTO);
TimePeriodDO timePeriodDO = timePeriodReserveReqDTO.getTimePeriodDO();
Long venueId = timePeriodReserveReqDTO.getVenueId();
VenueDO venueDO = timePeriodReserveReqDTO.getVenueDO();
使用lua脚本获取一个空场地对应的索引,并扣除相应的库存,同时在里面进行用户的查重
// 使用 Hutool 的单例管理容器 管理lua脚本的加载,保证其只被加载一次
String luaScriptPath = "lua/free_court_index_allocate_by_bitmap.lua";
DefaultRedisScript<Long> luaScript = Singleton.get(luaScriptPath, () -> {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(luaScriptPath)));
redisScript.setResultType(Long.class);
return redisScript;
});
Long freeCourtIndex = stringRedisTemplate.execute(
luaScript,
Lists.newArrayList(
String.format(RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_KEY, timePeriodReserveReqDTO.getTimePeriodId()),
String.format(RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_KEY, timePeriodReserveReqDTO.getTimePeriodId()),
String.format(RedisCacheConstant.VENUE_IS_USER_BOUGHT_TIME_PERIOD_KEY, timePeriodReserveReqDTO.getTimePeriodId())
),
UserContext.getUserId().toString(),
String.valueOf(venueDO.getAdvanceBookingDay() * 86400)
);
if (freeCourtIndex == -2) {
// --if-- 用户已经购买过该时间段
throw new ClientException(BaseErrorCode.TIME_PERIOD_HAVE_BOUGHT_ERROR);
} else if (freeCourtIndex == -1) {
// --if-- 没有空闲的场号
throw new ServiceException(BaseErrorCode.TIME_PERIOD_SELL_OUT_ERROR);
}
修改数据库中时间段的库存和已经选定的场号,并生成订单
// todo 为了保证事务原子性,将修改数据库库存操作和创建订单放在了一次,而且是同步执行,如果想要接口吞吐量更高,这里肯定是需要优化成异步的
return this.executePreserve(
timePeriodDO,
freeCourtIndex,
venueId);
}
/**
* 执行下单和数据库库存扣减操作
*
* @param timePeriodDO
* @param courtIndex
* @param venueId
* @return
*/
@Override
// 抛出任何异常,回退库存
@Transactional(rollbackFor = Throwable.class)
public OrderDO executePreserve(TimePeriodDO timePeriodDO,
Long courtIndex, Long venueId) {
// 扣减当前时间段的库存,修改空闲场信息
baseMapper.updateStockAndBookedSlots(timePeriodDO.getId(), timePeriodDO.getPartitionId(), courtIndex);
// 调用远程服务创建订单
OrderGenerateReqDTO orderGenerateReqDTO = OrderGenerateReqDTO.builder()
.timePeriodId(timePeriodDO.getId())
.partitionId(timePeriodDO.getPartitionId())
.periodDate(timePeriodDO.getPeriodDate())
.beginTime(timePeriodDO.getBeginTime())
.endTime(timePeriodDO.getEndTime())
.courtIndex(courtIndex)
.userId(UserContext.getUserId())
.userName(UserContext.getUsername())
.venueId(venueId)
.payAmount(timePeriodDO.getPrice())
.build();
Result<OrderDO> result;
try {
result = orderFeignService.generateOrder(orderGenerateReqDTO);
if (result == null || !result.isSuccess()) {
// --if-- 订单生成失败,抛出异常,上面的库存扣减也会回退
throw new ServiceException(BaseErrorCode.ORDER_GENERATE_ERROR);
}
} catch (Exception e) {
// --if-- 订单生成服务调用失败
// 恢复缓存中的信息
this.restoreStockAndBookedSlotsCache(timePeriodDO.getId(), UserContext.getUserId(), courtIndex);
// todo 如果说由于网络原因,实际上订单已经创建成功了,但是因为超时访问失败,这里库存却回滚了,咋办,如何将订单置为废弃状态
// 打印错误堆栈信息
e.printStackTrace();
// 把错误返回到前端
throw new ServiceException(e.getMessage());
}
return result.getData();
}
缓存回滚
/**
* 库存、空闲场号、已购买用户缓存回退
*/
@Override
public void restoreStockAndBookedSlotsCache(Long timePeriodId, Long userId, Long courtIndex) {
使用lua脚本获取一个空场地对应的索引,并扣除相应的库存
// 使用 Hutool 的单例管理容器 管理lua脚本的加载,保证其只被加载一次
String luaScriptPath = "lua/free_court_index_release_by_bitmap.lua";
DefaultRedisScript<Long> luaScript = Singleton.get(luaScriptPath, () -> {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(luaScriptPath)));
redisScript.setResultType(Long.class);
return redisScript;
});
Long status = stringRedisTemplate.execute(
luaScript,
Lists.newArrayList(
String.format(RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_KEY, timePeriodId),
String.format(RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_KEY, timePeriodId),
String.format(RedisCacheConstant.VENUE_IS_USER_BOUGHT_TIME_PERIOD_KEY, timePeriodId)
),
userId.toString(),
courtIndex.toString()
);
if (status == -3) {
// --if-- 该场号本身就是空闲的,无需释放库存(说明库存已经被释放过了,这不要抛异常出去,否则库存释放方法会反复失败)
} else if (status == -2) {
// --if-- 用户没有购买该时间段
throw new ServiceException(BaseErrorCode.TIME_PERIOD_HAVE_NOT_BOUGHT_ERROR);
} else if (status == -1) {
// --if-- 场号不合法
throw new ServiceException(BaseErrorCode.TIME_PERIOD_FREE_COURT_INDEX_ERROR);
}
}
在进行缓存回滚的时候,也需要使用 lua 脚本,保证用户去重表删除、库存回滚、预订场号回滚操作的原子性
-- 定义脚本参数
local stock_key = KEYS[1]
local free_index_bitmap_key = KEYS[2]
-- 用来存储已购买用户的set
local set_name = KEYS[3]
-- 用户ID
local user_id = ARGV[1]
-- 场地索引
local court_index = tonumber(ARGV[2])
-- 检查用户是否已经购买过
if redis.call("SISMEMBER", set_name, user_id) == 0 then
-- 用户没有购买过,返回 -2 表示失败
return -2
end
-- 检查场地索引是否有效
if not court_index or court_index < 0 then
-- 无效的场地索引,返回 -1 表示失败
return -1
end
-- 检查场号是否本来就是处于空闲状态
local is_free = redis.call("GETBIT", free_index_bitmap_key, court_index)
if is_free == 0 then
-- 场号本身就处于空闲状态,所以无需释放库存,返回 -3 表示错误
return -3
end
-- 释放场号(将对应位设置为 0)
redis.call("SETBIT", free_index_bitmap_key, court_index, 0)
-- 更新库存(增加库存)
redis.call('INCRBY', stock_key, 1)
-- 移除用户
redis.call("SREM", set_name, user_id)
-- 返回成功
return 0 -- 成功
库存回滚
/**
* 库存、空闲场号数据库回退
*/
@Override
public void restoreStockAndBookedSlotsDatabase(TimePeriodStockRestoreReqDTO timePeriodStockRestoreReqDTO) {
// 恢复数据库中的库存
baseMapper.restoreStockAndBookedSlots(timePeriodStockRestoreReqDTO.getTimePeriodId(), timePeriodStockRestoreReqDTO.getPartitionId(), timePeriodStockRestoreReqDTO.getCourtIndex());
}
Mapper
在更新位图的时候,需要使用位运算
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.vrs.mapper.TimePeriodMapper">
<update id="updateStockAndBookedSlots">
<![CDATA[
UPDATE time_period
SET booked_slots = booked_slots | (1 << #{partitionIndex}), stock = stock - 1
WHERE id = #{timePeriodId} AND stock > 0 AND partition_id = #{partitionId}
]]>
</update>
<update id="restoreStockAndBookedSlots">
<![CDATA[
UPDATE time_period
SET booked_slots = booked_slots & ~(1 << #{partitionIndex}), stock = stock + 1
WHERE id = #{timePeriodId} AND partition_id = #{partitionId}
]]>
</update>
</mapper>
订单服务
Controller
/**
* 生成订单
*/
@PostMapping("/v1/generateOrder")
@Operation(summary = "生成订单")
public Result<OrderDO> generateOrder(@RequestBody OrderGenerateReqDTO orderGenerateReqDTO) {
OrderDO orderDO = orderService.generateOrder(orderGenerateReqDTO);
return Results.success(orderDO);
}
Service
为了支持大数据量,本文对订单进行了分表。但是订单需要支持两种查询方式。
- 直接根据订单号查询相应的订单
- 用户需要查询自己的订单列表。
传统的分表方式只有一个分片键,要么用订单号分片,要么用用户id进行分片,两种方式都无法满足上述查询需求,会触发读扩散问题,效率严重下降,本文使用复合分片算法来解决这个问题,即在订单后面冗余用户ID的后六位SnowflakeIdUtil.
nextId
() + String.
valueOf
(orderGenerateReqDTO.getUserId() % 1000000)
- 如果是用户查询自己的订单列表,直接使用用户ID的后六位进行分片定位
- 如果根据订单号查询相应的订单,那么使用订单号的后六位进行分片定位
在创建订单之后,发送一个延时消息,如果十分钟用户还没有成功付款,则取消订单,回滚缓存和数据库的库存,避免有人长期占用资源但是不购买。这里为啥分为closeOrder
和secondCloseOrder
,主要是避免订单支付成功回调期间,订单被超时关闭了,详情请看 https://hellodam.blog.csdn.net/article/details/144942881
基于 Canal 监听 MySQL Binlog 日志恢复库存,可以参考 https://hellodam.blog.csdn.net/article/details/144483823
@Override
public OrderDO generateOrder(OrderGenerateReqDTO orderGenerateReqDTO) {
OrderDO orderDO = OrderDO.builder()
// 订单号使用雪花算法生成分布式ID,然后再拼接用户ID的后面六位
.orderSn(SnowflakeIdUtil.nextId() + String.valueOf(orderGenerateReqDTO.getUserId() % 1000000))
.orderTime(new Date())
.venueId(orderGenerateReqDTO.getVenueId())
.partitionId(orderGenerateReqDTO.getPartitionId())
.courtIndex(orderGenerateReqDTO.getCourtIndex())
.timePeriodId(orderGenerateReqDTO.getTimePeriodId())
.periodDate(orderGenerateReqDTO.getPeriodDate())
.beginTime(orderGenerateReqDTO.getBeginTime())
.endTime(orderGenerateReqDTO.getEndTime())
.userId(orderGenerateReqDTO.getUserId())
.userName(orderGenerateReqDTO.getUserName())
.payAmount(orderGenerateReqDTO.getPayAmount())
.orderStatus(OrderStatusConstant.UN_PAID)
.build();
int insert = baseMapper.insert(orderDO);
if (insert > 0) {
// 发送延时消息来关闭未支付的订单
orderDelayCloseProducer.sendMessage(OrderDelayCloseMqDTO.builder()
.orderSn(orderDO.getOrderSn())
.build());
}
return orderDO;
}
@Override
@Transactional(rollbackFor = Throwable.class)
public void closeOrder(String orderSn) {
String orderPayLock = stringRedisTemplate.opsForValue().get(String.format(RedisCacheConstant.ORDER_PAY_LOCK_KEY, orderSn));
if ("0".equals(orderPayLock)) {
OrderDO orderDO = baseMapper.selectByOrderSn(orderSn);
// --if-- 订单已经被锁定,说明订单正处于支付状态,先不要关闭订单,等等再看看是否支付成功了
if (orderDO.getOrderStatus().equals(OrderStatusConstant.UN_PAID)) {
// --if-- 当前订单还没有支付成功,发一个延时消息,如果等等订单还没有被支付,就关闭订单
orderSecondDelayCloseProducer.sendMessage(OrderDelayCloseMqDTO.builder()
.orderSn(orderDO.getOrderSn())
.build());
// 将订单支付状态设置为1,拒绝后面的支付调用
stringRedisTemplate.opsForValue().set(String.format(RedisCacheConstant.ORDER_PAY_LOCK_KEY, orderSn), "1", 5, TimeUnit.MINUTES);
}
} else {
// --if-- 订单不在支付中,直接关闭订单
secondCloseOrder(orderSn);
}
}
@Override
public void secondCloseOrder(String orderSn) {
OrderDO orderDO = baseMapper.selectByOrderSn(orderSn);
if (orderDO.getOrderStatus().equals(OrderStatusConstant.UN_PAID)) {
// --if-- 到时间了,订单还没有支付,取消该订单
orderDO.setOrderStatus(OrderStatusConstant.CANCEL);
// 分片键不能更新
orderDO.setVenueId(null);
baseMapper.updateByOrderSn(orderDO);
if (!isUseBinlog) {
// --if-- 如果不启用binlog的话,需要自己手动调用方法来释放库存
// 极端情况,如果说远程已经还原了库存,但是因为网络问题,返回了错误,导致订单没有关闭,于是出现了不一致的现象。库存都还原完了,你订单还可以支付
Result<OrderDO> result;
try {
result = timePeriodFeignService.release(TimePeriodStockRestoreReqDTO.builder()
.timePeriodId(orderDO.getTimePeriodId())
.partitionId(orderDO.getPartitionId())
.courtIndex(orderDO.getCourtIndex())
.userId(orderDO.getUserId())
.build());
} catch (Exception e) {
// --if-- 库存恢复远程接口调用失败
throw new ServiceException(BaseErrorCode.REMOTE_ERROR);
}
if (result == null || !result.isSuccess()) {
// 因为使用了Transactional,如果这里出现了异常,订单的关闭修改会回退
throw new ServiceException("调用远程服务释放时间段数据库库存失败", BaseErrorCode.SERVICE_ERROR);
}
} else {
// --if-- 如果启用binlog的话,会自动监听数据库的订单关闭,然后恢复缓存中的库存
}
}
}
复合分片
下面定义了数据分片规则,用于将 time_period_order
表的数据水平分割到多个物理表中。具体来说,它指定了 time_period_order
表的真实数据节点为 ds_0
数据源下的 time_period_order_0
到 time_period_order_15
共16个分片表。为了确定数据应该插入哪个分片表,这里采用了复合分表策略,即根据 user_id
和 order_sn
两个分片键来决定。对于分片算法,配置中引用了名为 order_table_gene_mod
的自定义算法,该算法由 com.vrs.algorithm.OrderTableGeneAlgorithm
类实现,通过计算用户ID或订单号的哈希值,并结合分片数量(本例中为16),来确定数据最终存储的具体分片表。
rules:
- !SHARDING
tables:
time_period_order:
# 真实数据节点,比如数据库源以及数据库在数据库中真实存在的
actualDataNodes: ds_0.time_period_order_${0..15}
# 分表策略
tableStrategy:
# 复合分表策略(多个分片键)
complex:
# 用户 ID 和订单号
shardingColumns: user_id,order_sn
# 搜索 order_table_complex_mod 下方会有分表算法
shardingAlgorithmName: order_table_gene_mod
# 分片算法
shardingAlgorithms:
# 订单分表算法
order_table_gene_mod:
# 通过加载全限定名类实现分片算法,相当于分片逻辑都在 algorithmClassName 对应的类中
type: CLASS_BASED
props:
algorithmClassName: com.vrs.algorithm.OrderTableGeneAlgorithm
# 分表数量
sharding-count: 16
# 复合(多分片键)分表策略
strategy: complex
下面的类实现了 Apache ShardingSphere 框架中的 ComplexKeysShardingAlgorithm
接口,用于根据订单号和用户ID这两个分片键来决定数据应该存储在哪个分片表中。该算法首先尝试使用用户ID进行哈希分片,如果用户ID不存在或为空,则退而求其次使用订单号作为分片键。它会根据配置文件中指定的分片数量,计算出分片值(用户ID或订单号)对应的哈希值,并据此确定具体的数据分片表名称,从而实现数据的水平分割。
package com.vrs.algorithm;
import cn.hutool.core.collection.CollUtil;
import com.google.common.base.Preconditions;
import lombok.Getter;
import org.apache.shardingsphere.sharding.api.sharding.complex.ComplexKeysShardingAlgorithm;
import org.apache.shardingsphere.sharding.api.sharding.complex.ComplexKeysShardingValue;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Properties;
/**
* 订单分表基因算法 - 该类用于实现复杂的分片逻辑,特别是当存在多个分片键时。(这里是订单号和用户ID)
* 它会根据提供的分片值来决定数据应该存储在哪一个分片表中。
*
* @Author dam
* @create 2024/12/6 9:55
*/
@Getter
public class OrderTableGeneAlgorithm implements ComplexKeysShardingAlgorithm {
private Properties props;
/**
* 分片的数量,即总共有多少个分片表
*/
private int shardingCount;
/**
* 配置文件中的分片数量键名
*/
private static final String SHARDING_COUNT_KEY = "sharding-count";
/**
* 根据提供的分片键和分片值来决定将数据分配到哪个分片表中。
*
* @param collection 可能的分片表集合
* @param complexKeysShardingValue 包含分片键及其对应的值
* @return 返回包含具体分片表名称的集合
*/
@Override
public Collection<String> doSharding(Collection collection, ComplexKeysShardingValue complexKeysShardingValue) {
// 获取分片键与分片值的映射
Map<String, Collection<Comparable<?>>> columnNameAndShardingValuesMap = complexKeysShardingValue.getColumnNameAndShardingValuesMap();
// 初始化结果集,使用 LinkedHashSet 以保持插入顺序
Collection<String> result = new LinkedHashSet<>(collection.size());
if (CollUtil.isNotEmpty(columnNameAndShardingValuesMap)) {
// --if-- 如果有分片键和值,则开始处理
String userId = "user_id";
// 获取 'user_id' 对应的分片值集合
Collection<Comparable<?>> customerUserIdCollection = columnNameAndShardingValuesMap.get(userId);
if (CollUtil.isNotEmpty(customerUserIdCollection)) {
// --if-- 'user_id' 存在且不为空,则基于 'user_id' 进行分片
// 获取第一个分片值
Comparable<?> comparable = customerUserIdCollection.stream().findFirst().get();
// 取用户ID的后面六位来进行哈希分片
String dbSuffix = String.valueOf(hashShardingValue((Long) comparable % 1000000) % shardingCount);
result.add(complexKeysShardingValue.getLogicTableName() + "_" + dbSuffix);
} else {
// 'user_id' 不存在或为空,尝试使用 'order_sn' 作为分片键
String orderSn = "order_sn";
Collection<Comparable<?>> orderSnCollection = columnNameAndShardingValuesMap.get(orderSn);
Comparable<?> comparable = orderSnCollection.stream().findFirst().get();
if (comparable instanceof String) {
// --if-- 如果订单号是字符串类型
String actualOrderSn = comparable.toString();
result.add(complexKeysShardingValue.getLogicTableName() + "_" + hashShardingValue(actualOrderSn.substring(Math.max(actualOrderSn.length() - 6, 0))) % shardingCount);
} else {
// --if-- 如果订单号是长整型(我们这个系统肯定不是这个)
String dbSuffix = String.valueOf(hashShardingValue((Long) comparable % 1000000) % shardingCount);
result.add(complexKeysShardingValue.getLogicTableName() + "_" + dbSuffix);
}
}
}
// 返回最终确定的分片表名称集合
return result;
}
/**
* 初始化方法,在创建分片算法实例时被调用,用来设置分片参数。
*
* @param props 包含分片配置信息的属性对象
*/
@Override
public void init(Properties props) {
this.props = props;
shardingCount = getShardingCount(props);
}
/**
* 从配置属性中读取分片数量,如果未找到则抛出异常。
*
* @param props 包含分片配置信息的属性对象
* @return 分片数量
*/
private int getShardingCount(final Properties props) {
// 检查是否提供了分片数量,如果没有则抛出异常
Preconditions.checkArgument(props.containsKey(SHARDING_COUNT_KEY), "分片数量不可以为空");
// 解析并返回分片数量
return Integer.parseInt(props.getProperty(SHARDING_COUNT_KEY));
}
/**
* 根据给定的分片值计算哈希值,用于确定具体的分片。
*
* @param shardingValue 分片值
* @return 哈希后的分片值
*/
private long hashShardingValue(final Comparable<?> shardingValue) {
// 使用分片值的 hashCode 生成一个绝对值的哈希码
return Math.abs((long) shardingValue.hashCode());
}
}
MQ
【消息生产者】
package com.vrs.rocketMq.producer;
import cn.hutool.core.util.StrUtil;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.OrderDelayCloseMqDTO;
import com.vrs.templateMethod.AbstractCommonSendProduceTemplate;
import com.vrs.templateMethod.BaseSendExtendDTO;
import com.vrs.templateMethod.MessageWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* 计算数据准备 生产者
*
* @Author dam
* @create 2024/9/20 16:00
*/
@Slf4j
@Component
public class OrderDelayCloseProducer extends AbstractCommonSendProduceTemplate<OrderDelayCloseMqDTO> {
@Override
protected BaseSendExtendDTO buildBaseSendExtendParam(OrderDelayCloseMqDTO messageSendEvent) {
return BaseSendExtendDTO.builder()
.eventName("延时关闭订单")
.keys(String.valueOf(messageSendEvent.getOrderSn()))
.topic(RocketMqConstant.ORDER_TOPIC)
.tag(RocketMqConstant.ORDER_DELAY_CLOSE_TAG)
.sentTimeout(2000L)
// .delayTime(10 * 1000L)
// 延时十分钟,关闭未支付订单
.delayTime(3 * 60 * 1000L)
.build();
}
@Override
protected Message<?> buildMessage(OrderDelayCloseMqDTO messageSendEvent, BaseSendExtendDTO requestParam) {
String keys = StrUtil.isEmpty(requestParam.getKeys()) ? UUID.randomUUID().toString() : requestParam.getKeys();
return MessageBuilder
.withPayload(new MessageWrapper(keys, messageSendEvent))
.setHeader(MessageConst.PROPERTY_KEYS, keys)
.setHeader(MessageConst.PROPERTY_TAGS, requestParam.getTag())
.build();
}
}
【消息消费者】
package com.vrs.rocketMq.listener;
import com.vrs.annotation.Idempotent;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.OrderDelayCloseMqDTO;
import com.vrs.enums.IdempotentSceneEnum;
import com.vrs.service.OrderService;
import com.vrs.templateMethod.MessageWrapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @Author dam
* @create 2024/9/20 21:30
*/
@Slf4j(topic = RocketMqConstant.ORDER_TOPIC)
@Component
@RocketMQMessageListener(topic = RocketMqConstant.ORDER_TOPIC,
consumerGroup = RocketMqConstant.ORDER_CONSUMER_GROUP + "-" + RocketMqConstant.ORDER_DELAY_CLOSE_TAG,
messageModel = MessageModel.CLUSTERING,
// 监听tag
selectorType = SelectorType.TAG,
selectorExpression = RocketMqConstant.ORDER_DELAY_CLOSE_TAG
)
@RequiredArgsConstructor
public class OrderDelayCloseListener implements RocketMQListener<MessageWrapper<OrderDelayCloseMqDTO>> {
private final OrderService orderService;
/**
* 消费消息的方法
* 方法报错就会拒收消息
*
* @param messageWrapper 消息内容,类型和上面的泛型一致。如果泛型指定了固定的类型,消息体就是我们的参数
*/
@Idempotent(
uniqueKeyPrefix = "order_delay_close:",
key = "#messageWrapper.getMessage().getOrderSn()",
scene = IdempotentSceneEnum.MQ,
keyTimeout = 3600L
)
@SneakyThrows
@Override
public void onMessage(MessageWrapper<OrderDelayCloseMqDTO> messageWrapper) {
// 开头打印日志,平常可 Debug 看任务参数,线上可报平安(比如消息是否消费,重新投递时获取参数等)
log.info("[消费者] 关闭订单:{}", messageWrapper.getMessage().getOrderSn());
String orderSn = messageWrapper.getMessage().getOrderSn();
orderService.closeOrder(orderSn);
}
}