Kafka-Consumer源码分析
一、上下文
《Kafka-Consumer理论知识》中队Kafka-Consumer的理论知识进行了阐述。理解了它的基本思想,我们下面进入源码环节看看。
二、初始化KafkaConsumer
private final static ConsumerDelegateCreator CREATOR = new ConsumerDelegateCreator();
KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
// 初始化 consumer 时会创建一个 delegate 代表
delegate = CREATOR.create(config, keyDeserializer, valueDeserializer);
}
ConsumerDelegateCreator实现了一个准工厂模式,这里需要参考一个配置:group.protocol
它的取值有classic、consumer,默认值为classic。
当group.protocol = classic ,则返回LegacyKafkaConsumer
当group.protocol = consumer,则返回AsyncKafkaConsumer
GroupProtocol groupProtocol = GroupProtocol.valueOf(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toUpperCase(Locale.ROOT));
//如果不设置group.protocol,则使用默认值:classic,默认返回 LegacyKafkaConsumer
if (groupProtocol == GroupProtocol.CONSUMER)
return new AsyncKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
else
return new LegacyKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
三、调用poll拉取数据
默认使用的consumer是LegacyKafkaConsumer,下面我们看下它的poll()
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
// 确保消费者没有关闭
acquireAndEnsureOpen();
try {
// 度量系统开始记录
this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
//consumer未订阅任何topic或分配任何partition
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}
//一个循环,我们看下满足什么调节会一直循环 timer.notExpired()
// Expire 到期 如果没有超时将一直循环
do {
// Trigger 触发
// Wakeup 醒醒
//client 是 ConsumerNetworkClient
// 目的是唤醒... 可能是 网络连接
client.maybeTriggerWakeup();
// 上面调用时 给的 true
if (includeMetadataInTimeout) {
// 尝试更新分配元数据,但不需要在加入组的计时器上阻塞
updateAssignmentMetadataIfNeeded(timer, false);
} else {
while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
// 仍然等待元数据
log.warn("Still waiting for metadata");
}
}
// 核心方法,看看怎么拉取的数据
final Fetch<K, V> fetch = pollForFetches(timer);
if (!fetch.isEmpty()) {
// 在返回提取的记录之前,我们可以发送下一轮提取,避免在用户处理提取的记录时等待其响应以启用流水线。
// ***难道是变相的流水线技术吗?一次启动背后其实是两次 拉取数据,***
// 注意:由于消耗的位置已经更新,在返回提取的记录之前,我们不能允许触发唤醒或任何其他错误。
if (sendFetches() > 0 || client.hasPendingRequests()) {
client.transmitSends();
}
if (fetch.records().isEmpty()) {
// 从`poll()`返回空记录,因为消费者的位置已经前进了至少一个主题分区
log.trace("Returning empty records from `poll()` "
+ "since the consumer's position has advanced for at least one topic partition");
}
// Map<TopicPartition, List<ConsumerRecord<K, V>>> records
// 返回的是 每个 TopicPartition 以及对应拉取的数据list
return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
}
} while (timer.notExpired());
return ConsumerRecords.empty();
} finally {
release();
this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
}
}
在拉取数据之前必须等待元数据的更新,因为有了最新的元数据,才知道去哪个节点拉取数据。拉取数据调用的是pollForFetches(timer) 下面我们继续往下看(方法上看到Fetche一般就是采用异步的方式去拉取数据了。)
四、pollForFetches()异步拉取数据
private final Fetcher<K, V> fetcher;
private Fetch<K, V> pollForFetches(Timer timer) {
// 拉取数据超时时间
// coordinator 协调员
long pollTimeout = coordinator == null ? timer.remainingMs() :
Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
// if data is available already, return it immediately
// 如果数据已经可用,请立即返回
// max.partition.fetch.bytes 是拉取每个分区的数据量
// max.poll.records 是consumer一次拉取的数据量,
// 也就是说 拉取次数 和 网络真实拉取次数不是相等的, 拉取的数据放缓存 ,consumer 慢慢消费,消费完再触发拉
// 上面猜想的缓存就是 FetchBuffer
final Fetch<K, V> fetch = fetcher.collectFetch();
if (!fetch.isEmpty()) {
return fetch;
}
// 发送任何新的提取(不会重新发送待处理的提取)
// 重点看下这里
sendFetches();
// 如果我们错过了一些位置,我们不想在poll()中被阻塞,因为偏移查找可能会在失败后退缩
// 注意:使用cachedSubscriptionHasAllFetchPositions意味着我们必须在此方法之前调用updateAssignmentMetadataIfNeed。
if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
pollTimeout = retryBackoffMs;
}
// 轮询超时获取
log.trace("Polling for fetches with timeout {}", pollTimeout);
Timer pollTimer = time.timer(pollTimeout);
client.poll(pollTimer, () -> {
// 由于获取可能由后台线程完成,我们需要这个轮询条件来确保我们不会在poll()中不必要地阻塞
// 后台线程去拉取数据的
return !fetcher.hasAvailableFetches();
});
timer.update(pollTimer.currentTimeMs());
return fetcher.collectFetch();
}
可以看到,第一次获取是先去 Fetcher<K, V> fetcher 中获取,如果没有获取到才触发数据的拉取操作:sendFetches(),然后在从Fetcher<K, V> fetcher 中获取。
我们看下拉取数据的代码sendFetches()
1、sendFetches()异步拉取数据
private int sendFetches() {
offsetFetcher.validatePositionsOnMetadataChange();
return fetcher.sendFetches();
}
1、检测元数据是否变化
再拉取数据前必须进行元数据的再次校验,以确保拉取到最新的数据。比如leader的位置发生了变化,
public void validatePositionsOnMetadataChange() {
offsetFetcherUtils.validatePositionsOnMetadataChange();
}
void validatePositionsOnMetadataChange() {
//获取元数据的最新版本
int newMetadataUpdateVersion = metadata.updateVersion();
//比较手中的元数据版本和最新的版本是否一致,如果一致就不需要调整
if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != newMetadataUpdateVersion) {
//如果不一致,需要对每个topicPartition 进行校验
subscriptionState.assignedPartitions().forEach(topicPartition -> {
ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(topicPartition);
subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, topicPartition, leaderAndEpoch);
});
}
}
2、拉取数据
public synchronized int sendFetches() {
// 一个Node 一个拉取请求
final Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests = prepareFetchRequests();
// Internal 内部的
// sendFetchesInternal 方法有三个参数 ,fetchRequests 、处理成功的方法、处理失败的方法
// 和 scala 的语法越来越像了
sendFetchesInternal(
fetchRequests,
(fetchTarget, data, clientResponse) -> {
synchronized (Fetcher.this) {
//看看成功获取到数据的处理
handleFetchSuccess(fetchTarget, data, clientResponse);
}
},
(fetchTarget, data, error) -> {
synchronized (Fetcher.this) {
handleFetchFailure(fetchTarget, data, error);
}
});
return fetchRequests.size();
}
我们先来看看prepareFetchRequests()中队目标节点的选择和队request的封装
封装FetchRequest
protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
// Update metrics in case there was an assignment change
metricsManager.maybeUpdateAssignment(subscriptions);
//下面会填充这个 map : fetchable
Map<Node, FetchSessionHandler.Builder> fetchable = new HashMap<>();
long currentTimeMs = time.milliseconds();
Map<String, Uuid> topicIds = metadata.topicIds();
// 返回了没有在缓存中的 Set<TopicPartition>
for (TopicPartition partition : fetchablePartitions()) {
// 该分区最后消费位置
SubscriptionState.FetchPosition position = subscriptions.position(partition);
if (position == null)
throw new IllegalStateException("Missing position for fetchable partition " + partition);
Optional<Node> leaderOpt = position.currentLeader.leader;
if (!leaderOpt.isPresent()) {
log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
metadata.requestUpdate(false);
continue;
}
// 如果已设置,则使用首选的读取副本,否则使用分区的 leader
// 也就是可以设置 你指定的 副本节点,如果没有指定 就从 leader 拉取
// 满足 3个条件,会走副本节点拉取数据,否则返回的就是leader节点
Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs);
// 检查节点是否已断开连接,并且无法立即重新连接(即断开连接后是否处于重新连接退避窗口)。
if (isUnavailable(node)) {
// 检查给定节点上的身份验证错误,如果存在则抛出异常。
maybeThrowAuthFailure(node);
// 如果我们试图在重新连接退避窗口期间发送,那么请求在发送之前无论如何都会失败,所以现在跳过发送请求
// 跳过分区{}的获取,因为节点{}正在等待重新连接回退
log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);
} else if (nodesWithPendingFetchRequests.contains(node.id())) {
// 跳过分区{}的获取,因为之前对{}的请求尚未处理
log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node);
} else {
// if there is a leader and no in-flight requests, issue a new fetch
// 如果有leader并且没有正在进行的请求,则发出新的fetch
// 也就是说,如果两个 consumer 同时消费一个分区的数据,是排队进行拉取的。
// Absent 缺席的 不存在
// 这里是返回一个 builder
// fetchable 最开始一个一个空的map ,需要调用 k 来填充它
// 再调用 sessionHandlers 这个map ,如果是空 再调用 n 来填充,
// 最终 builder 是 FetchSessionHandler (维护用于连接到 broker 的fetch会话状态。)
FetchSessionHandler.Builder builder = fetchable.computeIfAbsent(node, k -> {
FetchSessionHandler fetchSessionHandler = sessionHandlers.computeIfAbsent(node.id(), n -> new FetchSessionHandler(logContext, n));
return fetchSessionHandler.newBuilder();
});
Uuid topicId = topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID);
// 封装了一个 PartitionData 想 拉取请求 ,
// 里面抱哈了 offset 、拉取的大小(max.partition.fetch.bytes )、当前leader的纪元
// max.partition.fetch.bytes 默认 : 1 * 1024 * 1024 1M
// 服务器将返回的每个分区的最大数据量。记录由消费者分批提取。
// 如果获取的第一个非空分区中的第一个记录批大于此限制,则仍将返回该批以确保消费者可以取得进展。
// 代理接受的最大记录批大小是通过<code>message.max.bytes</code>(代理配置)
// 或<code>max.message.bytes</code](主题配置)定义的
// 。请参阅FETCH_MAX_BYTES_CONFIG+“以限制消费者请求大小。”;
// 里面封装了 ApiKeys.FETCH ,下面可以看下 ApiKeys.FETCH 中的操作
// max.partition.fetch.bytes 是拉取每个分区的数据量
// max.poll.records 是consumer一次拉取的数据量,
// 也就是说 拉取次数 和 网络真实拉取次数不是相等的, 拉取的数据放缓存 ,consumer 慢慢消费,消费完再触发拉
FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId,
position.offset,
FetchRequest.INVALID_LOG_START_OFFSET,
fetchConfig.fetchSize,
position.currentLeader.epoch,
Optional.empty());
builder.add(partition, partitionData);
// 将位置{}处的分区{}的{}获取请求添加到节点{}
log.debug("Added {} fetch request for partition {} at position {} to node {}", fetchConfig.isolationLevel,
partition, position, node);
}
}
// entrySet() 返回 Set<Map.Entry<K, V>>
// stream() 返回 Stream<E> 可以像 scala 一样进行流式操作
// collect 收集, 将流中的元素 收集到一起
// e.getValue().build() 返回 FetchRequestData
// 因此这里将返回 拉取的数据
// 返回的是 topicPartition 对应的 FetchRequestData
return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()));
}
这里总结下
1、获取待拉取的TopicPartition
2、获取每个TopicPartition的目标节点(默认是该分区的leader节点)
如果满足以下三个条件,则不从leader节点拉取
1、之前已设置了首选副本
2、仍然在首选副本的租赁期内
3、副本仍然在线/可用
3、检测每个TopicPartition与目标节点的连接状态
4、为每个TopicPartition封装FetchRequest
需要特殊之处的是FetchRequest中封装了ApiKeys.FETCH
发送FetchRequest
这里调用了ConsumerNetworkClient的send()来队每个目标节点发送FetchRequest
它内部和其他分布式框架一样,并不是立即发送请求,而是放入队列中进行统一的发送
public class ConsumerNetworkClient implements Closeable {
//交给底层通信,最终由NetworkClient处理
//NetworkClient 是队NIO的封装
private final KafkaClient client;
//存放所有的请求的ConcurrentHashMap<Node -> ConcurrentLinkedQueue<ClientRequest>
>
rivate final UnsentRequests unsent = new UnsentRequests();
public RequestFuture<ClientResponse> send(...){
RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
ClientRequest clientRequest = client.newClientRequest(...);
//将该请求放入对应节点的队列中
unsent.put(node, clientRequest);
//这里最终会调到 this.nioSelector.wakeup();
client.wakeup();
return completionHandler.future;
}
long trySend(long now) {
long pollDelayMs = maxPollTimeoutMs;
// 发送任何可以立即发送的请求
for (Node node : unsent.nodes()) {
//获取每个节点对应的请求迭代器,依次进行发送
Iterator<ClientRequest> iterator = unsent.requestIterator(node);
if (iterator.hasNext())
pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));
while (iterator.hasNext()) {
ClientRequest request = iterator.next();
if (client.ready(node, now)) {
client.send(request, now);
iterator.remove();
} else {
// 当前节点未就绪时尝试下一个节点
break;
}
}
}
return pollDelayMs;
}
}
处理FetchRequest
下面我们看下KafkaApis中对应的处理方法,这个在《Kafka-follower同步leader数据》中也有涉及,也就是说follower同步leader数据和consumer拉取数据走的api是同一个,因为他们的动作及其相似。这里就不再详细分析了
处理拉回的数据
这里我们看下成功返回后的回调函数:handleFetchSuccess(fetchTarget, data, clientResponse)
protected void handleFetchSuccess(...){
//获取响应体
final FetchResponse response = (FetchResponse) resp.responseBody();
//获取这个节点上的所有TopicPartition
final Map<TopicPartition, FetchResponseData.PartitionData> responseData = response.responseData(handler.sessionTopicNames(), requestVersion);
for (Map.Entry<TopicPartition, FetchResponseData.PartitionData> entry : responseData.entrySet()) {
TopicPartition partition = entry.getKey();
FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition);
//......省略.......
CompletedFetch completedFetch = new CompletedFetch(...)
//把数据放 fetchBuffer 中 ,下次或者首次直接从缓存拿
fetchBuffer.add(completedFetch);
}
}
可以看到,拉取的数据都放到了FetchBuffer中
2、从缓存中获取结果
public Fetch<K, V> collectFetch() {
return fetchCollector.collectFetch(fetchBuffer);
}
public Fetch<K, V> collectFetch(final FetchBuffer fetchBuffer) {
final Fetch<K, V> fetch = Fetch.empty();
final Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>();
// max.poll.records 默认 500
// 单次调用poll()时返回的最大记录数注意,max.poll.records 不会影响底层的获取(fetching)行为。
// 消费者将缓存每个获取请求中的记录,并从每个轮询中递增地返回它们
int recordsRemaining = fetchConfig.maxPollRecords;
try {
while (recordsRemaining > 0) {
// ConcurrentLinkedQueue<CompletedFetch> completedFetches;
//private CompletedFetch nextInLineFetch;
// 因此 nextInLineFetch 是一个完成拉取的数据
final CompletedFetch nextInLineFetch = fetchBuffer.nextInLineFetch();
// 如果 fetchBuffer 中没有数据了 或者 nextInLineFetch 是被消费的
if (nextInLineFetch == null || nextInLineFetch.isConsumed()) {
final CompletedFetch completedFetch = fetchBuffer.peek();
// 如果等于null 是不是代表了一种情况:consumer客户端从各个节点拉取了 1300 条数据
// 消费者每次最多了拉 500 条 ,最后一次组装500时不够了,的处理情况 ,也就是这次只返回 300 条
// 下一次poll时 触发重新拉取,
// 如果一行记录特别大时,就会造成 每次都需要重新拉。
if (completedFetch == null)
break;
if (!completedFetch.isInitialized()) {
try {
fetchBuffer.setNextInLineFetch(initialize(completedFetch));
} catch (Exception e) {
// 在解析时删除一个completedExch,但以下情况除外:
// (1)它不包含completedExtch,并且
// (2)在这个异常之前没有提取到具有实际内容的completedExche。
// 第一个条件确保在TopicAuthorizationException等情况下,completedExches不会与相同的completedExch保持一致,
// 第二个条件确保不会因以下记录中的异常而导致潜在的数据丢失。
if (fetch.isEmpty() && FetchResponse.recordsOrFail(completedFetch.partitionData).sizeInBytes() == 0)
fetchBuffer.poll();
throw e;
}
} else {
fetchBuffer.setNextInLineFetch(completedFetch);
}
// 如果 是空的 ,那么就从队列中弹出它,把它从新进行拉取
fetchBuffer.poll();
} else if (subscriptions.isPaused(nextInLineFetch.partition)) {
// 当分区暂停时,我们将记录添加回completedExches队列,而不是将其耗尽,以便在分区恢复时可以在后续轮询中返回
// paused 暂停
log.debug("Skipping fetching records for assigned partition {} because it is paused", nextInLineFetch.partition);
pausedCompletedFetches.add(nextInLineFetch);
fetchBuffer.setNextInLineFetch(null);
} else {
final Fetch<K, V> nextFetch = fetchRecords(nextInLineFetch, recordsRemaining);
//这里真正的对 recordsRemaining 做减法 ,放入待返回的数据中
recordsRemaining -= nextFetch.numRecords();
fetch.add(nextFetch);
}
}
} catch (KafkaException e) {
//.....
} finally {
//.....
}
return fetch;
}
FetchBuffer中存放了所有拉取回来的数据,其本身是对队列的一个封装。consumer每次从这个缓存队列中获取最多max.poll.records 条数据进行返回
我们详细看下fetchRecords()中的逻辑
private Fetch<K, V> fetchRecords(final CompletedFetch nextInLineFetch, int maxRecords) {
final TopicPartition tp = nextInLineFetch.partition;
if (!subscriptions.isAssigned(tp)) {
// 当在将提取的记录返回给消费者的投票呼叫之前进行重新平衡时,可能会发生这种情况
// 如果数据已经拉取回来了,但是发生了分区从分配,就会给与提醒,数据不返回
//由于分区{}不再被分配,因此不返回其已获取的记录
log.debug("Not returning fetched records for partition {} since it is no longer assigned", tp);
} else if (!subscriptions.isFetchable(tp)) {
// 当在将获取的记录返回给消费者的轮询调用之前暂停分区,或者重置偏移量时,可能会发生这种情况
// 由于分配的分区{}不再可获取,因此不返回已获取的记录“
log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", tp);
} else {
SubscriptionState.FetchPosition position = subscriptions.position(tp);
if (position == null)
throw new IllegalStateException("Missing position for fetchable partition " + tp);
if (nextInLineFetch.nextFetchOffset() == position.offset) {
//这里用到了解码器
List<ConsumerRecord<K, V>> partRecords = nextInLineFetch.fetchRecords(fetchConfig,
deserializers,
maxRecords);
log.trace("Returning {} fetched records at offset {} for assigned partition {}",
partRecords.size(), position, tp);
boolean positionAdvanced = false;
//kafka 自身做的 更新offset
if (nextInLineFetch.nextFetchOffset() > position.offset) {
SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
nextInLineFetch.nextFetchOffset(),
nextInLineFetch.lastEpoch(),
position.currentLeader);
// 将分区{}的提取位置从{}更新到{},并从`poll()返回{}条记录`
log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
position, nextPosition, tp, partRecords.size());
subscriptions.position(tp, nextPosition);
positionAdvanced = true;
}
Long partitionLag = subscriptions.partitionLag(tp, fetchConfig.isolationLevel);
if (partitionLag != null)
metricsManager.recordPartitionLag(tp, partitionLag);
Long lead = subscriptions.partitionLead(tp);
if (lead != null) {
metricsManager.recordPartitionLead(tp, lead);
}
return Fetch.forPartition(tp, partRecords, positionAdvanced);
} else {
//这些记录不是最后一个消耗位置的下一个记录,忽略它们,它们必须来自过时的请求
log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
tp, nextInLineFetch.nextFetchOffset(), position);
}
}
log.trace("Draining fetched records for partition {}", tp);
nextInLineFetch.drain();
return Fetch.empty();
}
五、总结
1、构建并初始化KafkaConsumer
2、订阅topic列表或指定分区拉取
3、调用KafkaConsumer的poll()拉取数据
4、调用LegacyKafkaConsumer的poll()拉取数据
5、判断FetchBuffer中是否由数据,如果有直接返回
6、如果FetchBuffer中没有数据,则开始拉取数据
7、更新并校验元数据
8、获取每个TopicPartition的目标节点(默认是该分区的leader节点)
如果满足以下三个条件,则不从leader节点拉取
1、之前已设置了首选副本
2、仍然在首选副本的租赁期内
3、副本仍然在线/可用
9、检测每个TopicPartition与目标节点的连接状态
10、为每个TopicPartition封装FetchRequest
11、将这些FetchRequest按照目标节点存放
12、向每个目标节点的ApiKeys.FETCH请求
13、目标节点处理ApiKeys.FETCH请求,如果是最近的数据,直接从pagecache中读取数据发到网卡返回。
14、将异步返回的数据按照TopicPartition纬度放入FetchBuffer中
15、从FetchBuffer读取数据还要判断此时分配器是否将该分区分配给了其他consumer,如果被分配走也不会读取
16、调用解码器队数据进行解码
17、Kafka此时根据返回的数据更新offset
18、我们自己写的Consumer获取到数据