kafka+spring cloud stream 发送接收消息
方案 1:使用旧版 @StreamListener
(适用于 Spring Cloud Stream <= 2.x)
1. 添加依赖(pom.xml
)
<!-- Spring Cloud Stream + Kafka Binder -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
2. 定义消息通道接口
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface MyChannels {
String INPUT = "myInput";
@Input(INPUT)
SubscribableChannel input();
}
3. 使用 @StreamListener
监听
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;
@Service
@EnableBinding(MyChannels.class) // 绑定消息通道
public class KafkaStreamListener {
@StreamListener(MyChannels.INPUT)
public void handleMessage(String message) {
System.out.println("Received via @StreamListener: " + message);
}
}
4. 配置 application.properties
# Kafka 服务器地址
spring.kafka.bootstrap-servers=localhost:9092
# 绑定输入通道到 Kafka Topic
spring.cloud.stream.bindings.myInput.destination=my-topic
spring.cloud.stream.bindings.myInput.group=my-group
spring.cloud.stream.bindings.myInput.content-type=text/plain
方案 2:新版函数式编程模型(推荐,Spring Cloud Stream >= 3.x)
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;
@Component
public class KafkaStreamListener {
@Bean
public Consumer<String> myInput() {
return message -> {
System.out.println("Received via Function: " + message);
};
}
}
# Kafka 服务器地址
spring.kafka.bootstrap-servers=localhost:9092
# 绑定函数到 Kafka Topic
spring.cloud.stream.bindings.myInput-in-0.destination=my-topic
spring.cloud.stream.bindings.myInput-in-0.group=my-group
spring.cloud.stream.bindings.myInput-in-0.content-type=text/plain
生产者代码示例(发送消息)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
@Autowired
private StreamBridge streamBridge;
public void sendMessage(String topic, String message) {
streamBridge.send(topic, message);
}
}
测试步骤
-
启动 Kafka:确保 Kafka 和 Zookeeper 服务运行。
-
创建 Topic:
kafka-topics --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
-
发送消息:
kafkaProducer.sendMessage("my-topic", "Hello Kafka Stream!");
-
查看消费者日志:
Received via @StreamListener: Hello Kafka Stream! // 或 Received via Function: Hello Kafka Stream!
常见问题
-
版本兼容性:
-
Spring Cloud Stream 3.x 后需使用函数式编程。
-
检查 Spring Boot 版本与 Spring Cloud Stream 的匹配关系(如 Spring Boot 2.6.x + Spring Cloud 2021.x)。
-
-
绑定配置:
-
函数式模型中,绑定名称格式为
<functionName>-in-<index>
(如myInput-in-0
)。
-
-
序列化配置:
-
若传递 JSON 对象,需配置
content-type=application/json
并添加 Jackson 依赖。
-
总结
-
旧版:使用
@StreamListener
+ 通道接口(适合遗留代码升级)。 -
新版:推荐函数式编程模型(更简洁,符合现代 Spring 设计)。
-
根据实际 Spring Cloud Stream 版本选择方案!