当前位置: 首页 > article >正文

接口调用限频(代理模式+滑动窗口)

目录

代码示例

接口

代理

接口实现

限流工厂

限流处理器接口

直接交换处理器

限流处理器

限流配置

滑动窗口限流


 

通过代理模式+滑动窗口,限流请求第三方平台,避免出现第三方平台抛出限流异常,影响正常业务流程,从出口出发进行限流请求。

代码示例

接口

/**
 * 第三方请求
 */
public interface ThirdApi {

    /**
     * 发送消息
     *
     * @param userId 用户id
     * @param message 消息
     * @return 发送是否成功
     */
    boolean sendMessage(String userId, String message);
}

代理

/**
 * 第三方请求代理
 */
@Component
public class ProxyThirdApi implements ThirdApi {

    @Resource
    private ThirdApiServiceClient thirdApiServiceClient;
    @Resource
    private LimitProcessorFactory limitProcessorFactory;
    @Resource
    private YmlConstant ymlConstant;

    private ThirdApi thirdApi;

    @PostConstruct
    public void initThirdApi() {
        thirdApi = new ThirdApiImpl(thirdApiServiceClient, ymlConstant);
    }

    @Override
    @SneakyThrows
    public boolean sendMessage(String userId, String message) {
        // 限流
        String bizLimit = "MSG_SEND_LIMIT";
        Object result = limitProcessorFactory.getProcessor(bizLimit).process(
                () -> thirdApi.sendMessage(userId, message)
        );
        if (result instanceof Boolean) {
            return (Boolean) result;
        } else {
            return false;
        }
    }
}

接口实现

/**
 * 第三方请求实现
 *
 */
@Slf4j
@AllArgsConstructor
public class ThirdApiImpl implements ThirdApi {

    private final ThirdApiServiceClient thirdApiServiceClient;
    private final YmlConstant ymlConstant;

    @Override
    public boolean sendMessage(String userId, String message) {
        MessageReq messageReq = new MessageReq();
        messageReq.setContent(message);
        messageReq.setReceiveId(userId);

        log.info("[ThirdApiImpl][sendMessage] {}", JSON.toJSONString(messageReq));
        HttpResponse<SendMessagesResp> sendResp = thirdApiServiceClient.sendMessage(messageReq);
        if (sendResp.isOk()) {
            return true;
        } else {
            log.error("[ThirdApiImpl][sendMessage] 消息发送失败,返回信息:{}", JSON.toJSONString(sendResp));
            return false;
        }
    }
}

限流工厂

/**
 * 限流工厂
 *
 */
@Component
public class LimitProcessorFactory {

    @Resource
    private LimitProperties properties;

    @Getter
    private Map<String, LimitProperties.LimitData> propertiesMap;

    private final Map<String, LimiterProcessor> processorMap = new ConcurrentHashMap<>(10);


    @PostConstruct
    public void initPropertiesMap() {
        List<LimitProperties.LimitData> props = properties.getProps();
        if (CollectionUtils.isEmpty(props)) {
            propertiesMap = Collections.emptyMap();
        } else {
            propertiesMap = props.stream().collect(
                    Collectors.toMap(LimitProperties.LimitData::getName, Function.identity())
            );
        }
    }

    /**
     * 获取限流处理器
     *
     * @param name 业务名称
     * @return 限流处理器
     */
    public LimiterProcessor getProcessor(String name) {
        LimitProperties.LimitData props = propertiesMap.get(name);
        if (Objects.isNull(props)) {
            throw new BusinessException(String.format("无法找到[%s]的处理器配置", name));
        }

        if (props.getEnabled()) {
            return processorMap.computeIfAbsent(props.getName(), name -> {
                TimeUnit timeUnit = props.getTimeUnit();

                // 使用窗口滑动算法进行限流
                RateLimiter limiter = new SlidingWindowRateLimiter(props.getInterval(), props.getLimit(), timeUnit);
                return new LimiterProcessor(name, timeUnit.toMillis(props.getWaitTime()), limiter);
            });
        } else {
            return new SynchronousProcessor();
        }
    }
}

限流处理器接口

/**
 * 限流处理器接口
 */
public interface LimiterProcessor {

    /**
     * 限流
     *
     * @param callback 回调
     * @return 执行结果
     * @throws Throwable Throwable
     */
    Object process(LimiterCallback callback) throws Throwable;
}

直接交换处理器

/**
 * 直接交换处理器
 *
 * @author zhimajiang
 */
@Slf4j
public class SynchronousProcessor implements LimiterProcessor {

    @Override
    public Object process(LimiterCallback callback) throws Throwable {
        return callback.process();
    }
}

限流处理器

/**
 * 限流处理器
 *
 */
@Slf4j
@AllArgsConstructor
public class Processor implements LimiterProcessor {

    private final String name;
    private final long waitTime;
    private final RateLimiter rateLimiter;

    @Override
    public Object process(LimiterCallback callback) throws Throwable {
        while (true) {
            if (rateLimiter.tryAcquire()) {
                // 未被限流,则尝试唤醒其他被限流的任务
                Object proceed = callback.process();
                synchronized (this) {
                    this.notifyAll();
                }
                return proceed;
            } else {
                // 已被限流则进入阻塞
                log.info("LimiterProcessor][process] {}-限流", name);
                synchronized (this) {
                    try {
                        this.wait(waitTime);
                    } catch (InterruptedException ignored) {
                    }
                }
            }
        }
    }
}

限流配置

/**
 * 限流配置
 *
 */
@Data
@Configuration
@ConfigurationProperties("limit")
public class LimitProperties {

    /**
     * 限流配置
     */
    private List<LimitProperties.LimitData> props;

    @Data
    public static class LimitData {

        /**
         * 名称
         */
        private String name;

        /**
         * 是否启用
         */
        private Boolean enabled = false;

        /**
         * 时间间隔
         */
        private int interval;

        /**
         * 限制阈值
         */
        private int limit;

        /**
         * 阻塞等待时间
         */
        private int waitTime = 1000;

        /**
         * 时间单位
         */
        private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
    }
}

滑动窗口限流

/**
 * 滑动窗口限流
 *
 */
public class SlidingWindowRateLimiter implements RateLimiter {

    /**
     * 子窗口数量
     */
    private final int slotNum;

    /**
     * 子窗口大小
     */
    private final long slotSize;

    /**
     * 限流阈值
     */
    private final int limit;

    /**
     * 上一次的窗口结束时间
     */
    private long lastTime;

    /**
     * 子窗口流量计数
     */
    private final AtomicInteger[] counters;

    /**
     * 滑动窗口限流
     *
     * @param windowSize 时间窗口大小
     * @param slotNum    子窗口数量
     * @param limit      限流阈值
     * @param timeUnit   时间单位
     */
    public SlidingWindowRateLimiter(int windowSize, int slotNum, int limit, TimeUnit timeUnit) {
        long windowSizeMills = timeUnit.toMillis(windowSize);
        this.slotNum = slotNum;
        this.slotSize = windowSizeMills / slotNum;
        this.limit = limit;
        this.lastTime = System.currentTimeMillis();
        this.counters = new AtomicInteger[slotNum];
        resetCounters();
    }

    /**
     * 滑动窗口限流
     *
     * @param windowSize 时间窗口大小
     * @param limit      限流阈值
     * @param timeUnit   时间单位
     */
    public SlidingWindowRateLimiter(int windowSize, int limit, TimeUnit timeUnit) {
        this(windowSize, 5, limit, timeUnit);
    }

    /**
     * 滑动窗口限流
     *
     * @param windowSize 时间窗口大小(毫秒)
     * @param limit      限流阈值
     */
    public SlidingWindowRateLimiter(int windowSize, int limit) {
        this(windowSize, 5, limit, TimeUnit.MILLISECONDS);
    }

    /**
     * 重置子窗口流量计数
     */
    private void resetCounters() {
        for (int i = 0; i < this.slotNum; i++) {
            this.counters[i] = new AtomicInteger(0);
        }
    }

    /**
     * 限流请求
     *
     * @return true-允许执行 false-触发限流
     */
    @Override
    public synchronized boolean tryAcquire() {
        long currentTime = System.currentTimeMillis();
        // 小窗口移动格数
        int slideNum = (int) Math.floor((double) (currentTime - this.lastTime) / this.slotSize);
        slideWindow(slideNum);

        // 窗口时间内的请求总数
        int sum = Arrays.stream(this.counters).mapToInt(AtomicInteger::get).sum();
        this.lastTime = this.lastTime + slideNum * slotSize;

        if (sum >= limit) {
            return false;
        } else {
            this.counters[this.slotNum - 1].incrementAndGet();
            return true;
        }
    }

    /**
     * 将计数器内全部元素向左移动num个位置
     *
     * @param num 移动位置个数
     */
    private void slideWindow(int num) {
        if (num == 0) {
            return;
        }
        if (num >= this.slotNum) {
            // 如果移动步数大于子窗口个数,则计数全部清零
            resetCounters();
            return;
        }

        // 对于a[0]~a[num-1]来说,移动元素则代表删除元素,所以直接从a[num]开始移动
        for (int index = num; index < this.slotNum; index++) {
            // 移动元素
            int newIndex = index - num;
            this.counters[newIndex] = this.counters[index];
            this.counters[index].getAndSet(0);
        }
    }
}

 

 


http://www.kler.cn/a/453134.html

相关文章:

  • 如何通过 360 驱动大师检查自己电脑上的显卡信息
  • 使用Excel制作通达信自定义“序列数据“
  • 层序遍历练习
  • 使用“NodeMCU”、“红外模块”实现空调控制
  • vLLM (2) - 架构总览
  • 【接口自动化连载】使用yaml配置文件自动生成接口case
  • Electron【详解】菜单 Menu
  • tokenizer、tokenizer.encode、tokenizer.encode_plus比较
  • 打造两轮差速机器人fishbot:从零开始构建移动机器人
  • 前端开发 -- 自动回复机器人【附完整源码】
  • 如何检查交叉编译器gcc工具链里是否有某个库(以zlib库和libpng库为例)
  • 修炼之道 ---其四
  • 3.系统学习-熵与决策树
  • 福特汽车物流仓储系统WMS:开源了,可直接下载
  • CentOS下安装RabbitMQ
  • HNUST-数据分析技术课堂实验
  • 软件渗透测试如何做?渗透测试作用有哪些?
  • flask后端开发(4):模板访问对象属性和过滤器的使用
  • 短视频运营行业该如何选择服务器?
  • 使用FFmpeg进行拉流和推流操作
  • 运行Zr.Admin项目(后端)
  • 使用React Strict DOM改善React生态系统
  • 使用openvino加速部署paddleocr文本方向分类模型(C++版)
  • 质数分解,用sqrt缩小范围
  • Ps:在 Photoshop 中编辑视频
  • 微信小程序中遇到过的问题