当前位置: 首页 > article >正文

flink kafka sink (scala)

将对象数据通过Gson 转为jsonString,在将数据写到kafka中,这个可以根据需要修改,比如按照\t分开也行,可以节省字段名称的空间。

这里还有一个问题,就是每来一条数据都需要new Gson 对象,有没有办法减少创建呢

我们知道job 和task之间是不能够传输序列化的对象的。

那么如果需要减少Gson的创建,可以自定义map函数,继承并实现RichMapFunction中的方法,其中open就可以只创建一次Gson。

data.map(new Gson().toJson(_))
.addSink(new FlinkKafkaProducer[String]("topicName", new SimpleStringSchema(), props, Optional.ofNullable[FlinkKafkaPartitioner[String]](null)))
.uid("write-to-kafka")
.name("write-to-kafka")

自定义map:

private class DemoMap extends RichMapFunction[Data, String] {
var gson:Gson=_
override def open(parameters: Configuration): Unit = {
gson=new Gson()
}

override def map(value: Data): String = {
gson.toJson(value)
}

override def close(): Unit = {
}
}


http://www.kler.cn/a/320909.html

相关文章:

  • vue 模板语法 ( 插值表达式 | 属性绑定 | 双向数据绑定 | 指令 | 按键修饰符 )
  • GoogleCloud服务器的SSH连接配置
  • vue如何实现组件切换
  • 【大数据学习 | HBASE高级】hbase-phoenix 与二次索引应用
  • T265相机双目鱼眼+imu联合标定(全记录)
  • hrnet人体关键点检测模型适配atlas笔记
  • 大模型的实践应用30-大模型训练和推理中分布式核心技术的应用
  • layui upload.render 设置文件名
  • GB28181语音对讲协议详解
  • docker - 镜像操作(拉取、查看、删除)
  • Endnote激活码失效
  • Vue3使用hiprint——批次打印条码
  • js判断一个对象里有没有某个属性
  • 细说Flink状态管理
  • 深度学习激活函数
  • DC00015基于java web校园网上购物系统
  • Python图形用户界面设计的15个基础组件
  • SpringCloud Alibaba五大组件之——Sentinel
  • html实现黑白棋
  • 代码随想录算法训练营43期 | Day 21 —— 108.将有序数组转换为二叉搜索树、 538.把二叉搜索树转换为累加树
  • 【Linux】通过内核以太层可查看应用程序运行时访问外网情况
  • SQL - 进阶语法(一)
  • 数据结构——C语言单链表的实现
  • python实现石头,剪刀,布(turtle库简易版)
  • python接口自动化——unittest断言
  • leetcode 2024.9.26