微服务实战——ElasticSearch(保存)
商品上架——ElasticSearch(保存)
0.商城架构图
1.商品Mapping
分析:商品上架在 es 中是存 sku 还是 spu ?
- 检索的时候输入名字,是需要按照 sku 的 title 进行全文检索的
- 检索使用商品规格,规格是 spu 的公共属性,每个 spu 是一样的
- 按照分类 id 进去的都是直接列出 spu 的,还可以切换。
- 我们如果将 sku 的全量信息保存到 es 中(包括 spu 属性)就太多量字段了。
- 我们如果将 spu 以及他包含的 sku 信息保存到 es 中,也可以方便检索。但是 sku 属于spu 的级联对象,在 es 中需要 nested 模型,这种性能差点。
- 但是存储与检索我们必须性能折中。
- 如果我们分拆存储,spu 和 attr 一个索引,sku 单独一个索引可能涉及的问题。
-
- 检索商品的名字,如“手机”,对应的 spu 有很多,我们要分析出这些 spu 的所有关联属性,再做一次查询,就必须将所有 spu_id 都发出去。假设有 1 万个数据,数据传输一次就10000*4=4MB;并发情况下假设 1000 检索请求,那就是 4GB 的数据,传输阻塞时间会很长,业务更加无法继续。
- 所以,我们如下设计,这样才是文档区别于关系型数据库的地方,宽表设计,不能去考虑数据库范式。
- 向ES添加商品属性映射
向ES添加商品属性映射
PUT product
{
"mappings":{
"properties": {
"skuId":{
"type": "long"
},
"spuId":{
"type": "keyword"
},
"skuTitle": {
"type": "text",
"analyzer": "ik_smart"
},
"skuPrice": {
"type": "keyword"
},
"skuImg":{
"type": "keyword",
"index": false,
"doc_values": false
},
"saleCount":{
"type":"long"
},
"hasStock": {
"type": "boolean"
},
"hotScore": {
"type": "long"
},
"brandId": {
"type": "long"
},
"catalogId": {
"type": "long"
},
"brandName": {
"type": "keyword",
"index": false,
"doc_values": false
},
"brandImg":{
"type": "keyword",
"index": false,
"doc_values": false
},
"catalogName": {
"type": "keyword",
"index": false,
"doc_values": false
},
"attrs": {
"type": "nested",
"properties": {
"attrId": {
"type": "long"
},
"attrName": {
"type": "keyword",
"index": false,
"doc_values": false
},
"attrValue": {
"type": "keyword"
}
}
}
}
}
}
index :
默认 true ,如果为 false ,表示该字段不会被索引,但是检索结果里面有,但字段本身不能
当做检索条件。
doc_values :
默认 true ,设置为 false ,表示不可以做排序、聚合以及脚本操作,这样更节省磁盘空间。
还可以通过设定 doc_values 为 true , index 为 false 来让字段不能被搜索但可以用于排序、聚合以及脚本操作:
spu在es中的存储模型分析总结
如果每个sku都存储规格参数,会有冗余存储,因为每个spu对应的sku的规格参数都一样。但是如果将规格参数单独建立索引会出现检索时出现大量数据传输的问题,会阻塞网络因此我们选用第一种存储模型,以空间换时间。
2.上架细节
上架是将后台的商品放在 es 中可以提供检索和查询功能:
- hasStock:代表是否有库存。默认上架的商品都有库存。如果库存无货的时候才需要更新一下 es
- 库存补上以后,也需要重新更新一下 es
- hotScore 是热度值,我们只模拟使用点击率更新热度。点击率增加到一定程度才更新热度值。
- 下架就是从 es 中移除检索项,以及修改 mysql 状态
商品上架步骤:
- 先在 es 中按照之前的 mapping 信息,建立 product 索引。
- 点击上架,查询出所有 sku 的信息,保存到 es 中
- es 保存成功返回,更新数据库的上架状态信息
3.数据一致性
- 商品无库存的时候需要更新 es 的库存信息
- 商品有库存也要更新 es 的信息
4.ES中的数组扁平化
关于“nested”,Nested datatype | Elasticsearch Guide [7.6] | Elastic
ES中数组的扁平化处理:
对象数组的扁平化:
内部对象字段数组的工作方式与您预期的不同。Lucene没有内部对象的概念,所以Elasticsearch将对象层次结构简化为字段名和值的简单列表。例如,以下文件:
PUT my_index/_doc/1
{
"group" : "fans",
"user" : [
{
"first" : "John",
"last" : "Smith"
},
{
"first" : "Alice",
"last" : "White"
}
]
}
在内部将转换成一个文档,看起来是这样的:
{
"group" : "fans",
"user.first" : [ "alice", "john" ],
"user.last" : [ "smith", "white" ]
}
查询my_index的映射
GET my_index/_mapping
{
"my_index" : {
"mappings" : {
"properties" : {
"group" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"user" : {
"properties" : {
"first" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"last" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
}
}
}
}
}
user.first和user.last字段被平铺成多值字段,alice和white之间的关联也丢失了。在查询alice和smith时,这个文档将将发生错误的匹配
GET my_index/_search
{
"query": {
"bool": {
"must": [
{ "match": { "user.first": "Alice" }},
{ "match": { "user.last": "Smith" }}
]
}
}
}
所想要的只是user.first="Alice",user.last="Smith",本身是查询不到的,但是却查询出来了两条结果:
{
"took" : 49,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 0.5753642,
"hits" : [
{
"_index" : "my_index",
"_type" : "_doc",
"_id" : "1",
"_score" : 0.5753642,
"_source" : {
"group" : "fans",
"user" : [
{
"first" : "John",
"last" : "Smith"
},
{
"first" : "Alice",
"last" : "White"
}
]
}
}
]
}
}
删除“my_index”索引
DELETE my_index
重新创建my_index索引
PUT my_index
{
"mappings": {
"properties": {
"user": {
"type": "nested"
}
}
}
}
重新插入数据
PUT my_index/_doc/1
{
"group" : "fans",
"user" : [
{
"first" : "John",
"last" : "Smith"
},
{
"first" : "Alice",
"last" : "White"
}
]
}
再次查询user.first="Alice",user.last="Smith"时,查询不到数据
GET my_index/_search
{
"query": {
"bool": {
"must": [
{ "match": { "user.first": "Alice" }},
{ "match": { "user.last": "Smith" }}
]
}
}
}
查询结果:
{
"took" : 1,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 0,
"relation" : "eq"
},
"max_score" : null,
"hits" : [ ]
}
}
5.商品上架接口实现
商品上架需要在es中保存spu信息并更新spu的状态信息,由于SpuInfoEntity
与索引的数据模型并不对应,所以我们要建立专门的vo进行数据传输
1、商品上架接口
接口文档:商品系统 - 20、商品上架
POST /product/spuinfo/{spuId}/up
请求参数
分页数据
响应数据
{
"msg": "success",
"code": 0
}
功能效果
新增“com.cwh.common.to.es.SkuEsModel”类,代码如下:
package com.cwh.common.to.es;
import lombok.Data;
import java.math.BigDecimal;
import java.util.List;
@Data
public class SkuEsModel {
private Long skuId;
private Long spuId;
private String skuTitle;
private BigDecimal skuPrice;
private String skuImg;
private Long saleCount;
private boolean hasStock;
private Long hotScore;
private Long brandId;
private Long catalogId;
private String brandName;
private String brandImg;
private String catalogName;
private List attrs;
@Data
public static class Attr{
private Long attrId;
private String attrName;
private String attrValue;
}
}
编写商品上架的接口
修改“com.cwh.gulimall.product.controller.SpuInfoController”类,代码如下:
@PostMapping("spuinfo/{spuId}/up")
public R spuUp(@PathVariable("spuId") Long spuId){
spuInfoService.up(spuId);
return R.ok();
}
修改“com.cwh.gulimall.product.service.SpuInfoService”类,代码如下:
/**
* 商品上架
*
* @param spuId
*/
void up(Long spuId);
由于每个spu对应的各个sku的规格参数相同,因此我们要将查询规格参数提前,只查询一次
修改“com.cwh.gulimall.product.service.impl.SpuInfoServiceImpl”类,代码如下:
@Override
public void up(Long spuId) {
// 1、查出当前spuId对应的sku信息,品牌名字
List<SkuInfoEntity> skus = skuInfoService.getSkuBySpuId(spuId);
List<Long> skuIdList = skus.stream().map(SkuInfoEntity::getSkuId).collect(Collectors.toList());
// 2.1、发送远程调用,库存系统查询是否有库存
Map<Long, Boolean> stockMap = null;
try {
R r = wareFeignService.getSkusHasStock(skuIdList);
TypeReference<List<SkuHasStockVo>> typeReference = new TypeReference<List<SkuHasStockVo>>() {
};
stockMap = r.getData(typeReference).stream().collect(Collectors.toMap(SkuHasStockVo::getSkuId, SkuHasStockVo::getHasStock));
} catch (Exception e) {
log.error("库存服务查询异常,原因:", e);
}
// 2.4、查询当前sku的所有可以被用来检索的规格属性
List<ProductAttrValueEntity> baseAttrs = productAttrValueService.baseAttrListForSpu(spuId);
List<Long> attrIds = baseAttrs.stream().map(attr -> attr.getAttrId()).collect(Collectors.toList());
List<Long> searchAttrIds = attrService.selectSearchAttrs(attrIds);
Set<Long> idSet = new HashSet<>(searchAttrIds);
List<SkuEsModel.Attrs> attrsList = baseAttrs.stream().filter(item -> idSet.contains(item.getAttrId())).map(item -> {
SkuEsModel.Attrs attrs1 = new SkuEsModel.Attrs();
BeanUtils.copyProperties(item, attrs1);
return attrs1;
}).collect(Collectors.toList());
// 2、封装每个sku的信息
Map<Long, Boolean> finalStockMap = stockMap;
List<SkuEsModel> upProducts = skus.stream().map(sku -> {
// 组装需要的数据
SkuEsModel esModel = new SkuEsModel();
BeanUtils.copyProperties(sku, esModel);
esModel.setSkuPrice(sku.getPrice());
esModel.setSkuImg(sku.getSkuDefaultImg());
// 2.1、是否有库存 hasStock,hotScore
if (finalStockMap == null) {
esModel.setHasStock(true);
} else {
esModel.setHasStock(finalStockMap.get(sku.getSkuId()));
}
// 2.2、热度评分。0
esModel.setHotScore(0L);
// 2.3、查询品牌和分类的名字信息
BrandEntity brand = brandService.getById(esModel.getBrandId());
esModel.setBrandName(brand.getName());
esModel.setBrandImg(brand.getLogo());
CategoryEntity category = categoryService.getById(esModel.getCatalogId());
esModel.setCatalogName(category.getName());
// 2.4、设置检索属性
esModel.setAttrs(attrsList);
System.out.println("======================esModel" + esModel);
return esModel;
}).collect(Collectors.toList());
// 3、将数据发送给es进行保存
R r = searchFeignService.productStatusUp(upProducts);
System.out.println("=========================" + r);
if (r.getCode() == 0) {
//远程调用成功
// 3.1、修改当前spu的状态
System.out.println("修改当前spu的状态");
baseMapper.updateSpuStatus(spuId, ProductConstant.StatusEnum.SPU_UP.getCode());
} else {
// 远程调用失败
// TODO 3.2、重复调用?接口幂等性;重试机制
/**
* Feign调用流程:
* 1、构造请求数据,将对象转为json
* RequestTemplate template = buildTemplateFromArgs.create(argv);
* 2、发送请求进行执行(执行成功会解码响应数据)
* executeAndDecode(template)
* 3、执行请求会有重试机制
* while(true){
* try{
* executeAndDecode(template);
* }catch(){
* retryer.continueOrPropagate(e);
* throw ex;
* continue;
* }
* }
*/
}
}
2、查出当前spuId对应的sku信息,品牌名字
修改“com.cwh.gulimall.product.service.SkuInfoService”类,代码如下:
/**
* 查出当前spuId对应的sku信息
*
* @param spuId
* @return
*/
List<SkuInfoEntity> getSkuBySpuId(Long spuId);
修改“com.cwh.gulimall.product.service.impl.SkuInfoServiceImpl”类,代码如下:
@Override
public List<SkuInfoEntity> getSkuBySpuId(Long spuId) {
List<SkuInfoEntity> list = this.list(new QueryWrapper<SkuInfoEntity>().eq("spu_id", spuId));
return list;
}
3、封装每个sku的信息
3.1、发送远程调用,库存系统查询是否有库存
修改“com.cwh.gulimall.product.feign.WareFeignService”类,代码如下:
package com.cwh.gulimall.product.feign;
import com.cwh.common.utils.R;
import com.cwh.gulimall.product.vo.SkuHasStockVo;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.List;
@FeignClient("gulimall-ware")
public interface WareFeignService {
/**
* 1、R设计的时候可以加上泛型
* 2、直接返回我们想要的结果
* 3、自己封装返回结果
* @param skuIds
* @return
*/
@PostMapping("/ware/waresku/hasStock")
R getSkusHasStock(@RequestBody List<Long> skuIds);
}
修改”com.cwh.gulimall.ware.controller.WareSkuController”,代码如下:
/**
* 查询sku是否有库存
*/
@PostMapping("hasStock")
public R getSkusHasStock(@RequestBody List<Long> skuIds){
// sku_id, stock
List<SkuHasStockVo> vos = wareSkuService.getSkusHasStock(skuIds);
return R.ok().setData(vos);
}
修改”com.cwh.gulimall.ware.service.WareSkuService”类,代码如下:
List getSkusHasStock(List skuIds);
修改”com.cwh.gulimall.ware.service.WareSkuService”类,代码如下:
@Override
public List<SkuHasStockVo> getSkusHasStock(List<Long> skuIds) {
List<SkuHasStockVo> collect = skuIds.stream().map(skuId -> {
SkuHasStockVo vo = new SkuHasStockVo();
// 查询sku的总库存量
Long count = baseMapper.getSkuStock(skuId);
vo.setSkuId(skuId);
vo.setHasStock(count == null ? false : count > 0);
return vo;
}).collect(Collectors.toList());
return collect;
}
修改“com.cwh.gulimall.ware.dao.WareSkuDao”类,代码如下
Long getSkuStock(Long skuId);
修改“com.cwh.gulimall.ware.dao.WareSkuDao.xml”类,代码如下
<select id="getSkuStock" resultType="java.lang.Long">
select sum(stock - stock_locked) from wms_ware_sku where sku_id=#{sku_id}
</select>
2.2、查询当前sku的所有可以被用来检索的规格属性
修改“com.cwh.gulimall.product.service.AttrService”类,代码如下:
/**
* 在指定的所有属性集合里面,挑出检索属性
*
* @param attrIds
* @return
*/
List<Long> selectSearchAttrs(List<Long> attrIds);
修改“com.cwh.gulimall.product.service.impl.AttrServiceImpl”类,代码如下:
@Override
public List<Long> selectSearchAttrs(List<Long> attrIds) {
return baseMapper.selectSearchAttrIds(attrIds);
}
4、将数据发送给es进行保存
修改“com.cwh.gulimall.product.feign.SearchFeignService”类,代码如下:
package com.cwh.gulimall.product.feign;
import com.cwh.common.to.es.SkuEsModel;
import com.cwh.common.utils.R;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.List;
@FeignClient("gulimall-search")
public interface SearchFeignService {
@PostMapping("search/save/product")
public R productStatusUp(@RequestBody List<SkuEsModel> skuEsModels);
}
4.1、创建gulimall-search
1、添加pom
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.cwh.gulimall</groupId>
<artifactId>gulimall-search</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>gulimall-search</name>
<description>ElasticSearch检索服务</description>
<properties>
<java.version>1.8</java.version>
<elasticsearch.version>7.4.2</elasticsearch.version>
</properties>
<dependencies>
<!--导入es的rest-high-level-client-->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.4.2</version>
</dependency>
<dependency>
<groupId>com.auguigu.gulimall</groupId>
<artifactId>gulimall-commom</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
2、修改yml
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
spring.application.name=gulimall-search
server.port=12000
3、添加主配置类
package com.cwh.gulimall.search;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@EnableDiscoveryClient
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class GulimallSearchApplication {
public static void main(String[] args) {
SpringApplication.run(GulimallSearchApplication.class, args);
}
}
4、配置ElaseaticSearch
修改“com.cwh.gulimall.search.config.GulimallElasticSearchConfig”类,代表如下:
package com.cwh.gulimall.search.config;
import org.apache.http.HttpHost;
import org.elasticsearch.client.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestOperations;
/**
* 1、导入依赖
* 2、编写配置,给容器中注入一个RestHighLevelClient
* 3、参照API操作
*/
@Configuration
public class GulimallElasticSearchConfig {
public static final RequestOptions COMMON_OPTIONS;
static {
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
// builder.addHeader("Authorization", "Bearer " + TOKEN);
// builder.setHttpAsyncResponseConsumerFactory(
// new HttpAsyncResponseConsumerFactory
// .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024));
COMMON_OPTIONS = builder.build();
}
@Bean
public RestHighLevelClient restHighLevelClient() {
RestClientBuilder builder = RestClient.builder(new HttpHost("192.168.43.125", 9200, "http"));
return new RestHighLevelClient(builder);
}
}
修改“com.cwh.gulimall.search.controller.ElasticSaveController”类,代表如下:
package com.cwh.gulimall.search.controller;
import com.cwh.common.constant.ProductConstant;
import com.cwh.common.exception.BizCodeEnume;
import com.cwh.common.to.es.SkuEsModel;
import com.cwh.common.utils.R;
import com.cwh.gulimall.search.service.ProductSaveService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RequestMapping("/search/save")
@RestController
@Slf4j
public class ElasticSaveController {
@Autowired
ProductSaveService productSaveService;
/**
* 上架商品
*/
@PostMapping("/product")
public R productStatusUp(@RequestBody List<SkuEsModel> skuEsModels) {
boolean b;
try {
b = productSaveService.productStatusUp(skuEsModels);
} catch (Exception e) {
log.error("ElasticSaveController商品上架错误:{}", e);
return R.error(BizCodeEnume.PRODUCT_UP_EXCEPTION.getCode(), BizCodeEnume.PRODUCT_UP_EXCEPTION.getMsg());
}
if (!b) {
return R.ok();
} else {
return R.error(BizCodeEnume.PRODUCT_UP_EXCEPTION.getCode(), BizCodeEnume.PRODUCT_UP_EXCEPTION.getMsg());
}
}
}
修改“com.cwh.gulimall.search.service.ProductSaveService”类,代表如下:
public class EsConstant {
public static final String PRODUCT_INDEX = "product"; //sku数据在es中的索引
}
修改“com.cwh.gulimall.search.service.impl.ProductSaveServiceImpl”类,代表如下:
package com.cwh.gulimall.search.service.impl;
import com.alibaba.fastjson.JSON;
import com.cwh.common.to.es.SkuEsModel;
import com.cwh.gulimall.search.config.GulimallElasticSearchConfig;
import com.cwh.gulimall.search.constant.EsConstant;
import com.cwh.gulimall.search.service.ProductSaveService;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
@Service
public class ProductSaveServiceImpl implements ProductSaveService {
@Autowired
RestHighLevelClient restHighLevelClient;
@Override
public boolean productStatusUp(List<SkuEsModel> skuEsModels) throws IOException {
// 保存到es
// 1、给es中建立索引。product,建立好映射关系
// 2、给es中保存这些数据
// BulkRequest bulkRequest, RequestOptions options
BulkRequest bulkRequest = new BulkRequest();
for (SkuEsModel model : skuEsModels) {
// 1、构造保存请求
IndexRequest indexRequest = new IndexRequest(EsConstant.PRODUCT_INDEX);
indexRequest.id(model.getSkuId().toString());
String jsonString = JSON.toJSONString(model);
indexRequest.source(jsonString, XContentType.JSON);
bulkRequest.add(indexRequest);
}
BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, GulimallElasticSearchConfig.COMMON_OPTIONS);
// TODO 如果批量错误
boolean b = bulk.hasFailures();
List<String> collect = Arrays.stream(bulk.getItems()).map(item -> item.getId()).collect(Collectors.toList());
log.info("商品上架完成:{},返回数据:{}", collect, bulk.toString());
return b;
}
}
4.2、修改当前spu的状态
修改"com.cwh.gulimall.product.dao.SpuInfoDao"类,代码如下:
void updateSpuStatus(@Param("spuId") Long spuId,@Param("code") int code);
修改"com.cwh.gulimall.product.dao.SpuInfoDao.xml"类,代码如下:
<update id="updateSpuStatus">
update pms_spu_info set publish_status=#{code},update_time=NOW() where id =#{spuId}
</update>