
Building a Kafka Cluster in KRaft Mode on Kylin Server V10

I. Overview of KRaft Mode and ZooKeeper Mode

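        Before Kafka 2.8, Kafka depended heavily on a ZooKeeper ensemble for metadata management, controller election and related coordination (collectively referred to as consensus services). Whenever the ZooKeeper ensemble's performance fluctuated, Kafka's performance was significantly affected as well, as shown in the figure below: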

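        Starting with Kafka 2.8, Kafka introduced KRaft mode (initially as an early-access feature), which is based on the Raft consensus protocol and makes it possible to remove the dependency on ZooKeeper. With the release of Kafka 3.3.1 on October 3, 2022, this new metadata management scheme, named KRaft, was marked as production-ready.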

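        In this mode, some Kafka servers are designated as controllers and the others act as brokers; a single server can also take on both roles, which is the combined mode used in this deployment. The controllers provide the consensus services that ZooKeeper used to provide, and all metadata is stored in, and managed internally through, a Kafka topic (the internal __cluster_metadata topic).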
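KRaft controllers form a Raft quorum: a metadata record is committed, and a controller leader is elected, only when a strict majority of the voters listed in controller.quorum.voters agree. The small Python sketch below (the function names are illustrative, not part of Kafka) computes how many controller failures a quorum of a given size can tolerate; for the three controllers planned in this article, the answer is one.

```python
# Majority-quorum arithmetic for a Raft-style controller quorum such as KRaft's.
# A record is committed, and a leader is elected, only with a strict majority of voters.


def majority(voters: int) -> int:
    """Smallest number of voters that forms a strict majority."""
    if voters < 1:
        raise ValueError("a quorum needs at least one voter")
    return voters // 2 + 1


def tolerated_failures(voters: int) -> int:
    """How many voters can be unavailable while a majority is still reachable."""
    return voters - majority(voters)


if __name__ == "__main__":
    for n in (1, 2, 3, 4, 5):
        print(f"{n} voters: majority={majority(n)}, tolerates {tolerated_failures(n)} failure(s)")
```

This is why controller quorums are usually sized at 3 or 5: adding a fourth voter raises the required majority to 3 without tolerating any additional failure.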

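The main advantages of KRaft mode over ZooKeeper mode are:

  • Simpler operations: only Kafka needs to be deployed; ZooKeeper is no longer required.
  • Better horizontal scalability: the number of partitions a Kafka cluster can support is a key measure of how well it scales out. Previously this number was limited to the order of 100,000 because metadata had to be transferred between ZooKeeper and the controller; KRaft mode does not need this transfer, so the limit rises to the order of millions.
  • More efficient metadata propagation: metadata is managed as a Kafka topic and propagated through ordinary topic production and consumption, which integrates better with the rest of Kafka and also improves the performance of parts of the underlying implementation.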
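To make the idea of "metadata as a topic" more concrete, here is a toy Python model, not Kafka's implementation: metadata changes are appended to an ordered log, and each broker remembers the last offset it has applied, so catching up only requires applying the records appended since then. The record kinds, field names and classes below are hypothetical and do not match Kafka's real metadata record format.

```python
from dataclasses import dataclass, field
from typing import Dict, List

# Toy model of log-based metadata propagation. The record kinds and fields are
# hypothetical and do NOT mirror Kafka's real __cluster_metadata record schema.


@dataclass(frozen=True)
class MetadataRecord:
    offset: int   # position of the record in the metadata log
    kind: str     # hypothetical kinds: "topic" or "partition_leader"
    key: str
    value: object


@dataclass
class BrokerMetadataCache:
    applied_offset: int = -1  # last log offset applied locally (-1 = nothing yet)
    topics: Dict[str, object] = field(default_factory=dict)
    leaders: Dict[str, object] = field(default_factory=dict)

    def catch_up(self, log: List[MetadataRecord]) -> int:
        """Apply only records newer than applied_offset; return how many were applied."""
        applied = 0
        for rec in log:
            if rec.offset <= self.applied_offset:
                continue  # already applied earlier, skip it
            if rec.kind == "topic":
                self.topics[rec.key] = rec.value
            elif rec.kind == "partition_leader":
                self.leaders[rec.key] = rec.value
            self.applied_offset = rec.offset
            applied += 1
        return applied


if __name__ == "__main__":
    log = [
        MetadataRecord(0, "topic", "orders", {"partitions": 3}),
        MetadataRecord(1, "partition_leader", "orders-0", 1),
    ]
    cache = BrokerMetadataCache()
    print(cache.catch_up(log))  # prints 2
    log.append(MetadataRecord(2, "partition_leader", "orders-0", 2))  # leader moved to node 2
    print(cache.catch_up(log))  # prints 1: only the new record is applied
    print(cache.applied_offset, cache.leaders)
```

Remembering a single offset per broker is what keeps catch-up cheap in this model: a broker that falls behind replays only its missing suffix of the log.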
II. Deploying a Kafka Cluster in KRaft Mode
1. Host Planning

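IP address    Role                  node.id
10.8.3.35     Broker, Controller    1
10.8.3.36     Broker, Controller    2
10.8.3.37     Broker, Controller    3

All three nodes run in combined mode; each node.id must match the id used for that host in controller.quorum.voters.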
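The three nodes share almost all settings: only node.id, listeners and advertised.listeners differ per host, while controller.quorum.voters must be identical everywhere. The Python sketch below derives these lines from the host plan. It assumes node ids 1 to 3, as in the server.properties used in the deployment step (node.id=1 and 1@10.8.3.35 for 10.8.3.35), and the ports 9092 (PLAINTEXT) and 9093 (CONTROLLER) from that file; HOSTS, quorum_voters and node_overrides are hypothetical helper names.

```python
from typing import Dict

# Host plan (assumed ids 1-3, matching node.id=1 / 1@10.8.3.35 in the article's config).
HOSTS: Dict[int, str] = {1: "10.8.3.35", 2: "10.8.3.36", 3: "10.8.3.37"}
BROKER_PORT = 9092      # PLAINTEXT listener port used in the article's server.properties
CONTROLLER_PORT = 9093  # CONTROLLER listener port used in the article's server.properties


def quorum_voters(hosts: Dict[int, str]) -> str:
    """controller.quorum.voters value: id@host:port entries, sorted by node id."""
    return ",".join(f"{nid}@{ip}:{CONTROLLER_PORT}" for nid, ip in sorted(hosts.items()))


def node_overrides(node_id: int, hosts: Dict[int, str]) -> Dict[str, str]:
    """KRaft identity and listener lines for one node.

    process.roles and controller.quorum.voters are the same on every node;
    node.id, listeners and advertised.listeners differ per host.
    """
    ip = hosts[node_id]
    return {
        "process.roles": "broker,controller",
        "node.id": str(node_id),
        "controller.quorum.voters": quorum_voters(hosts),
        "listeners": f"PLAINTEXT://{ip}:{BROKER_PORT},CONTROLLER://{ip}:{CONTROLLER_PORT}",
        "advertised.listeners": f"PLAINTEXT://{ip}:{BROKER_PORT}",
    }


if __name__ == "__main__":
    for nid in sorted(HOSTS):
        print(f"# ---- {HOSTS[nid]} ----")
        for key, value in node_overrides(nid, HOSTS).items():
            print(f"{key}={value}")
```

Generating the voters string from one table, instead of typing it three times, keeps it byte-for-byte identical on every node; the block printed for 10.8.3.35 matches the corresponding lines of its server.properties.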

2. Deployment

(1) Edit the server.properties configuration file on 10.8.3.35

[root@localhost kraft]# vi server.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This configuration file is intended for use in KRaft mode, where
# Apache ZooKeeper is not present.
#

########################### Server Basics ###########################

# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller

# The node id associated with this instance's roles
node.id=1

# The connect string for the controller quorum
controller.quorum.voters=1@10.8.3.35:9093,2@10.8.3.36:9093,3@10.8.3.37:9093

########################Socket Server Settings ########################

# The address the socket server listens on.
# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://10.8.3.35:9092,CONTROLLER://10.8.3.35:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://10.8.3.35:9092

# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics ###########################

# A comma separated list of directories under which to store log files
log.dirs=/usr/local/kafka/logs/kraft-combined-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

######################## Internal Topic Settings  ######################

# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
# This cluster has three brokers, so keep 3 replicas and require at least 2 of them in sync.
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

########################### Log Flush Policy #########################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval
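Two rules in the file above are easy to break when copying it to the other nodes: a combined node's node.id must appear among the ids in controller.quorum.voters, and the controller listener must be listed in listeners (as the comment in the Socket Server Settings section requires). A mistake in either typically makes the node fail at startup, so it can help to check each node's file first. The following is a minimal Python sketch with a deliberately simplified parser (plain key=value lines only, not the full java.util.Properties syntax); parse_properties, check_kraft and SAMPLE are hypothetical names, and the checks cover only these two rules plus the presence of the KRaft keys used here.

```python
from typing import Dict, List

# Simplified parser: only "key=value" lines; '#' and '!' comment lines are skipped.
# It does not implement java.util.Properties escapes, ':' separators or line continuations.
def parse_properties(text: str) -> Dict[str, str]:
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")  # split on the first '=' only
        if sep:
            props[key.strip()] = value.strip()
    return props


REQUIRED_KEYS = ("process.roles", "node.id", "controller.quorum.voters", "controller.listener.names")


def check_kraft(props: Dict[str, str]) -> List[str]:
    """Return a list of problems found; an empty list means these checks passed."""
    problems = [f"missing {key}" for key in REQUIRED_KEYS if key not in props]
    if problems:
        return problems

    roles = {r.strip() for r in props["process.roles"].split(",")}
    if "controller" not in roles:
        return problems  # the rules below only apply to controller (or combined) nodes

    # Rule 1: a controller's node.id must be one of the ids in controller.quorum.voters.
    voter_ids = {v.split("@", 1)[0].strip() for v in props["controller.quorum.voters"].split(",")}
    if props["node.id"] not in voter_ids:
        problems.append(f"node.id={props['node.id']} is not in controller.quorum.voters")

    # Rule 2: every controller listener name must be declared in listeners.
    declared = {entry.split("://", 1)[0].strip()
                for entry in props.get("listeners", "").split(",") if entry.strip()}
    for name in props["controller.listener.names"].split(","):
        if name.strip() not in declared:
            problems.append(f"controller listener {name.strip()} not found in listeners")
    return problems


SAMPLE = """
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@10.8.3.35:9093,2@10.8.3.36:9093,3@10.8.3.37:9093
listeners=PLAINTEXT://10.8.3.35:9092,CONTROLLER://10.8.3.35:9093
controller.listener.names=CONTROLLER
"""

if __name__ == "__main__":
    print(check_kraft(parse_properties(SAMPLE)))  # prints []
    broken = SAMPLE.replace("node.id=1", "node.id=0")
    print(check_kraft(parse_properties(broken)))  # prints ['node.id=0 is not in controller.quorum.voters']
```

To check a real node, pass the text of its server.properties to parse_properties; the second call above shows how a node id that is missing from the voters list gets reported.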


