如何用 Redis + Lua 实现高并发限流,并让超出限制的请求等待
需求:对每个商家的每种类型的接口请求做限流。例如:同一商家每秒仅允许 20 个签约请求;当每秒的请求数超过 20 个时,将提示“对接口进行签名的客户端请求数超过了限制”。
然后,作为下游系统,我们需要控制并发以防止无效请求。最常用的并发限流方案是使用 Redis / Jedis。为了确保原子性,这里我使用 Redis + Lua 脚本进行控制。对于服务提供方,当请求数量超过设置的限流阈值时,将直接返回错误码/错误提示,并终止请求的处理。对于服务调用方,我们要做的是:当并发请求超过限流阈值时,延迟(等待后重试)请求,而不是直接丢弃它。
如下RedisLimiter类,服务提供方使用limit方法实现限流,服务调用方使用limitWait方法实现限流等待(如需)
package jstudy.redislimit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Rate limiter backed by a Redis counter that is updated atomically by a Lua script.
 *
 * <p>The counter for a key is created by the first request and expires after
 * {@code limitSecond} seconds. {@link #limit(String, int, int)} is the non-blocking check
 * (intended for the service provider); {@link #limitWait(String, int, int)} blocks the
 * calling thread until the request is admitted (intended for the service caller).
 */
@Slf4j
@Component
public class RedisLimiter {

    /**
     * Pause used before retrying when the key's remaining TTL does not give a usable wait
     * time (null, or a non-positive value). Prevents a tight loop against Redis.
     */
    private static final long RETRY_PAUSE_MILLIS = 10L;

    /**
     * Lua source of the limiter.
     * KEYS[1] = counter key, ARGV[1] = max count, ARGV[2] = expiry in seconds.
     */
    private static final String LIMIT_LUA = String.join("\n",
            "local c",
            "c = redis.call('get',KEYS[1])",
            // Counter is already above the limit: return it without incrementing further.
            "if c and tonumber(c) > tonumber(ARGV[1]) then",
            "return c;",
            "end",
            // Otherwise count this request.
            "c = redis.call('incr',KEYS[1])",
            "if tonumber(c) == 1 then",
            // First request for this key: start the expiry that resets the counter.
            "redis.call('expire',KEYS[1],ARGV[2])",
            "end",
            "return c;");

    /** Single shared script instance so the script's SHA1 is not recomputed on every call. */
    private static final RedisScript<Number> LIMIT_SCRIPT =
            new DefaultRedisScript<>(LIMIT_LUA, Number.class);

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Blocks until a request for {@code key} is admitted by {@link #limit(String, int, int)}.
     *
     * <p>When the request is rejected, the thread sleeps for the key's remaining time to
     * live and then tries again. If no positive TTL is reported, it sleeps for
     * {@link #RETRY_PAUSE_MILLIS} instead of retrying immediately.
     *
     * @param key         Redis key of the counter
     * @param limitCount  maximum number of admitted requests per counter lifetime
     * @param limitSecond lifetime of the counter in seconds
     * @throws IllegalStateException if the thread is interrupted while waiting; the
     *                               interrupt flag is restored before throwing
     */
    public void limitWait(String key, int limitCount, int limitSecond) {
        while (true) {
            boolean ok = limit(key, limitCount, limitSecond);
            log.info("放行标志={}", ok);
            if (ok) {
                return;
            }
            Long ttl = redisTemplate.getExpire(key, TimeUnit.MILLISECONDS);
            long pauseMillis = (ttl != null && ttl > 0) ? ttl : RETRY_PAUSE_MILLIS;
            try {
                Thread.sleep(pauseMillis);
                log.info("sleeped:{}", pauseMillis);
            } catch (InterruptedException e) {
                // Restore the flag and stop waiting: swallowing the interrupt would make this
                // loop impossible to cancel, and returning normally would admit the request.
                Thread.currentThread().interrupt();
                throw new IllegalStateException(
                        "Interrupted while waiting for rate limit on key " + key, e);
            }
        }
    }

    /**
     * Counts one request against {@code key} and reports whether it is admitted.
     *
     * @param key         Redis key of the counter
     * @param limitCount  maximum number of admitted requests per counter lifetime
     * @param limitSecond lifetime of the counter in seconds, applied when the counter is created
     * @return {@code true} if the request is admitted; {@code false} if it is rate limited
     *         or the script returned no value
     */
    public boolean limit(String key, int limitCount, int limitSecond) {
        List<String> keys = Collections.singletonList(key);
        Number count = redisTemplate.execute(LIMIT_SCRIPT, keys, limitCount, limitSecond);
        log.info("Access try count is {} for key = {}", count, key);
        return count != null && count.intValue() <= limitCount;
    }

    /**
     * Returns the Lua source of the rate-limit script.
     *
     * @return the script text executed by {@link #limit(String, int, int)}
     */
    public String buildLuaScript() {
        return LIMIT_LUA;
    }
}
springboot自动注入的RedisTemplate是RedisTemplate<Object,Object>泛型, 上面class使用RedisTemplate<String, Object>,bean定义如下:
package jstudy.redislimit;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
 * Redis configuration that registers a {@code RedisTemplate<String, Object>} bean:
 * keys and hash keys are serialized as plain strings, values and hash values as JSON.
 */
@Configuration
@EnableCaching // turn on Spring's annotation-driven caching
public class RedisConfig extends CachingConfigurerSupport {

    /**
     * Builds the {@code RedisTemplate<String, Object>} bean.
     *
     * @param lettuceConnectionFactory connection factory the template talks to Redis through
     * @return the fully initialized template
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        // JSON mapper: every member is visible, and type information is written for non-final types.
        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, Visibility.ANY);
        mapper.enableDefaultTyping(DefaultTyping.NON_FINAL);

        // Serializers: JSON for values, plain strings for keys.
        Jackson2JsonRedisSerializer<Object> jsonSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        jsonSerializer.setObjectMapper(mapper);
        RedisSerializer<String> keySerializer = new StringRedisSerializer();

        // Assemble the template.
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(lettuceConnectionFactory);
        template.setKeySerializer(keySerializer);
        template.setHashKeySerializer(keySerializer);
        template.setValueSerializer(jsonSerializer);
        template.setHashValueSerializer(jsonSerializer);
        template.afterPropertiesSet();
        return template;
    }
}
并发测试通过,如下是testcase:
package jstudy.redislimit;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
 * Concurrency test for {@link RedisLimiter#limitWait(String, int, int)}: five tasks compete
 * for a limit of three requests per second on the same key.
 */
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class RedisLimiterTest {

    /** Redis key shared by all worker tasks. */
    private static final String KEY = "abc";

    @Autowired
    private RedisLimiter redisLimiter;

    /** Used only to log the counter value and its remaining TTL. */
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Submits five tasks that each wait for admission and then simulate a short piece of
     * work; fails if the tasks do not all finish within the timeout.
     *
     * @throws InterruptedException if the test thread is interrupted while awaiting the pool
     */
    @Test
    public void testLimitWait() throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        log.info("--------{}", redisTemplate.opsForValue().get(KEY));
        for (int j = 1; j <= 5; j++) {
            final int i = j; // effectively-final copy for the lambda
            pool.execute(() -> {
                Thread.currentThread().setName(Thread.currentThread().getName().replace("-", "_"));
                redisLimiter.limitWait(KEY, 3, 1);
                log.info("{}:{} ttl:{}", i, true, redisTemplate.getExpire(KEY, TimeUnit.MILLISECONDS));
                try {
                    // Simulate business logic taking up to 100 ms.
                    Thread.sleep(ThreadLocalRandom.current().nextInt(100));
                } catch (InterruptedException e) {
                    // Preserve the interrupt so the pool can shut the worker down.
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        Assert.assertTrue("worker tasks did not finish in time",
                pool.awaitTermination(5, TimeUnit.SECONDS));
    }
}