文章目录
- 1、发送消息 KafkaService
- 2、生产者 service-album -> AlbumInfoServiceImpl
- 2.1、新增 saveAlbumInfo()
- 2.2、更新 updateAlbumInfo()
- 2.3、删除 removeAlbumInfo()
- 3、消费者 service-search - > AlbumListener.java
- 新增:如果是
公开的
专辑则发送消息给kafka,search通过监听器获取消息同步
新增数据 - 更新:如果是
公开的
专辑则发送消息给kafka,search通过监听器获取消息同步
更新数据
如果是私有的
专辑则发送消息给kafka,search通过监听器获取消息es删除
数据 - 删除:发送消息给kafka,search通过监听器获取消息es删除数据
1、发送消息 KafkaService
package com.atguigu.tingshu.common.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
@Service
public class KafkaService {
private static final Logger logger = LoggerFactory.getLogger(KafkaService.class);
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMsg(String topic, String msg){
this.sendMsg(topic, null, null, msg);
}
public void sendMsg(String topic, Integer partition, String key, String msg){
CompletableFuture<SendResult> future = this.kafkaTemplate.send(topic, partition, key, msg);
future.whenCompleteAsync((result, ex) -> {
if (ex != null){
logger.error("生产者发送消息失败!原因:{}", ex.getMessage());
}
});
}
}
whenCompleteAsync:异步完成时的处理、当异步操作完成时

2、生产者 service-album -> AlbumInfoServiceImpl

2.1、新增 saveAlbumInfo()
- 新增:如果是
公开的
专辑则发送消息给kafka,search通过监听器获取消息同步
新增数据



@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class AlbumInfoServiceImpl extends ServiceImpl<AlbumInfoMapper, AlbumInfo> implements AlbumInfoService {
@Autowired
private AlbumAttributeValueMapper attributeValueMapper;
@Autowired
private AlbumStatService albumStatService;
@Autowired
private KafkaService kafkaService;
@Transactional(rollbackFor = Exception.class)
@Override
public void saveAlbumInfo(AlbumInfoVo albumInfoVo) throws FileNotFoundException {
AlbumInfo albumInfo = new AlbumInfo();
BeanUtils.copyProperties(albumInfoVo, albumInfo);
Long userId = AuthContextHolder.getUserId();
albumInfo.setUserId(userId == null ? 1 : userId);
albumInfo.setTracksForFree(5);
albumInfo.setSecondsForFree(30);
albumInfo.setStatus(SystemConstant.ALBUM_STATUS_PASS);
this.save(albumInfo);
Long albumInfoId = albumInfo.getId();
List<AlbumAttributeValueVo> albumAttributeValueVoList = albumInfoVo.getAlbumAttributeValueVoList();
if (!CollectionUtils.isEmpty(albumAttributeValueVoList)) {
albumAttributeValueVoList.forEach(albumAttributeValueVo -> {
AlbumAttributeValue albumAttributeValue = new AlbumAttributeValue();
BeanUtils.copyProperties(albumAttributeValueVo, albumAttributeValue);
albumAttributeValue.setAlbumId(albumInfoId);
this.attributeValueMapper.insert(albumAttributeValue);
});
}
this.albumStatService.saveAlbumStat(albumInfoId);
if (StringUtils.equals(albumInfo.getIsOpen(), "1")) {
this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_UPPER, albumInfoId.toString());
}
}
}

2.2、更新 updateAlbumInfo()
- 更新:如果是
公开的
专辑则发送消息给kafka,search通过监听器获取消息同步
更新数据
如果是私有的
专辑则发送消息给kafka,search通过监听器获取消息es删除
数据
@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class AlbumInfoServiceImpl extends ServiceImpl<AlbumInfoMapper, AlbumInfo> implements AlbumInfoService {
@Autowired
private AlbumAttributeValueMapper attributeValueMapper;
@Autowired
private KafkaService kafkaService;
@Transactional
@Override
public void updateAlbumInfo(Long albumId, AlbumInfoVo albumInfoVo) {
AlbumInfo albumInfo = new AlbumInfo();
BeanUtils.copyProperties(albumInfoVo, albumInfo);
albumInfo.setId(albumId);
this.updateById(albumInfo);
this.attributeValueMapper.delete(new LambdaUpdateWrapper<AlbumAttributeValue>().eq(AlbumAttributeValue::getAlbumId, albumId));
List<AlbumAttributeValueVo> albumAttributeValueVoList = albumInfoVo.getAlbumAttributeValueVoList();
if (!CollectionUtils.isEmpty(albumAttributeValueVoList)) {
albumAttributeValueVoList.forEach(albumAttributeValueVo -> {
AlbumAttributeValue albumAttributeValue = new AlbumAttributeValue();
BeanUtils.copyProperties(albumAttributeValueVo, albumAttributeValue);
albumAttributeValue.setAlbumId(albumId);
this.attributeValueMapper.insert(albumAttributeValue);
});
}
if (StringUtils.equals(albumInfoVo.getIsOpen(), "1")) {
this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_UPPER, albumId.toString());
} else {
this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_LOWER, albumId.toString());
}
}
}

2.3、删除 removeAlbumInfo()
- 删除:发送消息给kafka,search通过监听器获取消息es删除数据
@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class AlbumInfoServiceImpl extends ServiceImpl<AlbumInfoMapper, AlbumInfo> implements AlbumInfoService {
@Autowired
private AlbumAttributeValueMapper attributeValueMapper;
@Autowired
private AlbumStatMapper albumStatMapper;
@Autowired
private KafkaService kafkaService;
@Transactional
@Override
public void removeAlbumInfo(Long albumId) {
this.removeById(albumId);
this.albumStatMapper.delete(new LambdaUpdateWrapper<AlbumStat>().eq(AlbumStat::getAlbumId, albumId));
this.attributeValueMapper.delete(new LambdaUpdateWrapper<AlbumAttributeValue>().eq(AlbumAttributeValue::getAlbumId, albumId));
this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_LOWER, albumId.toString());
}
}

3、消费者 service-search - > AlbumListener.java

package com.atguigu.tingshu.search.listener;
@Component
public class AlbumListener {
@Autowired
private AlbumInfoFeignClient albumInfoFeignClient;
@Autowired
private UserInfoFeignClient userInfoFeignClient;
@Autowired
private CategoryFeignClient categoryFeignClient;
@Autowired
private ElasticsearchTemplate elasticsearchTemplate;
@KafkaListener(topics = KafkaConstant.QUEUE_ALBUM_UPPER)
public void upper(String albumId){
if (StringUtils.isBlank(albumId)){
return;
}
Result<AlbumInfo> albumInfoResult = this.albumInfoFeignClient.getAlbumInfo(Long.valueOf(albumId));
Assert.notNull(albumInfoResult, "同步数据时,获取专辑信息失败!");
AlbumInfo albumInfo = albumInfoResult.getData();
Assert.notNull(albumInfo, "同步数据时,没有对应的专辑!");
AlbumInfoIndex albumInfoIndex = new AlbumInfoIndex();
BeanUtils.copyProperties(albumInfo, albumInfoIndex);
Result<UserInfoVo> userInfoVoResult = this.userInfoFeignClient.getUserById(albumInfo.getUserId());
Assert.notNull(userInfoVoResult, "数据导入时,获取主播信息失败!");
UserInfoVo userInfoVo = userInfoVoResult.getData();
if (userInfoVo != null){
albumInfoIndex.setAnnouncerId(userInfoVo.getId());
albumInfoIndex.setAnnouncerName(userInfoVo.getNickname());
}
Result<BaseCategoryView> categoryResult = this.categoryFeignClient.getAllLevelCategories(albumInfo.getCategory3Id());
Assert.notNull(categoryResult, "数据导入时,获取分类信息失败!");
BaseCategoryView baseCategoryView = categoryResult.getData();
if (baseCategoryView != null) {
albumInfoIndex.setCategory1Id(baseCategoryView.getCategory1Id());
albumInfoIndex.setCategory2Id(baseCategoryView.getCategory2Id());
}
int playNum = (new Random().nextInt(100) + 1) * 10000;
albumInfoIndex.setPlayStatNum(playNum);
int subscribeNum = (new Random().nextInt(100) + 1) * 10000;
albumInfoIndex.setSubscribeStatNum(subscribeNum);
int buyNum = (new Random().nextInt(100) + 1) * 10000;
albumInfoIndex.setBuyStatNum(buyNum);
int commentNum = (new Random().nextInt(100) + 1) * 10000;
albumInfoIndex.setCommentStatNum(commentNum);
albumInfoIndex.setHotScore(playNum * 0.1 + commentNum * 0.2 + subscribeNum * 0.3 + buyNum * 0.4);
Result<List<AlbumAttributeValue>> albumAttributeValueResult = this.albumInfoFeignClient.getAlbumAttributeValue(albumInfo.getId());
Assert.notNull(albumAttributeValueResult, "数据导入时,获取标签及值失败!");
List<AlbumAttributeValue> albumAttributeValues = albumAttributeValueResult.getData();
if (!CollectionUtils.isEmpty(albumAttributeValues)){
albumInfoIndex.setAttributeValueIndexList(albumAttributeValues.stream().map(albumAttributeValue -> {
AttributeValueIndex attributeValueIndex = new AttributeValueIndex();
BeanUtils.copyProperties(albumAttributeValue, attributeValueIndex);
return attributeValueIndex;
}).collect(Collectors.toList()));
}
this.elasticsearchTemplate.save(albumInfoIndex);
}
@KafkaListener(topics = KafkaConstant.QUEUE_ALBUM_LOWER)
public void lower(String albumId){
if (StringUtils.isBlank(albumId)){
return;
}
this.elasticsearchTemplate.delete(albumId, AlbumInfoIndex.class);
}
}