SpringKafka生产者、消费者消息拦截
1 前言
在Spring Kafka中,可以通过配置拦截器来实现对生产者和消费者消息的拦截。拦截器可以用来记录日志、修改消息等等。
2 基于Kafka管理的拦截器
Kafka原生提供的拦截器接口是 org.apache.kafka.clients.producer.ProducerInterceptor 和 org.apache.kafka.clients.consumer.ConsumerInterceptor,示例如下:
2.1 定义拦截器
生产者拦截器
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
/**
 * Producer interceptor that prints the value of every record before it is sent
 * and passes the record on unchanged.
 */
public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {

    /**
     * Receives the producer configuration; this implementation reads nothing from it.
     *
     * @param configs the configuration entries handed to the interceptor
     */
    @Override
    public void configure(Map<String, ?> configs) {
        // Configuration values could be read here.
    }

    /**
     * Prints the record value and returns the same record so sending continues.
     *
     * @param record the record about to be sent
     * @return the unmodified {@code record}
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Hook that runs before the message is sent.
        String payload = record.value();
        System.out.println("Sending message: " + payload);
        return record;
    }

    /**
     * Acknowledgement callback; the body is empty, so nothing happens here.
     *
     * @param metadata  metadata of the acknowledged record
     * @param exception the error raised while sending, if any
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    /** Cleanup hook; this interceptor holds no resources. */
    @Override
    public void close() {
        // Resource cleanup would go here.
    }
}
2.2 定义消费者拦截器
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
/**
 * Consumer interceptor that prints the value of every record in a polled batch
 * and hands the batch on unchanged.
 */
public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {

    /**
     * Prints each record value of the batch, then returns the same batch.
     *
     * @param records the batch of records that was fetched
     * @return the unmodified {@code records}
     */
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        // Handle the received messages one by one.
        records.forEach(record -> System.out.println("Consumed message: " + record.value()));
        return records;
    }

    /**
     * Commit callback; the body is empty, so nothing happens here.
     *
     * @param offsets the offsets being committed, keyed by partition
     */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    /**
     * Receives the consumer configuration; this implementation reads nothing from it.
     *
     * @param configs the configuration entries handed to the interceptor
     */
    @Override
    public void configure(Map<String, ?> configs) {
        // Interceptor configuration would go here.
    }

    /** Cleanup hook; this interceptor holds no resources. */
    @Override
    public void close() {
        // Resource cleanup would go here.
    }
}
2.3 添加拦截器
方式一,通过工厂自定义器设置拦截器
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
/**
 * Registers the custom interceptors on the producer and consumer factories
 * by updating each factory's {@code interceptor.classes} configuration entry.
 */
@Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer, DefaultKafkaConsumerFactoryCustomizer {

    /**
     * Sets {@link CustomProducerInterceptor} as the producer interceptor class.
     *
     * @param producerFactory the producer factory whose configuration is updated
     */
    @Override
    public void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {
        String interceptorClass = CustomProducerInterceptor.class.getName();
        Map<String, Object> overrides = Map.of(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptorClass);
        producerFactory.updateConfigs(overrides);
    }

    /**
     * Sets {@link CustomConsumerInterceptor} as the consumer interceptor class.
     *
     * @param consumerFactory the consumer factory whose configuration is updated
     */
    @Override
    public void customize(DefaultKafkaConsumerFactory<?, ?> consumerFactory) {
        String interceptorClass = CustomConsumerInterceptor.class.getName();
        Map<String, Object> overrides = Map.of(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptorClass);
        consumerFactory.updateConfigs(overrides);
    }
}
方式二,通过配置设置拦截器
# Register the custom interceptors through the Kafka client property "interceptor.classes".
spring:
  kafka:
    producer:
      properties:
        # Fully-qualified class name of the producer interceptor
        interceptor.classes: org.example.kafka.CustomProducerInterceptor
    consumer:
      properties:
        # Fully-qualified class name of the consumer interceptor
        interceptor.classes: org.example.kafka.CustomConsumerInterceptor
2.4 拦截器使用Spring容器中的Bean
从上面的方法可以看到,拦截器由Kafka客户端根据类名实例化,不受Spring容器管理,因此无法直接使用容器中的其他Bean来做业务处理。此时可以采用另外一种策略:把需要的Bean作为配置项放入工厂配置中,拦截器再在configure方法里把它取出来使用。以消息生产者拦截器为例:
Bean定义
/**
 * Spring-managed bean used by the interceptor example; it prints the message it is given.
 */
@Component
public class MyComponent {

    /**
     * Prints the message to standard output with a fixed prefix.
     *
     * @param message the message text to print
     */
    public void checkMessage(String message) {
        String line = "Sending message: " + message;
        System.out.println(line);
    }
}
生产者拦截器
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
/**
 * Producer interceptor that delegates every outgoing record value to a
 * {@link MyComponent} obtained from the producer configuration.
 */
public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {

    /** Collaborator looked up in {@link #configure(Map)} under the key "myComponent". */
    private MyComponent myComponent;

    /**
     * Passes the record value to {@link MyComponent#checkMessage(String)} and
     * returns the same record so sending continues.
     *
     * @param record the record about to be sent
     * @return the unmodified {@code record}
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        myComponent.checkMessage(record.value());
        return record;
    }

    /**
     * Acknowledgement callback; the body is empty, so nothing happens here.
     *
     * @param metadata  metadata of the acknowledged record
     * @param exception the error raised while sending, if any
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    /** Cleanup hook; this interceptor holds no resources. */
    @Override
    public void close() {
        // Resource cleanup would go here.
    }

    /**
     * Reads the {@link MyComponent} instance stored in the configuration under "myComponent".
     *
     * @param configs the configuration entries handed to the interceptor
     * @throws ClassCastException if the "myComponent" entry is not a {@link MyComponent}
     */
    @Override
    public void configure(Map<String, ?> configs) {
        // The map values are untyped, so an explicit cast is required; without it the
        // assignment does not compile. A missing entry leaves the field null.
        myComponent = (MyComponent) configs.get("myComponent");
    }
}
设置拦截器
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
/**
 * Producer factory customizer that registers {@link CustomProducerInterceptor} and
 * places the {@link MyComponent} bean into the producer configuration under "myComponent".
 */
@Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer {

    /** Bean placed into the producer configuration. */
    @Autowired
    private MyComponent myComponent;

    /**
     * Adds the bean and the interceptor class name to the factory configuration.
     *
     * @param producerFactory the producer factory whose configuration is updated
     */
    @Override
    public void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {
        String interceptorClass = CustomProducerInterceptor.class.getName();
        Map<String, Object> overrides = Map.of(
                "myComponent", myComponent,
                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptorClass);
        producerFactory.updateConfigs(overrides);
    }
}
3 基于Spring-Kafka管理的拦截器
基于Kafka管理的拦截器对消费消息的拦截只能做到批量级别(ConsumerRecords)。如果要对单条消息进行拦截,可以使用Spring-Kafka提供的 org.springframework.kafka.listener.RecordInterceptor 接口。
3.1 单条消息拦截接口定义
由于此拦截器受Spring容器管理,所以可以通过@Component注解将其注册为Bean,Spring Boot的自动配置会把它应用到监听器容器上,从而实现自动拦截。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.RecordInterceptor;
/**
 * Per-record interceptor that prints the topic of each consumed record and
 * returns the record unchanged.
 */
@Component
public class CustomRecordInterceptor implements RecordInterceptor<Object, Object> {

    /**
     * Prints the record's topic name and returns the same record.
     *
     * @param record the consumed record
     * @return the unmodified {@code record}
     */
    @Override
    public ConsumerRecord<Object, Object> intercept(ConsumerRecord<Object, Object> record) {
        String topic = record.topic();
        System.out.println(topic);
        return record;
    }
}