当前位置: 首页 > article >正文

kafka学习-02

kafka分区的分配以及再平衡:

4个:

1、Range 以及再平衡

        1)Range 分区策略原理

2)Range 分区分配策略案例

(1)修改主题 first 为 7 个分区。

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --alter --topic first --partitions 7

注意:分区数可以增加,但是不能减少。

一个主题,假如副本数想修改,是否可以直接修改?答案是不可以。

如果想修改,如何修改?制定计划,执行计划。

(2)这样可以由三个消费者

CustomConsumer、CustomConsumer1、CustomConsumer2 组成消费者组,组名都为“test”, 同时启动 3 个消费者。

(3)启动 CustomProducer 生产者,发送 500 条消息,随机发送到不同的分区。

备注:只需要将以前的CustomProducerCallback,修改发送次数为500次即可。

2、RoundRobin(轮询) 以及再平衡

1)RoundRobin 分区策略原理

2RoundRobin 分区分配策略案例

(1)依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代 码中修改分区分配策略为 RoundRobin。

3、Sticky 以及再平衡

粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。 粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区 到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。

4、CooperativeSticky 的解释【新的kafka中刚添加的策略】

在消费过程中,会根据消费的偏移量情况进行重新再平衡,也就是粘性分区,运行过程中还会根据消费的实际情况重新分配消费者,直到平衡为止。

好处是:负载均衡,不好的地方是:多次平衡浪费性能。

动态平衡,在消费过程中,实施再平衡,而不是定下来,等某个消费者退出再平衡。

offset 位移[偏移量](重要)

记录消费到了哪里的这个值,就是偏移量。

记录:哪个主题,哪个分区,哪个位置。

1) 消费 offset 案例

(0)思想:__consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。

(1)在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,

默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

如果不修改是无法查看offset的值的,因为这些都是加密数据。

创建一个新的主题:bigdata

kafka-topics.sh --bootstrap-server bigdata01:9092 --create --topic bigdata --partitions 2 --replication-factor 2

在kafka3下执行此命令

查看消费者消费主题__consumer_offsets。 -- from-beginning 表示查看历史所有的偏移量

2) 自动提交案例:

写java代码即可

和之前的基本相同,只是加入了几个参数

        //设置自动提交偏移量 (默认就是true)
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);  // 可以设置为false 之后就需要手动提交
        // 提交 offset 的时间周期 1000ms,默认 5s
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
package com.bigdata.day04;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class Demo02_自动提交offset {

    public static void main(String[] args) {

        Properties properties = new Properties();

        // 连接kafka
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");

        // 字段反序列化   key 和  value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");

        //设置自动提交偏移量 (默认就是true)
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);  // 可以设置为false 之后就需要手动提交
        // 提交 offset 的时间周期 1000ms,默认 5s
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);


        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

        ArrayList<String> list = new ArrayList<>();
        list.add("bigdata");

        kafkaConsumer.subscribe(list);

        while (true){

            // 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据
            ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(1));

            // 循环打印每一条数据
            for (ConsumerRecord record : records) {
                // 打印一条数据
                System.out.println(record);
                // 打印数据中的值
                System.out.println(record.value());
            }

        }

    }

}

3) 手动提交案例

package com.bigdata.day04;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class Demo02_手动提交offset {

    public static void main(String[] args) {

        Properties properties = new Properties();

        // 连接kafka
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");

        // 字段反序列化   key 和  value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");

        //设置自动提交偏移量 (默认就是true)
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);  // 可以设置为false 之后就需要手动提交


        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

        ArrayList<String> list = new ArrayList<>();
        list.add("bigdata");

        kafkaConsumer.subscribe(list);

        // 可以指定条件提交
        int i = 1;

        while (true){

            // 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据
            ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(1));

            // 循环打印每一条数据
            for (ConsumerRecord record : records) {
                // 打印一条数据
                System.out.println(record);
                // 打印数据中的值
                System.out.println(record.value());
                i++;
            }
            // 当生产者发送了10条消息后,再提交偏移量(offset)
            if (i==10){
                kafkaConsumer.commitAsync();
            }

        }

    }

}

4) 指定分区和偏移量消费 (重要)

package com.bigdata.day04;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;

public class Demo02_指定提交offset {

    public static void main(String[] args) {

        Properties properties = new Properties();

        // 连接kafka
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");

        // 字段反序列化   key 和  value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");

        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);


        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

        ArrayList<String> list = new ArrayList<>();
        list.add("bigdata");

        kafkaConsumer.subscribe(list);


        //执行计划
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        while (assignment.size() == 0){

            // 拉去数据的代码,此处可以帮助快速构建分区方案
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // 一直获取它的分区方案,什么时候由方案了就跳出循环
            assignment = kafkaConsumer.assignment();
        }

        // 获取所有分区的 偏移量(offset)等于 5 以后的数据
        for (TopicPartition topicPartition : assignment) {
            kafkaConsumer.seek(topicPartition,5);
        }

        // 获取指定分区 5 以后的数据
//        kafkaConsumer.seek(new TopicPartition("bigdata",0),5);
//
//        // 还可以这样写
//        for (TopicPartition topicPartition : assignment) {
//            if (topicPartition.partition() == 0){
//                kafkaConsumer.seek(topicPartition,5);
//            }
//        }

        while (true){

            // 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据
            ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(1));

            // 循环打印每一条数据
            for (ConsumerRecord record : records) {
                // 打印一条数据
                System.out.println(record);
                // 打印数据中的值
                System.out.println(record.value());
            }

        }

    }

}

5) 指定时间消费

package com.bigdata.day04;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

public class Demo02_指定时间提交offset {

    public static void main(String[] args) {

        Properties properties = new Properties();

        // 连接kafka
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");

        // 字段反序列化   key 和  value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test4");

        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);


        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

        ArrayList<String> list = new ArrayList<>();
        list.add("bigdata");

        kafkaConsumer.subscribe(list);


        //执行计划
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        while (assignment.size() == 0){

            // 拉去数据的代码,此处可以帮助快速构建分区方案
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // 一直获取它的分区方案,什么时候由方案了就跳出循环
            assignment = kafkaConsumer.assignment();
        }

        HashMap<TopicPartition, Long> map = new HashMap<>();

        for (TopicPartition topicPartition : assignment) {
            map.put(topicPartition,System.currentTimeMillis()-60*60*1000*10);
        }
        Map<TopicPartition, OffsetAndTimestamp> timestampMap = kafkaConsumer.offsetsForTimes(map);

        Set<Map.Entry<TopicPartition, OffsetAndTimestamp>> entries = timestampMap.entrySet();
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : entries) {

            kafkaConsumer.seek(entry.getKey(),entry.getValue().offset());
        }


        while (true){

            // 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据
            ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(1));

            // 循环打印每一条数据
            for (ConsumerRecord record : records) {
                // 打印一条数据
                System.out.println(record);
                // 打印数据中的值
                System.out.println(record.value());
            }

        }

    }

}

漏消费和重复消费

重复消费:

当我们在使用kafka消费者的时候,已经消费了数据,但是没有提交偏移量,这时就可能造成数据重复

假如我们往消费者中发送了10条数据,并且设置了手动提交,但是并没有提交,这时我们是可以看到消费者数据的,但是此时消费者中的数据没有偏移量等(如果有的话偏移量应该是10),再次消费的时候,消费者就可能接着从偏移量为0的地方开始消费,造成重复消费

漏消费:

先提交 offset 后消费,有可能会造成数据的漏消费。

在flume中使用kafka:

生产者将数据发送到kafka中,再使用flume将数据从kafka中抽取到hdfs上

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sources.r1.kafka.topics = bigdata
a1.sources.r1.kafka.consumer.group.id = donghu

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp=true

a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text


http://www.kler.cn/a/410439.html

相关文章:

  • Apollo9.0源码部署(Nvidia显卡)
  • 设计模式:责任链实现数据流风格的数据处理
  • 零基础学指针(上)
  • Makefile 之 wordlist
  • docker容器化部署springboot项目
  • 力扣.5.最长回文子串力扣.14最长公共前缀力扣219.存在重复元素II力扣.67二进制求和
  • Elmentui实现订单拆单功能
  • Golang调用MongoDB的表自动增长的 ID 永久保存在 MongoDB 中,并且每次获取的 ID 是基于上次的结果
  • 5.5 W5500 TCP服务端与客户端
  • 排序算法(四)--快速排序
  • 移动语义和拷贝语义区别、智能指针
  • 深度学习3
  • 论文笔记 网络安全图谱以及溯源算法
  • JavaScript的基础数据类型
  • 241124_基于MindSpore学习Prompt Tuning
  • 【数据分析】基于GEE实现大津算法提取洞庭湖流域水体
  • 手机无法连接服务器1302什么意思?
  • 前端预览pdf文件流
  • 【cocos creator】下拉框
  • Windows系统电脑安装TightVNC服务端结合内网穿透实现异地远程桌面
  • kafka学习-01
  • Unity 2020、2021、2022、2023、6000下载安装
  • 【测试工具JMeter篇】JMeter性能测试入门级教程(一)出炉,测试君请各位收藏了!!!
  • 2024 APMCM亚太数学建模C题 - 宠物行业及相关产业的发展分析和策略 完整参考论文(1)
  • [算法] 前缀函数与KMP算法
  • 数据集-目标检测系列- 荷花 莲花 检测数据集 lotus>> DataBall