当前位置: 首页 > article >正文

Spark的原理以及使用

一、spark集群的常见操作

启动spark集群

# 需要在各节点上首先启动zookeeper

zkServer.sh start

1、在主节点node1上spark目录下的sbin目录

2、执行./start-all.sh

3、使用jps和8080端口可以检查集群是否启动成功 http://node1:8080/

node1是Master,node1/node2/node3启动Worker进程

4、进入spark-shell查看是否正常

二、在Spark集群上提交应用

1、本地执行

val conf = new SparkConf().setAppName("TopN").setMaster("local");

val sc = new SparkContext(conf);

2、Idea提交集群执行

val conf = new SparkConf().setAppName("WordCount")

          .setMaster("spark://node1:7077");

val sc = new SparkContext(conf);

sc.addJar("c:\\spark-wordcount-scala.jar"); // spark-submmit 集群上提交时,需要注释该行

val linesRdd = sc.textFile("hdfs://node1:9000/test/README.txt");

3、集群上执行

(1)在spark上执行

打包jar,只包含spark程序类,不要包含所有依赖类

#spark自己管理资源 Master:8080

#集群的各个节点都需要能访问到jar

# 各个节点都存在 /usr/local/spark-wordcount-scala.jar

/usr/local/spark/bin/spark-submit \

--class com.aaa.spark.WordCountSpark \

--master spark://node10:7077 \

--driver-memory 5a00m \

--executor-memory 500m \

--executor-cores 1 \

/usr/local/spark-wordcount-scala.jar

(2)、在yarn上执行

#yarn调度资源 RM:8088

/usr/local/spark/bin/spark-submit \

--class com.aaa.spark.WordCount \

--master yarn-cluster/yarn-client \

--num-executors 3 \

--driver-memory 500m \

--executor-memory 500m \

--executor-cores 3 \

/usr/local/spark/spark.jar

三、Spark集群的原理

1、spark的基本组件

          Driver

         Master

        worker

        Executor

       Task

spark的每个CPU可创建2到4个分区

2、Spark的四种RDD操作

(1)transformation:转换,根据已有的RDD创建一个新的RDD

map

filter

flatMap

spark:groupByKey

spark:reduceByKey

sortByKey

join

cogroup

(2)action:行动,对RDD进行最后的操作

reduce

collect

count:元素的总个数

take(n)

top

saveAsTextFile

countByKey:各个Key的value的次数,Map[Key,次数]

countByValue:各个元素分别出现的次数,Map[元素,次数]

foreach 存储RDD到文件或数据库中,将操作结果转换为集合

action执行会自动执行之前的所有transformation操作

(3)集合类操作

creation:创建,两种方式创建RDD,一是集合,二是外部文件

(4)控制类

control:控制,RDD的持久化,放入缓存或磁盘

3、DAGScheduler划分stage算法

执行Action操作时,对该RDD创建一个stage

往前推,遇到宽依赖,再创建一个stage


http://www.kler.cn/a/472864.html

相关文章:

  • 基于springboot的网上商城购物系统
  • Python创建GitHub标签的Django管理命令
  • 晨辉面试抽签和评分管理系统之一:考生信息管理和编排
  • Redis 数据库源码分析
  • 【题库】人工智能训练师练习题
  • 如何用代码提交spark任务并且获取任务权柄
  • PyMysql 02|(包含项目实战)数据库工具类封装
  • 结构型模式4.装饰器模式
  • 枫清科技高雪峰: Data-Centric新范式开启,知识引擎+大模型双轮驱动企业智能化
  • Python里JSON orjson ujson在json.loads有什么区别?
  • 性能测试03|JMeter:断言、关联、web脚本录制
  • c++ 17 constexpr
  • conda+jupyter+pycharm:如何在Windows conda环境下运行jupyter并使用浏览器或者pycharm运行.ipynb
  • 开源一款简单易用的键盘音效工具
  • el-table 多级表头
  • 域名反查IP多种方式
  • 【Linux基础指令】第一期
  • 高频 SQL 50 题(基础版)_197. 上升的温度
  • cursor试用出现:Too many free trial accounts used on this machine 的解决方法
  • html 前端进行浮动布局设置
  • Go跨平台UI开发之wails的使用(1)
  • Lua语言的软件工程
  • 实现串口控制
  • 计算机网络 (31)运输层协议概念
  • JVM实战—11.OOM的原因和模拟以及案例
  • python代码实现了一个金融数据处理和分析的功能,主要围绕国债期货及相关指数数据展开