当前位置: 首页 > article >正文

kafka快速上手

一、kafka介绍

Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。

kafka官网:http://kafka.apach e.org/

 

二、kafka入门

  • 生产者发送消息,多个消费者只能有一个消费者接收到消息
  • 生产者发送消息,多个消费者都可以接收到消息

步骤如下: 

(1)创建kafka-demo项目,导入依赖 

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
 </dependency>

(2)生产者发送消息

public class ProducerQuickStart {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        //1.kafka链接配置信息
        Properties prop = new Properties();

        //kafka链接地址
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

        //key和value的序列化
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //2.创建kafka生产者对象
        KafkaProducer<String,String> producer = new KafkaProducer<String,String>(prop);

        //3.发送消息
        /**
         * 第一个参数 :topic
         * 第二个参数:消息的key
         * 第三个参数:消息的value
         */
        ProducerRecord<String,String> kvProducerRecord = new ProducerRecord<String,String>("topic-first","key-001","hello kafka");

        //同步发送消息
        producer.send(kvProducerRecord);

        //4.关闭消息通道  必须要关闭,否则消息发送不成功
        producer.close();
    }
}

  (3)消费者接收消息

public class ConsumerQuickStart {
    public static void main(String[] args) {
        //1.kafka的配置信息
        Properties prop = new Properties();

        //链接地址
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        //key和value的反序列化器
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //设置消费者组
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");

        //2.创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);

        //3.订阅主题
        consumer.subscribe(Collections.singletonList("topic-first"));

        //4.拉取消息
        while (true) {
            // 读取数据,读取超时时间为100ms ,即每个1000ms拉取一次
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
            }
        }

    }
}

使用情景:

生产者发送消息,多个消费者订阅同一个主题(多个消费者都是一个组)只能有一个消费者收到消息 (一对一)

生产者发送消息,多个消费者订阅同一个主题(多个消费者不是一个组)所有消费者都能收到消息 (一对多) 

三、springboot集成kafka

1.导入spring-kafka依赖信息

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- kafkfa -->
    <!-- Spring Kafka的依赖,排除了kafka-clients -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- 显式声明kafka-clients的依赖 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
     </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
</dependencies>

2.在resources下创建文件application.yml

server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      retries: 10 #重试的次数
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3.消息生产者

@RestController
public class HelloController {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @GetMapping("/hello")
    public String hello(){
        kafkaTemplate.send("test-topic","HelloWorld");
//        User user=new User();
//        user.setUsername("lili");
//        user.setAge(18);
//        kafkaTemplate.send("test-topic", JSON.toJSONString(user));

        return "ok";
    }
}

4.消息消费者

@Component
public class HelloListener {
    
    @KafkaListener(topics = "test-topic")
    public void onMessage(String message){
        if (!StringUtils.isEmpty(message)){
            System.out.println(message);
//            User user = JSON.parseObject(message, User.class);
//            System.out.println(user);
        }
    }
}

传递消息为对象:

目前springboot整合后的kafka,因为序列化器是StringSerializer,可以把要传递的对象进行转json字符串,接收消息后再转为对象即可


http://www.kler.cn/a/288435.html

相关文章:

  • 【文件I/O】文件持久化
  • Transformer入门教程全解析(一)
  • 深入讲解 Docker 及实践
  • AI多模态技术介绍:视觉语言模型(VLMs)指南
  • 【通俗理解】AI的两次寒冬:从感知机困局到深度学习前夜
  • 在iStoreOS上安装Tailscale
  • React 服务器组件
  • 智能汽车座椅制造:RFID技术助力精密加工与全程追踪
  • Getting an error trying to import environment OpenAI Gym
  • mongodb 时间存储使用Date还是时间戳
  • 【Python机器学习】NLP词频背后的含义——主成分分析
  • 使⽤docker部署project-exam-system(2)
  • [翻译+笔记] 用于视频生成的Diffusion Model
  • codesys进行控制虚拟轴运动时出现的一些奇怪bug的解释
  • 山体滑坡监测预警系统—百科分享
  • 开放式耳机怎么戴?开放式耳机比入耳式耳机舒适吗?
  • leetcode43字符串乘法
  • 梯度提升机:数据分析的强有力工具
  • webpack-01
  • 【HarmonyOS 4.0】网络请求 - axios
  • Spring Boot实现发QQ邮件
  • Windows环境Chrome安装提示无可用更新问题解决【2024年版】
  • 【2024-2025源码+文档+调试讲解】微信小程序的城市公交查询系统
  • 前端js—实现字符串拼接
  • 驱动和固件的区别 — 简单介绍
  • 美国海外仓可以用哪家海外仓系统好?