当前位置: 首页 > article >正文

RabbitMQ练习(Topics)

 1、RabbitMQ教程

《RabbitMQ Tutorials》icon-default.png?t=N7T8https://www.rabbitmq.com/tutorials

2、环境准备

参考:《RabbitMQ练习(Hello World)》和《RabbitMQ练习(Work Queues)》。

确保RabbitMQ、Sender、Receiver、Receiver2容器正常安装和启动。

root@k0test1:~# docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
root@k0test1:~# docker start sender
root@k0test1:~# docker start receiver
root@k0test1:~# docker start receiver2
root@k0test1:~# docker network inspect bridge

网络拓扑:

3、Topics练习

3.1 概述

在前面的Routing练习中,不再使用仅能进行简单广播的fanout exchange,而是使用了direct exchange,从而实现选择性地接收日志。但direct exchange仍然有局限性——它不能基于多个条件进行路由。

在常见的日志系统中,不仅可以根据严重性订阅日志,还可以根据发出日志的来源进行订阅。比如Unix工具syslog,它根据严重性(信息/警告/关键...)和设施(认证/定时任务/内核...)来发出日志(原文:You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).)。

这将提供很大的灵活性——比如只希望接收来自'cron'的关键错误日志,以及来自'kern'的所有日志。

要在日志系统中实现这一点,需要学习更复杂的主题交换机(topic exchange)

3.2 Topic exchange

1、路由键格式要求

发送到主题交换机(topic exchange)的消息所携带的路由键(routing_key)不能是任意形式——它必须是用分隔的单词列表。这些单词可以是任何内容,但通常它们和消息的某些特征相关。一些有效的路由键示例包括:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。路由键中的单词数量可以随意,只要不超过255字节的限制。

2、绑定键格式要求

绑定键(binding key)也必须是相同的形式。主题交换机(topic exchange)背后的逻辑类似于直接交换机(direct exchange)——路由键需要和绑定键进行匹配,使用特定路由键发送的消息将被传递到所有与匹配绑定键绑定的队列。然而,对于绑定键有两个重要的特殊情况:

  • * 可以替代恰好一个单词。
  • # 可以替代0个或多个单词。

3、举例说明:

在这个例子中,我们将发送描述动物的消息。这些消息将使用由三个词组成的路由键发送(由两个点分隔)。路由键的第一个词将描述速度(celerity),第二个词描述颜色(colour),第三个词描述物种(species):"<celerity>.<colour>.<species>"。

我们创建了三个绑定(bindings):

  • Q1 绑定的键是 "*.orange.*",意味着它对所有橙色的动物感兴趣。
  • Q2 绑定的键有两个:"*.*.rabbit" 和 "lazy.#",意味着它对所有关于兔子的消息以及所有懒惰的动物感兴趣。

这些绑定可以总结为:

  • Q1 对所有橙色的动物感兴趣。
  • Q2 想要听到所有关于兔子的消息,以及所有关于懒惰动物的消息。

具有路由键 "quick.orange.rabbit" 的消息将被发送到两个队列。消息 "lazy.orange.elephant" 也会被发送到两个队列。另一方面,"quick.orange.fox" 只会被发送到第一个队列,而 "lazy.brown.fox" 只会被发送到第二个队列。"lazy.pink.rabbit" 将只被发送到第二个队列一次,尽管它匹配了两个绑定。"quick.brown.fox" 不匹配任何绑定,因此将被丢弃。

如果我们违反协议,发送了一个或四个词的消息,比如 "orange" 或 "quick.orange.new.rabbit",那么这些消息将不会匹配任何绑定,并将丢失。

另一方面,尽管 "lazy.orange.new.rabbit" 有四个词,但它将匹配最后一个绑定,并将被发送到第二个队列。

主题交换机(Topic Exchange)

主题交换机(Topic Exchange)是一个非常强大的消息交换机制,它可以像其他类型的交换机一样工作。以下是主题交换机的两种主要行为:

  1. 当一个队列使用"#"(井号)作为绑定键与主题交换机绑定时,它将接收到所有的消息,而不管路由键是什么。这与扇出交换机(fanout exchange)的行为类似,即所有发送到交换机的消息都会被分发到所有绑定的队列。

  2. 如果在绑定中没有使用特殊字符"*"(星号)和"#"(井号),主题交换机的行为就会和直接交换机(direct exchange)一样。这意味着消息只会被分发到那些绑定键与消息的路由键完全匹配的队列。

主题交换机提供了一种灵活的方式来根据消息的路由键进行消息的路由和分发。通过使用星号(*)和井号(#)作为通配符,可以创建复杂的路由规则,从而实现对消息的精确控制。

3.3 代码说明

日志系统中使用主题交换机,日志的路由键将有两个单词组成:"<facility>.<severity>",每个日志消息的路由键将由两个部分组成,第一部分是设施或系统组件的名称,第二部分是日志消息的严重性级别。例如,一个路由键可能是"auth.error",表示与认证相关的错误消息。使用主题交换机,可以基于这些路由键的不同组合来灵活地路由和分发日志消息。 

3.3.1 Sending

进入sender容器,vi编写emit_log_topic.py:

root@sender:/# vi emit_log_topic.py
root@sender:/# cat emit_log_topic.py 
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='172.17.0.2'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='topic_logs', routing_key=routing_key, body=message)
print(f" [x] Sent {routing_key}:{message}")
connection.close()
root@sender:/#

这段代码是一个使用 Python 编写的简单 RabbitMQ 发送程序,它使用了 pika 库来与 RabbitMQ 服务器进行交互。下面是代码的详细说明:

  1. 导入所需库:

    • import pika: 导入 pika 库,这是 Python 中用于与 RabbitMQ 交互的库。
    • import sys: 导入 sys 模块,用于访问由命令行参数提供的值。
  2. 建立连接:

    • connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2')): 创建一个到 RabbitMQ 服务器的连接。这里指定了服务器的 IP 地址为 172.17.0.2
  3. 创建通道:

    • channel = connection.channel(): 在建立的连接上创建一个新的通道。
  4. 声明交换机:

    • channel.exchange_declare(exchange='topic_logs', exchange_type='topic'): 声明一个名为 topic_logs 的主题交换机。exchange_type='topic' 指定了交换机的类型为 topic,这意味着它可以根据路由键的模式来路由消息。
  5. 设置路由键和消息:

    • routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info': 从命令行参数中获取路由键。如果有提供第一个参数(sys.argv[1]),则使用它作为路由键;如果没有提供,则默认使用 'anonymous.info'
    • message = ' '.join(sys.argv[2:]) or 'Hello World!': 从命令行参数中获取消息内容。如果有提供第二个及后续参数,将它们连接成一个字符串作为消息;如果没有提供,则默认消息为 'Hello World!'
  6. 发布消息:

    • channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message): 使用指定的交换机和路由键发布消息。消息的内容是之前设置的 message
  7. 打印消息确认:

    • print(f" [x] Sent {routing_key}:{message}"): 打印一条消息到控制台,确认消息已经发送,并显示路由键和消息内容。
  8. 关闭连接:

    • connection.close(): 关闭与 RabbitMQ 服务器的连接。

这个脚本可以作为命令行工具使用,通过指定不同的路由键和消息内容来发送消息到 RabbitMQ。使用示例:

python emit_log_topic.py auth.error "User authentication failed"

这将向 RabbitMQ 发送一个路由键为 auth.error 的消息,内容为 "User authentication failed"。如果没有提供路由键,消息将使用默认的 'anonymous.info' 路由键发送。

3.3.2 Receiving

进入receiver/receiver2容器,vi编写receive_logs_topic.py:

root@receiver:/# vi receive_logs_topic.py 
root@receiver:/# cat receive_logs_topic.py 
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='172.17.0.2'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(f" [x] {method.routing_key}:{body}")


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
root@receiver:/# 

这段代码是一个使用 Python 编写的 RabbitMQ 接收程序,它同样使用了 pika 库来与 RabbitMQ 服务器进行交互。下面是代码的详细说明:

  1. 导入所需库:

    • import pika: 导入 pika 库,用于与 RabbitMQ 交互。
    • import sys: 导入 sys 模块,用于访问命令行参数。
  2. 建立连接:

    • connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2')): 创建一个到 RabbitMQ 服务器的阻塞模式连接,指定服务器的 IP 地址。
  3. 创建通道:

    • channel = connection.channel(): 在连接上创建一个新的通道。
  4. 声明交换机:

    • channel.exchange_declare(exchange='topic_logs', exchange_type='topic'): 声明一个名为 topic_logs 的主题交换机。
  5. 声明队列:

    • result = channel.queue_declare('', exclusive=True): 声明一个非持久的、独占的队列。exclusive=True 意味着这个队列只对连接它的客户端可见,并且当连接关闭时,队列将被删除。
    • queue_name = result.method.queue: 从结果中获取队列名称。
  6. 检查绑定键:

    • binding_keys = sys.argv[1:]: 从命令行参数中获取所有的绑定键。
    • if not binding_keys: ...: 如果没有提供绑定键,打印用法信息并退出程序。
  7. 绑定队列到交换机:

    • 循环遍历所有的 binding_keys,使用 channel.queue_bind() 方法将队列绑定到 topic_logs 交换机,并为每个绑定键设置相应的路由键。
  8. 等待日志消息:

    • 打印提示信息,告知用户程序正在等待日志消息,并说明如何退出程序。
  9. 定义消息回调函数:

    • def callback(ch, method, properties, body): ...: 定义一个回调函数,当接收到消息时会被调用。它打印出消息的路由键和消息体。
  10. 设置消息消费:

    • channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True): 设置消息消费,指定队列名称、回调函数和自动确认消息。
  11. 开始接收消息:

    • channel.start_consuming(): 启动消息接收循环,直到用户中断(例如使用 CTRL+C)。

这个脚本可以作为命令行工具使用,通过指定一个或多个绑定键来接收匹配这些键的消息。例如,如果你想要接收所有与 auth 相关的日志消息,你可以使用以下命令行:

python receive_logs_topic.py auth.*

这将使得脚本接收所有路由键以 auth 开头的日志消息。使用星号(*)作为通配符,可以匹配任意数量的字符。

 3.4、开始测试

3.4.1 接收所有日志

1、在 receiver 容器中打开命令行界面,运行以下命令以接收所有日志:

root@receiver:/# python3 receive_logs_topic.py "#"
 [*] Waiting for logs. To exit press CTRL+C

2、rabbitmq容器上查看exchange, queue,bindings:

root@30acfada6737:/# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name    type
amq.topic       topic
topic_logs      topic <--创建的topic exchange
amq.fanout      fanout
amq.direct      direct
amq.headers     headers
amq.match       headers
amq.rabbitmq.trace      topic
        direct

root@30acfada6737:/#  rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
amq.gen-eiFT38g9RD-iuc-RCn0FMw  0 <--创建的临时队列
root@30acfada6737:/#  rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name        destination_kind        routing_key     arguments
        exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   amq.gen-eiFT38g9RD-iuc-RCn0FMw  []
topic_logs      exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   #       []
root@30acfada6737:/# 

根据 rabbitmqctl list_bindings 命令的输出,第一个绑定的路由键实际上与目的地的名称相同。这里是具体的解释:

  1. 源(source): 没有显示名称,这通常指的是默认的交换机(default exchange)。每个队列在创建时都会自动绑定到这个默认交换机上,使用队列的名称作为路由键。

  2. 源类型(source_kind): exchange 表示源是一个交换机。

  3. 目的地(destination): amq.gen-eiFT38g9RD-iuc-RCn0FMw 是一个自动生成的队列名称。

  4. 路由键(routing_key): 与目的地名称相同,即 amq.gen-eiFT38g9RD-iuc-RCn0FMw。这意味着只有当消息的路由键与队列名称完全匹配时,消息才会被路由到这个队列。

  5. 参数(arguments): 空列表 [] 表示这个绑定没有使用任何额外的参数。

  6. 第二个绑定:

    • 源(source): topic_logs,这是一个命名的主题交换机。
    • 目的地(destination): amq.gen-eiFT38g9RD-iuc-RCn0FMw,与第一个绑定中的队列相同。
    • 路由键(routing_key): #,这是一个特殊字符,表示这个队列将接收所有通过 topic_logs 交换机的消息。

这意味着:

  • 第一个绑定是队列与其默认交换机之间的绑定,当消息不指定交换机名字(此时使用默认交换机),同时路由键和队列名称相同时,才会被这个队列接收。
  • 第二个绑定是队列与 topic_logs 交换机之间的绑定,由于使用了 # 作为路由键,这个队列将接收所有发送到 topic_logs 交换机的消息,无论它们的实际路由键是什么。

如果你想要测试这些绑定关系,你可以:

  • 使用 emit_log_topic.py脚本发送消息到 topic_logs 交换机,不指定路由键或使用 任意信息 作为路由键,然后观察这些消息是否被自动生成的队列接收。
  • 如果你有 receive_logs_topic.py 脚本正在监听这个队列,你应该能够看到所有发送的消息被打印出来。

 3、在 sender 容器中打开命令行界面,运行以下命令以发送日志:

root@sender:/# python3 emit_log_topic.py "kern.critical" "A critical kernel error"
 [x] Sent kern.critical:A critical kernel error
root@sender:/# python3 emit_log_topic.py "test" "This is a test"                  
 [x] Sent test:This is a test
root@sender:/# 

4、观察 receiver 容器的命令行输出,检查是否收到了发送的日志。

root@receiver:/# python3 receive_logs_topic.py "#"
 [*] Waiting for logs. To exit press CTRL+C
 [x] kern.critical:b'A critical kernel error'
 [x] test:b'This is a test'

5、receiver容器继续准备接受所有日志。 

3.4.2 接收特定设施的日志

1、在 receiver2 容器中打开命令行界面,运行以下命令以接收日志:

root@receiver2:/# python3 receive_logs_topic.py "kern.*"
 [*] Waiting for logs. To exit press CTRL+C

2、rabbitmq容器上查看exchange, queue,bindings: 

root@30acfada6737:/# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name    type
amq.topic       topic
topic_logs      topic 
amq.fanout      fanout
amq.direct      direct
amq.headers     headers
amq.match       headers
amq.rabbitmq.trace      topic
        direct
root@30acfada6737:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
amq.gen-eiFT38g9RD-iuc-RCn0FMw  0
amq.gen-XmrA_fE3vyX-j4pFSbpqww  0  <---receiver2创建的临时队列
root@30acfada6737:/# rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name        destination_kind        routing_key     arguments
        exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   amq.gen-eiFT38g9RD-iuc-RCn0FMw  []
        exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   amq.gen-XmrA_fE3vyX-j4pFSbpqww  []
topic_logs      exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   #       []
topic_logs      exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   kern.*  []
root@30acfada6737:/# 

根据rabbitmqctl list_bindings 命令的输出:

topic_logs      exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   kern.*  []

我们可以看到以下信息:

  1. 虚拟主机(vhost): 绑定关系是在默认的虚拟主机 / 下列出的。

  2. 源(source_name): topic_logs,这是一个主题交换机(exchange)的名称。

  3. 源类型(source_kind): exchange 表示源是一个交换机。

  4. 目的地(destination_name): amq.gen-XmrA_fE3vyX-j4pFSbpqww,这是一个自动生成的队列名称。

  5. 目的地类型(destination_kind): queue 表示目的地是一个队列。

  6. 路由键(routing_key): kern.*,这表明这个绑定将匹配所有以 kern. 开头的路由键。这是一个主题路由键,使用星号(*)作为通配符,代表任意数量的字符。

  7. 参数(arguments): 空列表 [] 表示这个绑定没有使用任何额外的参数。

这个绑定表明,任何发送到 topic_logs 交换机并且路由键以 kern. 开头的消息都将被路由到名为 amq.gen-XmrA_fE3vyX-j4pFSbpqww 的队列。例如,消息使用路由键 kern.warningkern.critical 都会被这个队列接收。

如果你想测试这个特定的绑定,你可以按照以下步骤操作:

  1. 使用 emit_log_topic.py 脚本发送消息到 topic_logs 交换机,使用一个以 kern. 开头的路由键,例如 kern.critical

  2. 确保 receive_logs_topic.py 脚本正在监听 amq.gen-XmrA_fE3vyX-j4pFSbpqww 队列。

  3. 观察 receive_logs_topic.py 脚本的输出,检查是否收到了使用 kern.critical 路由键发送的消息。

  4. 如果没有收到消息,请检查脚本的 RabbitMQ 连接设置和队列监听设置是否正确。

  5. 你还可以测试发送使用不同路由键的消息,以验证只有匹配 kern.* 模式的消息才会被接收。

 3、在 sender 容器中打开命令行界面,继续运行以下命令以发送日志:

root@sender:/# python3 emit_log_topic.py "kern.critical" "third: A critical kernel error" 
 [x] Sent kern.critical:third: A critical kernel error
root@sender:/# python3 emit_log_topic.py "test" "fourth: This is a test"     
 [x] Sent test:fourth: This is a test
root@sender:/#   

4、观察 receiver 容器的命令行输出,检查是否收到了发送的日志。

root@receiver:/# python receive_logs_topic.py "#"
bash: python: command not found
root@receiver:/# python3 receive_logs_topic.py "#"
 [*] Waiting for logs. To exit press CTRL+C
 [x] kern.critical:b'A critical kernel error'
 [x] test:b'This is a test'
 [x] kern.critical:b'third: A critical kernel error'
 [x] test:b'fourth: This is a test'

5、观察 receiver2容器的命令行输出,检查是否收到了发送的日志。

root@receiver2:/# python3 receive_logs_topic.py "kern.*"
 [*] Waiting for logs. To exit press CTRL+C
 [x] kern.critical:b'third: A critical kernel error'

3.4.3 接收特定严重性的日志

1、在 receiver2 容器中新开命令行界面,运行以下命令以接收日志:

root@receiver2:/# python3 receive_logs_topic.py "*.critical"
 [*] Waiting for logs. To exit press CTRL+C

2、rabbitmq容器上查看exchange, queue,bindings: 

root@30acfada6737:/# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name    type
amq.topic       topic
topic_logs      topic
amq.fanout      fanout
amq.direct      direct
amq.headers     headers
amq.match       headers
amq.rabbitmq.trace      topic
        direct
root@30acfada6737:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
amq.gen-eiFT38g9RD-iuc-RCn0FMw  0
amq.gen-dZZcREH6GManw2OusGgU2g  0
amq.gen-XmrA_fE3vyX-j4pFSbpqww  0
root@30acfada6737:/# rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name        destination_kind        routing_key     arguments
        exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   amq.gen-eiFT38g9RD-iuc-RCn0FMw  []
        exchange        amq.gen-dZZcREH6GManw2OusGgU2g  queue   amq.gen-dZZcREH6GManw2OusGgU2g  []
        exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   amq.gen-XmrA_fE3vyX-j4pFSbpqww  []
topic_logs      exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   #       []
topic_logs      exchange        amq.gen-dZZcREH6GManw2OusGgU2g  queue   *.critical      []
topic_logs      exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   kern.*  []
root@30acfada6737:/# 

根据rabbitmqctl list_bindings 命令的输出:

topic_logs      exchange        amq.gen-dZZcREH6GManw2OusGgU2g  queue   *.critical      []

可以了解到以下信息:

  1. 虚拟主机(vhost): 绑定关系是在默认的虚拟主机 / 下列出的。

  2. 源(source_name): topic_logs,这是一个主题交换机的名称。

  3. 源类型(source_kind): exchange 表示源是一个交换机。

  4. 目的地(destination_name): amq.gen-dZZcREH6GManw2OusGgU2g,这是一个自动生成的队列名称。

  5. 目的地类型(destination_kind): queue 表示目的地是一个队列。

  6. 路由键(routing_key): *.critical,这表明这个绑定将匹配所有以 *.critical 结尾的路由键。在这个上下文中,星号(*)作为通配符,代表任意数量的字符,但在这个特定的路由键中,它实际上匹配任何以点(.)后跟 critical 结尾的字符串。

  7. 参数(arguments): 空列表 [] 表示这个绑定没有使用任何额外的参数。

这个绑定表明,任何发送到 topic_logs 交换机并且路由键以 *.critical 结尾的消息都将被路由到名为 amq.gen-dZZcREH6GManw2OusGgU2g 的队列。例如,消息使用路由键 auth.criticalcron.critical 都会被这个队列接收。

如果你想测试这个特定的绑定,你可以按照以下步骤操作:

  1. 使用 emit_log_topic.py 脚本发送消息到 topic_logs 交换机,使用一个以 .critical 结尾的路由键,例如 auth.critical

  2. 确保 receive_logs_topic.py 脚本正在监听 amq.gen-dZZcREH6GManw2OusGgU2g 队列。

  3. 观察 receive_logs_topic.py 脚本的输出,检查是否收到了使用 auth.critical 路由键发送的消息。

  4. 如果没有收到消息,请检查脚本的 RabbitMQ 连接设置和队列监听设置是否正确。

  5. 你还可以测试发送使用不同路由键的消息,以验证只有匹配 *.critical 模式的消息才会被接收。例如,发送一个路由键为 info.critical 的消息,它应该被接收;而发送一个路由键为 info.normal 的消息,则不应该被这个特定的队列接收。

3、在 sender 容器中打开命令行界面,继续运行以下命令以发送第5个和第6个日志:

root@sender:/# python3 emit_log_topic.py "auth.critical" "fifth log: This is fifth log"          
 [x] Sent auth.critical:fifth log: This is fifth log
root@sender:/# python3 emit_log_topic.py "kern.critical" "sixth log: This is sixth log"              
 [x] Sent kern.critical:sixth log: This is sixth log
root@sender:/# 

4、观察日志接收情况:

receiver(收到5、6)

root@receiver:/# python receive_logs_topic.py "#"
bash: python: command not found
root@receiver:/# python3 receive_logs_topic.py "#"
 [*] Waiting for logs. To exit press CTRL+C
 [x] kern.critical:b'A critical kernel error'
 [x] test:b'This is a test'
 [x] kern.critical:b'third: A critical kernel error'
 [x] test:b'fourth: This is a test'
 [x] auth.critical:b'fifth log: This is fifth log'
 [x] kern.critical:b'sixth log: This is sixth log'

 receiver2 第一个终端(只收到6):

root@receiver2:/# python3 receive_logs_topic.py "kern.*"
 [*] Waiting for logs. To exit press CTRL+C
 [x] kern.critical:b'third: A critical kernel error'
 [x] kern.critical:b'sixth log: This is sixth log'

receiver2第二个终端(收到5、6):

root@receiver2:/# python3 receive_logs_topic.py "*.critical"
 [*] Waiting for logs. To exit press CTRL+C
 [x] auth.critical:b'fifth log: This is fifth log'
 [x] kern.critical:b'sixth log: This is sixth log'

3.4.4 创建多个绑定的测试

1、在 receiver2 容器中新开第三个终端界面,运行以下命令以接收日志:

root@receiver2:/# python3 receive_logs_topic.py "kern.*" "*.critical"
 [*] Waiting for logs. To exit press CTRL+C

2、rabbitmq容器上查看exchange, queue,bindings: 

root@30acfada6737:/# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name    type
amq.topic       topic
topic_logs      topic
amq.fanout      fanout
amq.direct      direct
amq.headers     headers
amq.match       headers
amq.rabbitmq.trace      topic
        direct
root@30acfada6737:/# rabbitmqctl list_queues   
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
amq.gen-zR4Pv3875mquqMTLIkMCUQ  0
amq.gen-eiFT38g9RD-iuc-RCn0FMw  0
amq.gen-HS_61KsFpuerwoA17IMUmw  0
amq.gen-XmrA_fE3vyX-j4pFSbpqww  0
root@30acfada6737:/# rabbitmqctl list_bindings 
Listing bindings for vhost /...
source_name     source_kind     destination_name        destination_kind        routing_key     arguments
        exchange        amq.gen-zR4Pv3875mquqMTLIkMCUQ  queue   amq.gen-zR4Pv3875mquqMTLIkMCUQ  []
        exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   amq.gen-eiFT38g9RD-iuc-RCn0FMw  []
        exchange        amq.gen-HS_61KsFpuerwoA17IMUmw  queue   amq.gen-HS_61KsFpuerwoA17IMUmw  []
        exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   amq.gen-XmrA_fE3vyX-j4pFSbpqww  []
topic_logs      exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   #       []
topic_logs      exchange        amq.gen-HS_61KsFpuerwoA17IMUmw  queue   *.critical      []
topic_logs      exchange        amq.gen-zR4Pv3875mquqMTLIkMCUQ  queue   *.critical      []
topic_logs      exchange        amq.gen-HS_61KsFpuerwoA17IMUmw  queue   kern.*  []
topic_logs      exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   kern.*  []
root@30acfada6737:/# 

rabbitmqctl list_bindings 命令输出中:

topic_logs      exchange        amq.gen-HS_61KsFpuerwoA17IMUmw  queue   *.critical      []
topic_logs      exchange        amq.gen-HS_61KsFpuerwoA17IMUmw  queue   kern.*  []

可以看到以下信息:

  1. 虚拟主机(vhost): 绑定关系是在默认的虚拟主机 / 下列出的。

  2. 源(source_name): topic_logs,这是一个主题交换机的名称。

  3. 源类型(source_kind): exchange 表示源是一个交换机。

  4. 目的地(destination_name): amq.gen-HS_61KsFpuerwoA17IMUmw,这是一个自动生成的队列名称。

  5. 目的地类型(destination_kind): queue 表示目的地是一个队列。

  6. 路由键(routing_key): 存在两条绑定,每条绑定有不同的路由键:

    • *.critical:这个绑定将匹配所有以 .critical 结尾的路由键,例如 auth.critical 或 cron.critical
    • kern.*:这个绑定将匹配所有以 kern. 开头的路由键,例如 kern.warning 或 kern.emerg
  7. 参数(arguments): 两个绑定的参数列表都是空的 [],表示这些绑定没有使用任何额外的参数。

这个输出显示了 topic_logs 交换机绑定到同一个队列 amq.gen-HS_61KsFpuerwoA17IMUmw 上的两种不同的路由模式。这意味着这个队列将接收符合这两种模式的任何消息。

测试这些绑定的步骤:

  1. 发送消息:

    • 使用 emit_log_topic.py 脚本发送消息到 topic_logs 交换机,使用不同的路由键来测试绑定。例如:
      • 发送一个路由键为 auth.critical 的消息,应该被队列接收。
      • 发送一个路由键为 kern.warning 的消息,也应该被队列接收。
  2. 监听队列:

    • 确保 receive_logs_topic.py 脚本正在监听 amq.gen-HS_61KsFpuerwoA17IMUmw 队列。
  3. 观察输出:

    • 观察 receive_logs_topic.py 脚本的输出,检查是否收到了使用上述路由键发送的消息。
  4. 验证绑定:

    • 如果消息没有被接收,检查脚本的 RabbitMQ 连接设置和队列监听设置是否正确。
    • 验证发送的消息路由键是否符合任一绑定的模式。
  5. 探索其他路由键:

    • 发送不符合上述两种模式的消息,例如使用路由键 info.normal,以验证消息不会被接收。

通过这些步骤,你可以验证 topic_logs 交换机的绑定是否按预期工作,并且消息是否能够正确地根据路由键模式被路由到指定的队列。

3、在 sender 容器中打开命令行界面,继续运行以下命令以发送第7个和第8个日志:

root@sender:/# python3 emit_log_topic.py "auth.critical" "seventh log: This is 7th log"       
 [x] Sent auth.critical:seventh log: This is 7th log
root@sender:/# python3 emit_log_topic.py "kern.warning" "eighth log: This is 8th log"                      
 [x] Sent kern.warning:eighth log: This is 8th log
root@sender:/# python3 emit_log_topic.py "info.normal" "ninth log: This is 9th log"                   
 [x] Sent info.normal:ninth log: This is 9th log
root@sender:/# 

4、观察日志接收情况:

receiver: (收到7、8、9)

root@receiver:/# python3 receive_logs_topic.py "#"
 [*] Waiting for logs. To exit press CTRL+C
 [x] kern.critical:b'A critical kernel error'
 [x] test:b'This is a test'
 [x] kern.critical:b'third: A critical kernel error'
 [x] test:b'fourth: This is a test'
 [x] auth.critical:b'fifth log: This is fifth log'
 [x] kern.critical:b'sixth log: This is sixth log'
 [x] auth.critical:b'seventh log: This is 7th log'
 [x] kern.warning:b'eighth log: This is 8th log'
 [x] info.normal:b'ninth log: This is 9th log'

receiver2第一个终端:(收到8,匹配kern.*)

root@receiver2:/# python3 receive_logs_topic.py "kern.*"
 [*] Waiting for logs. To exit press CTRL+C
 [x] kern.critical:b'third: A critical kernel error'
 [x] kern.critical:b'sixth log: This is sixth log'
 [x] kern.warning:b'eighth log: This is 8th log'

receiver2第二个终端:(收到7,匹配*.critical)

(receiver2误操作中断了之前的连接,重建了和rabbitmq的连接)

root@receiver2:/# python3 receive_logs_topic.py "*.critical"
 [*] Waiting for logs. To exit press CTRL+C
 [x] auth.critical:b'fifth log: This is fifth log'
 [x] kern.critical:b'sixth log: This is sixth log'
^CTraceback (most recent call last):
  File "//receive_logs_topic.py", line 32, in <module>
    channel.start_consuming()
  File "/usr/local/lib/python3.10/dist-packages/pika/adapters/blocking_connection.py", line 1883, in start_consuming
    self._process_data_events(time_limit=None)
  File "/usr/local/lib/python3.10/dist-packages/pika/adapters/blocking_connection.py", line 2044, in _process_data_events
    self.connection.process_data_events(time_limit=time_limit)
  File "/usr/local/lib/python3.10/dist-packages/pika/adapters/blocking_connection.py", line 842, in process_data_events
    self._flush_output(common_terminator)
  File "/usr/local/lib/python3.10/dist-packages/pika/adapters/blocking_connection.py", line 514, in _flush_output
    self._impl.ioloop.poll()
  File "/usr/local/lib/python3.10/dist-packages/pika/adapters/select_connection.py", line 579, in poll
    self._poller.poll()
  File "/usr/local/lib/python3.10/dist-packages/pika/adapters/select_connection.py", line 1184, in poll
    events = self._poll.poll(self._get_max_wait())
KeyboardInterrupt

root@receiver2:/# python3 receive_logs_topic.py "*.critical"
 [*] Waiting for logs. To exit press CTRL+C
 [x] auth.critical:b'seventh log: This is 7th log'

receiver2第三个终端:(收到7、8,匹配kern.*或者*.critical)

root@receiver2:/# python3 receive_logs_topic.py "kern.*" "*.critical"
 [*] Waiting for logs. To exit press CTRL+C
 [x] auth.critical:b'seventh log: This is 7th log'
 [x] kern.warning:b'eighth log: This is 8th log'

4、小结

这篇文章是 RabbitMQ 官方教程的第五部分,使用 Python 客户端 Pika 来演示如何使用主题交换机(Topic Exchange)。以下是对文章内容的总结:

1、预备条件

  • RabbitMQ 需要安装并运行在标准端口(5672)上。
  • 如果使用不同的主机、端口或凭据,则需要调整连接设置。
  • 需要使用 Pika RabbitMQ 客户端版本 1.0.0。

2、教程重点

  • 教程介绍了如何使用主题交换机来改善日志系统,允许基于多个标准进行消息路由。
  • 通过使用主题交换机,可以实现类似于 Unix 系统中 syslog 工具的功能,即根据消息的严重性和来源进行路由。

3、主题交换

  • 主题交换机的消息路由键(routing_key)必须是由点分隔的单词列表。
  • 绑定键(binding_key)也采用相同的格式,可以使用特殊字符:
    • *(星号):匹配正好一个单词。
    • #(井号):匹配零个或多个单词。

4、示例

  • 假设发送的消息描述动物,路由键由三个单词组成,分别表示速度、颜色和物种。
  • 创建了三个绑定:
    • Q1 绑定到 *.orange.*:对所有橙色动物感兴趣。
    • Q2 绑定到 *.*.rabbit 和 lazy.#:对所有兔子和所有懒惰动物感兴趣。

5、消息路由示例

  • 消息 quick.orange.rabbit 将被 Q1 和 Q2 接收。
  • 消息 lazy.orange.elephant 也将被两者接收。
  • 消息 quick.orange.fox 只匹配 Q1,而 lazy.brown.fox 只匹配 Q2。
  • 如果消息不符合绑定键的模式,如 orange 或 quick.orange.new.rabbit,它们将不会被任何队列接收。

6、代码示例

  • 提供了两个 Python 脚本示例:
    • emit_log_topic.py:用于发送消息到主题交换。
    • receive_logs_topic.py:用于接收主题交换的消息。

7、使用方法

  • 接收所有日志:python receive_logs_topic.py "#"
  • 接收特定设施的日志(如 kern):python receive_logs_topic.py "kern.*"
  • 只接收特定严重性的日志(如 critical):python receive_logs_topic.py "*.critical"
  • 创建多个绑定:python receive_logs_topic.py "kern.*" "*.critical"
  • 发送特定路由键的日志:python emit_log_topic.py "kern.critical" "A critical kernel error"

8、注意事项

  • 代码没有对路由或绑定键做任何假设,可以根据需要使用多个路由键参数。

9、结语

  • 教程鼓励用户尝试和玩耍这些程序,以更好地理解主题交换机(topic exchange)的工作方式。

http://www.kler.cn/a/288326.html

相关文章:

  • nacos配置中心入门
  • 自动驾驶系列—从数据采集到存储:解密自动驾驶传感器数据采集盒子的关键技术
  • layui.all.js:2 Uncaught Error: Syntax error, unrecognized expression
  • MaxKB
  • Nuxt.js 应用中的 schema:beforeWrite 事件钩子详解
  • Linux源码阅读笔记-V4L2框架基础介绍
  • P7958 [COCI2014-2015#6] NEO
  • 如何处理海量数据
  • 事半功倍:利用增强现实提高工作效率
  • [AcWing]-完全背包问题-动态规划
  • RabbitMQ的TLL
  • Mac OS X 如何升级系统自带的 Ruby
  • 教程:使用显卡MX250做YOLO目标检测(定位)滑块缺口,包括获取数据集,对数据集手动标注,训练的代码,推理的代码,超多细节,你的第一次YOLO绝佳体验!
  • 微信小程序认证和备案
  • 比特币详解
  • (大三上_游戏开发设计模式_上课_1)多态练习_计算机
  • CUDA编程08 - 并行编程思维
  • 【React 简化路由的生成的方式
  • kafka3.7.1 单节点 KRaft部署测试发送和接收消息
  • 深入解析FPGA在SOC设计中的核心作用
  • 深入探讨Java中的分布式事务管理:实现、挑战与最佳实践
  • 超声波的应用
  • 【python计算机视觉编程——4.照相机模型与增强现实】
  • sqlite3的db.wait方法:等待所有查询完成
  • PyQt6 / PySide 6 实现可拖拽的多标签页 web 浏览器【1】(有 Bug)
  • Ansible 自动化运维项目