大数据技术之SparkSQL——数据的读取和保存
一、通用的加载和保存方式
SparkSQL提供了通用的保存数据和数据加载的方式。根据不同的参数读取,并保存不同格式的数据。SparkSQL默认读取和保存的文件格式为Parquet。
1.1 加载数据
spark.read.load 是加载数据的通用方式。
如果读取不同格式的数据,可以对不同的数据格式进行设定,如:
spark.read.format("json").load("data/user.json")
// 可以简化为如下:
spark.read.json("data/user.json")
1.2 保存数据
spark.write.save 是保存数据的通用方式。
如果保存不同格式的数据,可以对不同的数据格式进行设定,如:
df.write.format("json").save("data/user.json")
df.write.format("orc").saveAsTable("dws_events.DF_user_friend_count")
// 可以简化为如下:
spark.write.json("data/user.json")
保存操作可以使用SaveMode,用来指明如何处理数据,使用mode()方法来设置。SaveMode是一个枚举类,其中常量包括:
Scala/ Java | Any Language | Meaning |
---|---|---|
SaveMode.ErrorIfExists(default) | "error"(default) | 如果文件已经存在则抛出异常 |
SaveMode.Append | "append" | 如果文件已经存在则追加 |
SaveMode.Overwrite | "overwrite" | 如果文件已经存在则覆盖 |
SaveMode.Ignore | "ignore" | 如果文件已经存在则忽略 |
如:
df.write.mode("append").json("data/user.json")
二、Parquet
SparkSQL的默认数据源为Parquet格式。Parquet是一种能够有效存储嵌套数据的列式存储格式。
数据源为Parquet文件时,SparkSQL可以方便的执行所有的操作,不需要使用format。修改配置项spark.sql.sources.default,可以修改默认数据源格式。
三、JSON
SparkSQL能够自动推测JSON数据集的结构,并将它加载为一个Dataset[Row]。可以通过SparkSession.read.json()去加载JSON文件。
1)导入隐式转换
import spark.implicits._
2)加载JSON文件
val path = "/opt/module/spark-local/people.json"
val peopleDF = spark.read.json(path)
3)创建临时表
peopleDF.createOrReplaceTempView("people")
4)数据查询
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 and 19")
四、CSV
SparkSQL可以配置CSV文件的列表信息,读取CSV文件,CSV文件的第一行设置为数据列。
spark.read.format("csv")
.option("sep",";")
.option("inferSchema", "true")
.option("header", "true")
.load("data/user.csv")
五、MySQL
SparkSQL可以通过JDBC从关系型数据库中读取数据的方式来创建DataFrame。通过对DataFrame进行一系列的计算后,再将数据写回到关系型数据库中。如果使用spark-shell操作,可在启动shell时指定相关的数据库驱动路径,或将相关的数据库驱动放到spark的类路径下。
spark-shell
bin/spark-shell
--jars mysql-connector-java-8.0.30.jar
IDEA中通过JDBC对MySQL进行操作
1)导入依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
</dependency>
2)读取数据
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
// 创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
// 方式1:通过load方式读取
val df = spark.read.format("jdbc")
.option("url", "jdbc:mysql://192.168.136.20:3306/spark-sql") // 数据库名字
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "xsqone")
.option("pasword", "root")
.option("dbtable", "user") // 表名
.load()
df.show
// 方式2:通用的load方法读取(参数另一种形式)
spark.read.format("jdbc")
.options(
Map(
"url"->"jdbc:mysql://linux1:3306/spark-sql?user=root&password=123123",
"dbtable"->"user",
"driver"->"com.mysql.jdbc.Driver"))
.load().show
// 方式3:使用JDBC读取
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123123")
val df: DataFrame = spark.read.jdbc("jdbc:mysql://linux1:3306/spark-sql", "user", props)
df.show
// 释放资源
spark.stop()
3)写入数据
//方式1:通用的方式 format 指定写出类型
val df = spark.write.format("jdbc")
.option("url", "jdbc:mysql://192.168.136.20:3306/spark-sql") // 数据库名字
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "xsqone")
.option("pasword", "root")
.option("dbtable", "user") // 表名
.mode(SaveMode.Append)
.save()
// 方式2:通过JDBC方法
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123123")
ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://linux1:3306/spark-sql", "user", props)
六、Hive
Hive是Hadoop上的SQL引擎,SparkSQL编译时可以包含Hive支持,也可以不包含。
1)内嵌的HIVE(使用较少)
如果使用spark内嵌的hive,那么可以直接使用。hive的元数据存储在derby中,默认仓库地址:$SPARK_HOME/spark-warehouse
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
scala> spark.sql("create table aa(id int)")
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default| aa| false|
+--------+---------+-----------+
向表中加载本地数据
scala> spark.sql("load data local inpath 'input/ids.txt' into table aa")
scala> spark.sql("select * from aa").show
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
+---+
2)外部的HIVE
想要连接外部已经部署好的HIVE,需要下面几个步骤:
1、spark 要接管hive需要把 hive-site.xml 拷贝到 conf/ 目录下
2、把 MySQL 的驱动拷贝到 jars/ 目录下
3、如果访问不到 hdfs ,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/ 目录下
4、重启 spark-shell
scala> spark.sql("show tables").show
20/04/25 22:05:14 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
+--------+--------------------+-----------+
|database| tableName|isTemporary|
+--------+--------------------+-----------+
| default| emp| false|
| default|hive_hbase_emp_table| false|
| default| relevance_hbase_emp| false|
| default| staff_hive| false|
| default| ttt| false|
| default| user_visit_action| false|
+--------+--------------------+-----------+
3)运行SparkSQL CLI
SparkSQL CLI可以很方便的在本地运行HIVE元数据服务以及从命令行执行查询任务。在spark目录下执行如下命令启动SparkSQL CLI,直接执行SQL语句。类似hive窗口。
bin/spark-sql
4)运行Spark beeline
Spark Thrift Server 是 spark 基于 HiveServer2 实现的一个 Thrift 服务。旨在无缝兼容 HiveServer2 。由于 Spark Thrift Server 的接口和协议都与 HiveServer2 完全一致,因此部署好 Spark Thrift Server 后,可以直接使用 hive 的 beeline 访问 spark Thrift Serve r执行相关语句。
如果想连接 Thrift Server ,需要通过以下几个步骤:
1、spark 要接管 hive 需要把 hive-site 拷贝到 conf/ 目录下
2、把 MySQL 的驱动拷贝到 jars/ 目录下
3、如果访问不到 hdfs ,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/ 目录下
4、启动Thrift Server
sbin/start-thriftserver.sh
5、使用 beeline 连接 Thrift Server
bin/beeline -u jdbc:hive2://linux1:10000 -n root
5)代码操作HIVE:enableHiveSupport()
1> 导入依赖
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.29</version>
</dependency>
2> 将hive-site.xml文件拷贝到项目的resource目录中
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!--配置数据库连接-->
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://localhost:3306/hive2?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<!--配置数据库连接驱动-->
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<!--配置数据库连接用户名-->
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>admin</value>
<description>username to use against metastore database</description>
</property>
<!--配置数据库连接密码-->
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>admin</value>
<description>password to use against metastore database</description>
</property>
<!--配置使用hive查询数据时,显示所查询字段的头信息-->
<property>
<name>hive.cli.print.header</name>
<value>true</value>
<description>Whether to print the names of the columns in query output.</description>
</property>
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
<description>Whether to include the current database in the Hive prompt.</description>
</property>
</configuration>
3> 启用hive支持
//创建 SparkSession
val spark: SparkSession = SparkSession
.builder()
// 添加对应主机IP
.config("hive.metastore.uris", "thrift://192.168.153.139:9083")
.enableHiveSupport()
.master("local[*]")
.appName("sql")
.getOrCreate()
4> 增加对应的依赖关系(包含MySQL驱动)
spark.sql("show tables").show