当前位置: 首页 > article >正文

Kafka-broker处理producer请求-leader篇

一、上下文

《Kafka-生产者源码分析》博客中我们了解了Kafka是如何生产数据的,《Kafka-broker粗粒度启动流程》博客中我们了解了KafkaApis中有各种api和对应处理逻辑,其中PRODUCE请求对应了处理produce请求的逻辑,下面我们跟着源码来看下处理细节

class KafkaApis(
  //......
  request.header.apiKey match {
        case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
        ......
  }
}

二、handleProduceRequest

  def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {

    //获取请求体
    val produceRequest = request.body[ProduceRequest]
    //为每个TopicPartition 的不同情况声明不同的responseMap
    //未经授权的
    val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
    //不存在的
    val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
    //无效的
    val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]()
    //已授权的
    val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
    //缓存结果以避免冗余的授权调用
    val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC,
      produceRequest.data().topicData().asScala)(_.name())
    //依次循环 topic > partition 来处理
    //这说明这一个请求中需要处理多个 topic 的多个 partition 的数据
        produceRequest.data.topicData.forEach(topic => topic.partitionData.forEach { partition =>
      val topicPartition = new TopicPartition(topic.name, partition.index)
      val memoryRecords = partition.records.asInstanceOf[MemoryRecords]
      if (!authorizedTopics.contains(topicPartition.topic))
        unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
      else if (!metadataCache.contains(topicPartition))
        nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
      else
        try {
          //只有授权且metadataCache中有这个topicPartition 才能走到这
          //校验数据,并把该数据放入authorizedRequestInfo 
          ProduceRequest.validateRecords(request.header.apiVersion, memoryRecords)
          authorizedRequestInfo += (topicPartition -> memoryRecords)
        } catch {
            //......
        }
    }

    //回调函数,这里先不展开,后续处理完数据需要返回给producer时再展开
    @nowarn("cat=deprecation")
    def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {...}
    
    def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
      processingStats.forKeyValue { (tp, info) =>
        updateRecordConversionStats(request, tp, info)
      }
    }

    //如果没有有效数据,就立即返回空的情况
    if (authorizedRequestInfo.isEmpty)
      sendResponseCallback(Map.empty)
    else {
      val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_ID
      val transactionSupportedOperation = if (request.header.apiVersion > 10) genericError else defaultError
      //调用副本管理器将消息追加到副本
      //副本管理器 是一个屏蔽了集群的层 里面既包括本地leader写也包括远程Follower写
      replicaManager.handleProduceAppend(
        timeout = produceRequest.timeout.toLong,
        requiredAcks = produceRequest.acks,
        internalTopicsAllowed = internalTopicsAllowed,
        transactionalId = produceRequest.transactionalId,
        entriesPerPartition = authorizedRequestInfo,
        responseCallback = sendResponseCallback,
        recordValidationStatsCallback = processingStatsCallback,
        requestLocal = requestLocal,
        transactionSupportedOperation = transactionSupportedOperation)

      //如果请求被放入炼狱,它将有一个被保留的引用,因此不能被垃圾回收;因此,我们在这里清除它的数据,以便让GC回收内存,因为它已经附加到日志中
      //当follower的数据追平leader的数据,且leader没有新数据增长时,follower的fetch请求会放入炼狱,来减少带宽的消耗
      produceRequest.clearPartitionRecords()
    }

  }

三、ReplicaManager

最终需要将数据给到ReplicaManager来进行实际的追加

class ReplicaManager(...){
    
  def handleProduceAppend(...){
    val transactionalProducerInfo = mutable.HashSet[(Long, Short)]()
    val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]()
    entriesPerPartition.forKeyValue { (topicPartition, records) =>
      // 生成请求(仅需要验证的请求)应该在“批处理”中每个分区只有一个批处理,但为了安全起见,请检查所有批处理。
      val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional)
      transactionalBatches.foreach(batch => transactionalProducerInfo.add(batch.producerId, batch.producerEpoch))
      if (transactionalBatches.nonEmpty) 
        topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence)
    }
    if (transactionalProducerInfo.size > 1) {
      //从这里看出,事务记录应该只包含一个生产者id
      //抛出异常并提示:事务记录包含多个生产者ID
      throw new InvalidPidMappingException("Transactional records contained more than one producer ID")
    }

    //又封装了一层回调
    def postVerificationCallback(...){}

    //如果事务记录包含0个生产者id,不用处理
    if (transactionalProducerInfo.size < 1) {
      postVerificationCallback(
        requestLocal,
        (Map.empty[TopicPartition, Errors], Map.empty[TopicPartition, VerificationGuard])
      )
      return
    }
    
    //事务记录中只能有一个生产者id
    maybeStartTransactionVerificationForPartitions(
      topicPartitionBatchInfo,
      transactionalId,
      transactionalProducerInfo.head._1,
      transactionalProducerInfo.head._2,
      //当事务验证完成时,将要处理的回调封装在任意请求处理程序线程上。传入的本地请求仅在立即执行回调时使用。
      KafkaRequestHandler.wrapAsyncCallback(
        postVerificationCallback,
        requestLocal
      ),
      transactionSupportedOperation
    )
  }

}

1、postVerificationCallback

事务校验完成,会处理回调,执行真正的数据追加

    def postVerificationCallback(...): Unit = {
      val (preAppendErrors, verificationGuards) = results
      //将事务协调器错误转换为已知的生产者响应错误
      val errorResults = preAppendErrors.map {
        //......
      }
      val entriesWithoutErrorsPerPartition = entriesPerPartition.filter { case (key, _) => !errorResults.contains(key) }

      val preAppendPartitionResponses = buildProducePartitionStatus(errorResults).map { case (k, status) => k -> status.responseStatus }

      def newResponseCallback(responses: Map[TopicPartition, PartitionResponse]): Unit = {
        responseCallback(preAppendPartitionResponses ++ responses)
      }

      //执行数据追加操作
      appendRecords(
        timeout = timeout,
        requiredAcks = requiredAcks,
        internalTopicsAllowed = internalTopicsAllowed,
        origin = AppendOrigin.CLIENT,
        entriesPerPartition = entriesWithoutErrorsPerPartition,
        responseCallback = newResponseCallback,
        recordValidationStatsCallback = recordValidationStatsCallback,
        requestLocal = newRequestLocal,
        actionQueue = actionQueue,
        verificationGuards = verificationGuards
      )
    }

2、appendRecords

将消息附加到分区的leader副本,并等待它们复制到其他副本;当超时或满足所需的ack时,将触发回调函数;如果回调函数本身已经在某个对象上同步,则传递此对象以避免死锁

注意,所有挂起的延迟检查操作都存储在队列中。ReplicaManager.appendRecords() 的所有调用者都应该对所有受影响的分区调用ActionQueue.tryCompleteActions,而不会持有任何冲突的锁(将多线程锁的问题转成线性队列操作来提升性能)

  def appendRecords(t...): Unit = {
    //验证 acks 必须是 0 1 -1 三种类型
    if (!isValidRequiredAcks(requiredAcks)) {
      sendInvalidRequiredAcksResponse(entriesPerPartition, responseCallback)
      return
    }

    val sTime = time.milliseconds
    //向本地log写入数据
    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
      origin, entriesPerPartition, requiredAcks, requestLocal, verificationGuards.toMap)
    //生产到本地日志中用了多长时间
    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

    //下面的我们后续博客再接着分析,本地写完log,
    //按理论知识:如果ack=0或者1,就可以直接返回了,如果是-1就需要等待备份数据写入成功
    val produceStatus = buildProducePartitionStatus(localProduceResults)

    addCompletePurgatoryAction(actionQueue, localProduceResults)
    recordValidationStatsCallback(localProduceResults.map { case (k, v) =>
      k -> v.info.recordValidationStats
    })

    maybeAddDelayedProduce(
      requiredAcks,
      delayedProduceLock,
      timeout,
      entriesPerPartition,
      localProduceResults,
      produceStatus,
      responseCallback
    )
  }

3、appendToLocalLog

接下来我们看看kafka是如何将消息附加到本地副本日志的

  private def appendToLocalLog(...){

    //按照每个topic > 分区 来写
    entriesPerPartition.map { case (topicPartition, records) =>
      brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
      brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()

      //如果不允许,则拒绝附加到内部主题
      if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
        (topicPartition, LogAppendResult(
          LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
          Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}")),
          hasCustomErrorMessage = false))
      } else {
        try {
          val partition = getPartitionOrException(topicPartition)
          //接下来交给Partition向leader追加数据
          val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal,
          //......省略.....
        } catch {
         //......
        }
      }
    }

  }

四、Partition

它是表示topic partition的数据结构。leader负责维护AR、ISR、CUR、RAR

并发注意事项:

1、分区是线程安全的。分区上的操作可以从不同的请求处理程序线程并发调用

2、ISR更新使用读写锁同步。读锁用于检查是否需要更新,以避免在不执行更新的情况下获取副本的常见情况下获取写锁。在执行更新之前,在写锁下第二次检查ISR更新条件

3、在保持ISR写锁的同时,处理各种其他操作,如leader更改。这可能会在生成和副本获取请求中引入延迟,但这些操作通常不常见

4、使用ISR读锁同步HW更新

5、锁用于防止在ReplicaAlterDirThread执行时更新follower副本。可以使用ReplaceCurrentWithFutureReplica()用未来的副本替换follower副本。

1、appendRecordsToLeader

ReplicaManager调用了Partition的该方法来继续执行数据的追加操作

  def appendRecordsToLeader(...): LogAppendInfo = {

        val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
      leaderLogIfLocal match {
        case Some(leaderLog) =>
          //最小的ISR
          val minIsr = effectiveMinIsr(leaderLog)
          val inSyncSize = partitionState.isr.size

          //如果没有足够的insync副本来保证安全,请避免写信给leader
          //如果目前的ids < 最小的isr 要求,且还要求了 acks = -1 就直接返回异常
          if (inSyncSize < minIsr && requiredAcks == -1) {
            //这个 topic 的 partition 当前的isr 集合 为 partitionState.isr  不足以满足  min.isr 要求
            throw new NotEnoughReplicasException(s"The size of the current ISR ${partitionState.isr} " +
              s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
          }
          //将追加数据的任务委托给leaderLog既:UnifiedLog
          val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
            interBrokerProtocolVersion, requestLocal, verificationGuard)
          //我们可能需要增加高水位,因为ISR可能会降至1
          (info, maybeIncrementLeaderHW(leaderLog))
        case None =>
          //抛出异常:不能在该broker为xx分区的ledaer写入数据
      }
    }
    info.copy(if (leaderHWIncremented) LeaderHwChange.INCREASED else LeaderHwChange.SAME)
  }

五、UnifiedLog

本地和分层日志段的统一视图。

1、appendAsLeader

  //将此消息集附加到本地日志的活动段 segment ,分配偏移量和分区前导纪元
  def appendAsLeader(...): LogAppendInfo = {
    //这里需要验证log的来源,从而决定是否有对offset验证的必要
    //当下log是来自客户端,要写入的对象是RAFT_LEADER,筏头的意思,kafka中有一个HW的概念
    //我理解的意思是:此时要写入的数据是在HW之上的,因此称之为木筏leader
    //总共有四种来源:既REPLICATION(副本)、COORDINATOR(组协调员和事务)、CLIENT(客户端)、RAFT_LEADER(leader)
    val validateAndAssignOffsets = origin != AppendOrigin.RAFT_LEADER
    //将此消息集附加到本地日志的活动段,必要时滚动到新段。
    //此方法通常负责为消息分配偏移量,但是如果传递了assignOffsets=false标志,我们将只检查现有的偏移量是否有效。
    append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, Some(requestLocal), verificationGuard, ignoreRecordSize = false)
  }

2、append 

  private def append(...): LogAppendInfo = {
    //我们希望确保在将任何日志数据写入磁盘之前,将分区元数据文件写入日志目录。
    //这将确保在发生故障时,可以使用正确的topic ID恢复任何日志数据。
    //可能要写元数据了,这里用了flush,因为元数据比较重要,数据到pagecache后需要直接写入磁盘
    maybeFlushMetadataFile()

    //对要追加的数据进行解析和校验,校验的有以下几点
    //1、每条消息都与其CRC匹配,循环冗余校验
    //2、每个消息大小是否有效
    //3、传入记录批的序列号与现有状态一致
    //4、offset是否单调递增
    //也需要计算下面纬度的数量:
    //1、第一条消息的 offset
    //2、最后一条消息的offset
    //3、消息条数
    //4、有效字节数
    //5、偏移量是否单调递增
    //6、是否使用了任何压缩编解码器(如果使用了多个,则给出最后一个)
    val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, !validateAndAssignOffsets, leaderEpoch)

    // 如果我们没有有效的消息,或者这是最后一个附加条目的重复,则返回
    if (appendInfo.validBytes <= 0) appendInfo
    else {
      // 在将任何无效字节或部分消息附加到磁盘日志之前,请先对其进行修剪
      var validRecords = trimInvalidBytes(records, appendInfo)

      // 它们是有效的,请将其插入日志中
      lock synchronized {
        maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
          //主要是检查 log 的日志文件 的 mmap 是否关闭
          localLog.checkIfMemoryMappedBufferClosed()
          if (validateAndAssignOffsets) {
            //为消息集分配偏移量
            val offset = PrimitiveRef.ofLong(localLog.logEndOffset)
            appendInfo.setFirstOffset(offset.value)
            val validateAndOffsetAssignResult = try {
              val targetCompression = BrokerCompressionType.targetCompression(config.compression, appendInfo.sourceCompression())
              val validator = new LogValidator(validRecords,
                topicPartition,
                time,
                appendInfo.sourceCompression,
                targetCompression,
                config.compact,
                config.recordVersion.value,
                config.messageTimestampType,
                config.messageTimestampBeforeMaxMs,
                config.messageTimestampAfterMaxMs,
                leaderEpoch,
                origin,
                interBrokerProtocolVersion
              )
              validator.validateMessagesAndAssignOffsets(offset,
                validatorMetricsRecorder,
                requestLocal.getOrElse(throw new IllegalArgumentException(
                  "requestLocal should be defined if assignOffsets is true")
                ).bufferSupplier
              )
            } catch {
              //......
            }

            validRecords = validateAndOffsetAssignResult.validatedRecords
            appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)
            appendInfo.setShallowOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp)
            appendInfo.setLastOffset(offset.value - 1)
            appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats)
            if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
              appendInfo.setLogAppendTime(validateAndOffsetAssignResult.logAppendTimeMs)

            // 如果消息大小有可能发生变化(由于重新压缩或消息格式转换),则对其进行电子验证
            if (!ignoreRecordSize && validateAndOffsetAssignResult.messageSizeMaybeChanged) {
              validRecords.batches.forEach { batch =>
                if (batch.sizeInBytes > config.maxMessageSize) {
                  // 我们记录原始消息集大小,而不是修剪后的大小,以与预压缩字节RejectedRate记录保持一致
                  brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
                  brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
                  throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
                    s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
                }
              }
            }
          } else {
            //我们 使用 自己给的offsets
            if (appendInfo.firstOrLastOffsetOfFirstBatch < localLog.logEndOffset) {
              // 如果日志为空,我们仍然可以恢复,例如:从未批处理对齐的leader上的日志开始偏移量中获取,这可能是由于AdminClient#deleteRecords()造成的
              val hasFirstOffset = appendInfo.firstOffset != UnifiedLog.UnknownOffset
              val firstOffset = if (hasFirstOffset) appendInfo.firstOffset else records.batches.iterator().next().baseOffset()

              val firstOrLast = if (hasFirstOffset) "First offset" else "Last offset of the first batch"
              throw new UnexpectedAppendOffsetException(...)
            }
          }

          // 用领导者标记在消息上的纪元更新纪元缓存
          validRecords.batches.forEach { batch =>
            if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
              maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
            } else {
              // 在部分升级场景中,我们可能会对消息格式进行临时回归。为了确保领导人选举的安全性,我们清除了纪元缓存,以便在下一次领导人选举后恢复到 HW 截断。
              leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
                warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
                cache.clearAndFlush()
              }
            }
          }

          // 检查消息集大小可能超过config.segmentSize  这一批次消息的大小不能超过 整个段的大小
          if (validRecords.sizeInBytes > config.segmentSize) {
            throw new RecordBatchTooLargeException(......)
          }

          //如果该段已满,可能会滚动日志
          val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)

          val logOffsetMetadata = new LogOffsetMetadata(
            appendInfo.firstOrLastOffsetOfFirstBatch,
            segment.baseOffset,
            segment.size)

          // 现在我们有了有效的记录、分配的偏移量和更新的时间戳,我们需要验证生产者的幂等/事务状态,并收集一些元数据

          val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
            logOffsetMetadata, validRecords, origin, verificationGuard)

          maybeDuplicate match {
            case Some(duplicate) =>
              appendInfo.setFirstOffset(duplicate.firstOffset)
              appendInfo.setLastOffset(duplicate.lastOffset)
              appendInfo.setLogAppendTime(duplicate.timestamp)
              appendInfo.setLogStartOffset(logStartOffset)
            case None =>
              //追加记录,并在追加后立即递增本地日志结束偏移量,因为对下面事务索引的写入可能会失败,
              // 我们希望确保未来追加的偏移量仍然单调增长。恢复日志目录后,将清理由此产生的事务索引不一致。
              // 请注意,如果事务索引的追加失败,ProducerStateManager的结束偏移量将不会更新,最后一个稳定偏移量也不会前进。
              //这里就开始将记录插入 log 和索引文件了 并更新最后的 offset
              localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.shallowOffsetOfMaxTimestamp, validRecords)
              updateHighWatermarkWithLogEndOffset()

              //更新生产者状态
              updatedProducers.values.foreach(producerAppendInfo => producerStateManager.update(producerAppendInfo))

              // 用真实的最后一个稳定偏移量更新事务索引。
              // 使用READ_COMMITTED的消费者可见的最后一个偏移量将受到此值和高水印的限制。
              completedTxns.foreach { completedTxn =>
                val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
                segment.updateTxnIndex(completedTxn, lastStableOffset)
                producerStateManager.completeTxn(completedTxn)
              }

              // 始终更新最后一个生产者id映射偏移量,以便快照反映当前偏移量,即使没有写入任何幂等数据
              producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)

              // 更新第一个不稳定偏移量(用于计算LSO)
              maybeIncrementFirstUnstableOffset()

              trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
                s"first offset: ${appendInfo.firstOffset}, " +
                s"next offset: ${localLog.logEndOffset}, " +
                s"and messages: $validRecords")

              //localLog.unflushedMessages   :  还没有 flush 的消息数
              //config.flushInterval 为  flush.messages配置  默认 Long.MAX_VALUE
              //说明:
              //此设置允许指定一个间隔,在该间隔内,我们将强制对写入日志的数据进行fsync。
              // 例如,如果将其设置为1,我们将在每条消息后进行fsync;如果是5,我们将在每5条消息后进行fsync。
              // 一般来说,我们建议您不要设置此选项,而是使用副本来提高持久性,并允许操作系统的后台刷新功能,因为它更高效。
              // 此设置可以按topic覆盖(请参阅<a href=\“#topicconfigs\”>按主题配置部分</a  很灵活,可以为topic设置不同的 flush策略
              if (localLog.unflushedMessages >= config.flushInterval) flush(false)
          }
          appendInfo
        }
      }
    }
  }

六、LocalLog

用于在本地存储消息的仅追加日志。日志是一系列LogSegments,每个LogSegments都有一个基本偏移。根据可配置的策略创建新的日志段,该策略控制给定段的字节大小或时间间隔。

因此数据寻址的逻辑是:基地址+偏移地址(基地址找到具体的LogSegment,再根据偏移地址找到文件中的具体数据)

  private[log] def append(...): Unit = {
    //向本地活动段中追加数据
    segments.activeSegment.append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records)
    //更新LEO:既log末尾的offset
    updateLogEndOffset(lastOffset + 1)
  }

七、 LogSegment

日志的一部分。每个段有两个组成部分:日志和索引。日志是一个包含实际消息的FileRecords。该索引是一个从逻辑偏移映射到物理文件位置的OffsetIndex。每个段都有一个基本偏移量,该偏移量<=该段中任何消息的最小偏移量,>任何先前段中的任何偏移量。

基偏移量为[base_offset]的段将存储在两个文件中,一个[base_ooffset].index和一个[base_offset].log文件。

    public void append(long largestOffset,
                       long largestTimestampMs,
                       long shallowOffsetOfMaxTimestamp,
                       MemoryRecords records) throws IOException {
        if (records.sizeInBytes() > 0) {
            //在位置{}的末尾偏移量{}处插入{}个字节,在偏移量{}处插入最大的时间戳{}
            LOGGER.trace("Inserting {} bytes at end offset {} at position {} with largest timestamp {} at offset {}",
                records.sizeInBytes(), largestOffset, log.sizeInBytes(), largestTimestampMs, shallowOffsetOfMaxTimestamp);
            int physicalPosition = log.sizeInBytes();
            if (physicalPosition == 0)
                rollingBasedTimestamp = OptionalLong.of(largestTimestampMs);

            ensureOffsetInRange(largestOffset);

            //数据真正写的地方
            long appendedBytes = log.append(records);
            LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset);
            // 更新内存中的最大时间戳和相应的偏移量。
            if (largestTimestampMs > maxTimestampSoFar()) {
                maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);
            }
            // 在索引中附加一个条目(如果需要)
            //这证明并不是每次写数据都会向索引中写入标记,因此索引指向了一段数据
            //需要累计写入的数据 > 索引中条目的大致字节数(index.interval.bytes 默认4096字节)
            if (bytesSinceLastIndexEntry > indexIntervalBytes) {
                offsetIndex().append(largestOffset, physicalPosition);
                timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
                bytesSinceLastIndexEntry = 0;
            }
            bytesSinceLastIndexEntry += records.sizeInBytes();
        }
    }

1、log追加

FileRecords

由文件支持的 Records 实现。可以将可选的开始和结束位置应用于此实例,以允许对一系列日志记录进行切片。

    public int append(MemoryRecords records) throws IOException {
        int written = records.writeFullyTo(channel);
        size.getAndAdd(written);
        return written;
    }

MemoryRecords

 由ByteBuffer支持的 Records实现。这仅用于读取或就地修改记录批的现有缓冲区。

    //将所有记录写入给定通道(包括部分记录)。
    public int writeFullyTo(GatheringByteChannel channel) throws IOException {
        buffer.mark();
        int written = 0;
        //并没有调 flush ,这里只是写入了 pagecache 中 
        //需要考内核的机制将其flush到磁盘 (linux系统中可以配置参数 当到达总内存的多少后或者脏页机制触发去写)
        while (written < sizeInBytes())
            written += channel.write(buffer);
        buffer.reset();
        return written;
    }

2、offsetIndex追加

OffsetIndex

   public void append(long offset, int position) {
        lock.lock();
        try {
            if (isFull())
                throw new IllegalArgumentException(...);

            if (entries() == 0 || offset > lastOffset) {
                log.trace("Adding index entry {} => {} to {}", offset, position, file().getAbsolutePath());
                //利用mmap的特性将数据写入内核的pagecache
                mmap().putInt(relativeOffset(offset));
                mmap().putInt(position);
                incrementEntries();
                lastOffset = offset;
                if (entries() * ENTRY_SIZE != mmap().position())
                    throw new IllegalStateException(...);
            } else
                throw new InvalidOffsetException(...);
        } finally {
            lock.unlock();
        }
    }

3、timeIndex追加

TimeIndex

    public void maybeAppend(long timestamp, long offset) {
        maybeAppend(timestamp, offset, false);
    }

    public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) {
        lock.lock();
        try {
            if (!skipFullCheck && isFull())
                throw new IllegalArgumentException(...);

            // 当偏移量等于最后一个条目的偏移量时,我们不会抛出异常。这意味着我们正试图插入与最后一个条目相同的时间索引条目。
            // 如果要插入的时间戳索引条目与最后一个条目相同,我们只需忽略插入,因为这可能会在以下两种情况下发生:
            // 1.日志段关闭了
            // 2.当滚动活动日志段时,会调用LogSegment.onBecomeActiveSegment()。
            if (entries() != 0 && offset < lastEntry.offset)
                throw new InvalidOffsetException(...);
            if (entries() != 0 && timestamp < lastEntry.timestamp)
                throw new IllegalStateException(...);

            // 只有当时间戳大于最后插入的时间戳时,我们才会附加到时间索引。
            // 如果所有消息都是消息格式v0,则时间戳将始终为NoTimestamp。在这种情况下,时间索引将为空。
            if (timestamp > lastEntry.timestamp) {
                log.trace("Adding index entry {} => {} to {}.", timestamp, offset, file().getAbsolutePath());
                //同样是调用mmap将数据写入pagecache
                MappedByteBuffer mmap = mmap();
                mmap.putLong(timestamp);
                mmap.putInt(relativeOffset(offset));
                incrementEntries();
                this.lastEntry = new TimestampOffset(timestamp, offset);
                if (entries() * ENTRY_SIZE != mmap.position())
                    throw new IllegalStateException(...);
            }
        } finally {
            lock.unlock();
        }
    }

八、总结

1、producer调用send发送数据

2、kafka调用对应的api进行处理

3、获取请求体中的数据

4、校验是否有有效数据,如果没有立即返回

5、调用副本管理器(ReplicaManager)将数据进行追加

6、校验事务完成后处理回调,执行真正的数据追加

7、对acks进行校验(必须是0、1、-1)

8、循环处理这次请求中的每个topic、partition,调用Partition进行数据追加

9、获取最小的ISR以及可以正常写的副本数量,如果存活的副本节点数量<最小ISR数量,且请求中的acks=-1,里面抛出异常

10、将数据委托给UnifiedLog进行追加

11、根据数据的来源对offset设置校验等级

12、写入元数据(因为元数据重要,因此要立马flush到磁盘)

13、再次对数据进行解析和校验(CRC、消息大小是否有效、序列号是否一致、offset是否单调递增)

14、校验日志的mmap是否关闭

15、为数据分配offset

16、校验数据大小是否超过了段大小

17、委托给LocalLog向本地活动段追加数据

18、委托给LogSegment进行数据追加

19、调用FileRecords、MemoryRecords将数据写入pagecache

20、判断累计数据量是否>index.interval.bytes 默认4096字节,如果大于开始写入索引

21、调用OffsetIndex将offset索引利用mmap写入pagecache

22、调用TimeIndex将time offset索引利用mmap写入pagecache(time的索引执行了offset索引,offset索引指向了真正位置)

23、更新该broker中的HW和LEO


http://www.kler.cn/a/389897.html

相关文章:

  • Net Core微服务入门全纪录(三)——Consul-服务注册与发现(下)
  • 【Unity3D】利用Hinge Joint 2D组件制作绳索效果
  • 谷歌宣布没 JavaScript 将无法启动搜索,居然引起了轩然大波
  • 51c大模型~合集106
  • linux手动安装mysql5.7
  • 【Uniapp-Vue3】uni-api交互反馈showToast的使用方法
  • Solon MVC 的 @Mapping 用法说明
  • 人工智能技术将逐步渗透到我们生活的每个角落
  • 《Rust语言圣经》Rust教程笔记17:2.Rust基础入门(2.6模式匹配)2.6.2解构Rust Option<T>
  • 三级等保安全解决方案,实施方案,整改方案(Word,PPT等相关资料学习)
  • docker compose - 设置名字
  • 根文件系统ROOTFS
  • 前端跨域~简述
  • 技术整合与生态构建:Lyft与Mobileye引领自动驾驶新纪元
  • Git核心概念
  • 解决”重复文件名重命名“问题【根据Word系统方式】
  • Qt文件系统-二进制文件读写
  • 【Django】Clickjacking点击劫持攻击实现和防御措施
  • 数组类算法【leetcode】
  • 「IDE」VS2022插件 Visual Assist X 番茄助手介绍说明
  • Python小游戏24——小恐龙躲避游戏
  • 使用 Elasticsearch 构建食谱搜索(一)
  • RSTP的配置
  • DNS Resolver解析服务器出口IP查询
  • 2024 年 Apifox 和 Postman 对比介绍详细版
  • vue3 动态路由+动态组件+缓存应用