当前位置: 首页 > article >正文

【Kafka 实战】Kafka 如何保证消息的顺序性?

👉博主介绍: 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO 专家博主

⛪️ 个人社区:个人社区
💞 个人主页:个人主页
🙉 专栏地址: ✅ Java 中级
🙉八股文专题:剑指大厂,手撕 Java 八股文

在这里插入图片描述

文章目录

      • 1. Kafka 如何保证消息的顺序性?
        • 1.1 分区(Partition)
        • 1.2 生产者
        • 1.3 消费者
        • 1.4 配置参数

1. Kafka 如何保证消息的顺序性?

Apache Kafka 是一个高吞吐量的分布式消息系统,广泛用于构建实时数据流处理平台。Kafka 在设计上考虑了消息的顺序性,通过多种机制确保消息在特定条件下按顺序处理。以下是 Kafka 保证消息顺序性的主要机制:

1.1 分区(Partition)

Kafka 将主题(Topic)划分为多个分区(Partition),每个分区是一个有序的、不可变的消息序列。分区是 Kafka 中消息顺序性的基本单位。

  • 单个分区:在一个分区内部,消息是严格有序的。生产者发送的消息会按照发送顺序追加到分区的末尾,消费者从分区中读取消息时也是按顺序读取的。
  • 多个分区:如果一个主题有多个分区,那么消息的全局顺序性无法保证。但是,可以确保每个分区内部的消息是有序的。
1.2 生产者

Kafka 生产者(Producer)通过以下方式确保消息的顺序性:

  • 分区键(Partition Key):生产者可以为每条消息指定一个分区键。Kafka 会根据分区键将消息路由到特定的分区。如果多条消息具有相同的分区键,它们会被路由到同一个分区,从而保证这些消息在该分区内的顺序性。
  • 幂等生产者:Kafka 2.0 引入了幂等生产者(Idempotent Producer),确保每条消息在分区中最多只出现一次,避免重复消息的问题。
  • 事务性生产者:Kafka 2.0 还引入了事务性生产者(Transactional Producer),允许生产者在事务中发送多条消息,确保这些消息要么全部成功写入,要么全部失败。
1.3 消费者

Kafka 消费者(Consumer)通过以下方式确保消息的顺序性:

  • 单个分区:如果一个消费者组中的消费者只消费一个分区的消息,那么消息的顺序性是可以保证的。
  • 多个分区:如果一个消费者组中的消费者消费多个分区的消息,那么全局的顺序性无法保证。但是,每个分区内部的消息仍然是有序的。
1.4 配置参数

Kafka 提供了一些配置参数,可以帮助确保消息的顺序性:

  • max.in.flight.requests.per.connection:控制生产者在收到确认之前可以发送的最大请求数。设置为 1 可以确保消息的顺序性,但会降低吞吐量。
  • enable.idempotence:启用幂等生产者,确保每条消息在分区中最多只出现一次。
  • transactional.id:启用事务性生产者,确保多条消息的原子性。

以下是一个简单的 Java 示例,展示了如何使用分区键和幂等生产者来确保消息的顺序性。

生产者配置

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class OrderlyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.in.flight.requests.per.connection", 1); // 确保消息顺序性
        props.put("enable.idempotence", true); // 启用幂等生产者

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10; i++) {
            String key = "key-" + (i % 3); // 使用分区键
            String value = "message-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", key, value);
            producer.send(record);
        }

        producer.close();
    }
}

消费者配置

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class OrderlyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

Kafka 通过分区、生产者配置和消费者配置等多种机制确保消息的顺序性。通过合理使用分区键、幂等生产者和事务性生产者,可以确保在特定条件下消息的顺序性。

精彩专栏推荐订阅:在下方专栏👇🏻
✅ 2023年华为OD机试真题(A卷&B卷)+ 面试指导
✅ 精选100套 Java 项目案例
✅ 面试需要避开的坑(活动)
✅ 你找不到的核心代码
✅ 带你手撕 Spring
✅ Java 初阶

在这里插入图片描述


http://www.kler.cn/a/405322.html

相关文章:

  • 深入理解 Spring Boot 的 CommandLineRunner 原理及使用
  • 【ArcGIS微课1000例】0132:从多个GIS视角认识与攀登珠穆朗玛峰
  • C++桥接模式在使用时需要注意什么
  • vue中路由缓存
  • 链表续-8种链表(数据结构)
  • FIFO和LRU算法实现操作系统中主存管理
  • C/C++语言基础--C++检测内存泄露方法、RALL思想模型
  • RTPS通信使用的socket和端口
  • 从零开始:如何使用第三方视频美颜SDK开发实时直播美颜平台
  • 在 Swift 中实现字符串分割问题:以字典中的单词构造句子
  • 摸一下elasticsearch8的AI能力:语义搜索/vector向量搜索案例
  • GPU服务器厂家:为什么要选择 GPU 服务器?
  • 包装器与绑定器
  • 06、Spring AOP
  • Bug Fix 20241122:缺少lib文件错误
  • 低速接口项目之串口Uart开发(四)——UART串口实现FPGA内部AXILITE寄存器的读写控制
  • 历遍单片机下的IIC设备[ESP--0]
  • 浅谈新能源光储充一体化电站设计方案
  • PyTorch图像预处理:计算均值和方差以实现标准化
  • 网安基础知识|IDS入侵检测系统|IPS入侵防御系统|堡垒机|VPN|EDR|CC防御|云安全-VDC/VPC|安全服务
  • RocketMQ文件刷盘机制深度解析与Java模拟实现
  • Leecode刷题C语言之统计不是特殊数字的数字数量
  • xbh的比赛
  • Qt 的事件投递机制:从基础到实战
  • 动态调试对安全研究有什么帮助?
  • 设计模式之 模板方法模式