当前位置: 首页 > article >正文

Kafka 实战演练:创建、配置与测试 Kafka全面教程

文章目录

  • 1.配置文件
  • 2.消费者
    • 1.注解方式
    • 2.KafkaConsumer
  • 3.依赖
    • 1.注解依赖
    • 2.KafkaConsumer依赖

本文档只是为了留档方便以后工作运维,或者给同事分享文档内容比较简陋命令也不是特别全,不适合小白观看,如有不懂可以私信,上班期间都是在得

1.配置文件

  1. Yml配置
spring:
  kafka:
    bootstrap-servers: 
    consumer:
      group-id: iot-testaaaaaaaaaa11aaaaaaaaa
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      security:
        protocol: SASL_SSL
      sasl:
        mechanism: PLAIN
        jaas:
          config: org.apache.kafka.common.security.plain.PlainLoginModule required 
              username="" 
              password="";
      ssl:
        truststore:
          type: JKS
          location: src/main/resources/client.truststore.jks
          password: 
        endpoint.identification.algorithm:

yaml配置

2.消费者

1.注解方式

    @KafkaListener(topics = {"abcd"})
    public void listen(ConsumerRecord<?, ?> record){

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("---->"+record);
            System.out.println("---->"+message);

        }

    }

注解方式

2.KafkaConsumer

/**
 * @author XHao
 */
public class MqsConsumer {

    public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties";

    private KafkaConsumer<Object, Object> consumer;

    MqsConsumer(String path) {
        Properties props = new Properties();
        try {
            InputStream in = new BufferedInputStream(new FileInputStream(path));
            props.load(in);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        consumer = new KafkaConsumer<Object, Object>(props);
    }

    public MqsConsumer() {
        Properties props = new Properties();
        try {
            props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        consumer = new KafkaConsumer<Object, Object>(props);
    }

    public void consume(List topics) {
        consumer.subscribe(topics);
    }

    public ConsumerRecords<Object, Object> poll(long timeout) {
        return consumer.poll(timeout);
    }

    public void close() {
        consumer.close();
    }

    /**
     * get classloader from thread context if no classloader found in thread
     * context return the classloader which has loaded this class
     *
     * @return classloader
     */
    public static ClassLoader getCurrentClassLoader() {
        ClassLoader classLoader = Thread.currentThread()
                .getContextClassLoader();
        if (classLoader == null) {
            classLoader = MqsConsumer.class.getClassLoader();
        }
        return classLoader;
    }

    /**
     * 从classpath 加载配置信息
     *
     * @param configFileName 配置文件名称
     * @return 配置信息
     * @throws IOException
     */
    public static Properties loadFromClasspath(String configFileName) throws IOException {
        ClassLoader classLoader = getCurrentClassLoader();
        Properties config = new Properties();

        List<URL> properties = new ArrayList<URL>();
        Enumeration<URL> propertyResources = classLoader
                .getResources(configFileName);
        while (propertyResources.hasMoreElements()) {
            properties.add(propertyResources.nextElement());
        }

        for (URL url : properties) {
            InputStream is = null;
            try {
                is = url.openStream();
                config.load(is);
            } finally {
                if (is != null) {
                    is.close();
                    is = null;
                }
            }
        }

        return config;
    }
}

在这里插入图片描述

3.依赖

1.注解依赖

      <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2.KafkaConsumer依赖

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>

可视化大屏项目经常用到消息转换,实时状态等 记录一下吧

如果点赞多,评论多会更新详细教程,待补充。


http://www.kler.cn/a/293460.html

相关文章:

  • 【121. 买卖股票的最佳时机】——贪心算法/动态规划
  • Fastapi使用MongoDB作为数据库
  • 什么是数字图像?
  • MYSQL 库,表 基本操作
  • springboot项目中,使用ProGuard 对代码进行混淆
  • redhat虚拟机
  • 《Python爬虫逆向实战》加密方法远程调用(RPC)
  • TRIZ在充电桩安全中的应用探究
  • Java 入门指南:Java 并发编程 —— Fork/Join 框架 实现任务的拆分与合并
  • 探索PDF的奥秘:pdfrw库的神奇之旅
  • 2024,中国服务器操作系统迎云智主升浪
  • 鸿蒙开发5.0【高级图表实现】 解决方案
  • c++的类和对象
  • vue-seamless-scroll(二)点击事件
  • 为什么深度学习用GPU而不是CPU
  • electron 客户端 windows linux(麒麟V10)多系统离线打包 最新版 <二>
  • FPGA开发:Verilog基础语法
  • 如何进行不同数据库的集群操作?--从部署谈起,今天来看MySQL和NoSql数据库Redis的集群
  • FFMPEG -- 音频开发
  • 初识命名空间
  • S7-PLC
  • 安装win7鼠标键盘不能动原因分析及解决办法
  • 【AI学习】聊两句深度学习的目标函数
  • 计算机网络27——Linux1
  • 黑马JavaWeb开发笔记14——Tomcat(介绍、安装与卸载、启动与关闭)、入门程序解析(起步依赖、SpringBoot父工程、内嵌Tomcat)
  • EmguCV学习笔记 VB.Net 10.2 人脸识别 FaceRecgnizer类