当前位置: 首页 > article >正文

使用RabbitMQ实现微服务间的异步消息传递

使用RabbitMQ实现微服务间的异步消息传递

      • RabbitMQ简介
      • 安装RabbitMQ
        • 在Ubuntu上安装RabbitMQ
        • 在CentOS上安装RabbitMQ
      • 配置RabbitMQ
      • 创建微服务
        • 生产者服务
          • 安装依赖
          • 生产者代码
        • 消费者服务
          • 消费者代码
      • 运行微服务
      • 消息模式
        • 直接模式
          • 生产者代码
          • 消费者代码
        • 扇出模式
          • 生产者代码
          • 消费者代码
        • 主题模式
          • 生产者代码
          • 消费者代码
      • 高级特性
        • 持久化
          • 生产者代码
          • 消费者代码
        • 确认机制
          • 消费者代码
      • 监控和日志
        • 监控
        • 日志
      • 故障排除
      • 总结

在现代分布式系统中,微服务架构越来越受到欢迎。微服务之间需要进行高效、可靠的消息传递。RabbitMQ作为一个成熟的开源消息中间件,能够很好地满足这一需求。本文将详细介绍如何使用RabbitMQ实现微服务间的异步消息传递。

RabbitMQ简介

RabbitMQ是一个开源的消息代理和队列服务器,基于AMQP(Advanced Message Queuing Protocol)协议。它支持多种消息模式,如发布/订阅、路由、主题等。

安装RabbitMQ

RabbitMQ可以在多种操作系统上安装,包括Linux、macOS和Windows。
在Ubuntu上安装RabbitMQ
sudo apt-get update
sudo apt-get install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
在CentOS上安装RabbitMQ
sudo yum install epel-release
sudo yum install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server

配置RabbitMQ

安装完成后,可以使用以下命令进行基本配置。
sudo rabbitmq-plugins enable rabbitmq_management
sudo systemctl restart rabbitmq-server
访问RabbitMQ管理界面:`http://localhost:15672`,默认用户名和密码都是`guest`。

创建微服务

我们将创建两个简单的微服务:生产者服务和消费者服务。
生产者服务
生产者服务负责发送消息到RabbitMQ。
安装依赖
pip install pika
生产者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

message = 'Hello World!'
channel.basic_publish(exchange='', routing_key='hello', body=message)
print(f'Sent: {message}')
connection.close()
消费者服务
消费者服务负责从RabbitMQ接收消息。
消费者代码
import pika

def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=on_message_received)

print('Waiting for messages...')
channel.start_consuming()

运行微服务

先启动消费者服务,然后启动生产者服务。
# 启动消费者服务
python consumer.py

# 启动生产者服务
python producer.py

消息模式

RabbitMQ支持多种消息模式,包括直接模式、扇出模式、主题模式和头部模式。
直接模式
直接模式是最简单的模式,消息会被发送到指定的队列。
生产者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='direct_queue')

message = 'Direct message'
channel.basic_publish(exchange='', routing_key='direct_queue', body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pika

def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='direct_queue')

channel.basic_consume(queue='direct_queue', auto_ack=True, on_message_callback=on_message_received)

print('Waiting for messages...')
channel.start_consuming()
扇出模式
扇出模式将消息广播到所有绑定的队列。
生产者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')

message = 'Fanout message'
channel.basic_publish(exchange='fanout_exchange', routing_key='', body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pika

def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='fanout_exchange', queue=queue_name)

channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=on_message_received)

print('Waiting for messages...')
channel.start_consuming()
主题模式
主题模式允许更复杂的路由规则。
生产者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')

routing_key = 'kern.critical'
message = 'Critical kernel message'
channel.basic_publish(exchange='topic_exchange', routing_key=routing_key, body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pika

def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

binding_keys = ['*.critical', 'kern.*']
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_exchange', queue=queue_name, routing_key=binding_key)

channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=on_message_received)

print('Waiting for messages...')
channel.start_consuming()

高级特性

RabbitMQ还支持许多高级特性,如持久化、确认机制、死信队列等。
持久化
可以配置消息和队列的持久化,以确保消息不会因为RabbitMQ服务器重启而丢失。
生产者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='durable_queue', durable=True)

message = 'Persistent message'
channel.basic_publish(exchange='', routing_key='durable_queue', body=message, properties=pika.BasicProperties(delivery_mode=2))
print(f'Sent: {message}')
connection.close()
消费者代码
import pika

def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='durable_queue', durable=True)

channel.basic_consume(queue='durable_queue', on_message_callback=on_message_received)

print('Waiting for messages...')
channel.start_consuming()
确认机制
可以配置消费者在处理完消息后发送确认,以确保消息不会被重复处理。
消费者代码
import pika

def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='ack_queue')

channel.basic_consume(queue='ack_queue', on_message_callback=on_message_received)

print('Waiting for messages...')
channel.start_consuming()

监控和日志

RabbitMQ提供了丰富的监控和日志功能,可以用于监控和调试。

监控
可以通过RabbitMQ管理界面监控队列、交换机和连接等。

日志
可以通过配置文件调整日志级别和输出方式。

故障排除

如果RabbitMQ配置出现问题,可以使用以下命令进行故障排除。

sudo rabbitmqctl status
sudo journalctl -u rabbitmq-server

总结

通过本文,你已经学习了如何使用RabbitMQ实现微服务间的异步消息传递。我们介绍了RabbitMQ的基本概念、安装方法、配置RabbitMQ、创建微服务、消息模式(直接模式、扇出模式、主题模式)、高级特性(持久化、确认机制)、监控和日志、故障排除等内容。掌握了这些知识,将有助于你在实际工作中更好地利用RabbitMQ来构建高效、可靠的微服务架构。
RabbitMQ管理界面示例
RabbitMQ消息传递模式示例

使用RabbitMQ可以显著提高微服务间消息传递的可靠性和效率。


http://www.kler.cn/a/377920.html

相关文章:

  • 【ArcGIS Pro微课1000例】0064:栅格目录、栅格数据集、镶嵌数据集
  • Go怎么做性能优化工具篇之基准测试
  • 反序列化为啥可以利用加号绕过php正则匹配
  • linux定时器操作
  • 微服务分布式(二、注册中心Consul)
  • 随手记:小程序兼容后台的wangEditor富文本配置链接
  • Java学习教程,从入门到精通,Java 循环结构:while 和 do...while(17)
  • 2024年 · 地表最强的十大遥感影像分割模型
  • Js内建对象
  • 10个领先的增强现实平台【AR】
  • uniapp 使用uni.getRecorderManager录音,wav格式采样率低于44100,音频播放不了问题解决
  • 无人机敏捷反制技术算法详解!
  • 同一个页面击穿element样式后,会影响同样组件的使用
  • C#与C++交互开发系列(二十):跨进程通信之共享内存(Shared Memory)
  • 论文阅读:Computational Long Exposure Mobile Photography (一)
  • [SICTF Round4] Crypto
  • 简易了解Pytorch中的@ 和 * 运算符(附Demo)
  • 图优化以及如何将信息矩阵添加到残差
  • 网络编程项目之UDP聊天室
  • 【书生.浦语实战营】——入门岛
  • 【OpenSearch】机器学习(Machine Learning)神经搜索教程
  • 【Android】View的事件分发机制
  • Java项目实战II基于Spring Boot的美食烹饪互动平台的设计与实现(开发文档+数据库+源码)
  • 十四届蓝桥杯STEMA考试Python真题试卷第二套第二题
  • 解锁同城流量密码,六大实用技巧全解析
  • 勒索软件通过易受攻击的 Cyber​​Panel 实例攻击网络托管服务器