当前位置: 首页 > article >正文

spring使用rabbitmq当rabbitmq集群节点挂掉 spring rabbitmq怎么保证高可用,rabbitmq网络怎么重新连接

##spring rabbitmq代码示例

Controller代码

import com.alibaba.fastjson.JSONObject;
import com.newland.mi.config.RabbitDMMQConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import java.io.UnsupportedEncodingException;
import java.util.UUID;

@Controller
@RequestMapping("/rabbitmq")
public class RabbitmqController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/getMessage")
    @ResponseBody
    public String getMessage() throws UnsupportedEncodingException {
        JSONObject obj = new JSONObject();
        obj.put("yym", "yym");
        MessageProperties messageProperties = new MessageProperties();
        String msgId = UUID.randomUUID().toString();
        rabbitTemplate.send(RabbitDMMQConfig.YYM_EXCHANGE, RabbitDMMQConfig.YYM_ROUTINGKEY, new Message(obj.toString().getBytes("UTF-8"), messageProperties), new CorrelationData(msgId));
        return "success";
    }

}

RabbitmqConfig代码

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitDMMQConfig {

    protected final static Logger log = LoggerFactory.getLogger(RabbitDMMQConfig.class);

    /**
     * Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
     * Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
     * Queue:消息的载体,每个消息都会被投到一个或多个队列。
     * Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
     * Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
     * vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
     * Producer:消息生产者,就是投递消息的程序.
     * Consumer:消息消费者,就是接受消息的程序.
     * Channel:消息通道,在客户端的每个连接里,可建立多个channel.
     */

    public static final String YYM_EXCHANGE = "yym-exchange";
    public static final String YYM_QUEUE = "yym-queue";
    public static final String YYM_ROUTINGKEY = "yym-routingKey";

    /**
     * 死信队列:
     */
    public final static String deadQueueName = "ad_dead_queue";
    public final static String deadRoutingKey = "ad_dead_routing_key";
    public final static String deadExchangeName = "ad_dead_exchange";

    @Bean
    public Queue deadQueue() {
        Queue queue = new Queue(deadQueueName, true);
        return queue;
    }

    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange(deadExchangeName);
    }

    @Bean
    public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
    }


    /**
     * 死信队列 交换机标识符
     */
    public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
    /**
     * 死信队列交换机绑定键标识符
     */
    public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";




    @Bean
    public ConnectionFactory yymConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("192.168.3.162:5672,192.168.3.162:5673,192.168.3.162:5674");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setConnectionTimeout(15000);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        log.info("svc dm ConnectionFactory success");
        return connectionFactory;
    }

    /**
     * 必须是prototype类型
     * @return
     */
    @Bean
    public RabbitTemplate yymRabbitTemplate() {
        RabbitTemplate yymRabbitTemplate = new RabbitTemplate(dmConnectionFactory());
        yymRabbitTemplate.setMandatory(true);
        yymRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("消息发送成功:correlationData({}),ack({}),cause({})", JSON.toJSONString(correlationData), ack, cause);
            }
        });
        yymRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey,
                        replyCode, replyText, new String(message.getBody()));
            }
        });
        log.info("RabbitTemplate success");
        return dmRabbitTemplate;
    }


    /**
     * 针对消费者配置
     * 1. 设置交换机类型
     * 2. 将队列绑定到交换机
     FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
     HeadersExchange :通过添加属性key-value匹配
     DirectExchange:按照routingkey分发到指定队列
     TopicExchange:多关键字匹配
     */
    @Bean
    public DirectExchange yymDefaultExchange() {
        return new DirectExchange(YYM_EXCHANGE);
    }


    /**
     * 获取队列A
     * @return
     */
    @Bean
    public Queue queueA() {
        // 队列持久
        return new Queue(YYM_QUEUE, true);
    }


    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queueA()).to(dmDefaultExchange()).with(YYM_ROUTINGKEY);
    }

}

##docker rabbitmq 集群

yym@yym:~$ docker ps -a
CONTAINER ID   IMAGE                                          COMMAND                  CREATED        STATUS                     PORTS
                                                                                                        NAMES
d95cd024f3c9   rabbitmq:3-management                          "docker-entrypoint.s…"   10 days ago    Up 6 hours                 4369/tcp, 5671/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:5674->5672/tcp, [::]:5674->5672/tcp, 0.0.0.0:15674->15672/tcp, [::]:15674->15672/tcp   rabbitmq-node3
fd35f01e8b2d   rabbitmq:3-management                          "docker-entrypoint.s…"   10 days ago    Up 26 seconds              4369/tcp, 5671/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:5673->5672/tcp, [::]:5673->5672/tcp, 0.0.0.0:15673->15672/tcp, [::]:15673->15672/tcp   rabbitmq-node2
83aa5e48fb3b   rabbitmq:3-management                          "docker-entrypoint.s…"   10 days ago    Up 21 minutes              4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp       rabbitmq-node1

##关闭其中一个节点

docker stop 83aa5e48fb3b

##CachingConnectionFactory监听关闭事件ShutdownListener.shutdownCompleted

@Override
	public void shutdownCompleted(ShutdownSignalException cause) {
		this.closeExceptionLogger.log(logger, "Channel shutdown", cause);
		int protocolClassId = cause.getReason().protocolClassId();
		if (protocolClassId == RabbitUtils.CHANNEL_PROTOCOL_CLASS_ID_20) {
			getChannelListener().onShutDown(cause);
		}
		else if (protocolClassId == RabbitUtils.CONNECTION_PROTOCOL_CLASS_ID_10) {
			getConnectionListener().onShutDown(cause);
		}

	}

##请求controller rabbitmq/getMessage

RabbitTemplate发起请求

##connection等空,新建一个connection

 ##使用CachingConnectionFactory缓存里面的this.connection.target

  ##使用CachingConnectionFactory上次连接缓存里面的this.connection

##findOpenChannel从channelList缓存数组中清理掉channel.isOpen()是关闭的

 ##判断连接是否打开的connection.isOpen()

 ##关闭事件监听ShutdownListener.shutdownCompleted中 this.shutdownCause已经有值,所以不等空,是否打开连接为假。isOpen函数返回假。

 

 ##connection.isOpen()连接未打开,channel为空,新建一个channel

 ##createBareChannel新建channel

 ##新建createConnection连接

##连接 

##根据配置地址进行连接

 ##创建新的连接

##新连接

##使用socket创建连接

##192.168.3.162:5672节点是关闭的,创建失败

 ##循环下一个节点192.168.3.162:5673连接,直到节点可以连接

##SocketFrameHandler socket 读写流

##初始化心跳

 ##this.connection.target创建完成

## 执行mq消息发送


http://www.kler.cn/a/442609.html

相关文章:

  • Facebook 隐私风波:互联网时代数据安全警钟
  • 浅谈云计算07 | 云安全机制
  • 【BLE】CC2541之ADC
  • SQL面试题1:连续登陆问题
  • 【day5】Redis持久化之AOF + Redis事务_锁机制
  • SpringBoot配置文件
  • 使用Python打造高效的PDF文件管理应用(合并以及分割)
  • Spring Boot 集成 Elasticsearch怎样在不启动es的情况下正常启动服务
  • 【21天学习AI底层概念】day5 机器学习的三大类型不能解决哪些问题?
  • 秒杀抢购场景下实战JVM级别锁与分布式锁
  • 四、网络层:数据平面,《计算机网络(自顶向下方法 第7版,James F.Kurose,Keith W.Ross)》
  • WPF 使用LibVLCSharp.WPF实现视频播放、停止、暂停功能
  • 【排序算法】——插入排序
  • .Net Core注册一个定制任务执行类服务
  • 首屏加载慢问题
  • 备战秋招:2024游戏开发入行与跳槽面试详解
  • 智能移动交通执法方案:易泊车牌识别相机助力精准执法与数据驱动管理
  • HCIA-Access V2.5_3_3_2_VLAN路由配置与实现
  • 机器学习之偏差
  • Shadcn UI 实战:打造可维护的企业级组件库
  • 简单配置,全面保护:HZERO审计服务让安全触手可及
  • 2412d,d的8月会议
  • Unity超优质动态天气插件(含一年四季各种天气变化,可用于单机局域网VR)
  • 【ETCD】【源码阅读】深入解析 EtcdServer.applyConfChange 方法
  • Web网络通信 --- 后端消息推送
  • Bootstrap 表格