kafka学习-02
kafka分区的分配以及再平衡:
4个:
1、Range 以及再平衡
1)Range 分区策略原理
2)Range 分区分配策略案例
(1)修改主题 first 为 7 个分区。
bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --alter --topic first --partitions 7
注意:分区数可以增加,但是不能减少。
一个主题,假如副本数想修改,是否可以直接修改?答案是不可以。
如果想修改,如何修改?制定计划,执行计划。
(2)这样可以由三个消费者
CustomConsumer、CustomConsumer1、CustomConsumer2 组成消费者组,组名都为“test”, 同时启动 3 个消费者。
(3)启动 CustomProducer 生产者,发送 500 条消息,随机发送到不同的分区。
备注:只需要将以前的CustomProducerCallback,修改发送次数为500次即可。
2、RoundRobin(轮询) 以及再平衡
1)RoundRobin 分区策略原理
2)RoundRobin 分区分配策略案例
(1)依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代 码中修改分区分配策略为 RoundRobin。
3、Sticky 以及再平衡
粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。 粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区 到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
4、CooperativeSticky 的解释【新的kafka中刚添加的策略】
在消费过程中,会根据消费的偏移量情况进行重新再平衡,也就是粘性分区,运行过程中还会根据消费的实际情况重新分配消费者,直到平衡为止。
好处是:负载均衡,不好的地方是:多次平衡浪费性能。
动态平衡,在消费过程中,实施再平衡,而不是定下来,等某个消费者退出再平衡。
offset 位移[偏移量](重要)
记录消费到了哪里的这个值,就是偏移量。
记录:哪个主题,哪个分区,哪个位置。
1) 消费 offset 案例
(0)思想:__consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。
(1)在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,
默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。
如果不修改是无法查看offset的值的,因为这些都是加密数据。
创建一个新的主题:bigdata
kafka-topics.sh --bootstrap-server bigdata01:9092 --create --topic bigdata --partitions 2 --replication-factor 2
在kafka3下执行此命令
查看消费者消费主题__consumer_offsets。 -- from-beginning 表示查看历史所有的偏移量
2) 自动提交案例:
写java代码即可
和之前的基本相同,只是加入了几个参数
//设置自动提交偏移量 (默认就是true)
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); // 可以设置为false 之后就需要手动提交
// 提交 offset 的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
package com.bigdata.day04;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class Demo02_自动提交offset {
public static void main(String[] args) {
Properties properties = new Properties();
// 连接kafka
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");
// 字段反序列化 key 和 value
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// 配置消费者组(组名任意起名) 必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
//设置自动提交偏移量 (默认就是true)
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); // 可以设置为false 之后就需要手动提交
// 提交 offset 的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
ArrayList<String> list = new ArrayList<>();
list.add("bigdata");
kafkaConsumer.subscribe(list);
while (true){
// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据
ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
// 循环打印每一条数据
for (ConsumerRecord record : records) {
// 打印一条数据
System.out.println(record);
// 打印数据中的值
System.out.println(record.value());
}
}
}
}
3) 手动提交案例
package com.bigdata.day04;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class Demo02_手动提交offset {
public static void main(String[] args) {
Properties properties = new Properties();
// 连接kafka
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");
// 字段反序列化 key 和 value
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// 配置消费者组(组名任意起名) 必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
//设置自动提交偏移量 (默认就是true)
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); // 可以设置为false 之后就需要手动提交
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
ArrayList<String> list = new ArrayList<>();
list.add("bigdata");
kafkaConsumer.subscribe(list);
// 可以指定条件提交
int i = 1;
while (true){
// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据
ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
// 循环打印每一条数据
for (ConsumerRecord record : records) {
// 打印一条数据
System.out.println(record);
// 打印数据中的值
System.out.println(record.value());
i++;
}
// 当生产者发送了10条消息后,再提交偏移量(offset)
if (i==10){
kafkaConsumer.commitAsync();
}
}
}
}
4) 指定分区和偏移量消费 (重要)
package com.bigdata.day04;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;
public class Demo02_指定提交offset {
public static void main(String[] args) {
Properties properties = new Properties();
// 连接kafka
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");
// 字段反序列化 key 和 value
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// 配置消费者组(组名任意起名) 必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
ArrayList<String> list = new ArrayList<>();
list.add("bigdata");
kafkaConsumer.subscribe(list);
//执行计划
Set<TopicPartition> assignment = kafkaConsumer.assignment();
while (assignment.size() == 0){
// 拉去数据的代码,此处可以帮助快速构建分区方案
kafkaConsumer.poll(Duration.ofSeconds(1));
// 一直获取它的分区方案,什么时候由方案了就跳出循环
assignment = kafkaConsumer.assignment();
}
// 获取所有分区的 偏移量(offset)等于 5 以后的数据
for (TopicPartition topicPartition : assignment) {
kafkaConsumer.seek(topicPartition,5);
}
// 获取指定分区 5 以后的数据
// kafkaConsumer.seek(new TopicPartition("bigdata",0),5);
//
// // 还可以这样写
// for (TopicPartition topicPartition : assignment) {
// if (topicPartition.partition() == 0){
// kafkaConsumer.seek(topicPartition,5);
// }
// }
while (true){
// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据
ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
// 循环打印每一条数据
for (ConsumerRecord record : records) {
// 打印一条数据
System.out.println(record);
// 打印数据中的值
System.out.println(record.value());
}
}
}
}
5) 指定时间消费
package com.bigdata.day04;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class Demo02_指定时间提交offset {
public static void main(String[] args) {
Properties properties = new Properties();
// 连接kafka
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");
// 字段反序列化 key 和 value
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// 配置消费者组(组名任意起名) 必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test4");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
ArrayList<String> list = new ArrayList<>();
list.add("bigdata");
kafkaConsumer.subscribe(list);
//执行计划
Set<TopicPartition> assignment = kafkaConsumer.assignment();
while (assignment.size() == 0){
// 拉去数据的代码,此处可以帮助快速构建分区方案
kafkaConsumer.poll(Duration.ofSeconds(1));
// 一直获取它的分区方案,什么时候由方案了就跳出循环
assignment = kafkaConsumer.assignment();
}
HashMap<TopicPartition, Long> map = new HashMap<>();
for (TopicPartition topicPartition : assignment) {
map.put(topicPartition,System.currentTimeMillis()-60*60*1000*10);
}
Map<TopicPartition, OffsetAndTimestamp> timestampMap = kafkaConsumer.offsetsForTimes(map);
Set<Map.Entry<TopicPartition, OffsetAndTimestamp>> entries = timestampMap.entrySet();
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : entries) {
kafkaConsumer.seek(entry.getKey(),entry.getValue().offset());
}
while (true){
// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据
ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
// 循环打印每一条数据
for (ConsumerRecord record : records) {
// 打印一条数据
System.out.println(record);
// 打印数据中的值
System.out.println(record.value());
}
}
}
}
漏消费和重复消费
重复消费:
当我们在使用kafka消费者的时候,已经消费了数据,但是没有提交偏移量,这时就可能造成数据重复
假如我们往消费者中发送了10条数据,并且设置了手动提交,但是并没有提交,这时我们是可以看到消费者数据的,但是此时消费者中的数据没有偏移量等(如果有的话偏移量应该是10),再次消费的时候,消费者就可能接着从偏移量为0的地方开始消费,造成重复消费
漏消费:
先提交 offset 后消费,有可能会造成数据的漏消费。
在flume中使用kafka:
生产者将数据发送到kafka中,再使用flume将数据从kafka中抽取到hdfs上
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sources.r1.kafka.topics = bigdata
a1.sources.r1.kafka.consumer.group.id = donghu
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text