当前位置: 首页 > article >正文

实时流式计算 kafkaStream

文章目录

  • 实时流式计算
  • Kafka Stream
  • Kafka Streams 的关键概念
  • KStream
  • Kafka Stream入门案例编写
  • SpringBoot 集成 Kafka Stream


实时流式计算

一般流式计算会与批量计算相比较

在这里插入图片描述

流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界

一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。
流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。

应用场景

  • 日志分析
    网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策
  • 大屏看板统计
    可以实时的查看网站注册数量,订单数量,购买数量,金额等。
  • 公交实时数据
    可以随时更新公交车方位,计算多久到达站牌等
  • 实时文章分值计算
    头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。

技术方案选型

  • Hadoop
  • Apche Storm
  • Flink
  • Kafka Stream

可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成。

Kafka Stream

Kafka Stream:提供了对存储于 Kafka内 的数据进行流式处理分析的功能

Kafka Stream的特点如下:

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展和顺序性保证
  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

在这里插入图片描述

Kafka Streams 的关键概念

在这里插入图片描述

  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。

  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

KStream

  • 数据结构类似于map,如下图,key-value 键值对

在这里插入图片描述

KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。

在这里插入图片描述

Kafka Stream入门案例编写

  1. 需求分析,求单词个数(word count)

在这里插入图片描述

  1. 创建原生的 kafka staream 入门案例

导入依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>connect-json</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>

在这里插入图片描述

在这里插入图片描述

package com.heima.kafka.sample;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * 流式处理
 */
public class KafkaStreamQuickStart {

    public static void main(String[] args) {

        //kafka的配置信心
        Properties prop = new Properties();
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");

        //stream 构建器
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        //流式计算
        streamProcessor(streamsBuilder);


        //创建kafkaStream对象
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
        //开启流式计算
        kafkaStreams.start();
    }

    /**
     * 流式计算
     * 消息的内容:hello kafka  hello itcast
     * @param streamsBuilder
     */
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        //创建kstream对象,同时指定从那个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        /**
         * 处理消息的value
         */
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
                //按照value进行聚合处理
                .groupBy((key,value)->value)
                //时间窗口
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //统计单词的个数
                .count()
                //转换为kStream
                .toStream()
                .map((key,value)->{
                    System.out.println("key:"+key+",vlaue:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");
    }
}
  1. 测试准备
  • 使用生产者在 topic 为:itcast_topic_input中发送多条消息
  • stream 接收 itcast_topic_input 的数据,进行聚合操作后,将处理结果发送到 itcast_topic_out
  • 使用消费者接收 topic 为:itcast_topic_out

结果:

  • 通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出

SpringBoot 集成 Kafka Stream

  1. 配置

在这里插入图片描述

package com.heima.kafka.config;

import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
 */

@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    private String hosts;
    private String group;
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}

application.yml

在这里插入图片描述

kafka:
  hosts: 192.168.200.130:9092
  group: ${spring.application.name}
  1. 在配置类中定义方法
  • 可注入StreamsBuilder
  • 返回值必须是KStream且放入spring容器中

在这里插入图片描述

package com.heima.kafka.stream;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;
import java.util.Arrays;

@Configuration
@Slf4j
public class KafkaStreamHelloListener {

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //创建kstream对象,同时指定从那个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
                //根据value进行聚合分组
                .groupBy((key,value)->value)
                //聚合计算时间间隔
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //求单词的个数
                .count()
                .toStream()
                //处理后的结果转换为string字符串
                .map((key,value)->{
                    System.out.println("key:"+key+",value:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");
        return stream;
    }
}
  1. 测试

启动 springboot 项目即可自动监听


http://www.kler.cn/a/154928.html

相关文章:

  • 【Three.js基础学习】22.New project structure
  • Ubuntu24.04上安装和配置MariaDB
  • 使用c#实现TCP客户端与服务器端
  • Mac解压包安装MongoDB8并设置launchd自启动
  • mysql每日一题(上升的温度,date数据的计算)
  • ubuntu20.04 colmap 安装2024.11最新
  • 第8关:定义一个名为PROC_AVGWEIGHT的有参数存储过程
  • ubuntu22.04离线手动安装openstack yoga和ceph quincy
  • 西南科技大学C++程序设计实验三(类与对象二)
  • hbase Master is initializing
  • 分布式事务有哪些解决方案?
  • 力扣labuladong一刷day24天
  • MySQL 教程 1.5
  • 同旺科技 USB TO SPI / I2C --- 调试W5500_Ping测试
  • 工业机器视觉megauging(向光有光)使用说明书(一,轻量级的visionpro)
  • java中@Async注解在CompletableFuture.runAsync里面使用没有生效的原因?
  • Java项目调用C/C++ SDK的方案汇总
  • 力扣题:字符串的反转-11.23
  • 盘点25个Html游戏Game源码网页爱好者不容错过
  • 前端面试JS— JS数据类型及相关内容
  • Linux 基础认识
  • 【每日一题】拼车+【差分数组】
  • DQN原理及PyTorch实现【强化学习】
  • java使用poi读写excel(处理上下标和科学计数法)
  • PriorityQueue类
  • 大数据基础设施搭建 - 业务数据同步策略