springcloud——并发请求处理方案
目录
1.业务逻辑处理
2.数据库层面保底
3.利用mysql update行锁
4.基于redis控制请求数量
5.利用mq进行流量削峰
6.使用限流算法
7.使用分布式锁
生活中的抢购、抢订等活动遍地都是,比如双11抢购商品、抢占活动名额、抢火车票等。而这些活动往往伴随的是大量的并发请求访问服务器,如果处理不当,会使得服务器崩溃导致活动无法正常展开。
具体来说,我们可能会面对这样的问题:我们上架了一件商品,库存为100,进行秒杀活动。当活动开始时,用户进行抢购,并发发送了1000个请求过来,远远大于库存数量。此时,我们面对并发请求,需要保证:
- 不出现超卖现象,即库存不为负数
- 限制进入数据库请求的数量,保证数据库服务器不蹦。
- 避免用户恶意请求,时间内频发点击请求需拒绝
- 避免业务处理过程过长,即保证运行时间
针对大量的并发请求,我们一般可以基于以下方案进行处理。
1.业务逻辑处理
如果活动开始时我们无法应当过多的请求,那我们首先可以考虑业务逻辑上先对请求进行筛减。比如限制双十一的预购活动,就是让用户先进行报名,拥有名额的用户才可以在活动开始时发送请求,从而做到业务层面上对请求进行减少。
不同的业务需要根据实际进行设计。
2.数据库层面保底
面对并发请求,困难的往往是数据库中的写操作,读操作是没有并发性问题,所以我们可以在数据库使用乐观锁对数据库数据进行保底。
乐观锁:即认为任务并不是所有请求都会同时修改数据,只是在修改的时候判断是否被其他线程修改了,如果是,此处操作撤销。
原理:mysql进行update操作时,会自动判断version是否为当前版本号(即当读取的version=1,update的时候会把version=1也作为where里面的判断条件),执行结束后把该行数据的version+1。
由于mysql中update操作是加锁进行的,所以两个请求并发过来时,两个请求读到的version都为1:前一个请求先执行update操作,把 version改为2;后一个请求执行update前判断 version为2,与读取到的1不一致,撤销操作。
实现方法:我们可以通过mybatis中的@version注解标记的字段作为版本号(乐观锁字段),然后在数据表中建立version字段即可
3.利用mysql update行锁
与乐观锁同理,由于mysql中update操作是加锁进行的,我们可以在update语句时添加判断条件帮助保证数据库层面不会出现超卖。即两个请求并发过来时,两个请求读到的库存都为1:前一个请求先执行update操作,把库存-1改为0;后一个请求执行update前判断库存为0,不符合update条件,不执行操作。
实现方式:update table set count = count -1 and count>0
4.基于redis控制请求数量
我们可以利用redis的原理性递减来应对海量的请求,控制进入数据库操作的人数。
原理:redis单线程性能高,操作内存快;多线程存在上下文切换,且多线程为保证线程安全需上锁,导致效率变慢。redis的原子性递减操作可以保证请求一个接一个进行递减,从而保证了进入的请求数量。
实现方式:首先,我们可以通过定时任务等方式,在活动开始前将库存(即允许的请求数量)缓存进redis中,然后使用redis的increment操作进行减库存,然后进行判断,当递减后库存小于0,则抛出异常。
demo:
定时任务:
//定时任务,缓存库存
@Scheduled(cron = "0 0 6 * * ?")
//秒 分 时 日 月 周 年(可选);*表所有可能的值,-指定范围值,/表示步长
public void updateSeckillStatue(){
log.info("修改球场状态及秒杀时间");
List<SeckillGoods> list = seckillCourtMapper.selectList(null);
for (SeckillGoods goods:list ){
//将库存缓存进redis
redisUtil.set("seckill-goods:" + goods.getId(), goods.getCount());
}
}
在秒杀接口进行判断:
//判断库存,redis原子递减
long count = redisUtil.decr("seckill-goods:" + goods_id, 1);
if (count < 0){
//该球场已被订购,抛出异常
throw new GlobalException(SeckillCodeMsg.SECKILL_COURT_NULL);
}
注意: 缓存进redis中的库存只是用来控制进入service层人数的,不是真正的库存。
5.利用mq进行流量削峰
我们可以使用mq进行异步处理,设置最大消费者的数量。从而减少进入数据库请求人数,达到流量削峰的目的:
原理:rabbitmq队列是先进先出的顺序,先来后到。即1000个请求并发,根据最大消费者数量对请求进行限制:如果最大消费者的数量为100,库存为100,则前100个请求抢单成功之后就注定了后900个请求是抢单失败的。
实现方式: 在配置文件中配置最大消费者数量,然后controller中接口作为生产者,service业务类作为消费者,对消息进行监听。
demo:(此处Jackson序列化类省略)
配置文件:
spring:
rabbitmq:
host: localhost
username: guest
password: guest
port: 5672
publisher-confirm-type: correlated #发布确认模式:correlated即交换机收到消息后触发回调方法
publisher-returns: true #回退消息,当找不到routing key对应的队列时,是否回退信息
listener:
simple:
concurrency: 10 #消费者最少数量
max-concurrency: 100 #消费者最大数量
prefetch: 1 #消费者每次处理一条消息
auto-startup: true
default-requeue-rejected: false #消息被拒绝是否重新进入队列
生产者:
OrderMessageDto orderMessageDto = new OrderMessageDto(seckill_id,userId,ip);
//rabbitmq异步处理订单
rabbitTemplate.convertAndSend(EXCHANGE_ORDER,"order_route", orderMessageDto,
new CorrelationData(String.valueOf(seckill_id)));
消费者:
@RabbitListener(queues = QUEUE_ORDER)
@Transactional(rollbackFor = GlobalException.class)
public void receiveOrderMsg(@Headers Channel channel, Message message) {
log.info("订单队列接收到消息:"+new String(message.getBody()));
OrderMessageDto orderMessageDto = JSON.parseObject(new String(message.getBody()), OrderMessageDto.class);
//捕捉异常,方便业务失败时进行回滚
try {
//业务代码逻辑
//确认消息消费成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.info("订单处理异常");
e.printStackTrace();
}
}
此处省略路由异常callback相关代码,需要请参考:Springboot集成rabbitmq——消息持久化-CSDN博客
一般情况下,我们会在最后创建订单+减数据库库存时使用mq进行异步处理。
6.使用限流算法
可以通过计数器法限流,或是springcloud gateway使用令牌桶算法限制请求数。
计数器算法实现方式:基于redis计数器算法对端口进行限流处理。使用redis缓存访问次数,并定义过期时间,过期时间内达到对应值,则限制访问。主要通过自定义注解+aop的形式对接口进行限流,限制每秒请求数。
demo:
注解:
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@Documented
public @interface RequestLimit {
//允许访问次数
int permitRequest();
//过期时间
int second();
}
aop切面:
@Slf4j
@Aspect
@Component
public class RequestLimitAop {
@Resource
private AuthJwtProperties authJwtProperties;
@Resource
private RedisUtil redisUtil;
@Resource
private JwtTokenUtil tokenUtil;
@Around("@annotation(com.seven.seckill.annotation.RequestLimit)")
public Object around(ProceedingJoinPoint joinPoint) throws Throwable{
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
//拿limit的注解
RequestLimit limit = method.getAnnotation(RequestLimit.class);
if (limit != null) {
//允许的请求数
int requestCount = limit.permitRequest();
//key过期时间
int second = limit.second();
//获取用户id
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
String token = attributes.getRequest().getHeader(authJwtProperties.getHeader());
//处理前缀
if (StringUtils.isNotEmpty(token) && token.startsWith(TokenConstants.PREFIX))
{
token = token.replaceFirst(TokenConstants.PREFIX, StringUtils.EMPTY);
}
//从token中获取用户id
String userId = tokenUtil.getUserIdFromToken(token);
Integer flag = (Integer) redisUtil.get("cloud-court-seckill:"+method.getName()+":"+userId);
if (flag==null){
//设置计数器,second 秒过期
redisUtil.set("cloud-court-seckill:"+method.getName()+":"+userId, 1, second);
}else if (flag < requestCount){
//请求数+1
redisUtil.incr("cloud-court-seckill:"+method.getName()+":"+userId,1);
}else {
//限流
log.warn("请求过于频繁");
throw new GlobalException(ExceptionCodeMsg.SYSTEM_BUSY);
}
}
return joinPoint.proceed();
}
}
将注解放于接口上:
@RequestLimit(permitRequest = 5,second = 2) //限制用户每2秒点击次数为5
public Result<?> test(){
return new Result<>(ResultEnum.SUCCESS);
}
注意:次方法一般用于限制单个用户的频繁点击(所以代码中使用用户id做redis中点击次数的key),需要用户先登录才可以访问接口。
基于springcloud gateway的令牌桶限流算法可以参考:springcloud——gateway功能拓展_tang_seven的博客-CSDN博客
7.使用分布式锁
最后,如果需要对抢购活动进行进一步保障,可以使用分布式锁。
具体可以参考:Springboot集成Redis——实现分布式锁-CSDN博客
可以确定的是,上锁必定会影响代码的效率(由于需要一个一个排队进行处理),所以上锁的代码段不易过长,需要自行进行设计。