Mysql与ElasticSearch间的数据同步场景模拟
一、场景简介
现有酒店管理与酒店搜索预定两个分离的微服务模块,为了数据的安全性我们在就带你管理模块通过Mysql进行crud,为了搜索的高效和质量在搜索预定模块我们采用了ElasticSearch搜索引擎(视作一种NoSQL 数据库),这样一来如何同步数据就是我们必须要解决的问题。
而上图这样的调用暴露接口的方式无疑又违背了分模块单一职责的初衷,降低了效率又增加了耦合,为了解决这一问题,我们将采用消息中间件Rabbit MQ来承担通讯的角色。
二、问题解决
有关MQ的内容大家可以自行查找或查看官方文档RabbitMQ Tutorials | RabbitMQ
1.引入依赖
因为两个模块一个发布,一个监听都需要用到MQ,所以两者都要引入依赖
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置.yml
spring:
rabbitmq:
port: 5672
host: 192.168.56.128 #主机地址
username: #账号
password: #密码
virtual-host: / #虚拟主机
2.常量定义
一些关于MQ中间件定义和使用时需要用到的常量,当然也可以直接在用的时候再声明,这里选择做统一管理。
public class MqConstant {
/**
* 交换机
*/
public final static String HOTEL_EXCHANGE="hotel.topic";
/**
* 监听新增和修改的队列
*/
public final static String HOTEL_INSERT_QUEUE="hotel.insert.queue";
/**
* 监听删除的队列
*/
public final static String HOTEL_DELETE_QUEUE="hotel.delete.queue";
/**
* 新增或修改的RoutingKey
*/
public final static String HOTEL_INSERT_KEY="hotel.insert.key";
/**
* 删除的RoutingKey
*/
public final static String HOTEL_DELETE_KEY="hotel.delete.key";
}
3.组件定义
可以直接在@RabbitListener注解中定义绑定关系等属性,这里为了看起来更清晰,我采用@Bean注解定义在配置类,放在需要进行监听的用户搜索模块
import cn.itcast.hotel.constant.MqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqConfig {
//定义交换机,关于fanout,direct,topic三种不同交换机的区别可查看官方文档
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(MqConstant.HOTEL_EXCHANGE,true,false);
}
//定义增改队列
@Bean
public Queue insertQueue(){
return new Queue(MqConstant.HOTEL_INSERT_QUEUE,true);
}
//定义删除队列
@Bean
public Queue deleteQueue(){
return new Queue(MqConstant.HOTEL_DELETE_QUEUE,true);
}
//定义增改绑定关系
@Bean
public Binding insertQueueBinding(){
return new Binding(MqConstant.HOTEL_INSERT_QUEUE,
Binding.DestinationType.QUEUE,MqConstant.HOTEL_EXCHANGE,
MqConstant.HOTEL_INSERT_KEY,null);
}
//定义删除绑定关系
@Bean
public Binding deleteQueueBinding(){
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstant.HOTEL_DELETE_KEY);
}
}
4.消息发布
酒店管理模块中在增删改业务成功完成后应该向MQ发布消息通知,我们来补全Service中的逻辑
import cn.itcast.hotel.constant.MqConstant;
import cn.itcast.hotel.mapper.HotelMapper;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.service.IHotelService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 保存酒店信息并发送消息到 RabbitMQ
* @param hotel 酒店对象
*/
public void saveHotelWithMessage(Hotel hotel) {
// 使用 MyBatis-Plus 保存酒店信息到数据库
boolean saveSuccess = this.save(hotel);
if (saveSuccess) {
// 发送消息到 RabbitMQ
rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE, MqConstant.HOTEL_INSERT_KEY, hotel.getId());
} else {
throw new RuntimeException("保存酒店信息失败");
}
}
/**
* 更新酒店信息并发送消息到 RabbitMQ
* @param hotel 酒店对象
*/
public void updateHotelWithMessage(Hotel hotel) {
// 检查 ID 是否存在
if (hotel.getId() == null) {
throw new IllegalArgumentException("ID 不能为空");
}
// 使用 MyBatis-Plus 更新酒店信息
boolean updateSuccess = this.updateById(hotel);
if (updateSuccess) {
// 发送消息到 RabbitMQ
rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE, MqConstant.HOTEL_INSERT_KEY, hotel.getId());
} else {
throw new RuntimeException("更新酒店信息失败");
}
}
/**
* 删除酒店信息并发送消息到 RabbitMQ
* @param id
*/
@Override
public void removeByIdWithMessage(Long id) {
if (id == null) {
throw new IllegalArgumentException("ID 不能为空");
}
// 使用 MyBatis-Plus 更新酒店信息
boolean removeSuccess = this.removeById(id);
if (removeSuccess) {
// 发送消息到 RabbitMQ
rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE, MqConstant.HOTEL_DELETE_KEY, id);
} else {
throw new RuntimeException("删除酒店信息失败");
}
}
}
5.监听消息
在用户搜索模块,需要监听MQ发来的消息,根据key判断并执行ES的更新操作
- 监听
import cn.itcast.hotel.constant.MqConstant; import cn.itcast.hotel.service.IHotelService; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class HotelListener { @Autowired private IHotelService hotelService; /** * 监听酒店新增或修改 * @param id */ @RabbitListener(queues = MqConstant.HOTEL_INSERT_QUEUE) public void listenHotelInsert(Long id){ hotelService.insertDocById(id); } /** * 监听酒店删除 * @param id */ @RabbitListener(queues = MqConstant.HOTEL_DELETE_QUEUE) public void listenHotelDelete(Long id){ hotelService.deleteDocById(id); } }
- 更新操作
@Override public void deleteDocById(Long id) { //request对象 DeleteRequest deleteRequest = new DeleteRequest("hotel",id.toString()); //发送请求 try { restHighLevelClient.delete(deleteRequest,RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } } @Override public void insertDocById(Long id) { //获取数据库数据 Hotel hotel = getById(id); //转换es HotelDoc hotelDoc = new HotelDoc(hotel); //request对象 IndexRequest indexRequest =new IndexRequest("hotel").id(hotel.getId().toString()); //json indexRequest.source(JSON.toJSONString(hotelDoc), XContentType.JSON); //发送请求 try { restHighLevelClient.index(indexRequest,RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } }
三、功能测试
在酒店管理模块中更改MySQL数据库内容,观察是否同步到利用ES数据库回显的用户查询模块,以下是价格1500以上酒店的间隔升序回显:
可以看到深圳大中华最低,以1599排在第一页首位
我们在管理模块将其价格上升十倍
再回到用户搜索模块,可以看到深圳大中华价格已经变为15990并且出现在最后一页末尾,可见ES中数据也被同步修改,这里就不在通过DSL查询向大家展示了。
通过以上更改,我们实现了更合理高效的数据同步,那么本次的分享到这里就结束了,感谢大家阅读。