当前位置: 首页 > article >正文

elasticsearch实战三 elasticsearch与mysql数据实时同步

一 介绍

elasticsearch数据不是一直不变的,需要与mysql、oracle等数据库的数据做同步。
本博客里涉及到的项目地址:https://www.aliyundrive.com/s/7bRWpTYsxWV

方案一: 同步调用,即操作mysql数据后,接着操作elasticsearch的数据

  • 优点:实现简单,粗暴
  • 缺点:业务耦合度高
    在这里插入图片描述

方案二: 引入mq中间件,操作完mysql后,发消息给mq,然后更新elasticsearch。

  • 优点:低耦合,实现难度一般
  • 缺点:依赖mq的可靠性

在这里插入图片描述

方案三: 监听mysql的binlog日志,操作mysql时,监听到binlog后,接着操作elasticsearch数据

  • 优点:完全解除服务间耦合
  • 缺点:开启binlog增加数据库负担、实现复杂度高(且目前只有mysql支持binlog)

在这里插入图片描述
本文介绍比较通用的方案,即方案二,使用的mq消息队列是rabbitmq

二 消费端搭建服务

资料里的hotel-admin项目,是用来操作mysql、产生mq消息的。hotel-demo项目,是用来操作es、消费mq消息的。
在这里插入图片描述

2.1 hotel-demo项目搭建rabbitmq

声明消息队列里的exchage、queue、RoutingKey
声明交换机、队列等,一般都是在消费者里操作。由于对于ES来说,新增与修改是一样的(修改时,找不到id,就会新增),所以队列只声明新增、删除两种队列即可。

hotel-demo:引入依赖

<!--amqp-->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

添加rabbitmq配置信息
在这里插入图片描述
声明exchange、queue、RoutingKey

//常量类定义交换机、队列、路由key等,消息的消费者和发送者都要定义这个类
public class HotelMqConstants {

    /**
     * 交换机名称
     */
    public static final String EXCHANGE_NAME = "hotel.topic";

    /**
     * 新增、修改队列
     */
    public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";

    /**
     * 删除队列
     */
    public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";

    /**
     * 新增或修改的RoutingKey
     */
    public static final String INSERT_KEY = "hotel.insert";
    /**
     * 删除的RoutingKey
     */
    public static final String DELETE_KEY = "hotel.delete";
}

定义队列、主题等的绑定关系时,有两种方式

  1. 基于注解(较简单)
  2. 基于bean

这里使用基于bean的方式,MqConfig.java配置类

public class HotelMqConstants {
    //交换机名称
    public static final String EXCHANGE_NAME = "hotel.topic";
    //插入、更新数据时的队列名
    public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";
    //删除数据时的队列名
    public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";
    //插入、更新数据时的RoutingKey
    public static final String INSERT_KEY = "hotel.insert";
    //删除数据时的RoutingKey
    public static final String DELETE_KEY = "hotel.delete";
}
import cn.itcast.hotel.constants.HotelMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {
    /**
     * 定义交换机
     * @return
     */
    @Bean
    public TopicExchange topicExchange(){
        //参数一:交换机名字
        //参数二:持久化
        return new TopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false);
    }
    /**
     * 插入、更新数据的队列
     */
    @Bean
    public Queue insertQueue(){
        return new Queue(HotelMqConstants.INSERT_QUEUE_NAME,true);
    }
    /**
     * 删除数据的队列
     */
    @Bean
    public Queue deleteQueue(){
        return new Queue(HotelMqConstants.DELETE_QUEUE_NAME,true);
    }
    /**
     * 定义插入、更新数据时,队列、交换机、路由key的绑定关系
     */
    public Binding insertQueueBinding(){
        //队列绑定交换机、绑定RoutingKey
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(HotelMqConstants.INSERT_KEY);
    }

    /**
     * 定义删除数据时,队列、交换机、路由key的绑定关系
     */
    public Binding deleteQueueBinding(){
        //队列绑定交换机、绑定RoutingKey
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(HotelMqConstants.DELETE_KEY);
    }
}

2.2 hotel-admin项目搭建rabbitmq

启动访问8099端口
在这里插入图片描述
引入amqp依赖

<!--amqp-->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

定义常量类

public class HotelMqConstants {
    //交换机名称
    public static final String EXCHANGE_NAME = "hotel.topic";
    //插入、更新数据时的队列名
    public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";
    //删除数据时的队列名
    public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";
    //插入、更新数据时的RoutingKey
    public static final String INSERT_KEY = "hotel.insert";
    //删除数据时的RoutingKey
    public static final String DELETE_KEY = "hotel.delete";
}

配置rabbitmq地址
在这里插入图片描述
发送mq消息代码

public class HotelController {
   @Autowired
    private IHotelService hotelService;

    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostMapping
    public void saveHotel(@RequestBody Hotel hotel){
        // 新增酒店
        hotelService.save(hotel);
        // 发送MQ消息(第三个参数是消息内容:hotel.getId())
        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());
    }

    @PutMapping()
    public void updateById(@RequestBody Hotel hotel){
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        hotelService.updateById(hotel);

        // 发送MQ消息第三个参数是消息内容:hotel.getId())
        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());
    }

    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        hotelService.removeById(id);

        // 发送MQ消息(第三个参数是消息内容:hotel.getId())
        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.DELETE_KEY, id);
    }
 }

2.3 hotel-demo监听消息

hotel-demo是消费者,负责监听消息

添加es依赖

<!--elasticsearch-->
<dependency>
     <groupId>org.elasticsearch.client</groupId>
     <artifactId>elasticsearch-rest-high-level-client</artifactId>
     <version>7.12.1</version>
</dependency>

import cn.itcast.hotel.constants.HotelMqConstants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HotelListener {

    @Autowired
    private IHotelService hotelService;

    /**
     * 监听新增、修改的消息
     * @param hotelId
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = HotelMqConstants.INSERT_QUEUE_NAME),
            exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
            key = HotelMqConstants.INSERT_KEY
    ))
    public void listenHotelInsert(Long hotelId){
        // 新增
        hotelService.saveById(hotelId);
    }

    /**
     * 监听删除的消息
     * @param hotelId
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = HotelMqConstants.DELETE_QUEUE_NAME),
            exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
            key = HotelMqConstants.DELETE_KEY
    ))
    public void listenHotelDelete(Long hotelId){
        // 删除
        hotelService.deleteById(hotelId);
    }
}
import cn.itcast.hotel.mapper.HotelMapper;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
    @Autowired
    private RestHighLevelClient restHighLevelClient;
    
    @Override
    public void saveById(Long hotelId) {
        try {
            // 查询酒店数据,应该基于Feign远程调用hotel-admin,根据id查询酒店数据(现在直接去数据库查)
            Hotel hotel = getById(hotelId);
            // 把hotel对象转换为hotel的Doc
            HotelDoc hotelDoc = new HotelDoc(hotel);

            // 1.创建Request
            IndexRequest request = new IndexRequest("hotel").id(hotelId.toString());
            // 2.准备参数
            request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
            // 3.发送请求
            restHighLevelClient.index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException("新增酒店数据失败", e);
        }
    }
@Override
    public void deleteById(Long hotelId) {
        try {
            // 1.创建request
            DeleteRequest request = new DeleteRequest("hotel", hotelId.toString());
            // 2.发送请求
            restHighLevelClient.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException("删除酒店数据失败", e);
        }
    }
}

http://www.kler.cn/a/539087.html

相关文章:

  • C++ ——从C到C++
  • stm32小白成长为高手的学习步骤和方法
  • 音频进阶学习十一——离散傅里叶级数DFS
  • Spring Boot接入Deep Seek的API
  • PyTorch Profiler 的使用
  • DeepSeek-R1模型的数学原理(说人话)
  • 活动预告 | Power Hour: Copilot 引领商业应用的未来
  • 全面理解-c++11中的移动语义
  • Windows系统下设置Vivado默认版本:让工程文件按需打开
  • emlog最新跨站脚本漏洞(CNVD-2025-01607、CVE-2024-13140)
  • DeepSeek为何能爆火
  • QUIC 与 UDP 关系
  • 知识图谱可视化系统python+neo4j+vue3
  • 1.2 变革里程碑:Transformer 的崛起
  • 使用wpa_supplicant和wpa_cli 扫描wifi热点及配网
  • python--常用内置库
  • 机器学习:朴素贝叶斯分类器
  • BMS应用软件开发 — 11 CAN通讯
  • MongoDB开发规范
  • 青少年编程与数学 02-009 Django 5 Web 编程 03课题、项目结构
  • Puck.js,一款基于React的开源可视化编辑器
  • 内存的RANK具体指什么?
  • ML.NET库学习004:ML.NET基础知识复盘
  • 202406 青少年软件编程等级考试C/C++ 三级真题答案及解析(电子学会)
  • 测试文章内容1
  • 如何写出优秀的单元测试?