Spark2.x:通过 JDBC 连接数据库(DataFrame)
这里以关系数据库MySQL(MarinDB)为例。下面我们要新建一个测试Spark程序的数据库,数据库名称是“spark”,表的名称是“student”。
请执行下面命令在Linux中启动MySQL数据库,并完成数据库和表的创建,以及样例数据的录入:
MariaDB [(none)]> create database spark;
Query OK, 1 row affected (0.00 sec)
MariaDB [(none)]> use spark;
Database changed
MariaDB [spark]> create table student(id int(4), name char(20), gender char(4), age int(4));
Query OK, 0 rows affected (0.05 sec)
MariaDB [spark]> insert into student values(1,'Xueqian','F',23);
Query OK, 1 row affected (0.01 sec)
MariaDB [spark]> insert into student values(2,'Weiliang','M',24);
Query OK, 1 row affected (0.01 sec)
MariaDB [spark]> select * from student;
+------+----------+--------+------+
| id | name | gender | age |
+------+----------+--------+------+
| 1 | Xueqian | F | 23 |
| 2 | Weiliang | M | 24 |
+------+----------+--------+------+
2 rows in set (0.00 sec)
上面已经创建好了我们所需要的MySQL数据库和表,下面我们编写Spark应用程序连接MySQL数据库并且读写数据。
Spark支持通过JDBC方式连接到其他数据库获取数据生成DataFrame。
下面,我们要启动一个spark-shell,而且启动的时候,要附加一些参数。启动Spark Shell时,必须指定mysql连接驱动jar包。
[songxitang@master ~]$ spark2-shell \
> --jars /home/songxitang/spark/jars/mysql-connector-java-5.1.41-bin.jar \
> --driver-class-path /home/songxitang/spark/jars/mysql-connector-java-5.1.41-bin.jar
上面的命令行中,在一行的末尾加入斜杠\,是为了告诉spark-shell,命令还没有结束。
启动进入spark-shell以后,可以执行以下命令连接数据库,读取数据,并显示:
scala> val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://master:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "123456").load()
jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
scala> jdbcDF.show()
+---+--------+------+---+
| id| name|gender|age|
+---+--------+------+---+
| 1| Xueqian| F| 23|
| 2|Weiliang| M| 24|
+---+--------+------+---+
下面我们再来看一下如何往MySQL中写入数据。
现在我们开始在spark-shell中编写程序,往spark.student表中插入两条记录。
下面,我们要启动一个spark-shell,而且启动的时候,要附加一些参数。启动Spark Shell时,必须指定mysql连接驱动jar包(如果你前面已经采用下面方式启动了spark-shell,就不需要重复启动了)
启动进入spark-shell以后,可以执行以下命令连接数据库,写入数据,程序如下(你可以把下面程序一条条拷贝到spark-shell中执行):
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
//下面我们设置两条数据表示两个学生信息
val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))
//下面要设置模式信息
val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))
//下面创建Row对象,每个Row对象都是rowRDD中的一行
val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
val studentDF = spark.createDataFrame(rowRDD, schema)
//下面创建一个prop变量用来保存JDBC连接参数
val prop = new Properties()
prop.put("user", "root") //表示用户名是root
prop.put("password", "123456") //表示密码是123456
prop.put("driver","com.mysql.jdbc.Driver") //表示驱动程序是com.mysql.jdbc.Driver
//下面就可以连接数据库,采用append模式,表示追加记录到数据库spark的student表中
studentDF.write.mode("append").jdbc("jdbc:mysql://master:3306/spark", "spark.student", prop)
在spark-shell中执行完上述程序后,我们可以看一下效果,看看MarinDB数据库中的spark.student表发生了什么变化。
MariaDB [spark]> select * from student;
+------+-----------+--------+------+
| id | name | gender | age |
+------+-----------+--------+------+
| 1 | Xueqian | F | 23 |
| 2 | Weiliang | M | 24 |
| 3 | Rongcheng | M | 26 |
| 4 | Guanhua | M | 27 |
+------+-----------+--------+------+
4 rows in set (0.00 sec)