kafka 生产者拦截器
生产者拦截器
kafka 消息发送到Broker
之前大概需要经过 生产者拦截器 、序列化器、分区器等一系列处理。本文主要介绍生产者拦截器
生产者拦截器可以在消息发送之前对消息进行拦截。它可以
改变消息内容
,包括key , value ,topic 等任何信息
通常不推荐修改key , topic , 我们可以给消息添加一些额外信息,比如版本号,过滤一些"非法"消息等。
拦截器接口介绍
public interface ProducerInterceptor<K, V> extends Configurable {
ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);
void onAcknowledgement(RecordMetadata var1, Exception var2);
void close();
}
生产者拦截器
接口包含三个方法
-
onSend 方法可以对消息进行定制化修改
- onAcknowledgement 在消息被应答或者发送失败时执行
该方法在IO线程执行,所以不要执行一些耗时操作,会影响消息投递速度
-
close 用于执行一些资源释放的工作
自定义生产者拦截器Demo
该生产者拦截器给消息添加个包含版本号信息的Header
,配置生产者
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG
属性使其生效
private Map<String, Object> produceConfigs() {
Map<String, Object> configMap = new HashMap<>();
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
configMap.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CustomPartitioner.class);
configMap.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(CustomerProduceInterector.class));
return configMap;
}
public class CustomerProduceInterector implements ProducerInterceptor {
@Override
public ProducerRecord onSend(ProducerRecord producerRecord) {
producerRecord.headers().add(new RecordHeader("version","v1".getBytes()));
return producerRecord;
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
System.out.println("onAcknowledgement ....");
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
结语
生产者拦截器
还是比较简单,相应的还有消费者拦截器
,我们平常业务也未必用到,殊途同归,在一些RPC框架比如dubbo服务提供方和消费端都有类似的拦截器,可以做一些链路追踪等等。
以上就是我个人的理解
喜欢的一起关注交流学习