当前位置: 首页 > article >正文

Kafka 快速实战及基本原理详解解析-01

一、Kafka 介绍

1. MQ 的作用

消息队列(Message Queue,简称 MQ)是一种用于跨进程通信的技术,核心功能是通过异步消息的方式实现系统之间的解耦。它在现代分布式系统中有着广泛的应用,主要作用体现在以下三个方面:

异步处理

在传统的同步调用中,生产者和消费者需要同时在线,并且生产者在完成任务后才能继续执行其他工作。这种模式限制了系统的性能。而引入消息队列后,生产者可以将任务提交到队列中,消费者按需消费任务,从而提升系统的吞吐量。

  • 示例:快递员送快递到客户家,效率低下。而菜鸟驿站的出现让快递员只需将包裹放置在驿站,客户可以根据自己的时间安排取件。这种方式大大提高了效率。
解耦

解耦是消息队列最重要的功能之一。服务之间通过消息队列传递数据,而不是直接调用对方的服务接口,这样可以有效降低系统的耦合度。

  • 示例:《Thinking in JAVA》原书是英文版,但通过翻译社将内容翻译成多种语言,满足不同读者的需求。翻译社起到了桥梁作用,不同语言之间的沟通不再直接依赖于作者和读者。
削峰填谷

在高并发场景下,系统往往会遇到流量高峰,导致系统负载过重。通过消息队列,可以将流量暂存并按固定速率处理,从而避免系统崩溃。

  • 示例:长江每年都会涨水,但通过三峡大坝的调节,下游的出水速度保持稳定,避免了洪水泛滥。

2. 为什么要用 Kafka

Kafka 是一种高吞吐量、低延迟、分布式的消息队列系统,适合在大规模数据处理场景中使用。以下是 Kafka 的典型使用场景和优势:

日志聚合场景

在大规模分布式系统中,各个服务都会产生大量的日志信息。传统的日志收集方式往往存在以下问题:

  • 数据量大:需要快速收集和处理来自各个渠道的海量日志。
  • 容错性要求高:集群中允许少量节点出现故障而不影响整体服务。
  • 功能专注:Kafka 专注于高吞吐量、低延迟的消息传递,不追求复杂的消息处理功能。
核心优势
  • 高吞吐量:Kafka 能够处理数百万 TPS(每秒事务处理量)。
  • 低延迟:通常在毫秒级别的延迟时间内完成消息传递。
  • 可扩展性:通过增加节点和分区数量,可以线性扩展处理能力。
  • 容错性:通过副本机制保证消息的高可用性。
  • 持久化:Kafka 使用磁盘存储消息,保证消息的持久性。

二、Kafka 快速上手

1. 实验环境准备

要快速上手 Kafka,首先需要搭建实验环境。以下是推荐的实验环境配置:

  • 虚拟机数量:3 台
  • 操作系统:CentOS 7
  • Java 版本:Java 8
环境配置步骤
  1. 下载 Kafka 和 Zookeeper。
  2. 将 Kafka 解压到 /app/kafka 目录,将 Zookeeper 解压到 /app/zookeeper 目录。
  3. 配置环境变量,确保系统能够识别 Kafka 和 Zookeeper 的命令。
  4. 关闭防火墙,以避免端口阻塞:
    systemctl stop firewalld.service
    

2. 单机服务体验

为了更直观地理解 Kafka 的工作原理,我们可以先体验单机版 Kafka 服务。

步骤 1:启动 Zookeeper

Kafka 依赖 Zookeeper 进行元数据管理和选举机制。在实际部署中,通常使用独立的 Zookeeper 集群。

启动 Zookeeper 服务:

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

检查 Zookeeper 是否正常启动:

jps

确认输出中有 QuorumPeerMain 进程。

步骤 2:启动 Kafka

启动 Kafka 服务前,需要确保 Zookeeper 服务正常运行。

启动 Kafka 服务:

nohup bin/kafka-server-start.sh config/server.properties &

确认 Kafka 是否正常启动:

jps

检查输出中是否包含 Kafka 进程。

步骤 3:创建和使用 Topic

Kafka 的基础工作机制是通过 Topic 进行消息的传递。

  1. 创建 Topic

    bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
    
  2. 发送消息 启动生产者端并发送消息:

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    > 这是一条测试消息
    
  3. 消费消息 启动消费者端并接收消息:

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    

3. 理解 Kafka 的消息传递机制

Kafka 的消息传递机制可以通过以下核心组件来理解:

  • 生产者(Producer):将消息发送到指定的 Topic。
  • 消费者(Consumer):从指定的 Topic 消费消息。
  • Topic:逻辑概念,表示一类业务消息的集合。
  • Partition:物理概念,实际存储消息的分区。
  • Broker:Kafka 服务器实例,存储和管理 Partition。

Kafka 的设计目标是通过这些组件实现高效、可靠的消息传递,满足企业级数据管道的需求。


四、Kafka 集群服务

1. 为什么要使用集群

单机部署的 Kafka 在性能上虽然已经非常出色,但在实际生产环境中通常需要使用 Kafka 集群来进一步提升数据存储能力和系统的高可用性。集群可以解决以下问题:

1.1 解决海量数据存储问题

单个 Broker 服务器的存储能力有限,当数据量增长到一定程度时,单机难以承载。通过集群部署,可以将数据分散存储在多个 Broker 中,从而提升整体存储能力。

1.2 提高系统容错能力

单机环境中,如果 Broker 崩溃,所有数据都会丢失。而集群环境下,每个 Partition 都有多个副本,即使部分 Broker 节点宕机,系统依然可以正常运行,保证数据的高可用性。


五、理解服务端的 Topic、Partition 和 Broker

Kafka 的核心架构由 Topic、Partition 和 Broker 组成,这三者之间的关系至关重要:

  • Topic:一个逻辑的消息分类,每个 Topic 包含多条消息。
  • Partition:每个 Topic 可以分成多个 Partition,每个 Partition 是一个消息队列。
  • Broker:Kafka 的服务器实例,负责存储 Partition 数据,并处理客户端请求。

5.1 创建分布式 Topic 示例

bin/kafka-topics.sh --bootstrap-server worker1:9092 --create --replication-factor 2 --partitions 4 --topic distributedTopic

5.2 查看 Topic 信息

bin/kafka-topics.sh --bootstrap-server worker1:9092 --describe --topic distributedTopic

六、章节总结:Kafka 集群的整体结构

通过前面的学习,我们可以总结 Kafka 集群的整体结构:

  1. Topic 是逻辑概念,Producer 和 Consumer 通过 Topic 进行消息传递。
  2. Partition 是实际存储单元,保证数据分散存储和负载均衡。
  3. Broker 是 Kafka 的服务器实例,存储 Partition 数据并处理客户端请求。
  4. Zookeeper 管理 Kafka 集群的元数据和选举过程。
  5. Controller 是 Kafka 集群的核心管理节点,负责管理 Topic 和 Partition 的分配。

七、Spring Boot 实现 Kafka 消息有序性

为了保证 Kafka 的消息有序性,可以使用 Spring Boot 和 Kafka 的整合来实现。在 Java 的 Spring Boot 项目中,我们通过指定消息的 Key 和自定义分区器来确保消息发送到相同的 Partition,从而实现有序性。

7.1 依赖配置

在 Maven 项目中,引入 Kafka 的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-kafka</artifactId>
</dependency>

7.2 配置 KafkaProducer

创建 Kafka 的生产者配置类:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

7.3 发送有序消息

创建一个消息发送服务,确保消息使用相同的 Key 发送到同一个 Partition:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private static final String TOPIC = "test_topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String key, String message) {
        kafkaTemplate.send(TOPIC, key, message);
    }
}

7.4 自定义分区器(可选)

如果有更复杂的分区逻辑,可以自定义分区器:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 自定义分区逻辑
        return Math.abs(key.hashCode()) % cluster.partitionCountForTopic(topic);
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

7.5 设置一个 Topic 对应一个 Partition 的方法

如果业务需求是保证某个 Topic 的消息全局有序,可以在创建 Topic 时将 Partition 数量设置为 1,从而保证所有消息存储在同一个 Partition 中,实现全局有序。

创建一个 Partition 的 Topic
bin/kafka-topics.sh --create --topic singlePartitionTopic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
在 Spring Boot 中发送消息到该 Topic
@Service
public class KafkaSinglePartitionProducerService {

    private static final String TOPIC = "singlePartitionTopic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);
    }
}

通过这种方式,所有发送到 singlePartitionTopic 的消息都会进入同一个 Partition,确保消息顺序性。


http://www.kler.cn/a/466450.html

相关文章:

  • 用公网服务代理到本地电脑笔记
  • AngularJS HTML DOM
  • 【ArcGIS Pro二次开发实例教程】(1):图层的前置、后置
  • Jetpack Compose 学习笔记(四)—— CompositionLocal 与主题
  • 0基础跟德姆(dom)一起学AI 自然语言处理10-LSTM模型
  • 【Unity报错】error Cs0103: The name ‘keyCode‘ does not exist in the current context
  • Ubuntu Server安装谷歌浏览器
  • 多模态论文笔记——Coca
  • MATLAB中dbstack函数用法
  • 【pyqt】(四)Designer布局
  • 根据 el-dialog 的高度动态计算 el-table 的高度
  • 常规继承类对象大小
  • Cause: java.sql.SQLException: sql injection violation, comment not allow异常问题处理
  • 【MySQL基础篇】三、表结构的操作
  • 最新MySQL面试题(2025超详细版)
  • 【GeekBand】C++设计模式笔记22_Chain of Responsibility_职责链
  • AWS Lambda基础知识
  • 【Vue】分享一个快速入门的前端框架以及如何搭建
  • 非docker方式部署openwebui过程记录
  • linux-centos-安装miniconda3
  • 掌控ctf-2月赛
  • 区块链安全常见的攻击分析——可预测随机数漏洞 (Predictable Randomness Vulnerability)【12】
  • 路由基本配置实验
  • 《一个孤独漫步者的遐想-卢梭》阅读笔记
  • C#数字转大写人民币
  • docker镜像制作的命令,docker自定义镜像