电商项目-数据同步解决方案(三)商品上架同步更新ES索引库
一、 需求分析和业务逻辑
主要应用技术有:Feign远程调用, 消息队列-RabbitMQ ,分布式搜索引擎-ElasticSearch,Eureka,Canal
商品上架将商品的sku列表导入或者更新索引库。
数据监控微服务需要定义canal监听器,监听商品表的改变,一旦发现商品表内容改变,需要将商品的SPUID发送到消息队列中。
实现思路:
(1)在数据监控微服务中监控tb_spu表的数据,当tb_spu发生更改且is_marketable为1时,表示商品上架,将spu的id发送到rabbitmq。
(2)在rabbitmq管理后台创建商品上架交换器(fanout)。使用分列模式的交换器是考虑商品上架会有很多种逻辑需要处理,导入索引库只是其中一项,另外还有商品详细页静态化等操作。这样我们可以创建导入索引库的队列和商品详细页静态化队列并与商品上架交换器进行绑定。
(3)搜索微服务从rabbitmq的导入索引库的队列中提取spu的id,通过feign调用商品微服务得到sku的列表,并且通过调用elasticsearch的高级restAPI 将sku列表导入到索引库。
二、 搭建环境和代码实现
主要应用技术有:Feign远程调用, 消息队列-RabbitMQ ,分布式搜索引擎-ElasticSearch,Eureka,Canal
实现步骤:
2.1 发送消息到mq
(1)在rabbitmq后台创建交换器goods_up_exchange(类型为fanout),创建队列search_add_queue绑定交换器goods_up_exchange,更新rabbitmq配置类
首先定义交换机名称GOODS_UP_EXCHANGE,定义一个队列用来接收商品消息SEARCH_ADD_QUEUE,声明队列,声明交换机,绑定队列与交换机。
@Configuration
public class RabbitMQConfig {
//交换机名称
private static final String GOODS_UP_EXCHANGE="goods_up_exchange";
//定义队列名称
private static final String SEARCH_ADD_QUEUE="search_add_queue";
//定义队列名称
public static final String AD_UPDATE_QUEUE="ad_update_queue";
//声明队列
@Bean
public Queue queue(){
return new Queue(AD_UPDATE_QUEUE);
}
//声明队列
@Bean(AD_UPDATE_QUEUE)
public Queue AD_UPDATE_QUEUE(){
return new Queue(AD_UPDATE_QUEUE);
}
//声明交换机
@Bean(GOODS_UP_EXCHANGE)
public Exchange GOODS_UP_EXCHANGE(){
return ExchangeBuilder.fanoutExchange(GOODS_UP_EXCHANGE).durable(true).build();
}
//队列绑定交换机
@Bean
public Binding AD_UPDATE_QUEUE_BINDING(@Qualifier(AD_UPDATE_QUEUE) Queue queue,@Qualifier(GOODS_UP_EXCHANGE) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
}
(2)数据监控微服务新增SpuListener,添加以下代码:
@CanalEventListener
public class SpuListener {
@Autowired
private RabbitTemplate rabbitTemplate;
@ListenPoint(schema = "changgou_goods",table = "tb_spu")
public void goodsUp(CanalEntry.EventType eventType,CanalEntry.RowData rowData){
//获取改变之前的数据并将这部分数据转换为map
Map<String,String> oldData=new HashMap<>();
rowData.getBeforeColumnsList().forEach((c)->oldData.put(c.getName(),c.getValue()));
//获取改变之后的数据并这部分数据转换为map
Map<String,String> newData = new HashMap<>();
rowData.getAfterColumnsList().forEach((c)->newData.put(c.getName(),c.getValue()));
//获取最新上架的商品 0->1
if ("0".equals(oldData.get("is_marketable")) && "1".equals(newData.get("is_marketable"))){
//将商品的spuid发送到mq
rabbitTemplate.convertAndSend(RabbitMQConfig.GOODS_UP_EXCHANGE,"",newData.get("id"));
}
}
}
2.2 ES索引库环境准备
Linux虚拟机镜像中包含elasticsearch的相关docker镜像
2.3 创建索引结构
新建shangcheng_service_search_api模块,并添加索引库实体类
(1) 添加依赖
<dependencies>
<dependency>
<groupId>com.shangcheng</groupId>
<artifactId>shangcheng_common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
</dependencies>
(2) 创建实体类
@Document(indexName = "skuinfo", type = "docs")
public class SkuInfo implements Serializable {
//商品id,同时也是商品编号
@Id
@Field(index = true, store = true, type = FieldType.Keyword)
private Long id;
//SKU名称
@Field(index = true, store = true, type = FieldType.Text, analyzer = "ik_smart")
private String name;
//商品价格,单位为:元
@Field(index = true, store = true, type = FieldType.Double)
private Long price;
//库存数量
@Field(index = true, store = true, type = FieldType.Integer)
private Integer num;
//商品图片
@Field(index = false, store = true, type = FieldType.Text)
private String image;
//商品状态,1-正常,2-下架,3-删除
@Field(index = true, store = true, type = FieldType.Keyword)
private String status;
//创建时间
private Date createTime;
//更新时间
private Date updateTime;
//是否默认
@Field(index = true, store = true, type = FieldType.Keyword)
private String isDefault;
//SPUID
@Field(index = true, store = true, type = FieldType.Long)
private Long spuId;
//类目ID
@Field(index = true, store = true, type = FieldType.Long)
private Long categoryId;
//类目名称
@Field(index = true, store = true, type = FieldType.Keyword)
private String categoryName;
//品牌名称
@Field(index = true, store = true, type = FieldType.Keyword)
private String brandName;
//规格
private String spec;
//规格参数
private Map<String, Object> specMap;
//getter & setter略
}
2.4 搜索微服务搭建
(1)创建shangcheng_service_search模块,pom.xml引入依赖
<dependencies>
<dependency>
<groupId>com.shangcheng</groupId>
<artifactId>shangcheng_common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>com.shangcheng</groupId>
<artifactId>shangcheng_service_goods_api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.shangcheng</groupId>
<artifactId>shangcheng_service_search_api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
(2)shagncheng_service_search的application.yml
server:
port: 9009
spring:
application:
name: search
rabbitmq:
host: 192.168.200.128
redis:
host: 192.168.200.128
main:
allow-bean-definition-overriding: true #当遇到同样名字的时候,是否允许覆盖注册
data:
elasticsearch:
cluster-name: elasticsearch
cluster-nodes: 192.168.200.128:9300
thymeleaf:
cache: false
eureka:
client:
service-url:
defaultZone: http://127.0.0.1:6868/eureka
instance:
prefer-ip-address: true
feign:
hystrix:
enabled: true
client:
config:
default: #配置全局的feign的调用超时时间 如果 有指定的服务配置 默认的配置不会生效
connectTimeout: 600000 # 指定的是 消费者 连接服务提供者的连接超时时间 是否能连接 单位是毫秒
readTimeout: 600000 # 指定的是调用服务提供者的 服务 的超时时间() 单位是毫秒
#hystrix 配置
hystrix:
command:
default:
execution:
timeout:
#如果enabled设置为false,则请求超时交给ribbon控制
enabled: false
isolation:
strategy: SEMAPHORE
(3)创建com.shangcheng包,包下创建SearchApplication
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients(basePackages = {"com.shangcheng.goods.feign"})
public class SearchApplication {
public static void main(String[] args) {
SpringApplication.run(SearchApplication.class,args);
}
}
(4) 将rabbitmq配置类放入该模块下
package com.shangcheng.search.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//定义交换机名称
public static final String GOODS_UP_EXCHANGE="goods_up_exchange";
public static final String GOODS_DOWN_EXCHANGE="goods_down_exchange";
//定义队列名称
public static final String AD_UPDATE_QUEUE="ad_update_queue";
public static final String SEARCH_ADD_QUEUE="search_add_queue";
public static final String SEARCH_DEL_QUEUE="search_del_queue";
//声明队列
@Bean
public Queue queue(){
return new Queue(AD_UPDATE_QUEUE);
}
@Bean(SEARCH_ADD_QUEUE)
public Queue SEARCH_ADD_QUEUE(){
return new Queue(SEARCH_ADD_QUEUE);
}
@Bean(SEARCH_DEL_QUEUE)
public Queue SEARCH_DEL_QUEUE(){
return new Queue(SEARCH_DEL_QUEUE);
}
//声明交换机
@Bean(GOODS_UP_EXCHANGE)
public Exchange GOODS_UP_EXCHANGE(){
return ExchangeBuilder.fanoutExchange(GOODS_UP_EXCHANGE).durable(true).build();
}
@Bean(GOODS_DOWN_EXCHANGE)
public Exchange GOODS_DOWN_EXCHANGE(){
return ExchangeBuilder.fanoutExchange(GOODS_DOWN_EXCHANGE).durable(true).build();
}
//队列与交换机的绑定
@Bean
public Binding GOODS_UP_EXCHANGE_BINDING(@Qualifier(SEARCH_ADD_QUEUE)Queue queue,@Qualifier(GOODS_UP_EXCHANGE)Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
@Bean
public Binding GOODS_DOWN_EXCHANGE_BINDING(@Qualifier(SEARCH_DEL_QUEUE)Queue queue,@Qualifier(GOODS_DOWN_EXCHANGE)Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
}
2.5 商品服务查询商品信息的实现
(1) SkuController新增方法
@GetMapping("/spu/{spuId}")
public List<Sku> findSkuListBySpuId(@PathVariable("spuId") String spuId){
Map<String,Object> searchMap = new HashMap<>();
if (!"all".equals(spuId)){
searchMap.put("spuId",spuId);
}
searchMap.put("status","1");
List<Sku> skuList = skuService.findList(searchMap);
return skuList;
}
(2) shangcheng_service_goods_api新增common依赖
<dependencies>
<dependency>
<groupId>com.changgou</groupId>
<artifactId>changgou_common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
(3) 定义skuFegin接口
@FeignClient(name="goods")
@RequestMapping("/sku")
public interface SkuFeign {
/***
* 多条件搜索品牌数据
* @param spuId
* @return
*/
@GetMapping("/sku/spu/{spuId}")
public List<Sku> findSkuListBySpuId(@PathVariable("spuId") String spuId);
}
2.6 搜索微服务批量导入数据逻辑
(1) 创建 com.shangcheng.search.dao包,并新增ESManagerMapper接口
public interface ESManagerMapper extends ElasticsearchRepository<SkuInfo,Long> {
}
(2)创建 com.shangcheng.search.service包,包下创建接口EsManagerService
public interface ESManagerService {
/**
* 创建索引库结构
*/
void createMappingAndIndex();
/**
* 导入全部数据到ES索引库
*/
void importAll();
/**
* 根据spuid导入数据到ES索引库
* @param spuId 商品id
*/
void importDataBySpuId(String spuId);
}
(2)创建com.shangcheng.search.service.impl包,包下创建服务实现类
@Service
public class ESManagerServiceImpl implements ESManagerService {
@Autowired
private ElasticsearchTemplate elasticsearchTemplate;
@Autowired
private SkuFeign skuFeign;
@Autowired
private ESManagerMapper esManagerMapper;
//创建索引库结构
@Override
public void createMappingAndIndex() {
//创建索引
elasticsearchTemplate.createIndex(SkuInfo.class);
//创建映射
elasticsearchTemplate.putMapping(SkuInfo.class);
}
//导入全部sku集合进入到索引库
@Override
public void importAll() {
//查询sku集合
List<Sku> skuList = skuFeign.findSkuListBySpuId("all");
if (skuList == null || skuList.size()<=0){
throw new RuntimeException("当前没有数据被查询到,无法导入索引库");
}
//skulist转换为json
String jsonSkuList = JSON.toJSONString(skuList);
//将json转换为skuinfo
List<SkuInfo> skuInfoList = JSON.parseArray(jsonSkuList, SkuInfo.class);
for (SkuInfo skuInfo : skuInfoList) {
//将规格信息转换为map
Map specMap = JSON.parseObject(skuInfo.getSpec(), Map.class);
skuInfo.setSpecMap(specMap);
}
//导入索引库
esManagerMapper.saveAll(skuInfoList);
}
//根据spuid查询skuList,添加到索引库
@Override
public void importDataBySpuId(String spuId) {
List<Sku> skuList = skuFeign.findSkuListBySpuId(spuId);
if (skuList == null || skuList.size()<=0){
throw new RuntimeException("当前没有数据被查询到,无法导入索引库");
}
//将集合转换为json
String jsonSkuList = JSON.toJSONString(skuList);
List<SkuInfo> skuInfoList = JSON.parseArray(jsonSkuList, SkuInfo.class);
for (SkuInfo skuInfo : skuInfoList) {
//将规格信息进行转换
Map specMap = JSON.parseObject(skuInfo.getSpec(), Map.class);
skuInfo.setSpecMap(specMap);
}
//添加索引库
esManagerMapper.saveAll(skuInfoList);
}
}
(3) 创建com.shangcheng.search.controller.定义ESManagerController
@RestController
@RequestMapping("/manager")
public class ESManagerController {
@Autowired
private ESManagerService esManagerService;
//创建索引库结构
@GetMapping("/create")
public Result create(){
esManagerService.createMappingAndIndex();
return new Result(true, StatusCode.OK,"创建索引库结构成功");
}
//导入全部数据
@GetMapping("/importAll")
public Result importAll(){
esManagerService.importAll();
return new Result(true, StatusCode.OK,"导入全部数据成功");
}
}
2.7 接收mq消息执行导入
shangcheng_service_search工程创建com.shangcheng.search.listener包,包下创建类
@Component
public class GoodsUpListener {
@Autowired
private ESManagerService esManagerService;
@RabbitListener(queues = RabbitMQConfig.SEARCH_ADD_QUEUE)
public void receiveMessage(String spuId){
System.out.println("接收到的消息为: "+spuId);
//查询skulist,并导入到索引库
esManagerService.importDataBySpuId(spuId);
}
}
2.8 测试
(1)启动环境 eureka 、elasticsearch 、canal服务端、canal数据监控微服务、rabbitmq
(2)启动商品微服务、搜索微服务
(3)修改tb_spu某记录的is_marketable值为1,观察控制台输出,启动kibana查询记录是否导入成功