当前位置: 首页 > article >正文

Partition Strategies kafka分区策略

原文链接 Kafka Partition Strategy

分区策略简介

分区增加了并行化并允许Kafka扩展。存在许多将消息分发到主题分区的策略。在我们深入研究每种策略的背景之前,下面的表格给出了每种策略的简要概述。

Kafka消息由生产者发送,由消费者接收。这两种策略是不同的,所以我们有两个表,一个总结每种策略。

生产者分区策略

策略描述
Default partitioner

key=null,使用 round-robin 方式。

key!=null, 使用 hash 算法

Round-robin partitionerround-robin 方式
Uniform sticky partitioner消息被发送到一个粘性分区(直到 batch.size 符合 或 linger.ms time 到时间 )以减少延迟。
Custom partitioner这种方法实现了Partitioner接口,用一些定义key-to-partition路由策略的自定义逻辑覆盖 partition 方法。

Default partitioner

顾名思义,这是生成消息的默认策略。当键为空时,记录被随机发送到主题的一个可用分区。如果一个键存在,Kafka将对该键进行散列,并使用结果将消息映射到特定的分区。这可确保具有相同键的消息最终位于相同分区中。但是,只有在主题中的分区数量保持不变的情况下,这种映射才是一致的:如果添加了新分区,那么具有相同键的新消息可能会被写入与具有相同键的旧消息不同的分区。

【下图左边是按时间排序(chronological)的message, 从右到左,右边K1:Va 第一个发送】

【下图右边是根据默认分区器路由到3个partition的结果, 添加顺序是从右到左。 相同key的message被分配到同一个partition(但同一个分区会出现不同的key), 将按发生顺序进行消费。】

Round robin partitioner

当生产者希望在所有分区之间平均分配写操作时,使用这种方法。这种分布与键的哈希值(或键为空)无关,因此具有相同键的消息最终可能位于不同的分区中。

【下图左边是按时间排序(chronological)的message, 从右到左,右边K1:Va 第一个发送】

当工作负载因单个键而发生偏差时,即为同一键生成许多消息时,此策略非常有用。假设消息的顺序无关紧要,并且使用默认分区器。在这种情况下,负载不平衡将导致消息在分区中排队,并增加分配给这些分区的消费者子集的负载。轮询策略将导致消息在各个分区之间均匀分布。

Uniform sticky partitioner

目前,当没有指定分区和键时,生产者的默认分区程序以轮询方式对记录进行分区。这意味着一系列连续记录中的每条记录将被发送到不同的分区,直到覆盖所有分区,然后生产者重新开始。虽然这会在分区之间均匀地分布记录,但也会产生更多较小的批,从而导致更多的请求和排队以及更高的延迟。

为了解决这一问题,引入了均匀粘分器。它有两条规则:

1,如果用记录指定了分区,则按原样使用该分区。
2,如果没有指定分区,则会选择一个sticky分区,直到配置文件里的属性 “batch.size” 被填满或 “ linger.ms ” (发送消息前等待的时间)到了。

“固定 (Sticking)”到一个分区可以实现更大的 batch,并减少系统中的延迟。发送 batch 后,sticky分区发生变化。随着时间的推移,记录均匀地分布在所有分区中。

记录键不作为分区策略的一部分使用,因此不能保证具有相同键的记录被发送到相同的分区。

【下图左边是按时间排序(chronological)的message, 从右到左,右边K1:Va 第一个发送】

【下图右边是使用 Uniform Sticky 分区策略的结果, batch满了发送,3个消息一个 batch的发送。】

Custom partitioner

有时,用例与任何标准分区器都不太适合。例如,假设我们想要将事务日志数据写入Kafka,其中一个用户(称为“CEO”)占所有事务的40%以上。

如果使用默认散列分区,“CEO”用户的记录将被分配到与其他用户相同的分区。这将导致一个分区比其他分区大得多,从而导致代理耗尽空间并减慢处理速度。理想的解决方案是为用户“CEO”提供一个专用分区,然后使用散列分区将其余用户映射到剩余的分区。下面是这个用例的简单实现:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;

public class PowerUserPartitioner implements Partitioner {

 public void configure(Map<String, ?> configs) {}

 public int partition(String topic, Object key, byte[] keyBytes,
 Object value, byte[] valueBytes,Cluster cluster) {

 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
 int numPartitions = partitions.size();
 if ((keyBytes == null) || (!(key instanceOf String))) 
      throw new InvalidRecordException("Record must have a valid string key");

 if (((String) key).equals("CEO"))
      return numPartitions - 1; // Messages with key “CEO” will always go to the last partition

 // Other records will get hashed to the rest of the partitions
 return Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1);
 }

 public void close() {}

}

消费者分配策略

策略        描述
Range assignor (default)

(Partitions 总数) / (Consumers总数)个 partitions被分配到每个consumer。

其目的是实现共处分区,即将两个不同主题的相同分区号分配给相同的消费者。(P0 of Topic X and P0 of Topic Y 到同一个consumer)

Round-robin assignor分区被单独挑选并分配给消费者(按照任何合理的顺序,比如从前到后)。当所有的消费者都用完,但仍有一些分区未分配时,将从第一个消费者开始重新分配。这样做的目的是最大化使用的消费者数量。
Sticky assignor这种方法的工作原理类似于轮循分配,但在重新分配分区时保留尽可能多的现有分配。目的是减少或完全避免再平衡过程中的分区移动。
Custom assignor继承 AbstractPartitionAssignor 然后重写  assign method 

消费者和消费者组(Consumers and consumer groups)

消费者是从Kafka读取数据的应用程序。通常,我们有多个生产者向主题写入消息,因此从主题读取和处理数据的单个消费者可能无法跟上传入消息的速度,并且越来越落后。为了从主题扩展消费,Kafka有一个叫做消费者组的概念。顾名思义,消费者群体就是一群消费者。当消费者组中的多个消费者订阅同一主题时,每个消费者接收来自主题中不同分区集的消息,从而在它们之间分发数据。

唯一需要注意的是,一个分区只能分配给单个消费者。向组中添加更多的消费者有助于扩大消费规模。但是,如果消费者的数量多于分区,则一些消费者将保持空闲状态,因为没有任何分区可供它们使用。这个空闲的消费者充当故障转移消费者,允许它在现有消费者发生故障时迅速弥补空缺。

消费者分区分配

就像我们可以控制如何将数据写入主题中的分区一样,Kafka也允许我们决定消费者如何从这些分区中读取数据。我们可以配置用于在消费者实例之间分配分区的策略。属性 “partition.assignment.strategy” 可用于在设置消费者时配置分配策略。

【上图展示了一个topic有三个 partition 如何分配给四个消费者组】

【Consumer Group 1 拥有一个消费者 C11, 负责所有的 Partitions】

【Consumer Group 2 拥有两个消费者 C21和C22, C21负责所有的 Partition 0, P1,P2由C22负责】

【Consumer Group 3 拥有三个消费者, 各自负责一个Partition】

【Consumer Group 4 拥有四个消费者, 各自负责一个Partition, 空闲状态的C44时刻准备接替挂掉的消费者】

以下是消费者分区分配的一些策略:

范围分配器(Range assignor):

这是默认策略,并在每个主题( per-topic)的基础上工作。对于每个主题,可用分区按数字顺序考虑,消费者按字典顺序()考虑。然后将分区数除以消费者数,以确定分配给每个消费者的分区数。如果它没有平均划分,那么前几个消费者将有一个额外的分区。

轮循分配器(Round robin assignor):

它接受所有分区,并以轮循方式将它们分配给消费者。这里的优势在于,它旨在最大化使用的消费者数量。但是,由于这个原因,在重新平衡的情况下,它不能减少分区移动。

Sticky分配器(Sticky assignor):

它的工作原理与轮询分配器非常相似,但有一个额外的优点,因为它在发生分区重新分配时保留了尽可能多的现有分配。当主题分区在重新平衡期间从一个消费者移动到另一个消费者时,这有助于节省一些开销处理(将在后面的小节中讨论)。

自定义分配器(Custom assignor):

也可以为分配器编写自定义逻辑。下面是一个基于分配给消费者的优先级处理故障转移的示例

public class CustomPartitionAssignor extends AbstractPartitionAssignor implements Configurable {

  @Override
  public void configure(final Map<String, ?> configMap) { 
    // define the configs here
  }

  @Override
  public String name() {
    return "CustomPartitionerAssigner";
  }

 @Override
  public Subscription subscription(final Set<String> topicSet) { 
    return new Subscription(
        new ArrayList<>(topicSet), 
        ByteBuffer.wrap(ByteBuffer.allocate(4).putInt(config.priority()))
     );
  }

  @Override
  Map<String, List<TopicPartition>> assign(
                         Map<String, Integer> partitionsTopicMap,
                         Map<String, Subscription> subscriptionsMap)  {

    Stream<ConsumerPriority> consumerOrdered = subscriptionsMap.entrySet()
        .stream()
        .map(x -> {
            int consumerPriority = x.getValue().data().getInt();
            String consumerId = x.getKey();
            return new ConsumerPriority(consumerId, consumerPriority);
        })
        .sorted(Comparator.reverseOrder());

    ConsumerPriority priority = consumerOrdered.findFirst().get();

    final List<TopicPartition> assignments = partitionsTopicMap
        .entrySet()
        .stream()
        .flatMap(entry -> {
            final String topicName = entry.getKey();
            final int partitionsCount = entry.getValue();
            return IntStream.range(0, partitionsCount).mapToObj( i -> new TopicPartition(topicName, i));
        }).collect(Collectors.toList());

    final Map<String, List<TopicPartition>> assignmentMap = new HashMap<>();
    subscriptionsMap.keySet().forEach(memberId -> assignmentMap.put(memberId, Collections.emptyList()));
    assignmentMap.put(priority.memberId, assignments);
    return assignmentMap;
  }
}

这个自定义分配器可以在初始化消费者时使用。

Properties properties = new Properties();
...
...
properties.put(
    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,   
    CustomPartitionAssignor.class.getName()
);
...
...
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

消费者的再平衡(Consumer rebalancing)


消费者可以随时添加到消费者组或从消费者组中删除。当添加新的消费者时,它开始从先前分配给不同消费者的分区中消费消息。类似地,当一个消费者被删除时,它的分区被分配给另一个剩余的活动消费者。这种重新分配和将分区所有权从一个消费者转移到另一个消费者的过程称为再平衡。重新平衡是至关重要的,因为它们提供了高可用性和可伸缩性,允许用户安全地添加和删除消费者。

有两种类型的再平衡。使用哪一个取决于消费者使用的分区分配策略:

急于再平衡:(Eager rebalancing: )

所有消费者停止消费,放弃其分区的所有权,重新加入组,然后获得分配给它们的新分区。当整个组中没有消费者时,这会导致一个小的停机窗口(消费者不可用性)。

协作再平衡:(Cooperative rebalancing: )

也称为增量再平衡( incremental rebalancing),此策略在多个阶段执行再平衡。它涉及到将一小部分分区从一个消费者重新分配给另一个消费者,允许消费者继续处理来自未重新分配的分区的消息,并避免完全不可用。

在正常情况下,重新平衡可能是不可取的。在重新平衡期间,消费者在一段时间内停止处理消息,这会导致处理来自主题的事件的延迟。对于需要实时事件处理并且不能承受超过几秒钟的延迟的用例来说,这可能是有问题的。

Kafka提供了一种有趣的方式来避免这种重新平衡。

静态组成员(Static group membership)

默认的消费者再平衡行为是组中的消费者身份是暂时的。当消费者离开组时,它的分区被撤销;当它重新加入时,它会获得一个新的成员ID,并为它分配一组新的分区。

通过使用唯一的“group.instance”配置消费者,可以使其成为静态组成员。id”属性。当它加入一个组时,将根据所使用的分区分配策略为它分配一组分区。但是,它在重新启动或关闭时不会自动离开组——它仍然是一个成员。在重新连接时,它被识别为其唯一的静态标识,并被重新分配到它所使用的相同分区,而不会触发再平衡。

这对于有状态应用程序非常有用,其中状态由分配给消费者的分区填充。由于这个过程可能很耗时,因此每次消费者重启时都重新创建这个初始状态或缓存是不理想的。

结论

我们已经看到,从生产者和消费者的角度来看,找到正确的分区策略是多么重要。如果没有正确配置,从长远来看,这可能会成为一个问题。决策很大程度上取决于用例的细微差别、数据量等,没有一种方法能解决所有问题。

默认选项可能适用于大多数情况,但有时它们不是正确的选择。例如,如果在生产者端不需要排序,那么循环或统一粘性策略的性能会好得多。类似地,在消费者端,为用例选择默认范围分配器,正如我们在事务日志示例(倾斜负载)中讨论的那样,可能会导致不必要的再平衡和缓慢的处理。

允许自定义分区器和分配器,但它们并不简单,需要深入的技术理解才能正确实现。找到正确的策略是获得Kafka提供的速度和可扩展性优势的关键一步。


http://www.kler.cn/a/447094.html

相关文章:

  • 基于蓝牙通信的手机遥控智能灯(论文+源码)
  • [LeetCode-Python版] 定长滑动窗口1(1456 / 643 / 1343 / 2090 / 2379)
  • flask before_request 请求拦截器返回无值则放行,有值则拦截
  • Mybatis加密解密查询操作(sql前),where要传入加密后的字段时遇到的问题
  • 蓝桥杯练习生第四天
  • 力扣-图论-18【算法学习day.68】
  • <项目代码>YOLO Visdrone航拍目标识别<目标检测>
  • GESP CCF python二级编程等级考试认证真题 2024年12月
  • 基于微信小程序的绘画学习平台
  • 攻防世界easyphp
  • Leetcode中最常用的Java API——util包
  • LeetCode hot100-90
  • 纯css 实现呼吸灯效果
  • TCP/IP协议:网际层相关知识梳理
  • metasploit之ms17_010_psexec模块
  • MapBox实现深蓝色科技风格底图方案
  • android studio更改应用图片,和应用名字。
  • D101【python 接口自动化学习】- pytest进阶之fixture用法
  • Unity3D仿星露谷物语开发6之角色添加动画
  • 麒麟操作系统服务架构保姆级教程(二)ssh远程连接
  • Linux之多线程互斥
  • 前端实现获取后端返回的文件流并下载
  • 【原生js案例】移动端如何实现页面的入场和出场动画
  • 了解cuda的统一内存
  • 复习打卡大数据篇——Apache Hadoop
  • leetcode之hot100---240搜索二维矩阵II(C++)