当前位置: 首页 > article >正文

SpringKafka生产者、消费者消息拦截

1 前言

在Spring Kafka中,可以通过配置拦截器来实现对生产者和消费者消息的拦截。拦截器可以用来记录日志、修改消息等等。

2 基于Kafka管理的拦截器

Kafka原生提供的拦截器接口是org.apache.kafka.clients.producer.ProducerInterceptor
org.apache.kafka.clients.consumer.ConsumerInterceptor, 示例如下:

2.1 定义拦截器

生产者拦截器

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // 在发送消息之前操作
        System.out.println("Sending message: " + record.value());
        return record; // 继续发送
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    @Override
    public void close() {
        // 资源清理
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // 可以在这里获取配置
    }
}

2.2 定义消费者拦截器

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {
    @Override
    public void configure(Map<String, ?> configs) {
        // 配置拦截器
    }

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        // 处理接收到的消息
        records.forEach(record -> {
            System.out.println("Consumed message: " + record.value());
        });
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {

    }

    @Override
    public void close() {
        // 资源清理
    }
}

2.3 添加拦截器

方式一,通过工厂自定义器设置拦截器

import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;

@Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer, DefaultKafkaConsumerFactoryCustomizer {

    @Override
    public void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {
        producerFactory.updateConfigs(Map.of(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));
    }
    
    @Override
    public void customize(DefaultKafkaConsumerFactory<?, ?> consumerFactory) {
        consumerFactory.updateConfigs(Map.of(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName()));
    }
}

方式二,通过配置设置拦截器

spring:
  kafka:
    producer:
      properties:
        interceptor.classes: org.example.kafka.CustomProducerInterceptor
    consumer:
      properties:
        interceptor.classes: org.example.kafka.CustomConsumerInterceptor

2.4 拦截器使用Spring容器中的Bean

上面的方法可以看到,拦截器由于没有在Spring容器中管理,则无法使用容器中其他Bean来做业务处理,那么可以另外一种策略达到让拦截器受Spring容器管理的需求, 已消息生产者拦截器为例:
Bean定义

@Component
public class MyComponent {
    public void checkMessage(String message) {
         System.out.println("Sending message: " + message);
    }
}

生产者拦截器

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {
    private MyComponent myComponent;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        myComponent.checkMessage(record.value());
        return record; // 继续发送
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    @Override
    public void close() {
        // 资源清理
    }

    @Override
    public void configure(Map<String, ?> configs) {
        myComponent = configs.get("myComponent");
    }
}

设置拦截器

import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;

@Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer {
   
    @Autowired
    private MyComponent myComponent;

    @Override
    public void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {
        producerFactory.updateConfigs(Map.of(
            "myComponent", myComponent,
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()
        ));
    }
}

3 基于Spring-Kafka管理的拦截器

基于Kafka管理的拦截器对于消费消息的拦截只能做到批量消费级别(ConsumerRecords),如果要对单条消息拦截,可以使用Spring-Kafka提供的org.springframework.kafka.listener.RecordInterceptor接口。

3.1 单条消息拦截接口定义

由于此拦截器是受Spring容器管理的,所以可以通过@Component注解自动注入到容器中,进行自动拦截。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.RecordInterceptor;

@Component
public class CustomRecordInterceptor implements RecordInterceptor<Object, Object> {

    @Override
    public ConsumerRecord<Object, Object> intercept(ConsumerRecord<Object, Object> record) {
        System.out.println(record.topic());
        return record;
    }
}

http://www.kler.cn/a/379439.html

相关文章:

  • 《Java 实现希尔排序:原理剖析与代码详解》
  • 在Mac下安装时间序列软件Hector
  • 占地1.1万平,2亿投资的智能仓储系统:高架库、AGV、码垛机器人……
  • 美国大学生数学建模竞赛(MCM/ICM)介绍
  • QT——记事本项目
  • 【Python+Pycharm】2024-Python安装配置教程
  • 算法设计题(树和二叉树)
  • 自然语言处理研究方向在跨语言处理方面有哪些新的创新思路?
  • 【c++日常刷题】两个数字的交集、点击消除、最小花费爬楼梯
  • 架构师备考-软件工程相关补充
  • Android使用scheme方式唤醒处于后台时的App场景
  • Python复习2
  • 笔记-利率学习记录
  • easy-es使用以及Es和MySQL同步
  • Go-Sqlite3学习
  • “格格不入”的星瑞东方曜,燃油市场有麻烦了
  • 进程守护SuperVisord内部的进程定时监测并重启
  • 2024年华为OD机试真题-最小的调整次数-Python-OD统一考试(E卷)
  • locust压测工具环境搭建(Linux、Mac)
  • FBX福币交易所国际油价突然大涨!美伊针锋相对
  • json-server的使用(根据json数据一键生成接口)
  • jenkins自动化构建vue(web)项目并部署(项目实战)
  • RocketMQ可视化工具- Dashboard 使用教程 (附带可下载文件)
  • gulp入门教程14:vinyl
  • Git学习记录
  • MoonNet基准测试更新