当前位置: 首页 > article >正文

Spark sql怎么使用Kafka Avro序列化器

创建Kafka生产者实例:您需要使用org.apache.kafka.clients.producer.KafkaProducer类创建一个Kafka生产者实例。在创建生产者时需要指定使用Kafka Avro序列化器。

import org.apache.kafka.clients.producer.ProducerRecord

import org.apache.kafka.clients.producer.KafkaProducer

import io.confluent.kafka.serializers.KafkaAvroSerializer

import java.util.Properties

 

val props = new Properties()

props.put("bootstrap.servers", "localhost:9092")

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")

props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")

props.put("schema.registry.url", "http://localhost:8081")

 

val producer = new KafkaProducer[String, GenericRecord](props)

 

准备要序列化的数据:您需要将要序列化的数据准备好。例如,您可以使用Spark SQL创建DataFrame:

 

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.types._

import org.apache.spark.sql.functions._

 

val spark = SparkSession.builder().appName("KafkaAvroSerializer").master("local[*]").getOrCreate()

 

val schema = StructType(Seq(

  StructField("name", StringType, true),

  StructField("age", IntegerType, true)

))

 

val data = Seq(

  ("Alice", 30),

  ("Bob", 40),

  ("Charlie", 50)

)

 

val df = spark.createDataFrame(data).toDF(schema)

序列化数据并将其发送到Kafka:您可以使用KafkaAvroSerializer序列化数据并将其发送到Kafka主题中。在下面的代码中,我们使用foreach函数遍历DataFrame中的每一行,并将其序列化为Avro格式后发送到Kafka主题中。

 

val topic = "my_topic"

 

df.foreach(row => {

  val key = row.getAs[String]("name")

  val value = new GenericData.Record(valueSchema)

  value.put("name", row.getAs[String]("name"))

  value.put("age", row.getAs[Int]("age"))

  

  val record = new ProducerRecord[String, GenericRecord](topic, key, value)

  producer.send(record)

})

 

 


http://www.kler.cn/a/5814.html

相关文章:

  • python学opencv|读取图像(三十一)缩放图像的三种方法
  • Three.js 用户交互:构建沉浸式3D体验的关键
  • Cursor无限续杯——解决Too many free trials.
  • 洛谷P1617————数字转英文
  • WebRTC 在视频联网平台中的应用:开启实时通信新篇章
  • STM32-笔记34-4G遥控灯
  • 阿里云高级技术专家林立翔:基于阿里云弹性GPU服务的神龙AI加速引擎,无缝提升AI训练性能
  • 每日学术速递4.3
  • React Native开发环境搭建-Windows平台
  • IT知识百科:什么是访问控制列表ACL?
  • @Lookup与@Bean@Scope获取原型实例,谁更胜一筹
  • gcc在Linux下如何运行一个C/C++程序
  • [致敬未来的攻城狮计划 1] 使用 “FSP Configuration”(FSP 配置)透视配置器设置运行环境
  • 2023年详解MySQL 开发规范
  • 指令操作码
  • python-实验报告-3
  • Nacos(Config)配置中心源码分析-02
  • 表和索引的并行度问题
  • 杂记——idea VM设置(idea启动缓慢,JVM部分参数解释,重启Idea)
  • 遗传算法(Genetic Algorithm,GA)
  • 交友项目【基础环境搭建】
  • 压缩器简介与实现
  • 炼钢厂VR职业技能实训软件,提高员工学习效率和掌握技能速度
  • 重置Win10电脑
  • Java中函数形参中`...`的作用
  • IT知识百科:什么是802.11ac(WiFi 5)?