Springboot集成rabbitmq
依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
DirectRabbitConfig.java
package com.zzxt.web.core.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectRabbitConfig {
//队列 起名:TestDirectQueue
@Bean
public Queue DirectQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("DzOrgSale",true);
}
//Direct交换机
@Bean
DirectExchange DirectExchange() {
return new DirectExchange("DzSalData",true,false);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:DzOrgSale
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(DirectQueue()).to(DirectExchange()).with("DzOrgSale");
}
@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("DzSalData");
}
}
注解监听队列 DirectReceiver.java
package com.zzxt.web.core.config;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import com.zzxt.task.service.ITaskService;
@Component
public class DirectReceiver {
protected final Logger logger = LoggerFactory.getLogger(DirectReceiver.class);
@Autowired
private ITaskService taskService;
@RabbitListener(queues = "DzOrgSale")//监听的队列名称 DirectQueue
public void process(Message message, Channel channel) {
byte[] body = message.getBody();
try {
logger.info("接收到队列:{}的消息为:{}", "DzOrgSale", new String(body,"UTF-8"));
// 自己的业务模块,保存成功之后手动确认
boolean saveHxFySalDate = taskService.saveHxFySalDate(new String(body,"UTF-8"));
if(saveHxFySalDate) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}