当前位置: 首页 > article >正文

在Nodejs中使用kafka(一)安装使用

安装 

方法一、使用docker-compose安装

1、创建docker-compose.yml文件。

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.9
    ports:
      - "2181:2181"
    volumes:
      - "./data/zookeeper:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: docker.io/bitnami/kafka:3.4
    ports:
      - "9092:9092"
    volumes:
      - "./data/kafka:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092  # 关键配置
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
    depends_on:
      - zookeeper

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local

2、在文件所在位置运行下面命令

docker compose up -d

方法二、 

直接下载kafka,zookeeper。(注:CentOS环境 )  

java安装:yum install java-17-openjdk-devel 

kafka下载地址:Apache Kafka

zookeeper下载地址:Apache ZooKeeper

 下载好后选择路径直接解压即可。

Kafka 的核心功能

1.消息队列:Kafka 最初被设计为一个高吞吐量、分布式的消息队列系统,用于处理大规模的消息流。

2.持久化:Kafka 不仅仅是一个消息队列,它还提供消息的持久化功能,数据存储在磁盘中,这使得 Kafka 适合用于长期的数据存储和分析。

3.高吞吐量和低延迟:Kafka 设计上非常注重高吞吐量和低延迟,支持每秒钟处理百万级别的消息。

4.分布式系统:Kafka 是一个分布式系统,能够跨多台机器进行水平扩展,支持高可用性和容错。

5.流处理:除了消息传递,Kafka 还提供了流处理 API(例如 Kafka Streams),使得用户能够在消息流中进行实时计算和分析。

 Kafka 的核心组件

1.Producer(生产者)

  • Producer 是负责将消息发送到 Kafka 的客户端应用程序。
  • 生产者将消息发布到 Kafka 的特定 Topic(主题)中。
  • 生产者可以选择将消息发布到一个或多个 Kafka 分区中。

2.Consumer(消费者) 

  • Consumer 是从 Kafka 中读取消息的客户端应用程序。
  • 消费者订阅一个或多个 Topic,并从中读取消息。
  • Kafka 支持消费者组机制,消费者组中的多个消费者可以共同消费一个主题的消息,达到负载均衡和容错的效果。

3.Broker(代理)

  • Kafka Broker 是 Kafka 集群中的一个节点,负责存储消息并处理生产者和消费者的请求。
  • Kafka 集群通常由多个 Broker 组成,每个 Broker 负责存储特定的分区数据。
  • Kafka 的分布式架构保证了可扩展性和高可用性,所有的消息都会在多个 Broker 之间进行分布和复制。

4.Topic(主题)

  • Topic 是 Kafka 中的消息分类,用来组织消息。
  • 每个 Topic 可以有多个 Partition(分区),消息被写入到分区中,分区内的消息是有序的。
  • Kafka 支持将一个 Topic 的消息分布到多个分区中,允许高并发的生产者和消费者。

5.Partition(分区)

  • Partition 是 Topic 的实际存储单位,一个 Topic 可以有多个 Partition,每个 Partition 作为一个日志文件进行存储。
  • 分区保证了消息的顺序性,即同一个分区内的消息是有序的。
  • 分区的数量可以通过配置来控制,多个分区能够提供水平扩展和并行处理能力。

7. Zookeeper(协调者)(对于传统 Kafka 集群来说)

  • Zookeeper 是 Kafka 中的一个分布式协调组件,用于管理集群的元数据(如 Broker 的信息、Topic 和 Partition 的分配等)。
  • Zookeeper 保证 Kafka 集群中的 Broker 之间的协调和一致性。
  • 从 Kafka 2.8.0 开始,Kafka 引入了 KRaft 模式,它在未来版本中逐步摆脱了对 Zookeeper 的依赖,转而使用内置的 Raft 协议进行集群管理。

  

kafka基本命令

在解压的kafka根文件夹下运行

1、启动zookeeper

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

2、启动kafka

nohup bin/kafka-server-start.sh config/server.properties &

3、 查看kafka,zookeeper启动进程

jps

4、 创建topic

bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092

5、 删除topic

bin/kafka-topics.sh --delete --topic test --bootstrap-server localhost:9092

6、查看topic详情

bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092

 7、查看创建的所有topic

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

8、运行生产者

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

9、消费者(默认接收实时消息)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

10、从最开头开始消费

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

11、指定分区和位置开始消费

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 2

12、创建消费者组,可以指定多个组,多个消费者接收消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group testGroup

13、查看消费者组详情

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup

示例 

kafkajs文档地址:Getting Started · KafkaJS 

producer.ts

import { Kafka } from 'kafkajs';


async function run() {
  const kafka = new Kafka({
    clientId: 'test1',
    brokers: ['localhost:9092'],
  });

  const producer = kafka.producer();

  await producer.connect();

  for (let i = 1; i <= 10; ++i) {
    await producer.send({
      topic: 'topic1',
      messages: [
        { value: `hello kafka${i}` },
      ],
    });
  }

  await producer.disconnect();
}

run();

consumer.ts

import { Kafka } from 'kafkajs'


const kafka = new Kafka({
  clientId: 'test1',
  brokers: ['localhost:9092'],
  connectionTimeout: 1000, // 1 秒连接超时
});

const consumer = kafka.consumer({
  groupId: 'group1',
});

await consumer.connect();

await consumer.subscribe({ 
  topic: 'topic1',
  fromBeginning: true, // 从头开始消费
});

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log(message.value?.toString());
  },
});

// consumer.seek({ topic: 'topic1', partition: 0, offset: '2' });

 


http://www.kler.cn/a/549720.html

相关文章:

  • 【设计模式】-工厂模式(简单工厂、工厂方法、抽象工厂)
  • 股指期货是什么?股指期货日内拐点有什么特征?
  • Springer |第七届2025年区块链、人工智能和可信系统国际会议
  • frp与云服务器内网穿透
  • 实现MySQL的横向扩展
  • 一、OpenSM 架构部署及原理详解
  • Python实现语音识别详细教程【2025】最新教程
  • 41.日常算法
  • CRISPR spacers数据库;CRT和PILER-CR用于MAGs的spacers搜索
  • MySQL创建存储过程和存储函数
  • 人工智能在文化遗产保护中的创新:科技与文化的完美融合
  • 【ENSP】华为设备console 认证配置
  • Ubuntu 系统 LVM 逻辑卷扩容教程
  • uni-app 学习(一)
  • 【Elasticsearch】Mapping概述
  • C++ 设计模式-装饰器模式
  • 八股取士--dockerk8s
  • springboot jackson配置 以及Java8 时间新特性 理解
  • 蓝桥杯试题:计数问题
  • ubuntu基于docker部署呼叫中心质检【支持情绪,话术对比】