java_将数据存入elasticsearch进行高效搜索
使用技术简介:
(1) 使用Nginx实现反向代理,使前端可以调用多个微服务
(2) 使用nacos将多个服务管理关联起来
(3) 将数据存入elasticsearch进行高效搜索
(4) 使用消息队列rabbitmq进行消息的传递
(5) 使用 openfeign 进行多个服务之间的api调用
参考: 56 尚上优选项目-平台管理端-整合ES+MQ实现商品上下架-功能最终测试_哔哩哔哩_bilibili
1. 使用Nginx实现反向代理
使用Nginx实现反向代理,使前端可以通过一个端口,调用多个微服务(的端口)
前端中的配置的base api端口 9901:
反向代理逻辑图:
图示 /acl /sys 为两个服务的名称中的路径字符串。
在nginx中的配置如下:
2. 使用nacos将多个服务管理关联起来
通过nacos将多个服务关联起来,这里实现一个产品上架(存入elasticsearch仓库,简称es)下架(从es仓库删除)的功能.service-product提供商品的信息,service-search通过远程调用(FeignClient)调用service-product的接口,获取商品的具体信息,存入或者从es中删除。
service-product 和 service-search 两个服务通过消息队列进行通讯(rabbitmq消息队列, pom名称:spring-cloud-starter-bus-amqp)
nacos部分:pom依赖:
<!--服务注册 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency>
在各个服务中添加在naco的注册:示例 service-produce模块的application.yml中添加cloud.nacos.discovery.server-addr
在 main主程序中添加 启用Nacos服务发现注解:@EnableDiscoveryClient
同理在service-search模块中:
启动nacos服务,同时启动自己的服务,看自己的服务是否注册成功:
下载安装合适版本的nacos(版本不对应会出兼容性问题):
到nacos目录,执行:
.\bin\startup.cmd -m standalone
启动nacos后,启动自己的服务,如自己的服务注册成功,会在log中有如下提示:
在浏览器中输入: http://192.168.136.1:8848/nacos/index.html 也可看是否启动成功,若你的目标服务在“服务列表”中,说明服务注册成功,nacos正常运行,服务可以被关联起来,可以通过restful风格进行数据的传递了。
3. 使用消息队列rabbitmq进行消息的传递
在需要使用的服务service-search的pom.xml中添加依赖:
<!--rabbitmq消息队列--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency>
安装rabbitmq服务,示例在虚拟机docker中安装 rabbitmq:3.8-management:
#拉取镜像
docker pull rabbitmq:3.8-management
#创建容器启动
docker run -d --restart=always -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:3.8-management
查看安装rabbitmq的虚拟机ip地址:
docker中安装好后,可以在本机(win11)上测试下安装是否成功:浏览器中输入:http://192.168.68.132:15672/ (注意这里是自己虚拟机的ip地址)
根据自己虚拟机的地址,在需要使用的服务service-search 和 serivice-product 中配置ip地址和端口号等信息:
编写代码,使用 RabbitTemplate, 进行消息的发送:
进行消息发送:指定j交换机exchange 字符串(自定义一个),路由器routing, 发送内容 Object message (这里是 Long skuId)
为了让发送的消息能构正确解析,需要定义一个@Configuration, 使用Jackson2JsonMessageConverter,进行消息的类型转换,这里是将Long skuId 转换成Json格式:
package com.atguigu.ssyx.mq.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
注意rabbit端还需要对 RabbitTemplate 做一些初始化操作,参考init():
package com.atguigu.ssyx.mq.config;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class MQProducerAckConfig implements RabbitTemplate.ReturnCallback, RabbitTemplate.ConfirmCallback {
// 我们发送消息使用的是 private RabbitTemplate rabbitTemplate; 对象
// 如果不做设置的话 当前的rabbitTemplate 与当前的配置类没有任何关系!
@Autowired
private RabbitTemplate rabbitTemplate;
// 设置 表示修饰一个非静态的void方法,在服务器加载Servlet的时候运行。并且只执行一次!
// PostConstruct注解的函数:在 Spring 容器创建并初始化 Bean 后自动调用,一个类中只能有一个 @PostConstruct 注解的方法
@PostConstruct
public void init(){
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setConfirmCallback(this);
}
/**
* 表示消息是否正确发送到了交换机上
*
* @param correlationData 消息的载体
* @param ack 判断是否发送到交换机上
* @param cause 原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息发送成功!");
} else {
System.out.println("消息发送失败!" + cause);
}
}
/**
* 消息如果没有正确发送到队列中,则会走这个方法!如果消息被正常处理,则这个方法不会走!
*
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey) {
System.out.println("消息主体: " + new String(message.getBody()));
System.out.println("应答码: " + replyCode);
System.out.println("描述:" + replyText);
System.out.println("消息使用的交换器 exchange : " + exchange);
System.out.println("消息使用的路由键 routing : " + routingKey);
}
}
消息的接收端(消费者) ,通过RabbitListener 定位到到发送者发送的消息队列上:
package com.atguigu.ssyx.receiver;
import com.atguigu.ssyx.mq.constant.MqConst;
import com.rabbitmq.client.Channel;
import com.atguigu.ssyx.search.service.SkuService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class SkuReceiver {
@Autowired
private SkuService skuService;
/**
* 商品上架
* @param skuId
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(bindings = @QueueBinding( //绑定接收什么消息 //消费者,接收消息
value=@Queue(value = MqConst.QUEUE_GOODS_UPPER, durable = "true"), // durable 持久化
exchange = @Exchange(value = MqConst.EXCHANGE_GOODS_DIRECT),
key = {MqConst.ROUTING_GOODS_UPPER}
))
public void upperSku(Long skuId, Message message, Channel channel) throws IOException {
// 发送者: rabbitService.sendMsg(MqConst.EXCHANGE_GOODS_DIRECT,
// MqConst.ROUTING_GOODS_UPPER,
// skuId);
//:发送者函数原型:public boolean sendMsg(String exchange, String routingKey, Object message)
// 这里的Object message使用 public MessageConverter messageConverter() 方法转成了json格式,
// 确保了生产者和消费者之间的消息序列化与反序列化逻辑一致,
// 所以 Object message 对应这里接收者的 Long skuId 形参,实现了参数传递的目的。
try{
if(skuId != null) {
skuService.upperSku(skuId);
}
/**
* 第一个参数:表示收到的消息的标号
* 第二个参数:如果为true表示可以签收多个消息
*/
//DeliveryTag 交货标签
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
System.out.print("john:IOException occurred while processing message");
throw e; // 或者处理该异常
}
}
/**
* 商品下架
* @param skuId
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = MqConst.QUEUE_GOODS_LOWER, durable = "true"),
exchange = @Exchange(value = MqConst.EXCHANGE_GOODS_DIRECT), //交换器
key = {MqConst.ROUTING_GOODS_LOWER} //路由器
))
public void lowerSku(Long skuId, Message message, Channel channel) throws IOException {
try {
if (skuId != null) {
skuService.lowerSku(skuId);
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
catch (IOException e) {
System.out.print("john:IOException occurred while processing message");
throw e; // 或者处理该异常
}
}
}
注意着的对应关系:
public boolean sendMsg(String exchange, String routingKey, Object message)
sendMsg中的 Object message 被 public MessageConverter messageConverter() 正确解析成了 Long skuId
upperSku(Long skuId, Message message, Channel channel)
至此,rabbitmq完成了从service-product传递一个skuId到service-serach的动作。
若有消息发送送时,界面中会出翔对应的消息队列的名称:
4.将数据存入elasticsearch进行高效搜索
elasticsearch的配置
下载elasticsearch Past Releases of Elastic Stack Software | Elastic, elasticsearch-7.8.0-windows-x86_64.zip (注意版本不对,会有兼容性问题)
下载对应版本的分词器:elasticsearch-analysis-ik-7.8.0.zip
下载 kibana来进行客户端操作 kibana-7.8.0-windows-x86_64.zip
解压 elasticsearch-7.8.0-windows-x86_64文件,在
path\elasticsearch-7.8.0-windows-x86_64\elasticsearch-7.8.0\plugins\ 目录下新建文件夹,命名为ik,解压elasticsearch-analysis-ik-7.8.0,将内容所有内容copy到ik目录中:
在 es目录下运行 .\bin\elasticsearch.bat 启动es:
可以看到启动成功了:
浏览器界面看下 http://localhost:9200/ ,出现下面界面,说明启动成功
配置kibana
下载,解压,修改如下3个配置 (不同版本可能不一样,有增加或者删除项):
启动kibana:
.\bin\kibana.bat
浏览器访问看下:http://127.0.0.1:5601/
分词器的测试:
代码配置
首先定义一个存储仓库对应的类 SkuRepository, 继承自 ElasticsearchRepository:
package com.atguigu.ssyx.search.repository;
import com.atguigu.ssyx.model.search.SkuEs;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
public interface SkuRepository extends ElasticsearchRepository<SkuEs, Long> {
}
使用 skuRepository.save(skuEs) 方法,即可存入 es仓库中。
使用 skuRepository.deleteById(skuId); 即可从仓库中删除。
使用 ProductFeignClient 进行跨服务访问
因为service-serach要使用service-product的信息,但时两个服务,这是需要使用 ProductFeignClient 进行restful风格的api传递参数,进行远程调用。
首先需要pom中引入依赖:
<!-- 服务调用feign --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> <scope>provided </scope> </dependency>
在对应的 service-serach的application-dev.yml中进行配置:
在主程序中药加入远程调用注释: @EnableFeignClients
然后,定义service-product中的api,将参数传入到指定的api接口:
package com.atguigu.ssyx.product.api;
import com.atguigu.ssyx.model.product.Category;
import com.atguigu.ssyx.model.product.SkuInfo;
import com.atguigu.ssyx.product.service.CategoryService;
import com.atguigu.ssyx.product.service.SkuInfoService;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
// 内部调用,不是前端调用
@RequestMapping("/api/product") //该类是为了内部调用,供service-search使用,所以命名为内部inner
public class ProductInnerController {
@Autowired
private CategoryService categoryService;
@Autowired
private SkuInfoService skuInfoService;
//根据categoryId获取商品的category信息
@ApiOperation(value = "根据categoryId获取商品的category信息")
@GetMapping("inner/getCategory/{categoryId}")
public Category getCategoryById(@PathVariable("categoryId") Long categoryId) {
return categoryService.getById(categoryId);
}
//根据skuId获取skuInfo
@ApiOperation(value = "根据skuId获取skuInfo信息")
@GetMapping("inner/getSkuInfo/{skuId}")
public SkuInfo getSkuInfoById(@PathVariable("skuId") Long skuId) {
return skuInfoService.getById(skuId);
}
}
定义使用该api的函数,并使用@FeignClient 注明从哪个模块进行api的对接:
package com.atguigu.ssyx.client.product;
import com.atguigu.ssyx.model.product.Category;
import com.atguigu.ssyx.model.product.SkuInfo;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
//与service\service-product\src\main\resources\application.yml中的application。name名称一致
@FeignClient(value = "service-product")
public interface ProductFeignClient {
//作为\service\service-product中的\product\api\ProductInnerController.java
//中函数的接口定义文件
//注意要使用完整的restful风格的路径
//用于远程调用(service-search远程调用service-product)
@GetMapping("/api/product/inner/getCategory/{categoryId}")
public Category getCategoryById(@PathVariable("categoryId") Long categoryId);
@GetMapping("/api/product/inner/getSkuInfo/{skuId}")
public SkuInfo getSkuInfoById(@PathVariable("skuId") Long skuId);
}
定义feign对象 productFeignClient,调用api对应的函数接口:
SkuInfo skuInfo = productFeignClient.getSkuInfoById(skuId); // ... Category category = productFeignClient.getCategoryById(skuInfo.getCategoryId());
package com.atguigu.ssyx.search.service.impl;
import com.alibaba.fastjson.JSON;
import com.atguigu.ssyx.client.product.ProductFeignClient;
import com.atguigu.ssyx.enums.SkuType;
import com.atguigu.ssyx.model.product.Category;
import com.atguigu.ssyx.model.product.SkuInfo;
import com.atguigu.ssyx.model.search.SkuEs;
import com.atguigu.ssyx.search.repository.SkuRepository;
import com.atguigu.ssyx.search.service.SkuService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import static jdk.nashorn.internal.runtime.regexp.joni.Config.log;
@Slf4j
@Service
public class SkuServiceImpl implements SkuService {
//通过service-product-client中的feign远程调用service-product中的方法
@Autowired
private ProductFeignClient productFeignClient;
//写入ES,即ElasticsearchRepository的接口类
@Autowired
private SkuRepository skuRepository;
@Override
public void upperSku(Long skuId) {
SkuEs skuEs = new SkuEs();
//为skuEs一个一个属性赋值
SkuInfo skuInfo = productFeignClient.getSkuInfoById(skuId);
if (skuInfo == null) {
return;
}
Category category = productFeignClient.getCategoryById(skuInfo.getCategoryId());
if (category != null) {
skuEs.setCategoryId(category.getId());
skuEs.setCategoryName(category.getName());
}
skuEs.setId(skuInfo.getId());
skuEs.setKeyword(skuInfo.getSkuName() + "," + skuEs.getCategoryName()); //keyword不分词查询
skuEs.setWareId(skuInfo.getWareId());
skuEs.setIsNewPerson(skuInfo.getIsNewPerson());
skuEs.setImgUrl(skuInfo.getImgUrl());
//分词查询的字段数据类型必须是 FieldType.Text
skuEs.setTitle(skuInfo.getSkuName());
if (skuInfo.getSkuType() == SkuType.COMMON.getCode()) //普通还是秒杀
{
skuEs.setSkuType(0);
skuEs.setPrice(skuInfo.getPrice().doubleValue());
skuEs.setStock(skuInfo.getStock()); //仓库数量
skuEs.setSale(skuInfo.getSale());
skuEs.setPerLimit(skuInfo.getPerLimit()); //每人限购数量
} else {
//TODO 待完善-秒杀商品
}
//使用 ElasticsearchRepository 提供的方法保存ES信息
SkuEs save = skuRepository.save(skuEs);
System.out.print("upperSku:" + JSON.toJSONString(save));
}
@Override
public void lowerSku(Long skuId) {
//使用 ElasticsearchRepository 提供的方法删除ES信息
skuRepository.deleteById(skuId);
}
}
5. 效果验证
实验中的表的名称是skues:
对应
public interface SkuRepository extends ElasticsearchRepository<SkuEs, Long> {
}
对应代码中的名称:
当服务中的 public void upperSku(Long skuId) 被调用时,即 skuRepository.save(skuEs);被调用时,对应的数据就会写入 es中,可在 kibana中进行查看:
使用下面命令进行查看:
GET /_cat/indices?v
查看更多数据,(数据结构取决于自己数据库和代码中对数据的定义):
使用如下命令,其中skues为你的index (表名)名称
POST /skues/_search
{
"query":{
"match_all":{}
}
}
结果有了: