当前位置: 首页 > article >正文

使用 RabbitMQ 和 Go 构建异步订单处理系统

使用 RabbitMQ 和 Go 构建异步订单处理系统

我们可以通过构建一个订单处理系统来演示如何使用消息队列(MQ)实现异步任务处理。这个项目将使用 RabbitMQ 作为消息队列,并使用 Go 语言来实现。以下是项目的详细教程和相关环境配置。

项目描述

功能:模拟一个简单的电商订单处理系统,包括下单、库存扣减、邮件通知、以及发货通知。每个任务通过消息队列异步处理。

环境配置

1. 安装 Go 环境

确保已经安装 Go 语言开发环境,可以通过以下命令确认:

go version

如果没有安装,可以从 Go 官方网站 下载并安装。
2. 安装 RabbitMQ

RabbitMQ 可以通过 Docker 轻松安装:

docker run -d --hostname rabbitmq --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
RabbitMQ 管理控制台可以通过 http://localhost:15672 访问。
默认用户名和密码都是 guest。

3. 创建 Go 项目

mkdir order-processing
cd order-processing
go mod init order-processing

4. 安装 RabbitMQ Go 客户端库

go get github.com/streadway/amqp

项目结构

项目将包含以下文件和目录:

order-processing/
│
├── main.go                 // 入口文件,初始化 MQ 连接
├── producer.go             // 生产者代码,模拟下单请求
├── consumer_inventory.go   // 消费者代码,处理库存扣减
├── consumer_email.go       // 消费者代码,处理邮件发送
└── consumer_shipping.go    // 消费者代码,处理发货通知

代码实现

1. main.go - 初始化 MQ 连接

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // 声明队列
    q, err := ch.QueueDeclare(
        "order_queue", // 队列名称
        true,          // 持久化
        false,         // 自动删除
        false,         // 独占
        false,         // 无需等待
        nil,           // 额外属性
    )
    failOnError(err, "Failed to declare a queue")

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    
    // 创建消费者
    msgs, err := ch.Consume(
        q.Name, // 队列名称
        "",     // 消费者标识符
        true,   // 自动应答
        false,  // 独占
        false,  // 无需等待
        false,  // no local
        nil,    // 额外属性
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()

    <-forever
}

2. producer.go - 生产者代码

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "order_queue",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")

    body := "New Order"
    err = ch.Publish(
        "",
        q.Name,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}

3. consumer_inventory.go - 处理库存扣减

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "order_queue",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Processing inventory for order: %s", d.Body)
            // 处理库存扣减逻辑
        }
    }()

    <-forever
}

4. consumer_email.go - 处理邮件发送

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "order_queue",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Sending email for order: %s", d.Body)
            // 处理邮件发送逻辑
        }
    }()

    <-forever
}

5. consumer_shipping.go - 处理发货通知

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "order_queue",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Notifying shipping for order: %s", d.Body)
            // 处理发货通知逻辑
        }
    }()

    <-forever
}

运行步骤

1.启动 RabbitMQ(如果使用 Docker 已启动,则跳过这步)。

docker run -d --hostname rabbitmq --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

2.启动消费者:分别运行 consumer_inventory.goconsumer_email.goconsumer_shipping.go文件。

3.发送订单消息:运行 producer.go 文件,模拟用户下单。
在这里插入图片描述

运行后的验证

1.消费者处理消息的日志输出:

每个消费者程序启动后,会开始监听 RabbitMQ 中相应的队列。当生产者发送消息时,消费者会从队列中读取消息,并输出处理的日志信息。
消费者处理消息的流程如下:
    consumer_inventory.go 处理库存更新的消息。
    consumer_email.go 处理订单确认邮件的发送。
    consumer_shipping.go 处理发货通知的消息。

你可以在每个运行的消费者窗口中看到相应的日志输出,例如:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
2.RabbitMQ 管理界面监控: 通过访问 RabbitMQ 管理界面,你可以检查 RabbitMQ 队列的状态:

查看哪些队列正在运行。
检查队列中是否有消息积压。
查看消息的流入和流出情况,确保消费者正在从队列中消费消息。

在这里插入图片描述

项目中的关键点解释

RabbitMQ 消息模型:
    生产者会将消息发布到 RabbitMQ 的一个交换机(Exchange)上,交换机会将消息路由到相应的队列(Queue)。
    消费者监听队列并从中获取消息进行处理。每个消费者可以处理一个特定类型的消息。

AMQP协议:
    Go 代码中的 amqp 库基于 AMQP 协议(Advanced Message Queuing Protocol),这是 RabbitMQ 使用的消息协议。它定义了如何在生产者和消费者之间传递消息。

队列持久化与自动应答:
    RabbitMQ 队列可以配置为持久化消息,确保在系统重启时消息不会丢失。
    你可以在消费者中配置是否自动应答(acknowledgment),以确认消息处理成功后再从队列中移除。当前例子中是使用自动应答的模式。

问题排查

如果在运行过程中遇到问题,可以按以下步骤排查:

1.检查 RabbitMQ 是否正常运行:确保 Docker 容器正常运行,并且端口没有冲突。

docker ps

如果 RabbitMQ 没有启动,可以通过命令 docker start some-rabbit 来启动。

2.确保消费者能够连接到 RabbitMQ:如果消费者无法连接到 RabbitMQ,可能是因为配置错误或 RabbitMQ 服务未启动。检查消费者的日志输出,确认连接是否成功。

3.查看 RabbitMQ 日志:通过 RabbitMQ 管理界面可以查看日志,排查是否有消息未路由到队列或者连接失败的错误。

通过这些步骤,你应该能够顺利运行和验证这个消息队列项目。如果有更多问题或需要其他帮助,随时告诉我!

总结

通过这篇博客,你应当掌握了 RabbitMQ 的基本使用方法,了解了如何将 RabbitMQ 与 Go 应用集成,从而构建可靠的消息传递系统。这些技能将为你在开发异步消息处理和微服务架构方面奠定坚实的基础。


http://www.kler.cn/a/292045.html

相关文章:

  • 整数唯一分解定理
  • Python教程笔记(2)
  • 嵌入式硬件电子电路设计(五)MOS管详解(NMOS、PMOS、三极管跟mos管的区别)
  • Python安装(ubuntu)
  • reduce-scatter:适合分布式计算;Reduce、LayerNorm和Broadcast算子的执行顺序对计算结果的影响,以及它们对资源消耗的影响
  • The 3rd Universal CupStage 15: Chengdu, November 2-3, 2024(2024ICPC 成都)
  • Apple “Glowtime”活动:iPhone 16、Apple Intelligence亮相
  • SQL进阶技巧:给定数字的频率查询中位数 | 中位值计算问题
  • vscode 20 个实用插件
  • 计算机毕业设计选题推荐-高校实验室教学管理系统-Java/Python项目实战
  • c语言中的动态内存管理
  • 面向可信和节能的雾计算医疗决策支持系统的优化微型机器学习与可解释人工智能
  • uni-app应用更新(Android端)
  • C语言预处理详解
  • 彻底解决 node/npm, Electron下载失败相关问题, 从底层源码详解node electron 加速配置
  • 无需更换摄像头,无需施工改造,降低智能化升级成本的智慧工业开源了。
  • ClickHousez中如何定时清理过期数据库?
  • 生信机器学习入门4 - scikit-learn训练逻辑回归(LR)模型和支持向量机(SVM)模型
  • Qt (13)【Qt窗口 —— 颜色对话框 QColorDialog】
  • spring cloud gateway 之删除请求头
  • 【CNN训练梯度裁剪】
  • HarmonyOS $r访问资源
  • MyPrint打印设计器(九)svg篇-圆
  • 【计算机视觉前沿研究 热点 顶会】ECCV 2024中Mamba有关的论文
  • C# NX二次开发-获取体全部面
  • Circuitjs 在线电路模拟器使用指南