【数据仓库】spark大数据处理框架
文章目录
- 概述
- 架构
- spark 架构角色
- 下载
- 安装
- 启动pyspark
- 启动spark-sehll
- 启动spark-sql
- spark-submit
- 经验
概述
Spark是一个性能优异的集群计算框架,广泛应用于大数据领域。类似Hadoop,但对Hadoop做了优化,计算任务的中间结果可以存储在内存中,不需要每次都写入HDFS,更适用于需要迭代运算的算法场景中。
Spark专注于数据的处理分析,而数据的存储还是要借助于Hadoop分布式文件系统HDFS等来实现。
大数据问题场景包含以下三种:
- 复杂的批量数据处理
- 基于历史数据的交互式查询
- 基于实时数据流的数据处理
Spark技术栈基本可以解决以上三种场景问题。
架构
1 spark Core :spark的核心模块,是spark运行基础。以RDD为数据抽象,提供python、java、scala、R语言的api,可以通过RDD编程进行海量离线数据批处理计算。
2 Spark SQL:基于Spark Core,提供结构化数据处理功能。可以使用SQL语言对数据进行处理,可用于离线计算场景。同时基于Spark SQL提供了StructuredStreaming模块,可以使用时SQL进行流式计算。
3 sparkStreaming : 以Spark Core为基础,提供数据的流式计算功能
4 MLlib:以spark Core为基础,进行机器学习计算,内置大量机器学习库和API算法等。
5 Graphx:以spark Core为基础,进行图计算,提供大量图计算的API,方便以分布式资源进行图计算。
6 spark底层的文件存储还是基于hdfs分布式文件系统,支持多种部署方式。
spark 架构角色
从两个层面理解:
资源管理层面:(典型的Master-Worker架构)
管理者:即Master角色,只能有一个
工作者:即Worker角色,可以有多个。一个worker在一个分布式节点上,监测当前节点的资源状况,向master节点汇总。
任务执行层面:
某任务管理者:Driver角色,一个任务只能有一个
某任务执行者:Executor角色,可以有多个
在特殊场景下(local模式),Driver即是管理者又是执行者
下载
下载地址:
http://spark.apache.org/downloads.html
或者
https://archive.apache.org/dist/spark/
选择合适自己的版本下载。
Spark2.X预编译了Scala2.11(Spark2.4.2预编译Scala2.12)
Spark3.0+预编译了Scala2.12
该教程选择Spark3.2.1版本,其中预编译了Hadoop3.2和Scala2.13,对应的包是 spark-3.2.1-bin-hadoop3.2-scala2.13.tgz,但这里的预编译Hadoop不是指不需要再安装Hadoop。
linux 服务器上下载地址
wget https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2-scala2.13.tgz
安装
Spark的安装部署支持三种模式,
local本地模式(单机):启动一个JVM Process进程,通过其内部的多个线程来模拟整个spark运行时各个角色。一个进程里有多个线程。
Local[N]:可以使用N个线程,一个线程利用一个cpu核,通常cpu有几个核,就指定几个线程,最大化利用计算能力;
Local[*],按照cpu核数设置线程数;
standalone模式(集群):各个角色以独立进程的形式存在,并组成spark集群
spark on YARN模式(集群):各个角色运行在yarn的容器内部,组成集群环境
kubernetes 模式(容器集群):各个角色运行在kubernetes 容器内部,组成集群环境
本文将只介绍 本地Local模式,其它模式将会在后续文章中进行介绍。
该文的安装环境为centos7。
1、将下载的包上传到服务器指定目录,解压
[root@localhost softnew]# tar zxvf spark-3.1.2-bin-hadoop3.2.tgz
# 修改目录
mv spark-3.1.2-bin-hadoop3.2 spark-3.1.2
2、修改配置文件
修改/etc/profile文件,新增spark环境变量:
# Spark Environment Variables
export SPARK_HOME=/home/bigData/softnew/spark
export PATH=$PATH:$SPARK_HOME/bin
修改完成后记得执行 source /etc/profile 使其生效
启动pyspark
pyspark 是spark集成python后,可以使用python 脚本编写spark 数据 批处理计算。pyspark提供了一个shell窗口。
./pyspark
[root@yd-ss bin]# ./pyspark
Python 3.10.10 (main, Dec 26 2024, 22:46:13) [GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
Type "help", "copyright", "credits" or "license" for more information.
24/12/27 10:46:44 WARN Utils: Your hostname, yd-ss resolves to a loopback address: 127.0.0.1; using xx.xx.xx.xx instead (on interface bond0)
24/12/27 10:46:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/27 10:46:46 WARN HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
24/12/27 10:46:46 WARN HiveConf: HiveConf of name hive.server2.active.passive.ha.enable does not exist
24/12/27 10:46:46 WARN HiveConf: HiveConf of name hive.exec.default.charset does not exist
24/12/27 10:46:46 WARN HiveConf: HiveConf of name hive.exec.default.national.charset does not exist
24/12/27 10:46:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.2.1
/_/
Using Python version 3.10.10 (main, Dec 26 2024 22:46:13)
Spark context Web UI available at http://sc:4040
Spark context available as 'sc' (master = local[*], app id = local-1735267609271).
SparkSession available as 'spark'.
>>>
进入窗口,即可使用python 写RDD编程代码了。
同时,可以通过web ui 在4040端口访问,查看spark 任务执行情况。
执行如下计算任务
sc.parallelize([1,2,3,4,5]).map(lambda x:x*10).collect()
访问localhost:4040
可以看到job清单,这个job,起了24个线程去处理计算。 由于跑任务的服务器是24核的,执行./pyspark 默认以local[*]最大线程去启动。
可以看到任务层面,启动了一个driver,由于是local模式,所以driver即是管理者也是执行者。
可以在pyspark-shell下利用spark做一些简单开发任务;
下面修改启动命令:
# 该local模式启动2个线程
./pyspark --master local[2]
再次执行
sc.parallelize([1,2,3,4,5]).map(lambda x:x*10).collect()
可以看到这个job只用了2个线程来处理计算。
还可以利用该shell处理其他计算任务,也就是说一个shell 启动起来,是可以处理多个任务的,但只要关闭窗口,shell就会关闭。就不能再处理任务了。
通过shell 总是不便,后续将介绍通过pycharm进行RDD计算任务编写。
退出shell脚本
quit()或者ctrl + D
启动spark-sehll
./spark-shell
可以看到如下信息:
[root@yd-ss bin]# ./spark-shell
24/12/27 11:11:50 WARN Utils: Your hostname, yd-ss resolves to a loopback address: 127.0.0.1; using xx.xx.xx.xx instead (on interface bond0)
24/12/27 11:11:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.2.1
/_/
Using Scala version 2.13.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
Type in expressions to have them evaluated.
Type :help for more information.
24/12/27 11:12:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://sc:4040
Spark context available as 'sc' (master = local[*], app id = local-1735269126553).
Spark session available as 'spark'.
scala>
这个是要使用scala语言编写,其他跟pyspark类似。
启动spark-sql
./spark-sql
可以看到如下:
[root@yd-ss bin]# ./spark-sql
24/12/27 11:14:28 WARN Utils: Your hostname, yd-ss resolves to a loopback address: 127.0.0.1; using xx.xx.xx.xx instead (on interface bond0)
24/12/27 11:14:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/27 11:14:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/27 11:14:30 WARN HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
24/12/27 11:14:30 WARN HiveConf: HiveConf of name hive.server2.active.passive.ha.enable does not exist
24/12/27 11:14:30 WARN HiveConf: HiveConf of name hive.exec.default.charset does not exist
24/12/27 11:14:30 WARN HiveConf: HiveConf of name hive.exec.default.national.charset does not exist
Spark master: local[*], Application Id: local-1735269273943
spark-sql>
可以看到这个是依赖hive数仓配置的。spark-sql是没有元数据管理的,所以需要跟hive集成,利用其元数据管理功能。后续将详细介绍。
spark-submit
该工具是用来提交写好的计算脚本,到saprk上去执行,执行完成即结束。和前面的shell不一样,shell只要没关闭,就可以一直执行的。
# 执行spark自带的python示例,计算pi的值(8次迭代)
./spark-submit /home/spark/spark-3.2.1/examples/src/main/python/pi.py 8
该脚本,会基于spark启动一个driver,执行pi.py计算,然后打开web ui 4040监控接口,执行完成后输出结果,最后关闭driver,关闭web ui。
是个一次性的任务执行。
经验
1 spark 功能比较强大,使用方式也很丰富,初步学习只需要了解自己使用方式即可;
2 spark local模式使用配置是比较简单的,基本是开箱即用;