当前位置: 首页 > article >正文

如何使用SparkSql

一、SparkSql的前世今生

Hive->Shark->Spark SQL

二、SparkSql依赖

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.1.2</version>

</dependency>

三、SparkSql DataFrame

DataFrame,以列的形式组织的,分布式的数据集合。它其实和关系型数据库中的表非常类似,但是底层做了很多的优化。DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD。

四、创建SQLContext

val sqlContext = SparkSession.builder().appName("RDD2DataFrameReflection").master("local").getOrCreate;

五、SparkSql创建DataFrame

val properties = new Properties();

properties.setProperty("user", "root");

properties.setProperty("password", "root");

val testDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/test", "user", properties);

val testDF =sqlContext.read

.format("jdbc")

.option("url", "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8")

.option("user", "root")

.option("password", "root")

.option("dbtable","user")

.option("dbtable","(select id from user where id < 50) as t") // sql语句必须是子查询

.load()

六、SparkSql DataFrame的操作

// 查询

testDF.show()

testDF.printSchema()

testDF.select("username").show()

testDF.select(testDF("username"), testDF("id") + 1).show()

testDF.filter(testDF("id") % 2 === 0).show()

testDF.groupBy("sex").count().show

七、RDD转换为DataFrame

1、使用反射来推断包含了特定数据类型的RDD的元数据

case class Student(id: Int, name: String, age: Int) // 定义模式类,属性名对应数据表的字段名

// RDD.toDF()转换为DataFrame

val studentDF = sc.textFile("data/students.txt", 1)

.map { line => line.split(",") }

.map { arr => Student(arr(0).trim().toInt, arr(1).trim(), arr(2).trim().toInt) }

.toDF()

//.map { arr => (arr(0).trim().toInt, arr(1).trim(), arr(2).trim().toInt) }

//.toDF("id", "name", "age")

val properties = new Properties()

properties.setProperty("user", "root")

properties.setProperty("password", "root")

studentDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8","student",properties);

2、编程方式使用Row将RDD转换为DataFrame

// 第一步,构造Row数据

val studentRDD = sc.textFile("data/students.txt", 1)

.map { line => Row(line.split(",")(0).toInt, line.split(",")(1), line.split(",")(2).toInt) }

// 第二步,构造元数据

val structType = StructType(Array(

StructField("id", IntegerType, true),

StructField("name", StringType, true),

StructField("age", IntegerType, true)))

// 第三步,转换RDD为DataFrame

val studentDF = sqlContext.createDataFrame(studentRDD, structType)

八、读入每个数据源,生成一个临时视图,通过一个sql去操作这些视图

val studentScoreDF = sqlContext.read.json("data/student_score.json");

studentScoreDF.createOrReplaceTempView("student_score");

val studentInfoDF = sqlContext.read.json("data/student_info.json")

studentInfoDF.createOrReplaceTempView("student_info")

val goodStudentInfoDF = sqlContext.sql("select s.name, s.score, i.gender from student_score s, student_info i where s.score>80 and s.name=i.name");

goodStudentInfoDF.rdd.collect().foreach(row => println(row(0) + " " + row(1) + " " + row(2)))

goodStudentInfoDF.write.json("data/result/goodstudents")

九、SparkSql UDF用户自定义函数

UDAF:User Defined Aggregate Function。用户自定义聚合函数


http://www.kler.cn/a/468598.html

相关文章:

  • STM32-笔记30-编程实现esp8266联网功能
  • 西门子200smart存储卡作用
  • Vue笔记-001-声明式渲染
  • HarmonyOS-面试资料
  • 计算机网络 (28)虚拟专用网VPN
  • GitHub 基础使用指南
  • GESP202406 二级【计数】题解(AC)
  • html生成注册与登录代码
  • 使用LINUX的dd命令制作自己的img镜像
  • 【CSS】第一天 基础选择器与文字控制属性
  • 实时数仓:基于数据湖的实时数仓与数据治理架构
  • 【人工智能】基于Python与OpenCV构建简单车道检测算法:自动驾驶技术的入门与实践
  • [读书日志]从零开始学习Chisel 第四篇:Scala面向对象编程——操作符即方法(敏捷硬件开发语言Chisel与数字系统设计)
  • 【开源监控工具】Uptime Kuma:几分钟设置实时监控你的网站性能
  • 计算机网络掩码、最小地址、最大地址计算、IP地址个数
  • Android学习20 -- NDK5--操作camera(TODO)
  • 【能用的方案】springBoot集成netty中如何使用@Value(通过依赖注入(DI)来访问)配置文件中的属性值
  • MaxKB知识库问答系统v1.9版本有哪些具体的改进?
  • 【网络安全 | 漏洞挖掘】通过模拟功能实现提权(Bugcrowd)
  • ESP32学习--SPIFFS文件系统
  • gaussdb中怎么查询一个表有多少GB
  • Spring MVC实战指南:构建高效Web应用的架构与技巧(二)
  • JMeter线程组Duration和循环次数设置冲突后,Duration优先级高
  • 代码随想录 day55 第十一章 图论part05
  • 数据结构之双链表(超详解)
  • 【intro】BLEU