Spark 之 解析json的复杂和嵌套数据结构

本文主要使用以下几种方法:

1,get_json_object():从一个json 字符串中根据指定的json 路径抽取一个json 对象

2,from_json():从一个json 字符串中按照指定的schema格式抽取出来作为DataFrame的列

3,to_json():将获取的数据转化为json格式

4,explode():炸裂成多行

5,selectExpr():将列转化为一个JSON对象的另一种方式


文件名是 mystudent.txt   具体内容如下,只有一条数据

1|{"dept":{"describe":"主要负责教学","name":"学术部"},"email":"zhangsan@edu.cn","id":79,"name":"zhangsan","stus":[{"grade":"三年级","id":12,"name":"xuesheng1","school":{"address":"南京","leader":"王总","name":"南京大学"}},{"grade":"三年级","id":3,"name":"xuesheng2","school":{"address":"南京","leader":"王总","name":"南京大学"}},{"grade":"三年级","id":1214,"name":"xuesheng3","school":{"address":"南京","leader":"王总","name":"南京大学"}}],"tel":"1585050XXXX"}

 大概是这样的结构:

  

 第一步:导入文件并分割成二元组转换成两列

val optionRDD: RDD[String] = sc.textFile("in/mystudent.txt")
optionRDD.foreach(println)

//分割,注意  |  用的是单引号
val option1: RDD[(String, String)] = optionRDD.map(x => {
      val arr = x.split('|');
      (arr(0), arr(1))
    })
option1.foreach(println)

//转化成两列
val jsonStrDF: DataFrame = option1.toDF("aid", "value")
        jsonStrDF.printSchema()
        jsonStrDF.show(false)

 第二步:按照几个大类先拆分

 val jsonObj: DataFrame = jsonStrDF.select(
      $"aid"
      , get_json_object($"value", "$.dept").as("dept")
      , get_json_object($"value", "$.email").as("email")
      , get_json_object($"value", "$.id").as("tid")
      , get_json_object($"value", "$.name").as("tname")
      , get_json_object($"value", "$.stus").as("stus")
      , get_json_object($"value", "$.tel").as("tel")
    )
    println("--------------------------1--------------------------")
    jsonObj.printSchema()
    jsonObj.show(false)

 第三步:把dept这个部分再分

val jsonObj2: DataFrame = jsonObj.select($"aid", $"email"
      , $"tid", $"tname"
      , get_json_object($"dept", "$.describe").as("describe")
      , get_json_object($"dept", "$.name").as("dname")
      , $"stus", $"tel"
    )
    println("--------------------------2--------------------------")
        jsonObj2.printSchema()
        jsonObj2.show(false)

 第四步:把stus这部分合并成数组

val fileds: List[StructField] =
      StructField("grade", StringType) ::
        StructField("id", StringType) ::
        StructField("name", StringType) ::
        StructField("school", StringType) :: Nil
val jsonObj3: DataFrame = jsonObj2.select(
      $"aid", $"describe", $"dname", $"email", $"tid", $"tname"
      , from_json($"stus", ArrayType(
        StructType(
          fileds
        )
      )
      ).as("events")
    )
    println("--------------------------3--------------------------")
    jsonObj3.printSchema()
    jsonObj3.show(false)

 第五步:explode炸裂stus 部分,分成三部分;并新增列,删除原数组数据

//炸裂
val jsonObj4: DataFrame = jsonObj3.withColumn("events", explode($"events"))
    println("--------------------------4--------------------------")
    jsonObj4.printSchema()
    jsonObj4.show(false)

//新增列,删除原数据
val jsonObj5: DataFrame = jsonObj4.withColumn("grade", $"events.grade")
      .withColumn("id", $"events.id")
      .withColumn("name", $"events.name")
      .withColumn("school", $"events.school")
      .drop("events")
    println("--------------------------5--------------------------")
    jsonObj5.printSchema()
    jsonObj5.show(false)

 第六步:分开school部分,并合并全表

val jsonObj6: DataFrame = jsonObj5.select($"aid", $"describe"
      , $"dname", $"email",$"tid",$"tname",$"grade",$"id",$"name",
      get_json_object($"school","$.address").as("address")
      ,get_json_object($"school","$.leader").as("leader")
    ,get_json_object($"school","$.name").as("schoolname"))
    println("--------------------------6--------------------------")
    jsonObj6.printSchema()
    jsonObj6.show(false)

 总结,全文代码如下:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}

object JsonMyStu {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("jsonstu3opdemo").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc: SparkContext = spark.sparkContext

    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

    val optionRDD: RDD[String] = sc.textFile("in/mystudent.txt")
    optionRDD.foreach(println)


//按照 | 分割成两列
    val option1: RDD[(String, String)] = optionRDD.map(x => {
      val arr = x.split('|');
      (arr(0), arr(1))
    })
    option1.foreach(println)

    val jsonStrDF: DataFrame = option1.toDF("aid", "value")
        jsonStrDF.printSchema()
        jsonStrDF.show(false)

    val jsonObj: DataFrame = jsonStrDF.select(
      $"aid"
      , get_json_object($"value", "$.dept").as("dept")
      , get_json_object($"value", "$.email").as("email")
      , get_json_object($"value", "$.id").as("tid")
      , get_json_object($"value", "$.name").as("tname")
      , get_json_object($"value", "$.stus").as("stus")
      , get_json_object($"value", "$.tel").as("tel")
    )
    println("--------------------------1--------------------------")
    jsonObj.printSchema()
    jsonObj.show(false)

    val jsonObj2: DataFrame = jsonObj.select($"aid", $"email"
      , $"tid", $"tname"
      , get_json_object($"dept", "$.describe").as("describe")
      , get_json_object($"dept", "$.name").as("dname")
      , $"stus", $"tel"
    )
    println("--------------------------2--------------------------")
        jsonObj2.printSchema()
        jsonObj2.show(false)

    val fileds: List[StructField] =
      StructField("grade", StringType) ::
        StructField("id", StringType) ::
        StructField("name", StringType) ::
        StructField("school", StringType) :: Nil
    val jsonObj3: DataFrame = jsonObj2.select(
      $"aid", $"describe", $"dname", $"email", $"tid", $"tname"
      , from_json($"stus", ArrayType(
        StructType(
          fileds
        )
      )
      ).as("events")
    )
    println("--------------------------3--------------------------")
    jsonObj3.printSchema()
    jsonObj3.show(false)

    val jsonObj4: DataFrame = jsonObj3.withColumn("events", explode($"events"))
    println("--------------------------4--------------------------")
    jsonObj4.printSchema()
    jsonObj4.show(false)

    val jsonObj5: DataFrame = jsonObj4.withColumn("grade", $"events.grade")
      .withColumn("id", $"events.id")
      .withColumn("name", $"events.name")
      .withColumn("school", $"events.school")
      .drop("events")
    println("--------------------------5--------------------------")
    jsonObj5.printSchema()
    jsonObj5.show(false)

    val jsonObj6: DataFrame = jsonObj5.select($"aid", $"describe"
      , $"dname", $"email",$"tid",$"tname",$"grade",$"id",$"name",
      get_json_object($"school","$.address").as("address")
      ,get_json_object($"school","$.leader").as("leader")
    ,get_json_object($"school","$.name").as("schoolname"))
    println("--------------------------6--------------------------")
    jsonObj6.printSchema()
    jsonObj6.show(false)

  }
}

拓展:

//如果分割符是  ,  则用以下方法,indexOf返回第一个此元素的下标值
    /*val optinRDD: RDD[String] = sc.textFile("in/mystudent.txt")
    optinRDD.foreach(println)
    val frame: RDD[(String, String)] = optinRDD.map(
      x => {
        //返回第一个,所在的位置
        val i: Int = x.indexOf(",")//1
        
        //开始截取
        //(0,i)--->(0,1)
        //(i+1) 2 从下标元素开始到末尾
        val tuple: (String, String) = (x.substring(0, i), x.substring(i + 1))
        tuple
      }
    )*/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/8170.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

身临其境数字世界:探索VR全景元宇宙展厅

随着科技的不断发展,虚拟现实技术已经成为我们生活中的一部分。VR全景元宇宙展厅作为其中的一种形式,正越来越受欢迎。在这里,您可以探索未知的世界,体验全新的视觉和感官体验。 一、VR全景元宇宙展厅的概述 VR全景元宇宙展厅是一…

前端学习:HTML链接

目录 一、HTML超链接(链接) 二、HTML链接语法 三、target属性 target属性值展示 四、name属性 五、补充 关于创建电子邮件链接时如何发送邮件内容 在进行抄送时,需要使用关键字:cc 在进行密送时,需要使用关键字&a…

Linux小黑板(14):基于环形队列的生成消费者模型

"多少人都,生来纯洁完美,心底从不染漆黑。" 我们先来瞅瞅我们之前基于阻塞队列的生产消费者模型代码。 void Push(const T& in){// 生产任务pthread_mutex_lock(&_mutex);while(is_full()){pthread_cond_wait(&_pcond,&_mute…

4款【新概念APP】对比+免费下载

4款【新概念APP】对比免费下载4款【新概念APP】对比免费下载新概念英语咖(体积小、无广告、全免费、不能倍速播放)新概念英语全册(免费,但强制广告,否则不能播放音频。可以倍速)新概念英语全四册&#xff0…

【开发工程师的运维小知识】docker安装gitlab

文章目录1 搜索gitlab的镜像2 拉取gitlab镜像3 创建挂载目录4 创建gitlab容器并启动5 查看是否启动成功6 修改配置文件7 重启gitlab8 获取root初始化密码9 修改root初始密码(可选)进入docker-gitlab容器内部打开控制台查找第一个User(这个就是…

【SQL Server】数据库开发指南(一)数据库设计

文章目录一、数据库设计的必要性二、什么是数据库设计三、数据库设计的重要性五、数据模型5.1 实体-关系(E-R)数据模型5.2 实体(Entity)5.3 属性(Attribute)5.5 关系(Relationship)六…

生成式人工智能所面临的问题有哪些?

在生成式人工智能中工作需要混合技术、创造性和协作技能。通过发展这些技能,您将能够在这个令人兴奋且快速发展的领域应对具有挑战性的问题。 生成式人工智能是指一类机器学习技术,旨在生成与训练数据相似但不完全相同的新数据。 换句话说,…

苹果6信号不好的快速解决方法

许多朋友反馈,苹果6的信号不佳,建议从以下方面查找: 方法一:开启飞行模式后再关闭 有时候手机由于周围环境网络比较差,会导致信号处于无服务状态,这时后我们开启飞行模式后再关闭飞行模式,系统就…

【多线程与高并发(锁)】1、锁的概念、分类和状态

1、锁的概念 java当中的锁、是在多线程环境下为保证共享资源健康、线程安全的一种手段。 线程操作某个共享资源之前,先对资源加一层锁,保证操作期间没有其他线程访问资源,当操作完成后,再释放锁。 2、锁的分类 Java中的锁按照…

Obsidian:实现日记记录【设计并使用模板】

问题背景 我是一个比较喜欢记录的人,有一定的写日记的习惯的,但是我又不太喜欢将自己的个人的数据寄人篱下,放在别人的数据库中。 于是就想着将自己的日记存放在自己本地的磁盘中…… 在一次偶然在B站中翻找资料时,我发现了这个…

Linux-Shell设计

一、shell 总论 ​ shell 就是“壳程序”,这个名字是针对 kernel 来说的,也就是在操作系统外围的程序(严格的讲,已经不是操作系统了)。宏观上的 shell 是所有的应用程序,而狭义上的 shell,指的…

STM32CubeMXA安装和创建项目

STM32CubeMXA安装和创建项目 安装STM32CubeMXA STM32CubeMX 运行环境搭建包含两个部分。首先是 Java 运行环境安装,其次是 STM32CubeMX 软件安装。 安装 JAVA 环境 对于 Java 运行环境,大家可以到 Java 官网 www.java.com 下载最新的 Java 软件 安装…

CSS 扫盲

✏️作者:银河罐头 📋系列专栏:JavaEE 🌲“种一棵树最好的时间是十年前,其次是现在” 目录引入方式内部样式内联样式外部样式CSS 选择器CSS 常用属性值字体属性设置字体大小粗细文字样式文本属性文本颜色文本对齐文本装…

使用Jmeter进行http接口测试

前言: 本文主要针对http接口进行测试,使用Jmeter工具实现。 Jmter工具设计之初是用于做性能测试的,它在实现对各种接口的调用方面已经做的比较成熟,因此,本次直接使用Jmeter工具来完成对Http接口的测试。 一、开发接口…

【Unity项目实战】从零手戳一个背包系统

首先我们下载我们的人物和背景资源,因为主要是背包系统,所以人物的移动和场景的搭建这里我们就不多讲了,我这里直接提供基础项目源码给大家去使用就行 基础项目下载地址: 链接: https://pan.baidu.com/s/1o7_RW_QQ1rrAbDzT69ApRw 提取码: 8s95 顺带说一下,这里用到了uni…

uniCloud开发api接口服务

首先创建一个云对象: 在创建的云对象的index.Obj.js中进行编码: const db uniCloud.database() module.exports {_before: function () { // 通用预处理器},async get(){//demo-user 是云数据中的一个表名let res await db.collection("demo-us…

最易学和最难学编程语言排行榜!

如果问一个程序员最容易学习的语言,就像问一个人他们最喜欢的冰淇淋。每个人都有自己的偏好,永远没有真正的正确答案。 正如开发者和教育家 Marek Zaluski 曾经说的那样,"编程语言是由程序员创造的,为程序员服务"。这几…

Hashtable是什么?它和Hashmap有什么区别?

博主简介:努力的打工人一枚博主主页:xyk:所属专栏: JavaEE初阶目录 一、什么是Hashtable? 二、Hashtable特点 2.1 Hashtable是怎么加锁的? 2.2Hashtable为什么不允许键值为null? 2.3Hashtable为什么线程安全&…

电动汽车热管理方案

热管理技术作为汽车节能、提高经济性和保障安全性的重要措施,在汽车研发过程中具有重要作用。传统燃油汽车的热管理系统主要包括发动机、变速器散热系统和汽车空调,而电动汽车的热管理系统在燃油汽车热管理架构的基础之上,又增加了电机电控热…

Docker实现MySQL8主从读写分离【超简洁】

1、首先拉取镜像 docker pull mysql 2、创建主库容器 docker run -p 3388:3306 --name master -e MYSQL_ROOT_PASSWORD123456 -d mysql --server-id1 --log-binbin-log --binlog-do-dbznzm-dlaq 说明: docker run 表示创建并运行容器-p 3388:3306 把宿主机的…
最新文章