Kafka AdminClient API 来获取特定 Kafka 消费组的消费延迟
文章目录
- 代码流程详解
- 1. Kafka 配置与创建 `AdminClient`
- 2. 获取 Topic 的所有分区
- 3. 获取消费者组的偏移量
- 4. 获取每个分区的 `log-end-offset`
- 5. 获取消费者组成员信息
- 6. 计算 Lag 并输出信息
- 7. 关闭 `AdminClient`
- 8. 完整代码
- 代码功能总结:
这段代码的目标是通过 Kafka
AdminClient
API 获取特定消费者组在一个特定 Topic 中各个分区的消费延迟(Lag)信息,并输出消费者实例的信息(包括实例 ID 和主机)。该程序会计算每个分区的消费 Lag 并输出消费者的偏移量、日志结束偏移量(log-end-offset)以及每个消费者实例的相关信息。
代码流程详解
1. Kafka 配置与创建 AdminClient
String bootstrapServers = ""; // Kafka 集群的地址(需要根据实际情况调整)
String consumerGroupId = ""; // 消费者组 ID(需要根据实际情况替换)
String topicName = ""; // Topic 名称(根据实际情况替换)
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
AdminClient adminClient = AdminClient.create(adminProps);
bootstrapServers
:指定 Kafka 集群的地址,通常是一个或多个 Kafka broker 的地址。consumerGroupId
:要查询的消费者组 ID。topicName
:要查询的 Topic 名称。AdminClient
:用于与 Kafka 集群交互的客户端,用于执行诸如描述 Topic、获取消费者组的偏移量等操作。
2. 获取 Topic 的所有分区
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(topicName));
Map<String, TopicDescription> topicDescriptions = describeTopicsResult.all().get();
TopicDescription topicDescription = topicDescriptions.get(topicName);
List<TopicPartition> topicPartitions = new ArrayList<>();
for (TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
topicPartitions.add(new TopicPartition(topicName, partitionInfo.partition()));
}
describeTopics
:用于获取 Topic 的元数据(如分区数量等)。TopicPartition
:每个 Topic 会有多个分区,TopicPartition
对象代表了某个 Topic 中的特定分区。
3. 获取消费者组的偏移量
ListConsumerGroupOffsetsResult offsetsResult = adminClient.listConsumerGroupOffsets(consumerGroupId);
Map<TopicPartition, OffsetAndMetadata> consumerOffsets = offsetsResult.partitionsToOffsetAndMetadata().get();
listConsumerGroupOffsets
:返回消费者组在每个分区上的当前消费偏移量。返回的是每个TopicPartition
对应的OffsetAndMetadata
(包括当前偏移量和元数据)。
4. 获取每个分区的 log-end-offset
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionListOffsetsResultInfoMap =
adminClient.listOffsets(topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()))).all().get();
listOffsets
:返回指定分区的log-end-offset
,即分区的最后消息的偏移量。OffsetSpec.latest()
表示获取当前最新的偏移量(log-end-offset
)。
5. 获取消费者组成员信息
DescribeConsumerGroupsResult consumerGroupResult = adminClient.describeConsumerGroups(Collections.singletonList(consumerGroupId));
Map<String, ConsumerGroupDescription> consumerGroupDescriptionMap = consumerGroupResult.all().get();
ConsumerGroupDescription consumerGroupDescription = consumerGroupDescriptionMap.get(consumerGroupId);
describeConsumerGroups
:获取消费者组的描述信息,包括该组内的消费者实例信息(例如,消费者的分区分配情况、消费者的主机名等)。
6. 计算 Lag 并输出信息
for (TopicPartition partition : topicPartitions) {
OffsetAndMetadata consumerOffset = consumerOffsets.get(partition);
if (consumerOffset != null) {
long consumerOffsetValue = consumerOffset.offset(); // 当前消费者的偏移量
ListOffsetsResult.ListOffsetsResultInfo logEndOffsetInfo = topicPartitionListOffsetsResultInfoMap.get(partition);
long logEndOffset = logEndOffsetInfo.offset(); // Kafka 中该分区的 log-end-offset
// 计算 Lag = log-end-offset - consumerOffset
long lag = logEndOffset - consumerOffsetValue;
String consumerInstance = "";
// 获取每个消费者实例的信息
for (MemberDescription member : consumerGroupDescription.members()) {
for (TopicPartition topicPartition : member.assignment().topicPartitions()) {
if (topicPartition.topic().equals(partition.topic())) {
Field field = MemberDescription.class.getDeclaredField("memberId"); // 获取成员 ID 字段
field.setAccessible(true); // 设置该字段为可访问
String memberIdValue = (String) field.get(member); // 通过反射获取该字段的值
consumerInstance = memberIdValue + ":" + member.host(); // 组合消费者 ID 和主机信息
break;
}
}
}
// 输出每个分区的 Lag 以及消费者实例信息
System.out.println("Topic: " + partition.topic() + ", Partition: " + partition.partition() +
", Consumer Offset: " + consumerOffsetValue + ", Log End Offset: " + logEndOffset + ", Lag: " + lag + ", consumerInstance : " + consumerInstance);
} else {
System.out.println("No consumer offset found for partition: " + partition);
}
}
-
Lag 计算:Lag 是指 Kafka 中某个分区的
log-end-offset
和消费者的当前偏移量(consumerOffset
)之间的差距。即:Lag = log-end-offset - consumerOffset
,表示当前消费者尚未消费的消息数量。
-
反射访问消费者实例信息:
- 通过反射访问
MemberDescription
类中的私有字段memberId
(该字段表示消费者的唯一 ID)。 - 使用
setAccessible(true)
方法绕过访问控制,使得可以访问私有字段。 - 获取到
memberId
后,组合消费者的 ID 和主机地址,作为消费者实例的标识。
- 通过反射访问
7. 关闭 AdminClient
adminClient.close();
- 关闭
AdminClient
实例,释放相关资源。
8. 完整代码
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
public class KafkaLagChecker {
public static void main(String[] args) throws ExecutionException, InterruptedException, NoSuchFieldException, IllegalAccessException {
// Kafka 配置
String bootstrapServers = ""; // 请根据实际情况调整
String consumerGroupId = ""; // 请替换为你的 consumer group
String topicName = ""; // 请替换为你的 Topic 名称
// 创建 AdminClient
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
AdminClient adminClient = AdminClient.create(adminProps);
// 获取 topic 中所有分区
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(topicName));
Map<String, TopicDescription> topicDescriptions = describeTopicsResult.all().get();
TopicDescription topicDescription = topicDescriptions.get(topicName);
List<TopicPartition> topicPartitions = new ArrayList<>();
for (TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
topicPartitions.add(new TopicPartition(topicName, partitionInfo.partition()));
}
// 获取 consumer group 的偏移量
ListConsumerGroupOffsetsResult offsetsResult = adminClient.listConsumerGroupOffsets(consumerGroupId);
Map<TopicPartition, OffsetAndMetadata> consumerOffsets = offsetsResult.partitionsToOffsetAndMetadata().get();
// 获取 topic 分区的 log-end-offset
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionListOffsetsResultInfoMap =
adminClient.listOffsets(topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()))).all().get();
// 获取 consumer group 成员信息
DescribeConsumerGroupsResult consumerGroupResult = adminClient.describeConsumerGroups(Collections.singletonList(consumerGroupId));
Map<String, ConsumerGroupDescription> consumerGroupDescriptionMap = consumerGroupResult.all().get();
ConsumerGroupDescription consumerGroupDescription = consumerGroupDescriptionMap.get(consumerGroupId);
// 输出消费者的偏移量与 log-end-offset 比较
for (TopicPartition partition : topicPartitions) {
OffsetAndMetadata consumerOffset = consumerOffsets.get(partition);
if (consumerOffset != null) {
long consumerOffsetValue = consumerOffset.offset(); // 消费者的当前偏移量
// 获取 Kafka 中该分区的 log-end-offset
ListOffsetsResult.ListOffsetsResultInfo logEndOffsetInfo = topicPartitionListOffsetsResultInfoMap.get(partition);
long logEndOffset = logEndOffsetInfo.offset(); // Kafka 中该分区的 log-end-offset
// 计算 Lag
long lag = logEndOffset - consumerOffsetValue;
String consumerInstance = "";
// 输出每个消费实例的信息
for (MemberDescription member : consumerGroupDescription.members()) {
for (TopicPartition topicPartition : member.assignment().topicPartitions()) {
if (topicPartition.topic().equals(partition.topic())) {
Field field = MemberDescription.class.getDeclaredField("memberId");
// 设置可以访问私有字段
field.setAccessible(true);
// 通过反射获取 final 字段的值
String memberIdValue = (String) field.get(member);
consumerInstance = memberIdValue + ":" + member.host();
break;
}
}
}
// 输出每个分区的 Lag,并输出每个消费者实例信息
System.out.println("Topic: " + partition.topic() + ", Partition: " + partition.partition() +
", Consumer Offset: " + consumerOffsetValue + ", Log End Offset: " + logEndOffset + ", Lag: " + lag + ", consumerInstance : " + consumerInstance);
} else {
System.out.println("No consumer offset found for partition: " + partition);
}
}
// 关闭 AdminClient
adminClient.close();
}
}
代码功能总结:
- 查询 Kafka 分区的
log-end-offset
和消费者的consumerOffset
。 - 计算每个分区的消费延迟(Lag)。
- 使用反射访问消费者实例的
memberId
字段和主机名。 - 输出每个 Topic 分区的消费偏移量、日志结束偏移量、Lag 和消费者实例信息。