当前位置: 首页 > article >正文

flink kafka sink (scala)

将对象数据通过Gson 转为jsonString,在将数据写到kafka中,这个可以根据需要修改,比如按照\t分开也行,可以节省字段名称的空间。

这里还有一个问题,就是每来一条数据都需要new Gson 对象,有没有办法减少创建呢

我们知道job 和task之间是不能够传输序列化的对象的。

那么如果需要减少Gson的创建,可以自定义map函数,继承并实现RichMapFunction中的方法,其中open就可以只创建一次Gson。

data.map(new Gson().toJson(_))
.addSink(new FlinkKafkaProducer[String]("topicName", new SimpleStringSchema(), props, Optional.ofNullable[FlinkKafkaPartitioner[String]](null)))
.uid("write-to-kafka")
.name("write-to-kafka")

自定义map:

private class DemoMap extends RichMapFunction[Data, String] {
var gson:Gson=_
override def open(parameters: Configuration): Unit = {
gson=new Gson()
}

override def map(value: Data): String = {
gson.toJson(value)
}

override def close(): Unit = {
}
}


http://www.kler.cn/a/320909.html

相关文章:

  • 缓存-Redis-常见问题-缓存击穿-永不过期+逻辑过期(全面 易理解)
  • Python入门教程 —— 网络编程
  • 游戏关卡设计的常用模式
  • RK3562编译Android13 ROOT固件教程,触觉智能开发板演示
  • 【SQL】掌握SQL查询技巧:数据分组与排序
  • 基于YOLO5的机械臂视觉抓取实现
  • 大模型的实践应用30-大模型训练和推理中分布式核心技术的应用
  • layui upload.render 设置文件名
  • GB28181语音对讲协议详解
  • docker - 镜像操作(拉取、查看、删除)
  • Endnote激活码失效
  • Vue3使用hiprint——批次打印条码
  • js判断一个对象里有没有某个属性
  • 细说Flink状态管理
  • 深度学习激活函数
  • DC00015基于java web校园网上购物系统
  • Python图形用户界面设计的15个基础组件
  • SpringCloud Alibaba五大组件之——Sentinel
  • html实现黑白棋
  • 代码随想录算法训练营43期 | Day 21 —— 108.将有序数组转换为二叉搜索树、 538.把二叉搜索树转换为累加树
  • 【Linux】通过内核以太层可查看应用程序运行时访问外网情况
  • SQL - 进阶语法(一)
  • 数据结构——C语言单链表的实现
  • python实现石头,剪刀,布(turtle库简易版)
  • python接口自动化——unittest断言
  • leetcode 2024.9.26