当前位置: 首页 > article >正文

Kafa分区策略实现

引言

Kafka 的分区策略决定了生产者发送的消息会被分配到哪个分区中,合理的分区策略有助于实现负载均衡、提高消息处理效率以及满足特定的业务需求。

轮询策略(默认)

  • 轮询策略是 Kafka 默认的分区策略(当消息没有指定键时)。生产者会按照顺序依次将消息发送到各个分区中,确保每个分区都能均匀地接收到消息,从而实现负载均衡。简单高效,能使各个分区的消息量相对均衡,充分利用每个分区的存储和处理能力。
  • import org.apache.kafka.clients.producer.*;
    import java.util.Properties;
    
    public class RoundRobinProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);
                producer.send(record);
            }
            producer.close();
        }
    }

    随机策略

  • 随机策略会随机地将消息分配到一个分区中。这种策略在某些情况下可以实现一定程度的负载均衡,但由于是随机分配,可能会导致分区之间的消息分布不够均匀。可以通过自定义分区器来实现随机策略。
  • import org.apache.kafka.clients.producer.*;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    
    public class RandomPartitioner implements Partitioner {
        private final Random random = new Random();
    
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            return random.nextInt(partitions.size());
        }
    
        @Override
        public void close() {}
    
        @Override
        public void configure(Map<String, ?> configs) {}
    }
    
    // 使用随机分区器的生产者示例
    public class RandomProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("partitioner.class", "RandomPartitioner");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);
                producer.send(record);
            }
            producer.close();
        }
    }

    按键哈希策略

  • 当消息指定了键时,Kafka 会根据键的哈希值将消息分配到特定的分区中。相同键的消息会被分配到同一个分区,这有助于保证具有相同业务逻辑的消息顺序性。可以保证消息的局部有序性,例如在处理用户相关的消息时,将同一个用户的消息发送到同一个分区,方便后续的处理和分析。
  • import org.apache.kafka.clients.producer.*;
    import java.util.Properties;
    
    public class KeyBasedProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "user-" + (i % 2), "message-" + i);
                producer.send(record);
            }
            producer.close();
        }
    }

    自定义分区策略(实现接口)

  • 当上述默认策略无法满足业务需求时,可以自定义分区策略。通过实现org.apache.kafka.clients.producer.Partitioner接口,重写partition方法来实现自定义的分区逻辑。例如,根据消息的某些特定字段(如时间、地理位置等)来进行分区,以满足特定的业务需求。

  • import org.apache.kafka.clients.producer.*;
    import java.util.List;
    import java.util.Map;
    
    public class CustomPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            // 自定义分区逻辑,这里简单示例根据消息值的长度分区
            String message = (String) value;
            return message.length() % partitions.size();
        }
    
        @Override
        public void close() {}
    
        @Override
        public void configure(Map<String, ?> configs) {}
    }
    
    // 使用自定义分区器的生产者示例
    public class CustomProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("partitioner.class", "CustomPartitioner");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);
                producer.send(record);
            }
            producer.close();
        }
    }


http://www.kler.cn/a/525198.html

相关文章:

  • MyBatis 框架:简化 Java 数据持久化的利器
  • [论文总结] 深度学习在农业领域应用论文笔记14
  • java——继承
  • 具身智能研究报告
  • Git进阶之旅:Git 配置信息 Config
  • 认识小程序的基本组成结构
  • fpga系列 HDL:XILINX Vivado Vitis 高层次综合(HLS) 实现 EBAZ板LED控制(下)
  • 前端力扣刷题 | 2:hot100之 双指针
  • Web3 如何赋能元宇宙,实现虚实融合的无缝对接
  • 论“0是不存在的”
  • H3CNE-27-链路聚合(L3)
  • 使用shell命令安装virtualbox的虚拟机并导出到vagrant的Box
  • 正则表达式入门
  • DeepSeek的崛起与全球科技市场的震荡
  • C++并发编程指南03
  • 【JavaWeb】利用IntelliJ IDEA 2024.1.4 +Tomcat10 搭建Java Web项目开发环境(图文超详细)
  • 商品信息管理自动化测试
  • 落地基于特征的图像拼接
  • 研发的立足之本到底是啥?
  • 跨平台物联网漏洞挖掘算法评估框架设计与实现文献综述之Gemini
  • 我的求职之路合集
  • zsh安装插件
  • Vue演练场基础知识(七)插槽
  • sentence_transformers安装
  • BGP分解实验·15——路由阻尼(抑制/衰减)实验
  • 关于Java的HttpURLConnection重定向问题 响应码303