当前位置: 首页 > article >正文

Windows10、CentOS Stream9 环境下安装kafka_2.12-3.6.2记录

目录

  • Windows下操作
    • 1. 安装kafka [kafka_2.12-3.6.2](https://downloads.apache.org/kafka/3.6.2/kafka_2.12-3.6.2.tgz)
    • 2. 启动Zookeeper
      • 2.1 进入Kafka的config目录,修改zookeeper.properties配置文件
    • 3. 启动kafka
      • 3.1 进入Kafka的config目录,修改server.properties配置文件
    • 4.消费主题
      • 4.1 创建主题
      • 4.2 查询主题
        • 4.2.1 查询指定主题信息
      • 4.3 修改主题
      • 4.4 删除主题
    • 5. 生产数据
      • 5.1 工具操作
    • 6. 消费数据
    • 7. 源码关联(可忽略)
    • 8. Kafka集群部署
      • 8.1 安装Zookeeper
      • 8.2 安装kafka
      • 8.3 启动脚本
  • Linux下操作
    • 1. 安装zookeeper
    • 2. 安装kafka

Windows下操作


1. 安装kafka kafka_2.12-3.6.2

官网: https://kafka.apache.org/downloads

安装Java8(未来Kafka 4.X版本会完全弃用Java8)

文件目录结构如下:

binlinux系统下可执行脚本文件
bin/windowswindows系统下可执行脚本文件
config配置文件
libs依赖类库
licenses许可信息
site-docs文档
logs服务日志

2. 启动Zookeeper

kafka_2.12-3.6.2该版本内部依赖ZooKeeper进行多节点协调调度,已内置ZooKeeper。


2.1 进入Kafka的config目录,修改zookeeper.properties配置文件

dataDir=F:/Apache/kafka_2.13-3.6.2/data/zk

ZooKeeper数据存放目录

进入cmd界面

cd F:/Apache/kafka_2.13-3.6.2/bin/windows

启动命令:

zookeeper-server-start.bat ../../config/zookeeper.properties

在解压目录下创建快速启动脚本:zk.cmd

call bin/windows/zookeeper-server-start.bat config/zookeeper.properties

可快速启动zookeeper


3. 启动kafka

如果要指定jdk版本,在kafka-run-class.bat里面添加一行set JAVA_HOME=F:/Java/jdk/jdk-17.0.11是可行的


3.1 进入Kafka的config目录,修改server.properties配置文件

log.dirs=F:/Apache/kafka_2.13-3.6.2/data/kafka

kafka数据存放目录

进入cmd界面

cd F:/Apache/kafka_2.13-3.6.2/bin/windows

启动命令:

kafka-server-start.bat ../../config/server.properties

在解压目录下创建快速启动脚本:kfk.cmd

call bin/windows/kafka-server-start.bat config/server.properties

4.消费主题

消息发布/订阅(Publish/Subscribe)、将不同的消息进行分类,分成不同的主题(Topic)


4.1 创建主题

进入cmd界面

cd F:/Apache/kafka_2.13-3.6.2/bin/windows

Kafka通过kafka-topics.bat指令文件进行消息主题操作(主题查询、创建、删除等功能)

  • 调用指令创建主题时,需要传递多个参数,而且参数的前缀为两个横线。

  • –bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开。

  • –create : 表示对主题的创建操作,是个操作参数,后面无需增加参数值。

  • –topic : 主题的名称,后面接的参数值一般是见名知意的字符串名称,类似于java中的字符串类型标识符名称,也可以使用数字,只不过最后还是当成数字字符串使用。

创建命令:

kafka-topics.bat --bootstrap-server localhost:9092 --create --topic test

4.2 查询主题

Kafka通过kafka-topics.bat文件进行消息主题操作。(主题的查询,创建,删除等功能)

  • –bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开

  • –list : 表示对所有主题的查询操作,是个操作参数,后面无需增加参数值

kafka-topics.bat --bootstrap-server localhost:9092 --list

4.2.1 查询指定主题信息
  • –bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开

  • –describe : 查看主题的详细信息

  • –topic : 查询的主题名称

kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic test

4.3 修改主题

Kafka通过kafka-topics.bat文件进行消息主题操作。(主题的查询,创建,删除等功能)

  • –bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开

  • –alter : 表示对所有主题的查询操作,是个操作参数,后面无需增加参数值

  • –topic : 修改的主题名称

  • –partitions : 修改的配置参数:分区数量

kafka-topics.bat --bootstrap-server localhost:9092 --topic test --alter --partitions 2

4.4 删除主题

Kafka通过kafka-topics.bat文件进行消息主题操作。(主题的查询,创建,删除等功能)

  • –bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开

  • –delete: 表示对主题的删除操作,是个操作参数,后面无需增加参数值。默认情况下,删除操作是逻辑删除,也就是说数据存储的文件依然存在,但是通过指令查询不出来。如果想要直接删除,需要在server.properties文件中设置参数delete.topic.enable=true

  • –topic : 删除的主题名称

kafka-topics.bat --bootstrap-server localhost:9092 --topic test --delete

windows系统中由于权限或进程锁定的问题,删除topic会导致kafka服务节点异常关闭。(不然直接直接删除data下的数据)


5. 生产数据

Kafka通过kafka-console-producer.bat文件进行消息生产者操作

  • 调用指令时,需要传递多个参数,而且参数的前缀为两个横线
  • –bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开。早期版本的Kafka也可以通过 --broker-list参数进行连接,当前版本已经不推荐使用了。

进入cmd界面

cd F:/Apache/kafka_2.13-3.6.2/bin/windows

输入生产数据:

kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test

输入消息,回车发送到kafka服务器


5.1 工具操作

Kafka Web UI :

  • kafdrop https://github.com/obsidiandynamics/kafdrop

指定的话:java --add-opens=java.base/sun.nio.ch=ALL-UNNAMED -jar kafdrop-4.0.2.jar --kafka.brokerConnect=localhost:9092

访问http://localhost:9000/

  • kafka Tool(有点难用)
  • Java API
  <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.6.1</version>
        </dependency>
    </dependencies>

创建类:


package cn.coisini;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;

public class KafkaProducerTest {
    public static void main(String[] args) {
        // 配置属性集合
        Map<String, Object> configMap = new HashMap<>();
        // 配置属性:Kafka服务器集群地址
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作
        configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 创建Kafka生产者对象,建立Kafka连接
        // 构造对象时,需要传递配置参数
        KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
        // 准备数据,定义泛型
        // 构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key1", "value1");
        // 生产(发送)数据
        producer.send(record);
        // 关闭生产者连接
        producer.close();
    }
}

6. 消费数据

消息通过Kafka生产者客户端发送到Kafka服务器中,暂存在Kafka中,我们可通过Kafka消费者客户端对服务器指定主题的消息进行消费。

进入cmd界面

cd F:/Apache/kafka_2.13-3.6.2/bin/windows

Kafka通过kafka-console-consumer.bat文件进行消息消费者操作。

  • 调用指令时,需要传递多个参数,而且参数的前缀为两个横线。
  • –bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开。早期版本的Kafka也可以通过 --broker-list参数进行连接,当前版本已经不推荐使用了。
  • –topic : 主题的名称,后面接的参数值就是之前已经创建好的主题名称。其实这个参数并不是必须传递的参数,因为如果不传递这个参数的话,那么消费者会消费所有主题的消息。如果传递这个参数,那么消费者只能消费到指定主题的消息数据。
  • –from-beginning : 从第一条数据开始消费,无参数值,是一个标记参数。默认情况下,消费者客户端连接上服务器后,是不会消费到连接之前所生产的数据的。也就意味着如果生产者客户端在消费者客户端连接前已经生产了数据,那么这部分数据消费者是无法正常消费到的。所以在实际环境中,应该是先启动消费者客户端,再启动生产者客户端,保证消费数据的完整性。增加参数后,Kafka就会从第一条数据开始消费,保证消息数据的完整性。
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

JavaAPi调用:

public class KafkaConsumerTest {
    public static void main(String[] args) {
        // 配置属性集合
        Map<String, Object> configMap = new HashMap<String, Object>();
        // 配置属性:Kafka集群地址
        configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 配置属性: Kafka传输的数据为KV对,所以需要对获取的数据分别进行反序列化
        configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // 配置属性: 读取数据的位置 ,取值为earliest(最早),latest(最晚)
        configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        // 配置属性: 消费者组
        configMap.put("group.id", "coisini");
        // 配置属性: 自动提交偏移量
        configMap.put("enable.auto.commit", "true");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configMap);
        // 消费者订阅指定主题的数据
        consumer.subscribe(Collections.singletonList("test"));
        while ( true ) {
            // 每隔100毫秒,抓取一次数据
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));
            // 打印抓取的数据
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("K = " + record.key() + ", V = " + record.value());
            }
        }
    }
}

7. 源码关联(可忽略)

  • 解压源码:kafka-3.6.2-src.tgz
  • 安装JDK17和Scala2.13

JDK17:https://download.oracle.com/java/17/latest/jdk-17_windows-x64_bin.exe

进入Scala官方网站https://www.scala-lang.org/下载Scala压缩包scala-2.13.12.zip。

在IDEA中安装Scala插件

在项目Platform Settings中的Global Libraries选择scala-2.13.12

  • 安装Gradle

进入Gradle官方网站https://gradle.org/releases/下载Gradle安装包.

新增系统环境GRADLE_HOME,指定gradle安装路径,并将%GRADLE_HOME%\bin添加到path中

Gradle安装及环境变量配置完成之后,打开Windows的cmd命令窗口,输入gradle –version

在解压缩目录中打开命令行,依次执行gradle idea命令

在命令行中执行gradle build --exclude-task test命令

使用IDE工具IDEA打开该项目目录


8. Kafka集群部署

创建文件夹kafka-cluster

目录不能太深,最好放根目录,避免命令行执行不了。


8.1 安装Zookeeper

使用内置ZooKeeper

  • 解压kafka_2.13-3.6.2.tgz文件,修改目录目录名为zookeeper

  • 修改config/zookeeper.properties文件

    dataDir=F:/kafka-cluster/zookeeper/data
    clientPort=2181
    
    • 数据目录,会自动创建data目录

    • ZooKeeper默认端口为2181


8.2 安装kafka

节点broker

  • 解压kafka_2.13-3.6.2.tgz文件,修改目录目录名为kafka-node-1

  • 修改config/server.properties配置文件

    broker.id=1
    listeners=PLAINTEXT://:9091
    log.dirs=F:/kafka-cluster/kafka-node-1/data
    zookeeper.connect=localhost:2181/kafka
    
    • kafka节点数字标识,集群内具有唯一性

    • 放开注释,监听器 9091为本地端口,如果冲突,需重新指定

    • 数据目录,会自动创建data目录

    • ZooKeeper软件连接地址,2181为默认的ZK端口号 /kafka 为ZK的管理节点

分别拷贝修改目录名kafka-node-2kafka-node-3

配置文件中 broker.id=1 改为 broker.id=2broker.id=3

配置文件中 端口 9091 改为 90929093

配置文件中 数据目录kafka-broker-1 改为 kafka-node-2kafka-node-3


8.3 启动脚本

  • kafka-zookeeper 目录下 创建 zk.cmd

    call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
    
  • kafka-broker-1、2、3目录下 创建 kfk.cmd

    call bin/windows/kafka-server-start.bat config/server.properties
    
  • kafka-cluster 集群目录下 创建启动 cluster.cmd 批处理文件

    cd kafka-zookeeper
    start zk.cmd
    ping 127.0.0.1 -n 10 >nul
    cd ../kafka-node-1
    start kfk.cmd
    cd ../kafka-node-2
    start kfk.cmd
    cd ../kafka-node-3
    start kfk.cmd
    
    • 创建清理和重置kafka数据的 cluster-clear.cmd 批处理文件
cd kafka-zookeeper
rd /s /q data
cd ../kafka-node-1
rd /s /q data
cd ../kafka-node-2
rd /s /q data
cd ../kafka-node-3
rd /s /q data


Linux下操作


1. 安装zookeeper

wget https://dlcdn.apache.org/zookeeper/zookeeper-3.7.2/apache-zookeeper-3.7.2-bin.tar.gz

tar -zxvf apache-zookeeper-3.7.2-bin.tar.gz

mv apache-zookeeper-3.7.2-bin zookeeper-3.7.2

mkdir zookeeper-3.7.2/zkData

echo “1” > zookeeper-3.7.2/zkData/myid

cd zookeeper-3.7.2/conf/

mv zoo_sample.cfg zoo.cfg

vi zoo.cfg

修改数据目录:

dataDir=/opt/coisini/kafka/zookeeper-3.7.2/zkData

cd …

服务命令:

bin/zkServer.sh start

bin/zkServer.sh stop

bin/zkServer.sh status


2. 安装kafka

官网: https://kafka.apache.org/downloads

如果要指定jdk版本,在kafka-run-class.sh里面添加一行set JAVA_HOME=/opt/coisini/java/jdk-17.0.12是可行的

wget https://downloads.apache.org/kafka/3.6.2/kafka_2.13-3.6.2.tgz

tar -zxvf kafka_2.13-3.6.2.tgz

mkdir kafka_2.13-3.6.2/kfkData

cd kafka_2.13-3.6.2/config/

进入Kafka的config目录,修改server.properties配置文件

log.dirs=/opt/coisini/kafka/kafka_2.13-3.6.2/kfkData

broker的全局唯一编号,只能是数字。

broker.id=0

broker对外暴露的IP和端口 (主机名:9092)

advertised.listeners=PLAINTEXT://coisini:9092

相关命令:

bin/kafka-server-start.sh -daemon config/server.properties

bin/kafka-server-stop.sh

开放端口

firewall-cmd --zone=public --add-port=9092/tcp --permanent

firewall-cmd --reload


(未完结,后续有时间再写)


http://www.kler.cn/a/313152.html

相关文章:

  • 详解kafka消息发送重试机制的案例
  • Python教程笔记(1)
  • 2024年11月10日系统架构设计师考试题目回顾
  • 深度学习-图像评分实验(TensorFlow框架运用、读取处理图片、模型建构)
  • ESP32-S3模组上跑通esp32-camera(12)
  • OSG开发笔记(三十一):OSG中LOD层次细节模型介绍和使用
  • Oracle数据库pl/sql显式抛出异常
  • Python 项目实践:文件批量处理
  • 软硬件项目运维方案(Doc原件完整版套用)
  • CSP-CCF★★★201909-2小明种苹果(续)★★★
  • 【Linux】:信号的保存和信号处理
  • 【C++掌中宝】深入解析C++命名空间:有效管理代码的利器
  • 文心快码、通义灵码、腾讯云AI代码助手、豆包MarsCode 四大国产AI编程助手对比
  • 前端中常见的三种存储方式Cookie、localStorage 和 sessionStorage。
  • CompletableFuture-详解使用及源码解析
  • 洗衣机制造5G智能工厂物联数字孪生平台,推进制造业数字化转型
  • 《nmap 命令全解析:网络探测与安全扫描的利器》
  • 2024最新最全:超详细Nmap使用技巧(非常详细)零基础入门到精通,收藏这一篇就够了
  • 【delphi】正则判断windows完整合法文件名,包括路径
  • 计算机毕业设计之:教学平台微信小程序(
  • 详解:冒泡排序
  • BGP 路由反射器
  • 西圣、蜂鸟bebird可视挖耳勺好不好用?行业王者测评争霸!
  • python教程(三):python高级数据结构大全(附代码)
  • MySQL高阶1892-页面推荐2
  • 数据结构—(java)反射,枚举,lambda表达式