当前位置: 首页 > article >正文

kafka 生产者拦截器

生产者拦截器

kafka 消息发送到Broker 之前大概需要经过 生产者拦截器 、序列化器分区器等一系列处理。本文主要介绍生产者拦截器

生产者拦截器可以在消息发送之前对消息进行拦截。它可以改变消息内容,包括key , value ,topic 等任何信息

通常不推荐修改key , topic , 我们可以给消息添加一些额外信息,比如版本号,过滤一些"非法"消息等。

图片

拦截器接口介绍

public interface ProducerInterceptor<K, V> extends Configurable {
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);

    void onAcknowledgement(RecordMetadata var1, Exception var2);

    void close();
}

生产者拦截器接口包含三个方法

  • onSend 方法可以对消息进行定制化修改

  • onAcknowledgement 在消息被应答或者发送失败时执行

    该方法在IO线程执行,所以不要执行一些耗时操作,会影响消息投递速度

  • close 用于执行一些资源释放的工作

自定义生产者拦截器Demo

该生产者拦截器给消息添加个包含版本号信息的Header ,配置生产者

ProducerConfig.INTERCEPTOR_CLASSES_CONFIG 属性使其生效

    private Map<String, Object> produceConfigs() {
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");
        configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        configMap.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CustomPartitioner.class);
        configMap.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(CustomerProduceInterector.class));
        return configMap;
    }
public class CustomerProduceInterector implements ProducerInterceptor {
    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord) {
    
        producerRecord.headers().add(new RecordHeader("version","v1".getBytes()));
        return producerRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        System.out.println("onAcknowledgement  ....");
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

结语

生产者拦截器还是比较简单,相应的还有消费者拦截器,我们平常业务也未必用到,殊途同归,在一些RPC框架比如dubbo服务提供方和消费端都有类似的拦截器,可以做一些链路追踪等等。

以上就是我个人的理解  喜欢的一起关注交流学习


http://www.kler.cn/a/318106.html

相关文章:

  • 2024年12月大语言模型最新对比:GPT-4、Claude 3、文心一言等详细评测
  • linux RCU调优
  • 004-spring-注解aop的使用
  • 图神经网络_图嵌入_SDNE
  • Arduino驱动DS18B20测量环境温度
  • Spring Boot 项目创建
  • yum 安装gcc 时,提示glibc错误依赖
  • LeetCode题练习与总结:二叉树的最近公共祖先--236
  • 读书笔记——DDIA-v2 设计数据密集型应用(第二版)
  • 卷积神经网络——手写数字识别
  • PX4固定翼控制器详解(五)——L1、NPFG控制器
  • 347. 前 K 个高频元素
  • 【2024W36】肖恩技术周刊(第 14 期):什么是完美副业?
  • 大模型培训讲师叶梓:Llama Factory 微调模型实战分享提纲
  • 用Swift实现验证回文字符串
  • 空栈压数 - 华为OD统一考试(E卷)
  • 一.python入门
  • Spring Boot框架在心理教育辅导系统中的应用
  • HTTP协议详解
  • javascript:检查JavaScript对象属性是否存在
  • kubernets部署prometheus监控
  • MySQL:用户管理
  • VSCode使用Clangd
  • 《程序猿之设计模式实战 · 适配器模式》
  • 云计算和虚拟化技术 背诵
  • Django一分钟:DRF快速实现JWT认证与RBAC权限校验