当前位置: 首页 > article >正文

pyflink datastream数据流ds经过一系列转换后转为table,t_env.from_data_stream(ds)

在 pyflink 处理数据流过程中,有时候需要将data_stream转为table,下面是正确的方式,即每一个算子(map,reduce, window)操作之后需要指定输出数据类型。

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import DataTypes, StreamTableEnvironment, Schema

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
ds = env.from_collection([(12, "Alice"), (0, "Bob")], type_info=Types.TUPLE([Types.LONG(), Types.STRING()]))

def update_tel(data):
    return data

## 正确用法,每一步操作算子之后都需要加上输出的数据类型 output_type
ds = ds.map(lambda x: update_tel(x), output_type=Types.TUPLE([Types.INT(), Types.STRING()]))
#input_table = t_env.from_data_stream(ds).alias("score", "name")
input_table = t_env.from_data_stream(ds)
input_table.print_schema()
t_env.create_temporary_view("MyView", input_table)
t_env.from_path("MyView").print_schema()

# 输出:
#(
#  `f0` INT NOT NULL,
#  `f1` STRING
#)
"""
## 错误用法,不指定output_type
ds = ds.map(lambda x: update_tel(x))
#input_table = t_env.from_data_stream(ds).alias("score", "name")
input_table = t_env.from_data_stream(ds)
input_table.print_schema()
t_env.create_temporary_view("MyView", input_table)
t_env.from_path("MyView").print_schema()

输出:
(
  `f0` RAW('[B', '...')
)
"""

参考:
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/python/datastream_tutorial/
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/data_stream_api/
https://github.com/apache/flink/tree/release-1.16/flink-python/pyflink/examples/datastream


http://www.kler.cn/a/395143.html

相关文章:

  • 传奇996_21——龙岭事件
  • 微信小程序=》基础=》常见问题=》性能总结
  • AI 扩展开发者思维方式:以 SQL 查询优化为例
  • Qt 获取当前系统中连接的所有USB设备的信息 libudev版
  • 字节跳动核心技术:TT推荐系统从0-1落地应用
  • AI写作(四)预训练语言模型:开启 AI 写作新时代(4/10)
  • 【C++学习(35)】在Linux中基于ucontext实现C++实现协程(Coroutine),基于C++20的co_await 协程的关键字实现协程
  • 机器学习在网络安全中的应用
  • 问:SQL优化,七条实践总结?
  • Rust枚举之卧龙凤雏(Rust Option枚举、Rust Result枚举)(Rust Enum、Some(T)、Ok(T)、Err(E))链式操作
  • TKinter实现与Dash应用的同步启停控制
  • kubernetes简单入门实战
  • 【大语言模型】ACL2024论文-10 CSCD-IME: 纠正拼音输入法产生的拼写错误
  • MathGPT的原理介绍,在中小学数学教学的应用场景,以及代码样例实现
  • Leetcode:3258. 统计满足 K 约束的子字符串数量 I
  • 什么是CRM系统?
  • 华为eNSP:RSTP
  • 【前端】vue 如何完全销毁一个组件
  • JavaScript 面试题
  • 助力网络安全发展,安全态势攻防赛事可视化
  • PostgreSQL 计算两个时间之间的日期差
  • Cyberchef配合Wireshark提取并解析TCP/FTP流量数据包中的文件
  • NeRF在农业领域的应用-------------(1)
  • 深入理解ElasticSearch分词器:详解各种分词器的原理与应用
  • 鸿蒙学习生态应用开发能力全景图-开发者支持平台(5)
  • Spring——原理:IoC