当前位置: 首页 > article >正文

RabbitMQ Java 客户端开发教程—官方原版

一、概述

客户端 API 公开 AMQP 0-9-1 协议模型中的关键实体, 具有额外的抽象,易于使用。

RabbitMQ Java 客户端使用 com.rabbitmq.client 作为其顶级包。 关键类和接口是:

  • 通道:表示一个AMQP 0-9-1通道,并提供大部分操作(协议方法)。

  • 连接:表示 AMQP 0-9-1 连接

  • 连接工厂:构造连接实例

  • 使用者:表示消息使用者

  • DefaultConsumer:消费者常用的基类

  • 基本属性:消息属性(元数据)

  • BasicProperties.Builder:BasicProperties 的生成器

协议操作可通过通道接口进行。连接是 用于打开通道,注册连接生命周期事件 处理程序,并关闭不再需要的连接。连接通过 ConnectionFactory 实例化, 这是您配置各种连接设置(例如虚拟主机或用户名)的方式。

二、连接和通道

此客户端版本独立于 RabbitMQ 服务器版本,可以与 RabbitMQ 服务器一起使用。 它们需要 Java 8 或更高版本:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.16.0</version>
</dependency>

核心 API 类是连接和通道,表示 AMQP 0-9-1 连接和 分别是通道。它们通常在使用前导入:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

三、连接到 RabbitMQ

以下代码使用给定的参数(主机名、端口号等)连接到 RabbitMQ 节点:

ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);

Connection conn = factory.newConnection();

所有这些参数对于 RabbitMQ 都有合理的默认值 节点在本地运行。

如果属性,则将使用该属性的默认值 在创建连接之前保持未分配状态:

Property

Default Value

Username

"guest"

Password

"guest"

Virtual host

"/"

Hostname

"localhost"

port

5672 for regular connections, 5671 for connections that use TLS

四、使用 URI 进行连接

或者,可以使用 URI:

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();

所有这些参数都有合理的股票默认值 本地运行的 RabbitMQ 服务器。

可以在服务器节点日志中观察到成功和不成功的客户端连接事件。

请注意,默认情况下,用户来宾只能从本地主机连接。 这是为了限制生产系统中的已知凭据使用。

应用程序开发人员可以为连接分配自定义名称。如果设置, 该名称将在 RabbitMQ 节点日志以及管理 UI 中提及。

然后,可以使用连接接口打开通道:

Channel channel = conn.createChannel();

该通道现在可用于发送和接收消息,如后续部分所述。

五、使用端点列表

可以指定连接时要使用的终结点列表。第一个 将使用可访问的终结点。如果连接失败,请使用 终结点列表使应用程序可以连接到不同的 节点(如果原始节点已关闭)。

要使用多个端点,请向 ConnectionFactory#newConnection 提供地址列表。 地址表示主机名和端口对。

Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)
                                 , new Address(hostname2, portnumber2)};
Connection conn = factory.newConnection(addrArr);

将尝试连接到主机名 1:端口号1,如果 主机名 2:端口号2 失败。返回的连接是 数组中第一个成功(不抛出 IOException)。这完全等同于重复 在工厂上设置主机和端口,每次调用 factory.newConnection(),直到其中一个成功。

如果还提供了执行人服务(使用 form factory.newConnection(es, addrArr)) 线程池是 与(第一个)成功连接相关联。

六、断开与 RabbitMQ 的连接

要断开连接,只需关闭通道和连接:

channel.close();
conn.close();

请注意,关闭通道可能被认为是很好的做法,但在这里不是绝对必要的 - 它会完成 在基础连接关闭时自动执行。

七、连接和通道寿命

客户端连接应具有长期生存期。底层协议的设计和优化是针对 长时间运行的连接。这意味着每个操作打开一个新连接, 例如,发布的消息是不必要的,强烈建议不要这样做,因为它会引入很多 网络往返和开销。

通道也意味着长期存在,但由于许多可恢复的协议错误将 导致通道关闭,通道寿命可能短于其连接寿命。 每个操作关闭和打开新通道通常是不必要的,但可以 适当。如有疑问,请考虑先重复使用频道。

通道级异常,例如尝试从 不存在的队列将导致通道关闭。封闭的通道不能 更长的使用时间,并且不会再从服务器接收任何事件(例如 作为消息传递

八、客户端提供的连接名称

RabbitMQ 节点的客户端信息量有限:

  • 其 TCP 终结点(源 IP 地址和端口)

  • 使用的凭据

仅此信息就可能使识别应用程序和实例变得困难,特别是当凭据可能 共享和客户端通过负载均衡器连接,但无法启用代理协议。

为了更轻松地在服务器日志和管理 UI 中标识客户端, AMQP 0-9-1 客户机连接(包括 RabbitMQ Java 客户机)可以提供定制标识符。 如果设置,标识符将在日志条目和管理 UI 中提及。标识符称为 客户端提供的连接名称。该名称可用于标识应用程序或特定组件 在应用程序中。名称是可选的;但是,强烈建议开发人员提供一个 因为它将大大简化某些操作任务。

RabbitMQ Java 客户机的 ConnectionFactory#newConnection 方法覆盖接受客户机提供的连接名称。下面是上面使用的修改后的连接示例 它提供了这样一个名称:

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
// provides a custom connection name
Connection conn = factory.newConnection("app:audit component:event-consumer");

九、使用交换和队列

客户端应用程序使用 [交换] 和队列, 协议的高级构建块。 必须先声明这些,然后才能使用。声明任一类型的对象 只需确保存在该名称之一,并在必要时创建它。

继续前面的示例,以下代码声明了一个交换和一个以服务器命名的队列, 然后将它们绑定在一起。

channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

这将主动声明以下对象, 两者都可以通过使用其他参数进行自定义。 在这里,他们都没有任何特殊的论据。

  • 持久、非自动删除的“直接”交换

  • 具有生成名称的非持久、独占、自动删除队列

然后,上述函数调用将队列绑定到交换 给定路由密钥。

请注意,当只有一个队列时,这将是声明队列的典型方法 客户想要使用它:它不需要一个众所周知的名字,不需要 其他客户端可以使用它(独占)并将被清理 自动(自动删除)。如果多个客户端想要共享一个队列 使用众所周知的名称,此代码将是合适的:

channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

这将主动声明:

  • 持久、非自动删除的“直接”交换

  • 具有已知名称的持久、非独占、非自动删除队列

许多通道 API 方法都过载。 这些方便的 exchangeDeclare、queueDeclare 和 queueBind 的缩写形式使用合理的默认值。还有带有更多参数的较长表单,可让您覆盖这些默认值 必要时,在需要时给予完全控制。

这种“短格式,长格式”模式在整个客户端 API 使用过程中使用。

十、被动声明

队列和交换可以“被动”声明。被动声明只是检查实体是否 具有提供的名称存在。如果是这样,则操作是无操作。对于队列成功 被动声明将返回与非被动声明相同的信息,即 队列中处于就绪状态的使用者和消息。

如果实体不存在,则操作将失败,并出现通道级异常。频道 之后不能使用。应该打开一个新通道。通常使用一次性(临时) 被动声明的通道。

通道#队列声明被动和通道#交换声明被动是 用于被动声明的方法。下面的示例演示 Channel#queueDeclarePassive:

Queue.DeclareOk response = channel.queueDeclarePassive("queue-name");
// returns the number of messages in Ready state in the queue
response.getMessageCount();
// returns the number of consumers the queue has
response.getConsumerCount();

Channel#exchangeDeclarePassive的返回值不包含任何有用的信息。因此 如果该方法返回并且没有发生通道异常,则表示交换确实存在。


http://www.kler.cn/a/445.html

相关文章:

  • C语言--字符串函数2
  • C语言0基础全面教程
  • Baklib——企业知识库搭建的轻松工具
  • 单片机——IIC协议与24C02
  • 自学大数据第八天~HDFS命令(二)
  • RK3568平台开发系列讲解(网络篇)常用的网络工具
  • [ 云计算 | Azure ] Chapter 04 | 核心体系结构之数据中心、区域与区域对、可用区和地理区域
  • 记第一次面试的过程(C++)
  • 如何发布自己的npm包
  • 基于MFC的JavaScript进行网页数据交互
  • 2023-03-15:屏幕录制并且显示视频,不要用命令。代码用go语言编写。
  • 计算机网络面试总结
  • 【C++】STL简介 及 string的使用
  • 爽,我终于掌握了selenium图片滑块验证码
  • 程序是怎样跑起来的(2)
  • JS中sort()方法返回值?
  • 高等数学——二重积分
  • 【JDK动态代理】及【CGLib动态代理】:Java的两种动态代理方式
  • 全网独家首发|极致版YOLOv7改进大提升(推荐)网络配置文件仅24层!更清晰更方便更快的改进YOLOv7网络模型
  • Linux中sudo,su与su -命令的区别