SparkSQL读取本地文件写入MySQL
unit1:LoggerLevel
实现思路见 main 代码中的注释。读取的 CSV 文件要注意格式：代码未设置 header 选项，每一行（包括表头行）都会被当作数据读取；列数必须与 toDF 中给出的列名数量一致。
package com.units
import org.apache.log4j.{Level, Logger}
/**
 * Mix-in that lowers log noise for the object that extends it.
 *
 * When the extending class/object is initialized, the log4j logger named "org" has its
 * level set to ERROR. Presumably this is meant to silence org.apache.* (Spark/Hadoop)
 * INFO/WARN output — confirm against the log4j configuration in use.
 */
trait LoggerLevel {
  locally {
    val orgLogger = Logger.getLogger("org")
    orgLogger.setLevel(Level.ERROR)
  }
}
unit2:getLocalSparkSession
package com.units
import org.apache.spark.sql.SparkSession
/**
 * Helpers for creating and stopping SparkSessions.
 *
 * Every `getLocalSparkSession` overload delegates to the four-argument variant, so the
 * builder is configured in exactly one place. Sessions are obtained via `getOrCreate()`,
 * so an already-active session may be returned rather than a new one.
 */
object SparkUnit {

  /** Master URL used by the overloads that do not take one: Spark local mode ("local[*]"). */
  private val LocalMaster = "local[*]"

  /**
   * Creates (or reuses) a local SparkSession without Hive support.
   *
   * @param appName application name passed to the builder
   * @return the session
   */
  def getLocalSparkSession(appName: String): SparkSession =
    getLocalSparkSession(appName, LocalMaster)

  /**
   * Creates (or reuses) a local SparkSession, optionally with Hive support.
   *
   * @param appName application name passed to the builder
   * @param support when true, `enableHiveSupport()` is called on the builder
   * @return the session
   */
  def getLocalSparkSession(appName: String, support: Boolean): SparkSession =
    // Uses the shared LocalMaster constant; this overload previously hard-coded the
    // misspelled master URL "loca[*]", which Spark cannot parse.
    getLocalSparkSession(appName, LocalMaster, support)

  /**
   * Creates (or reuses) a SparkSession on the given master, without Hive support.
   *
   * @param appName application name passed to the builder
   * @param master  Spark master URL, e.g. "local[*]"
   * @return the session
   */
  def getLocalSparkSession(appName: String, master: String): SparkSession =
    getLocalSparkSession(appName, master, support = false)

  /**
   * Creates (or reuses) a SparkSession on the given master, optionally with Hive support.
   *
   * @param appName application name passed to the builder
   * @param master  Spark master URL, e.g. "local[*]"
   * @param support when true, `enableHiveSupport()` is called on the builder
   * @return the session
   */
  def getLocalSparkSession(appName: String, master: String, support: Boolean): SparkSession = {
    val builder = SparkSession.builder().appName(appName).master(master)
    val configured = if (support) builder.enableHiveSupport() else builder
    configured.getOrCreate()
  }

  /**
   * Stops the given session; a null argument is ignored.
   *
   * @param ss session to stop, may be null
   */
  def stopSpark(ss: SparkSession): Unit =
    Option(ss).foreach(_.stop())
}
main:Demo09_SparkSQL_MySQL
package com.example
import java.util.Properties
import com.units.{LoggerLevel, SparkUnit}
/**
 * Demo: read two local CSV files, join them with Spark SQL, and write the result to MySQL.
 *
 * Steps:
 *  1. Read the two CSV files.
 *  2. Register them as temporary views so they can be queried with SQL.
 *  3. Run the join query.
 *  4. Save the query result to a MySQL table over JDBC.
 *
 * Usage: `Demo09_SparkSQL_MySQL [dataDir]`. When `dataDir` is omitted, DefaultDataDir is used.
 */
object Demo09_SparkSQL_MySQL extends LoggerLevel {

  /**
   * Default directory containing `student_class.csv` and `student_info.csv`.
   * File names are appended by plain string concatenation, so any directory (default or
   * passed on the command line) must end with a path separator.
   */
  private val DefaultDataDir = "F:\\下载\\哔哩哔哩下载\\dataset\\"

  def main(args: Array[String]): Unit = {
    val ss = SparkUnit.getLocalSparkSession("Demo09")
    try {
      val path = args.headOption.getOrElse(DefaultDataDir)

      // No header option is set, so every line is read as data and columns are named
      // positionally by toDF; the number of names must match each file's column count.
      val studentClass = ss.read.csv(path + "student_class.csv").toDF("id", "name", "class")
      val studentInfo = ss.read.csv(path + "student_info.csv").toDF("id", "name", "gender", "age")

      // SQL-style access requires the DataFrames to be registered as views.
      // createOrReplaceTempView avoids an error if a view with the same name already
      // exists in a session reused through getOrCreate().
      studentClass.createOrReplaceTempView("student_class")
      studentInfo.createOrReplaceTempView("student_info")

      val student = ss.sql(
        """
          | SELECT a.`id`,a.`name`,a.`class`,b.`gender`,b.`age`
          | FROM `student_class` a
          | JOIN `student_info` b ON a.`id` = b.`id`
          |""".stripMargin)

      // JDBC connection settings.
      val url = "jdbc:mysql://localhost:3306/student_db"
      val tb = "student"
      val properties = new Properties()
      // NOTE(review): credentials are hard-coded; MYSQL_PASSWORD (if set) overrides the password.
      properties.setProperty("user", "root")
      properties.setProperty("password", sys.env.getOrElse("MYSQL_PASSWORD", "1234567"))
      // Spark's JDBC data source reads the driver class name from the "driver" option.
      // NOTE(review): MySQL Connector/J 8+ names this class com.mysql.cj.jdbc.Driver — confirm version.
      properties.setProperty("driver", "com.mysql.jdbc.Driver")

      // "ignore" save mode: if the target table already exists, nothing is written.
      student.write.mode("ignore").jdbc(url, tb, properties)

      // Tip: instead of running the query and then saving the returned DataFrame, the
      // same can be done in one statement with CREATE TABLE ... AS SELECT ...
      // (to target MySQL from Spark SQL this presumably needs a `USING jdbc OPTIONS (...)`
      // table definition — verify).
    } finally {
      SparkUnit.stopSpark(ss)
    }
  }
}