当前位置: 首页 > article >正文

大数据学习16之Spark-Core

1. 概述

1.1.简介

        Apache Spark 是专门为大规模数据处理而设计的快速通用的计算引擎。

        一种类似 Hadoop MapReduce 的通用并行计算框架,它拥有MapReduce的优点,不同于MR的是Job中间结果可以缓存在内存中,从而不需要读取HDFS,减少磁盘交互。

        基于内存的分布式计算框架。

        2009年作者开始编写源码,2012年2月发布0.6.0版本,2014年5月点发布1.0.0版本

1.2.Spark vs MapReduce

        MR只能做离线计算,复杂逻辑计算需要多个Task穿行处理,结果存入HDFS,磁盘开销大,效率较低。

        Spark即可离线,也可实时计算,提供高度封装API数据集RDD,算子丰富,采用有向无环图思想,执行计划自动优化,数据在内存中可以复用。

        Spark为什么比MapReduce快?

        1.Spark基于内存,磁盘IO开销较小

        2.Spark采用粗粒度资源申请,MR是细粒度资源申请,因而Spark不需要第二次申请资源

        3.Spark采用DAG有向无环图,优化了执行计划

1.3.特点

        快        :基于内存,DAG有向无环图,优化执行计划

        易用     :支持多种语言,用户可快速构建不同应用

        通用     :提供统一解决方案,批处理(Spark-Core)、交互式查询(Spark-SQL)、实时流处理(Spark-Stream)、机器学习(Spark MLlib)和图计算(GraphX)。这些操作都可以在同一个应用中使用,减少开发成本。

        兼容性  :可与其他大数据组件进行整合,如Hadoop,Hive等

1.4模块

        Spark-Core 核心组件---离线批处理

        Spark-SQL 交互式查询----可取代Hive引擎,但不能取代元数据服务器

        Spark-Stream 准实时流处理---通过基于减小任务规模达到一个伪实时的效果

        Spark-MLlib 机器学习---使计算机对某些数据敏感

        Spark-GraphX 图计算

        Mesos 集群资源管理器

        Yarn  集群资源管理器---主流

        Kubernetes 集群资源管理器

1.5.运行模式

        --Master Local :本地模式,学习使用

        --Master Standalone:独立模式,使用Spark自带资源调度框架,对集群配置要求较高

        --Master Yarn :主流,推荐使用

        --Master Mesos:类似Yarn,国内使用较少

        --Master Kubernetes:k8s容器

1.6.总结

        

2.快速入门

        1.创建Maven项目

        2.添加依赖

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.3.2</version>
</dependency>

        3.数据准备

        项目根目录下创建data文件夹

        data文件夹下面创建wd1.txt,wd2.txt

wd1.txt

Hello Hadoop
Hello ZooKeeper
Hello Hadoop Hive

wd2.txt

Hello Hadoop HBase
Hive Scala Spark

        4.创建Scala执行类WordCount:

object WordCount01Demo {
def main(args: Array[String]): Unit = {
// ==================== 建立连接 ====================
// 初始化配置对象
val conf = new SparkConf()
// 设置运行模式与 AppName
conf.setMaster("local").setAppName("WordCount")
// 根据配置对象初始化上下文对象
val sc = new SparkContext(conf)
// ==================== 业务处理 ====================
// 读取文件,按行读取
val lines: RDD[String] = sc.textFile("data/wordcount")
// 按空格拆分每一行数据,拆分为一个一个的单词
val words: RDD[String] = lines.flatMap(w => w.split("\\s+"))
// 将数据根据单词进行分组,便于统计
val wordGroup: RDD[(String, Iterable[String])] = words.groupBy(w => w)
// 对分组后的数据进行统计
val wordCount: RDD[(String, Int)] = wordGroup.map(kv => (kv._1, kv._2.size))
// 从结果集中获取指定条数的数据
val wordCountTopN: Array[(String, Int)] = wordCount.take(10)
// 将结果打印在控制台
wordCountTopN.foreach(println)
// 简写方式
lines.flatMap(_.split("\\s+"))
.groupBy(w => w)
.map(kv => (kv._1, kv._2.size))
.take(10)
.foreach(println)
// ==================== 关闭连接 ====================
if (!sc.isStopped) sc.stop()
}
}

        5.执行

        6.日志配置

                自定义的 log4j.properties 文件添加至项目的resources 文件夹即可。详细内容过多,另查。

        

3.运行架构

3.1.概述

        Spark 有多种运行模式   ,local、standalone、yarn、mesos、k8s

        资源组的Master和Work   :

                Cluster Master 表示 Master,负责管理与分配整个集群中的资源(CPU Core 和 Memory);

                Cluster Worker 表示 Worker,负责接收资源并执行作业中的任务。

        作业组的Master 和 Worker:

                Driver 表示 Master,负责管理整个集群中的作业任务调度;

                Executor 表示 Worker,负责执行具体的任务。

        无论什么运行模式都会存在这些角色,只是在不同的运行模式下,这些角色的分布会有所不同。

        --master [] 决定运行模式

        --deploy-mode决定Driver的运行方式,推荐Cluster,Driver与ApplicationMaster在同一进程,减少消息连接。

3.2.通用运行流程

        集群启动后,Worker节点会向Master节点发送心跳信息,汇报资源情况,包括内存和CPU;

        Client提交Application,根据不同的运行模式在不同地方创建Driver进程;

        SparkContext连接到Master,向Master注册并一次性申请所有需要的资源(粗粒度资源申请);

        Worker节点创建Executor进程,Executor向Driver反向注册;

        资源满足后(Executor注册完毕),SparkContext解析代码,创建RDD,构建DAG,提交给DAGScheduler分解成Stage(碰到运行算子时创建一个Job,每个Job中包含多个Stage阶段),然后将Stage(TaskSet)提交给TaskScheduler,TaskScheduler负责将Task分配到相应Worker,最后提交给Executor执行(发送到 Executor 的线程池中);

        每个Executor拥有一个线程池,通过启动多个线程(Task)来对RDD的partition进行计算,并向SparkContext报告,直至Task完成;

        所有 Task 完成后,SparkContext 向 Master 注销,释放资源。

        任务失败情况:

        Task失败,TaskScheduler重试3次,都失败则Stage失败

        Stage失败,DAGScheduler重新发送Stage给TaskScheduler重试4次,四次失败则Job失败

        Job失败则Application失败;

        同时有推测执行。

3.3.资源申请粒度

 3.3.1.粗粒度资源申请(Spark)

        Spark 会在 Application 执行之前,将所有的资源申请完毕,当资源申请成功后,才会进行任务的调度,当所有的Task 执行完成后,才会释放这部分资源。

        优点:不需要重复申请资源,节省时间,加快计算速度

        缺点:资源释放较晚,集群资源无法得到充分利用

3.3.2.细粒度资源申请

        MapReduce在 Application 执行之前不需要去提前申请资源,而是直接执行,让 Job 中的每一个 Task 在执行前自己去申请资源, Task 执行完成就立刻释放资源。

        优点:集群资源可以充分利用

        缺点:Task自己去申请资源,Task启动变慢,提交的应用运行也就慢了

3.4.本地模式

        过

3.5.Standalone 独立模式

        过

3.6.YARN模式

3.6.1.yarn-client

        Driver进程创建在任务提交主机上,不推荐,过

3.6.2.yarn-cluster

        Driver和ApplicationMaster在同一进程,即同一主机;

        

        看图总结流程

        该模式下只能通过 YARN 查看日志。

3.6.3.Mesos

        国内用的少,过

3.6.4.Kubernetes

        不是重点,过

4.环境搭建

Spark环境搭建_搭建spark开发环境-CSDN博客

        

5.核心编程*

5.1.RDD

        RDD 是 Resilient Distributed Dataset 的缩写,意思为弹性分布式数据集(一种数据结构),是一个读取分区记录的集合,是 Spark 对需要处理的数据的基本抽象。

        Spark计算过程中可以简单的抽象为对RDD的创建、转换和返回操作结果的过程:

创建: 通过加载外部物理存储(如 HDFS)中的数据集,或 Application 中定义的对象集合(如 List)来创建。RDD 在创建后不可被改变,只可以对其执行下面两种操作。

转换(Transformation):对已有的 RDD 中的数据执行计算并进行转换,从而产生新的 RDD,在这个过程中有时会产生中间 RDD。Spark 对于 Transformation 采用惰性计算机制,遇到 Transformation 时并不会立即计算结果,而是要等遇到 Action 时才会一起执行。

行动(Action):对已有的 RDD 中的数据执行计算后产生结果,将结果返回 Driver 程序或写入到外部物理存储。在Action 过程中同样有可能产生中间 RDD。

具体通过案例理解。

五大属性:

        

创建RDD:

        通过集合创建:val rdd: RDD[Int] = sc.makeRDD(list)

        通过文件创建:val rdd01: RDD[String] = sc.textFile("data/test.txt")

5.2.Partition

5.2.1.集合的分区处理

        如果数据不是很多,但是分区数却很多的时候 Spark 会如何处理呢?

比如我们的数据是 List(1, 2, 3, 4, 5) ,但是分区数却有 12 个,看看 Spark 是如何工作的。

        建议读源码,过

5.2.2.文件的分区处理

        读取文件数据时,数据会按照 Hadoop 文件读取的规则进行分区,文件的分区规则和内存的分区规则有些差异。

        建议读源码,过

集合分区处理源码:

SparkContext.scala

def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
// ==================== 从这里继续深入 ====================
parallelize(seq, numSlices)
}
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
// ==================== 从这里继续深入 ====================
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
ParallelCollectionRDD.scala

private[spark] class ParallelCollectionRDD[T: ClassTag](
sc: SparkContext,
@transient private val data: Seq[T],
numSlices: Int,
locationPrefs: Map[Int, Seq[String]])
extends RDD[T](sc, Nil) {
// TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
// cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
// instead.
// UPDATE: A parallel collection can be checkpointed to HDFS, which achieves this goal.
override def getPartitions: Array[Partition] = {
// ==================== 从这里继续深入 ====================
val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
override def compute(s: Partition, context: TaskContext): Iterator[T] = {
new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
}
override def getPreferredLocations(s: Partition): Seq[String] = {
locationPrefs.getOrElse(s.index, Nil)
}
}
ParallelCollectionRDD.scala

private object ParallelCollectionRDD {
/**
* Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
* collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
* it efficient to run Spark over RDDs representing large sets of numbers. And if the collection
* is an inclusive Range, we use inclusive range for the last slice.
*/
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
if (numSlices < 1) {
throw new IllegalArgumentException("Positive number of partitions required")
}
// Sequences need to be sliced at the same set of index positions for operations
// like RDD.zip() to behave as expected
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
seq match {
case r: Range =>
positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
// If the range is inclusive, use inclusive range for the last slice
if (r.isInclusive && index == numSlices - 1) {
new Range.Inclusive(r.start + start * r.step, r.end, r.step)
}
else {
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
}
}.toSeq.asInstanceOf[Seq[Seq[T]]]
case nr: NumericRange[_] =>
// For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices)
var r = nr
for ((start, end) <- positions(nr.length, numSlices)) {
val sliceSize = end - start
slices += r.take(sliceSize).asInstanceOf[Seq[T]]
r = r.drop(sliceSize)
}
slices
case _ =>
val array = seq.toArray // To prevent O(n^2) operations for List etc
// ==================== 从这里继续深入 ====================
positions(array.length, numSlices).map { case (start, end) =>
array.slice(start, end).toSeq
}.toSeq
}
}
}
ParallelCollectionRDD.scala

// 计算分区所对应的集合索引区间
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
// 遍历 0 到 分区数的左闭右开区间(左边包含右边不包含)
// 刚才的案例中分区数为 12,则 0 until 12
(0 until numSlices).iterator.map { i =>
// start = 分区的索引从 0 开始 * 集合长度 / 分区数
// start 第 1 次 = 0 * 5 / 12 = 0
// start 第 2 次 = 1 * 5 / 12 = 0
// start 第 3 次 = 2 * 5 / 12 = 0
// start 第 4 次 = 3 * 5 / 12 = 1
// start 第 5 次 = 4 * 5 / 12 = 1
// start 第 6 次 = 5 * 5 / 12 = 2
// start 第 7 次 = 6 * 5 / 12 = 2
// start 第 8 次 = 7 * 5 / 12 = 2
// start 第 9 次 = 8 * 5 / 12 = 3
// start 第 10 次 = 9 * 5 / 12 = 3
// start 第 11 次 = 10 * 5 / 12 = 4
// start 第 12 次 = 11 * 5 / 12 = 4
val start = ((i * length) / numSlices).toInt
// end = (分区的索引从 0 开始 + 1) * 集合长度 / 分区数
// end 第 1 次 = (0 + 1) * 5 / 12 = 0
// end 第 2 次 = (1 + 1) * 5 / 12 = 0
// end 第 3 次 = (2 + 1) * 5 / 12 = 1
// end 第 4 次 = (3 + 1) * 5 / 12 = 1
// end 第 5 次 = (4 + 1) * 5 / 12 = 2
// end 第 6 次 = (5 + 1) * 5 / 12 = 2
// end 第 7 次 = (6 + 1) * 5 / 12 = 2
// end 第 8 次 = (7 + 1) * 5 / 12 = 3
// end 第 9 次 = (8 + 1) * 5 / 12 = 3
// end 第 10 次 = (9 + 1) * 5 / 12 = 4
// end 第 11 次 = (10 + 1) * 5 / 12 = 4
// end 第 12 次 = (11 + 1) * 5 / 12 = 5
val end = (((i + 1) * length) / numSlices).toInt
/*
第 1 个分区 part-00000 - (0, 0)
第 2 个分区 part-00001 - (0, 0)
第 3 个分区 part-00002 - (0, 1) => 1
第 4 个分区 part-00003 - (1, 1)
第 5 个分区 part-00004 - (1, 2) => 2
第 6 个分区 part-00005 - (2, 2)
第 7 个分区 part-00006 - (2, 2)
第 8 个分区 part-00007 - (2, 3) => 3
第 9 个分区 part-00008 - (3, 3)
第 10 个分区 part-00009 - (3, 4) => 4
第 11 个分区 part-00010 - (4, 4)
第 12 个分区 part-00011 - (4, 5) => 5
*/
(start, end)
}
}

文件的分区处理源码:

SparkContext.scala

def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
// LongWritable 和 TextInputFormat 熟悉不熟悉,没错就是每行的偏移量和数据
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
// 类似秒表,开始计时
Stopwatch sw = new Stopwatch().start();
// 获取要分析的文件列表
FileStatus[] files = listStatus(job);
// Save the number of input files for metrics/loadgen
job.setLong(NUM_INPUT_FILES, files.length);
// 计算文件总字节长度
long totalSize = 0; // compute total size
// 开始遍历要分析文件的路径
for (FileStatus file: files) { // check we have valid files
// 如果是文件夹则抛出异常
if (file.isDirectory()) {
throw new IOException("Not a file: "+ file.getPath());
}
// 累加每个文件的字节长度
totalSize += file.getLen();
}
// 每个分区的字节大小 = 文件总字节长度 / (分区数 == 0 ? 1 : 分区数)
// 本文案例 goalSize = 81 / 2 = 40
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
// 默认 minSize = Math.max(1, 1)
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
// generate splits
// 开始生成 splits,创建一个 List 存放切片
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
// 开始遍历要分析文件的路径
for (FileStatus file: files) {
// 获取文件路径
Path path = file.getPath();
// 获取文件的长度,文件的字节数
long length = file.getLen();
// 如果文件长度不为 0
if (length != 0) {
// 获取文件系统对象
FileSystem fs = path.getFileSystem(job);
// 获取文件对应的 Blocks 信息
BlockLocation[] blkLocations;
// 判断文件是否是 LocatedFileStatus 对象,如果是则文件使用了 ErasureCoded 需要获取文件块与 EC
校验块
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
// 如果不是则直接获取数据块
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
// 判断文件是否可以进行切片,如果不可切,则整个文件作为一个切片处理
if (isSplitable(fs, path)) {
// 获取 Block 的大小(默认为 128M)
long blockSize = file.getBlockSize();
// 获取 Split 的大小,切片的默认大小为 Math.min(goalSize, blockSize)
// return Math.max(minSize, Math.min(goalSize, blockSize));
// 如果要调大切片则修改 FileInputFormat.SPLIT_MINSIZE 即可
// 本文案例 splitSize = Math.max(1, Math.min(40, 128)) = 40
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
// 声明一个变量存放文件剩余的字节
long bytesRemaining = length;
// 查看剩余的容量是否能达到阈值 SPLIT_SLOP:private static final double SPLIT_SLOP = 1.1
// 当剩余的字节个数 / 切片大小 大于 1.1,继续切;如果不大于,判断剩下的字节数是不是 0,如果不
是 0,生成最后一个切片
// SPLIT_SLOP = 1.1 的好处是可以减少分片的数量,从而减少计算的次数来提高性能
// 本文案例 wd1.txt 46 字节,46 / 40 = 1.15 大于 1.1 结果为 true 生成第一个切片
// 本文案例 wd2.txt 35 字节,35 / 40 = 0.875 不大于 1.1 结果为 false
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
length-bytesRemaining,
splitSize, clusterMap);
// makeSplit 创建切片,切片生成后添加到 List
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
splitHosts[0], splitHosts[1]));
// 每次创建切片后,将已创建的部分删除
// 46 - 40 = 6,6 / 40 = 0.15 不大于 1.1,下次结果将为 false
bytesRemaining -= splitSize;
}
// 判断剩下的字节数是不是 0,如果不是 0,生成最后一个切片
// wd1.txt 剩余 6 != 0,生成第二个切片
// wd2.txt 35 != 0,生成第三个切片
if (bytesRemaining != 0) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
- bytesRemaining,
bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
splitHosts[0], splitHosts[1]));
}
} else {
// 如果发现文件不能切片,将整个文件作为一个切片
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
}
} else {
//Create empty hosts array for zero length files
// 如果文件为空,则整个文件作为一个切片处理
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// 类似秒表,停止计时
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.elapsedMillis());
}
// 返回 Splits Array
return splits.toArray(new FileSplit[splits.size()]);
}

接下篇


http://www.kler.cn/a/400554.html

相关文章:

  • Vscode/Code-server无网环境安装通义灵码
  • 解决Ubuntu18.04及以上版本高分辨率下导致字体过小问题
  • Ubuntu22.04基于ROS2-Humble安装moveit2教程(亲测)
  • 【jvm】HotSpot中方法区的演进
  • 【MySQL】MySQL的笛卡尔积现象是什么?简单说说
  • 走进嵌入式开发世界
  • 商业物联网详细指南:优势与挑战
  • emerge 命令学习笔记
  • Flume1.9.0自定义拦截器
  • 跨平台WPF框架Avalonia教程 一
  • 【论文阅读】WaDec: Decompiling WebAssembly Using Large Language Model
  • 使用 .NET 创建新的 WPF 应用
  • web——upload-labs——第十关——.空格.绕过
  • HTTP 缓存策略
  • 网络卡绑定详解:提升网络性能与冗余的最佳实践
  • 【Zookeeper】一、Zookeeper的使命
  • 激光雷达不够用,怎么办?Ubuntu如何用一个激光雷达实现两个激光雷达的扫描点云效果?点云配准ICP,点云拼接、话题转换、ROS重录制bag包。
  • 互联网演进跨越半世纪,智能化时代呼唤Net5.5G网络新代际
  • React 教程第一节 简介概述 以及 特点
  • 新版华为认证全套资料(题库试题、知识点速记、考试大纲、思维导图、面试宝典)
  • WebSocket实战,后台修改订单状态,前台实现数据变更,提供前端和后端多种语言
  • 智能停车解决方案之停车场室内导航系统(二):核心技术与系统架构构建
  • 如何利用CSS制作导航菜单
  • 网约车治理:构建安全、高效、规范的出行新生态
  • i18n的原理是什么,spring整合i18n
  • nodejs+mysql+vue3 应用实例剖析