当前位置: 首页 > article >正文

高性能采集服务上线回顾

最近基于lua脚本做了一套数据采集服务,日采集数据量 :1.1亿条/天,CPU和内存都可以做到极低的水平。当然上线以来也经历了一波全链路的崩溃到修复过程。其中涉及到nginx优化、lua脚本写入kafka性能优化、kafka性能优化;

1、数据压缩策略

     进行对应的数据压缩是大数据量传输的前提条件,业界用得比较多的压缩算法有gzip、lz4、snappy、zstd。已经实现了snappy、gzip,后续计划扩展zstd和lz4;假如考虑速度更快可以考虑snappy、lz4;考虑压缩比更高可以考虑zstd;(ps:微信桌面端通信采用snappy进行数据压缩)

2、nginx网关层

  worker_processes  auto;        #  根据CPU自动选择worker进程数据

   client_body_buffer_size 64k; #  增大可以防止生成大量小文件,全在内存进行处理,提升效率

3、lua采集脚本

      lua-resty-kafka配置优化(生产者优化)

 kafka_producer = producer:new(broker_list, {
            producer_type = "async",  -- 异步生产者类型
            required_acks = 1,
            request_timeout = 20000,
            max_retry = 15,
            retry_backoff = 1000,
            max_buffering = 100000,     -- 生产者缓冲消息队列最大值,默认5w条(queue.buffering.max.messages)
            --batch_size = 400,
            batch_size = 2097152,       -- 生产者缓冲区大小,每次发送的数据大于这个值,有可能发生堆积(send.buffer.bytes)应小于kafka的socket.request.max.bytes / 2 - 10k
            batch_num = 600,            -- 生产者每次批量发送的数量(batch.num.messages)
            linger_ms = 800,            -- 
            compression_codec = "gzip"  -- 当前插件是不支持压缩,可惜了
        })

     (1)生产者内存队列缓存区配置

     (2)每个批次发送大小

     (2)网络缓冲区,批次大小不能超过网络缓冲区,网络缓冲区最大依赖于操作系统的tcp和kafka broker配置。

      (3)发送前的数据压缩

      (4)多分区topic提升写入并行度

      (5)异步写入

      (6)ack=1,单副本成功即可

异步批量写盘优化

local function async_flush_logs()
    if #log_buffer == 0 then return end

    ngx.thread.spawn(function()
        local retries = 3  -- 最大重试次数
        local delay = 0.1  -- 每次重试的延迟时间(秒)

        while retries > 0 do
            -- 确保日志文件已初始化
            init_log_file()

            -- 检查日志文件是否已成功打开
            if current_log_file then
                local ok, err = pcall(function()
                    for _, log_entry in ipairs(log_buffer) do
                        current_log_file:write(log_entry .. "\n")
                    end
                    current_log_file:flush()  -- 确保数据立即写入磁盘
                end)

                if not ok then
                    ngx.log(ngx.ERR, "Failed to write log: ", err)
                else
                    -- 写入成功,清空缓冲区
                    log_buffer = {}
                    break  -- 退出重试循环
                end
            else
                ngx.log(ngx.WARN, "Log file is not initialized, retrying...")
                retries = retries - 1
                ngx.sleep(delay)  -- 等待一段时间后重试
            end
        end

        if retries == 0 then
            ngx.log(ngx.ERR, "Failed to write log after multiple retries")
        end
    end)
end

4、kafka端

     Kafka Broker 的 socket.request.max.bytes=204857600(200M),配合写入端进行性能优化

     堆内存配置大小 

                中型正式环境  4~8G

                大型生产环境 8~16G

5、安全性

    采用非对称加密+对称加密的混合方式进行数据加密传输;完全的非对称加密在数据量稍大的情况下会消耗大量的cpu和压缩解压时间;

   对于上报的数据压缩提需要校验上传文件的最大值,以及解压后数据大小,防止非法请求,循环解压浪费大量cpu和存储资源


http://www.kler.cn/a/570031.html

相关文章:

  • React 各模块相关全面面试及答案解析
  • 毓恬冠佳即将登陆资本市场,深耕汽车天窗领域,引领行业创新发展
  • [MySQL初阶]MySQL(1)MySQL的理解、库的操作、表的操作
  • MySQL深分页如何优化?
  • IDEA 2025最新版2024.3.3软件安装、插件安装、语言设置
  • c语言中return 数字代表的含义
  • 探秘基带算法:从原理到5G时代的通信变革【一】引言
  • 第三百七十二节 JavaFX教程 - JavaFX HTMLEditor
  • 【HDLbits--FSM续(二)】
  • 搭建iOS逆向开发环境 (下) - 越狱设备与高级工具配置
  • 蓝桥杯4T平台(串口控制LD状态)
  • 【Java项目】基于vue的地方美食分享系统
  • Spring配置文件
  • iPhone 镜像 连接错误
  • 2024 ChatGPT大模型技术场景与商业应用视频精讲合集(45课).zip
  • 51单片机课综合项目
  • 京准电钟:NTP校时服务器于安防监控系统应用方案
  • K8S学习之基础五:k8s中node节点亲和性
  • 创建者——建造者模式
  • 直流减速电机控制实验:实验介绍