当前位置: 首页 > article >正文

Mysql与ElasticSearch间的数据同步场景模拟

一、场景简介

        现有酒店管理与酒店搜索预定两个分离的微服务模块,为了数据的安全性我们在就带你管理模块通过Mysql进行crud,为了搜索的高效和质量在搜索预定模块我们采用了ElasticSearch搜索引擎(视作一种NoSQL 数据库),这样一来如何同步数据就是我们必须要解决的问题。

 

        而上图这样的调用暴露接口的方式无疑又违背了分模块单一职责的初衷,降低了效率又增加了耦合,为了解决这一问题,我们将采用消息中间件Rabbit MQ来承担通讯的角色。

 

 二、问题解决

        有关MQ的内容大家可以自行查找或查看官方文档RabbitMQ Tutorials | RabbitMQ

1.引入依赖

因为两个模块一个发布,一个监听都需要用到MQ,所以两者都要引入依赖

        <!--amqp-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

 2.配置.yml

spring:
  rabbitmq:
    port: 5672
    host: 192.168.56.128 #主机地址
    username: #账号
    password: #密码
    virtual-host: / #虚拟主机

2.常量定义

        一些关于MQ中间件定义和使用时需要用到的常量,当然也可以直接在用的时候再声明,这里选择做统一管理。

public class MqConstant {
    /**
     * 交换机
     */
    public final static String HOTEL_EXCHANGE="hotel.topic";
    /**
     * 监听新增和修改的队列
     */
    public final static String HOTEL_INSERT_QUEUE="hotel.insert.queue";
    /**
     * 监听删除的队列
     */
    public final static String HOTEL_DELETE_QUEUE="hotel.delete.queue";
    /**
     * 新增或修改的RoutingKey
     */
    public final static String HOTEL_INSERT_KEY="hotel.insert.key";
    /**
     * 删除的RoutingKey
     */
    public final static String HOTEL_DELETE_KEY="hotel.delete.key";

}

 3.组件定义

可以直接在@RabbitListener注解中定义绑定关系等属性,这里为了看起来更清晰,我采用@Bean注解定义在配置类,放在需要进行监听的用户搜索模块

import cn.itcast.hotel.constant.MqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {
    //定义交换机,关于fanout,direct,topic三种不同交换机的区别可查看官方文档
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(MqConstant.HOTEL_EXCHANGE,true,false);
    }
    //定义增改队列
    @Bean
    public Queue insertQueue(){
        return new Queue(MqConstant.HOTEL_INSERT_QUEUE,true);
    }
    //定义删除队列
    @Bean
    public Queue deleteQueue(){
        return new Queue(MqConstant.HOTEL_DELETE_QUEUE,true);
    }
    //定义增改绑定关系
    @Bean
    public Binding insertQueueBinding(){
        return new Binding(MqConstant.HOTEL_INSERT_QUEUE,
                Binding.DestinationType.QUEUE,MqConstant.HOTEL_EXCHANGE,
                MqConstant.HOTEL_INSERT_KEY,null);
    }
    //定义删除绑定关系
    @Bean
    public Binding deleteQueueBinding(){
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstant.HOTEL_DELETE_KEY);
    }
}

 4.消息发布

酒店管理模块中在增删改业务成功完成后应该向MQ发布消息通知,我们来补全Service中的逻辑

import cn.itcast.hotel.constant.MqConstant;
import cn.itcast.hotel.mapper.HotelMapper;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.service.IHotelService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 保存酒店信息并发送消息到 RabbitMQ
     * @param hotel 酒店对象
     */
    public void saveHotelWithMessage(Hotel hotel) {
        // 使用 MyBatis-Plus 保存酒店信息到数据库
        boolean saveSuccess = this.save(hotel);
        if (saveSuccess) {
            // 发送消息到 RabbitMQ
            rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE, MqConstant.HOTEL_INSERT_KEY, hotel.getId());
        } else {
            throw new RuntimeException("保存酒店信息失败");
        }
    }
    /**
     * 更新酒店信息并发送消息到 RabbitMQ
     * @param hotel 酒店对象
     */
    public void updateHotelWithMessage(Hotel hotel) {
        // 检查 ID 是否存在
        if (hotel.getId() == null) {
            throw new IllegalArgumentException("ID 不能为空");
        }

        // 使用 MyBatis-Plus 更新酒店信息
        boolean updateSuccess = this.updateById(hotel);
        if (updateSuccess) {
            // 发送消息到 RabbitMQ
            rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE, MqConstant.HOTEL_INSERT_KEY, hotel.getId());
        } else {
            throw new RuntimeException("更新酒店信息失败");
        }
    }

    /**
     * 删除酒店信息并发送消息到 RabbitMQ
     * @param id
     */
    @Override
    public void removeByIdWithMessage(Long id) {
        if (id == null) {
            throw new IllegalArgumentException("ID 不能为空");
        }
        // 使用 MyBatis-Plus 更新酒店信息
        boolean removeSuccess = this.removeById(id);
        if (removeSuccess) {
            // 发送消息到 RabbitMQ
            rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE, MqConstant.HOTEL_DELETE_KEY, id);
        } else {
            throw new RuntimeException("删除酒店信息失败");
        }
    }
}

5.监听消息

在用户搜索模块,需要监听MQ发来的消息,根据key判断并执行ES的更新操作

  • 监听
    import cn.itcast.hotel.constant.MqConstant;
    import cn.itcast.hotel.service.IHotelService;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class HotelListener {
        @Autowired
        private IHotelService hotelService;
        /**
         * 监听酒店新增或修改
         * @param id
         */
        @RabbitListener(queues = MqConstant.HOTEL_INSERT_QUEUE)
        public void listenHotelInsert(Long id){
            hotelService.insertDocById(id);
        }
        /**
         * 监听酒店删除
         * @param id
         */
        @RabbitListener(queues = MqConstant.HOTEL_DELETE_QUEUE)
        public void listenHotelDelete(Long id){
            hotelService.deleteDocById(id);
        }
    }
    
  •  更新操作
    @Override
        public void deleteDocById(Long id) {
            //request对象
            DeleteRequest deleteRequest = new DeleteRequest("hotel",id.toString());
            //发送请求
            try {
                restHighLevelClient.delete(deleteRequest,RequestOptions.DEFAULT);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    
        @Override
        public void insertDocById(Long id) {
            //获取数据库数据
            Hotel hotel = getById(id);
            //转换es
            HotelDoc hotelDoc = new HotelDoc(hotel);
            //request对象
            IndexRequest indexRequest =new IndexRequest("hotel").id(hotel.getId().toString());
            //json
            indexRequest.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
            //发送请求
            try {
                restHighLevelClient.index(indexRequest,RequestOptions.DEFAULT);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

三、功能测试 

         在酒店管理模块中更改MySQL数据库内容,观察是否同步到利用ES数据库回显的用户查询模块,以下是价格1500以上酒店的间隔升序回显:

可以看到深圳大中华最低,以1599排在第一页首位

 我们在管理模块将其价格上升十倍

再回到用户搜索模块,可以看到深圳大中华价格已经变为15990并且出现在最后一页末尾,可见ES中数据也被同步修改,这里就不在通过DSL查询向大家展示了。

通过以上更改,我们实现了更合理高效的数据同步,那么本次的分享到这里就结束了,感谢大家阅读。


http://www.kler.cn/a/588990.html

相关文章:

  • MySQL数据库(数据库操作)4
  • Vue.js 中的计算属性、监听器与方法:区别与使用场景
  • 系统架构设计师—论文解析—论文写作技巧
  • How to install cangjie on Linux mint 22.1
  • python和devops
  • 在 macOS 上优化 Vim 用于开发
  • 论文阅读笔记——ADALORA: ADAPTIVE BUDGET ALLOCATION FOR PARAMETER-EFFICIENT FINE-TUNING
  • 五种最新优化算法(ALA、AE、DOA、GOA、OX)求解多个无人机协同路径规划(可以自定义无人机数量及起始点),MATLAB代码
  • SpringBoot中的定时任务实现方式有哪些?
  • Flutter:签名板封装
  • 沐数科技数据开发岗笔试题2025
  • 《C#上位机开发从门外到门内》3-2::Modbus数据采集系统
  • HTTP+DNS综合实验
  • 谷粒商城:性能压测JVM堆区
  • C# Winform 实现换肤,并自定义皮肤功能
  • C#—闭包详解
  • 群体智能优化算法-旗鱼优化算法 (Sailfish Optimizer, SFO,含Matlab源代码)
  • SQL Server表数据变更捕获的5种方法及实战对比
  • C++(初阶)(五)——类和对象(下)
  • 【极光 Orbit·STC8x】05. GPIO库函数驱动LED流动