当前位置: 首页 > article >正文

RabbitMQ:python基础调用

前言

        紧接上回在windows上安装了最新版的RabbitMQ:

        RabbitMQ:windows最新版本4.0.5安装方案-CSDN博客

        这是官方给出的使用文档:How to Use RabbitMQ | RabbitMQ 

        这里我给出通过AI学习到的python使用方法 

理论截图 

        python直接使用pip安装pika模块即可开始使用RabbitMQ: 

pip install pika

        常用需要实现的队列模式

 

代码实现 

        话不多说,直接上代码 

        1、连接和关闭RabbitMQ服务 

import pika

# 创建连接
# 使用自定义的用户名和密码连接到 RabbitMQ
credentials = pika.PlainCredentials('manfish', '52manfish')
connection_params = pika.ConnectionParameters(
    host='localhost',  # RabbitMQ 服务器的主机名
    port=5672,         # RabbitMQ 的默认端口
    virtual_host='manfish',  # 虚拟主机名称
    credentials=credentials  # 认证凭证
)

connection = pika.BlockingConnection(connection_params)
channel = connection.channel()

# 声明队列json_queue
queue_name = 'json_queue'
channel.queue_declare(queue=queue_name)

# 关闭连接
connection.close()

        注意:当不填写账号密码时,模块将自动以guest管理员默认账密(guest/guest)连接,不建议养成这种习惯

        另:用户需要使用相同虚拟主机传递队列消息

        这里是webui上设置账密及分配虚拟主机的截图:

 

        2、直接队列

        消费者

 

import pika
import json

# 创建连接
# 使用自定义的用户名和密码连接到 RabbitMQ
credentials = pika.PlainCredentials('manfish1', '123456')
connection_params = pika.ConnectionParameters(
    host='localhost',  # RabbitMQ 服务器的主机名
    port=5672,         # RabbitMQ 的默认端口
    virtual_host='manfish1',  # 虚拟主机名称
    credentials=credentials  # 认证凭证
)

connection = pika.BlockingConnection(connection_params)
channel = connection.channel()

# 声明队列
queue_name = 'json_queue'
channel.queue_declare(queue=queue_name)

# 定义回调函数
def callback(ch, method, properties, body):
    print('------- get a msg -------')
    # 将 JSON 字符串反序列化为 Python 对象
    message_data = json.loads(body)
    print(f" [x] Received JSON message: {message_data}")

    # 打印 method 参数的详细信息
    print(f" [x] Delivery tag: {method.delivery_tag}")
    print(f" [x] Exchange: {method.exchange}")
    print(f" [x] Routing key: {method.routing_key}")
    print(f" [x] Redelivered: {method.redelivered}")

    # 打印 properties 参数的详细信息
    print(f" [x] Correlation ID: {properties.correlation_id}")
    print(f" [x] Content type: {properties.content_type}")
    print(f" [x] Headers: {properties.headers}")
    print(f" [x] Message ID: {properties.message_id}")
    print(f" [x] Timestamp: {properties.timestamp}")
    print(f" [x] User ID: {properties.user_id}")
    print(f" [x] App ID: {properties.app_id}")


# 开始消费
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
        生产者 
import pika
import json

# 创建连接
credentials = pika.PlainCredentials('manfish', '123456')
connection_params = pika.ConnectionParameters(
    host='localhost',  # RabbitMQ 服务器的主机名
    port=5672,         # RabbitMQ 的默认端口
    virtual_host='manfish1',  # 虚拟主机名称
    credentials=credentials  # 认证凭证
)

connection = pika.BlockingConnection(connection_params)
channel = connection.channel()

# 声明队列
queue_name = 'json_queue'
channel.queue_declare(queue=queue_name)

# 创建一个 Python 字典
message_data = {
    'name': 'Alice',
    'age': 30,
    'city': 'New York'
}

# 将字典转换为 JSON 字符串
message = json.dumps(message_data)

# 发送 JSON 数据
channel.basic_publish(
    exchange='',
    routing_key=queue_name,
    body=message
)
print(f" [x] Sent JSON message: {message}")

# 关闭连接
connection.close()
        截图 

 

     3、主题队列

        消费者
import pika
import json

# 创建连接
# 使用自定义的用户名和密码连接到 RabbitMQ
credentials = pika.PlainCredentials('manfish1', '123456')
connection_params = pika.ConnectionParameters(
    host='localhost',  # RabbitMQ 服务器的主机名
    port=5672,         # RabbitMQ 的默认端口
    virtual_host='manfish1',  # 虚拟主机名称
    credentials=credentials  # 认证凭证
)

connection = pika.BlockingConnection(connection_params)
channel = connection.channel()

# 声明交换机
exchange_name = 'topic_exchange'
channel.exchange_declare(exchange=exchange_name, exchange_type='topic')

# 声明队列
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

# 绑定队列到交换机
binding_key = 'user.*'
channel.queue_bind(
    exchange=exchange_name,
    queue=queue_name,
    routing_key=binding_key
)

# 定义回调函数
def callback(ch, method, properties, body):
    print('------- get a msg -------')
    print(f" [x] Received message: {body}")

    # 打印 method 参数的详细信息
    print(f" [x] Delivery tag: {method.delivery_tag}")
    print(f" [x] Exchange: {method.exchange}")
    print(f" [x] Routing key: {method.routing_key}")
    print(f" [x] Redelivered: {method.redelivered}")

    # 打印 properties 参数的详细信息
    print(f" [x] Correlation ID: {properties.correlation_id}")
    print(f" [x] Content type: {properties.content_type}")
    print(f" [x] Headers: {properties.headers}")
    print(f" [x] Message ID: {properties.message_id}")
    print(f" [x] Timestamp: {properties.timestamp}")
    print(f" [x] User ID: {properties.user_id}")
    print(f" [x] App ID: {properties.app_id}")

    # 确认消息
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 开始消费
# auto_ack为True时自动确认消息,但既为True又在回调中使用了ch.basic_ack(),则会导致报错——重复确认
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
        生产者 
import time

import pika
import json

# 创建连接
credentials = pika.PlainCredentials('manfish', '123456')
connection_params = pika.ConnectionParameters(
    host='localhost',  # RabbitMQ 服务器的主机名
    port=5672,         # RabbitMQ 的默认端口
    virtual_host='manfish1',  # 虚拟主机名称
    credentials=credentials  # 认证凭证
)

connection = pika.BlockingConnection(connection_params)
channel = connection.channel()

# 声明交换机
exchange_name = 'topic_exchange'
channel.exchange_declare(exchange=exchange_name, exchange_type='topic')

# 发送消息
routing_key = 'user.login'
message = 'User logged in.'
channel.basic_publish(
    exchange=exchange_name,
    routing_key=routing_key,
    body=message,

)
print(f" [x] Sent message: {message}")

# 关闭连接
connection.close()
        截图 


http://www.kler.cn/a/534205.html

相关文章:

  • kamailio-osp模块
  • DeepSeek大模型介绍、本地化部署与使用!【AI大模型】
  • Linux特权组全解析:识别GID带来的权限提升风险
  • Ubuntu x64下交叉编译ffmpeg、sdl2到目标架构为aarch64架构的系统(生成ffmpeg、ffprobe、ffplay)
  • 数据库开发常识(10.6)——SQL性能判断标准及索引误区(1)
  • vue 引入百度地图和高德天气 都得获取权限
  • DS图(中)(19)
  • 【分布式架构理论2】分布式架构要处理的问题及解决方案
  • 【自然语言处理(NLP)】Bahdanau 注意力(Bahdanau Attention)原理及代码实现
  • Day36-【13003】短文,数组的行主序方式,矩阵的压缩存储,对称、三角、稀疏矩阵和三元组线性表,广义表求长度、深度、表头、表尾等
  • 02、NodeJS学习笔记,第二节:express与中间件
  • Redis常见数据类型与编码方式
  • RabbitMQ 与 Kafka 的核心区别,如何选择合适的消息中间件?
  • 【LLM】为何DeepSeek 弃用MST却采用Rejection采样
  • 洛谷P2638 安全系统
  • 解锁.NET Fiddle:在线编程的神奇之旅
  • 【Elasticsearch】filter聚合
  • 信标链的基本概念
  • python基础入门:2.2运算符与表达式
  • 根据SQL导出三线表文档
  • 能否通过蓝牙建立TCP/IP连接来传输数据
  • js-对象-JSON
  • [LeetCode] 二叉树 I — 深度优先遍历(前中后序遍历) | 广度优先遍历(层序遍历):递归法迭代法
  • 微服务知识——微服务架构的演进过程
  • 【完整版】DeepSeek-R1大模型学习笔记(架构、训练、Infra)
  • Mybatis之常用动态Sql语句