当前位置: 首页 > article >正文

Kafka-Consumer源码分析

一、上下文

《Kafka-Consumer理论知识》中队Kafka-Consumer的理论知识进行了阐述。理解了它的基本思想,我们下面进入源码环节看看。

二、初始化KafkaConsumer

    private final static ConsumerDelegateCreator CREATOR = new ConsumerDelegateCreator();

    KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        // 初始化 consumer 时会创建一个 delegate  代表
        delegate = CREATOR.create(config, keyDeserializer, valueDeserializer);
    }

ConsumerDelegateCreator实现了一个准工厂模式,这里需要参考一个配置:group.protocol

它的取值有classic、consumer,默认值为classic。

当group.protocol = classic ,则返回LegacyKafkaConsumer

当group.protocol = consumer,则返回AsyncKafkaConsumer

GroupProtocol groupProtocol = GroupProtocol.valueOf(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toUpperCase(Locale.ROOT));
//如果不设置group.protocol,则使用默认值:classic,默认返回 LegacyKafkaConsumer
if (groupProtocol == GroupProtocol.CONSUMER)
    return new AsyncKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
else
    return new LegacyKafkaConsumer<>(config, keyDeserializer, valueDeserializer);

三、调用poll拉取数据

默认使用的consumer是LegacyKafkaConsumer,下面我们看下它的poll()

    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
        // 确保消费者没有关闭
        acquireAndEnsureOpen();
        try {
            // 度量系统开始记录
            this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());

            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                //consumer未订阅任何topic或分配任何partition
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
            }
            //一个循环,我们看下满足什么调节会一直循环  timer.notExpired()
            // Expire  到期  如果没有超时将一直循环
            do {
                // Trigger  触发
                // Wakeup  醒醒
                //client 是 ConsumerNetworkClient
                // 目的是唤醒... 可能是 网络连接
                client.maybeTriggerWakeup();

                // 上面调用时 给的 true
                if (includeMetadataInTimeout) {
                    // 尝试更新分配元数据,但不需要在加入组的计时器上阻塞
                    updateAssignmentMetadataIfNeeded(timer, false);
                } else {
                    while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
                        // 仍然等待元数据
                        log.warn("Still waiting for metadata");
                    }
                }

                // 核心方法,看看怎么拉取的数据
                final Fetch<K, V> fetch = pollForFetches(timer);
                if (!fetch.isEmpty()) {
                    // 在返回提取的记录之前,我们可以发送下一轮提取,避免在用户处理提取的记录时等待其响应以启用流水线。
                    // ***难道是变相的流水线技术吗?一次启动背后其实是两次 拉取数据,***
                    // 注意:由于消耗的位置已经更新,在返回提取的记录之前,我们不能允许触发唤醒或任何其他错误。
                    if (sendFetches() > 0 || client.hasPendingRequests()) {
                        client.transmitSends();
                    }

                    if (fetch.records().isEmpty()) {
                        // 从`poll()`返回空记录,因为消费者的位置已经前进了至少一个主题分区
                        log.trace("Returning empty records from `poll()` "
                                + "since the consumer's position has advanced for at least one topic partition");
                    }

                    // Map<TopicPartition, List<ConsumerRecord<K, V>>> records
                    // 返回的是 每个 TopicPartition 以及对应拉取的数据list
                    return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
                }
            } while (timer.notExpired());

            return ConsumerRecords.empty();
        } finally {
            release();
            this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
        }
    }

在拉取数据之前必须等待元数据的更新,因为有了最新的元数据,才知道去哪个节点拉取数据。拉取数据调用的是pollForFetches(timer) 下面我们继续往下看(方法上看到Fetche一般就是采用异步的方式去拉取数据了。)

四、pollForFetches()异步拉取数据

   private final Fetcher<K, V> fetcher;

   private Fetch<K, V> pollForFetches(Timer timer) {
        // 拉取数据超时时间
        // coordinator   协调员
        long pollTimeout = coordinator == null ? timer.remainingMs() :
                Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());

        // if data is available already, return it immediately
        // 如果数据已经可用,请立即返回
        // max.partition.fetch.bytes   是拉取每个分区的数据量
        // max.poll.records            是consumer一次拉取的数据量,
        // 也就是说 拉取次数 和 网络真实拉取次数不是相等的, 拉取的数据放缓存 ,consumer 慢慢消费,消费完再触发拉
        // 上面猜想的缓存就是 FetchBuffer
        final Fetch<K, V> fetch = fetcher.collectFetch();
        if (!fetch.isEmpty()) {
            return fetch;
        }

        // 发送任何新的提取(不会重新发送待处理的提取)
        // 重点看下这里
        sendFetches();

        // 如果我们错过了一些位置,我们不想在poll()中被阻塞,因为偏移查找可能会在失败后退缩

        // 注意:使用cachedSubscriptionHasAllFetchPositions意味着我们必须在此方法之前调用updateAssignmentMetadataIfNeed。
        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
            pollTimeout = retryBackoffMs;
        }

        // 轮询超时获取
        log.trace("Polling for fetches with timeout {}", pollTimeout);

        Timer pollTimer = time.timer(pollTimeout);
        client.poll(pollTimer, () -> {
            // 由于获取可能由后台线程完成,我们需要这个轮询条件来确保我们不会在poll()中不必要地阻塞
            // 后台线程去拉取数据的
            return !fetcher.hasAvailableFetches();
        });
        timer.update(pollTimer.currentTimeMs());

        return fetcher.collectFetch();
    }

可以看到,第一次获取是先去 Fetcher<K, V> fetcher 中获取,如果没有获取到才触发数据的拉取操作:sendFetches(),然后在从Fetcher<K, V> fetcher 中获取。

我们看下拉取数据的代码sendFetches()

1、sendFetches()异步拉取数据

    private int sendFetches() {
        offsetFetcher.validatePositionsOnMetadataChange();
        return fetcher.sendFetches();
    }

1、检测元数据是否变化

再拉取数据前必须进行元数据的再次校验,以确保拉取到最新的数据。比如leader的位置发生了变化,

    public void validatePositionsOnMetadataChange() {
        offsetFetcherUtils.validatePositionsOnMetadataChange();
    }
    void validatePositionsOnMetadataChange() {
        //获取元数据的最新版本
        int newMetadataUpdateVersion = metadata.updateVersion();
        //比较手中的元数据版本和最新的版本是否一致,如果一致就不需要调整
        if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != newMetadataUpdateVersion) {
            //如果不一致,需要对每个topicPartition 进行校验
            subscriptionState.assignedPartitions().forEach(topicPartition -> {
                ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(topicPartition);
                subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, topicPartition, leaderAndEpoch);
            });
        }
    }

2、拉取数据

    public synchronized int sendFetches() {
        // 一个Node 一个拉取请求
        final Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests = prepareFetchRequests();
        // Internal 内部的
        // sendFetchesInternal 方法有三个参数 ,fetchRequests 、处理成功的方法、处理失败的方法
        // 和 scala 的语法越来越像了
        sendFetchesInternal(
                fetchRequests,
                (fetchTarget, data, clientResponse) -> {
                    synchronized (Fetcher.this) {
                        //看看成功获取到数据的处理
                        handleFetchSuccess(fetchTarget, data, clientResponse);
                    }
                },
                (fetchTarget, data, error) -> {
                    synchronized (Fetcher.this) {
                        handleFetchFailure(fetchTarget, data, error);
                    }
                });
        return fetchRequests.size();
    }

我们先来看看prepareFetchRequests()中队目标节点的选择和队request的封装

封装FetchRequest
    protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
        // Update metrics in case there was an assignment change
        metricsManager.maybeUpdateAssignment(subscriptions);
        //下面会填充这个 map : fetchable
        Map<Node, FetchSessionHandler.Builder> fetchable = new HashMap<>();
        long currentTimeMs = time.milliseconds();
        Map<String, Uuid> topicIds = metadata.topicIds();

        // 返回了没有在缓存中的 Set<TopicPartition>
        for (TopicPartition partition : fetchablePartitions()) {
            // 该分区最后消费位置
            SubscriptionState.FetchPosition position = subscriptions.position(partition);

            if (position == null)
                throw new IllegalStateException("Missing position for fetchable partition " + partition);

            Optional<Node> leaderOpt = position.currentLeader.leader;

            if (!leaderOpt.isPresent()) {
                log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
                metadata.requestUpdate(false);
                continue;
            }

            // 如果已设置,则使用首选的读取副本,否则使用分区的 leader
            // 也就是可以设置 你指定的 副本节点,如果没有指定 就从 leader 拉取
            // 满足 3个条件,会走副本节点拉取数据,否则返回的就是leader节点
            Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs);

            // 检查节点是否已断开连接,并且无法立即重新连接(即断开连接后是否处于重新连接退避窗口)。
            if (isUnavailable(node)) {
                // 检查给定节点上的身份验证错误,如果存在则抛出异常。
                maybeThrowAuthFailure(node);

                // 如果我们试图在重新连接退避窗口期间发送,那么请求在发送之前无论如何都会失败,所以现在跳过发送请求
                // 跳过分区{}的获取,因为节点{}正在等待重新连接回退
                log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);
            } else if (nodesWithPendingFetchRequests.contains(node.id())) {
                // 跳过分区{}的获取,因为之前对{}的请求尚未处理
                log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node);
            } else {
                // if there is a leader and no in-flight requests, issue a new fetch
                // 如果有leader并且没有正在进行的请求,则发出新的fetch
                // 也就是说,如果两个 consumer 同时消费一个分区的数据,是排队进行拉取的。
                // Absent 缺席的 不存在
                // 这里是返回一个  builder
                // fetchable  最开始一个一个空的map ,需要调用 k 来填充它
                // 再调用 sessionHandlers 这个map  ,如果是空 再调用 n 来填充,
                // 最终 builder 是  FetchSessionHandler (维护用于连接到 broker 的fetch会话状态。)
                FetchSessionHandler.Builder builder = fetchable.computeIfAbsent(node, k -> {
                    FetchSessionHandler fetchSessionHandler = sessionHandlers.computeIfAbsent(node.id(), n -> new FetchSessionHandler(logContext, n));
                    return fetchSessionHandler.newBuilder();
                });
                Uuid topicId = topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID);
                // 封装了一个 PartitionData 想 拉取请求 ,
                // 里面抱哈了  offset 、拉取的大小(max.partition.fetch.bytes )、当前leader的纪元
                // max.partition.fetch.bytes  默认 : 1 * 1024 * 1024  1M
                // 服务器将返回的每个分区的最大数据量。记录由消费者分批提取。
                // 如果获取的第一个非空分区中的第一个记录批大于此限制,则仍将返回该批以确保消费者可以取得进展。
                // 代理接受的最大记录批大小是通过<code>message.max.bytes</code>(代理配置)
                // 或<code>max.message.bytes</code](主题配置)定义的
                // 。请参阅FETCH_MAX_BYTES_CONFIG+“以限制消费者请求大小。”;
                // 里面封装了 ApiKeys.FETCH ,下面可以看下 ApiKeys.FETCH 中的操作

                // max.partition.fetch.bytes   是拉取每个分区的数据量
                // max.poll.records            是consumer一次拉取的数据量,
                // 也就是说 拉取次数 和 网络真实拉取次数不是相等的, 拉取的数据放缓存 ,consumer 慢慢消费,消费完再触发拉
                FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId,
                        position.offset,
                        FetchRequest.INVALID_LOG_START_OFFSET,
                        fetchConfig.fetchSize,
                        position.currentLeader.epoch,
                        Optional.empty());
                builder.add(partition, partitionData);

                // 将位置{}处的分区{}的{}获取请求添加到节点{}
                log.debug("Added {} fetch request for partition {} at position {} to node {}", fetchConfig.isolationLevel,
                        partition, position, node);
            }
        }

        //  entrySet() 返回 Set<Map.Entry<K, V>>
        //  stream()   返回 Stream<E> 可以像 scala 一样进行流式操作
        //  collect 收集, 将流中的元素 收集到一起
        //  e.getValue().build() 返回  FetchRequestData
        // 因此这里将返回 拉取的数据
        // 返回的是 topicPartition 对应的  FetchRequestData
        return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()));
    }

这里总结下

1、获取待拉取的TopicPartition

2、获取每个TopicPartition的目标节点(默认是该分区的leader节点

        如果满足以下三个条件,则不从leader节点拉取

        1、之前已设置了首选副本

        2、仍然在首选副本的租赁期内

        3、副本仍然在线/可用

3、检测每个TopicPartition与目标节点的连接状态

4、为每个TopicPartition封装FetchRequest

需要特殊之处的是FetchRequest中封装了ApiKeys.FETCH

发送FetchRequest

这里调用了ConsumerNetworkClient的send()来队每个目标节点发送FetchRequest

它内部和其他分布式框架一样,并不是立即发送请求,而是放入队列中进行统一的发送

public class ConsumerNetworkClient implements Closeable {

    //交给底层通信,最终由NetworkClient处理
    //NetworkClient 是队NIO的封装
    private final KafkaClient client;
    //存放所有的请求的ConcurrentHashMap<Node -> ConcurrentLinkedQueue<ClientRequest> 
>
    rivate final UnsentRequests unsent = new UnsentRequests();


    public RequestFuture<ClientResponse> send(...){

      RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
      ClientRequest clientRequest = client.newClientRequest(...);
      //将该请求放入对应节点的队列中
      unsent.put(node, clientRequest);
      //这里最终会调到 this.nioSelector.wakeup();
      client.wakeup();
      return completionHandler.future;
    
    }

    long trySend(long now) {
        long pollDelayMs = maxPollTimeoutMs;

        // 发送任何可以立即发送的请求
        for (Node node : unsent.nodes()) {
            //获取每个节点对应的请求迭代器,依次进行发送
            Iterator<ClientRequest> iterator = unsent.requestIterator(node);
            if (iterator.hasNext())
                pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));

            while (iterator.hasNext()) {
                ClientRequest request = iterator.next();
                if (client.ready(node, now)) {
                    client.send(request, now);
                    iterator.remove();
                } else {
                    // 当前节点未就绪时尝试下一个节点
                    break;
                }
            }
        }
        return pollDelayMs;
    }
}
处理FetchRequest

下面我们看下KafkaApis中对应的处理方法,这个在《Kafka-follower同步leader数据》中也有涉及,也就是说follower同步leader数据和consumer拉取数据走的api是同一个,因为他们的动作及其相似。这里就不再详细分析了

处理拉回的数据

这里我们看下成功返回后的回调函数:handleFetchSuccess(fetchTarget, data, clientResponse)

    protected void handleFetchSuccess(...){
      //获取响应体
      final FetchResponse response = (FetchResponse) resp.responseBody();
      //获取这个节点上的所有TopicPartition
      final Map<TopicPartition, FetchResponseData.PartitionData> responseData = response.responseData(handler.sessionTopicNames(), requestVersion);
      for (Map.Entry<TopicPartition, FetchResponseData.PartitionData> entry : responseData.entrySet()) {
        TopicPartition partition = entry.getKey();
        FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition);
        //......省略.......
        CompletedFetch completedFetch = new CompletedFetch(...)
        //把数据放 fetchBuffer 中 ,下次或者首次直接从缓存拿
        fetchBuffer.add(completedFetch);
      }
    }

可以看到,拉取的数据都放到了FetchBuffer中

2、从缓存中获取结果

    public Fetch<K, V> collectFetch() {
        return fetchCollector.collectFetch(fetchBuffer);
    }

    public Fetch<K, V> collectFetch(final FetchBuffer fetchBuffer) {
        final Fetch<K, V> fetch = Fetch.empty();
        final Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>();
        // max.poll.records  默认 500
        // 单次调用poll()时返回的最大记录数注意,max.poll.records 不会影响底层的获取(fetching)行为。
        // 消费者将缓存每个获取请求中的记录,并从每个轮询中递增地返回它们
        int recordsRemaining = fetchConfig.maxPollRecords;

        try {
            while (recordsRemaining > 0) {
                // ConcurrentLinkedQueue<CompletedFetch> completedFetches;
                //private CompletedFetch nextInLineFetch;
                // 因此 nextInLineFetch 是一个完成拉取的数据
                final CompletedFetch nextInLineFetch = fetchBuffer.nextInLineFetch();

                // 如果 fetchBuffer 中没有数据了 或者 nextInLineFetch 是被消费的
                if (nextInLineFetch == null || nextInLineFetch.isConsumed()) {
                    final CompletedFetch completedFetch = fetchBuffer.peek();

                    // 如果等于null 是不是代表了一种情况:consumer客户端从各个节点拉取了 1300 条数据
                    //     消费者每次最多了拉 500 条 ,最后一次组装500时不够了,的处理情况 ,也就是这次只返回 300 条
                    // 下一次poll时 触发重新拉取,
                    // 如果一行记录特别大时,就会造成 每次都需要重新拉。
                    if (completedFetch == null)
                        break;

                    if (!completedFetch.isInitialized()) {
                        try {
                            fetchBuffer.setNextInLineFetch(initialize(completedFetch));
                        } catch (Exception e) {
                            // 在解析时删除一个completedExch,但以下情况除外:
                            //    (1)它不包含completedExtch,并且
                            //    (2)在这个异常之前没有提取到具有实际内容的completedExche。
                            //  第一个条件确保在TopicAuthorizationException等情况下,completedExches不会与相同的completedExch保持一致,
                            //  第二个条件确保不会因以下记录中的异常而导致潜在的数据丢失。
                            if (fetch.isEmpty() && FetchResponse.recordsOrFail(completedFetch.partitionData).sizeInBytes() == 0)
                                fetchBuffer.poll();

                            throw e;
                        }
                    } else {
                        fetchBuffer.setNextInLineFetch(completedFetch);
                    }
                    // 如果 是空的 ,那么就从队列中弹出它,把它从新进行拉取
                    fetchBuffer.poll();
                } else if (subscriptions.isPaused(nextInLineFetch.partition)) {
                    // 当分区暂停时,我们将记录添加回completedExches队列,而不是将其耗尽,以便在分区恢复时可以在后续轮询中返回
                    // paused 暂停
                    log.debug("Skipping fetching records for assigned partition {} because it is paused", nextInLineFetch.partition);
                    pausedCompletedFetches.add(nextInLineFetch);
                    fetchBuffer.setNextInLineFetch(null);
                } else {
                    final Fetch<K, V> nextFetch = fetchRecords(nextInLineFetch, recordsRemaining);
                    //这里真正的对 recordsRemaining 做减法 ,放入待返回的数据中
                    recordsRemaining -= nextFetch.numRecords();
                    fetch.add(nextFetch);
                }
            }
        } catch (KafkaException e) {
            //.....
        } finally {
            //.....
        }

        return fetch;
    }

FetchBuffer中存放了所有拉取回来的数据,其本身是对队列的一个封装。consumer每次从这个缓存队列中获取最多max.poll.records 条数据进行返回

我们详细看下fetchRecords()中的逻辑

   private Fetch<K, V> fetchRecords(final CompletedFetch nextInLineFetch, int maxRecords) {
        final TopicPartition tp = nextInLineFetch.partition;

        if (!subscriptions.isAssigned(tp)) {
            // 当在将提取的记录返回给消费者的投票呼叫之前进行重新平衡时,可能会发生这种情况
            // 如果数据已经拉取回来了,但是发生了分区从分配,就会给与提醒,数据不返回
            //由于分区{}不再被分配,因此不返回其已获取的记录
            log.debug("Not returning fetched records for partition {} since it is no longer assigned", tp);
        } else if (!subscriptions.isFetchable(tp)) {
            // 当在将获取的记录返回给消费者的轮询调用之前暂停分区,或者重置偏移量时,可能会发生这种情况
            // 由于分配的分区{}不再可获取,因此不返回已获取的记录“
            log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", tp);
        } else {
            SubscriptionState.FetchPosition position = subscriptions.position(tp);

            if (position == null)
                throw new IllegalStateException("Missing position for fetchable partition " + tp);

            if (nextInLineFetch.nextFetchOffset() == position.offset) {
                //这里用到了解码器
                List<ConsumerRecord<K, V>> partRecords = nextInLineFetch.fetchRecords(fetchConfig,
                        deserializers,
                        maxRecords);

                log.trace("Returning {} fetched records at offset {} for assigned partition {}",
                        partRecords.size(), position, tp);

                boolean positionAdvanced = false;

                //kafka 自身做的 更新offset
                if (nextInLineFetch.nextFetchOffset() > position.offset) {
                    SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
                            nextInLineFetch.nextFetchOffset(),
                            nextInLineFetch.lastEpoch(),
                            position.currentLeader);
                    // 将分区{}的提取位置从{}更新到{},并从`poll()返回{}条记录`
                    log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
                            position, nextPosition, tp, partRecords.size());
                    subscriptions.position(tp, nextPosition);
                    positionAdvanced = true;
                }

                Long partitionLag = subscriptions.partitionLag(tp, fetchConfig.isolationLevel);
                if (partitionLag != null)
                    metricsManager.recordPartitionLag(tp, partitionLag);

                Long lead = subscriptions.partitionLead(tp);
                if (lead != null) {
                    metricsManager.recordPartitionLead(tp, lead);
                }

                return Fetch.forPartition(tp, partRecords, positionAdvanced);
            } else {
                //这些记录不是最后一个消耗位置的下一个记录,忽略它们,它们必须来自过时的请求
                log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
                        tp, nextInLineFetch.nextFetchOffset(), position);
            }
        }

        log.trace("Draining fetched records for partition {}", tp);
        nextInLineFetch.drain();

        return Fetch.empty();
    }

五、总结

1、构建并初始化KafkaConsumer

2、订阅topic列表或指定分区拉取

3、调用KafkaConsumer的poll()拉取数据

4、调用LegacyKafkaConsumer的poll()拉取数据

5、判断FetchBuffer中是否由数据,如果有直接返回

6、如果FetchBuffer中没有数据,则开始拉取数据

7、更新并校验元数据

8、获取每个TopicPartition的目标节点(默认是该分区的leader节点)

        如果满足以下三个条件,则不从leader节点拉取

        1、之前已设置了首选副本

        2、仍然在首选副本的租赁期内

        3、副本仍然在线/可用

9、检测每个TopicPartition与目标节点的连接状态

10、为每个TopicPartition封装FetchRequest

11、将这些FetchRequest按照目标节点存放

12、向每个目标节点的ApiKeys.FETCH请求

13、目标节点处理ApiKeys.FETCH请求,如果是最近的数据,直接从pagecache中读取数据发到网卡返回。

14、将异步返回的数据按照TopicPartition纬度放入FetchBuffer中

15、从FetchBuffer读取数据还要判断此时分配器是否将该分区分配给了其他consumer,如果被分配走也不会读取

16、调用解码器队数据进行解码

17、Kafka此时根据返回的数据更新offset

18、我们自己写的Consumer获取到数据


http://www.kler.cn/a/420648.html

相关文章:

  • Java - JSR223规范解读_在JVM上实现多语言支持
  • 【案例教程】python生物信息多组学大数据深度挖掘与论文整理技巧实践技术应用
  • 工作:三菱PLC防止程序存储器爆满方法
  • A109 PHP+MYSQL+LW+网上论坛网站 军事BBS系统的设计与实现 源码+文档 全套 教程
  • BWO-CNN-BiGRU-Attention白鲸优化算法优化卷积神经网络结合双向门控循环单元时间序列预测,含优化前后对比
  • 【人工智能】Transformers之Pipeline(二十七):蒙版生成(mask-generation)
  • USB 声卡全解析:提升音频体验的得力助手
  • 网络安全之常用安全设备功能及作用_设备管理器安全设备是什么
  • Runway 技术浅析(六):文本到视频(Text-to-Video)
  • GPT时代的BI革命:智能报表系统如何颠覆传统决策
  • qt音频实战
  • Vue 实现无线滚动效果
  • Linux下anaconda安装环境
  • Docker和Docker Compose部署方式的区别以及各自适用的场景(ChatGPT-4o回答)
  • WPF+MVVM案例实战与特效(三十一)- 封装一个加载动画的自定义控件
  • 将一个数组逆序输出。-多语言
  • 【SQL】实战--组合两个表
  • 一、文本预处理
  • mysql order by后进行limit分页查询出现重复数据
  • shell脚本30个案例(五)
  • Spring AI 框架介绍
  • WuCup网络安全技能大赛WP
  • Java 单例模式:深度解析与应用
  • mysql线上问题集合
  • Stable Diffusion 3 论文
  • 淘宝商品数据获取:Python爬虫技术的应用与实践