Spring Cloud Stream 4.0.4 + RabbitMQ:使用多个 function 发送和消费消息
使用 IntelliJ IDEA 创建 Spring Boot 项目
添加 Spring Cloud Stream 和 RabbitMQ 依赖
pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>springcloudstream-demo1</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springcloudstream-demo1</name>
    <description>springcloudstream-demo1</description>
    <properties>
        <java.version>17</java.version>
        <!-- Release-candidate train; resolved through the spring-milestones repository declared below. -->
        <spring-cloud.version>2023.0.0-RC1</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- Required by ProducerController (@RestController / @GetMapping) and for the
             embedded HTTP server that serves http://localhost:8080/sendMsg. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-binder</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <!-- Imports the Spring Cloud BOM so the stream/binder artifacts above need no explicit version. -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
</project>
修改配置文件
--- # RabbitMQ consumer configuration
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: ruoyi
    password: ruoyi123
  cloud:
    stream:
      # Rabbit-binder-specific settings, keyed by binding name.
      rabbit:
        bindings:
          demo2-in-0:
            consumer:
              delayed-exchange: true
      # Generic binding settings: one input binding per consumer function.
      bindings:
        demo-in-0:
          content-type: application/json
          destination: demo-destination
          group: demo-group
          binder: rabbit
        demo1-in-0:
          content-type: application/json
          destination: demo1-destination
          group: demo1-group
          binder: rabbit
        demo2-in-0:
          content-type: application/json
          destination: demo2-destination
          group: demo2-group
          binder: rabbit
    # spring.cloud.function.definition: semicolon-separated list of function bean names to bind.
    function:
      definition: demo;demo1;demo2
创建消费者
package com.example.springcloudstreamdemo1;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.util.function.Consumer;
/**
 * Application entry point; also declares the three consumer function beans
 * ({@code demo}, {@code demo1}, {@code demo2}) and the {@link Person} payload type.
 */
@SpringBootApplication
public class SpringcloudstreamDemo1Application {

    public static void main(String[] args) {
        SpringApplication.run(SpringcloudstreamDemo1Application.class, args);
    }

    /**
     * Consumer for the {@code demo-in-0} binding.
     *
     * <p>Note: the method name {@code demo} must match the binding name configured under
     * {@code spring.cloud.stream.bindings.demo-in-0}. The {@code -in-0} suffix is a fixed
     * form: {@code in} marks a consumer (input) binding and {@code 0} is the consumer index.
     */
    @Bean
    public Consumer<Person> demo() {
        return received -> System.out.println("Received: " + received);
    }

    /** Consumer named {@code demo1}; prints the received text payload. */
    @Bean
    public Consumer<String> demo1() {
        return text -> System.out.println("Received: " + text);
    }

    /** Consumer named {@code demo2}; prints the received {@link Person}. */
    @Bean
    public Consumer<Person> demo2() {
        return received -> System.out.println("Received: " + received);
    }

    /** Simple mutable payload carrying a single name. */
    public static class Person {

        private String name;

        /** Returns the current name (may be {@code null} if never set). */
        public String getName() {
            return this.name;
        }

        /** Replaces the name. */
        public void setName(String name) {
            this.name = name;
        }

        /** Returns the name itself, so printing a person prints its name. */
        @Override
        public String toString() {
            return name;
        }
    }
}
注意:当存在多个消费者(function bean)时,必须添加配置项 spring.cloud.function.definition,并用分号列出所有需要绑定的函数名,例如:demo;demo1;demo2
启动项目
启动日志
查看 mq 控制台
交换机信息
交换机名称对应:
spring.cloud.stream.bindings.demo-in-0.destination配置项的值
队列信息
- 队列名称的格式为「交换机名称.分组名」,例如:demo-destination.demo-group
添加生产者配置
--- # Producer configuration
spring:
  cloud:
    stream:
      # Rabbit-binder-specific settings, keyed by binding name.
      rabbit:
        bindings:
          demo2-out-0:
            producer:
              # Declare the exchange as a delayed exchange (same kebab-case key style as the
              # consumer side). The broker needs the rabbitmq_delayed_message_exchange plugin.
              delayed-exchange: true
      # Generic binding settings: one output binding per destination used with StreamBridge.
      bindings:
        demo-out-0:
          content-type: application/json
          destination: demo-destination # must match the consumer's destination
          binder: rabbit
        demo1-out-0:
          content-type: application/json
          destination: demo1-destination
          binder: rabbit
        demo2-out-0:
          content-type: application/json
          destination: demo2-destination
          binder: rabbit
创建消息生产者
package com.example.springcloudstreamdemo1;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint that publishes one message to each of the three output bindings
 * ({@code demo2-out-0}, {@code demo1-out-0}, {@code demo-out-0}) through {@link StreamBridge}.
 */
@RestController
public class ProducerController
{
    @Autowired
    private StreamBridge streamBridge;

    /**
     * Sends a {@code Person} with the given name to all three bindings; the message for
     * {@code demo2-out-0} additionally carries an {@code x-delay} header.
     *
     * <p>The request parameter names are declared explicitly so that binding does not depend
     * on the compiler retaining parameter names ({@code -parameters}).
     *
     * @param delay delay in milliseconds, placed in the {@code x-delay} header; defaults to 0 when omitted
     * @param name  name stored in the payload; may be omitted
     * @return {@code "发送成功"} when every send reported success, otherwise {@code "发送失败"}
     */
    @GetMapping("sendMsg")
    public String sendMsg(@RequestParam(name = "delay", defaultValue = "0") int delay,
                          @RequestParam(name = "name", required = false) String name) {
        SpringcloudstreamDemo1Application.Person person = new SpringcloudstreamDemo1Application.Person();
        person.setName(name);

        // Delayed message: the delay (ms) travels in the "x-delay" header.
        Message<SpringcloudstreamDemo1Application.Person> message = MessageBuilder.withPayload(person)
                .setHeader("x-delay", delay).build();

        // All three sends are always attempted (as before); their results are checked afterwards.
        boolean delayedSent = streamBridge.send("demo2-out-0", message);
        boolean demo1Sent = streamBridge.send("demo1-out-0", person);
        boolean demoSent = streamBridge.send("demo-out-0", person);

        // Report success only if every send returned true instead of unconditionally.
        return (delayedSent && demo1Sent && demoSent) ? "发送成功" : "发送失败";
    }
}
启动测试
发送消息
http://localhost:8080/sendMsg?delay=10000&name=zhangsan
打印消息
问题总结
问题一
Multiple functional beans were found [*,*], thus can't determine default function definition. Please use 'spring.cloud.function.definition' property to explicitly define it.
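解决办法:
检查配置项 spring.cloud.function.definition 是否已正确配置:
该配置项应位于 spring.cloud.function 下,并用分号列出所有需要绑定的函数名(本例为 demo;demo1;demo2),且每个名称都要与对应的 @Bean 方法名一致。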