当前位置: 首页 > article >正文

限流,流量整形算法

写在前面

源码 。
本文看下流量整形相关算法。

目前流量整形算法主要有三种,计数器,漏桶,令牌桶。分别看下咯!

1:计数器

1.1:描述

单位时间内只允许指定数量的请求,如果是时间区间内超过指定数量,则直接拒绝,如果时间区间结束,则重置计数器,开始下一个时间区间。
在这里插入图片描述

1.2:程序

package com.dahuyou.algrithm.triffic.shaper.counter;

import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// 计速器 限速
public class CounterLimiter {
    // 起始时间
    private static long startTime = System.currentTimeMillis();
    // 时间区间的时间间隔 ms
    private static long interval = 1000;
    // 每interval时间内限制数量
    private static long maxCount = 2;
    //累加器
    private static AtomicLong accumulator = new AtomicLong();

    // 计数判断, 是否超出限制
    private static long tryAcquire(long taskId, int turn) {
        long nowTime = System.currentTimeMillis();
        //在时间区间之内
        if (nowTime < startTime + interval) {
            long count = accumulator.incrementAndGet();

            if (count <= maxCount) {
                System.out.println("taskId: " + taskId + " 正常执行!");
                return count;
            } else {
                // 返回-1说明时间区间内被限制了
//                return -count;
                System.out.println("时区内达到次数咯!");
                return -1;
            }
        } else {
            //在时间区间之外
            synchronized (CounterLimiter.class) {
                System.out.println("新时间区到了,taskId:" + taskId + ", turn {}.." + turn);
                // 再一次判断,防止重复初始化
                if (nowTime > startTime + interval) {
                    accumulator.set(0);
                    startTime = nowTime;
                }
            }
            return 0;
        }
    }
    final int threads = 1;

    //线程池,用于多线程模拟测试
//    private ExecutorService pool = Executors.newFixedThreadPool(10);
    private ExecutorService pool = Executors.newFixedThreadPool(threads);

    @Test
    public void testLimit() {
        // 被限制的次数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
//        final int threads = 2;
        // 每条线程的执行轮数
        final int turns = 20;
        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        long taskId = Thread.currentThread().getId();
                        long index = tryAcquire(taskId, j);
//                        if (index <= 0) {
                        if (index == -1) {
                            // 被限制的次数累积
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }


                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有线程结束
                countDownLatch.countDown();

            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //输出统计结果

        System.out.println("限制的次数为:" + limited.get() +
                ",通过的次数为:" + (threads * turns - limited.get()));
        System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("运行的时长为:" + time);
    }

}

输出:
在这里插入图片描述

1.3:优缺点

  • 优点
简单
  • 缺点
无法处理流量分配不均匀的情况,可能导致大量的请求被拒绝

1.4:适用场景

流量比较平稳业务场景。比如我司的机器人外呼业务,因为是程序在跑,所以流量很稳定,一旦业务配置导致流量增高,则可以使用该算法进行限流。

但对于突发流量场景,可能会因为很短时间内的突发流量就导致计数器达到最大值,从而时间区间内的剩余时间所有请求全部丢弃,这也存在着被攻击的风险。
在这里插入图片描述

2:漏桶

2.1:描述

水(对应请求)从进水口进入到漏桶里,漏桶以一定的速度出水(请求放行),当水流入速度过大,桶内的总水量大于桶容量会直接溢出,请求被拒绝,如图所示:
在这里插入图片描述

2.2:程序

package com.dahuyou.algrithm.triffic.shaper.counter;

import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// 漏桶 限流
public class LeakBucketLimiter {

    // 计算的起始时间
    private static long lastOutTime = System.currentTimeMillis();
    // 流出速率 每100毫秒漏2次
    private static int leakRate = 1;
//    private static int leakRate = 2000;

    // 桶的容量
    private static int capacity = 5;

    //剩余的水量
    private static AtomicInteger water = new AtomicInteger(0);

    //返回值说明:
    // false 没有被限制到
    // true 被限流
    public static synchronized boolean isLimit(long taskId, int turn) {
        // 如果是空桶,就当前时间作为漏出的时间
        if (water.get() == 0) {
            lastOutTime = System.currentTimeMillis();
            water.addAndGet(1);
            return false;
        }
        // 执行漏水
//        int waterLeaked = ((int) ((System.currentTimeMillis() - lastOutTime) / 1000)) * leakRate;
        int waterLeaked = ((int) ((System.currentTimeMillis() - lastOutTime) / 100)) * leakRate;
        // 计算剩余水量,当前的量减去漏出去的量就是剩余的量
        int waterLeft = water.get() - waterLeaked;
        // 要注意:剩余的量最小是0
        water.set(Math.max(0, waterLeft));
        // 重新更新leakTimeStamp
        lastOutTime = System.currentTimeMillis();
        // 尝试加水,并且水还未满 ,放行
        if ((water.get()) < capacity) {
            System.out.println("水未满,成功加水");
            water.addAndGet(1);
            return false;
        } else {
            System.out.println("水已满,水溢出");
            // 水满,拒绝加水, 限流
            return true;
        }

    }

    final int threads = 1;

    //线程池,用于多线程模拟测试(负责加水)
    private ExecutorService pool = Executors.newFixedThreadPool(threads);
    private ExecutorService outWaterPool = Executors.newFixedThreadPool(threads);

    @Test
    public void testLimit() {

//        new Thread(() -> {
//            for (int i = 0; i < 1000; i++) {
//                if (water.get() > 0) {
//                    System.out.println("出水了");
//                    water.decrementAndGet();
//                } else {
//                    System.out.println("无水可出了");
//                }
//                try {
//                    TimeUnit.MILLISECONDS.sleep(100);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//            }
//        }).start();
        // 被限制的次数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
//        final int threads = 2;
        // 每条线程的执行轮数
        final int turns = 20;
        // 线程同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        long taskId = Thread.currentThread().getId();
                        boolean intercepted = isLimit(taskId, j);
                        if (intercepted) {
                            // 被限制的次数累积
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }


                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有线程结束
                countDownLatch.countDown();

            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //输出统计结果

        System.out.println("限制的次数为:" + limited.get() +
                ",通过的次数为:" + (threads * turns - limited.get()));
        System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("运行的时长为:" + time);
    }
}

运行:

水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
限制的次数为:0,通过的次数为:20
限制的比例为:0.0
运行的时长为:4.136

Process finished with exit code 0

此时因为水流出的速度快于流入的速度,所以,一直可以成功加水,可以修改leakRate=0,再运行:

水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
限制的次数为:15,通过的次数为:5
限制的比例为:0.75
运行的时长为:4.176

Process finished with exit code 0

就可以看到水满溢出的情况了。

2.3:优缺点

  • 优点
可应对突发流量,避免服务被冲垮,从而起到保护服务的作用
  • 缺点
因为出口速率固定,所以当服务能力提升时,无法自动匹配后端服务的能力提升

2.4:适用场景

3:令牌桶

3.1:描述

有一个固定容量的令牌桶,按照一定的速率(可以调节)向令牌桶中放入令牌,请求想要被执行,必须能够从令牌桶中获取到令牌,否则将会被抛弃,参考下图:
在这里插入图片描述

3.2:程序

package com.dahuyou.algrithm.triffic.shaper.counter;

//import lombok.extern.slf4j.Slf4j;

import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// 令牌桶 限速
//@Slf4j
public class TokenBucketLimiter {
    // 上一次令牌发放时间
    public long lastTime = System.currentTimeMillis();
    // 桶的容量
    public int capacity = 2;
    // 令牌生成速度 /s,如果是调大令牌的生成速度,则服务能力也会得到提高(在服务扛得住的前提下)
    public int rate = 2;
    // 当前令牌数量
    public AtomicInteger tokens = new AtomicInteger(0);

    //返回值说明:
    // false 没有被限制到
    // true 被限流
    public synchronized boolean isLimited(long taskId, int applyCount) {
        long now = System.currentTimeMillis();
        //时间间隔,单位为 ms
        long gap = now - lastTime;

        //计算时间段内的令牌数
        int reverse_permits = (int) (gap * rate / 1000);
        int all_permits = tokens.get() + reverse_permits;
        // 当前令牌数(固有的令牌加上时间段内新产生的令牌就是当前真实的令牌数啦),
        // 因为令牌桶也有固定的数量所以要取下最小值
        tokens.set(Math.min(capacity, all_permits));
//        log.info("tokens {} capacity {} gap {} ", tokens, capacity, gap);
//        System.out.println("tokens " + tokens + " capacity " + capacity + " gap  " + gap);

        /**
         * 如果申请的数量大于可用令牌数,则拒绝,否则发放令牌,执行请求
         */
        if (tokens.get() < applyCount) {
            System.out.println("没有辣么多令牌啦!!!");
            // 若拿不到令牌,则拒绝
            // log.info("被限流了.." + taskId + ", applyCount: " + applyCount);
            return true;
        } else {
            System.out.println("令牌拿去撒!!!");
            // 还有令牌,领取令牌
            tokens.getAndAdd( - applyCount);
            lastTime = now;

            // log.info("剩余令牌.." + tokens);
            return false;
        }
    }

    //线程池,用于多线程模拟测试
    private ExecutorService pool = Executors.newFixedThreadPool(10);

    @Test
    public void testLimit() {

        // 被限制的次数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
        final int threads = 2;
        // 每条线程的执行轮数
        final int turns = 20;


        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        long taskId = Thread.currentThread().getId();
                        boolean intercepted = isLimited(taskId, 1);
                        if (intercepted) {
                            // 被限制的次数累积
                            limited.getAndIncrement();
                        }

                        Thread.sleep(200);
                    }


                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有线程结束
                countDownLatch.countDown();

            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //输出统计结果

        System.out.println("限制的次数为:" + limited.get() +
                ",通过的次数为:" + (threads * turns - limited.get()));
        System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("运行的时长为:" + time);
    }
}

输出:
在这里插入图片描述
展示的是既有申请到令牌也有没有申请到令牌的场景,修改代码public int rate = 2000;给令牌发放一个非常大的速度,此时就会一直可以拿得到令牌:
在这里插入图片描述
修改程序public int rate = 0;直接不发放令牌,就可以看到令牌全部申请失败的场景:
在这里插入图片描述

3.3:优缺点

  • 优点
1:因为令牌桶容量有限制,所以可以应对突发流量
2:服务QPS增加或者降低时只需要对应调整令牌的发放速度即可适配
  • 缺点

3.4:适用场景

写在后面

参考文章列表

限流:计数器、漏桶、令牌桶 三大算法的原理与实战(史上最全) 。


http://www.kler.cn/a/303824.html

相关文章:

  • ue5玩家角色添加武器。切换武器位置,手上武器放到背上。演示一下人体插槽和武器的连接。仅仅演示,实际项目不是这么用的
  • Qt C++读写NFC标签NDEF网址URI
  • RK3568 Android 13 内置搜狗输入法小计
  • HBuilderX打包ios保姆式教程
  • 清理Mac硬盘超大占用:.Spotlight-V100
  • 初学者关于对机器学习的理解
  • 安全基础设施如何形成统一生态标准?OASA 硬件安全合作计划启动 | 2024 龙蜥大会
  • 【贪心算法】(二)贪心算法区间问题及进阶习题
  • 重学SpringBoot3-集成RocketMQ(二)
  • Python(TensorFlow和PyTorch)及C++注意力网络导图
  • Docker 安装 Nacos 教程
  • L3级智能网联汽车准入试点详细解析及所需材料
  • oracle 如何查死锁
  • Web大学生网页作业成品——动漫喜羊羊网页设计与实现(HTML+CSS)(4个页面)
  • 趣味SQL | 从围棋收官到秦楚大战的数据库SQL语言实现
  • Flutter自定义Icon的简易使用(两种)
  • 项目——负载均衡OJ
  • 【数据库】瀚高数据库Windows/Linux、下载安装、高级用法、数据迁移(保姆篇)
  • MongoDB设置系统服务启动教程
  • c/c++ *和后置++的优先级
  • YOLOv8+Deepsort+PyQt+GUI 语义分割+目标检测+姿态识别 三者合一(集成于一套系统)综合视觉分析系统
  • 数据库设计中的需求分析
  • TCPIP网络编程(尹圣雨)UDP 轮流收发消息(windows)
  • CSS 新特性查漏补缺,快来看看你用过几个?
  • C#使用TCP-S7协议读写西门子PLC(一)
  • SAM 2:分割图像和视频中的任何内容