Kafka 动态监听主题
简单版本
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Service;
/**
 * Creates and starts Kafka listener containers for topics chosen at runtime.
 */
@Service
public class DynamicKafkaListenerService {

    /** Consumer factory handed to every container this service creates. */
    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    /**
     * Builds a listener container for {@code topic} and starts it.
     *
     * <p>Each received record's value is printed to standard output together with
     * the name of the thread invoking the listener and the topic.
     *
     * @param topic the topic to listen on
     */
    public void registerListener(String topic) {
        // Typed local so the lambda has a target type; setMessageListener itself is untyped.
        MessageListener<String, String> printingListener = message ->
                System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + message.value());

        ContainerProperties properties = new ContainerProperties(topic);
        properties.setMessageListener(printingListener);

        ConcurrentMessageListenerContainer<String, String> listenerContainer =
                new ConcurrentMessageListenerContainer<>(consumerFactory, properties);
        listenerContainer.setBeanName(topic + "-listener");
        listenerContainer.start();
    }
}
手动ack版本
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Service;
/**
 * Creates and starts Kafka listener containers at runtime, with the listener
 * acknowledging each record itself.
 */
@Service
public class DynamicKafkaListenerService {

    /** Consumer factory handed to every container this service creates. */
    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    /**
     * Builds a listener container for {@code topic} in
     * {@code MANUAL_IMMEDIATE} ack mode and starts it.
     *
     * <p>Each record's value is printed to standard output and then acknowledged.
     * Any exception raised while handling a record is reported on standard error
     * and not propagated.
     *
     * @param topic the topic to listen on
     */
    public void registerListener(String topic) {
        AcknowledgingMessageListener<String, String> ackingListener = (message, acknowledgment) -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + message.value());
                // Placeholder for the real message-processing logic.
                if (acknowledgment == null) {
                    return;
                }
                // Processing finished: confirm the record manually.
                acknowledgment.acknowledge();
            } catch (Exception failure) {
                // Failure handling hook, e.g. logging or retrying.
                System.err.println("消息处理失败: " + failure.getMessage());
            }
        };

        ContainerProperties properties = new ContainerProperties(topic);
        // The listener acknowledges records itself.
        properties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        properties.setMessageListener(ackingListener);

        ConcurrentMessageListenerContainer<String, String> listenerContainer =
                new ConcurrentMessageListenerContainer<>(consumerFactory, properties);
        listenerContainer.setBeanName(topic + "-listener");
        listenerContainer.start();
    }
}
批量处理版本
// Consumer factory passed to each listener container created by registerListener.
@Autowired
private ConsumerFactory<String, String> consumerFactory;
/**
 * Builds a batch listener container for {@code topic} in
 * {@code MANUAL_IMMEDIATE} ack mode and starts it.
 *
 * <p>Every record value in a delivered batch is printed to standard output, after
 * which the batch is acknowledged once. Any exception raised while handling a
 * batch is reported on standard error and not propagated.
 *
 * @param topic the topic to listen on
 */
public void registerListener(String topic) {
    BatchAcknowledgingMessageListener<String, String> batchListener = (batch, acknowledgment) -> {
        try {
            batch.forEach(message ->
                    System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + message.value()));
            // Placeholder for the real message-processing logic.
            // Processing finished: confirm the whole batch manually.
            acknowledgment.acknowledge();
        } catch (Exception failure) {
            // Failure handling hook, e.g. logging or retrying.
            System.err.println("消息处理失败: " + failure.getMessage());
        }
    };

    ContainerProperties properties = new ContainerProperties(topic);
    // The listener acknowledges batches itself.
    properties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    properties.setMessageListener(batchListener);

    ConcurrentMessageListenerContainer<String, String> listenerContainer =
            new ConcurrentMessageListenerContainer<>(consumerFactory, properties);
    listenerContainer.setBeanName(topic + "-listener");
    listenerContainer.start();
}
可关闭版本
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
/**
 * Manages Kafka batch listener containers that are started and stopped at
 * runtime, at most one per topic.
 *
 * <p>The public methods are {@code synchronized} because the registry is a plain
 * {@link HashMap} and the "look up, then start/stop, then update" sequences must
 * not interleave between callers.
 */
@Service
public class DynamicKafkaListenerService {

    /** Consumer factory handed to every container this service creates. */
    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    /** Listener containers started by this service, keyed by topic. */
    private final Map<String, ConcurrentMessageListenerContainer<String, String>> containerMap = new HashMap<>();

    /**
     * Starts a listener for {@code topic}.
     *
     * <p>If a running container is already registered for the topic, the call is a
     * no-op. Previously a second registration replaced the map entry and left the
     * first container running with no way to stop it.
     *
     * <p>Each delivered batch has its record values printed to standard output and
     * is then acknowledged once.
     *
     * @param topic the topic to listen on; must not be {@code null} or blank
     * @throws IllegalArgumentException if {@code topic} is {@code null} or blank
     */
    public synchronized void registerListener(String topic) {
        // Fail fast in the caller's thread instead of on the consumer thread later.
        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalArgumentException("topic must not be null or blank");
        }

        ConcurrentMessageListenerContainer<String, String> existing = containerMap.get(topic);
        if (existing != null && existing.isRunning()) {
            // Already listening; starting another container would orphan this one.
            return;
        }

        ContainerProperties containerProperties = new ContainerProperties(topic);
        // The listener acknowledges batches itself.
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // Install the batch listener.
        containerProperties.setMessageListener((BatchAcknowledgingMessageListener<String, String>) (records, ack) -> {
            try {
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());
                }
                // Placeholder for the real message-processing logic.
                // Processing finished: confirm the whole batch manually.
                ack.acknowledge();
            } catch (Exception e) {
                // Failure handling hook, e.g. logging or retrying.
                // NOTE(review): a failed batch is only reported here and is not acknowledged;
                // confirm whether it should be rethrown so the container's error handling applies.
                System.err.println("消息处理失败: " + e.getMessage());
            }
        });

        ConcurrentMessageListenerContainer<String, String> container =
                new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
        container.setBeanName(topic + "-listener");
        container.start();
        // Remember the container so it can be stopped later; this also replaces any
        // stale entry whose container is no longer running.
        containerMap.put(topic, container);
    }

    /**
     * Stops the listener for {@code topic}, if one is registered.
     *
     * <p>The registry entry is always removed, even when the container has already
     * stopped, so stale entries do not accumulate.
     *
     * @param topic the topic whose listener should be stopped
     */
    public synchronized void stopListener(String topic) {
        ConcurrentMessageListenerContainer<String, String> container = containerMap.remove(topic);
        if (container != null && container.isRunning()) {
            container.stop();
        }
    }
}
调用示例:添加监听
/**
 * Asks the dynamic Kafka listener service to register a listener for the
 * requested topic.
 *
 * @param topic the topic to start listening on
 * @return a confirmation message that embeds the topic name
 */
@GetMapping("/getModelZdyConfInfo")
public String getModelZdyConfInfo(String topic) {
    dynamicKafkaListenerService.registerListener(topic);
    final String confirmation = "添加" + topic + "监听成功";
    return confirmation;
}