当前位置: 首页 > article >正文

Flink的流、批处理

Flink的数据流处理,是持续流模型,数据不会落地,上游和下游的Task同时启动,等待数据的到达,Flink的批处理还是用的MapReduce计算模型,先处理map端,再执行reduce端。

flink的流处理(STREAMING):

是持续流模型,上游和下游的task任务同时启动,持续等待数据的到达,可以处理无界流和有界流,因为数据是一条条进行处理的,但是处理有界流的时候,打印出来的结果会显示多个(最终结果)
比如文本每行都有java,一共有java20,但是会打印java 8 ,java 10,java 20,显示多个最终结果

flink的批处理(BATCH):

底层还是MapReduce的计算模型,先处理map端,再处理reduce端,只能处理有界流
不能处理无界流
, 因为执行的方式不同,MR不可能持续等待数据的到达,而是一次性批量的处理数据

代码展示如下:


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

//TODO:无界流:程序开始之后,数据量是不确定的,没有边界,程序一直持续等待数据的到达
//     有界流:就是指数据量是确定的,比如一个文件,是知道具体的大小的

//todo:flink的流处理(STREAMING):是持续流模型,上游和下游的task任务同时启动,持续等待数据的到达,
// 可以处理无界流和有界流,因为数据是一条条进行处理的,但是处理有界流的时候,打印出来的结果会显示多个(最终结果)
// 比如文本每行都有java,一共有java20,但是会打印java 8 ,java 10,java 20,显示多个最终结果

//todo:flink的批处理(BATCH):底层还是MapReduce的计算模型,先处理map端,再处理reduce端,只能处理有界流
// 不能处理无界流, 因为执行的方式不同,MR不可能持续等待数据的到达,而是一次性批量的处理数据



public class flinkTest {
    public static void main(String[] args) throws Exception {
        //创建flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        env.setParallelism(2);
        //数据从上游发送到下游的缓存时间,默认也是200ms延迟
        env.setBufferTimeout(200);
        
        //数据读取,无界流
        //nc -lk 8888
        DataStreamSource<String> linesDSN = env.socketTextStream("master", 8888);
        linesDSN.print();

//        //数据读取,有界流,N是no代表无界,H是have代表有界
//           //设置为批处理的方式
//         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
//        DataStream<String> linesDSH = env.readTextFile("ScalaTest/src/main/java/data/score.txt");
//        linesDSH.print();

        //启动flink,execute会触发任务调度
        env.execute("w");


    }
}


http://www.kler.cn/a/387944.html

相关文章:

  • Lua语言中常用的字符串操作函数
  • 深度学习笔记11-优化器对比实验(Tensorflow)
  • 【动态规划篇】欣赏概率论与镜像法融合下,别出心裁探索解答括号序列问题
  • Training-free regional prompting for diffusion transformers
  • 【Docker】docker compose 安装 Redis Stack
  • STM32的存储结构
  • 【开源免费】基于SpringBoot+Vue.JS医院管理系统(JAVA毕业设计)
  • 使用ref操作DOM(React)
  • Qt菜单功能实现
  • 【Ant.designpro】上传图片
  • LeetCode-两整数之和
  • 清华大学提出Mini-Omni2:开源多模态模型,功能与GPT-4o媲美!
  • 继承的学习
  • 漫途LoRa家族系列产品,高效、稳定、智能的物联网通信!
  • 智象未来(HiDream.ai):从科技创新启程,绘制智能未来新篇章
  • 【MySQL】数据的增删查改
  • //字符串数组
  • STM32单片机WIFI语音识别智能衣柜除湿消毒照明
  • 在 C# 中,ICollection 和 IList 接口有什么区别?
  • SQL注入攻击及其在SpringBoot中使用MyBatisPlus的防范策略
  • [OpenGL]使用OpenGL实现硬阴影效果
  • Audio-Language Models
  • MySQL8完全卸载方法-Win10系统
  • 4.python字面量
  • nvm 切换 Node.js 版本
  • LabVIEW扫描探针显微镜系统