当前位置: 首页 > article >正文

kafka入门(一):kafka消息消费

安装kafka,创建 topic:

Windows安装kafka, 详情见:https://blog.csdn.net/sinat_32502451/article/details/133067851

Linux 安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133080353

添加依赖包:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.1.10.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>

kafka代码示例(一):

主要按照以下步骤:

  • 设置 broker服务器的ip和端口, 设置 消费者群组id

  • 初始化消费者

  • 消费者订阅主题

  • 消费者批量拉取消息

public class KafkaDemo1 {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "myTopic1";
    public static final String GROUP_ID = "group.demo";

    public static void main(String[] args) {
        consumerRecord();
    }

    public static void consumerRecord() {
        //属性配置
        Properties properties = getProperties(BROKER_LIST, GROUP_ID);
        //消费者初始化
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //消息者订阅主题
        consumer.subscribe(Collections.singletonList(TOPIC));
        //循环
        while (true) {
            //每次拉取 1千条消息
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("=============> 消费kafka消息:"+ record.value());
            }
        }
    }

    public static Properties getProperties(String brokerList, String groupId) {
        Properties properties = new Properties();
        //序列化
        properties.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        //broker服务器的ip和端口,多个用逗号隔开
        properties.put("bootstrap.servers", brokerList);
        //消费者群组id
        properties.put("group.id", groupId);
        return properties;
    }

}


使用文章开头安装好的 kafka,并按文章中的步骤,创建 topic ,打开一个 生产者 producer,并发送消息。
在这里插入图片描述

观察idea 控制台,可以看到 成功消费了消息:

=============> 消费kafka消息:hello kafka

参考资料:

《深入理解kafka 核心设计与实践原理》


http://www.kler.cn/a/132874.html

相关文章:

  • 【windows】校园网AP隔离解决方案笔记-解决校内设备之间无法互相通信的臭毛病-附破解程序
  • 快排和归并
  • Jenkins的pipeline Script的 每个组件的详细讲解
  • 基于大语言模型意图识别和实体提取功能;具体ZK数值例子:加密货币交易验证;
  • 嵌入式课程day13-C语言指针
  • java中volatile 类型变量提供什么保证?能使得一个非原子操作变成原子操作吗?
  • 若依中脱敏
  • C#WPF文本转语音实例
  • 人工智能发展前景
  • Flutter 应用启动从闪屏页短暂黑屏再到第一个页面
  • 超全整理,Pytest自动化测试框架-多进程(pytest-xdist)运行总结...
  • Web前端—移动Web第一天(平面转换、渐变、综合案例--播客网页设计)
  • 计算机网络八股文
  • 十三、Docker的安装
  • 【刷题专栏—突破思维】LeetCode 138. 随机链表的复制
  • 趣学python编程 (二、计算机硬件和用途介绍)
  • ai语音电销机器人电销行业要怎么降低封号率?
  • Kotlin--1.基础语法
  • 在国内购买GPT服务前的一定要注意!!!
  • git基础命令
  • 【微信公众号】一、获取 access_token
  • .Net(C#)常用转换byte转uint32、byte转float等
  • 使用新版Visual Studio编译老项目部分报错处理
  • ChatGpt3.5已经应用了一段时间,分享一些自己的使用心得.
  • [Docker]六.Docker自动部署nodejs以及golang项目
  • 如何快速本地搭建悟空CRM结合内网穿透工具高效远程办公