当前位置: 首页 > article >正文

Spark的学习-02

Spark Standalone集群的安装

 
 

架构:普通分布式主从架构 主:Master:管理节点:管理从节点、接客、资源管理和任务 调度,等同于YARN中的ResourceManager 从:Worker:计算节点:负责利用自己节点的资源运行主节点 分配的任务 功能:提供分布式资源管理和任务调度,基本上与YARN是一致的

pi的测试

 
 

/opt/installs/spark/bin/spark-submit --master yarn /opt/installs/spark/examples/src/main/python/pi.py 200

上面我们已经在bigdata01上面安装好了Anaconda ,所以接下来不需要再安装,分发到bigdata02、bidata03即可

#上传,或者同步:
xsync.sh /opt/modules/Anaconda3-2021.05-Linux-x86_64.sh   #(分发给02、03)
# 添加执行权限
chmod u+x Anaconda3-2021.05-Linux-x86_64.sh  #(不添加也行, 直接执行在目录下执行即可)
# 执行
sh ./Anaconda3-2021.05-Linux-x86_64.sh  

# 过程
#第一次:【直接回车,然后按q】
   Please, press ENTER to continue
   >>>
#第二次:【输入yes】
 Do you accept the license terms? [yes|no]
 [no] >>> yes
#第三次:【输入解压路径:/opt/installs/anaconda3】
 [/root/anaconda3] >>> /opt/installs/anaconda3
 #第四次:【输入yes,是否在用户的.bashrc文件中初始化
Anaconda3的相关内容】
 Do you wish the installer to initialize  Anaconda3
   by running conda init? [yes|no]
   [no] >>> yes
   
  ----- 步骤和上面一样
  
  刷新环境变量:
# 刷新环境变量
source /root/.bashrc
# 激活虚拟环境,如果需要关闭就使用:conda deactivate
conda activate
配置环境变量:
# 编辑环境变量
vi /etc/profile
# 添加以下内容
# Anaconda Home
export ANACONDA_HOME=/opt/installs/anaconda3
export PATH=$PATH:$ANACONDA_HOME/bin
制作软链接:
# 刷新环境变量
source /etc/profile

# 创建软连接
ln -s /opt/installs/anaconda3/bin/python3 /usr/bin/python3

重新再解压spark安装包
# 解压安装
cd /opt/modules
tar -zxf spark-3.1.2-bin-hadoop3.2.tgz -C /opt/installs
# 重命名
cd /opt/installs
mv spark-3.1.2-bin-hadoop3.2 spark-standalone

我们上面安装好的 Spark的名字是 Spark-local ,上面也创建了软连接,所以现在需要把上面的软链接删除

# 删除上面创建的软连接
rm -rf spark  
# 重新建立新的
ln -s spark-standalone spark

1、修改 spark-env.sh配置文件:

        先启动hdfs

        接着在hdfs上创建目录

# 创建程序运行日志的存储目录
hdfs dfs -mkdir -p /spark/eventLogs/

cd /opt/installs/spark/conf
mv spark-env.sh.template spark-env.sh
vim spark-env.sh
# 22行:申明JVM环境路径以及Hadoop的配置文件路径
export JAVA_HOME=/opt/installs/jdk
export HADOOP_CONF_DIR=/opt/installs/hadoop/etc/hadoop
# 60行左右
export SPARK_MASTER_HOST=bigdata01 # 主节点所在的地址
export SPARK_MASTER_PORT=7077 #主节点内部通讯端口,用于接收客户端请求
export SPARK_MASTER_WEBUI_PORT=8080 #主节点用于供外部提供浏览器web访问的端口
export SPARK_WORKER_CORES=1     # 指定这个集群总每一个从节点能够使用多少核CPU
export SPARK_WORKER_MEMORY=1g   #指定这个集群总每一个从节点能够使用多少内存
export SPARK_WORKER_PORT=7078
export SPARK_WORKER_WEBUI_PORT=8081
export SPARK_DAEMON_MEMORY=1g  # 进程自己本身使用的内存
export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://bigdata01:9820/spark/eventLogs/ -Dspark.history.fs.cleaner.enabled=true"
# Spark中提供了一个类似于jobHistoryServer的进程,就叫做HistoryServer, 用于查看所有运行过的spark程序

2、spark-defaults.conf:Spark属性配置文件

mv spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf

# 末尾
spark.eventLog.enabled           true
spark.eventLog.dir              hdfs://bigdata01:9820/spark/eventLogs
spark.eventLog.compress              true

3、workers:从节点地址配置文件

mv workers.template workers
vim workers
# 末尾处删掉localhost,添加以下内容
bigdata01
bigdata02
bigdata03

4、log4j.properties:日志配置文件

mv log4j.properties.template log4j.properties
vim log4j.properties
# 19行:修改日志级别为WARN
log4j.rootCategory=WARN, console
#  log4j的5种 级别  debug --> info --> warn --error -->fatal

接着在第二台和第三台上,创建软链接

# 同步新压缩好的spark-standalone
xsync.sh /opt/installs/spark-standalone/
# 同步软连接
xsync.sh /opt/installs/spark

启动:

启动master:
cd /opt/installs/spark
sbin/start-master.sh
启动所有worker:
sbin/start-workers.sh
如果你想启动某一个worker
sbin/start-worker.sh

启动日志服务:
sbin/start-history-server.sh

要想关闭某个服务,将start换为stop

standalone集群启动情况:

master页面:

日志监控页面(18080):

4040和8080端口页面的区别

当有任务进行中的时候,就可以启动4040端口,若此任务并没有执行完毕,集群中又启动了新的端口,就会再启动一个4041端口.....可以一直累加下去

4040端口中任务的执行一结束,就无法再运行此窗口

当启动master等的时候,就会启动8080端口

所以我们在本地模式的时候,是无法启动8080端口的,只能启动4040

此处运行的程序,其实就是4040端口中正在执行的进程,当这个正在running的任务结束后,显示执行完成时,4040端口就打不开了

spark三种模式下4040与8080端口的启动情况:

本地模式:只能启动4040端口 (因为8080端口要启动master才能看到)

standalone(集群)模式: 可以看到4040还可以看到8080端口

Yarn模式:4040、8080都看不到

Spark 4040端口各个模块的作用

Job:

各个界面作用: Job:显示当前这个程序的所有Job,一个程序可以有多个Job Spark中不是所有的代码都会触发Job的产生和运行 所有RDD的转换是不会立即产生job,运行Task任务的,这种模式称为Lazy模式:避免在内存中构建RDD,但是你不用只有遇到了需要使用数据的代码操作才会产生job,触发Task任务的运行 能触发job任务生成的目前有: saveAsTextFile foreach

Stages

 
 

Stages:显示当前这个程序的所有Stage,一个Job可以有多个 Stage

Stage 可以理解为多个算子组成的阶段,到底有多少个Stage,取决于算子是否会触发shuffle过程。假如有两个触发shuffle过程的算子,整个程序可以切为三个阶段。

当一个Job被触发运行的时候,Spark底层会根据回溯算法构建这个job的执行计划图,即DAG图

每个Job都会有1个DAG图,在构建的时候会根据计算过程中是否要产生shuffle来划分Stage 不产生Shuffle的操作就在同一个Stage中执行,产生Shuffle的操作,会传递到另外一个Stage中执行 最终每个Stage中的操作会转换为对应的Task来执行

每个黑点表示一个RDD

每个矩形框中的RDD的转换都是在内存中完成的

曲线代表经过了Shuffle,灰色代表没有执行,因为之前执行的

Executors

 
 

显示当前这个程序的运行进程的信息

每个Spark程序都由两种进程组成:一个Driver和多个Executors

Driver进程:负责解析程序,构建DAG图,构建Stage,构建、调度、监控Task任务的运行 Executor进程:负责运行程序中的所有Task任务

Storage:显示当前这个程序在内存缓存的数据信息 。

Environment:显示当前这个程序所有的配置信息。

Spark-submit提交

# 提交程序的语法
# spark-submit [可选的选项] Python文件 Python文件中用到的参数
spark-submit --master local[2] / spark://bigdata01:7077 / yarn \
……
hdfs://bigdata01:9820/spark/app/pyspark_core_word_args.py /spark/wordcount/input /spark/wordcount/output

spark-submit中各个参数的意义

其实就是在将 提交命令的 [options] 可以写什么。

--master:用于指定程序运行的模式,5种模式,本地模式、
    Standalone、yarn、Mesos、K8s
    本地模式:--master local
    Standalone模式:--master spark://master:7077
    YARN模式:--master yarn
    作用等同于代码中:setMaster
--deploy-mode:用于指定Driver进程运行位置 【重点,后面展开讲】
--name:用于指定程序的名称,作用等同于代码中:setAppName
--jars:用于指定一些额外的jar包,例如读写MySQL时候需要用到MySQL的驱动包
--conf:用于指定当前程序运行的额外的一些配置,作用等同于代码中:set

Driver资源选项

Driver资源选项:主要用于构建一个非RDD的操作
--driver-memory:指定Driver进程能够使用的内存大小,默认是1G
--driver-cores:指定Driver进程能够使用的CPU核数,默认是1Core
--supervise:指定如果Driver故障,就自动重启

executor可以使用的参数:

运行这个程序的一个进程
需要的资源,资源都是来自于从节点
 --executor-cores 4
 --executor-memory 16
参数解释:
--executor-memory:指定每个Executor能够使用多少内存
--executor-cores:指定每个Executor能够使用多少CPU
--total-executor-cores:Standalone集群模式,指定所有Executor总共使用的CPU核数,用于间接指定Executor的个数
--num-executors:YARN集群模式,直接指定Executor的个数
--queue:指定提交程序到哪个队列中运行

加载顺序:优先级:代码中配置【set】 > 参数选项【--conf】 > 配置文件【公共配置:spark-defualt.conf】

实战测试:

将下面代码在pycharm中写好,然后直接拖拽到虚拟机中指定的路径下

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark import SparkContext, SparkConf
import os
import sys

"""
-------------------------------------------------
   Description :        TODO:用于实现词频统计
   SourceFile  :        04.pyspark_core_wordcount_hdfs_args
-------------------------------------------------
"""

if __name__ == '__main__':
    # todo:0-设置系统环境变量
    # os.environ['JAVA_HOME'] = 'D:/jdk1.8.0_241'
    # os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.0'
    # os.environ['PYSPARK_PYTHON'] = 'D:/Anaconda/python.exe'
    # os.environ['PYSPARK_DRIVER_PYTHON'] = 'D:/Anaconda/python.exe'
    # os.environ['HADOOP_USER_NAME'] = 'root'

    # todo:1-构建SparkContext
    # 甚至 任务的名字都可以不写,让提交任务的时候指定
    conf = SparkConf().setAppName("SparkSubmitApp")
        # .setMaster("local[2]")\

    sc = SparkContext(conf=conf)

    # todo:2-数据处理:读取、转换、保存
    # step1: 读取数据:SparkContext对象负责读取文件,用传递的第二个参数作为程序的输入地址
    input_rdd = sc.textFile(sys.argv[1])
    # 输出第一行
    # print(input_rdd.first())
    # 打印总行数
    # print(input_rdd.count())

    # step2: 处理数据
    rs_rdd = input_rdd\
            .filter(lambda line: len(line.strip()) > 0)\
            .flatMap(lambda line: line.strip().split(" "))\
            .map(lambda word: (word, 1))\
            .reduceByKey(lambda tmp,item: tmp+item)

    # step3: 保存结果
    # 打印结果
    rs_rdd.foreach(lambda x: print(x))
    # 结果保存到文件中:路径不能提前存在,将第二个参数作为输出路径
    rs_rdd.saveAsTextFile(sys.argv[2])

    # todo:3-关闭SparkContext
    sc.stop()

开始编写命令,提交任务

注意:我们前面已经将spark的软连接链接到了 standalone(集群)上,所以需要先把之前的软连接删除掉,重现创建新的,指向本地

# 删除原来的软连接
rm -rf /opt/installs/spark
# 创建新的软连接指向
ln -s /opt/installs/spark-local /opt/installs/spark

本地(local)

spark-submit \
--master local[2] \
/home/_pytSparkDemo04-yuancheng.py \
/home/data.txt \
/home/output02

和上面一样,也需要将软连接再连回来

# 删除原来的软连接
rm -rf /opt/installs/spark
# 创建新的软连接指向
ln -s /opt/installs/spark-standalone /opt/installs/spark

集群(standalone)

spark-submit \
--master spark://bigdata01:7077 \
/home/_pytSparkDemo04-yuancheng.py \
hdfs://bigdata01:9820/spark/wordcount/input \
hdfs://bigdata01:9820/spark/wordcount/jiqun01


http://www.kler.cn/a/387506.html

相关文章:

  • nacos环境搭建以及SpringCloudAlibaba脚手架启动环境映射开发程序
  • ImportError: attempted relative import with no known parent package 报错的解决!
  • vue的KeepAlive应用(针对全部页面及单一页面进行缓存)
  • 回归预测 | MATLAB实RVM-Adaboost相关向量机集成学习多输入单输出回归预测
  • 网络应用技术 实验七:实现无线局域网
  • 一、智能体强化学习——强化学习基础
  • 微积分复习笔记 Calculus Volume 1 - 4.10 Antiderivatives
  • Pr 视频过渡:沉浸式视频 - VR 色度泄漏
  • #渗透测试#SRC漏洞挖掘# 操作系统-Linux系统之病毒防护
  • 《JVM第8课》垃圾回收算法
  • 软考:论DevOps
  • 编码算法笔记(base64,url编码等)
  • iphone怎么删除重复的照片的新策略
  • Elasticsearch里的索引index是什么概念?(ChatGPT回答)
  • 机器学习 - 为 Jupyter Notebook 安装新的 Kernel
  • ArcGIS软件之“计算面积几何”地图制作
  • 人工智能技术的应用前景及未来发展:改变工作与生活的力量
  • opencv 中 threshold 函数作用
  • 万字长文解读深度学习——GPT、BERT、T5
  • Ubuntu 22.04安装ROS 1教程汇总
  • 多线程---线程池
  • # filezilla连接 虚拟机ubuntu系统出错“尝试连接 ECONNREFUSED - 连接被服务器拒绝, 失败,无法连接服务器”解决方案
  • SQL,力扣题目1127, 用户购买平台
  • 本地 Hadoop 开发环境搭建详解
  • MTK6833/MT6833(天玑700)安卓核心板_联发科5G智能通讯模块安卓主板定制
  • 如何在下载我上传的数据时自动设置 Content-Type