spring使用rabbitmq当rabbitmq集群节点挂掉 spring rabbitmq怎么保证高可用,rabbitmq网络怎么重新连接
##spring rabbitmq代码示例
Controller代码
import com.alibaba.fastjson.JSONObject;
import com.newland.mi.config.RabbitDMMQConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.io.UnsupportedEncodingException;
import java.util.UUID;
@Controller
@RequestMapping("/rabbitmq")
public class RabbitmqController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/getMessage")
@ResponseBody
public String getMessage() throws UnsupportedEncodingException {
JSONObject obj = new JSONObject();
obj.put("yym", "yym");
MessageProperties messageProperties = new MessageProperties();
String msgId = UUID.randomUUID().toString();
rabbitTemplate.send(RabbitDMMQConfig.YYM_EXCHANGE, RabbitDMMQConfig.YYM_ROUTINGKEY, new Message(obj.toString().getBytes("UTF-8"), messageProperties), new CorrelationData(msgId));
return "success";
}
}
RabbitmqConfig代码
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitDMMQConfig {
protected final static Logger log = LoggerFactory.getLogger(RabbitDMMQConfig.class);
/**
* Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
* Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
* Queue:消息的载体,每个消息都会被投到一个或多个队列。
* Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
* Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
* vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
* Producer:消息生产者,就是投递消息的程序.
* Consumer:消息消费者,就是接受消息的程序.
* Channel:消息通道,在客户端的每个连接里,可建立多个channel.
*/
public static final String YYM_EXCHANGE = "yym-exchange";
public static final String YYM_QUEUE = "yym-queue";
public static final String YYM_ROUTINGKEY = "yym-routingKey";
/**
* 死信队列:
*/
public final static String deadQueueName = "ad_dead_queue";
public final static String deadRoutingKey = "ad_dead_routing_key";
public final static String deadExchangeName = "ad_dead_exchange";
@Bean
public Queue deadQueue() {
Queue queue = new Queue(deadQueueName, true);
return queue;
}
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(deadExchangeName);
}
@Bean
public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
}
/**
* 死信队列 交换机标识符
*/
public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 死信队列交换机绑定键标识符
*/
public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
@Bean
public ConnectionFactory yymConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("192.168.3.162:5672,192.168.3.162:5673,192.168.3.162:5674");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setConnectionTimeout(15000);
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
log.info("svc dm ConnectionFactory success");
return connectionFactory;
}
/**
* 必须是prototype类型
* @return
*/
@Bean
public RabbitTemplate yymRabbitTemplate() {
RabbitTemplate yymRabbitTemplate = new RabbitTemplate(dmConnectionFactory());
yymRabbitTemplate.setMandatory(true);
yymRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功:correlationData({}),ack({}),cause({})", JSON.toJSONString(correlationData), ack, cause);
}
});
yymRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey,
replyCode, replyText, new String(message.getBody()));
}
});
log.info("RabbitTemplate success");
return dmRabbitTemplate;
}
/**
* 针对消费者配置
* 1. 设置交换机类型
* 2. 将队列绑定到交换机
FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
HeadersExchange :通过添加属性key-value匹配
DirectExchange:按照routingkey分发到指定队列
TopicExchange:多关键字匹配
*/
@Bean
public DirectExchange yymDefaultExchange() {
return new DirectExchange(YYM_EXCHANGE);
}
/**
* 获取队列A
* @return
*/
@Bean
public Queue queueA() {
// 队列持久
return new Queue(YYM_QUEUE, true);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queueA()).to(dmDefaultExchange()).with(YYM_ROUTINGKEY);
}
}
##docker rabbitmq 集群
yym@yym:~$ docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS
NAMES
d95cd024f3c9 rabbitmq:3-management "docker-entrypoint.s…" 10 days ago Up 6 hours 4369/tcp, 5671/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:5674->5672/tcp, [::]:5674->5672/tcp, 0.0.0.0:15674->15672/tcp, [::]:15674->15672/tcp rabbitmq-node3
fd35f01e8b2d rabbitmq:3-management "docker-entrypoint.s…" 10 days ago Up 26 seconds 4369/tcp, 5671/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:5673->5672/tcp, [::]:5673->5672/tcp, 0.0.0.0:15673->15672/tcp, [::]:15673->15672/tcp rabbitmq-node2
83aa5e48fb3b rabbitmq:3-management "docker-entrypoint.s…" 10 days ago Up 21 minutes 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp rabbitmq-node1
##关闭其中一个节点
docker stop 83aa5e48fb3b
##CachingConnectionFactory监听关闭事件ShutdownListener.shutdownCompleted
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
this.closeExceptionLogger.log(logger, "Channel shutdown", cause);
int protocolClassId = cause.getReason().protocolClassId();
if (protocolClassId == RabbitUtils.CHANNEL_PROTOCOL_CLASS_ID_20) {
getChannelListener().onShutDown(cause);
}
else if (protocolClassId == RabbitUtils.CONNECTION_PROTOCOL_CLASS_ID_10) {
getConnectionListener().onShutDown(cause);
}
}
##请求controller rabbitmq/getMessage
RabbitTemplate发起请求
##connection等空,新建一个connection
##使用CachingConnectionFactory缓存里面的this.connection.target
##使用CachingConnectionFactory上次连接缓存里面的this.connection
##findOpenChannel从channelList缓存数组中清理掉channel.isOpen()是关闭的
##判断连接是否打开的connection.isOpen()
##关闭事件监听ShutdownListener.shutdownCompleted中 this.shutdownCause已经有值,所以不等空,是否打开连接为假。isOpen函数返回假。
##connection.isOpen()连接未打开,channel为空,新建一个channel
##createBareChannel新建channel
##新建createConnection连接
##连接
##根据配置地址进行连接
##创建新的连接
##新连接
##使用socket创建连接
##192.168.3.162:5672节点是关闭的,创建失败
##循环下一个节点192.168.3.162:5673连接,直到节点可以连接
##SocketFrameHandler socket 读写流
##初始化心跳
##this.connection.target创建完成
## 执行mq消息发送