使用Java代码操作Kafka(五):Kafka消费 offset API,包含指定 Offset 消费以及指定时间消费
文章目录
- 1、指定 Offset 消费
- 2、指定时间消费
1、指定 Offset 消费
auto.offset.reset = earliest | latest | none 默认是 latest
(1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning
(2)latest(默认值):自动将偏移量重置为最新偏移量。
(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常
这个参数的力度太大了,不是从头,就是从尾
kafka提供了seek方法
,可以让我们从分区的固定位置开始消费
seek(TopicPartition topicPartition,offset offset)
示例代码:
package com.bigdata.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;
public class CustomConsumerSeek {
public static void main(String[] args) {
Properties properties = new Properties();
// 连接kafka
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092");
// 字段反序列化 key 和 value
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// 关闭自动提交offset
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// 配置消费者组(组名任意起名) 必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
// 2 订阅一个主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
// 执行计划
// 此时的消费计划是空的,因为没有时间生成
Set<TopicPartition> assignment = kafkaConsumer.assignment();
while(assignment.size() == 0){
// 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来
kafkaConsumer.poll(Duration.ofSeconds(1));
// 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环
assignment = kafkaConsumer.assignment();
}
// 获取所有分区的offset =5 以后的数据
/*for (TopicPartition tp:assignment) {
kafkaConsumer.seek(tp,5);
}*/
// 获取分区0的offset =5 以后的数据
//kafkaConsumer.seek(new TopicPartition("bigdata",0),5);
for (TopicPartition tp:assignment) {
if(tp.partition() == 0){
kafkaConsumer.seek(tp,5);
}
}
while(true){
//1 秒中向kafka拉取一批数据
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String,String> record :records) {
// 打印一条数据
System.out.println(record);
// 可以打印记录中的很多内容,比如 key value offset topic 等信息
System.out.println(record.value());
}
}
}
}
2、指定时间消费
示例代码:
package com.bigdata.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
/**
* 从某个特定的时间开始进行消费
*/
public class Customer05 {
public static void main(String[] args) {
// 其实就是map
Properties properties = new Properties();
// 连接kafka
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092");
// 字段反序列化 key 和 value
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// 配置消费者组(组名任意起名) 必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testf");
// 指定分区的分配方案 为轮询策略
//properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
// 指定分区的分配策略为:Sticky(粘性)
ArrayList<String> startegys = new ArrayList<>();
startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);
// 创建一个kafka消费者的对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
// 消费者消费的是kafka集群的数据,消费哪个主题的数据呢?
List<String> topics = new ArrayList<>();
topics.add("five");// list总可以设置多个主题的名称
kafkaConsumer.subscribe(topics);
// 因为消费者是不停的消费,所以是while true
// 指定了获取分区数据的起始位置。
// 这样写会报错的,因为前期消费需要指定计划,指定计划需要时间
// 此时的消费计划是空的,因为没有时间生成
Set<TopicPartition> assignment = kafkaConsumer.assignment();
while(assignment.size() == 0){
// 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来
kafkaConsumer.poll(Duration.ofSeconds(1));
// 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环
assignment = kafkaConsumer.assignment();
}
Map<TopicPartition, Long> hashMap = new HashMap<>();
for (TopicPartition partition:assignment) {
hashMap.put(partition,System.currentTimeMillis()- 60*60*1000);
}
Map<TopicPartition, OffsetAndTimestamp> map = kafkaConsumer.offsetsForTimes(hashMap);
for (TopicPartition partition:assignment) {
OffsetAndTimestamp offsetAndTimestamp = map.get(partition);
kafkaConsumer.seek(partition,offsetAndTimestamp.offset());
}
while(true){
// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
// 循环打印每一条数据
for (ConsumerRecord record:records) {
// 打印数据中的值
System.out.println(record.value());
System.out.println(record.offset());
// 打印一条数据
System.out.println(record);
}
}
}
}