当前位置: 首页 > article >正文

Kafka集群的常用命令与策略

一、查看topic

bin/kafka-topics.sh --list --zookeeper node10:2181,node11:2181,node12:2181

二、查看topic状态

bin/kafka-topics.sh --describe --zookeeper node10:2181,node11:2181,node12:2181 --topic TestTopic

三、KAFKA常用配置

1、主题配置

(1)# 新创建的主题包含1个分区num.partitions=1

写入和读取数据的速度是1G/s,一个消费者处理速度50M/s,需要20个分区分别由20个消费者处理速度(吞吐量)1G/s

(2)消息配置

# 消息可以保留168小时=7天

log.retention.hours=168

# 消息字节数超过1G就删除

og.retention.bytes=1073741824

# 5分钟检查一次消息是否过期

log.retention.check.interval.ms=300000

# 单个消息的最大100M

message.max.bytes=104857600

2、broker配置

(1)broker信息配置

broker配置  broker消息配置

broker.id=0

port=9092

zookeeper.connect=node10:2181,node11:2181,node12:2181

# 消息保存的磁盘目录

log.dirs=/tmp/kafka-logs

(2)broker消息形式配置

# 不自动创建topic:生产者写入消息,消费者读取消息,发送元数据请求

auto.create.topics.enable=false

3、集群需要多少个Broker

每个broker可以存储2T数据,如果需要保存10T,则需要5T

4、主题的分区和副本放置策略

(1)broker数

所有broker依次分配主分区,下一个broker分配副本,注意:第一个分区随机放,每个分区副本数不能超过broker个数

(2)broker分配

n个broker,i分区分配到(i % n)broker, 其j副本分配到((i+j) % n)broker

例如:5个broker,0分区到0号broker,3副本到3号broker

5、KAFKA偏移量

auto.offset.reset

(1)设置为earliest

当一个分区被一个消费者组已经提交了offset时,同一消费者组从提交的offset开始消费;无提交的offset时,从头开始消费一个新的消费者组进行消费,从头开始

(2)设置为latest

当一个分区被一个消费者组已经提交了offset时,同一消费者组从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

(3)设置为none

topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

6、KAFKA分区消费者关系

一个消费者组的消费者消费所有分区,消费者数=分区数(均分),消费者数<分区数(某消费者消费多个分区),消费者数>分区数(某个消费者不消费分区),不同消费者组互不干扰

7、发送消息push

(1)同步方式发送消息

ProducerRecord<String, String> msg = new ProducerRecord<String, String>("TestTopic4", null, "hello world tomas100");

producer.send(msg).get(); // 同步发送消息,死等broker返回结果

producer.close();

(2)异步发送消息

ProducerRecord<String, String> msg = new ProducerRecord<String, String>("TestTopic4", null, "hello world tomas100");

producer.send(msg); // 异步发送消息

producer.send(msg, new Callback() {

@Override

public void onCompletion(RecordMetadata metadata, Exception exception) {

if(exception != null) {

exception.printStackTrace();

} else {

System.out.println("主题:" + metadata.topic() + " 分区:" + metadata.partition() + " 偏移量:" + metadata.offset());

}

}

}); // 异步发送消息,回调函数

producer.close();

8、接受消息pull

Consumer<String, String> consumer = new KafkaConsumer<String, String>(config);

consumer.subscribe(Collections.singletonList("TestTopic4"));

try {

while (true) {

ConsumerRecords<String, String> msgs = consumer.poll(5000);// 5000毫秒轮询一次

for (ConsumerRecord<String, String> msg : msgs) {

System.out.println(" topic:" + msg.topic() + " partition:" + msg.partition() + " offset:" + msg.offset() + "key:" + msg.key() + " value:" + msg.value());

}

try {

consumer.commitAsync(); // 提交偏移量

} catch (Exception ex) {

ex.printStackTrace();

}

}

} finally {

consumer.close();

}


http://www.kler.cn/a/465648.html

相关文章:

  • 生成模型的现状2025年的新兴趋势
  • Navicat 17 for Mac 数据库管理软件
  • 「Mac畅玩鸿蒙与硬件49」UI互动应用篇26 - 数字填色游戏
  • 【AI编辑器】Cursor与DeepSeek模型的集成:提升开发效率的新选择
  • 【LLM-Agent】Building effective agents和典型workflows
  • Django中自定义模板字符串
  • 从室内到室外:移动机器人的环境适应之旅
  • 企业级网络运维管理系统:构建高效与稳定的基石
  • 电化学气体传感器在物联网中的精彩表现
  • 文本表征的Scaling Laws:Scaling Laws For Dense Retrieval
  • 02.01、移除重复节点
  • 【Ubuntu】安装华为的MindSpore
  • 2、pycharm常用快捷命令和配置【持续更新中】
  • Jetpack Compose 学习笔记(一)—— 快速上手
  • 智能边缘计算×软硬件一体化:开启全场景效能革命新征程(企业开发者作品)
  • kafka小实站
  • SASS 简化代码开发的基本方法
  • AcWing练习题:平均数2
  • 肿瘤免疫循环与肿瘤免疫治疗的关系
  • 《Vue3实战教程》39:Vue3无障碍访问
  • 初学stm32 --- FSMC驱动LCD屏
  • XML里预定义的字符实体引用
  • graylog+sidecar通过docker-compose部署并采集SSH登录日志
  • C++中的常见关键字
  • 如何在Golang中实现协程池
  • 靶机系列|VULNHUB|DC-3