当前位置: 首页 > article >正文

rabbitMQ 简单使用

安装 rabbitMQ

下载地址:rabbitmq-3.12.0

安装 windows rabbitMQ 需要的命令

进入 rabbitMQ 的 sbin 目录后 cmd (需要管理员权限)
在这里插入图片描述

rabbitmq-plugins.bat enable rabbitmq_management

随后重启 rabbitMQ

#关闭服务
net stop rabbitmq 
#开启服务
net start rabbitmq 

UI 界面地址 (用户名密码都为 guest)

http://localhost:15672

rabbitMQ 在 Java 中的使用

yml 配置

#rabbitmq 配置
rabbitmq:
  host: localhost
  port: 5672
  password: guest
  username: guest

依赖引入

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

● 路由键(Routing Key):这是一个字符串,由生产者在发送消息时指定,用于指示交换机应该将消息路由到哪个队列。路由键通常与消息的内容或类型有关。
● 交换机(Exchange):交换机是消息传递的中转站,它负责接收来自生产者的消息,并根据路由键将消息路由到一个或多个队列。
● 队列(Queue):队列是存储消息的地方,消费者从队列中获取消息进行处理。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

/**
 * 用于创建交换机和队列(只用在程序启动前执行一次----用@PostConstruct实现)
 */
@Slf4j
@Component
public class InitRabbitMqBean {

    @Value("${spring.rabbitmq.host:localhost}")
    private String host;
    // @PostConstruct用于在Spring容器实例化Bean之后执行初始化逻辑,且只执行一次
    @PostConstruct
    public void init() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(host);
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            String EXCHANGE_NAME = "code_exchange";
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            // 创建队列,随机分配一个队列名称
            String queueName = "code_queue";
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, EXCHANGE_NAME, "my_routingKey");
            log.info("消息队列启动成功");
        } catch (Exception e) {
            log.error("消息队列启动失败", e);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.wxw.znojbackendjudgeservice.judge.JudgeService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;

/**
 * @description 消息消费者
 */
@Component
@Slf4j
public class MyMessageConsumer {

    @Resource
    private JudgeService judgeService;

    // 指定程序监听的消息队列和确认机制
    @SuppressWarnings("LanguageDetectionInspection")
    @SneakyThrows
    // @RabbitListener 是 SpringBoot 中用于简化 RabbitMQ 消息监听器配置的注解
    // 它允许你将一个方法标记为消息监听器,这样当队列中有消息到达时,SpringBoot 将自动调用该方法来处理消息
    // 消息确认模式被设置为 MANUAL(消息处理完成后需要手动确认)
    @RabbitListener(queues = {"code_queue"}, ackMode = "MANUAL")
    public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        log.info("receiveMessage message = {}", message);
        long questionSubmitId = Long.parseLong(message);
        try {
            judgeService.doJudge(questionSubmitId);
            // 确认 deliveryTag 标识的单个消息已经被成功处理
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // 拒绝 deliveryTag 标识的单个消息,并将其重新放回队列中
            channel.basicNack(deliveryTag, false, false);
        }
    }
}
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;

/**
 * 消息生产者
 */
@Component
public class MyMessageProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息
     * @param exchange
     * @param routingKey
     * @param message
     */
    public void sendMessage(String exchange, String routingKey, String message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
}

在 OJ 项目中,用户做题后提交,提交的题目 ID 就是消息,此时生产者调用发送消息方法,将题目 ID 作为消息发送给交换机 code_exchange,之后再由交换机决定转发给哪个消费者

@Override
public long doQuestionSubmit(QuestionSubmitAddRequest questionSubmitAddRequest, User loginUser) {
    // ...
    // ...
    Long questionSubmitId = questionSubmit.getId();
    // 发送消息----异步调用判题服务
    myMessageProducer.sendMessage("code_exchange", "my_routingKey", String.valueOf(questionSubmitId));
    // ...
    // ...
}

http://www.kler.cn/a/325484.html

相关文章:

  • excel仅复制可见单元格,仅复制筛选后内容
  • 【后端面试总结】tls中.crt和.key的关系
  • 鸿蒙打包发布
  • 09.VSCODE:安装 Git for Windows
  • 如何优化Elasticsearch大文档查询?
  • 从代码层面熟悉UniAD,开始学习了解端到端整体架构
  • 23中设计模式,以及三种常见的设计模式demo
  • 使用::selection改变文字被选中后的颜色
  • 深圳mes制造系统的主要功能
  • WIFI密码默认显示
  • OpenAI员工流失的背后:地盘争夺、倦怠、薪酬要求
  • 大模型+AIGC技术实操:GPT 大模型部署使用 AIGC实战落地方案
  • LeetCode讲解篇之3. 无重复字符的最长子串
  • springboot异常(三):异常处理原理
  • 超详细的华为ICT大赛报名流程
  • golang学习笔记32——哪些是用golang实现的热门框架和工具
  • Android Webview和ScrollView冲突和WebView使用总结
  • 数仓建模:DataX同步Mysql数据到Hive如何批量生成建表语句?| 基于SQL实现
  • cuda程序编译流程
  • Uniapp 跨域
  • 超好用的10款视频剪辑软件,从入门到精通
  • 浅谈GDDRAM的三种寻址模式
  • DigitalOcean 全球负载均衡是什么?
  • DBMS-2.3 数据库设计(3)——数据库规范化设计实现(3NF、BCNF模式分解)
  • 【有啥问啥】具身智能(Embodied AI):人工智能的新前沿
  • 基于Python大数据可视化的民族服饰数据分析系统