当前位置: 首页 > article >正文

【kafka-04】kafka线上问题以及高效原理

Kafka系列整体栏目


内容链接地址
【一】afka安装和基本核心概念https://zhenghuisheng.blog.csdn.net/article/details/142213307
【二】kafka集群搭建https://zhenghuisheng.blog.csdn.net/article/details/142253288
【三】springboot整合kafka以及核心参数详解https://zhenghuisheng.blog.csdn.net/article/details/142346016
【四】kafka线上问题以及高效原理https://zhenghuisheng.blog.csdn.net/article/details/142371986

kafka线上问题以及高效原理

  • 一,kafka线上问题以及高效原理
    • 1,如何保证kafka消息不丢失问题
      • 1.1,生产者方面
      • 1.2,broker方面
      • 1.3,消费者方面
    • 2,如何保证不重复消费
    • 3,如何解决消息积压问题
    • 4,kafka如何设计一个延时队列
    • 5,kafka事务使用
    • 6,kafka高性能原因
      • 6.1,零拷贝
      • 6.2,顺序写
      • 6.3,批量操作

一,kafka线上问题以及高效原理

在前面几篇中,讲解了kakfa的安装部署,以及springboot如何整合kafka的使用,然而在实际的开发中,往往会遇到很多生产中的问题中需要解决,往往需要提前配置好参数,以及根据不同场景解决不同的业务

在这里插入图片描述

1,如何保证kafka消息不丢失问题

虽然说kafka是更加趋向于大数据方面,用于数据分发和流失计算等,允许丢失部分数据,但是在某些场合中,也是需要保证消息是不能丢失的,如金融行业,订单数据等。为了保证数据不丢失,不管是任何一种消息队列,都需要考虑三个方面,生产者方面、broker方面和消费者三个方面进行考虑。

1.1,生产者方面

在生产者方面,当消息往broker中投递时,需要通过设置ack机制来保证消息确认机制,通过设置不同的参数来决定消息是否会丢失。为了保证消息不丢失,可以在配置文件中奖ack的值设置成-1

  • ack=0时, 性能最高,消息直接异步给完broker就行,不需要broker任何答复,缺点就是容易丢消息
  • ack=1时, 性能其次,需要leader结点将数据成功写入到本地日志,但是不需要等待集群中的follower写入,如果出现leader挂掉,但是follower未及时同步,那么在follower变成leader之后,就会丢失这部分消息
  • ack=-1时,性能最低,但是安全性最高, 生产者端需要等待broker集群中的leader和副本都成功写入日志

1.2,broker方面

在架构层面,可以部署一套多个结点的broker集群模式,最好是每台机器对应一个broker,然后每个broker配置一个副本,从而保证broker的高可用性,即使某个节点甚至是leader主节点挂了,也能通过选举出其他结点进行故障转移,从而保证消息不丢失

1.3,消费者方面

在kafka中,消费者默认会使用自动提交的方式提交消费的偏移量到broker中,比如5s提交一次,那么如果消费者消费完了某段偏移量的消息之后,在提交到topic中时结点突然挂了,导致topic那边没有成功的记录该消费者的偏移量,导致服务重启之后,新的消费者重新消费这5s的消息,也可能造成消息丢失。

因此最好设置成自动提交偏移量到broker中,需要关闭默认的自动提交,然后设置这个 ack-mode的value值为manual , 指定手动提交确认模式,使用 Acknowledgment 对象来手动确认消费。

enable-auto-commit: false
ack-mode: manual

2,如何保证不重复消费

消息重复消费,主要分为两个方面:一个是生产者重复投递,一个是消费者重复消费同一条

生产者因为网络抖动的原因,导致broker没有及时的给生产者反馈,导致生产者触发了重试策略,又往broker中投递了同一条数据,这就导致了重复投递,因此可以设置以下参数保证生产者的幂等性

enable.idempotence=true

消费者这边,首先还是这个自动提交的问题,需要改成手动提交偏移量到topic,需要将这个自动提交

enable-auto-commit: false

另一个就是可以通过幂等性来保证消息不重复消费,比如给每条消息设置一个唯一id,将消息的id加入的缓存中,或者加入到数据库设置唯一索引等等方式

3,如何解决消息积压问题

在解决积压问题之前,首先需要明白kafka内部消费的几个原理,首先是关于消费者组的问题,一个partition分区中数据只能被消费者组中的一个消费者消费,也就是说最好的情况就是如有10个分区,那么最好就有10个消费者,如果消费者组中增加消费者也没用,因为所有已有的分区都已有消费者进行消费。因此假设说如果消费者小于partition分区的个数,那么就可能出现一个消费者得消费两个分区或者以上,因此这种情况就可以增加消费者的个数来解决积压问题。

第二个问题就是业务场景是否支持rebanlance机制,举个例子,假设已有10个分区,10个消费者,但是突然消费者被移除了一个,那么kakfa就会触发这种机制,就是说此时有一个分区没有消费者消费了,那么kafka就会将这个分区通过这种rebanlance机制分配给其他消费者消费,那么此时就是存在一个消费者得消费两个分区的消息,那么就会降低整个系统的吞吐量

如果不支持这种rebanlance机制,就是不影响其他的消费者消费以及不影响整体系统的吞吐量。比如这个order的topic主题,已有4个分区和4个消费者,此时都在正常消费,c1对应partition1,依次对应,假如此时c3这个消费者突然消费不过来,然后消息一直是挤压在partition中,此时加消费者也没用,但是又不能影响其他消费者的支持消费,那么需要如何设计和解决这种积压问题

在这里插入图片描述

其解决方案如下,就是新加一个order2的主题,然后c3这个消费者不做具体的信息消费,而是只作一个转发,将数据转发到新的order2的这个主题中,将原先一个partition和消费者拆分成多个partition和多个消费者。从而在不进行rebanlance的情况下,既不影响整体的吞吐量,也解决了这个消息积压的问题。

在这里插入图片描述

除此之外,消费者这边也可以采用多线程,优化硬件,如cpu、内存和磁盘等。

4,kafka如何设计一个延时队列

在kafka中,内部并没有设计这个延时队列,因此如果想通过kakfa实现一个延时队列,那么需要内部自定义设计。比如存在一个支付场景,假设某个订单30分钟没有支付,那么需要主动的关闭这个订单,需要如何设计和实现。

针对上面这个场景,首先需要知道kafka的存储原理,在kakfa中,内部采用的是顺序写的原理,也就是说partition前面写入的内容的时间一定是比partition后面写入的内容的时间是更早的,明白这个原理之后,那么多就可以采用定时任务的形式来进行消费

如创建一个order_30的topic主题,专门存储用户下单后未立马支付的订单。假设在第一个分区中,已经有1000条这种信息,那么就可以开启一个定时任务,每隔30s检测一次,从头开始轮询,如第一条数据到期,那么就对这条消息进行消费,并且修改partition的偏移量,依次下去。

@Service
public class RetryTask {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Scheduled(fixedDelay = 5000)
    public void retryMessages() {
        xxx
    }
}

5,kafka事务使用

在kakfa中,内部也存在事务概念,但是和mysql不同,kafka的事务只支持批量消息整体,也就是说批量消息要么同时成功,要么同时失败。 Kafka事务主要用于保证消息的一致性和避免消息丢失或重复消费。

核心概念

  • 事务:一组生产操作的集合,要么全部成功,要么全部失败。
  • 事务ID:每个事务都有一个唯一的事务ID,Kafka用来标识和跟踪事务。
  • 事务日志:Kafka维护事务日志以跟踪事务的状态和相关的偏移量。
  • 原子性:事务中的消息要么全部提交,要么全部回滚,保证数据的一致性。

在配置文件中,需要配置以下参数

spring:
  kafka:
    producer:
      acks: all
      enable-idempotence: true
      transaction-id-prefix: my-transactional-id

事务的实现方式如下,也是需要在方法上面加上 Transactional 注解

@Transactional
public void sendTransactionalMessages(String topic, String message) {
    kafkaTemplate.executeInTransaction(kafkaTemplate -> {
        kafkaTemplate.send(topic, message);
        // 可以发送多条消息
        return true; // 返回true表示事务成功提交
    });
}

Kafka的事务机制适用于需要保证消息处理一致性的场景,主要适用于金融交易系统和订单处理系统

6,kafka高性能原因

6.1,零拷贝

首先内部采用了零拷贝:https://blog.csdn.net/zhenghuishengq/article/details/140721001

kafka采用的是sendfile的零拷贝方式, 只需要两次上下文切换,两次DMA拷贝,0次或者1次的CPU拷贝 。解决了最初的4次上下文,4次DMA,2次CPU拷贝,详细参考上面那篇文章。

img

6.2,顺序写

内部采用了顺序写:https://zhenghuisheng.blog.csdn.net/article/details/129080088

通过顺序写和顺序读,减少 寻找磁道和盘面时间 ,从而提高整体的性能,详情参考上面这篇文章

6.3,批量操作

kafka内部采用的都是批量的操作,如生产者批量将数据写入缓冲区中,然后批量的缓冲区数据投递到broker中,消费者批量的上报偏移量等。都是通过大量的批量操作来实现数据的传输


http://www.kler.cn/news/316725.html

相关文章:

  • HarmonyOS鸿蒙开发实战(5.0)网格元素拖动交换案例实践
  • Go语言并发编程之sync包详解
  • 前后端分离,使用MOCK进行数据模拟开发,让前端攻城师独立于后端进行开发
  • 【Verilog学习日常】—牛客网刷题—Verilog快速入门—VL21
  • Kotlin高阶函数func
  • 计算机毕业设计 美妆神域网站的设计与实现 Java实战项目 附源码+文档+视频讲解
  • 一对一视频通话软件Call-Me
  • 某采招网爬虫数据采集逆向
  • 医学数据分析实训 项目四 回归分析--预测帕金森病病情的严重程度
  • I.MX6U裸机-C语言版LED灯实验
  • ld-linux-x86-64.so.2
  • git 操作远程别名
  • tcpdump使用方法
  • 24. Revit API: 几何对象(五)- (Sur)Face
  • [Linux]Vi和Vim编辑器
  • 修改Git配置信息:用户名
  • linux第三课(linux中安装nginx与redis及SpringBoot集成redis)
  • 颍川陈氏——平民崛起的典范
  • 【AcWing】基础算法
  • Django 数据库配置以及字段设置详解
  • 移情别恋c++ ദ്ദി˶ー̀֊ー́ ) ——14.AVL树
  • C++(学习)2024.9.20
  • 【Kubernetes】常见面试题汇总(二十五)
  • 基于安全风险预测的自动驾驶自适应巡航控制优化
  • 智能BI项目第一期
  • 【网络】高级IO——poll版本TCP服务器
  • 11年408考研真题解析-计算机网络
  • js使用:
  • 银行项目测试,基础业务知识,一文全掌握!
  • 【C语言】__attribute__((constructor)) 和 __attribute__((destructor))详细解析