当前位置: 首页 > article >正文

整合ES(Elasticsearch)+MQ(RabbitMQ)实现商品上下架/跨模块远程调用

商品上下架过程中,修改数据库表上下架状态,之后通过RabbitMQ发送消息,最终实现ES中数据同步

  • nacos服务发现和注册
  • ES面向文档型数据库
  • RabbitMQ

在这里插入图片描述

ES

  1. 用户将数据提交到Elasticsearch数据库中
  2. 通过分词控制器将对应的语句分词
  3. 将其权重和分词结果一并存入数据
  4. 当用户搜索数据,在根据权重将结果排名、打分,并将结果呈现给用户

在这里插入图片描述
在这里插入图片描述

①、下载对应的ES
在这里插入图片描述
在这里插入图片描述

②、下载并安装分词器
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

③、启动ES(前提是系统配置好了JDK环境变量)

通过浏览器验证是否启动成功
在这里插入图片描述

④、官方推荐的客户端kibana
在这里插入图片描述
在这里插入图片描述

通过浏览器进行访问,通过DevTools工具
在这里插入图片描述
在这里插入图片描述

RabbitMQ

#拉取镜像
docker pull rabbitmq:3.8-management

#三个端口号:通信端口号,后台访问端口号,集群端口号
docker run -d --restart=always -p5672:5672 -p 12572:15672 --name rabbitmq rabbitmq:3.8-management

管理后台:http://ip:15672

在这里插入图片描述

业务模块

一、service-search模块消费者

①、依赖spring-boot-starter-ata-elasticsearch
远程调用spring-cloud-starter-openfeign
引入三定义service-client模块坐标
引入rabbit模块rabbit_util

②、配置文件

server:
	port: 8204
feign:
	sentinel: # 熔断机制
		enabled: true
	client:	
		config:
			default: #配置全局的feign的调用超时时间 如果有指定的服务配置,则默认的配置不会生效
				connectTimeout: 30000 #指定消费者连接服务提供者超时时间
				readTimeout: 50000 #调用服务提供者的服务超时时间
spring:
	main:
		allow-bean-definition-overriding: true#当遇到同样名字,是否允许覆盖注册
	elasticsearch:
		rest:
			uris: http://localhost:9200

	rabbitmq:
		host: 192.168.197.128
		port: 5672
		username: guest
		password: guest
		publisher-confirm-type: CORRELATED #发布确认模式,消息是否被成功发送到交换机
		publisher-returns: true
		listener:
			simple:
				prefetch: 1
				concurrency: 3
				acknowledge-mode: manual #消费端手动确认
	redis:
		host: localhost
		port: 6379
		database: 0
		timeout: 1800000
		password:
		lettuce:
			pool:
				max-active: 20#最大连接数
				max-wait: -1
				max-idle: 5#最大则色等待时间(负数表示没限制)
				min-idle: 0#最小空闲

③、启动类(当前模块不需要连接数据库,所以排除。否则需要在application.yml中配置数据库连接)

@SpringBootApplication(exclude=DatasourceAutoConfiguration.class)//取消数据源自动配置
@EnableDiscoveryClient
@EnableFeignClients //远程调用
public class ServiceSearchApplication{

}

④、业务接口、服务

@RestController
@RequestMapping("api/search/sku")
public class SkuApiController{
	
	@Autowired
	private SkuService skuService;

	//上架,将数据加入ES中
	@GetMapping("inner/upperSku/{skuId}")
	public Result upperSku(@PathVariable Long skuId){
		skuService.upperSku(skuId);
		return Result.ok(null);
	}

	//下架,将数据从ES中删除
	@GetMapping("inner/lowerSku/{skuId}")
	public Result lowerSku(@PathVariable Long skuId){
		skuService.lowerSku(skuId);
		return Result.ok(null);
	}
}

service接口忽略,只创建其是实现类

@Service
public class SkuServiceImpl implements SkuService{

	@Autowired
	private SkuRepository skuRepository;

	//注入远程调用的接口
	@Autowired
	private ProductFeignClient productFeignClient;
	
	Override
	public void upperSku(Long skuId){
		
		//远程调用获取数据
		SkuInfo skuInfo = productFeignClient.getSkuInfo(skuId);
		if(skuInfo == null){
			return;
		}
		Category category = productFeignClient.getCategory(skuInfo.getCategoryId());
		
		SkuEs skuEs = new SkuEs();
		if(category != null){
			skuEs.setCategoryId(category.getId());
			skuEs.setCategoryName(category.getName());
		}

		//封装SKU信息
		skuEs.setId(skuId.getId());
		skuEs.setKeyWord(skuInfogetSkuName()+","+skuEs.getCategoryName());
		skuEs.setWareId(skuInfo.getWareId());
		skuEs.setIsNewPerson(skuInfo.getInsNewPerson());
		skuEs.setImgUrl(skuInfo.getImgUrl());
		skuEs.setTitle(skuInfo.getSkuName());
		if(skuInfo.getSkuTyoe()==SkuType.COMMON.getCode()){
			skuEs.setSkyType(0);
			skuEs.setPrice(skuInfo.getPrice().doubleValue());
			skuEs.setStock(skuInfo.getStock());
			skuEs.setSale(skuInfo.getSale());
			skuEs.setPerLimit(skuInfo.getPerLimit());
		}

		//调用方法添加到ES
		skuRepository.save(skuEs);
	}

	@Override
	public void lowerSku(Long skuId){
		skuRepository.deleteById(skuId);
	}
}

依赖中SpringData的模块,通过Spring提供的ElasticsearchRepository操作ES
泛型是操作的实体类和其主键

public interface SkuRepository extends ElasticsearchRepository<SkuEs,Long>{

}

⑤、创建接收rabbitMQ消息的类

@Component
public class SkuReceiver{
	
	@Autowired
	private SkuService skuService;

	//durable=true  表示对消息进行持久化
	@RabbitListener(bindings = @QueueBinding(
		value=@Queue(value=MqConst.QUEUE_GOODS_UPPER,durable="true"),
		exchange=@Exchange(value=MqConst.EXCHANGE_GOODS_DIRECT),
		key={MqConst.ROUTING_GOODS_UPPER}
		))
	public void upperSku(Long skuId,Message message,Channel channel){
		
		if(skuId != null){
			//调用方法商品上架
			skuService.upperSku(skuId);
		}
		//配置文件中的acknowledge-mode: manual #消费端手动确认   true表示多个消息
		channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
	}

	//商品下架
	@RabbitListener(bindings = @QueueBinding(
		value=@Queue(value=MqConst.QUEUE_GOODS_LOWER,durable="true"),
		exchange=@Exchange(value=MqConst.EXCHANGE_GOODS_DIRECT),
		key={MqConst.ROUTING_GOODS_LOWER}
		))
	public void lowerSku(Long skuId,Message message,Channel channel){
		if(skuId != null){
			skuService.lowerSku(skuId);
		}
		channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
	}
}

二、service-product模块,生产者

依赖导入rabbit-util模块坐标
配置文件application.yml中添加rabbit的连接信息

①、上架下架,发送消息给rabbitMQ

@ApiOperation("商品上下架")
@GetMapping("/publish/{skuId}/{status}")
public Result publish(@PathVariable Long skuId,@PathVariable Integer status){

	
	skuInfoService.publish(skuId,status);//发送消息给rabbitMQ
	return Result.ok(null);
}
//注入MQ
@Autowired
private RabbitService rabbitService;

@Override
public void publish(Long skuId,Integer status){

	if(status == 1){//上架
		SkuInfo skuInfo = baseMapper.selectById(skuId);
		skuInfo.setPublishStatus(status);
		baseMapper.updateById(skuIdFo);

		//上架时,整合mq把数据同步到es中
		rabbitService.sendMessage(MqConst.EXCHANGE_GOODS_DIRECT,MqConst.ROUTING_GOODS_UPPER,skuId);
	}else{//下架
		SkuInfo skuInfo = baseMapper.selectById(skuId);
		skuInfo.setPublishStatus(status);
		baseMapper.updateById(skuInfo);

		//下架,整合mq把数据同步到es中
		rabbitService.sendMessage(MqConst.EXCHANGE_GOODS_DIRECT,MqConst.ROUTING_GOODS_LOWER,skuId);
	}
}

②、为远程调用提供数据

@RestController
@RequestMapping("/api/product")
public class PruductInnerController{

	@Autowired
	private CategoryService categoryService;

	@Autowired
	private SkuInfoService skuInfoService;
	
	//根据SKUID获取分类信息
	@GetMapping("inner/getCategory/{categoryId}")
	public Category getCategory(@PathVariable Long categoryId){
		Category category = categoryService.getById(categoryId);
		return category;
	}

	//根据SKUID获取SKU信息
	@GetMapping("inner/getSkuInfo/{skuId}")
	public SkuInfo getSkuInfo(@PathVariable Long skuId){
		return skuInfoService.getById(skuId);
	}
}

三、service-client模块

定义生产者的接口

@FeignClient(value="service-product") 
pubic interface ProductFeignClient{
	
	@GetMapping("/api/product/inner/getCategory/{categoryId}")
	public Category getCategory(@PathVariable("categoryId") Long categoryId);

	@GetMapping("/api/product/inner/getSkuInfo/{skuId}")
	public SkuInfo getSkuInfo(@PathVariable("skuId") Long skuId);
}

三、common工程下创建rabbit-util模块

①、依赖

spring-cloud-starter-bus-amqp

②、封装发送消息的方法

@Service
public class RabbitService{
	
	@Autowired
	private RabbitTemplate rebbitTemplate;

	//发送那个消息的方法
	public boolean sendMessage(String exchange,String routingKey,Object message){
		
		rabbitTemplate.convertAndSend(exchage,routingKey,message); 
		return true;
	}
}

由于默认只能发送字符串,因此需要拓展配置类,发送对象转换为json

@Configuration
public class MQConfig{
	
	@Bean
	public MessageConverter messageConverter(){
		return new Jackson2JsonMessageConverter();
	}
}

消息确认

在这里插入图片描述

@Component
public class MQProducerAckConfig implements RabbitTemplate.ReturnCallback,RabbitTemplate.ConfirmCallback{
	
	@Autowired
	private RabbitTemplate rabbitTemplates;

	@PostConstruct
	public void init(){
		rabbitTemplate.setReturnCallback(this);
		rabbitTemplate.setConfirmCallback(this);
	}

	public void confirm(CorrelationData correlationData){
		if(ack){
			System.out.println("消息发送成功!");
		}else{
			System.out.println("消息发送失败!" + cause);
		}
	}

	@Override
	public void returnedMessage(Message message,int replyCode,String replayText,String exchange,String routingKey){
		System.out.println("消息主题: " + new String(message.getBody()));
		System.out.println("应答码: " + replayCode);
		System.out.println("描述: " + replayText);
		System.out.println("消息使用的交换器 exchange: " + exchange);
		System.out.println("消息使用的路由键 routing: " + routingKey);
	}
}

常量(交换机、路由ke、队列的等名称)

public class MqConst{
	
	//消息补偿
	public static final String MQ_KEY_PREFIX = "ssyx.mq:list";
	public static final int RETRY_COUNT = 3;

	//商品上下架
	public static final String EXCHANGE_GOODS_DIRECT= "ssyx.goods.direct";
	public static final String ROUTING_GOODS_UPPER = "ssyx.goods.upper";
	public static final String ROUTING_GOODS_LOWER = "ssyx.goods.lower";

	//队列
	public static final String QUEUE_GOODS_UPPER = "ssyx.goods.upper";
	public static final String QUEUE_GOODS_LOWER = "ssyx.goods.lower";
}

http://www.kler.cn/a/541227.html

相关文章:

  • Spark 源码 | 脚本分析总结
  • MYSQL学习笔记(七):新年第一篇之子查询
  • 大语言模型需要的可观测性数据的关联方式
  • 502 Bad Gateway 错误详解:从表现推测原因,逐步排查直至解决
  • Jenkins数据备份到windows FTP服务器
  • Go语言构建微服务:从入门到实战
  • redis底层数据结构——链表
  • Guava学习(一)
  • DeepSeek 助力 Vue 开发:打造丝滑的步骤条
  • CNN-day5-经典神经网络LeNets5
  • 如何顺利开设Facebook账户并设置广告账户
  • Linux下的调试器 —— gdb
  • 基于 Nginx 的 CDN 基础实现
  • 数据库约束(2)
  • 旅客服务系统(PSS, Passenger Service System)
  • 亚博microros小车-原生ubuntu支持系列 27、手掌控制小车运动
  • OpenAI Deep Research被huggingface 24小时复现开源了
  • leetcode 2104. 子数组范围和
  • C++STL(六)——list模拟
  • IEEE期刊Word导出PDF注意事项
  • 性能优化中的系统架构优化
  • (五)Spring Boot学习——spring security +jwt使用(前后端分离模式)
  • 【文本处理】如何在批量WORD和txt文本提取手机号码,固话号码,提取邮箱,删除中文,删除英文,提取车牌号等等一些文本提取固定格式的操作,基于WPF的解决方案
  • [2025年最新]2024.3版本idea无法安装插件问题解决
  • 思科模拟器配置VRRP-详细
  • 【MySQL — 数据库基础】深入解析MySQL的聚合查询