Spark实操学习
Spark学习
- 一、Spark-Shell编程
- 1. 配置python3(三台服务器都要配置)
- 2. 开始Spark编程
- 3. spark-shell工具
- 二、Java项目测试
- 1. 新建项目
- 2. Spark-java代码测试
- 三、Scala项目测试
- 1. 安装scala
- 2. 安装包管理器sbt
- 3. 在编译工具中安装scala工具
- 4. 新建项目
- 5. spark-scala代码测试
- 四、Python项目测试
- 1.新建项目
- 2. Spark-python代码测试
- 五、代码打包提交
- 1. Scala代码打包提交
- 2. Java代码打包提交
- 3. Python代码打包提交
- 六、提交Spark集群并运行
- 1. Scala代码提交到Spark集群运行
- 2. Java代码提交到Spark集群运行
- 3. Python代码提交到Spark集群运行
- 4. 后续学习
- 参考资料
前面已经完成了 Spark集群搭建,接下来开始对spark进行一个简单的学习,主要参考b站相关视频;因为Spark支持Python、Scala、Java这三种语言,所有下面主要以针对这些进行操作
一、Spark-Shell编程
1. 配置python3(三台服务器都要配置)
先安装工具依赖
yum install -y git gcc zlib-devel bzip2 bzip2-devel readline-devel sqlite sqlite-devel openssl-devel xz xz-devel libffi-devel
克隆python管理工具pyenv
git clone https://github.com/pyenv/pyenv.git ~/.pyenv
配置环境变量
echo 'export PYENV_ROOT="$HOME/.pyenv"' >> ~/.bashrc
echo 'export PATH="$PYENV_ROOT/bin:$PATH"' >> ~/.bashrc
echo 'eval "$(pyenv init -)"' >> ~/.bashrc
然后source一下
source ~/.bashrc
安装python指定版本
pyenv install 3.7.7
安装可能会比较慢,这边推荐一个快一点的方法
- 先在浏览器把python-3.7.7的包下载下来
https://www.python.org/ftp/python/3.7.7/Python-3.7.7.tar.xz
- 然后把压缩包上传到 /root/.pyenv/cache/
因为pyenv install 3.7.7会先查看cache中是否存在,有就直接安装,不用再去拉取- 再次安装就行
pyenv install 3.7.7
4.选择python3.7.7
pyenv global 3.7.7
2. 开始Spark编程
先上传一个word.txt文件到服务器
启动pyspark
pyspark
读取文件,进行操作
# 读取文件
textFile = spark.read.text("file:///root/spark-shell-test/word.txt")
# 计数
textFile.count()
# 获取第一行
textFile.first()
现在,让我们将此DataFrame转换为新的DataFrame。我们调用filter返回一个新的DataFrame,其中包含文件中行的子集
linesWithSpark = textFile.filter(textFile.value.contains("时代"))
我们可以将转换和行动联系在一起:
textFile.filter(textFile.value.contains("时代")).count()
后续其他操作可观看spark官方文档测试
3. spark-shell工具
spark-shell是运用Scala语言,可通过spark-shell命令直接启动
spark-shell
读取文件,执行操作
var textFile = spark.read.text("file:///root/spark-shell-test/word.txt")
# 计数
textFile.count()
# 获取第一行
textFile.first()
spark-shell的简单测试就到此结束了,想了解更详细内容可前往spark官方文档查看
在生产中用spark-shell不太方便,spark-shell主要用于测试,生产中主要用ide等相关工具,所以接下来介绍生产下的使用
java:1.8,11
scala:2.12.19
python:3.10
以上这些需要安装在本机电脑上,具体安装流程就不做介绍,自行安装即可
二、Java项目测试
1. 新建项目
新建项目 -> 选择Maven -> 创建maven-archetype-quickstart骨架
项目结构如下
2. Spark-java代码测试
添加spark依赖到pom.xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.5.1</version>
</dependency>
新建sparkBasicExample.java文件
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class sparkBasicExample {
public static void main(String[] args) {
// 创建SparkConf配置对象
SparkConf conf = new SparkConf().setAppName("SparkBasicExample").setMaster("local[*]");
// 创建JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
//创建一个简单的JavaRDD
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
//应用map转换操作
JavaRDD<Integer> squares = rdd.map(num -> num * num);
//应用filter转换操作
JavaRDD<Integer> evenSquares = squares.filter(num -> num % 2 == 0);
//执行行动操作collect,收集结果到驱动程序
List<Integer> evenSquaresCollected = evenSquares.collect();
//打印结果
evenSquaresCollected.forEach(System.out::println);
//关闭SparkSession
sc.stop();
}
}
启动程序后结果如下:
三、Scala项目测试
由于我没有安装scala,所以这里做一下记录,scala配置的jdk-11,1.8也可以用,但是没有代码提示
1. 安装scala
下载链接:https://www.scala-lang.org/download/2.12.19.html
安装成功后在cmd中测试一下
scala -version
2. 安装包管理器sbt
sbt是scala的包管理器,安装stb-1.10.1.msi
下载链接:https://github.com/sbt/sbt/releases/download/v1.10.1/sbt-1.10.1.msi
官网链接:https://www.scala-sbt.org/download
安装成功后在cmd中测试一下
sbt
3. 在编译工具中安装scala工具
这里我用的idea,也可以用vscode,可根据个人喜好来选择
在plugins下搜索scala安装
成功后重启idea
4. 新建项目
新建项目 -> 选择Scala -> 选择sbt -> next
选择对应的版本,然后finish,第一次会比较慢
这里我等了很久,没反应,就换了种方式
1.打开cmd,进入得到项目文件夹
2.输入sbt new(等个5分钟左右)
3.选择 模板 e 回车(等个5分钟左右)
4.输入项目名(spark-scala)
5.然后再用idea打开项目
5. spark-scala代码测试
修改build.sbt依赖文件(这里加载依赖要很久,慢慢等)
import Dependencies._
ThisBuild / scalaVersion := "2.12.19"
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / organization := "com.example"
ThisBuild / organizationName := "example"
lazy val root = (project in file("."))
.settings(
name := "spark-scala",
libraryDependencies ++= Seq(
munit % Test,
"org.apache.spark" %% "spark-core" % "3.5.1"
)
)
可以先在Hello文件run一下
新建文件SparkBasicExample
package example
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object SparkBasicExample {
def main(args: Array[String]): Unit = {
//创建SparkSession
val conf = new SparkConf().setAppName("HelloWorld").setMaster("local[2]")
val sc = new SparkContext(conf)
// 创建一个简单的RDD
val rdd = sc.parallelize(1 to 10)
//应用map转换操作
val squares = rdd.map(num => num * num)
// 应用filter转换操作
val evenSquares = squares.filter(num => num % 2 == 0)
//执行行动操作collect,收集结果到驱动程序
val evenSquaresCollected = evenSquares.collect()
//打印结果
evenSquaresCollected.foreach(println)
sc.stop()
}
}
四、Python项目测试
python编译工具我用的是pycharm
1.新建项目
创建项目,然后安装pyspark库
pip install pyspark -i https://pypi.tuna.tsinghua.edu.cn/simple
2. Spark-python代码测试
创建文件spark-python.py
from pyspark import SparkConf, SparkContext
def main():
# 创建SparkConf配置对象
conf = SparkConf().setAppName("App").setMaster("local[*]")
# 创建sparkContext
sc = SparkContext(conf=conf)
#创建一个简单的RDD
rdd = sc.parallelize(range(1,11))
#应用map转换操作
squares = rdd.map(lambda num:num * num)
#应用filter转换操作
even_squares =squares.filter(lambda num: num % 2 ==0)
#执行行动操作collect,收集结果到驱动程序
even_squares_collected=even_squares.collect()
#打印结果
for num in even_squares_collected:
print(num)
# 定制SparkContext
sc.stop()
if __name__ == "__main__":
main()
python不支持本地运行,写好后上传到集群中执行
spark提交python脚本命令
spark-submit \
--master local[8]
--deploy-mode cluster \
/path/to/your-script.py
参数说明:
- –master:指定Spark集群的主节点和资源数
- –deploy-mode:指定作业部署模式,对于Python应用程序,通常使用 cluster 模式
- /path/to/your-script.py:Python脚本文件的路径。
五、代码打包提交
这块是服务器本地测试,无需启动集群,后面一个板块则是需要启动集群测试
1. Scala代码打包提交
在terminal输入打包命令(项目中最好只保留测试文件)
sbt package
打包完成后找到jar包
上传到服务器
提交spark作业
spark-submit --class example.SparkBasicExample --master local[2] spark-scala_2.12-0.1.0-SNAPSHOT.jar
运行结果
2. Java代码打包提交
还是先打包程序(项目中最好只保留测试文件),然后上传服务器
提交spark作业
# org.chen 是pom文件中<groupId>org.chen</groupId>
spark-submit --class org.chen.SparkBasicExample --master local[2] spark-java-1.0-SNAPSHOT.jar
3. Python代码打包提交
python的方式比较简单,无需打包,直接拖到hadoop103节点(装了python3的环境)
提交spark作业
# org.chen 是pom文件中<groupId>org.chen</groupId>
spark-submit --master local[2] spark-python.py
也是可以执行成功的
六、提交Spark集群并运行
先启动spark集群
cd /opt/module/spark-3.4.3/
./sbin/start-all.sh
1. Scala代码提交到Spark集群运行
在之前的代码中去掉setMaster,然后重新打包
sbt clean # 先清理一下
sbt package # 打包
打包号后上传集群,提交到spark
spark-submit --class example.SparkBasicExample --master spark://hadoop103:7077 spark-scala_2.12-0.1.0-SNAPSHOT.jar
完成后可以在spark的web页面看到刚刚提交的作业
2. Java代码提交到Spark集群运行
和前面一样,在代码中去掉setMaster,然后重新打包,上传
执行spark提交命令
spark-submit --class org.chen.SparkBasicExample --master spark://hadoop103:7077 spark-java-1.0-SNAPSHOT.jar
3. Python代码提交到Spark集群运行
python同样去掉setMaster,上传
执行spark提交命令
spark-submit --master spark://hadoop103:7077 spark-python.py
4. 后续学习
可以在spark-shell中学习(无需启动集群),里面内置spark和sc对象
spark数据加载方式有hdfs(hadoop)、本地文件、数据库(mysql)
# 加载hdfs
# val rdd = sc.textFile("/tmp/word.txt")
# 加载本地文件/root/spark-shell-test/word.txt
val rdd = sc.textFile("file:///root/spark-shell-test/word.txt")
# 输出文件内容
rdd.foreach(println)
# 储存文件
rdd.saveAsTextFile("file:///root/word.txt")
后续可在spark官网或者其他学习视频了解更详细的知识。
参考资料
- (1小时速通)Spark实战入门,三种语言任选
- spark官网