整合ES(Elasticsearch)+MQ(RabbitMQ)实现商品上下架/跨模块远程调用
商品上下架过程中,修改数据库表上下架状态,之后通过RabbitMQ发送消息,最终实现ES中数据同步
- nacos服务发现和注册
- ES面向文档型数据库
- RabbitMQ
ES
- 用户将数据提交到Elasticsearch数据库中
- 通过分词控制器将对应的语句分词
- 将其权重和分词结果一并存入数据
- 当用户搜索数据,在根据权重将结果排名、打分,并将结果呈现给用户
①、下载对应的ES
②、下载并安装分词器
③、启动ES(前提是系统配置好了JDK环境变量)
通过浏览器验证是否启动成功
④、官方推荐的客户端kibana
通过浏览器进行访问,通过DevTools工具
RabbitMQ
#拉取镜像
docker pull rabbitmq:3.8-management
#三个端口号:通信端口号,后台访问端口号,集群端口号
docker run -d --restart=always -p5672:5672 -p 12572:15672 --name rabbitmq rabbitmq:3.8-management
管理后台:http://ip:15672
业务模块
一、service-search模块消费者
①、依赖spring-boot-starter-ata-elasticsearch
远程调用spring-cloud-starter-openfeign
引入三定义service-client模块坐标
引入rabbit模块rabbit_util
②、配置文件
server:
port: 8204
feign:
sentinel: # 熔断机制
enabled: true
client:
config:
default: #配置全局的feign的调用超时时间 如果有指定的服务配置,则默认的配置不会生效
connectTimeout: 30000 #指定消费者连接服务提供者超时时间
readTimeout: 50000 #调用服务提供者的服务超时时间
spring:
main:
allow-bean-definition-overriding: true#当遇到同样名字,是否允许覆盖注册
elasticsearch:
rest:
uris: http://localhost:9200
rabbitmq:
host: 192.168.197.128
port: 5672
username: guest
password: guest
publisher-confirm-type: CORRELATED #发布确认模式,消息是否被成功发送到交换机
publisher-returns: true
listener:
simple:
prefetch: 1
concurrency: 3
acknowledge-mode: manual #消费端手动确认
redis:
host: localhost
port: 6379
database: 0
timeout: 1800000
password:
lettuce:
pool:
max-active: 20#最大连接数
max-wait: -1
max-idle: 5#最大则色等待时间(负数表示没限制)
min-idle: 0#最小空闲
③、启动类(当前模块不需要连接数据库,所以排除。否则需要在application.yml中配置数据库连接)
@SpringBootApplication(exclude=DatasourceAutoConfiguration.class)//取消数据源自动配置
@EnableDiscoveryClient
@EnableFeignClients //远程调用
public class ServiceSearchApplication{
}
④、业务接口、服务
@RestController
@RequestMapping("api/search/sku")
public class SkuApiController{
@Autowired
private SkuService skuService;
//上架,将数据加入ES中
@GetMapping("inner/upperSku/{skuId}")
public Result upperSku(@PathVariable Long skuId){
skuService.upperSku(skuId);
return Result.ok(null);
}
//下架,将数据从ES中删除
@GetMapping("inner/lowerSku/{skuId}")
public Result lowerSku(@PathVariable Long skuId){
skuService.lowerSku(skuId);
return Result.ok(null);
}
}
service接口忽略,只创建其是实现类
@Service
public class SkuServiceImpl implements SkuService{
@Autowired
private SkuRepository skuRepository;
//注入远程调用的接口
@Autowired
private ProductFeignClient productFeignClient;
Override
public void upperSku(Long skuId){
//远程调用获取数据
SkuInfo skuInfo = productFeignClient.getSkuInfo(skuId);
if(skuInfo == null){
return;
}
Category category = productFeignClient.getCategory(skuInfo.getCategoryId());
SkuEs skuEs = new SkuEs();
if(category != null){
skuEs.setCategoryId(category.getId());
skuEs.setCategoryName(category.getName());
}
//封装SKU信息
skuEs.setId(skuId.getId());
skuEs.setKeyWord(skuInfogetSkuName()+","+skuEs.getCategoryName());
skuEs.setWareId(skuInfo.getWareId());
skuEs.setIsNewPerson(skuInfo.getInsNewPerson());
skuEs.setImgUrl(skuInfo.getImgUrl());
skuEs.setTitle(skuInfo.getSkuName());
if(skuInfo.getSkuTyoe()==SkuType.COMMON.getCode()){
skuEs.setSkyType(0);
skuEs.setPrice(skuInfo.getPrice().doubleValue());
skuEs.setStock(skuInfo.getStock());
skuEs.setSale(skuInfo.getSale());
skuEs.setPerLimit(skuInfo.getPerLimit());
}
//调用方法添加到ES
skuRepository.save(skuEs);
}
@Override
public void lowerSku(Long skuId){
skuRepository.deleteById(skuId);
}
}
依赖中SpringData的模块,通过Spring提供的ElasticsearchRepository操作ES
泛型是操作的实体类和其主键
public interface SkuRepository extends ElasticsearchRepository<SkuEs,Long>{
}
⑤、创建接收rabbitMQ消息的类
@Component
public class SkuReceiver{
@Autowired
private SkuService skuService;
//durable=true 表示对消息进行持久化
@RabbitListener(bindings = @QueueBinding(
value=@Queue(value=MqConst.QUEUE_GOODS_UPPER,durable="true"),
exchange=@Exchange(value=MqConst.EXCHANGE_GOODS_DIRECT),
key={MqConst.ROUTING_GOODS_UPPER}
))
public void upperSku(Long skuId,Message message,Channel channel){
if(skuId != null){
//调用方法商品上架
skuService.upperSku(skuId);
}
//配置文件中的acknowledge-mode: manual #消费端手动确认 true表示多个消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
//商品下架
@RabbitListener(bindings = @QueueBinding(
value=@Queue(value=MqConst.QUEUE_GOODS_LOWER,durable="true"),
exchange=@Exchange(value=MqConst.EXCHANGE_GOODS_DIRECT),
key={MqConst.ROUTING_GOODS_LOWER}
))
public void lowerSku(Long skuId,Message message,Channel channel){
if(skuId != null){
skuService.lowerSku(skuId);
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
二、service-product模块,生产者
依赖导入rabbit-util模块坐标
配置文件application.yml中添加rabbit的连接信息
①、上架下架,发送消息给rabbitMQ
@ApiOperation("商品上下架")
@GetMapping("/publish/{skuId}/{status}")
public Result publish(@PathVariable Long skuId,@PathVariable Integer status){
skuInfoService.publish(skuId,status);//发送消息给rabbitMQ
return Result.ok(null);
}
//注入MQ
@Autowired
private RabbitService rabbitService;
@Override
public void publish(Long skuId,Integer status){
if(status == 1){//上架
SkuInfo skuInfo = baseMapper.selectById(skuId);
skuInfo.setPublishStatus(status);
baseMapper.updateById(skuIdFo);
//上架时,整合mq把数据同步到es中
rabbitService.sendMessage(MqConst.EXCHANGE_GOODS_DIRECT,MqConst.ROUTING_GOODS_UPPER,skuId);
}else{//下架
SkuInfo skuInfo = baseMapper.selectById(skuId);
skuInfo.setPublishStatus(status);
baseMapper.updateById(skuInfo);
//下架,整合mq把数据同步到es中
rabbitService.sendMessage(MqConst.EXCHANGE_GOODS_DIRECT,MqConst.ROUTING_GOODS_LOWER,skuId);
}
}
②、为远程调用提供数据
@RestController
@RequestMapping("/api/product")
public class PruductInnerController{
@Autowired
private CategoryService categoryService;
@Autowired
private SkuInfoService skuInfoService;
//根据SKUID获取分类信息
@GetMapping("inner/getCategory/{categoryId}")
public Category getCategory(@PathVariable Long categoryId){
Category category = categoryService.getById(categoryId);
return category;
}
//根据SKUID获取SKU信息
@GetMapping("inner/getSkuInfo/{skuId}")
public SkuInfo getSkuInfo(@PathVariable Long skuId){
return skuInfoService.getById(skuId);
}
}
三、service-client模块
定义生产者的接口
@FeignClient(value="service-product")
pubic interface ProductFeignClient{
@GetMapping("/api/product/inner/getCategory/{categoryId}")
public Category getCategory(@PathVariable("categoryId") Long categoryId);
@GetMapping("/api/product/inner/getSkuInfo/{skuId}")
public SkuInfo getSkuInfo(@PathVariable("skuId") Long skuId);
}
三、common工程下创建rabbit-util模块
①、依赖
spring-cloud-starter-bus-amqp
②、封装发送消息的方法
@Service
public class RabbitService{
@Autowired
private RabbitTemplate rebbitTemplate;
//发送那个消息的方法
public boolean sendMessage(String exchange,String routingKey,Object message){
rabbitTemplate.convertAndSend(exchage,routingKey,message);
return true;
}
}
由于默认只能发送字符串,因此需要拓展配置类,发送对象转换为json
@Configuration
public class MQConfig{
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
消息确认
@Component
public class MQProducerAckConfig implements RabbitTemplate.ReturnCallback,RabbitTemplate.ConfirmCallback{
@Autowired
private RabbitTemplate rabbitTemplates;
@PostConstruct
public void init(){
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setConfirmCallback(this);
}
public void confirm(CorrelationData correlationData){
if(ack){
System.out.println("消息发送成功!");
}else{
System.out.println("消息发送失败!" + cause);
}
}
@Override
public void returnedMessage(Message message,int replyCode,String replayText,String exchange,String routingKey){
System.out.println("消息主题: " + new String(message.getBody()));
System.out.println("应答码: " + replayCode);
System.out.println("描述: " + replayText);
System.out.println("消息使用的交换器 exchange: " + exchange);
System.out.println("消息使用的路由键 routing: " + routingKey);
}
}
常量(交换机、路由ke、队列的等名称)
public class MqConst{
//消息补偿
public static final String MQ_KEY_PREFIX = "ssyx.mq:list";
public static final int RETRY_COUNT = 3;
//商品上下架
public static final String EXCHANGE_GOODS_DIRECT= "ssyx.goods.direct";
public static final String ROUTING_GOODS_UPPER = "ssyx.goods.upper";
public static final String ROUTING_GOODS_LOWER = "ssyx.goods.lower";
//队列
public static final String QUEUE_GOODS_UPPER = "ssyx.goods.upper";
public static final String QUEUE_GOODS_LOWER = "ssyx.goods.lower";
}