当前位置: 首页 > article >正文

SpringBoot 集成RabbitMQ 实现钉钉日报定时发送功能

文章目录

  • 一、RabbitMq 下载安装
  • 二、开发步骤:
    • 1.MAVEN 配置
    • 2. RabbitMqConfig 配置
    • 3. RabbitMqUtil 工具类
    • 4. DailyDelaySendConsumer 消费者监听
    • 5. 测试延迟发送

一、RabbitMq 下载安装

官网:https://www.rabbitmq.com/docs

二、开发步骤:

1.MAVEN 配置

   		<!--RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.7.7</version>
        </dependency>

2. RabbitMqConfig 配置

package com.lq.common.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


@Configuration
public class RabbitMqConfig {
    /**延迟交换机名称*/
    public static final String  DELAY_EXCHANGE="DelayExchange";
    /**延迟队列名称*/
    public static final String  DELAY_QUEUE="DelayQueue";

    public static final String ROUTING_KEY="delay";


    @Bean
    public CustomExchange customExchange(){
        Map<String, Object> map = new HashMap<>();
        //设置交换机支持延迟消息推送
        map.put("x-delayed-type","direct");
        return new CustomExchange(DELAY_EXCHANGE,"x-delayed-message",true,false,map);
    }

    @Bean
    public Queue delayQueue(){

        return new Queue(DELAY_QUEUE,true);

    }

    @Bean
    public Binding DelayBinding(){
        return BindingBuilder.bind(delayQueue()).to(customExchange()).with(ROUTING_KEY).noargs();
    }

}

3. RabbitMqUtil 工具类

package com.lq.common.util;

import com.lq.common.config.RabbitMqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;


@Service
@Slf4j
public class RabbitMqUtil {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    private DateTimeFormatter formatterDateTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    @PostConstruct
    public void init(){
        /**
         * 消息发送到交换机成功回调函数
         */
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){

            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack){
                    log.info("消息投递到交换机成功");
                }else {
                    log.error("消息投递到交换机失败,原因->{}",cause);
                }
            }
        });
        /**交换机投递到队列失败回调函数**/
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("投递到队列失败,错误原因->{}",returned);
            }
        });

    }

    /**
     * @Description 发送延迟消息
     * @param content 延迟内容
     * @param delayTime 延迟时间 ,单位ms;  例如 5000 代表 5 秒
     * @Author hqd
     * @Date 2024-10-21
     */
    public Boolean sendDelayMessage(String content,Integer delayTime){
        log.info("消息发送时间->{}",LocalDateTime.now().format(formatterDateTime));

        rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE, RabbitMqConfig.ROUTING_KEY, content, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                log.info("延迟时间->{}",delayTime);
                //这个底层就是setHeader("x-delay",i);是一样的 设置延时时间
                message.getMessageProperties().setDelay(delayTime);//单位毫秒
                return message;
            }
        });
        return true;

    }


}

4. DailyDelaySendConsumer 消费者监听

package com.lq.daily.mq.consumer;

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.lq.common.config.RabbitMqConfig;
import com.lq.daily.dto.DailyDelaySendDTO;
import com.lq.daily.service.ILqDailyService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * @Description 日报延迟发送消费者
 * @Author hqd
 * @Date 2024-10-21 16:04
 */
@Slf4j
@Component
public class DailyDelaySendConsumer {
    @Autowired
    private ILqDailyService lqDailyService;

    private DateTimeFormatter formatterDateTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    @RabbitListener(queues = RabbitMqConfig.DELAY_QUEUE)
    public void dailyDelaySendListener(String content, Channel channel, Message message) throws IOException, InterruptedException{
        log.info("消息接收时间->{}", LocalDateTime.now().format(formatterDateTime));
        log.info("接收消息内容是->{}",content);
        log.info("{}",message.getMessageProperties().getDeliveryTag());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

        //处理日报发送业务逻辑
        if (StrUtil.isNotBlank(content)&& content.startsWith("{")){
            DailyDelaySendDTO dto = JSONObject.parseObject(content, DailyDelaySendDTO.class);
            if (ObjectUtil.isNotEmpty(dto)){
                lqDailyService.updateDailyDelaySend(dto.getDailyCode(), LocalDateTime.parse(dto.getDelaySendTime(),DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")));
            }
        }

    }
}

5. 测试延迟发送

   @PassToken
    @GetMapping("/testDelayMq")
    @ApiOperation("测试Mq 延迟消息发送")
    public void testDelayMq(){
        DailyDelaySendDTO dto = new DailyDelaySendDTO();
        dto.setDailyCode("DC2024101015135400001");
        dto.setDelaySendTime("2024-10-22 10:58");

        LocalDateTime sendTime = LocalDateTime.parse(dto.getDelaySendTime()+":00", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        long between = ChronoUnit.MILLIS.between(LocalDateTime.now(), sendTime);

        rabbitMqUtil.sendDelayMessage(JSON.toJSONString(dto),new Long(between).intValue());
    }

在这里插入图片描述


http://www.kler.cn/a/370182.html

相关文章:

  • “大模型横扫千军”背后的大数据挖掘--浅谈MapReduce
  • Decode Global专注金融创新,构建便捷的交易平台
  • .NET 8 项目 Docker 方式部署到 Linux 系统详细操作步骤
  • 云计算、AI与国产化浪潮下DBA职业之路风云变幻,如何谋破局启新途?
  • 【知识分享】PCIe5.0 TxRx 电气设计参数汇总
  • Bash语言的函数实现
  • [LeetCode] 526. 优美的排列
  • Docker | 校园网上docker pull或者docker run失败的一种解决方法
  • 探索C嘎嘎:认识string类
  • 【大数据分析与挖掘模型】matlab实现——非线性回归预测模型
  • 【计算机网络 - 基础问题】每日 3 题(五十七)
  • 《等保测评:安全与发展的双轮驱动》
  • 14 C语言中的关键字
  • Prometheus+Telegraf实现自定义监控项配置
  • RDD的常用转换算子
  • Qt实现播放器顶部、底部悬浮工具栏
  • typescript学习计划(一)--简单介绍typescript
  • VUE组件学习 | 六、v-if, v-else-if, v-else组件
  • OpenAI否认今年将发布“Orion”模型,其语音转写工具Whisper被曝存在重大缺陷|AI日报
  • 【C++】--------- 内存管理
  • 【spark】spark structrued streaming读写kafka 使用kerberos认证
  • 【网络篇】计算机网络——链路层详述(笔记)
  • List<T>属性和方法使用
  • springboot整合kafka
  • 【学习心得】远程root用户访问服务器中的MySQL8
  • 图片分类标注工具python