当前位置: 首页 > article >正文

Spring Cloud Stream与Kafka(一)

Spring Cloud Stream与Kafka(一)

​ 在实际开发过程中,消息中间件用于解决应用解耦,异步消息,流量削峰等问题,实现高可用、高性能、可伸缩和最终一致性架构。不同的消息中间件实现方式不同,内部结构是不一样的。比如常见的RabbitMQ和Kafka,RabbitMQ有exchange,kafka有topic、partition,这些中间件的差异性导致我们在实际项目开发过程中造成了一定的干扰。如果采用了其中的一种,后面的业务需求,我想往另一种消息队列迁移,有一堆东西需要重做。Spring Cloud Stream是一种解耦的方式。

文章目录

    • Spring Cloud Stream与Kafka(一)
      • 简单介绍
      • Kafka实例
        • 生产者
        • 消费者

简单介绍

  1. Spring Cloud Stream是由一个中间件中立的核心组成,应用通过Spring Cloud Stream插入的input(相当于消费者)和output(相当于生产者)通道与外界交流。通道通过指定中间件的Binder与外部代理连接,业务开发者不需要关注具体的消息中间件,只需要关注Binder对应程序提供的抽象概念来使用中间件实现业务就可以了。Spring Cloud Stream许多抽象和原语,简化了消息驱动微服务应用程序的开发。

SCSt-with-binder

  • 最底层是消息服务,中间层是绑定层,绑定层和底层的消息服务进行绑定,顶层是消息生产者和消费者,顶层可以向绑定层生产消费、获取消息。
  1. Binder绑定器作为中间层,实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要考虑具体的中间件实现。当需要升级或更换中间件产品时,我们要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑。

  2. 在Spring Cloud Stream中的消息通信方式遵循发布订阅模式,当一条消息被投递到消息中间件后,它会通过共享的主题进行广播,消费者在订阅的主题收到消息后触发自身的业务逻辑处理。这里的主题是抽象概念,代表发布共享消息给消费者的地方。在不同的消息中间中,主题可能对应着不同的概念。

  3. Destination Binders是负责提供与外部消息系统集成的组件。Destination Bindings是外部消息系统和最终用户提供的应用程序代码(生产者/消费者)之间的桥梁。Message是生产者和消费者用于与目标绑定器通信的规范化数据结构。

Kafka实例

  1. 分别创建生产者kafka-producer和消费者kafka-consumer,引入依赖。
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
</dependencies>
生产者
  1. 添加配置文件application.yml
spring:
  cloud:
    stream:
      kafka:
        binder: # 绑定器
          brokers: 192.168.182.171:9092   # broker的IP和端口
  application:
    name: kafka-provider

server:
  port: 8301
  1. 添加启动类
package org.lxx.stream.kafka.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }

}
  1. 添加配置文件
package org.lxx.stream.kafka.producer.config;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

@Component
public interface MyProcessor {

    String MESSAGE_OUTPUT = "log_output";

    //在Kafka中创建主题log_output
    @Output(MESSAGE_OUTPUT)
    SubscribableChannel logOutput();
}
  1. 创建实体类
package org.lxx.stream.kafka.producer.entity;

import lombok.Data;

import java.util.Date;

@Data
public class LogInfo {
    private String clientVersion;
    private String userId;
    private String clientIP;
    private Date time;
}
  1. 创建控制器
package org.lxx.stream.kafka.producer.controller;

import lombok.extern.slf4j.Slf4j;
import org.lxx.stream.kafka.producer.config.MyProcessor;
import org.lxx.stream.kafka.producer.entity.LogInfo;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.sql.Date;
import java.time.Instant;

@Slf4j
@RestController
@EnableBinding(value = {MyProcessor.class})
public class MessageController {

    @Resource
    private MyProcessor myProcessor;

    @GetMapping("sendLogMessage")
    public void sendLogMessage(String message) {
        Message<String> strMessage = 
            MessageBuilder.withPayload(message).build();
        myProcessor.logOutput().send(strMessage);
    }

    @GetMapping("sendObjLogMessage")
    public void sendObjLogMessage() {
        LogInfo logInfo = new LogInfo();
        logInfo.setClientIP("192.168.1.111");
        logInfo.setClientVersion("1.0");
        logInfo.setUserId("198663383837434");
        logInfo.setTime(Date.from(Instant.now()));
        Message<LogInfo> strMessage = 
            MessageBuilder.withPayload(logInfo).build();
        myProcessor.logOutput().send(strMessage);
    }
}
消费者
  1. 添加配置文件application.yml
spring:
  cloud:
    stream:
      kafka:
        binder: # 绑定器
          brokers: 192.168.182.171:9092   # broker的IP和端口
  application:
    name: kafka-consumer

server:
  port: 8302
  1. 添加启动类
package org.lxx.stream.kafka.consumer;

import org.lxx.stream.kafka.consumer.config.MyProcessor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableBinding(value = MyProcessor.class)
@SpringBootApplication
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }

}
  1. 添加配置类
package org.lxx.stream.kafka.consumer.config;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

@Component
public interface MyProcessor {

    String MESSAGE_INPUT = "log_input";
    String MESSAGE_OUTPUT = "log_output";
    String LOG_FORMAT_INPUT = "log_format_input";
    String LOG_FORMAT_OUTPUT = "log_format_output";

    @Input(MESSAGE_INPUT)
    SubscribableChannel logInput();

    @Output(MESSAGE_OUTPUT)
    SubscribableChannel logOutput();

    @Input(LOG_FORMAT_INPUT)
    SubscribableChannel logFormatInput();

    @Output(LOG_FORMAT_OUTPUT)
    SubscribableChannel logFormatOutput();

}
  1. 添加监听器
package org.lxx.stream.kafka.consumer.service;

import lombok.extern.slf4j.Slf4j;
import org.lxx.stream.kafka.consumer.config.MyProcessor;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MessageListener {

    @StreamListener(MyProcessor.MESSAGE_INPUT)
    @SendTo(MyProcessor.LOG_FORMAT_OUTPUT)
    public String processLogMessage(String message) {
        //通过MyProcessor.MESSAGE_INPUT接收消息
        //然后通过SendTo把处理后的消息发送到MyProcessor.LOG_FORMAT_OUTPUT
        log.info("GET Message:" + message);
        return message;
    }

    @StreamListener(MyProcessor.LOG_FORMAT_INPUT)
    public void processFormatLogMessage(String message) {
        //接收来自MyProcessor.LOG_FORMAT_INPUT 的消息
        //也就是SendTo发送的加工后的消息
        log.info("接收到格式化后的消息:" + message);
    }

}

http://www.kler.cn/a/287656.html

相关文章:

  • 「刘一哥GIS」系列专栏《GRASS GIS零基础入门实验教程(配套案例数据)》专栏上线了
  • redhat安装docker 24.0.7
  • 关于jwt和security
  • 本地部署Web-Check网站检测与分析利器并实现远程访问实时监测
  • js-判断一个object(对象)是否为空
  • C++单例模式的设计
  • 【网络安全】Bingbot索引投毒实现储存型XSS
  • 华为OD机试真题 - 拼接URL(Python/JS/C/C++ 2024 D卷 100分)
  • RabbitMQ当消息消费失败时,会重新进入队列吗?
  • skywalking接入nginx
  • ElasticSearch 集群索引和分片的CURD
  • 51单片机-LED闪烁
  • MD5 数字摘要算法的详细介绍与 Python 实现
  • RabbitMQ安装步骤
  • 一键编译QT5源码脚本(交叉编译arm64、mips64版本)
  • Laravel邮件发送功能的实现的方法和技巧?
  • 【HTML】模拟消息折叠效果【附源代码】
  • 云计算day37
  • 解决Linux安装epel源提示没有可用安装包
  • 访问Neo4j验证失败(The client is unauthorized due to authentication failure.)
  • 缓存使用-缓存击穿、穿透、雪崩概念
  • 数据仓库系列13:增量更新和全量更新有什么区别,如何选择?
  • 基于单片机的自动浇花控制写设计任务书
  • python语言基础(六)--深浅拷贝、闭包与装饰器
  • element-plus 报错 ResizeObserver loop limit exceeded 解决
  • 线性代数 第五讲:线性方程组_齐次线性方程组_非齐次线性方程组_公共解同解方程组_详解