当前位置: 首页 > article >正文

Kafka 之自定义消息拦截器【Kafka 拦截器】

前言:

拦截器这个概念相信大部分朋友都不会陌生,Spring MVC 拦截器相信大家都用过,拦截器的核心思想就是运行应用程序在不修改业务逻辑的前提下,动态的实现一组可插拔的事件处理器链,它可以在业务链路中的前后各个点进行对应的拦截,做一些统一的处理,Sping MVC 的拦截器大家都了解,本篇我们来分享一下 Kafka 的拦截器。

Kafka 系列文章传送门

Kafka 简介及核心概念讲解

Spring Boot 整合 Kafka 详解

Kafka @KafkaListener 注解的详解及使用

Kafka 客户端工具使用分享【offsetexplorer】

Kafka 之消息同步/异步发送

Kafka 之批量消息发送消费

Kafka 之消息广播消费

Kafka 之消息并发消费

Kafka 之顺序消息

Kafka 之事务消息

Kafka 如何保证消息不丢失?【消息手动 ACK】

什么是 Kafka 的拦截器?

Kafka 的拦截器分为 Producer 拦截器和 Consumer 拦截器,使用 Producer 拦截器可以在消息发送前及消息发送成功后植入自定义的业务逻辑,而 Consumer拦截器支持在消息消费前以及提交位移后编写特定逻辑,不管是 Producer 拦截器还是 Consumer 拦截器都支持拦截器链,可以将一系列的拦截器组装成一个拦截器链,Kafka 会按照添加顺序一次执行拦截器逻辑,Kafka 为我们提供了两个拦截器接口,分别是 ProducerInterceptor 和 ConsumerInterceptor,我们实现该接口重新方法实现自定义业务其逻辑即可。

ProducerInterceptor 源码分析

ProducerInterceptor 是 Kafka 的生产者拦截器,实现了 Configurable 接口,提供了 onSend、onAcknowledgement、close、configure 四个方法,方法的作用如下:

  • onSend:该方法会在消息发送之前被调用,如果你想给发送出去的消息进行统一处理,可以从这里下手。
  • onAcknowledgement:该方法会在消息成功提交或者发送失败后调用,我们的异步消息发送中有个 callback,onAcknowledgement 方法会在 callback 方法之前调用,需要注意的是该方法和 onSend 方法不是在同一个线程中调用,如果在这两个方法中使用贡献变量的时候就要特别注意,一般不建议在这个方法中加入过多的业务逻辑,否则会影响 Kafka 的性能。
  • close:拦截器关闭前的处理。
  • configure:初始化配置。
public interface ProducerInterceptor<K, V> extends Configurable {

    ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);

    void onAcknowledgement(RecordMetadata var1, Exception var2);

    void close();
}

public interface Configurable {
    void configure(Map<String, ?> var1);
}

ConsumerInterceptor 源码分析

ConsumerInterceptor 是 Kafka 的消费者拦截器,同样实现了 Configurable 接口,提供了 onConsume、onCommit、close、configure 四个方法,方法的作用如下:

  • onConsume:该方法会在消费者正式消费之前被调用,如果你想对消息消费之前做一些统一处理,可以在该方法中实现。
  • onCommit:该方法会在 Kafka 提交 offset 之后调用,通常可以在该方法中进行一些日志记录等。
  • close:拦截器关闭前的处理。
  • configure:初始化配置。
package org.apache.kafka.clients.consumer;

import java.util.Map;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> var1);

    void onCommit(Map<TopicPartition, OffsetAndMetadata> var1);

    void close();
}

自定义实现 Kafka 生产者拦截器

Kafka 给我们提供了生产者拦截器接口,现在我们自己来实现一个 Kafka 生产者拦截器,自定义 Kafka 生产者拦截器代码如下:

 package com.order.service.interceptor;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
 * @ClassName: MyKafkaProducerInterceptor
 * @Author: Author
 * @Date: 2024/10/31 11:11
 * @Description:
 */
@Slf4j
public class MyKafkaProducerInterceptor implements ProducerInterceptor<Object, Object> {

    //消息发送前的确认
    @Override
    public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> producerRecord) {
        log.info("消息发送前操作");
        return producerRecord;
    }

    //消息确认
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        log.info("消费发送后的回调");
    }

    //拦截器关闭后的操作
    @Override
    public void close() {

    }

    //初始化相关操作
    @Override
    public void configure(Map<String, ?> map) {

    }
}

在实现 Kafka 生产者拦截器的代码中,我这里只是记录了日志,真正在项目中使用的时候,根据自己的业务需求增加业务逻辑即可。

配置 Kafka 生产者拦截器

我们自定义了 Kafka 生产者拦截器,要想自定义的生产者拦截器生效,我们还需要配置该拦截器,核心代码如下:

//自定义生产者消息拦截器
List<String> interceptors = new ArrayList<>();
interceptors.add(MyKafkaProducerInterceptor.class.getName());
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

可以看到我们传入了一个 List 对象,也印证了前面说的 Kafka 支持一个拦截器链。

本案例完整的生产者配置代码如下:

package com.order.service.config;

import com.order.service.interceptor.MyKafkaConsumerInterceptor;
import com.order.service.interceptor.MyKafkaProducerInterceptor;
import com.order.service.kafka.CustomPartitioner;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author :author
 * @description:
 * @modified By:
 * @version: V1.0
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {


    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.producer.batch-size}")
    private String batchSize;

    @Value("${spring.kafka.producer.buffer-memory}")
    private String bufferMemory;

    @Value("${spring.kafka.producer.properties.linger.ms}")
    private String lingerMs;

    public Map<String, Object> getMyKafkaProps() {
        Map<String, Object> props = new HashMap<>(10);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //批量发送消息的大小 默认 16KB
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        //生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数  默认 32M
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        //批量发送的的最大时间间隔,单位是毫秒
        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
        //自定义分区器配置
        //props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
        //自定义生产者消息拦截器
        List<String> interceptors = new ArrayList<>();
        interceptors.add(MyKafkaProducerInterceptor.class.getName());
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> newProducerFactory() {
        return new DefaultKafkaProducerFactory<>(getMyKafkaProps());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(newProducerFactory());
    }


}

自定义实现 Kafka 消费者拦截器

Kafka 给我们提供了消费者拦截器接口,现在我们自己来实现一个 Kafka 消费者拦截器,自定义 Kafka 消费者拦截器代码如下:

package com.order.service.interceptor;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.Map;

/**
 * @ClassName: MyKafkaConsumerInterceptor
 * @Author: Author
 * @Date: 2024/10/31 11:11
 * @Description:
 */
@Slf4j
//@Component
public class MyKafkaConsumerInterceptor implements ConsumerInterceptor<Object, Object> {

    //消息消费前的处理
    @Override
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> consumerRecords) {
        log.info("消费前处理");
        return consumerRecords;
    }

    //拦截器关闭前的处理
    @Override
    public void close() {
        log.info("拦截器关闭前的处理");
    }

    //Kafka 提交 offset 前的处理
    @Override
    public void onCommit(Map map) {
        log.info("消息消费提交offset");
    }

    //初始化配置
    @Override
    public void configure(Map<String, ?> map) {
        log.info("拦截器初始化配置");
    }
}

同样在实现 Kafka 消费者拦截器的代码中,我这里同样只是记录了日志,真正在项目中使用的时候,根据自己的业务需求增加业务逻辑即可。

配置 Kafka 消费者拦截器

我们自定义了 Kafka 消费者拦截器,要想自定义的消费者拦截器生效,我们同样也还需要配置该拦截器,核心代码如下:

//添加自定义拦截器
List<String> interceptors = new ArrayList<>();
interceptors.add(MyKafkaConsumerInterceptor.class.getName());
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

可以看到我们传入了一个 List 对象,同样也印证了前面说的 Kafka 支持一个拦截器链。

本案例完整的消费者配置代码如下:

package com.order.service.config;

import com.order.service.interceptor.MyKafkaConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.*;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

import java.util.*;

/**
 * @author :author
 * @description:
 * @modified By:
 * @version: V1.0
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {


    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String offsetReset;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String autoCommitIntervalMs;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    public Map<String, Object> getMyKafkaProps() {
        Map<String, Object> props = new HashMap<>(12);
        //是否自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //kafak 服务器
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //不存在已经提交的offest时 earliest 表示从头开始消费,latest 表示从最新的数据消费
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        //消费组id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //一次调用poll()操作时返回的最大记录数,默认值为500
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        //自动提交时间间隔 默认 5秒
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        //添加自定义拦截器
        List<String> interceptors = new ArrayList<>();
        interceptors.add(MyKafkaConsumerInterceptor.class.getName());
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
        return props;
    }


    /**
     * @return org.springframework.kafka.config.KafkaListenerContainerFactory<org.springframework.kafka.listener.ConcurrentMessageListenerContainer < java.lang.String, java.lang.String>>
     * @date 2024/10/22 19:41
     * @description kafka 消费者工厂
     */
    @Bean("myContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(getMyKafkaProps()));
        // 并发创建的消费者数量
        factory.setConcurrency(3);
        // 开启批处理
        factory.setBatchListener(true);
        //拉取超时时间
        factory.getContainerProperties().setPollTimeout(1500);
        //是否自动提交 ACK kafka 默认是自动提交
        if (!enableAutoCommit) {
            //共有7中方式
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        }

        return factory;
    }

    /*@Bean
    @Primary
    public ErrorHandler kafkaErrorHandler(){
        ConsumerRecordRecoverer recordRecoverer=new DeadLetterPublishingRecoverer(new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(getMyKafkaProps())));
        BackOff backOff=new FixedBackOff(10,3L);
        return new SeekToCurrentErrorHandler(recordRecoverer, backOff);
    }*/

    @Bean
    @Primary
    public BatchErrorHandler kafkaBatchErrorHandler() {
        // 创建 SeekToCurrentBatchErrorHandler 对象
        SeekToCurrentBatchErrorHandler batchErrorHandler = new SeekToCurrentBatchErrorHandler();
        // 创建 FixedBackOff 对象
        BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
        batchErrorHandler.setBackOff(backOff);
        // 返回
        return batchErrorHandler;
    }


}

Kafka 自定义拦截器结果验证

Kafka 自定义拦截器结果验证之生产者代码

前面已经多次分享了 Kafka 的生产者代码了,这里用来做 Kafka 拦截器验证的案例代码如下:

package com.order.service.kafka.producer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @ClassName: ManualKafkaProducer
 * @Author: Author
 * @Date: 2024/10/22 19:22
 * @Description: 手动ACK消息生产者
 */
@Slf4j
@Component
public class ManualKafkaProducer {


    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    //同步发送消息
    public void sendManualMessage(String message) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(new Date());
        //同步发送消息
        try {
            kafkaTemplate.send("manual-ack-topic", message).get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        log.info("Manual ACK 消息生产者完成消息发送,当前时间:{}", dateStr);
    }

}

Kafka 自定义拦截器结果验证之消费者代码

前面已经多次分享了 Kafka 的消费者代码了,这里用来做 Kafka 拦截器验证的案例代码如下:

package com.order.service.kafka.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
 * @ClassName: ManualAckKafkaConsumer
 * @Author: Author
 * @Date: 2024/10/22 19:22
 * @Description: 手动 ACK 消息消费
 */
@Slf4j
@Component
public class ManualAckKafkaConsumer {

    @KafkaListener(id = "my-kafka-manual-consumer",
            groupId = "my-kafka-consumer-manual-groupId-01",
            topics = "manual-ack-topic",
            containerFactory = "myContainerFactory")
    public void listen(String message, Acknowledgment acknowledgment) {
        log.info("Manual ACK 消息消费成功,消息内容:{}", message);
        //手动提交 ACK
        acknowledgment.acknowledge();
    }

}

Kafka 自定义拦截器结果验证

我们出发消息发送消费控制台得到如下日志:

2024-11-02 19:02:53.343  INFO 38324 --- [nio-8086-exec-5] c.o.s.i.MyKafkaProducerInterceptor       : 消息发送前操作
2024-11-02 19:02:53.449  INFO 38324 --- [ad | producer-1] c.o.s.i.MyKafkaProducerInterceptor       : 消费发送后的回调
2024-11-02 19:02:53.449  INFO 38324 --- [nio-8086-exec-5] c.o.s.k.producer.ManualKafkaProducer     : Manual ACK 消息生产者完成消息发送,当前时间:2024-11-02 19:02:53
2024-11-02 17:02:53.450  INFO 38324 --- [-consumer-0-C-1] c.o.s.i.MyKafkaConsumerInterceptor       : 消费前处理
2024-11-02 19:02:53.451  INFO 38324 --- [-consumer-0-C-1] c.o.s.k.consumer.ManualAckKafkaConsumer  : Manual ACK 消息消费成功,消息内容:我是一条同步消息
2024-11-02 19:02:53.456  INFO 38324 --- [-consumer-0-C-1] c.o.s.i.MyKafkaConsumerInterceptor       : 消息消费提交offset

根据控制台的日志结果来看,结果符合预期。

总结:本篇我们分享了 Kafka 的拦截器的相关操作,有了拦截器我们可以对 Kafka 的消息生产消费进行统一处理,可以让我们的业务更灵活,代码逻辑更严谨,希望可以帮助到有需要的小伙伴。

如有不正确的地方欢迎各位指出纠正。


http://www.kler.cn/a/392512.html

相关文章:

  • 使用 Python 创建多栏 Word 文档 – 详解
  • Word窗体联动Excel实现级联组合框
  • Java重要面试名词整理(四):并发编程(下)
  • Datawhale AI 冬令营学习笔记-零编程基础制作井字棋小游戏
  • JAVA开发入门学习七- 数组
  • Windows 11 安装 Dify 完整指南 非docker环境
  • 牛客小白月赛104-D小红开锁-模拟
  • Unity常见问题合集(一)
  • workerman的安装与使用
  • TCP/IP协议,TCP和UDP区别
  • L10.【LeetCode笔记】回文链表
  • QObject中QThreadData里面的postEventList和QObjectPrivate里面的postedEvents
  • caozha-comment(原生PHP评论系统)
  • 根据模型数据 处理流式数据 生成AI对话
  • [运维][Nginx]Nginx学习(1/5)--Nginx基础
  • QTableWidget的简单使用
  • Swift 开发教程系列 - 第11章:内存管理和 ARC(Automatic Reference Counting)
  • Redhat8.6安装MySQL8.0.31
  • 在启动 Spring Boot 项目时,报找不到 slf4j 的错误
  • openresty入门教程:access_by_lua_block
  • windows环境下手工创建oracle数据库监听
  • kafka生产消费问题
  • ffmpeg内存模型
  • 【go从零单排】go中的range的用法
  • 【原创】java+ssm+mysql美食论坛网系统设计与实现
  • macOS 应用公证指南:使用 fastlane 实现自动化公证流程