当前位置: 首页 > article >正文

1.rabbitMQ介绍

0.思考 我们以前为什么要学习java直接的框架代码,而不是用springboot整合的框架,在学习完MQ后,我的答案是,可以直接写成更灵活的MQ代码(其他框架也是,SSM我们为什么要学,在于灵活度更高,以后可能会遇到SSM的代码我就可以看得懂),springboot整合虽然完成了大多数功能,但是我要其他普通java代码(非springboot)兼容也是可以用原生的(万一有这种老项目呢,总不可能把老项目改为springboot项目吧…),还有一个就是学习了原生的可以看得懂和写出springboot的MQ配置文件了

1.什么是mq(message queue)

1.消息队列 FIFO队列
2.存放的内容是message
3.是跨进程的通信机制,发送信息不需要依赖其他服务

2.应用

1.流量削峰,订单系统无法直接访问1w流量,放到mq可以慢慢处理
不至于系统奔溃,但是访问速度会变慢需要排队
2.应用解耦,订单系统直接调用支付系统/其他系统
支付系统可能会故障导致订单系统故障
用mq在中间可以发信息让执行任务
3.异步处理,不用一直等待服务了,处理完任务通知即可

3.mq的分类

1.activemq 高可用信息可靠高,但是apache社区维护少了 ,ms级
2.kafka(大公司使用) 大数据的杀手锏 ms级消息有序的,大数据实时计算日志采集.消费一次
缺点单机超过54个队列/分区 load彪高 社区更新慢,load越多信息响应更高,消费失败不支持重试
3.RocketMQ(金融互联网)阿里开源,单机吞吐量10w级,支持10亿信息堆积,不会因为信息堆积导致性能低下
缺点: 支持语言少,java c++
4.rabbitMQ(中小公司) 企业最流行的消息中间件 enlang语言高并发特性,多语言, pyjava ruby,.net…ajax社区活跃
缺点: 企业版要收费,学习成本高

  1. MQ的4大概念(一个队列对应一个消费者)
`
   生产者 --->交换机--绑定--队列-->消费者
                        --队列-->消费者 

5.名词

1.channel信道(提高利用率,不会只占用整个tcp连接,有多个信道发送信息,)
2.virtual host 多租户(vh 有多个交换机exchange)
3.blinding exchange和queue的绑定关系

6.安装mq

    #开机启动服务,上传elang,rabbitmq文件
   rpm -ivh erlang-21.3-1.el7.x86_64.rpm
   yum install socat -y
   rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
   chkconfig rabbitmq-server on
     #web界面,先关机后安装插件启动,访问15672,要关防火墙
   rabbitmq-plugins enable rabbitmq_management
   #启动服务
   /sbin/service rabbitmq-server start
    #看状态
   /sbin/service rabbitmq-server status
   #停服务
   /sbin/service rabbitmq-server stop
 
   #添加账号赋权限才能登录admin密码123
   rabbitmqctl add_user admin 123
   rabbitmqctl set_user_tags admin administrator
   rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
   #查看用户
   rabbitmqctl list_users
   #重置命令
    rabbitmqctl stop_app
    rabbitmqctl reset
   #重启
   rabbitmqctl start_app

7.写java项目 生产者 消费者

    1.导入maven依赖
           <!--rabbitmq 依赖客户端-->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.9.0</version>
            </dependency>
            <!--操作文件流的一个依赖-->
            <dependency>
                <groupId>commons-io</groupId>
                <artifactId>commons-io</artifactId>
                <version>2.6</version>
            </dependency>
    2. 生产者 通过chanel操作交换机的默认的队列
       ConnectionFactory
       Connection
       Channel
         //队列名
         //队列消息是否持久化到磁盘,默认在内存中
        //队列是否只供一个消费者消费(不共享)
        //是否自动删除,开新队列
       //其他参数
       channel.("name",false,false,false,null)
       //1.哪个交换机,队列名称,其他参数.消息体(二进制)
       String msg="hello";  
     channel.basicPublish("","qname",null,msg.getBytes("UTF-8"));
     //到mq的Queues界面看消息
   public class Producer {
    public static String QUEUE_NAME="hello1";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.10.104");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String msg="hello3";
		//发送消息        
		channel.basicPublish("",QUEUE_NAME,null,msg.getBytes("UTF-8"));

        System.out.println("发送信息成功");


    }
}    


    3.消费者队列类 设置ip账号和密码
       ConnectionFactory
      Connection=factory.createConnection();
       Channel
       DeliverCallBack =(xxx)->{
                  sout(new String(message.getBody(),"UTF-8"));
         };
       CancelCallBack  =xxx->{};
       //消费, 项目队列名,是否自动应答(就是要手动处理还是被动处理)
       //失败的回调,消费者取消消费的回调(都要lambda表达式)
        channel.basicConsume();
                             
   public class Consumer {
    public static String QUEUE_NAME="hello1";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.10.104");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        DeliverCallback deliverCallback=(consumerTag,delivery)->{
            System.out.println(new String(delivery.getBody()));
            System.out.println(consumerTag);

        };
        CancelCallback cancelCallback=(var1)->{

            System.out.println("应答失败");
        };


        Channel channel = connection.createChannel();
       channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);

        System.out.println("接收信息成功");
    }
}
    //但是发现一个队列只能发一次,多了接收不到,什么问题???
    //原来开启了手动应答,这一句代码第二个参数现在是自动应答
    channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);

8.工作队列 (上面是普通队列)避免立即执行资源密集型任务
生产者大量发消息到队列,消费者需要有多个工作线程处理任务(就不会处理单线程慢慢的处理)
但是要注意消息不能重复消费,导致重复的任务

1.mq默认使用轮询(你一个我一个消息,不会重复)分发给工作线程
2.封装连接信道工具类
3.worker1.class工作线程1 工作线程2
开发工具的work01–>edit config–>allow parallel run
//启动2个work01的窗口 …psf
//写生产者

9.消息应答(防止消息没有被消费者处理完中途丢失消息)处理完成通知mq删除队列的消息

         1.自动应答(少使用 用于机器非常可靠能快速及时处理任务)
         2.手动应答(多使用)
             1.批量应答(不建议) 会把信道的信息全部确认(虽然可以减少网络压力,但是数据可能丢失)
             2.不批量 (只应答当前发送过来的消息,发过去一次应答一次)
                   如 队列发送了 1 ,2 ,3消息,会等待3的处理结果并确认

10.消息应答重新入队(消息发送但是tcp在应答前断开连接[消息发送到线程,队列已经没有数据了,需要恢复数据],没有ack确认消息,需要重新入队发给其他工作线程处理)
如动图mq1 消息最后丢失
请添加图片描述

1.消息自动重新入队

      //消息的标记, 是否批量应答
   channel.basicAck(message.getEnvelope().getDeliveryTag(),false);  

2.代码如下
//提供者

  public class AckMsg {
    public static final String QUEUE_NAME="ack";
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = MQRabbitUtil.getChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //队列名
        //队列消息是否持久化到磁盘,默认在内存中
        //队列是否只供一个消费者消费(不共享)
        //是否自动删除,开新队列
        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNext()){
            String next = scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,next.getBytes("UTF-8"));
        }
    }
}
       //消费者
     public class Consumer1 {
public static final String QUEUE_NAME="ack";
public static void main(String[] args) throws IOException, TimeoutException {
    Channel channel = MQRabbitUtil.getChannel();
    DeliverCallback deliverCallback=(consumerTag, delivery)->{
        SleepUtils.sleep(1);
        System.out.println("消息处理快接收:"+new String(delivery.getBody(),"UTF-8"));
        System.out.println(consumerTag);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);


    };
    CancelCallback cancelCallback=(var1)->{

        System.out.println("应答失败");
    };
        //消费, 项目队列名,是否自动应答(就是要手动处理还是被动处理)
        //失败的回调,消费者取消消费的回调(都要lambda表达式)
        boolean IsAck=false;
        channel.basicConsume(QUEUE_NAME, IsAck, deliverCallback, cancelCallback);

        System.out.println("worker2正在等待接收消息");
    }

}
        //消费者2
       public class Consumer2 {
    public static final String QUEUE_NAME="ack";
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = MQRabbitUtil.getChannel();
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            SleepUtils.sleep(30);
            System.out.println("消息处理慢接收:"+new String(delivery.getBody(),"UTF-8"));
            System.out.println(consumerTag);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);


        };
        CancelCallback cancelCallback=(var1)->{

            System.out.println("应答失败");
        };
        //消费, 项目队列名,是否自动应答(就是要手动处理还是被动处理)
        //失败的回调,消费者取消消费的回调(都要lambda表达式)
        boolean IsAck=false;
        channel.basicConsume(QUEUE_NAME, IsAck, deliverCallback, cancelCallback);

        System.out.println("worker2正在等待接收消息");
    }

}

10.队列持久化(消息在mq保存,而不发出去就消失)
//需要将原来不持久化的队列删除,不然报错
//界面的feature会变为D代表持久化
//生产者写

   boolean Duration=true;
        channel.queueDeclare(QUEUE_NAME,Duration,false,false,null);

11.消息持久化(也会丢失)

//准备写入磁盘的时候,没有存储完,消息在缓存中
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes("UTF-8"));
      //是否自动应答 false,不然的话会学习达到消费者那里消费

12.不公平分配(能者多劳,轮询是公平的)常用

   1.消费者设置 
      //预取值 直接设置下面的,指定每个消费者得到几条
      int prefetchCount =1;//系统会根据处理时间动态分配
      //积压的数据才能看到效果(就是同一时间能接受多少条数据)
      channel.basicQos(prefetchCount);//默认是0公平分发,1代表不公平

13.怎么确保MQ消息不丢失

1.队列持久化
2.队列的消息持久化
3.发布确认

14.发布确认原理(之前的是消费者确认,这里在生产者确认!!!,确认消费者(mq)保存在磁盘上才能保证队列绝对不丢失)
1.单个确认(同步确认,发一条确认一条,速度慢)

     Channel channel = MQRabbitUtil.getChannel();
    channel.confirmSelect();//代表要确认磁盘中mq已经存储了数据,先写

   boolean flag=channel.waitForConfirms();
    if(flag){
            System.out.println("消息已经写入磁盘的确认");

        }
       ----------全部代码------------
  public class AckMsg {
    public static final String QUEUE_NAME="ack4";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Channel channel = MQRabbitUtil.getChannel();
        channel.confirmSelect();//代表要确认磁盘中mq已经存储了数据,先写

        boolean Duration=true;
        //队列名
        //队列消息是否持久化到磁盘,默认在内存中
        //队列是否只供一个消费者消费(不共享)
        //是否自动删除,开新队列
        //其他参数
        channel.queueDeclare(QUEUE_NAME,Duration,false,false,null);

        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNext()){
            String next = scanner.next();
            //交换机
            //队列名
            //设置消息持久化
            //二进制
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
            boolean flag=channel.waitForConfirms();
            if(flag){
                System.out.println("消息已经写入磁盘的确认");

            }

        }
    }
}

-----consumer2-----

public class Consumer2 {
    public static final String QUEUE_NAME="ack4";
    public static void main(String[] args) throws Exception {
        Channel channel = MQRabbitUtil.getChannel();
        channel.basicQos(5);
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            SleepUtils.sleep(30);
            System.out.println("消息处理慢接收:"+new String(delivery.getBody(),"UTF-8"));
            System.out.println(consumerTag);
            //确认的标志
            //是否批量应答
			//这里是我手动应答的代码            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);



        };
        CancelCallback cancelCallback=(var1)->{

            System.out.println("应答失败");
        };
        //消费队列名
        // 是否自动应答(就是要手动处理还是被动处理)
        //失败的回调,消费者取消消费的回调(都要lambda表达式)
        boolean IsAck=false;//手动应答
        channel.basicConsume(QUEUE_NAME, IsAck, deliverCallback, cancelCallback);

        System.out.println("worker2正在等待接收消息");
    }

}
2.批量确认(性能高,但是一群消息发送,出故障时不知道那个消息出故障)(不使用他)
 //批量发数据 确认1次
  //代码也是一样的
   int batchSize=100; //100条确认一次
  for(int i=0;i<MESSAGE_COUNT;i++){
          if(i%batchSize==0){
channel.waitForConfirms();//确认
        }

          
  }
3.异步批量确认(性能高,信息可靠但是代码难写)(比前面两个都快) 使用一个有序的map key记录编号和消息,信息到后面才异步确定
      //!!!消费者没有确认代码,提供者必须要有 下面
      
       channel.confirmSelect();                         
    //消费者有个broker通知map是否收到信息
               nackCallback #未确认回调
               ackCallback   #确认回调
    //提供者代码,写监听器 监听成功接口还是失败接口,不用waitForConfirms
//不看他也可以,因为并发访问时线程不安全
ConfirmCallback ackCallback=(tag,multiple)->{
                     if(multiple){//如果批量确认,直接批量删除标签
                              //成功一个,在全部列表删除该标签的值
                              ConcurrentNavigableMap<Long,String> confirmed=out.headMap(deliverTag);
                              confirm.clear();//清除确认的
                    }else{
						 out.remove(tag);//直接删除
                     }
                         sout("确认的消息"+tag)
            }
           ConfirmCallback nackCallback=(tag,multiple)->{ 
                        out.get(tag);//得到为确认的消息
                         sout("确认的消息"+tag)
            }
       channel.addConfirmListener(ackCallback,nackCallback);//异步通知
    //怎么处理未确认消息,因为发消息和确认消息是两个队列,所以使用ConcurrentLinkedQueue对确认和发布线程进行消息传递
   //全部发送的消息(发送消息的时候) - 记录成功的消息(确认成功的接口)=未确认的消息
    //老师选择了另外一个来记录 1.序号和信息关联2.轻松批量删除3.支持高并发(多线程)
         ConcurrentSkipListMap<Long,String> out=new ConcurrentSkipListMap<>();
  for(int i=0;i<MESSAGE_COUNT;i++){
       String msg="msg"+i; 
       channel.basicPublish("",queueName,null,msg.getBytes());
       out.put(channel.getNextPublishSeqNo(),msg);
          
  }

http://www.kler.cn/a/17377.html

相关文章:

  • 如何在Python中实现一个简单的搜索引擎:从零开始的指南
  • [HarmonyOS]简单说一下鸿蒙架构
  • 【ubuntu】单进程申请4GB内存
  • 使用Matlab神经网络工具箱
  • 【设计模式】结构型模式(四):组合模式、享元模式
  • css:没错又是我
  • JavaScript闭包的基本原理和应用场景
  • 人的全面发展评价指标体系—基于相关-主成分分析构建
  • 2000-2019年30省研发资本存量(含计算过程和原始数据)
  • 大数据Doris(八):Broker部署和集群启停脚本
  • 高效学习方法和工具推荐,让你事半功倍!
  • clickhouse里的数组数据类型与相关使用介绍
  • 【C++复习1】程序结构和C++的工作原理
  • Java程序设计入门教程--数组
  • 小球下落(dropping balls)uva679
  • go 打包文件夹成zip文件
  • Envoy控制面实践
  • 漫画 | Linux之父:财务自由以后,我失眠了!
  • 华为OD机试 - 整理扑克牌(Python)
  • [计算机图形学]光场,颜色与感知(前瞻预习/复习回顾)
  • 对比 LVS 负载均衡群集的 NAT 模式和 DR 模式,基于 CentOS 7 构建 LVS-DR 群集
  • springboot 集成 shardingSphere 加mybatisplus 自带增加 分页查询 和源代码包 分库分表 单库 分表 使用雪花算法id
  • node.js 处理路径问题
  • VR与AR:哪个有更大的潜力改变未来?
  • 今天面了个字节跳动拿35K出来的,真是砂纸擦屁股,给我露了一手啊
  • Skywalking