当前位置: 首页 > article >正文

Spark2.x:通过 JDBC 连接数据库(DataFrame)

这里以关系数据库MySQL(MarinDB)为例。下面我们要新建一个测试Spark程序的数据库,数据库名称是“spark”,表的名称是“student”。

请执行下面命令在Linux中启动MySQL数据库,并完成数据库和表的创建,以及样例数据的录入:

MariaDB [(none)]> create database spark;
Query OK, 1 row affected (0.00 sec)

MariaDB [(none)]> use spark;
Database changed

MariaDB [spark]> create table student(id int(4), name char(20), gender char(4), age int(4));
Query OK, 0 rows affected (0.05 sec)

MariaDB [spark]> insert into student values(1,'Xueqian','F',23);
Query OK, 1 row affected (0.01 sec)

MariaDB [spark]> insert into student values(2,'Weiliang','M',24);
Query OK, 1 row affected (0.01 sec)

MariaDB [spark]>  select * from student;
+------+----------+--------+------+
| id   | name     | gender | age  |
+------+----------+--------+------+
|    1 | Xueqian  | F      |   23 |
|    2 | Weiliang | M      |   24 |
+------+----------+--------+------+
2 rows in set (0.00 sec)

上面已经创建好了我们所需要的MySQL数据库和表,下面我们编写Spark应用程序连接MySQL数据库并且读写数据。

Spark支持通过JDBC方式连接到其他数据库获取数据生成DataFrame。

下面,我们要启动一个spark-shell,而且启动的时候,要附加一些参数。启动Spark Shell时,必须指定mysql连接驱动jar包。

[songxitang@master ~]$ spark2-shell \
> --jars /home/songxitang/spark/jars/mysql-connector-java-5.1.41-bin.jar \
> --driver-class-path /home/songxitang/spark/jars/mysql-connector-java-5.1.41-bin.jar

上面的命令行中,在一行的末尾加入斜杠\,是为了告诉spark-shell,命令还没有结束。

启动进入spark-shell以后,可以执行以下命令连接数据库,读取数据,并显示:

scala> val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://master:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "123456").load()
jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> jdbcDF.show()
+---+--------+------+---+
| id|    name|gender|age|
+---+--------+------+---+
|  1| Xueqian|     F| 23|
|  2|Weiliang|     M| 24|
+---+--------+------+---+

下面我们再来看一下如何往MySQL中写入数据。

现在我们开始在spark-shell中编写程序,往spark.student表中插入两条记录。
下面,我们要启动一个spark-shell,而且启动的时候,要附加一些参数。启动Spark Shell时,必须指定mysql连接驱动jar包(如果你前面已经采用下面方式启动了spark-shell,就不需要重复启动了)

启动进入spark-shell以后,可以执行以下命令连接数据库,写入数据,程序如下(你可以把下面程序一条条拷贝到spark-shell中执行):

import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
 
//下面我们设置两条数据表示两个学生信息
val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))
 
//下面要设置模式信息
val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))
 
//下面创建Row对象,每个Row对象都是rowRDD中的一行
val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
 
//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
val studentDF = spark.createDataFrame(rowRDD, schema)
 
//下面创建一个prop变量用来保存JDBC连接参数
val prop = new Properties()
prop.put("user", "root") //表示用户名是root
prop.put("password", "123456") //表示密码是123456
prop.put("driver","com.mysql.jdbc.Driver") //表示驱动程序是com.mysql.jdbc.Driver
 
//下面就可以连接数据库,采用append模式,表示追加记录到数据库spark的student表中
studentDF.write.mode("append").jdbc("jdbc:mysql://master:3306/spark", "spark.student", prop)

在spark-shell中执行完上述程序后,我们可以看一下效果,看看MarinDB数据库中的spark.student表发生了什么变化。

MariaDB [spark]> select * from student;
+------+-----------+--------+------+
| id   | name      | gender | age  |
+------+-----------+--------+------+
|    1 | Xueqian   | F      |   23 |
|    2 | Weiliang  | M      |   24 |
|    3 | Rongcheng | M      |   26 |
|    4 | Guanhua   | M      |   27 |
+------+-----------+--------+------+
4 rows in set (0.00 sec)


http://www.kler.cn/a/287295.html

相关文章:

  • 每日OJ题_牛客_天使果冻_递推_C++_Java
  • Python | Leetcode Python题解之第564题数组嵌套
  • 找不到vcruntime140.dll怎么办,彻底解决vcruntime140.dll丢失的5种方法
  • Kotlin return与return@forEachIndexed
  • Python中的正则表达式教程
  • RDIFramework.NET CS敏捷开发框架 V6.1发布(.NET6+、Framework双引擎、全网唯一)
  • 如何使用python抓包,附代码
  • Avalonia 播放 VLC 视频(Windows / Linux)
  • 【HTTP、Web常用协议等等】前端八股文面试题
  • C# 编译程序引用C++DLL托管动态链接库实例
  • 用Python实现时间序列模型实战——Day 8: 季节性ARIMA模型 (SARIMA)
  • 分页查询--条件查询
  • STM32 ADC采样详解
  • verilog bug记录-修改信号线频率
  • zookeeper分部式锁
  • ES6----练习题
  • 如何在S7-200 SMART CPU断电后保持高速计数器的当前值
  • Unity学习路线
  • 打造一流的研发型企业--- 金发科技研发驱动力初探
  • MATLAB学习笔记3
  • 使用 OpenSSL 进行 RSA 密钥生成与加解密操作(命令行方式)
  • 用MATLAB 画一个64QAM的星座图
  • gitlab使用
  • easyPOI生成的excel添加水印
  • Spark MLlib模型训练—分类算法Multinomial Logistic Regression
  • 【生活英语】2、喜欢与讨厌