当前位置: 首页 > article >正文

spark笔记(自用)

spark笔记

1. 概述

Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎;Spark提供内存计算,将计算结果直接放在内存中,减少了迭代计算的IO开销,有更高效的运算效率。

  • 1.1 Spark核心模块

    • Spark Core:提供Spark最基础与最核心的功能
    • Spark SQL:是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL或者Apache Hive 版本的SQL 方言(HQL)来查询数据
    • Spark Streaming:是Spark平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的API
  • 1.2 基本概念

    • RDD:弹性分布式数据集的简称,分布式内存的一个抽象概念,提供了一种高度受限的共享内存模 (可以看作一个不可变的分布式对象集合)

    • DAG:有向无环图的简称, 反映RDD之间的依赖关系

    • Executor:是运行在工作节点(WorkerNode)的一个进程,负责运行Task

    • 应用(Application):用户编写的Spark应用程序

    • 任务( Task ):运行在Executor上的工作单元

    • 作业( Job ):一个作业包含多个RDD及作用于相应RDD上的各种操作

    • 阶段( Stage ):是作业的基本调度单位,一个作业会分为多组任务,每组任务被称为阶段,或者也被称为任务集合,代表了一组关联的、相互之间没有Shuffle依赖关系的任务组成的任务集

2. spark工作架构

在这里插入图片描述

  • Cluster Manager:集群资源管理器
  • Worker Node:运行作业任务的工作节点,运行在集群中的一台服务器上
  • Cluster Manager:每个应用的任务控制节点
  • Driver:每个应用的任务控制节点
  • Executor:每个工作节点上负责具体任务的执行进程
      一个应用由一个Driver和若干个作业构成,一个作业由多个阶段构成,一个阶段由多个没有Shuffle关系的任务组成;
      当执行一个应用时,Driver会向集群管理器申请资源(即由Driver创建一个SparkContext,进行资源的申请、任务的分配和监控) ,启动Executor,并向Executor发送应用程序代码和文件,然后在Executor上执行任务,运行结束后,执行结果会返回给Driver或者写到HDFS或者其他数据库中。

3. RDD

弹性分布式数据集,spark最基本的数据处理模型,代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

  • 3.1 特性
    弹性: 内存与磁盘的自动切换、数据丢失可以自动恢复、计算出错重试机制、可根据需要重新分片
    分布式: 数据存储在大数据集群不同节点上
    数据集: RDD封装计算逻辑,不保存数据
    不可变: RDD封装计算逻辑,是不可以改变的,想要改变只能产生新的RDD,在新的RDD里面封装计算逻辑
    可分区: 并行计算

  • 3.2 RDD 创建

    RDD的创建可以从从文件系统中加载数据创建得到,或者通过并行集合(数组)创建RDD。

    • 从文件系统中加载数据:
    val lines = sc.textFile("C:\Users\26414\Downloads\word.txt")
    
    
    • 从HDFS中加载数据:
    val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
    
    
    • 从已经存在的集合(数组)上创建:
    val list = List(1,2,3,4,5)
    
    val rdd = sc.parallelize(list)
    
    
  • 3.3 RDD 操作

    • 3.3.1 转换算子(产生新的RDD算子,不触发计算,返回新的RDD)
       
      单值类型

      方法描述
      map()一对一的转换操作,对每一条数据计算,默认分区数不变
      mapPartitions()以分区为单位进行计算,一次性获取分区的所有数据,参数为迭代器,返回也是迭代器
      mapPartitionsWithIndex()参数为分区号和迭代器,返回值是迭代器
      flatMap()将map结果扁平化
      glom()将分区内数据转为数组Array
      groupBy(函数)用于执行分组操作,返回值为元组,第一个元素为分组的key,第二个元素为相同key的可迭代集合,将数据进行分组操作,但是分区是不会改变的,也可以传入参数改变分区数(shuffle)
      filter(函数)过滤,参数为返回值类型为Boolean的函数,将数据根据指定的规则进行筛选
      sample()抽取样本,也可以给定种子
      distinct()去重,也可以传入参数num,改变分区数(shuffle)
      coalesce(num)缩减分区,默认shuffle
      repartition(num)扩大分区,底层是coalesce(num, true)
      sortBy()排序之前可以使用函数进行处理,默认升序,可以传入false降序

      map和mapPartitions区别: map每次处理一条数据,mapPartitions每次将一个分区当成一个整体进行处理,如果一个分区没有处理完,那么所有的数据都不会释放,容易出现内存溢出

      双值类型

      方法描述
      union(rdd)并集(分区合并)
      intersection(rdd)交集(保留最大分区,shuffle)
      substring(rdd)差集(分区数为前面的,shuffle)

      key-value类型

      方法描述
      groupByKey()根据key进行分组,将value合并为一个迭代器(列表)
      reduceByKey()根据key进行分组,将value合并为一个迭代器(列表),然后根据传入的自定义函数,对每组中value数据进行聚合处理。
      sortByKey()根据key进行排序操作。默认升序,若想倒序排列,设置参数ascending = False(可直接写False,省略 ascending =)
      countByValue()对value进行count的数量统计
      combineByKey()对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致

      reduceByKey和 groupByKey的区别:
       
      从 shuffle 的角度: reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
       
      从功能的角度: reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey

    • 3.3.2 行动算子(不产生新的RDD算子,触发计算,返回具体值)

      方法描述
      reduce()聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据
      collect()以数组 Array 的形式返回数据集的所有元素
      count()返回 RDD 中元素的个数
      take()返回一个由 RDD 的前 n 个元素组成的数组
      aggregate()分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
      fold()折叠操作,aggregate 的简化版操作
      countByKey()统计每种 key 的个数
      save相关算子saveAsTextFile()、saveAsObjectFile()、saveAsSequenceFile()
      foreach()分布式遍历 RDD 中的每一个元素,调用指定函数
  • 3.4 Shuffle与依赖

    Shuffle:数据混洗

    Spark中的两种依赖:

    • 宽依赖:一个父RDD的一个分区对应一个子RDD的多个分区,会引起Shuffle
    • 窄依赖:一个父RDD的分区对应于一个子RDD的分区,或多个父RDD的分区对应于一个子RDD的分区。
       

    宽窄依赖通俗理解参考:https://www.likecs.com/show-203912211.html

    阶段的划分(作业分成几段,落盘完才能进行下一阶段):取决于Shuffle的个数,阶段的个数 = Shuffle的个数 + 1,Shuffle将阶段一分为二

  • 3.5 RDD 持久化

    RDD是不保存数据的,所以如果多个RDD需要共享其中一个RDD的数据,必须重头执行,效率非常低,所以需要将一些重复性比较高,比较耗时的操作的结果缓存起来,提高效率。

    • cache(),缓存RDD数据,在执行行动算子之后,会在血缘关系中添加缓存相关的依赖,一旦发生错误,可以重新执行
    • persist(),可以设置不同的级别,存储在磁盘内存,cache默认只在内存中缓存
    • checkpoint(),执行前需要设置检查点目录;为了保证数据的准确性,执行时会启动新的job,所以一般与cache结合使用,这样checkpoint的job就可以从cache缓存中读数据;checkpoint()会切断血缘关系,因为它将数据保存在分布式文件系统中,当成了一个数据源,数据相对安全。
       

    血缘关系: A的操作行为依赖于B,B的操作行为依赖于C,然而A的操作行为间接依赖于C,推导于:相邻的两个RDD的关系称之为依赖关系,新的RDD依赖于旧的RDD,多个连续的RDD的依赖关系,称之为血缘关系。每个RDD会保存血缘关系,但每个RDD不会保存数据,如果在reduceByKey过程中出现错误时,由于RDD不会保存数据,但可以根据血缘关系将数据源重新读取进行计算。
    参考:https://www.likecs.com/show-203912211.html

4. Spark 共享变量

  • 4.1 累加器
    使用场景
    主要用于多个节点对一个变量进行共享性的操作,在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值,但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能。

     object Accumulator_scala {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Accumulator_scala").setMaster("local")
        val sc  = new SparkContext(conf)
    
        val sum = sc.accumulator(0);
    
        val numberArray = Array(1,2,3,4,5)
        val numbers = sc.parallelize(numberArray)
    
        numbers.foreach( num => sum += num)
        println(sum)
      }
    }
    

    注: 累加器在Driver端定义赋初始值,累加器只能在Driver端读取最后的值,在Excutor端更新。

  • 4.2 广播变量
    使用场景
    假如在spark程序里需要用到大对象,比如:字典,黑白名单等,这个都会由Driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task就会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗Executor服务器上的资源,如果将这个变量声明为广播变量,那么只是每个Executor拥有一份,这个Executor启动的task会共享这个变量,节省了通信的成本和服务器的资源

    object BroadcastVariable_scala {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("BroadcastVariable_scala").setMaster("local")
    
        val sc = new SparkContext(conf)
        val factor = 3
        val factorBroadcast = sc.broadcast(factor)
    
        val numberArray = Array(1,2,3,4,5)
        val numbers = sc.parallelize(numberArray,1)
    
        val multipleNumber = numbers.map{num => num * factorBroadcast.value}
        multipleNumber.foreach(num => println(num))
      }
    }
    

    注:

    • 因为RDD是不存储数据的。可以将RDD的结果广播出去。
    • 广播变量只能在Driver端定义,不能在Executor端定义。
    • 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
      -> 如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。
    • 如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本

    参考:https://blog.csdn.net/chanyue123/article/details/123175776(累加器介绍)
               https://zhuanlan.zhihu.com/p/158894710(广播变量原理)

    5. Spark SQL

    Spark SQL 是Spark 用于结构化数据(structured data)处理的Spark 模块,兼容Hive,数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据,提高开发效率,提供两个编程抽象,DataFrame,DataSet。
    RDD是数据,DF是结构,DS是类型,RDD只包含数据,没有结构,DataFrame添加了结构,DataSet添加了类型;(DataFrame就是在RDD基础上加入了列,处理数据就像处理二维表一样)
               在这里插入图片描述

  • 5.1 DataFrame

    • 5.1.1 DataFrame的组成

      在结构层面:
      StructType对象描述整个DataFrame的表结构
      StructField对象描述一个列的信息
      在数据层面
      Row对象记录一行数据
      Column对象记录一列数据并包含列的信息

    • 5.1.2 DataFrame之DSL

    • agg: 它是GroupedData对象的API, 作用是 在里面可以写多个聚合
    • alias: 它是Column对象的API, 可以针对一个列 进行改名
    • withColumnRenamed: 它是DataFrame的API, 可以对DF中的列进行改名, 一次改一个列, 改多个列 可以链式调用
    • orderBy: DataFrame的API, 进行排序, 参数1是被排序的列, 参数2是 升序(True) 或 降序 False
    • first: DataFrame的API, 取出DF的第一行数据, 返回值结果是Row对象.(Row对象 就是一个数组, 你可以通过row[‘列名’] 来取出当前行中, 某一列的具体数值. 返回值不再是DF 或者GroupedData 或者Column而是具体的值(字符串, 数字等))

          常用方法

    • show方法
      功能:展示DataFrame中的数据, 默认展示20条
    • printSchema方法
      功能:打印输出df的schema信息

           DSL语法

      df.select("username").show(),查看某一列数据
      df.select($"username",$"age" + 1).show,列数据进行运算
      df.select('username, 'age + 1).show()
      df.select('username, 'age + 1 as "newage").show(),起别名
      df.filter($"age">30).show(),过滤数据
      df.groupBy("age").count.show(),分组
    
    • 5.1.2 DataFrame之SQL

           SQL语法

      df.createOrReplaceTempView("people"),创建一个临时表
      val sqlDF = spark.sql("SELECT * FROM people"),查询
      sqlDF.show(),显示结果
      df.createGlobalTempView("people"),创建一个全局表
      spark.sql("SELECT * FROM global_temp.people").show(),查询全局表必须加global_temp
      spark.newSession().sql("SELECT * FROM global_temp.people").show(),在新的Session里可以查询全局表
    
    • 5.1.3 DataFrame转换
      • RDD转为DF
        RDD没有结构,指定结构:idRDD.toDF(“id”).show,使用toDF(“列名”)
        通过样例类,将数据转换成样例类的对象,使用idRDD.toDF,使用样例类中的列名,也可以重命名.toDF(“id”,“name”)

      • DF转为RDD
        df.rdd,返回的是Row对象

    注:DataFrame创建参考:https://blog.csdn.net/feizuiku0116/article/details/121523791

  • 5.2 DataSet
    是具有强类型的数据集合,需要提供对应的类型信息,查询之后使用方便,可以看做DataFrame的拓展

    • DataFrame 一行记录中没有指定特定的数据类型
    • Dataset 一行记录中每个对象有明确类型

6. Spark Streaming

SparkStreaming是一套框架。
SparkStreaming是Spark核心API的一个扩展,可以实现高吞吐量的,具备容错机制的实时流数据处理。
实时: 数据处理的时延在毫秒内响应
离线: 数据处理的时延在小时、天
数据处理的方式 :批处理(多条处理),流式(一条一条处理)
微批次,准实时的数据处理引擎

支持多种数据源获取数据:
在这里插入图片描述

Spark Streaming接收Kafka、Flume、HDFS等各种来源的实时输入数据,进行处理后,处理结构保存在HDFS、DataBase等各种地方。
*使用的最多的是kafka+Spark Streaming

内部工作原理: Spark Streaming从实时数据流接入数据,再将其划分为一个个小批量供后续Spark engine处理,所以实际上,Spark Streaming是按一个个小批量来处理数据流的。
在这里插入图片描述

Spark Streaming为这种持续的数据流提供了的一个高级抽象,即:discretized[dɪˈskriːtaizd] stream(离散数据流)或者叫DStream。DStream既可以从输入数据源创建得来,如:Kafka、Flume或者Kinesis,也可以从其他DStream经一些算子操作得到。其实在内部,一个DStream就是包含了一系列RDDs。

  • 6.1 StreamingContext

    • StreamingContext定义之后做的操作如下
    1. 通过创建输入DStream来创建输入数据源。
    2. 通过对DStream定义transformation和output算子操作,来定义实时计算逻辑。
    3. 调用StreamingContext的start()方法,来开始实时处理数据。
    4. 调用StreamingContext的awaitTermination()方法,来等待应用程序的终止。可以使 用CTRL+C手动停止,或者就是让它持续不断的运行进行计算。
    5. 也可以通过调用StreamingContext的stop()方法,来停止应用程序。
       
    • StreamingContext创建方式

      • SparkConf创建:
      val conf = new SparkConf().setAppName(appName).setMaster(master);
      val ssc = new StreamingContext(conf, Seconds(1));
      
      appName是用来在Spark UI上显示的应用名称。master是Spark、Mesos或Yarn集群的URL,或者是local[*]。batch interval可以根据应用程序的延迟要求以及可用的集群资源情况来设置。
      
      • SparkContext创建:
      val sc = new SparkContext(conf)    
      val ssc = new StreamingContext(sc, Seconds(1))
      
  • 6.2 DStream 简介
    Spark Streaming提供了表示连续数据流的、高度抽象的被称为离散流的DStream

    假如外部数据不断涌入,按照一分钟切片,每个一分钟内部的数据是连续的(连续数据流),而一分钟与一分钟的切片却是相互独立的(离散流)。
    DStream是Spark Streaming特有的数据类型。

    Dstream可以看做一组RDDs,即RDD的一个序列:
    在这里插入图片描述

    Spark的RDD可以理解为空间维度,Dstream的RDD理解为在空间维度上又加了个时间维度。

    将连续的数据持久化,离散化,然后进行批量处理。
    持久化:接收到的数据暂存(持久化原因:做容错的,当数据流出错了,因为没有得到计算,需要把数据从源头进行回溯,暂存的数据可以进行恢复。)
    离散化:按时间分片,形成处理单元。
    分片处理:分批处理。

  • 6.3 DStream 转换操作

    方法描述Scala示例
    map()对DStream中的每个元素应用指定函数并返回各元素输出元素组成的DStreamds.map(x=>x+1)
    flatMap()对DStream中的每个元素应用指定函数并返回各元素输出迭代器组成的DStreamds.flatMap(x=>x.split(" "))
    filter()筛选过滤ds.filter(x=>x != 1)
    repartition()改变DStream分区数ds.repartition(10)
    reduceByKey()将每个批次中键相同的记录归约ds.reduceByKey((x,y)=>x+y)
    groupByKey()将每个批次中的记录按键分组ds.groupByKey()
  • 6.4 DStream 输出操作

    方法描述
    print()在Driver中打印前10个元素
    saveAsTextFiles(prefix, [suffix])以文本形式保存为文件。其中每次批处理产生文件以prefix-TIME_IN_MS[.suffix]命名
    saveAsObjectFiles(prefix, [suffix])将DStream中内容按对象序列化并以SequenceFile格式保存。其中每次批处理产生文件以prefix-TIME_IN_MS[.suffix]命名
    saveAsHadoopFiles(prefix, [suffix])以文本形式保存为hadoop文件。其中每次批处理产生文件以prefix-TIME_IN_MS[.suffix]命名
    foreachRDD(func)最基本输出操作,将func函数应用于DStream中的RDD上,这个操作会输出数据到外部系统
  • 6.5 Spark Streaming 窗口操作

    • Spark提供了一组窗口操作,通过滑动窗口技术对大规模数据的增量更新进行统计分析

    • Window Operation:定时进行一定时间段内的数据处理
      在这里插入图片描述

    • 任何基于窗口操作需要指定两个参数:

      • 窗口总长度:你想计算多长时间的数据
      • 滑动时间间隔:你每多长时间去更新一次
         
         

    外部参考文档:
    https://www.cnblogs.com/fishperson/p/10447033.html
    https://blog.csdn.net/u012369535/article/details/93042905
    内部参考文档:
    https://wiki.beisen-inc.com/pages/viewpage.action?pageId=231247306

7. Spark 调优

  • 7.1 常规性能调优

    • 常规性能调优
      增加executor个数
      增加每个executor的cpu个数
      增加每个executor的内存,1缓存更多的数据,减少磁盘IO,2为shuffle提供更多内存,减少写入磁盘数据,3内存较小会频繁GC,增加内存可以避免频繁GC,提升整体性能
    • RDD优化
      避免在相同的算子和计算逻辑下对RDD进行重复的计算
      对多次使用的RDD进行持久化(可以序列化的,对于可靠性要求高的数据可以使用副本)
      RDD尽可能早的过滤操作,减少内存占用
    • 并行度调节
      在资源允许的情况下,应尽可能让并行度与资源匹配,官方推荐task数量应该是总cpu核心数的2-3倍(因为有的任务执行的快,执行完毕后可以立马执行下一个task)
    • 广播大的变量
      如果task算子中使用了外部的变量,每个task会有一个变量的副本,这就造成了内存的极大消耗,如果使用广播变量的话每个executor保存一个副本,可以很大程度上减少内存的使用
      序列化:使用Kryo序列化,java的序列化太重(慢,字节多),效率不高
      调整本地化等待时长
  • 7.2 算子调优

    • 使用mapPartitions,map对分区中每一个元素操作,mapPartitions对真个分区操作,如果使用jdbc写入数据,那么每条数据都要建立一个连接,资源消耗大,如果使用mapPartitions,只需要建立一个连接
      缺点:如果数据量很大的话,使用mapPartitions容易OOM,如果资源允许可以考虑使用mapPartitions代替map
    • 使用foreachPartition优化数据库操作,与mapPartitions类似,foreachPartition将RDD的每个分区作为遍历对象
    • filter与coalesce的配合使用,过滤后数据量变小,再用原来的去执行浪费资源,有可能导致数据倾斜,可以进行合并分区或者扩大分区(宽依赖,需要设置为true,即开启shuffle,否则不起作用)
    • 使用repartition提高SparkSQL的并行度,默认是hive表的文件的切片个数,spark SQL的并行度没法手动更改,可以读取到数据后立马进行扩大分区数,提高并行度
    • 使用reduceByKey预聚合,groupByKey 不会进行map 端的聚合,减少磁盘IO,减少对磁盘空间的占用,reduce端的数据也变少
  • 7.3 Shuffle 调优

    • 增加map端缓冲区的大小,避免频繁溢写到磁盘
    • 增加reduce端拉取数据缓冲区大小,减少拉取次数,减少网络传输
    • 增加reduce端拉取数据的重试次数,避免因为JVM的full GC或者网络原因导致数据拉取失败进而导致作业执行失败
    • 增加reduce端拉取数据等待间隔,拉取失败会等一段时间后重试,以增加shuffle - 操作的稳定性
    • 增加SortShuffle 排序操作阈值,
  • 7.4 JVM调优

    • 降低cache 操作的内存占比,1静态内存管理机制,需要调节,2统一内存管理机制,堆内存被划分为了两块,Storage 和Execution
    • 增大Executor 堆外内存,有时去拉取Executor数据时,Executor可能已经由于内存溢出挂掉了
    • 增大连接等待时长,Executor优先从本地关联的BlockManager获取数据,获取不到会去其他节点上获取数据,反复拉取不到会导致DAGScheduler反复提交几次stage,TaskScheduler反复提交几次Task,大大延长作业运行时间

 
 

参考文档:
https://zhuanlan.zhihu.com/p/463537198
https://www.bbsmax.com/A/q4zVE2aWdK/
https://www.cnblogs.com/kdy11/p/10943547.html
《spark官方文档》
《Spark快速大数据分析》
《Apache Spark 2.0.2 中文文档》


http://www.kler.cn/a/4484.html

相关文章:

  • 【DB-GPT】开启数据库交互新篇章的技术探索与实践
  • 灵活妙想学数学
  • Web开发(二)CSS3基础与进阶
  • stack_queue的底层,模拟实现,deque和priority_queue详解
  • 新手如何练习SQL?|掌握
  • 接口测试Day09-数据库工具类封装
  • 进程与子进程
  • java源码阅读 - HashTable
  • 提升代码可读性,减少 if-else 的几个小技巧
  • 合泰HT32单片机使用PDMA和ADC采集多路模拟值并显示在OLED屏上
  • Vue+springboot笔记本电脑商城销售系统
  • 校招失败后,在外包公司熬了 2 年终于进了字节跳动,竭尽全力....
  • python Django运用——(1.创立项目)
  • 进程与线程的关系
  • 【协议】03、深度解剖之HTTP协议
  • leetcode 有序数组的平方(977)
  • (四)Tomcat源码阅读:Service组件分析
  • 垃圾回收机制——把我回收了吧
  • 我的黑苹果安装经验
  • 深入浅出Java线程池Worker类
  • Java Web的三种获取参数的方法
  • Linux部署Docker
  • 分组函数·union·limit·order by排序·group by分组·外键
  • 面试字节跳动软件测试岗,收到offer后我却毫不犹豫拒绝了....
  • day15-面向对象作业2
  • 常用的 IntelliJ IDEA 快捷键