当前位置: 首页 > article >正文

Flink Flink中的合流

一、Flink中的基本合流操作

在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以 Flink 中合流的操作会更加普遍,对应的 API 也更加丰富。

二、联合(Union)

最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。
在这里插入图片描述
在代码中,我们只要基于 DataStream 直接调用.union()方法,传入其他 DataStream 作为参数,就可以实现流的联合了;得到的依然是一个 DataStream:

stream1.union(stream2, stream3, ...)

注意:union()的参数可以是多个 DataStream,所以联合操作可以实现多条流的合并。

代码实现:我们可以用下面的代码做一个简单测试:

package com.flink.DataStream.UnionStream;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkUnionStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setParallelism(1);
        SingleOutputStreamOperator<Integer> source1 = streamExecutionEnvironment
                .socketTextStream("localhost", 1111)
                .map(a -> Integer.parseInt(a));
        SingleOutputStreamOperator<Integer> source2 = streamExecutionEnvironment
                .socketTextStream("localhost", 2222)
                .map(a -> Integer.parseInt(a));
        DataStreamSource<String> source3 = streamExecutionEnvironment.fromElements("3", "4", "5");
        DataStream<Integer> unionResult = source1.union(source2, source3.map(Integer::valueOf));
        unionResult.print();
        streamExecutionEnvironment.execute();
    }
}

在这里插入图片描述
在这里插入图片描述

三、连接(Connect)

为了处理更加灵活,连接操作允许流的数据类型不同。但我们知道一个DataStream中的数据只能有唯一的类型,所以连接得到的结果并不是DataStream,而是一个“连接流”。连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的。要想得到新的DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型。所以整体上来,两条流的连接就像是“一国两制”,两条流可以保持各自的数据类型、处理方式也可以不同,不过最终还是会统一到同一个DataStream中。
在这里插入图片描述


http://www.kler.cn/a/148842.html

相关文章:

  • 使用Python实现对接Hadoop集群(通过Hive)并提供API接口
  • 【机器学习】如何配置anaconda环境(无脑版)
  • 基于碎纸片的拼接复原算法及MATLAB实现
  • SpringBoot参数注解
  • 从社交媒体到元宇宙:Facebook未来发展新方向
  • 搭建深度学习开发环境
  • Python---lambda表达式
  • 交换机的VRRP主备配置例子
  • 计网Lesson3 - 计算机网络评价指标与封包解包
  • 别再让假的fiddler教程毒害你了,来看这套最全最新的fiddler全工具讲解
  • 基于C#实现Kruskal算法
  • DGL在异构图上的GraphConv模块
  • 【Redisson】基于自定义注解的Redisson分布式锁实现
  • 堆的应用(堆排序、Top-K问题)
  • 大模型的开源闭源
  • linux -系统通用命令查询
  • viple模拟器使用(四):unity模拟器中实现沿右墙迷宫算法
  • 门面模式-C++实现
  • java中IO知识点概念
  • GoLong的学习之路,进阶,RabbitMQ (消息队列)
  • Jmeter-分布式压测(远程启动服务器,windows)
  • 代码随想录-刷题第九天
  • 通义千问 Qwen-7B-Chat-Int4 模型本地化部署
  • 机器人规划算法——movebase导航框架源码分析
  • Linux的软件安装
  • linaro交叉编译工具链下载与使用笔记