当前位置: 首页 > article >正文

【Linux】Kafka部署

1、部署

# 解压
[appuser@localhost app]$ tar -zxvf kafka_2.13-3.0.0.tgz

# 修改配置文件(具体配置见下文)
[appuser@localhost app]$ cd kafka_2.13-3.0.0
[appuser@localhost kafka_2.13-3.0.0]$ vim config/server.properties 

# 在所有集群节点启动Kafka
[appuser@localhost kafka_2.13-3.0.0]$ bin/kafka-server-start.sh -daemon config/server.properties

# 创建topic
[appuser@localhost kafka_2.13-3.0.0]$ bin/kafka-topics.sh --bootstrap-server 10.20.10.1:9092 --create --topic test --partitions 3 --replication-factor 3

# 查询Topic
[appuser@localhost kafka_2.13-3.0.0]$ bin/kafka-topics.sh --bootstrap-server [2106:410:a10::911]:9092 --list

# 查询消费组
#[appuser@localhost kafka_2.13-3.0.0]$ bin/kafka-consumer-groups.sh --bootstrap-server [2106:410:a10::911]:9092 --list

2、server.properties

kafka_2.13-3.0.0/config/server.properties

############################# Server Basics #############################
#必须为每个代理设置一个唯一的整数
broker.id=1	

############################# Socket Server Settings #############################
#套接字服务器侦听的地址。如果未配置,它将从 java.net.InetAddress.getCanonicalHostName()获取。
#listeners=PLAINTEXT://10.200.34.5:9092
listeners=PLAINTEXT://[2106:410:a10::911]:9092  

#代理生产和消费的主机端口。如果未设置,则在配置时使用“listeners”的值。否则,它将使用从java.net.InetAddress.getCanonicalHostName()返回的值。
# advertised.listeners=PLAINTEXT://10.20.10.1:9092	
advertised.listeners=PLAINTEXT://[2106:410:a10::911]:9092   

#服务器用于从网络接收请求并向网络发送响应的线程数(默认是3),接收线程会将接收到的消息放到内存中,然后再从内存中写入磁盘。
num.network.threads=3

#消息从内存中写入磁盘是时候使用的线程数量(默认是8)
num.io.threads=8                                               

#套接字服务器使用的发送缓冲区(SO_SNDBUF)       
socket.send.buffer.bytes=102400

#套接字服务器使用的接收缓冲区(SO_RCVBUF)
socket.receive.buffer.bytes=102400

#套接字服务器将接受的请求的最大大小(防止OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################
#日志
log.dirs=/app/kafka_2.13-3.0.0/logs

#每个topic的默认日志分区数。更多的分区允许更大的并行性以供使用,但这也会导致代理之间有更多的文件,建议broker少的话,默认就几个broker 就设置成几个分区
num.partitions=1 

#在启动时用于日志恢复和在关闭时刷新的每个数据目录的线程数。对于数据目录位于RAID阵列中的安装,建议增加此值。
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
#“__consumer_offsets”和“__transaction_state” topic内部的组元数据的复制因子。对于除开发测试之外的任何其他内容,建议使用大于1的值以确保可用性
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

#是否允许自动创建 Topic。
# auto.create.topics.enable=false
#是否允许 Unclean Leader 选举。
# unclean.leader.election.enable=false
#是否允许定期进行 Leader 选举。
auto.leader.rebalance.enable=false

############################# Log Flush Policy (日志刷新政策) #############################  
#消息立即写入文件系统,但默认情况下我们只有fsync()才能同步
#懒惰的操作系统缓存。以下配置控制将数据刷新到磁盘。
#这里有一些重要的权衡:
#1。持久性:如果您不使用复制,则可能会丢失未刷新的数据。
#2。延迟:当刷新确实发生时,非常大的刷新间隔可能会导致延迟峰值,因为会有大量数据需要刷新。
#3。吞吐量:冲洗通常是最昂贵的操作,并且小的冲洗间隔可能导致过多的搜索。
#以下设置允许配置刷新策略以在一段时间后刷新数据或每N条消息(或两者)。这可以在全局范围内完成,并在每个主题的基础上进行覆盖。

#强制刷新数据到磁盘之前要接受的消息数
#log.flush.interval.messages=10000

#强制刷新之前消息可以在日志中停留的最长时间
#log.flush.interval.ms=1000

############################# Log Retention Policy(日志保留政策) #############################
#以下配置控制日志段的处理。政策可以设置为在一段时间后或在累积给定大小后删除段。只要满足这些条件*either*,就会删除一个段。删除总是发生从日志的末尾开始。

# segment文件保留的最长时间,默认保留7天(168小时),超时将被删除,也就是说7天之前的数据将被清理掉
log.retention.hours=168

# 日志的基于大小的保留策略。消息数量大于改值将删除最旧的数据。函数独立于log.retenation.h。
log.retention.bytes=1073741824

#Kafka 调用cleanupLogs的时间间隔 ,该函数对所有 Logs 执行清理操作,(目前不确定 Logs 对应的是 Topic 还是 Partition,目测应当是 Partition)
log.cleanup.interval.mins=2

# 日志段文件的最大大小。达到此大小时,将创建新的日志段。
log.segment.bytes=104857600

#检查日志段以查看是否可以删除日志段的时间间隔
log.retention.check.interval.ms=300000


############################# Zookeeper #############################
#zookeeper集群的地址,可以是多个,多个之间用逗号分割
zookeeper.connect=10.20.11.1:2181,10.20.11.2:2181,10.20.11.3:2181 

#设置连接Zeekeeper超时时间
zookeeper.connection.timeout.ms=6000 


############################# Group Coordinator Settings(组协调员设置) #############################   
#以下配置指定GroupCoordinator将延迟初始消费者重新平衡的时间(以毫秒为单位)。当新成员加入组时,重新平衡将进一步延迟group.initial.rebalance.delay.ms的值,最多为max.poll.interval.ms。默认值为3秒。我们将此覆盖为0,因为它为开发和测试提供了更好的开箱即用体验。但是,在生产环境中,默认值3秒更合适,因为这有助于避免在应用程序启动期间不必要且可能很昂贵的重新平衡。
group.initial.rebalance.delay.ms = 0s

http://www.kler.cn/a/372116.html

相关文章:

  • STM32-笔记11-手写带操作系统的延时函数
  • Batch_Size对神经网络训练效率的影响:一个PyTorch实例分析
  • webserver log日志系统的实现
  • Pandas系列|第二期:Pandas中的数据结构
  • 【论文复现】农作物病害分类(Web端实现)
  • springboot中使用gdal将表中的空间数据转shapefile文件
  • SpringBoot实现的扶贫成效监测平台
  • 保研考研机试攻略:python笔记(2)
  • 【Windows】Redis 部署
  • Unity构建WebGL知识点
  • redis windows 7.0 下载
  • 【BF+4D雷达成像】四维成像汽车MIMO雷达的波束赋形【附MATLAB代码】
  • Python基础10
  • 别玩了!软考初级网络管理员无非就这23页纸!背完稳!
  • 论文学习 | 《锂离子电池健康状态估计及剩余寿命预测研究》
  • riscv uboot 启动流程分析 - SPL启动流程
  • 深入理解Docker,从入门到精通-Part1(基础使用)
  • 如何SSH到Openshift Node上设置相关网口的静态IP
  • LeetCode16:最接近的三数之和
  • 【网页布局技术】项目五 使用CSS设置导航栏
  • HarmonyOS 设备管理
  • 深入浅出梯度下降算法(学习笔记)
  • xlnt加载excel报错:‘localSheetId‘ expected
  • springboot日志配置
  • 力扣算法笔记——生成随机数组
  • 从单体架构到云原生架构演化图示