当前位置: 首页 > article >正文

【大数据学习 | Spark-SQL】SparkSQL读写数据

我们使用sparksql进行编程,编程的过程我们需要创建dataframe对象,这个对象的创建方式我们是先创建RDD然后再转换rdd变成为DataFrame对象。

但是sparksql给大家提供了多种便捷读取数据的方式。

//原始读取数据方式
sc.textFile().toRDD
sqlSc.createDataFrame(rdd,schema)
//更便捷的使用方式
sqlSc.read.text|orc|parquet|jdbc|csv|json
df.write.text|orc|parquet|jdbc|csv|json

write写出存储数据的时候也是文件夹的,而且文件夹不能存在。

  • csv是一个介于文本和excel之间的一种格式,如果是文本打开用逗号分隔的。
  • text文本普通文本,但是这个文本必须只能保存一列内容。

以上两个文本都是只有内容的,没有列的。

  • json是一种字符串结构,本质就是字符串,但是存在kv,例子 {"name":"zhangsan","age":20}

多平台解析方便,带有格式信息。

  • orc格式一个列式存储格式,hive专有的。
  • parquet列式存储,顶级项目

以上都是列式存储问题,优点(1.列式存储,检索效率高,防止冗余查询 2.带有汇总信息,查询特别快 3.带有轻量级索引,可以跳过大部分数据进行检索),他们都是二进制文件,带有格式信息。

jdbc 方式,它是一种协议,只要符合jdbc规范的服务都可以连接,mysql,oracle,hive,sparksql

整体代码:

package com.hainiu.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.{SparkConf, SparkContext}

import java.util.Properties

object TestMovieWithSql {
  def main(args: Array[String]): Unit = {
    //??movie???
    //1.id  middle=name  last=type
    val conf = new SparkConf()
    conf.setAppName("movie")
    conf.setMaster("local[*]")
    conf.set("spark.shuffle.partitions","20")
    val sc = new SparkContext(conf)
    val sqlSc = new SQLContext(sc)

    import sqlSc.implicits._
    //deal data
    val df = sc.textFile("data/movies.txt")
      .flatMap(t => {
        val strs = t.split(",")
        val mid = strs(0)
        val types = strs.reverse.head
        val name = strs.tail.reverse.tail.reverse.mkString(" ")
        types.split("\\|").map((mid, name, _))
      }).toDF("mid", "mname", "type")

    df.limit(1).show()

    val df1 = sc.textFile("data/ratings.txt")
      .map(t=>{
        val strs = t.split(",")
        (strs(0),strs(1),strs(2).toDouble)
      }).toDF("userid","mid","score")
    df1.limit(1).show()

    import org.apache.spark.sql.functions._
    val df11 = df.join(df1, "mid").groupBy("userid", "type")
      .agg(count("userid").as("cnt"))
      .withColumn("rn", row_number().over(Window.partitionBy("userid").orderBy($"cnt".desc)))
      .where("rn = 1")
      .select("userid", "type")

    val df22 = df.join(df1, "mid").groupBy("type", "mname")
      .agg(avg("score").as("avg"))
      .withColumn("rn", row_number().over(Window.partitionBy("type").orderBy($"avg".desc)))
      .where("rn<4")
      .select("type", "mname")

    val df33 = df11.join(df22, "type")

    //spark3.1.2?? spark2.x

//    df33.write.csv()
    df33.write
      .format("csv")
      .save("data/csv")

//    df33.write.
//      csv("data/csv")
//    df33.write.json("data/json")

//    df33.write.parquet("data/parquet")
//    df33.write.orc("data/orc")
//    val pro = new Properties()
//    pro.put("user","root")
//    pro.put("password","hainiu")
//    df33.write.jdbc("jdbc:mysql://11.99.173.24:3306/hainiu","movie",pro)
  }
}

为了简化存储的计算方式:

package com.hainiu.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object TestSink {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("test sink")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlSc = new SQLContext(sc)
    import sqlSc.implicits._
    import org.apache.spark.sql.functions._
    val df = sc.textFile("data/a.txt")
      .map(t=>{
        val strs = t.split(" ")
        (strs(0),strs(1),strs(2),strs(3))
      }).toDF("id","name","age","gender")
      .withColumn("all",concat_ws(" ",$"id",$"name",$"age",$"gender"))
      .select("all")
//    df.write.csv("data/csv")
//    df.write.format("org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2")
//      .save("data/csv")
//    df.write.parquet("data/parquet")
//    df.write.format("org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2")
//      .save("data/parquet")
//    df.write.format("org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2")
//      .save("data/json")
    df.write.format("org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2")
      .save("data/text")
  }
}

读取数据代码:

package com.hainiu.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

import java.util.Properties

object TestReadData {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("movie")
    conf.setMaster("local[*]")
    conf.set("spark.shuffle.partitions", "20")
    val sc = new SparkContext(conf)
    val sqlSc = new SQLContext(sc)
//    sqlSc.read.text("data/text").show()
//    sqlSc.read.csv("data/csv").show()
//  
//    sqlSc.read.parquet("data/parquet").show()
//    sqlSc.read.json("data/json").show()

    sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2").load("data/text").show()
    sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2").load("data/csv").show()
    sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2").load("data/json").show()
    sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2").load("data/parquet").show()

    sqlSc.read.orc("data/orc").show()
    val pro = new Properties()
    pro.put("user","root")
    pro.put("password","hainiu")
    sqlSc.read.jdbc("jdbc:mysql://11.99.173.24:3306/hainiu","movie",pro).show()
  }
}

http://www.kler.cn/a/412380.html

相关文章:

  • 腾讯云OCR车牌识别实践:从图片上传到车牌识别
  • Selenium 自动化测试demo
  • 手搓人工智能—聚类分析(下)谱系聚类与K-mean聚类
  • ubuntu中使用ffmpeg和nginx推流rtmp视频
  • 论文阅读:A fast, scalable and versatile tool for analysis of single-cell omics data
  • 网络安全学习77天(记录)
  • STM32 使用ARM Compiler V6 编译裸机 LWIP协议栈报错的解决方法
  • 【pyspark学习从入门到精通21】机器学习库_4
  • 解决`-bash: ./configure:/bin/sh^M:解释器错误: 没有那个文件或目录`的问题
  • 项目学习:仿b站的视频网站项目03-注册功能
  • 沃丰科技出海客服系统:打造全球化客户服务新标杆
  • 日志打印规范
  • AVL、B树和B+树
  • 学习笔记039——SpringBoot整合Redis
  • width设置100vh但出现横向滚动条的问题
  • 速度革命:esbuild如何改变前端构建游戏 (1)
  • 多模态大模型打造沉浸式社交体验,Soul App创始人张璐团队海外首秀GITEX GLOBAL
  • 使用OpenCV实现图像拼接
  • 【C++第三方库】Muduo库结合ProtoBuf库搭建服务端和客户端的过程和源码
  • 【JavaEE初阶 — 网络编程】Socket 套接字 & UDP数据报套接字编程
  • Linux 虚拟机下安装RedisJSON
  • 【Pytorch框架】无中生有,从0到1使用Dataset类处理MNIST数据集
  • 多线程1:基础概念、接口介绍、锁
  • 通俗理解人工智能、机器学习和深度学习的关系
  • 【carla生成车辆时遇到的问题】carla显示的坐标和carlaworld中提取的坐标y值相反
  • 前后端中Json数据的简单处理