当前位置: 首页 > article >正文

Python RabbitMQ 消息队列监听

Python RabbitMQ 消息队列监听

# coding: utf-8
# 测试消息消费

import datetime
import logging as log
import os
from pathlib import Path
from typing import List

import pika

# 设置日志格式
Path("./logs").mkdir(parents=True, exist_ok=True)
os.chdir("./logs/")
log_file_name = datetime.date.today().strftime("%Y-%m-%d")
log_format = (
    "%(asctime)s.%(msecs)03d [%(levelname)s] [%(filename)s:%(lineno)d] - %(message)s"
)
log.basicConfig(
    level=log.INFO,
    filename="python-check-" + log_file_name + ".log",
    datefmt="%Y-%m-%d %H:%M:%S",
    format=log_format,
    encoding="utf-8",
)


class RabbitMQConsumer:
    def __init__(self, host="localhost", queue_name="test-queue", batch_size=5):
        self.host = host
        self.queue_name = queue_name
        self.batch_size = batch_size
        self.connection = None
        self.channel = None
        self.message_count = 0
        self.messages: List[pika.spec.Basic.Deliver] = []
        self.delivery_tags: List[int] = []

    def conn(self):
        log.info("测试消费者 连接RabbitMQ开始!")
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self.host)
        )
        self.channel = self.connection.channel()
        # 声明队列
        self.channel.queue_declare(queue=self.queue_name, durable=True)
        # 设置QoS,限制未确认的消息数量
        self.channel.basic_qos(prefetch_count=self.batch_size)
        log.info("测试消费者 连接RabbitMQ成功!")

    def close(self):
        log.info("测试消费者 关闭RabbitMQ连接!")
        if self.connection and not self.connection.is_closed:
            self.connection.close()

    def start_consuming(self):
        log.info("测试消费者 监听开始!")
        try:
            # 确保已连接
            if not self.connection or self.connection.is_closed:
                self.conn()
            log.info(f"测试消费者 监听队列:{self.queue_name}")
            # 设置消费回调
            self.channel.basic_consume(
                queue=self.queue_name,
                on_message_callback=self.customer,
            )
            # 开始消费
            self.channel.start_consuming()
        except KeyboardInterrupt:
            log.error("测试消费者 停止消费!")
            self.close()
        except Exception as e:
            log.error(f"测试消费者 发生错误 停止消费:{str(e)}")
            self.close()

    def customer(self, ch, method, properties, body):
        log.error(f"测试消费者 接受到消息:{body.decode()}")
        try:
            # 打印消息内容
            print(f"测试消费者 接受到消息:{body.decode()}")

            # 设置计数器和列表 消息达到batch_size才消费
            self.messages.append(body)
            self.delivery_tags.append(method.delivery_tag)
            self.message_count += 1

            # 当达到批处理大小时,进行批量确认
            if self.message_count >= self.batch_size:
                print(f"\n批量处理完成 {self.batch_size} 条消息")
                print("消息内容:", [msg.decode() for msg in self.messages])

                # 确认所有消息
                ch.basic_ack(delivery_tag=self.delivery_tags[-1], multiple=True)

                # 重置计数器和列表
                self.message_count = 0
                self.messages = []
                self.delivery_tags = []

        except Exception as e:
            log.error(f"测试消费者 处理消息异常:{str(e)}")
            # 发生错误时,拒绝消息并重新入队
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)


if __name__ == "__main__":
    # 创建消费者实例并开始消费
    consumer = RabbitMQConsumer(host="localhost", queue_name="test-queue", batch_size=5)
    consumer.start_consuming()


http://www.kler.cn/news/367795.html

相关文章:

  • CSS伪元素以及伪类和CSS特性
  • 管家婆财贸ERP BB040.销售单插行快捷键+BB041.超期应收款审核条件控制
  • C++11实践指北
  • 「Qt Widget中文示例指南」如何实现半透明背景?
  • 通过cv库智能切片 把不同的分镜切出来 自媒体抖音快手混剪
  • 基于信号分解和多种深度学习结合的上证指数预测模型
  • w001基于SpringBoot的在线拍卖系统
  • React Native 项目使用Expo模拟器运行iOS和Android
  • 【线下培训】龙信科技应邀参与了由教育部网络安全与执法虚拟教研室(中国刑事警察学院)举办的学术讲座
  • android手动用证书签名apk apksigner工具
  • Unity3D学习FPS游戏(2)简单场景、玩家移动控制
  • gin入门教程(2):go安装以及初始目录构建
  • 简化深度学习实验管理:批量训练和自动记录方案
  • 暴力匹配算法 (BF):字符串匹配算法的演进之路
  • springboot 网上影院订票系统-计算机毕业设计源码06993
  • 小程序视频SDK解决方案,提供个性化开发和特效定制设计
  • 笔记整理—linux驱动开发部分(1)驱动梗概
  • 第五十二章 安全元素的详细信息 - EncryptedData 详情
  • 【含开题报告+文档+PPT+源码】基于SpringBoot爱之屋摄影预约管理系统的设计与实现
  • Depcheck——专门用于检测 JavaScript 和 Node.js 项目中未使用依赖项的工具
  • 安全知识见闻-通信协议安全
  • uniapp 报错Invalid Host header
  • 求个数不确定的整数的最大公约数(java)
  • WSL(Ubuntu20.04)编译和安装DPDK
  • PHP const 和 define主要区别
  • 关闭钉钉AI助理