当前位置: 首页 > article >正文

浅谈Kafka(三)

浅谈Kafka(三)

文章目录

    • 浅谈Kafka(三)
      • Kafka目录介绍
      • 基础操作
      • JMX接口
      • 消费者是否能够消费指定分区的消息
      • 生产者是否发送消息到leader
      • 创建主题时如何把分区放到不同broker中
      • Kafka新建的分区在哪个目录创建
      • Kafka java示例

Kafka目录介绍

  1. bin:执行脚本
  2. config:配置文件
  3. libs:运行所需要的jar包
  4. logs:日志文件
  5. site-docs:网站的帮助文档

基础操作

  1. 创建topic、生产消息到Kafka、从Kafka消费消息。
# 创建主题test
bin/kafka-topics.sh --create-topic test --bootstrap-server localhost:9092
# 查看目前Kafka中的主题
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 从Kafka中读取消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  1. 图形化操作工具有KafkaTools、Kafka-Eagle等。
  2. 内置性能测试工具
  • kafka-producer-perf-test.sh
  • kafka-consumer-perf-test.sh
# 基于1个分区1个副本的基准测试
bin/kafka-topics.sh --bootstrap-server lcoalhost:9092 --create-topic benchmark --partitions 1 --replication-factor 1
# --throughput指定吞吐量,-l不指定,--record-size指定record数据大小
bin/kafka-producer-perf-test.sh --topic benchmark --num-records 50000000 -–throughput -l -–record-size 1000 –-producer-props  bootstrap.servers=localhost:9092 acks=1
  1. Kafka集群搭建
  • Kafka版本号:kafka_2.12-2.4.1,kafka采用scale开发,2.12为scale的版本号。
1. 安装包的上传解压
2. 修改kafka的config目录下的server.properties配置文件
    broker.id=0 指定broker的id
    log.dirs=/data 指定kafka数据的位置
3. 把安装好的kafka复制到另外两台机器
4. 配置环境变量
  	vi /etc/profile;
    export KAFKA_HOME=/kafka_2.12-2.4.1;
    export PATH = $PATH:${KAFKA_HOME}; 
    source /etc/profile;

JMX接口

  1. JMX接口是一个为应用程序植入管理功能的框架。
  2. 开启Kafka JMX,暴露JMX端口。
export JMX_PORT=9988

消费者是否能够消费指定分区的消息

​ 消费者消费消息时,向broker发出fetch请求去消费特定分区的消息,消费者指定消息在日志中的偏移量,就可以消费从这个位置开始的消息,消费者拥有偏移量的控制权,可以向后回滚去重新消费之前的消息。

生产者是否发送消息到leader

​ 生产者直接将数据发送到broker的leader,不需要在多个节点进行分发,为了帮助生产者做到这点,所有的Kafka节点都可以及时的告知哪些节点是活动的,目标主题分区的leader在哪里。

创建主题时如何把分区放到不同broker中

  1. 副本因子不能大于broker的数量。
  2. 编号为0的第一个分区的第一个副本的放置位置是随机从broker列表中选择。
  3. 其他分区的第一个副本的放置位置相对于第0个分区依次往后移,剩余的副本相对于第一个副本位置由随机产生的nextReplicaShift决定。

Kafka新建的分区在哪个目录创建

  1. Kafka集群驱动前需要设置log.dirs参数即数据的存放目录,这个参数可以配置多个以逗号分隔的目录,这些目录通常分布在不同的磁盘上用于提升读写性能。我们也可以设置log.dir参数,只需要设置其中一个就行了。
  2. 如果log.dirs参数只配置了一个目录,那么分配到各个broker上的分区肯定只能在这目录下创建文件用于存放数据。如果配置了多个目录,Kafka就会在含有分区目录最少的文件夹中创建新的分区目录,分区目录名为topic名+分区ID。

Kafka java示例

  1. 打开控制台消费者,等待主题test消息。
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  1. 创建生产者并执行发送消息。
package org.lxx.stream.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SimpleProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.182.171:9092");
        props.put("acks", "-1");
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", 
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i=0;i<10;i++) {
            producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }

}
  1. 消费者消费消息
package org.lxx.stream.kafka.producer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class SimpleConsumer {

    public static void main(String[] args) {
        System.out.println("Consumer consume...");
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.182.171:9092");
        props.put("group.id", "device-consumer-topic");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", 
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", 
                  "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        List<String> topics = new ArrayList<>();
        topics.add("test");
        consumer.subscribe(topics);
        while(true) {
            ConsumerRecords<String, String> records = 
                consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }
    }

}

ConsumerRecord(topic = test, partition = 0, offset = 27, CreateTime = 1724477538955, serialized key size = 1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = 0)
ConsumerRecord(topic = test, partition = 0, offset = 28, CreateTime = 1724477539012, serialized key size = 1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = 1)
ConsumerRecord(topic = test, partition = 0, offset = 29, CreateTime = 1724477539012, serialized key size = 1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2, value = 2)
ConsumerRecord(topic = test, partition = 0, offset = 30, CreateTime = 1724477539013, serialized key size = 1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 3, value = 3)
ConsumerRecord(topic = test, partition = 0, offset = 31, CreateTime = 1724477539013, serialized key size = 1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 4, value = 4)
ConsumerRecord(topic = test, partition = 0, offset = 32, CreateTime = 1724477539018, serialized key size = 1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 5, value = 5)
ConsumerRecord(topic = test, partition = 0, offset = 33, CreateTime = 1724477539018, serialized key size = 1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 6, value = 6)
ConsumerRecord(topic = test, partition = 0, offset = 34, CreateTime = 1724477539018, serialized key size = 1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 7, value = 7)
ConsumerRecord(topic = test, partition = 0, offset = 35, CreateTime = 1724477539019, serialized key size = 1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 8, value = 8)
ConsumerRecord(topic = test, partition = 0, offset = 36, CreateTime = 1724477539019, serialized key size = 1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = 9, value = 9)


http://www.kler.cn/news/282895.html

相关文章:

  • 深度学习基础--深度学习网络
  • 服务器内存飙升分析小记
  • PostgreSQL遍历所有的表并设置id为自增主键(基于自建函数)
  • FineReport帆软报表:使用JAVA批量更新报表里的数据集连接名
  • 【python量化分析专题】最新整理的已经实测可用的各类免费股票数据接口之实时交易数据
  • 『大模型笔记』林纳斯·托瓦兹(Linux之父):谈论热议与人工智能的未来!
  • Linux 网络技术栈,看这篇就够了!!
  • 【ACM独立出版 | 厦大主办】第五届计算机科学与管理科技国际学术会议(ICCSMT 2024,10月18-20)
  • 基于web 在线影院系统网站设计与实现
  • 交通流量监测检测系统源码分享 # [一条龙教学YOLOV8标注好的数据集一键训练_70+全套改进创新点发刊_Web前端展示]
  • Qt 调用执行 Python 函数
  • zookeeper服务器动态上下线监听案例
  • 【MySQL数据库管理问答题】第4章 配置 MySQL
  • SpringBoot应用打成ZIP部署包
  • 18.神经网络 - 非线性激活
  • 【机器学习】梯度下降算法
  • 源码编译并安装Squid的方法
  • BEVDet4D:多帧时序信息融合方法详解
  • 富格林:正规方式顺利盈利出金
  • 性能测试的基本概念
  • Pycharm安装报错:Cannot detect a launch configuration 解决办法
  • 吴恩达机器学习笔记 四十五 基于内容的过滤的tensorFlow实现
  • 怎么解决 hash 碰撞,用 C++ 实现 hashMap?
  • Nosql数据库redis集群配置详解
  • Nginx轮询负载均衡配置指南:实现高效请求分发
  • docker常用命令使用dockerfile构建镜像,推送到私有镜像仓库
  • 【AI绘画】Midjourney前置指令/describe、/shorten详解
  • 适配算能BM1684开发板,bmodel推理模型转换
  • 矩阵分块乘法的证明
  • C语言典型例题55