当前位置: 首页 > article >正文

java_将数据存入elasticsearch进行高效搜索

使用技术简介:

(1) 使用Nginx实现反向代理,使前端可以调用多个微服务

(2) 使用nacos将多个服务管理关联起来

(3) 将数据存入elasticsearch进行高效搜索

(4) 使用消息队列rabbitmq进行消息的传递 

(5) 使用 openfeign 进行多个服务之间的api调用

参考: 56 尚上优选项目-平台管理端-整合ES+MQ实现商品上下架-功能最终测试_哔哩哔哩_bilibili

1. 使用Nginx实现反向代理

使用Nginx实现反向代理,使前端可以通过一个端口,调用多个微服务(的端口)

前端中的配置的base api端口 9901:

反向代理逻辑图:

图示 /acl /sys 为两个服务的名称中的路径字符串。

在nginx中的配置如下:

2. 使用nacos将多个服务管理关联起来

 通过nacos将多个服务关联起来,这里实现一个产品上架(存入elasticsearch仓库,简称es)下架(从es仓库删除)的功能.service-product提供商品的信息,service-search通过远程调用(FeignClient)调用service-product的接口,获取商品的具体信息,存入或者从es中删除。

service-product 和 service-search 两个服务通过消息队列进行通讯(rabbitmq消息队列, pom名称:spring-cloud-starter-bus-amqp)

nacos部分:pom依赖:

<!--服务注册 -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

在各个服务中添加在naco的注册:示例 service-produce模块的application.yml中添加cloud.nacos.discovery.server-addr

 在 main主程序中添加 启用Nacos服务发现注解:@EnableDiscoveryClient

同理在service-search模块中:

启动nacos服务,同时启动自己的服务,看自己的服务是否注册成功:

下载安装合适版本的nacos(版本不对应会出兼容性问题):

到nacos目录,执行:

.\bin\startup.cmd -m standalone

启动nacos后,启动自己的服务,如自己的服务注册成功,会在log中有如下提示:

在浏览器中输入: http://192.168.136.1:8848/nacos/index.html 也可看是否启动成功,若你的目标服务在“服务列表”中,说明服务注册成功,nacos正常运行,服务可以被关联起来,可以通过restful风格进行数据的传递了。

3. 使用消息队列rabbitmq进行消息的传递 

在需要使用的服务service-search的pom.xml中添加依赖:

<!--rabbitmq消息队列-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

安装rabbitmq服务,示例在虚拟机docker中安装 rabbitmq:3.8-management:

#拉取镜像
docker pull rabbitmq:3.8-management
#创建容器启动
docker run -d --restart=always -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:3.8-management

查看安装rabbitmq的虚拟机ip地址: 

docker中安装好后,可以在本机(win11)上测试下安装是否成功:浏览器中输入:http://192.168.68.132:15672/ (注意这里是自己虚拟机的ip地址)

根据自己虚拟机的地址,在需要使用的服务service-search 和 serivice-product 中配置ip地址和端口号等信息:

 

编写代码,使用 RabbitTemplate, 进行消息的发送:

进行消息发送:指定j交换机exchange 字符串(自定义一个),路由器routing, 发送内容 Object message (这里是 Long skuId)

为了让发送的消息能构正确解析,需要定义一个@Configuration, 使用Jackson2JsonMessageConverter,进行消息的类型转换,这里是将Long skuId 转换成Json格式:

package com.atguigu.ssyx.mq.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

注意rabbit端还需要对 RabbitTemplate 做一些初始化操作,参考init():

package com.atguigu.ssyx.mq.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class MQProducerAckConfig implements RabbitTemplate.ReturnCallback, RabbitTemplate.ConfirmCallback {

    //  我们发送消息使用的是 private RabbitTemplate rabbitTemplate; 对象
    //  如果不做设置的话 当前的rabbitTemplate 与当前的配置类没有任何关系!
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //  设置 表示修饰一个非静态的void方法,在服务器加载Servlet的时候运行。并且只执行一次!
    // PostConstruct注解的函数:在 Spring 容器创建并初始化 Bean 后自动调用,一个类中只能有一个 @PostConstruct 注解的方法
    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 表示消息是否正确发送到了交换机上
     *
     * @param correlationData 消息的载体
     * @param ack             判断是否发送到交换机上
     * @param cause           原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息发送成功!");
        } else {
            System.out.println("消息发送失败!" + cause);
        }
    }

    /**
     * 消息如果没有正确发送到队列中,则会走这个方法!如果消息被正常处理,则这个方法不会走!
     *
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        System.out.println("消息主体: " + new String(message.getBody()));
        System.out.println("应答码: " + replyCode);
        System.out.println("描述:" + replyText);
        System.out.println("消息使用的交换器 exchange : " + exchange);
        System.out.println("消息使用的路由键 routing : " + routingKey);
    }
}

消息的接收端(消费者) ,通过RabbitListener 定位到到发送者发送的消息队列上:

package com.atguigu.ssyx.receiver;
import com.atguigu.ssyx.mq.constant.MqConst;
import com.rabbitmq.client.Channel;
import com.atguigu.ssyx.search.service.SkuService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class SkuReceiver {

    @Autowired
    private SkuService skuService;

    /**
     * 商品上架
     * @param skuId
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(bindings = @QueueBinding( //绑定接收什么消息 //消费者,接收消息
            value=@Queue(value = MqConst.QUEUE_GOODS_UPPER, durable = "true"), // durable 持久化
            exchange = @Exchange(value = MqConst.EXCHANGE_GOODS_DIRECT),
            key = {MqConst.ROUTING_GOODS_UPPER}
    ))
    public void upperSku(Long skuId, Message message, Channel channel) throws IOException {
        // 发送者:   rabbitService.sendMsg(MqConst.EXCHANGE_GOODS_DIRECT,
        //                      MqConst.ROUTING_GOODS_UPPER,
        //                      skuId);
        //:发送者函数原型:public boolean sendMsg(String exchange, String routingKey, Object message)
        // 这里的Object message使用 public MessageConverter messageConverter() 方法转成了json格式,
        // 确保了生产者和消费者之间的消息序列化与反序列化逻辑一致,
        // 所以 Object message 对应这里接收者的 Long skuId 形参,实现了参数传递的目的。
        try{
            if(skuId != null) {
                skuService.upperSku(skuId);
            }
            /**
             * 第一个参数:表示收到的消息的标号
             * 第二个参数:如果为true表示可以签收多个消息
             */
            //DeliveryTag 交货标签
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            System.out.print("john:IOException occurred while processing message");
            throw e;  // 或者处理该异常
        }
    }

    /**
     * 商品下架
     * @param skuId
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = MqConst.QUEUE_GOODS_LOWER, durable = "true"),
            exchange = @Exchange(value = MqConst.EXCHANGE_GOODS_DIRECT), //交换器
            key = {MqConst.ROUTING_GOODS_LOWER} //路由器
    ))
    public void lowerSku(Long skuId, Message message, Channel channel) throws IOException {
        try {
            if (skuId != null) {
                skuService.lowerSku(skuId);
            }
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
        catch (IOException e) {
        System.out.print("john:IOException occurred while processing message");
        throw e;  // 或者处理该异常
    }
    }
}

注意着的对应关系:

public boolean sendMsg(String exchange, String routingKey, Object message

sendMsg中的 Object message 被 public MessageConverter messageConverter() 正确解析成了 Long skuId

upperSku(Long skuId, Message message, Channel channel)

至此,rabbitmq完成了从service-product传递一个skuId到service-serach的动作。

若有消息发送送时,界面中会出翔对应的消息队列的名称:

4.将数据存入elasticsearch进行高效搜索

elasticsearch的配置

下载elasticsearch Past Releases of Elastic Stack Software | Elastic, elasticsearch-7.8.0-windows-x86_64.zip (注意版本不对,会有兼容性问题)

下载对应版本的分词器:elasticsearch-analysis-ik-7.8.0.zip 

下载 kibana来进行客户端操作 kibana-7.8.0-windows-x86_64.zip

解压 elasticsearch-7.8.0-windows-x86_64文件,在

path\elasticsearch-7.8.0-windows-x86_64\elasticsearch-7.8.0\plugins\ 目录下新建文件夹,命名为ik,解压elasticsearch-analysis-ik-7.8.0,将内容所有内容copy到ik目录中:

在 es目录下运行 .\bin\elasticsearch.bat 启动es:

可以看到启动成功了:

浏览器界面看下 http://localhost:9200/ ,出现下面界面,说明启动成功

配置kibana

下载,解压,修改如下3个配置 (不同版本可能不一样,有增加或者删除项):

启动kibana:

 .\bin\kibana.bat

浏览器访问看下:http://127.0.0.1:5601/ 

分词器的测试:

代码配置

首先定义一个存储仓库对应的类  SkuRepository, 继承自 ElasticsearchRepository:

package com.atguigu.ssyx.search.repository;
import com.atguigu.ssyx.model.search.SkuEs;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

public interface SkuRepository extends ElasticsearchRepository<SkuEs, Long> {
}

使用 skuRepository.save(skuEs) 方法,即可存入 es仓库中。

使用 skuRepository.deleteById(skuId); 即可从仓库中删除。 

使用 ProductFeignClient 进行跨服务访问

因为service-serach要使用service-product的信息,但时两个服务,这是需要使用 ProductFeignClient 进行restful风格的api传递参数,进行远程调用。

首先需要pom中引入依赖:

<!-- 服务调用feign -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
    <scope>provided </scope>
</dependency>

在对应的 service-serach的application-dev.yml中进行配置:

在主程序中药加入远程调用注释: @EnableFeignClients 

然后,定义service-product中的api,将参数传入到指定的api接口:

package com.atguigu.ssyx.product.api;

import com.atguigu.ssyx.model.product.Category;
import com.atguigu.ssyx.model.product.SkuInfo;
import com.atguigu.ssyx.product.service.CategoryService;
import com.atguigu.ssyx.product.service.SkuInfoService;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
// 内部调用,不是前端调用
@RequestMapping("/api/product") //该类是为了内部调用,供service-search使用,所以命名为内部inner
public class ProductInnerController {

    @Autowired
    private CategoryService categoryService;

    @Autowired
    private SkuInfoService skuInfoService;

    //根据categoryId获取商品的category信息
    @ApiOperation(value = "根据categoryId获取商品的category信息")
    @GetMapping("inner/getCategory/{categoryId}")
    public Category getCategoryById(@PathVariable("categoryId") Long categoryId) {
        return categoryService.getById(categoryId);
    }

    //根据skuId获取skuInfo
    @ApiOperation(value = "根据skuId获取skuInfo信息")
    @GetMapping("inner/getSkuInfo/{skuId}")
    public SkuInfo getSkuInfoById(@PathVariable("skuId") Long skuId) {
        return skuInfoService.getById(skuId);
    }

}

定义使用该api的函数,并使用@FeignClient 注明从哪个模块进行api的对接:

package com.atguigu.ssyx.client.product;

import com.atguigu.ssyx.model.product.Category;
import com.atguigu.ssyx.model.product.SkuInfo;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

//与service\service-product\src\main\resources\application.yml中的application。name名称一致
@FeignClient(value = "service-product")
public interface ProductFeignClient {
    //作为\service\service-product中的\product\api\ProductInnerController.java
    //中函数的接口定义文件
    //注意要使用完整的restful风格的路径
    //用于远程调用(service-search远程调用service-product)
    @GetMapping("/api/product/inner/getCategory/{categoryId}")
    public Category getCategoryById(@PathVariable("categoryId") Long categoryId);

    @GetMapping("/api/product/inner/getSkuInfo/{skuId}")
    public SkuInfo getSkuInfoById(@PathVariable("skuId") Long skuId);

}

定义feign对象 productFeignClient,调用api对应的函数接口:

SkuInfo skuInfo = productFeignClient.getSkuInfoById(skuId);
// ...
Category category = productFeignClient.getCategoryById(skuInfo.getCategoryId());
package com.atguigu.ssyx.search.service.impl;

import com.alibaba.fastjson.JSON;
import com.atguigu.ssyx.client.product.ProductFeignClient;
import com.atguigu.ssyx.enums.SkuType;
import com.atguigu.ssyx.model.product.Category;
import com.atguigu.ssyx.model.product.SkuInfo;
import com.atguigu.ssyx.model.search.SkuEs;
import com.atguigu.ssyx.search.repository.SkuRepository;
import com.atguigu.ssyx.search.service.SkuService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import static jdk.nashorn.internal.runtime.regexp.joni.Config.log;

@Slf4j
@Service
public class SkuServiceImpl implements SkuService {

    //通过service-product-client中的feign远程调用service-product中的方法
    @Autowired
    private ProductFeignClient productFeignClient;

    //写入ES,即ElasticsearchRepository的接口类
    @Autowired
    private SkuRepository skuRepository;

    @Override
    public void upperSku(Long skuId) {
        SkuEs skuEs = new SkuEs();

        //为skuEs一个一个属性赋值
        SkuInfo skuInfo = productFeignClient.getSkuInfoById(skuId);
        if (skuInfo == null) {
            return;
        }
        Category category = productFeignClient.getCategoryById(skuInfo.getCategoryId());
        if (category != null) {
            skuEs.setCategoryId(category.getId());
            skuEs.setCategoryName(category.getName());
        }
        skuEs.setId(skuInfo.getId());
        skuEs.setKeyword(skuInfo.getSkuName() + "," + skuEs.getCategoryName()); //keyword不分词查询
        skuEs.setWareId(skuInfo.getWareId());
        skuEs.setIsNewPerson(skuInfo.getIsNewPerson());
        skuEs.setImgUrl(skuInfo.getImgUrl());
        //分词查询的字段数据类型必须是 FieldType.Text
        skuEs.setTitle(skuInfo.getSkuName());
        if (skuInfo.getSkuType() == SkuType.COMMON.getCode()) //普通还是秒杀
        {
            skuEs.setSkuType(0);
            skuEs.setPrice(skuInfo.getPrice().doubleValue());
            skuEs.setStock(skuInfo.getStock()); //仓库数量
            skuEs.setSale(skuInfo.getSale());
            skuEs.setPerLimit(skuInfo.getPerLimit()); //每人限购数量
        } else {
            //TODO 待完善-秒杀商品
        }

        //使用 ElasticsearchRepository 提供的方法保存ES信息
        SkuEs save = skuRepository.save(skuEs);
        System.out.print("upperSku:" + JSON.toJSONString(save));

    }

    @Override
    public void lowerSku(Long skuId) {
        //使用 ElasticsearchRepository 提供的方法删除ES信息
        skuRepository.deleteById(skuId);
    }
}

5. 效果验证

实验中的表的名称是skues:

对应

public interface SkuRepository extends ElasticsearchRepository<SkuEs, Long> {
}

对应代码中的名称: 

当服务中的  public void upperSku(Long skuId) 被调用时,即 skuRepository.save(skuEs);被调用时,对应的数据就会写入 es中,可在 kibana中进行查看:

使用下面命令进行查看:

GET /_cat/indices?v

查看更多数据,(数据结构取决于自己数据库和代码中对数据的定义):

使用如下命令,其中skues为你的index (表名)名称

POST /skues/_search
{
    "query":{
        "match_all":{}
    }
}

结果有了:


http://www.kler.cn/a/503006.html

相关文章:

  • 基于微信小程序的智能停车场管理系统设计与实现(LW+源码+讲解)
  • C++中引用参数与指针参数的区别与联系详解
  • 无需昂贵GPU:本地部署开源AI项目LocalAI在消费级硬件上运行大模型
  • Windows图形界面(GUI)-QT-C/C++ - QT控件创建管理初始化
  • 【STM32-学习笔记-9-】SPI通信
  • 基于单片机的智能花卉浇水系统的设计与实现
  • 【centos】校时服务创建-频率修改
  • Spring Boot整合SSE实时通信
  • ansible 知识点【回顾梳理】
  • Unity TextMesh Pro入门
  • 谈谈人工智能在中国:现状与未来展望
  • Swift语言的正则表达式
  • 理解AJAX与Axios:异步编程的世界
  • MySQL的多表设计和查询,事务以及索引了解
  • Web基础-分层解耦-三层架构
  • 数据仓库: 10- 新技术与趋势
  • day38 tcp 并发 ,linux下的IO模型----IO多路复用
  • linux的大内核锁与顺序锁
  • TaskBuilder前端页面CSS样式规则设置
  • python学opencv|读取图像(二十九)使用cv2.getRotationMatrix2D()函数旋转缩放图像
  • paddle——站在巨人肩膀上及背刺二三事
  • 如何用 SSH 访问 QNX 虚拟机
  • 如何用bigemap pro裁剪矢量数据?
  • 【MySQL】基础架构分析
  • 模板方法模式详解
  • Java 泛型的用法