当前位置: 首页 > article >正文

kafka及异步通知文章上下架

1)自媒体文章上下架

需求分析

2)kafka概述

消息中间件对比

特 性

ActiveMQ

RabbitMQ

RocketMQ

Kafka

开 发 语 言

java

erlang

java

scala

单 机 吞 吐 量

万级

万级

10万级

100万级

时 效 性

ms

us

ms

ms级以内

可 用 性

高(主从)

高(主从)

非常高(分布 式)

非常高(分布 式)

功 能 特 性

成熟的产品、较全的 文档、各种协议支持 好

并发能力强、 性能好、延迟 低

MQ功能比较 完善,扩展性 佳

只支持主要的MQ功能, 主要应用于大数据领域

消息中间件对比-选择建议

消息中间 件

建议

Kafka

追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务

RocketMQ

靠性要求很高的金融互联网领域,稳定性高,经历了多次阿里双11考验

RabbitMQ

性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的 RabbitMQ

kafka介绍

Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。kafka官网:http://kafka.apach e.org/

kafka介绍-名词解释

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)
  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

3)kafka安装配置

Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper

https://blog.csdn.net/m0_70325779/article/details/137248462

4)kafka入门

生产者发送消息,多个消费者只能有一个消费者接收到消息生产者发送消息,多个消费者都可以接收到消息

(1)创建kafka-demo项目,导入依赖
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
(2)生产者发送消息
/**
 * 生产者
 */
public class ProducerQuickStart {
    public static void main(String[] args) throws ExecutionException, 
InterruptedException {
        //1.kafka链接配置信息
        Properties prop = new Properties();
        //kafka链接地址
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        //key和value的序列化
       
 prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.se
rialization.StringSerializer");
       
 prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.
serialization.StringSerializer");
        //2.创建kafka生产者对象
        KafkaProducer<String,String> producer = new KafkaProducer<String,String>
(prop);
        //3.发送消息
        /**
         * 第一个参数 :topic
         * 第二个参数:消息的key
         * 第三个参数:消息的value
         */
        ProducerRecord<String,String> kvProducerRecord = new 
ProducerRecord<String,String>("topic-first","key-001","hello kafka");
        //同步发送消息
         RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
        System.out.println(recordMetadata.offset());
        //4.关闭消息通道 必须要关闭,否则消息发送不成功
        producer.close();
   }
}
(3)消费者接收消息
/**
 * 消费者
 */
public class ConsumerQuickStart {
    public static void main(String[] args) {
        //1.kafka的配置信息
        Properties prop = new Properties();
        //链接地址
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //key和value的反序列化器
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer");
        //设置消费者组
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        //2.创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, 
String>(prop);
        //3.订阅主题
        consumer.subscribe(Collections.singletonList("topic-first"));
        //4.拉取消息
        while (true) {
            ConsumerRecords<String, String> consumerRecords = 
consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> consumerRecord : 
consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
           }
       }
   }
}
使用情景:
  • 生产者发送消息,多个消费者订阅同一个主题(多个消费者都是一个组)只能有一个消费者收到消息(一对一)
  • 生产者发送消息,多个消费者订阅同一个主题(多个消费者不是一个组)所有消费者都能收到消息(一对多)

springboot集成kafka入门

1.导入spring-kafka依赖信息
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- kafkfa -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
</dependencies>
2.在resources下创建文件application.yml
server:
 port: 9991
spring:
 application:
   name: kafka-demo
 kafka:
   bootstrap-servers: 192.168.200.130:9092
   producer:
     retries: 10
     key-serializer: org.apache.kafka.common.serialization.StringSerializer
     value-serializer: org.apache.kafka.common.serialization.StringSerializer
   consumer:
   group-id: ${spring.application.name}-test
     key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
     value-deserializer: 
org.apache.kafka.common.serialization.StringDeserializer
3.消息生产者
4.消息消费者

传递消息为对象

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式

  • 发送消息
  • 接收消息

自媒体文章上下架功能完成

需求分析

已发表且已下架的文章可以上架

流程说明

接口定义

说明

接口路径

/api/v1/news/down_or_up

请求方式

POST

参数

DTO

响应结果

ResponseResult

DTO
@Data
public class WmNewsDto {
    
    private Integer id;
    /**
    * 是否上架 0 下架 1 上架
    */
    private Short enable;
                       
}
ResponseResult

自媒体文章上下架-功能实现

接口定义

water-wemedia工程

在water-wemedia工程下的WmNewsController新增方法

@PostMapping("/down_or_up")
public ResponseResult downOrUp(@RequestBody WmNewsDto dto){
    return null;
}

在WmNewsDto中新增enable属性,完整的代码如下:

@Data
public class WmNewsDto {
    
    private Integer id;
     /**
     * 标题
     */
    private String title;
     /**
     * 频道id
     */
    private Integer channelId;
     /**
     * 标签
     */
    private String labels;
     /**
     * 发布时间
     */
    private Date publishTime;
     /**
     * 文章内容
     */
    private String content;
     /**
     * 文章封面类型 0 无图 1 单图 3 多图 -1 自动
     */
    private Short type;
     /**
     * 提交时间
     */
    private Date submitedTime; 
     /**
      * 状态 提交为1 草稿为0
     */
    private Short status;
     
     /**
     * 封面图片列表 多张图以逗号隔开
     */
    private List<String> images;
    /**
     * 上下架 0 下架 1 上架
     */
    private Short enable;
}
业务层编写

在WmNewsService新增方法

/**
 * 文章的上下架
 * @param dto
 * @return
 */
public ResponseResult downOrUp(WmNewsDto dto);
实现方法
/**
 * 文章的上下架
 * @param dto
 * @return
 */
@Override
public ResponseResult downOrUp(WmNewsDto dto) {
    //1.检查参数
    if(dto.getId() == null){
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
   }
    //2.查询文章
    WmNews wmNews = getById(dto.getId());
    if(wmNews == null){
        return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不
存在");
   }
    //3.判断文章是否已发布
    if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章
不是发布状态,不能上下架");
   }
    //4.修改文章enable
    if(dto.getEnable() != null && dto.getEnable() > -1 && dto.getEnable() < 2){
        update(Wrappers.
<WmNews>lambdaUpdate().set(WmNews::getEnable,dto.getEnable())
.eq(WmNews::getId,wmNews.getId()));
   }
    
    
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
控制器
@PostMapping("/down_or_up")
public ResponseResult downOrUp(@RequestBody WmNewsDto dto){
    return wmNewsService.downOrUp(dto);
}
测试
消息通知article端文章上下架
在water-common模块下导入kafka依赖
<!-- kafkfa -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
在自媒体端的nacos配置中心配置kafka的生产者
spring:
 kafka:
   bootstrap-servers: localhost:9092
   producer:
     retries: 10
     key-serializer: org.apache.kafka.common.serialization.StringSerializer
     value-serializer: org.apache.kafka.common.serialization.StringSerializer
在自媒体端文章上下架后发送消息
//发送消息,通知article端修改文章配置
if(wmNews.getArticleId() != null){
    Map<String,Object> map = new HashMap<>();
    map.put("articleId",wmNews.getArticleId());
    map.put("enable",dto.getEnable());
   
 kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONS
tring(map));
}
常量类:
public class WmNewsMessageConstants {
    public static final String 
WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
}
在article端的nacos配置中心配置kafka的消费者
spring:
 kafka:
   bootstrap-servers: localhost:9092
   consumer:
     group-id: ${spring.application.name}
     key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
     value-deserializer: 
org.apache.kafka.common.serialization.StringDeserializer
在article端编写监听,接收数据
@Component
@Slf4j
public class ArtilceIsDownListener {
    @Autowired
    private ApArticleConfigService apArticleConfigService;
    @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
    public void onMessage(String message){
        if(StringUtils.isNotBlank(message)){
            Map map = JSON.parseObject(message, Map.class);
            apArticleConfigService.updateByMap(map);
            log.info("article端文章配置修改,articleId={}",map.get("articleId"));
       }
   }
}

修改ap_article_config表的数据新建ApArticleConfigService

public interface ApArticleConfigService extends IService<ApArticleConfig> {
    /**
     * 修改文章配置
     * @param map
     */
    public void updateByMap(Map map);
}
实现类:
@Service
@Slf4j
@Transactional
public class ApArticleConfigServiceImpl extends 
ServiceImpl<ApArticleConfigMapper, ApArticleConfig> implements 
ApArticleConfigService {
    /**
     * 修改文章配置
     * @param map
     */
    @Override
    public void updateByMap(Map map) {
        //0 下架 1 上架
        Object enable = map.get("enable");
        boolean isDown = true;
        if(enable.equals(1)){
            isDown = false;
       }
        //修改文章配置
        update(Wrappers.
<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articl
eId")).set(ApArticleConfig::getIsDown,isDown));
   }
}

http://www.kler.cn/a/293625.html

相关文章:

  • 软件测试面试大全(含答案+文档)
  • 设计模式之装饰器模式(SSO单点登录功能扩展,增加拦截用户访问方法范围场景)
  • WordPress 6.7 “Rollins”发布
  • Llama微调测试记录
  • flutter下拉刷新上拉加载的简单实现方式三
  • C# 模拟浏览器自操作(自动化办公)
  • Jenkins构建CI/CD
  • VSC++: 括号对称比较
  • 使用Cskin时候 遇到按钮有默认阴影问题解决
  • Java语法全解析:掌握基本规则,打造稳固编程基础!
  • 企业级Ansible自动化运维项目案例:实战与技巧
  • 集成 Logrus 到 Gin:打造高效的 Go Web 日志系统
  • 【python因果推断库10】工具变量回归与使用 pymc 验证工具变量3
  • 音频基础学习四——声音的能量与分贝
  • 探索Mem0:下一代人工智能与机器学习内存管理基础设施(二)Mem0+Ollama 部署运行
  • .net中的内存管理和垃圾回收
  • CSS学习14[重点]--定位、边偏移、定位模式
  • linux-性能优化命令
  • Css:属性选择器、关系选择器及伪元素
  • 【项目二】C++高性能服务器开发——日志系统(日志器,日志级别,日志事件)
  • 2024高教杯数学建模B题思路
  • shell编程--正则表达式
  • SAP ABAP 程序迁移工具 SAPLINK ABAP GIT
  • Oracle 19c数据库:Windows详细安装与配置指南
  • 【操作系统】进程同步之共享内存
  • [dp]答疑