当前位置: 首页 > article >正文

RabbitMQ代码实战2

RabbitMQ代码实战2

RPC远程过程调用模式队列(RPC)

模型

img

package cn.yanghuisen.rpc.server;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * RPC模式队列-服务端
 */
public class RPCServer {

    // 队列名称
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    /**
     * 计算斐波那契数列
     *
     * @param n
     * @return
     */
    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.10.100");
        factory.setPort(5672);
        factory.setUsername("shop");
        factory.setPassword("shop");
        factory.setVirtualHost("/shop");

        try {
            // 通过工厂创建连接
            final Connection connection = factory.newConnection();
            // 获取通道
            final Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.queuePurge(RPC_QUEUE_NAME);

            /*
                限制RabbitMQ只发不超过1条的消息给同一个消费者。
                当消息处理完毕后,有了反馈,才会进行第二次发送。
             */
            int prefetchCount = 1;
            channel.basicQos(prefetchCount);

            System.out.println(" [x] Awaiting RPC requests");

            Object monitor = new Object();
            // 获取消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                // 获取replyTo队列和correlationId请求标识
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(delivery.getProperties().getCorrelationId())
                        .build();

                String response = "";
                try {
                    // 接收客户端消息
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);

                    System.out.println(" [.] fib(" + message + ")");
                    // 服务端根据业务需求处理
                    response += fib(n);
                } catch (RuntimeException e) {
                    System.out.println(" [.] " + e.toString());
                } finally {
                    // 将处理结果发送至replyTo队列同时携带correlationId属性
                    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps,
                            response.getBytes("UTF-8"));
                    // 手动回执消息
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    // RabbitMq consumer worker thread notifies the RPC server owner thread
                    // RabbitMq消费者工作线程通知RPC服务器其他所有线程运行
                    synchronized (monitor) {
                        monitor.notify();
                    }
                }
            };
            // 监听队列
            /*
                autoAck = true代表自动确认消息
                autoAck = false代表手动确认消息
             */
            boolean autoAck = false;
            channel.basicConsume(RPC_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
            });
            // Wait and be prepared to consume the message from RPC client.
            // 线程等待并准备接收来自RPC客户端的消息
            while (true) {
                synchronized (monitor) {
                    try {
                        monitor.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

package cn.yanghuisen.rpc.client;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

/**
 * RPC模式队列-客户端
 */
public class RPCClient implements AutoCloseable {

    private Connection connection;
    private Channel channel;
    // 队列名称
    private String requestQueueName = "rpc_queue";

    // 初始化连接
    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.10.100");
        factory.setPort(5672);
        factory.setUsername("shop");
        factory.setPassword("shop");
        factory.setVirtualHost("/shop");

        connection = factory.newConnection();
        channel = connection.createChannel();
    }

    public static void main(String[] args) {
        try (RPCClient fibonacciRpc = new RPCClient()) {
            for (int i = 0; i < 10; i++) {
                String i_str = Integer.toString(i);
                System.out.println(" [x] Requesting fib(" + i_str + ")");
                // 请求服务端
                String response = fibonacciRpc.call(i_str);
                System.out.println(" [.] Got '" + response + "'");
            }
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 请求服务端
    public String call(String message) throws IOException, InterruptedException {
        // correlationId请求标识ID
        final String corrId = UUID.randomUUID().toString();

        // 获取队列名称
        String replyQueueName = channel.queueDeclare().getQueue();

        // 设置replyTo队列和correlationId请求标识
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        // 发送消息至队列
        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        // 设置线程等待,每次只接收一个响应结果
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        // 接受服务器返回结果
        String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                // 将给定的元素在给定的时间内设置到线程队列中,如果设置成功返回true, 否则返回false
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
        }, consumerTag -> {
        });

        // 从线程队列中获取值,如果线程队列中没有值,线程会一直阻塞,直到线程队列中有值,并且取得该值
        String result = response.take();
        // 从消息队列中丢弃该值
        channel.basicCancel(ctag);
        return result;
    }

    // 关闭连接
    public void close() throws IOException {
        connection.close();
    }
}

3.7、确认模式队列(confirm)

如何确定消息队列收到了生产者发送的消息?如果在发送消息前程序崩了怎么办?

3.7.1、事务机制控制
  • txSelect():开启事务
  • txCommit():提交事务
  • txRollback():回滚事务
package cn.yanghuisen.tx.send;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

/**
 * 事务-发送消息
 */
public class Send {

    // 队列名称
    private final static String QUEUE_NAME = "tx";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.10.100");
        factory.setPort(5672);
        factory.setUsername("shop");
        factory.setPassword("shop");
        factory.setVirtualHost("/shop");

        Connection connection = null;
        Channel channel = null;

        // 通过工厂创建连接
        try{
            connection = factory.newConnection();
            // 获取通道
            channel = connection.createChannel();
            // 开启事务
            channel.txSelect();
            /*
                声明队列
                1、队列名称
                2、是否持久化
                3、排他队列,如果一个度列被声明名排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除
                    1. 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。
                    2. "首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。
                    3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。
                    这种队列适用于只限于一个客户端发送读取消息的应用场景。
               4、自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 消息内容
            String message = "Hello World!";
            // 将消息放入队列并发送
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            int i = 1/0;
            System.out.println(" [x] Sent '" + message + "'");
            channel.txCommit();
        }catch (Exception e){
            e.printStackTrace();
            // 回滚
            channel.txRollback();
            channel.close();
            connection.close();


        }
    }
}

package cn.yanghuisen.tx.recv;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * 事务-接收消息
 */
public class Recv {

    // 队列名称
    private final static String QUEUE_NAME = "tx";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.10.100");
        factory.setPort(5672);
        factory.setUsername("shop");
        factory.setPassword("shop");
        factory.setVirtualHost("/shop");
        // 创建连接
        Connection connection = factory.newConnection();
        // 获取信息
        Channel channel = connection.createChannel();
        // 申明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 接收消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        // 监听队列
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

缺点:降低了RabbitMQ的消息吞吐量

解决:使用confirm模式

总结:使用事务,可以在发送请求但是没有提交事务前回滚事务,撤回发送的消息。

3.7.2、确认模式(confirm)
生产者设置为确认模式,发送消息时所有的消息都会被指派一个唯一的ID,一旦消息被投递套指定的队列之后,就会返回一个确认结果给生产者(包含消息的唯一ID),这样生产者就知道了消息已经正确到达了目的地。如果消息和队列时可以持久化的,那么确认消息会将消息写入磁盘后发出。

confirm模式大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回 确认的同时继续发送下一条消息,当消息终得到确认之后,生产者应用便可以通过回调方法来处理该 确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序 同样可以在回调方法中处理该nack消息。

实现Confirm确认机制有三种方式

1、普通Confirm模式

每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。

package cn.yanghuisen.confirm.sync.send;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

/**
 * 确认模式-同步-单条-发送消息
 */
public class Send {

    // 队列名称
    private final static String QUEUE_NAME = "confirm_sync";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.10.100");
        factory.setPort(5672);
        factory.setUsername("shop");
        factory.setPassword("shop");
        factory.setVirtualHost("/shop");

        Connection connection = null;
        Channel channel = null;

        // 通过工厂创建连接
        try{
            connection = factory.newConnection();
            // 获取通道
            channel = connection.createChannel();
            // 开启确认模式
            channel.confirmSelect();
            /*
                声明队列
                1、队列名称
                2、是否持久化
                3、排他队列,如果一个度列被声明名排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除
                    1. 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。
                    2. "首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。
                    3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。
                    这种队列适用于只限于一个客户端发送读取消息的应用场景。
               4、自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 消息内容
            String message = "Hello World!";
            // 将消息放入队列并发送
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));

            // 确认消息是否发送成功
            if (channel.waitForConfirms()){
                System.out.println("消息发送成功");
            }else {
                System.out.println("消息发送失败");
            }



            System.out.println(" [x] Sent '" + message + "'");
        }catch (Exception e){
            e.printStackTrace();


        }
    }
}

package cn.yanghuisen.confirm.sync.recv;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * 确认模式-同步-单条-接收消息
 */
public class Recv {

    // 队列名称
    private final static String QUEUE_NAME = "confirm_sync";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.10.100");
        factory.setPort(5672);
        factory.setUsername("shop");
        factory.setPassword("shop");
        factory.setVirtualHost("/shop");
        // 创建连接
        Connection connection = factory.newConnection();
        // 获取信息
        Channel channel = connection.createChannel();
        // 申明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 接收消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        // 监听队列
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

2、批量confirm模式:每发送一批消息后,调用waitForConfirmsOrDie()方法,等待服务器端 confirm。

package cn.yanghuisen.confirm.sync.send;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

/**
 * 确认模式-同步-批量-发送消息
 */
public class Send {

    // 队列名称
    private final static String QUEUE_NAME = "confirm_sync";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.10.100");
        factory.setPort(5672);
        factory.setUsername("shop");
        factory.setPassword("shop");
        factory.setVirtualHost("/shop");

        Connection connection = null;
        Channel channel = null;

        // 通过工厂创建连接
        try{
            connection = factory.newConnection();
            // 获取通道
            channel = connection.createChannel();
            // 开启确认模式
            channel.confirmSelect();
            /*
                声明队列
                1、队列名称
                2、是否持久化
                3、排他队列,如果一个度列被声明名排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除
                    1. 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。
                    2. "首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。
                    3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。
                    这种队列适用于只限于一个客户端发送读取消息的应用场景。
               4、自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 消息内容
            String message = "Hello World!";
            // 将消息放入队列并发送
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));

            // 确认消息是否发送成功-多条
            // 如果有一条没被确认,就会抛IO异常
            channel.waitForConfirmsOrDie();



//            if (channel.waitForConfirms()){
//                System.out.println("消息发送成功");
//            }else {
//                System.out.println("消息发送失败");
//            }



            System.out.println(" [x] Sent '" + message + "'");
        }catch (Exception e){
            e.printStackTrace();


        }
    }
}

package cn.yanghuisen.confirm.sync.recv;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * 确认模式-同步-批量-接收消息
 */
public class Recv {

    // 队列名称
    private final static String QUEUE_NAME = "confirm_sync";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.10.100");
        factory.setPort(5672);
        factory.setUsername("shop");
        factory.setPassword("shop");
        factory.setVirtualHost("/shop");
        // 创建连接
        Connection connection = factory.newConnection();
        // 获取信息
        Channel channel = connection.createChannel();
        // 申明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 接收消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        // 监听队列
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

3、异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个 方法。

package cn.yanghuisen.confirm.async.send;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

/**
 * 确认模式-异步-发送消息
 */
public class Send {

    // 队列名称
    private final static String QUEUE_NAME = "confirm_async";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.10.100");
        factory.setPort(5672);
        factory.setUsername("shop");
        factory.setPassword("shop");
        factory.setVirtualHost("/shop");

        Connection connection = null;
        Channel channel = null;

        // 通过工厂创建连接
        try{
            // 维护信息发送回执deliveryTag
            final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());
            // 创建连接
            connection = factory.newConnection();
            // 获取通道
            channel = connection.createChannel();
            // 开启确认模式
            channel.confirmSelect();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 添加channel 监听
            channel.addConfirmListener(new ConfirmListener() {
                /**
                 * 已确认
                 * @param l 唯一标识
                 * @param b 确认多条还是单条,true多条
                 * @throws IOException
                 */
                @Override
                public void handleAck(long l, boolean b) throws IOException {
                    // 判断确认的是多条还是单条
                    if (b){
                        System.out.println("handleAck--success-->multiple" + l);
                        // 清除前 l 标识ID
                        confirmSet.headSet(l+1).clear();
                    }else {
                        System.out.println("handleAck--success-->single" + l);
                        confirmSet.remove(l);
                    }
                }

                /**
                 * 未确认
                 * @param l
                 * @param b
                 * @throws IOException
                 */
                @Override
                public void handleNack(long l, boolean b) throws IOException {
                    if (b){
                        System.out.println("handleNack--failed-->multiple-->" + l);
                        // 清除前 deliveryTag 项标识id
                        confirmSet.headSet(l + 1L).clear();
                    }else {
                        System.out.println("handleNack--failed-->single" + l);
                        confirmSet.remove(l);
                    }
                }
            });


            // 循环发送消息
            while (true){
                // 消息内容
                String message = "Hello World!";
                // 获取unconfirm的消息序号
                Long seqNo = channel.getNextPublishSeqNo();
                // 将消息放入队列并发送
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                // 将消息序号添加到SortedSet
                confirmSet.add(seqNo);
            }
        }catch (Exception e){
            e.printStackTrace();
            channel.close();
            connection.close();

        }
    }
}

package cn.yanghuisen.confirm.async.recv;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * 确认模式-异步-接收消息
 */
public class Recv {

    // 队列名称
    private final static String QUEUE_NAME = "confirm_async";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.10.100");
        factory.setPort(5672);
        factory.setUsername("shop");
        factory.setPassword("shop");
        factory.setVirtualHost("/shop");
        // 创建连接
        Connection connection = factory.newConnection();
        // 获取信息
        Channel channel = connection.createChannel();
        // 申明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 接收消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        // 监听队列
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}


http://www.kler.cn/a/411667.html

相关文章:

  • 4.6 JMeter HTTP信息头管理器
  • 嵌入式硬件设计:从概念到实现的全流程
  • 非root用户安装CUDA
  • 【Flutter】搭建Flutter开发环境,安卓开发
  • Python和C++急性损伤空间结构
  • Python学习——字符串操作方法
  • Leetcode322.零钱兑换(HOT100)
  • 训练的decoder模型文本长度不一致,一般设置为多大合适,需要覆盖最长的文本长度么
  • Spring集成RabbitMQ
  • 【spark-spring boot】学习笔记
  • IDEA某个Impl下的引入的文件红色
  • mp4视频流推送的学习
  • IDEA插件CamelCase,快速转变命名格式
  • 《硬件架构的艺术》笔记(八):消抖技术
  • css:转换
  • SQL优化笔记--explain看执行计划--主要还是看用了哪些索引,所以你是否失效--分库分表的注意事项(未完)
  • C#中面试的常见问题008
  • 列表代码思路
  • 前端技术选型之uniapp
  • MySQL中char和varchar的区别详解
  • JavaWeb——请求响应(5/8)-请求:日期参数json参数(使用场景及封装类型、接收方式、在 Postman 中传递、在服务端接收)
  • Spring框架使用xml方式配置ThreadPoolTaskExecutor线程池,并且自定义线程工厂
  • jdk17-LongAddr 类increment()方法注释说明
  • c++中的lambda表达式!
  • 【H2O2|全栈】JS进阶知识(十一)axios入门
  • ChatGPT如何辅助academic writing?