当前位置: 首页 > article >正文

如何用redis+lua来实现高并发限流,超时数据进行等待

限制每个商家的每种类型的接口请求做限流。例如:同一商家每秒仅允许20个签约请求。当每秒有20个以上的请求时,它将提示“对接口进行签名的客户端请求数超过了限制”。

然后,作为下游系统,我们需要控制并发以防止无效请求。最常用的并发电流限制方案是使用redis / jedis。为了确保原子性,在这里,我使用Redis + LUA脚本进行控制。然后,对于服务提供商,当请求数量超过设置的限流阈值时,将直接返回错误代码/错误提示,并终止请求的处理。对于调用者,我们要做的是:当并发请求超过限制的阈值时,请延迟请求,而不是直接丢弃它。

如下RedisLimiter类,服务提供方使用limit方法实现限流,服务调用方使用limitWait方法实现限流等待(如需)

package jstudy.redislimit;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Redis+Lua实现高并发限流
 */
@Slf4j
@Component
public class RedisLimiter {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * 达到限流时,则等待,直到新的间隔。
     *
     * @param key
     * @param limitCount
     * @param limitSecond
     */
    public void limitWait(String key, int limitCount, int limitSecond) {
        boolean ok;//放行标志
        do {
            ok = limit(key, limitCount, limitSecond);
            log.info("放行标志={}", ok);
            if (!ok) {
                Long ttl = redisTemplate.getExpire(key, TimeUnit.MILLISECONDS);
                if (null != ttl && ttl > 0) {
                    try {
                        Thread.sleep(ttl);
                        log.info("sleeped:{}", ttl);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } while (!ok);
    }

    /**
     * 限流方法    true-放行;false-限流
     *
     * @param key
     * @param limitCount
     * @param limitSecond
     * @return
     */
    public boolean limit(String key, int limitCount, int limitSecond) {
        List<String> keys = Collections.singletonList(key);
        String luaScript = buildLuaScript();
        RedisScript<Number> redisScript = new DefaultRedisScript<>(luaScript, Number.class);
        Number count = redisTemplate.execute(redisScript, keys, limitCount, limitSecond);
        log.info("Access try count is {} for key = {}", count, key);
        if (count != null && count.intValue() <= limitCount) {
            return true;//放行
        } else {
            return false;//限流
//            throw new RuntimeException("You have been dragged into the blacklist");
        }
    }

    /**
     * 编写 redis Lua 限流脚本
     */
    public String buildLuaScript() {
        StringBuilder lua = new StringBuilder();
        lua.append("local c");
        lua.append("\nc = redis.call('get',KEYS[1])");
        // 调用不超过最大值,则直接返回
        lua.append("\nif c and tonumber(c) > tonumber(ARGV[1]) then");
        lua.append("\nreturn c;");
        lua.append("\nend");
        // 执行计算器自加
        lua.append("\nc = redis.call('incr',KEYS[1])");
        lua.append("\nif tonumber(c) == 1 then");
        // 从第一次调用开始限流,设置对应键值的过期
        lua.append("\nredis.call('expire',KEYS[1],ARGV[2])");
        lua.append("\nend");
        lua.append("\nreturn c;");
        return lua.toString();
    }

}

springboot自动注入的RedisTemplate是RedisTemplate<Object,Object>泛型, 上面class使用RedisTemplate<String, Object>,bean定义如下:

package jstudy.redislimit;

import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
@EnableCaching // 开启缓存支持
public class RedisConfig extends CachingConfigurerSupport {

    /**
     * RedisTemplate配置
     *
     * @param lettuceConnectionFactory
     * @return
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        // 设置序列化
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, Visibility.ANY);
        om.enableDefaultTyping(DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // 配置redisTemplate
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
        redisTemplate.setConnectionFactory(lettuceConnectionFactory);
        RedisSerializer<?> stringSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringSerializer);// key序列化
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);// value序列化
        redisTemplate.setHashKeySerializer(stringSerializer);// Hash key序列化
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);// Hash value序列化
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

}

并发测试通过,如下是testcase:

package jstudy.redislimit;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class RedisLimiterTest {
    @Autowired
    private RedisLimiter redisLimiter;

    @Test
    public void testLimitWait() throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        log.info("--------{}", redisTemplate.opsForValue().get("abc"));
        for (int j = 1; j <= 5; j++) {
            int i=j;
            pool.execute(() -> {
                Thread.currentThread().setName( Thread.currentThread().getName().replace("-","_"));
                redisLimiter.limitWait("abc", 3, 1);
                log.info(i + ":" + true + " ttl:" + redisTemplate.getExpire("abc", TimeUnit.MILLISECONDS));
                    try {
                        // 线程等待,模拟执行业务逻辑
                        Thread.sleep(new Random().nextInt(100));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
            });
        }
        pool.shutdown();
        pool.awaitTermination(2,TimeUnit.SECONDS);
    }
}


http://www.kler.cn/a/405155.html

相关文章:

  • java 并发编程 (1)java中如何实现并发编程
  • 【赵渝强老师】MySQL的慢查询日志
  • 游戏陪玩系统开发功能需求分析
  • 【代码随想录day36】【C++复健】1049. 最后一块石头的重量 II ; 494. 目标和 ;474.一和零
  • 解决Docker环境变量的配置的通用方法
  • 【已解决】“EndNote could not connect to the online sync service”问题的解决
  • 基于Java Springboot北京医疗企业固定资产管理系统
  • HTML5和CSS3新增特性
  • cocos creator 3.8 Node学习 3
  • 【spring】spring单例模式与锁对象作用域的分析
  • 【IOS】Undefined symbol: _OBJC_CLASS_$_PAGFile
  • Java通过calcite实时读取kafka中的数据
  • 学习threejs,通过SkinnedMesh来创建骨骼和蒙皮动画
  • WSL2 ubuntu配置redis
  • Simulink学习笔记【PID UG联动仿真】
  • 算法.图论-习题全集(Updating)
  • 【Android、IOS、Flutter、鸿蒙、ReactNative 】自定义View
  • 力扣 LeetCode 513. 找树左下角的值(Day8:二叉树)
  • [服务器] 腾讯云服务器免费体验,成功部署网站
  • PBDL (基于物理的深度学习)-Chapter 1
  • 深度学习day2-Tensor 2
  • 【Git】git从暂存区中移除文件
  • 山泽HDMI切换器:提升家庭娱乐与办公体验的利器
  • 支持向量机SVM——基于分类问题的监督学习算法
  • HBase 原理
  • 蓝桥杯每日真题 - 第19天