当前位置: 首页 > article >正文

如何使用RabbitMQ和Python实现广播消息

使用 RabbitMQ 和 Python 实现广播消息的过程涉及设置一个消息队列和多个消费者,以便接收相同的消息。RabbitMQ 的 “fanout” 交换机允许你将消息广播到所有绑定的队列。以下是如何实现这一过程的详细步骤。

在这里插入图片描述

1、问题背景

在将系统从Morbid迁移到RabbitMQ时,发现RabbitMQ无法提供Morbid默认提供的广播行为。在广播模式下,当一个消息被添加到队列时,所有的消费者都会收到它。然而,在RabbitMQ中,消息会以轮询的方式分发给各个监听器。

代码例子如下:

# 消费者
import stomp

class MyListener(object):
    def on_error(self, headers, message):
        print 'recieved an error %s' % message

    def on_message(self, headers, message):
        print 'recieved a message %s' % message

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="user", password="password")
headers = {}

conn.subscribe(destination='/topic/demoqueue', ack='auto')

while True:
    pass
conn.disconnect()

# 发送者
import stomp

class MyListener(object):
    def on_error(self, headers, message):
        print 'recieved an error %s' % message

    def on_message(self, headers, message):
        print 'recieved a message %s' % message

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="user", password="password")
headers = {}

conn.subscribe(destination='/topic/demotopic', ack='auto')

while True:
    pass
conn.disconnect()

通过上述代码,将会出现问题,导致无法实现广播消息。

2、解决方案

  1. 使用交换机和队列来实现广播消息。具体方法如下:

(1)使用amqplib库来创建交换机和队列。在发送消息时,将消息发送到交换机,而不是队列。在接收消息时,将队列绑定到交换机,这样就可以收到交换机上所有消息。代码修改如下:

# 发送者
import stomp

class MyListener(object):
    def on_error(self, headers, message):
        print 'recieved an error %s' % message

    def on_message(self, headers, message):
        print 'recieved a message %s' % message

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="user", password="password")
headers = {}

# 发送消息到交换机
exchange = 'my_exchange'
conn.send(str(i), exchange=exchange, destination='')

# 接收者
import stomp
import sys
from amqplib import client_0_8 as amqp
#read in the exchange name so I can set up multiple recievers for different exchanges to tset
exchange = sys.argv[1]
conn = amqp.Connection(host="localhost:5672", userid="username", password="password",
 virtual_host="/", insist=False)

chan = conn.channel()

chan.access_request('/', active=True, write=True, read=True)

#declare my exchange
chan.exchange_declare(exchange, 'topic')
#not passing a queue name means I get a new unique one back
qname,_,_ = chan.queue_declare()
#bind the queue to the exchange
chan.queue_bind(qname, exchange=exchange)

class MyListener(object):
    def on_error(self, headers, message):
        print 'recieved an error %s' % message

    def on_message(self, headers, message):
        print 'recieved a message %s' % message

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'browser', 'browser')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="username", password="password")
headers = {}

#subscribe to the queue
conn.subscribe(destination=qname, ack='auto')

while True:
    pass
conn.disconnect()

(2)使用StompJS 库来实现广播消息。具体方法如下:

// 消费者
var stompClient = Stomp.client('ws://localhost:61613');

stompClient.connect({}, function(frame) {
  stompClient.subscribe('/topic/demoqueue', function(message) {
    console.log('Received message: ' + message.body);
  });
});

// 发送者
var stompClient = Stomp.client('ws://localhost:61613');

stompClient.connect({}, function(frame) {
  stompClient.send('/topic/demoqueue', {}, 'Hello, world!');
});

通过以上步骤,你可以实现 RabbitMQ 的消息广播功能。多个消费者可以同时接收来自同一个生产者的消息,这是构建分布式系统时非常常见的场景。如果需要更复杂的消息处理,可以在此基础上进行扩展。


http://www.kler.cn/a/381879.html

相关文章:

  • 有没有检测吸烟的软件 ai视频检测分析厂区抽烟报警#Python
  • CASA(Carnegie-Ames-Stanford Approach) 模型原理及实践
  • 解释下什么是面向对象?面向对象和面向过程的区别?
  • 语音增强的损失函数选择
  • IntelliJ IDEA 快捷键大全:提升开发效率的利器
  • AI芯片常见概念
  • 深度学习基础知识-Batch Normalization(BN)超详细解析
  • 第二节 管道符、重定向与环境变量
  • 手写一个axios方法
  • python爬取旅游攻略(1)
  • SparkSql读取数据的方式
  • 多模态PaliGemma——Google推出的基于SigLIP和Gemma的视觉语言模型
  • 十四届蓝桥杯STEMA考试Python真题试卷第二套第四题
  • (八)JavaWeb后端开发——Tomcat
  • 使用GitHub Actions实现CI/CD流程
  • JavaScript数据类型- Symbol 详解
  • 各种网络设备的工作原理
  • Hive SQL中判断内容包含情况的全面指南
  • MR30分布式IO模块与高效PLC协同
  • 鸥柏(OBOO)户外触摸广告屏科技创新 高速服务区收费站案例
  • SAP ABAP开发学习——WDA 二 控制器
  • 【笔记】变压器-热损耗-频响曲线推导 - 02 预备知识
  • 如何完全禁用Ant Design Vue 4自带样式
  • 如何使用Web-Check和cpolar实现安全的远程网站监测与管理
  • 华为HarmonyOS借助AR引擎帮助应用实现虚拟与现实交互的能力2-管理AR会话
  • Python+Appium+Pytest+Allure自动化测试框架-安装篇