当前位置: 首页 > article >正文

spark——RDD算子集合

目录

    • 算子
      • 转换算子示例
        • map
        • flatMap
        • ReduceByKey
        • filter
        • distinct
        • glom
        • groupBy
        • groupByKey
        • SortBy
        • sortByKey
        • union
        • 交集intersection和差集subtract
        • join
        • partitionBy
        • mapPartition
        • sample
      • 行动算子示例
        • ForeachPartition
        • Foreach
        • SaveAsTextFile
        • CountByKey
        • Reduce
        • fold
        • first、take、count
        • top、takeOrderd
        • takeSample

算子

描述
分布式集合RDD对象的方法被称为算子

类别定义特点作用
转换算子返回值仍然是一个 RDD 的算子懒加载,遇到 Action 算子前不执行构建执行计划
(例如:map, filter, flatMap, reduceByKey)- 不会立即触发计算
- 形成 RDD 的 lineage
- 描述数据如何从一个 RDD 转换为另一个 RDD
- 延迟计算
行动算子返回值不是 RDD 的算子立即执行让由转换算子构建好的执行计划开启工作
(例如:collect, count, reduce, take)- 触发计算
- 执行并返回结果到驱动程序或外部
- 执行计算并返回结果
- 触发整个 RDD lineage 的计算

在这里插入图片描述
转换算子是对RDD进行一些格式等方面的操作,使得RDD满足行动算子的要求,行动算子是对RDD的计算操作

转换算子示例

map

将RDD中的数据一条一条处理,处理逻辑基于map算子接收到的函数,返回值为处理后的RDD

package com.wunaiieq.RDD
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object TransformationMap {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("map")
    val sparkContext = new SparkContext(conf)
    //1.创建rdd对象
    val rdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4))
    //2.定义函数,作为map算子的传入函数
    def funcA(x:Int):Int={
      if((x%2!=0)){
        return x*2
      }else{
        return x
      }
    }
    //3.map中使用funcA
    val rdd1: RDD[Int] = rdd.map(funcA)
    println(rdd1.collect().mkString(","))
    //4.匿名函数的用法
    def funcB(x:Int): Int ={
      return x*2
    }
    val rdd2: RDD[Int] = rdd.map(funcB)
    println(rdd2.collect().mkString(","))
    //关闭sc
    sparkContext.stop()
  }
}
flatMap

将集合中的每一个元素按照空格进行拆分,拆分后的内容逐一作为一个元素组成一个新的RDD
在这里插入图片描述

package com.wunaiieq.RDD

//1.导入spark下的SparkConf, SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object TransformationFlatMap {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序的名称
    val conf = new SparkConf().setMaster("local[*]").setAppName("flagMap")
    //3.通过SparkConf对象构建SparkContext对象
    val sparkContext = new SparkContext(conf)
    //4.创建一个RDD对象
    val rdd1: RDD[String] = sparkContext.parallelize(List("a b c", "a a d", "b c d"))
    //5.将集合中的每一个元素按照空格进行拆分,拆分后的内容逐一作为一个元素组成一个新的RDD
    val rdd2: RDD[String] = rdd1.flatMap(line=>line.split(" "))
    print(rdd2.collect().mkString(","))
    sparkContext.stop()
  }
}
ReduceByKey

仅适用于Key-Value类型的RDD,自动按照key进行分组,然后根据提供的函数func的逻辑进行聚合操作,完成组内所有value的聚合操作。

package com.wunaiieq.RDD

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object RddReduceByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("reduceByKey")
    val sparkContext = new SparkContext(conf)
    //1.创建一个Key-Value类型的RDD对象
    val rdd: RDD[(String, Int)] = sparkContext.parallelize(List(("a", 1), ("a", 2), ("b", 1), ("a", 3), ("b", 1), ("a", 4)))
    //2.将相同的key的value做聚合加操作
    val rdd1: RDD[(String, Int)] = rdd.reduceByKey((x, y) => x + y)
    println(rdd1.collect().mkString(","))
    sparkContext.stop()
  }
}
filter

设置过滤函数,将RDD种符合条件的元素抽取并组成新的RDD(1234,抽取偶数,结果为24)

package com.wunaiieq.RDD

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object RddFilter {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("filter")
    //3.使用conf对象构建SparkContext对象
    val sparkContext = new SparkContext(conf)
    //5.创建Rdd
    val rdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4, 5, 8))
    //6.调用fiter算子,过滤掉奇数(x%2==0 false)
    //只保留偶数(x%2==0 true)
    val resultRdd: RDD[Int] = rdd.filter(x => x % 2 == 0)
    //7.输出
    println(resultRdd.collect().mkString(","))
    //4.关闭sparkContext对象
    sparkContext.stop()
  }
}
distinct

过滤重复值,重复值保留一个

package com.wunaiieq.RDD
//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object RddDistinct {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("filter")
    //3.使用conf对象构建SparkContet对象
    val sparkContext = new SparkContext(conf)
    //5.创建Rdd
    val rdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 2, 5, 2))
    //6.去重后输出 : 1,2,3,5
    println(rdd.distinct().collect().mkString(","))
    //7.key-value型的rdd,如何认定为重复元素?只有key和value的值都相同时认定为重复元素
    //结果(y,1),(x,1),(x,2)
    val rdd1: RDD[(Char, Int)] = sparkContext.parallelize(List(('x', 1), ('x', 2), ('x', 1), ('y', 1)))
    println(rdd1.distinct().collect().mkString(","))
    //4.关闭sc对象
    sparkContext.stop()
  }
}

在这里插入图片描述

glom

给RDD增加一个关于分区的嵌套,显示分区效果,对于结果没有太多影响

package com.wunaiieq.RDD

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object RddGlom {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("glom")
    //3.使用conf对象构建SparkContet对象
    val sparkContext = new SparkContext(conf)
    //5.创建Rdd
    val rdd1: RDD[Int] = sparkContext.parallelize(Array(1, 2, 3, 4, 5, 6), 2)
    //6.rdd1.glom()将rdd的数据加上一个嵌套,这个嵌套是按照分区进行的
    // rdd1: RDD[Int] 和 rdd2: RDD[Array[Int]] 类型不同
    val rdd2: RDD[Array[Int]] = rdd1.glom()
    //7.rddc1: Array[Int]和rddc2: Array[Array[Int]]类型也不一样
    val rddc1: Array[Int] = rdd1.collect()
    val rddc2: Array[Array[Int]] = rdd2.collect()
    println(rddc1.mkString(","))
    //println(rddc2.mkString(","))
    rddc2.foreach(arr=>println(arr.mkString(",")))
    //4.关闭sparkContext对象
    sparkContext.stop()
  }
}

在这里插入图片描述

groupBy

将输入的RDD进行分组,传入函数为分组要求,比如根据列表种第一个元素,使得所有相同的元素为一组

package com.wunaiieq.RDD

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object RddGroupBy {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("groupBy")
    //3.使用conf对象构建SparkContet对象
    val sparkContext = new SparkContext(conf)
    //5.创建Rdd
    val rdd: RDD[(Char, Int)] = sparkContext.parallelize(Array(('a', 1), ('c', 1), ('a', 2), ('b', 1), ('b', 2), ('a', 3), ('a', 4)))
    //6.通过groupBy算子对rdd对象中的数据进行分组
    //groupBy插入的函数的用意是指定按照谁进行分组
    val gbRdd: RDD[(Char, Iterable[(Char, Int)])] = rdd.groupBy(tupEle => tupEle._1)
    //收集到Driver端
    val result1: Array[(Char, Iterable[(Char, Int)])] = gbRdd.collect()
    //(a,CompactBuffer((a,1), (a,2), (a,3), (a,4))),(b,CompactBuffer((b,1), (b,2)))
    println(result1.mkString(","))
    //7.使用map转换算子
    //(a,List((a,1), (a,2), (a,3), (a,4))),(b,List((b,1), (b,2)))
    val result2: Array[(Char, List[(Char, Int)])] = gbRdd.map(tup => (tup._1, tup._2.toList)).collect()
    println(result2.mkString(","))
    //4.关闭sparkContext对象
    sparkContext.stop()
  }
}

在这里插入图片描述

groupByKey

针对kv型的RDD进行分组

package com.wunaiieq.RDD

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object RddGroupByKey {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("groupByKey")
    //3.使用conf对象构建SparkContet对象
    val sparkContext = new SparkContext(conf)
    //5.创建Key-Value型Rdd
    val rdd: RDD[(Char, Int)] = sparkContext.parallelize(Array(('a', 1), ('c', 1),('c', 3),('a', 2), ('b', 1), ('b', 2), ('a', 3), ('a', 4)))
    //6.按照key进行分组,分组后的结果是有二元组组成的RDD
    val gbkRdd: RDD[(Char, Iterable[Int])] = rdd.groupByKey()
    val result1: Array[(Char, Iterable[Int])] = gbkRdd.collect()
    //(a,CompactBuffer(1, 2, 3, 4)),(b,CompactBuffer(1, 2))
    println(result1.mkString(","))
    //7.使用map转换算子,对value数据转为List
    val result2: Array[(Char, List[Int])] = gbkRdd.map(tup => (tup._1, tup._2.toList)).collect()
    //(a,List(1, 2, 3, 4)),(b,List(1, 2))
    println(result2.mkString(","))
    //4.关闭sparkContext对象
    sparkContext.stop()
  }
}

在这里插入图片描述

SortBy

排序算子,对rdd按照元祖的第n个值进行排序

package com.wunaiieq.RDD

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object RddSortBy {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("groupByKey")
    //3.使用conf对象构建SparkContet对象
    val sparkContext = new SparkContext(conf)
    //5.创建Key-Value型Rdd
    val rdd: RDD[(Char, Int)] = sparkContext.parallelize(List(('w', 2), ('h', 5), ('k', 9), ('m', 3),('a', 7),
      ('p', 4), ('q', 1), ('n', 8), ('y', 6)))
    //6.使用sortBy对rdd按照元祖的第二个值进行排序
    /*f: (T) => K,指定按照第几个元素记进行排序
     ascending: Boolean = true,true表示升序,false表示降序,默认就是true升序
     numPartitions: Int = this.partitions.length排序的分区数,默认为rdd的分区数*/
    val result1: Array[Array[(Char, Int)]] = rdd.sortBy(tup => tup._2,
      ascending = true, numPartitions = 3).glom().collect()
    result1.foreach(arr=>println(arr.mkString(",")))
    //7.全局有序,排序后的分区数设置为1
    val result2: Array[(Char, Int)] = rdd.sortBy(tup => tup._2,
      ascending = true, numPartitions = 1).collect()
    println(result2.mkString(","))
    //8按照元祖的第一个元素进行降序排序
    val result3: Array[(Char, Int)] = rdd.sortBy(tup => tup._1,
      ascending = false, numPartitions = 1).collect()
    println(result3.mkString(","))
    //4.关闭sparkContext对象
    sparkContext.stop()
  }
}

在这里插入图片描述

sortByKey

默认按照key的升序进行排序,生成的RDD对象分区数和原RDD相同

package com.wunaiieq.RDD

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object RddSortByKey {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("sortByKey")
    //3.使用conf对象构建SparkContet对象
    val sparkContext = new SparkContext(conf)
    //5.创建Key-Value型Rdd
    val rdd: RDD[(Char, Int)] = sparkContext.parallelize(List(('w', 2), ('h', 5), ('k', 9), ('m', 3),('a', 7),
      ('p', 4), ('q', 1), ('n', 8), ('y', 6)))
    println("rdd partition num:"+rdd.getNumPartitions)
    //6.默认按照key的升序排列,生成的RDD对象分区数与原RDD相同。
    val rdd1: RDD[(Char, Int)] = rdd.sortByKey()
    println("rdd1 partition num:"+rdd1.getNumPartitions)
    val result1: Array[(Char, Int)] = rdd1.collect()
    println(result1.mkString(","))
    //7.按照key的降序排列,默认生成的RDD对象分区数与原RDD相同。
    val rdd2: RDD[(Char, Int)] = rdd.sortByKey(ascending = false)
    println("rdd2 partition num:"+rdd2.getNumPartitions)
    val result2: Array[(Char, Int)] = rdd2.collect()
    println(result2.mkString(","))
    //8.按照key的降序排列,设定生成的RDD对象分区数为4。
    val rdd3: RDD[(Char, Int)] = rdd.sortByKey(ascending = false, numPartitions = 4)
    println("rdd3 partition num:"+rdd3.getNumPartitions)
    val result3: Array[Array[(Char, Int)]] = rdd3.glom().collect()
    result3.foreach(arr=>println(arr.mkString(",")))
    //9.按照key的升序排序,生成的RDD的分区数设置为1,就实现了全局有序
    val result4: Array[(Char, Int)] = rdd.sortByKey(ascending = true, numPartitions = 1).collect()
    println(result4.mkString(","))
    //4.关闭sc对象
    sparkContext.stop()
  }
}

在这里插入图片描述

union

将两个同类型RDD进行合并,合并后的顺序为调用union算子的rdd在前,作为参数的rdd在后,rdd内部顺序不变

package com.wunaiieq.RDD

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object RddUnion {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("union")
    //3.使用conf对象构建SparkContet对象
    val sparkContext = new SparkContext(conf)
    //5.创建3个RDD对象
    val rdd1: RDD[Int] = sparkContext.parallelize(Array(1, 2, 3, 4))
    val rdd2: RDD[Int] = sparkContext.parallelize(Array(3, 4, 5, 6))
    val rdd3: RDD[String] = sparkContext.parallelize(Array("x", "y"))
    //6.union算子将两个rdd对象元素做并集计算,并生成一个新的rdd对象
    val result21: RDD[Int] = rdd2.union(rdd1)
    val result12: RDD[Int] = rdd1.union(rdd2)
    //7.union算子不会去重,无方向性
    println(result21.collect().mkString(","))
    println(result12.collect().mkString(","))
    //8.使用union做并集计算的两个rdd的数据类型要一致,否则出错
    //rdd1.union(rdd3)
    //4.关闭sc对象
    sparkContext.stop()
  }
}

在这里插入图片描述

交集intersection和差集subtract

交集:将两个RDD相同元素获取后组成新的RDD
差集:一个RDD去除和另一个RDD相同元素的剩余元素,组成的新的RDD

package com.wunaiieq.RDD

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object RddIntersectionSubtract {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("交集和差集")
    //3.使用conf对象构建SparkContet对象
    val sparkContext = new SparkContext(conf)
    //5.创建5个RDD对象
    val rdd1: RDD[Int] = sparkContext.parallelize(Array(1, 2, 3, 4))
    val rdd2: RDD[Int] = sparkContext.parallelize(Array(3, 4, 5, 6))


    val rdd3: RDD[(Char, Int)] = sparkContext.parallelize(List(('x', 6), ('y', 1)))
    val rdd4: RDD[(Char, Int)] = sparkContext.parallelize(List(('x', 6), ('z', 3)))
    val rdd5: RDD[(Char, Int)] = sparkContext.parallelize(List(('x', 5), ('z', 3)))


    //6.交集运算,将两个rdd中的相同元素获取后组成一个新的rdd,无方向性
    println(rdd1.intersection(rdd2).collect().mkString(","))
    println(rdd2.intersection(rdd1).collect().mkString(","))


    val rdd34: RDD[(Char, Int)] = rdd3.intersection(rdd4)
    val rdd43: RDD[(Char, Int)] = rdd4.intersection(rdd3)
    println(rdd34.collect().mkString(","))
    println(rdd43.collect().mkString(","))
    //如果是二元组,key和value都相同,才认定为同一个元素
    println(rdd4.intersection(rdd5).collect().mkString(","))


    //7.差集,有方向性的
    println(rdd1.subtract(rdd2).collect().mkString(","))
    println(rdd2.subtract(rdd1).collect().mkString(","))
    println(rdd3.subtract(rdd4).collect().mkString(","))
    println(rdd4.subtract(rdd3).collect().mkString(","))
    //扩展:只比较key
    println("-------------------")
    println(rdd4.subtract(rdd5).collect().mkString(","))
    //由于只比较key,所以rdd4和rdd5的key均为x,z因此差集为空
    println("=="+rdd4.subtractByKey(rdd5).collect().mkString(","))
    //4.关闭sc对象
    sparkContext.stop()
  }
}

在这里插入图片描述

join

只能使用kv类型的RDD,对相同的key进行匹配连接

package com.wunaiieq.RDD

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object RddJoin {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("关联查询")
    //3.使用conf对象构建SparkContet对象
    val sc = new SparkContext(conf)
    //5.创建两个rdd
    val rdd1: RDD[(Int, String)] = sc.makeRDD(List((101, "A"), (102, "B"), (103, "C")))
    val rdd2: RDD[(Int, String)] = sc.makeRDD(List((101, "a"), (102, "b"), (104, "d"),(101, "aa")))
    //6.通过join关联算子 rdd1.join(rdd2) 关联条件是key,只保留关联上的数据
    val rdd1j2: RDD[(Int, (String, String))] = rdd1.join(rdd2)
    //结果:(101,(A,a)),(101,(A,aa)),(102,(B,b))
    println(rdd1j2.collect().mkString(","))
    //7.左外关联(101,(A,Some(a))),(101,(A,Some(aa))),(102,(B,Some(b))),(103,(C,None))
    val rdd1loj2: RDD[(Int, (String, Option[String]))] = rdd1.leftOuterJoin(rdd2)
    println(rdd1loj2.collect().mkString(","))
    //8.右外关联(101,(Some(A),a)),(101,(Some(A),aa)),(104,(None,d)),(102,(Some(B),b))
    val rdd1roj2: RDD[(Int, (Option[String], String))] = rdd1.rightOuterJoin(rdd2)
    println(rdd1roj2.collect().mkString(","))
    //(101,(a,Some(A))),(101,(aa,Some(A))),(104,(d,None)),(102,(b,Some(B)))
    println(rdd2.leftOuterJoin(rdd1).collect().mkString(","))
    //4.关闭sc对象
    sc.stop()
  }
}

在这里插入图片描述

partitionBy

对RDD进行重新分区

package com.wunaiieq.RDD

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}


object RddPartitionBy {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[1]").
      setAppName("partitionBy")
    //3.使用conf对象构建SparkContet对象
    val sc = new SparkContext(conf)
    //5.创建1个RDD对象
    val rdd: RDD[(String, Int)] = sc.parallelize(List(("andy", 1), ("jack", 1),
      ("hello", 1), ("lucy", 1), ("tom", 1), ("su", 1)))
    println("重新自定义分区前:"+rdd.getNumPartitions)
    //6.调用partitionBy()对rdd对象进行重新分区
    val rdd1: RDD[(String, Int)] = rdd.partitionBy(new Partitioner {
      override def numPartitions = 3
      override def getPartition(key: Any): Int = {
        //获取key的首字母
        val firstChar: Char = key.toString.charAt(0)
        //['a','i] => 0
        if (firstChar >= 'a' && firstChar <= 'i') {
          return 0
        } else if (firstChar >= 'j' && firstChar <= 'q') {
          return 1
        } else {
          return 2
        }
      }
    })
    val result: Array[Array[(String, Int)]] = rdd1.glom().collect()
    //输出
    result.foreach(arr=>println(arr.mkString(",")))
    //4.关闭sc对象
    sc.stop()
  }
}

在这里插入图片描述

mapPartition

类似于map,作用是对每个分区进行操作

算子数据处理方式性能特性适用场景
map对RDD中的每一个元素进行操作每个元素独立处理,通常性能稳定需要对RDD中的每个元素执行简单转换或计算的场景
接受一个函数,应用于RDD的每个元素内存使用较为安全,不易导致内存溢出问题不依赖分区元数据或复杂计算的场景
mapPartition对RDD中的每一个分区的迭代器进行操作可能具有更好的性能,因为减少了函数调用的开销需要对RDD中的每个分区执行复杂计算或转换的场景
接受一个函数,应用于RDD的每个分区内存使用需谨慎,可能因处理大量数据而导致内存溢出依赖分区元数据或需要减少资源创建次数的场景
package com.wunaiieq.RDD

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


import scala.collection.mutable.ListBuffer


object RddMapPartition {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[1]").
      setAppName("mapPartition")
    //3.使用conf对象构建SparkContet对象
    val sc = new SparkContext(conf)
    //5.创建RDD对象
    val rdd: RDD[String] = sc.parallelize(List("a", "b", "c", "d", "e", "f"), 3)
    //6.自定义一个处理函数
    def process(datas: Iterator[String]): Iterator[String] = {
      println("创建数据库连接...")
      val result = ListBuffer[String]()
      //遍历datas
      for(ele<-datas){
        result.append(ele)
      }
      println("批量插入数据:"+result)
      println("关闭数据库连接...")
      //返回
      result.iterator
    }
    //7.调用mapPartitions操作
    val resultRdd: RDD[String] = rdd.mapPartitions(process)
    resultRdd.count()
    //4.关闭sc对象
    sc.stop()
  }
}

在这里插入图片描述

sample

根据指定规则从RDD中抽取数据

参数名称类型描述
withReplacementBoolean是否允许放回抽样。true表示允许元素被重复抽取,false表示不允许。
fractionDouble抽取比例,表示希望从原始RDD中抽取的样本比例(0到1之间)。若withReplacementtrue,则此值可理解为每个元素被期望抽取的次数。
seedLong随机种子。若提供,则每次运行抽样操作会得到相同的结果;若不提供(或设置为None),则每次结果可能不同。
package com.wunaiieq.RDD

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
object RddSample {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[1]").
      setAppName("sample")
    //3.使用conf对象构建SparkContet对象
    val sc = new SparkContext(conf)
    //5.创建RDD对象
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 9, 0), 1)

    val rdd2: RDD[Int] = rdd.sample(true, 2)
    println("2"+rdd2.collect().mkString(","))
    //当种子相同(前两个参数的值也相同)时,多次抽取的结果相同
    val rdd3: RDD[Int] = rdd.sample(false, 0.3, 6)
    val rdd4: RDD[Int] = rdd.sample(false, 0.3, 6)
    println(rdd3.collect().mkString(","))
    println(rdd4.collect().mkString(","))


    val rdd5: RDD[Int] = rdd.sample(true, 2, 3)
    val rdd6: RDD[Int] = rdd.sample(true, 2, 3)
    println(rdd5.collect().mkString(","))
    println(rdd6.collect().mkString(","))
    //4.关闭sc对象
    sc.stop()
  }
}

在这里插入图片描述

行动算子示例

ForeachPartition

在进行每个分区操作前后增加一些额外的操作

package com.wunaiieq.RDD.action

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
object RddForeachPartition {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[1]").
      setAppName("ForeachPartition")
    //3.使用conf对象构建SparkContet对象
    val sc = new SparkContext(conf)
    //5.创建RDD对象
    val rdd: RDD[String] = sc.parallelize(List("a", "b", "c", "e", "f", "d"), 3)
    //6.自定义处理函数
    def process(datas: Iterator[String]): Unit = {
      println("操作一-开始一个分区...")
      val result = ListBuffer[String]()
      for(data<-datas){
        result.append(data)
      }
      println("当前分区的数据:"+result)
      println("操作二-结束一个分区...")
    }
    //7.调用foreachPartition算子,参数为自定义函数
    rdd.foreachPartition(process)
    //4.关闭sc对象
    sc.stop()
  }
}

在这里插入图片描述

Foreach

对每个元素提供操作逻辑,无返回值

package com.wunaiieq.RDD.action

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


import scala.collection.mutable.ListBuffer
object RddForeach {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[1]").
      setAppName("foreach")
    //3.使用conf对象构建SparkContet对象
    val sc = new SparkContext(conf)
    //5.创建RDD对象
    val rdd: RDD[String] = sc.parallelize(List("a", "b", "c", "e", "f", "d"), 3)
    //6.自定义函数
    def process(ele:String): Unit ={
      println("开启process..")
      println("当前数据"+ele)
      println("结束process..")
    }
    //7.调用foreach算子,参数为自定义函数 f: T => Unit
    rdd.foreach(process)
    println("------------------")
    //8.可以为foreach算子指定print\println函数
    //rdd.foreach(print)
    rdd.foreach(println)
    //4.关闭sc对象
    sc.stop()
  }
}

在这里插入图片描述
在这里插入图片描述

SaveAsTextFile

将RDD数据写入到txt文本文件中,支持本地文件、hdfs等,RDD的每个分区均会产生一个结果文件

package com.wunaiieq.RDD.action

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object RddSaveAsTextFile {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[1]").
      setAppName("SaveAsTextFile")
    //3.使用conf对象构建SparkContet对象
    val sc = new SparkContext(conf)
    //5.创建RDD对象
    val rdd: RDD[String] = sc.parallelize(List("a", "b", "c", "e", "f", "d"), 3)
    //6.将rdd中的数据保持到本地磁盘
    rdd.saveAsTextFile("data/output/file")
//    7.将数据写入到HDFS文件系统中 node1上namenode需要是active
//    rdd.saveAsTextFile("hdfs://node1:9820/spark/output/file")
//    8.为了避免active namenode放生变化,所以最好使用mycluster
//    UnknownHostException: mycluster 需要将虚拟机中的core-site.xml和hdfs-site.xml文件下载到本项目的resources目录下
//    rdd.saveAsTextFile("hdfs://mycluster/spark/output/file")
    //4.关闭sc对象
    sc.stop()
  }
}

CountByKey

统计kv型RDD中,key出现的次数
转换算子reduceByKey是对RDD做聚合操作,针对相同key的value还会进行相加

package com.wunaiieq.RDD.action

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object RddCountByKey {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[1]").
      setAppName("foreach")
    //3.使用conf对象构建SparkContet对象
    val sc = new SparkContext(conf)
    //5.创建KeyValue型RDD对象
    val rdd: RDD[(String, Int)] = sc.parallelize(List(("a", 1), ("b", 1), ("a", 2), ("b", 2), ("a", 3), ("a", 4)))
    //6.复习转换算子reduceByKey:将相同key的value值做聚合操作
    val rdd2: RDD[(String, Int)] = rdd.reduceByKey(_ + _)
    println(rdd2.collect().mkString(","))
    //7.行动算子countByKey 表示统计key出现次数
    val result: collection.Map[String, Long] = rdd.countByKey()
    result.foreach(println)
    //4.关闭sc对象
    sc.stop()
  }
}

在这里插入图片描述

Reduce

对传入的RDD做聚合操作(一般为相加)

package com.wunaiieq.RDD.action

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object RddReduce {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[1]").
      setAppName("reduce")
    //3.使用conf对象构建SparkContet对象
    val sc = new SparkContext(conf)
    //5.创建RDD对象
    val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4))
    //6.将rdd中所有元素做聚合加操作
    val sum: Int = rdd.reduce(_ + _)
    println(sum)
    //4.关闭sc对象
    sc.stop()
  }
}

在这里插入图片描述

fold

在这里插入图片描述
此方法也是聚合操作,不过会携带一个初始值,在每个分区进行聚合操作时,会带上初始值一起进行
分区1:10(初始值)+1+2+3(123为分区内数据)=16
分区2:10+4+5+6=25
分区3:10+7+8+9=34
分区聚合:10(初始值)+16(分区1)+25(分区2)+34(分区3)=85

package com.wunaiieq.RDD.action

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object RddFold {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[1]").
      setAppName("fold")
    //3.使用conf对象构建SparkContet对象
    val sc = new SparkContext(conf)
    //5.创建RDD对象
    //val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 1)
    val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
    //6.执行带有初始值的fold聚合操作
    //10表示初始值,_+_表示做聚合加操作
    //RDD对象中所有元素的和+(partitionNum+1)*10
    val result: Int = rdd.fold(10)(_ + _)
    //7.注意:初始值的类型要和RDD中数据的类型一致
    //rdd.fold(10.5)(_+_)//错误的
    println(result)
    //4.关闭sc对象
    sc.stop()
  }
}

在这里插入图片描述

first、take、count

first:取出RDD第一个元素
take:取出RDD的前n个元素
count:统计RDD的元素数量
RDD的分区是只存储位置的不同,因此对于结果的顺序是没影响的,但是不同分区的元素数量会影响take的性能
比如第一个分区上1个元素,第二个2个,第三个3个,现在执行take(5)它需要先从第一个分区上读取,然后第二个,第三个,影响效率

package com.wunaiieq.RDD.action

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object RddFirstTakeCount {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[1]").
      setAppName("FirstTakeCount")
    //3.使用conf对象构建SparkContet对象
    val sc = new SparkContext(conf)
    //5.创建RDD对象,这里是否分区没区别
    val rdd: RDD[Int] = sc.parallelize(List(5, 2, 1, 4, 3, 8, 6, 1))
    //6.获取第一个元素
    val first: Int = rdd.first()
    println("first:"+first)
    //7.获取rdd前三个元素
    val threeEle: Array[Int] = rdd.take(3)
    println(threeEle.mkString(","))
    //8.获取rdd中元素的总数
    val eleNum: Long = rdd.count()
    println(eleNum)
    //4.关闭sc对象
    sc.stop()
  }
}

在这里插入图片描述

top、takeOrderd

top算子:
top(n)取出RDD降序排序后的topN
top(n)(Ordering.Int.reverse)取出RDD升序后的topN

takeOrdered算子:
takeOrdered(n)取出RDD升序排序后的topN
takeOrdered(n)(Ordering.Int.reverse)取出RDD降序排序后的topN

简单来讲就是top和takeOrdered用于获取排序的最开始n位,默认排序位降序、升序,同时可以增加其他自定义的函数

package com.wunaiieq.RDD.action

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object RddTopTakeOrdered {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[1]").
      setAppName("TakeOrdered")
    //3.使用conf对象构建SparkContet对象
    val sc = new SparkContext(conf)
    //5.创建RDD对象
    val rdd: RDD[Int] = sc.parallelize(List(5, 2, 1, 4, 3, 8, 6, 1))
    //6.获取默认降序的top3
    val top3: Array[Int] = rdd.top(3)
    println("top3:"+top3.mkString(","))
    //7.获取默认升序的top3
    val takeOr3: Array[Int] = rdd.takeOrdered(3)
    println("takeOr3:"+takeOr3.mkString(","))
    //8.获取升序的top3
    val top3r: Array[Int] = rdd.top(3)(Ordering.Int.reverse)
    println("top3r:"+top3r.mkString(","))
    //9.获取降序的top3
    val takeOr3r: Array[Int] = rdd.takeOrdered(3)(Ordering.Int.reverse)
    println("takeOr3r:"+takeOr3r.mkString(","))
    //4.关闭sc对象
    sc.stop()
  }
}

在这里插入图片描述

takeSample
//withReplacement:true可以重复,false:不能重复。数据位置是否重复不是值
//num:抽样的元素个数
//seed:随机种子,数值型的;RDD元素相同的情况下,相同的种子抽样出的元素也相同
def takeSample(
   withReplacement: Boolean,
   num: Int,
   seed: Long = Utils.random.nextLong): Array[T] 
)
方法使用场景返回类型采样方式参数设置
takeSample 行动算子从 RDD 中随机抽取指定数量的样本数组按设定的采样个数withReplacement(是否允许重复采样,布尔值)
num(要抽取的样本数量,整数)
seed(随机算法的种子值,可选)
sample 转换算子从 RDD 中随机抽取按比例分布的样本新的 RDD按指定的比例withReplacement(是否允许重复采样,布尔值)
fraction(要抽取的样本比例,0 到 1 之间的浮点数)
seed(随机算法的种子值,可选)
package com.wunaiieq.RDD.action

//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object RddTakeSample {
  def main(args: Array[String]): Unit = {
    //2.构建SparkConf对象,并设置本地运行和程序名称
    val conf: SparkConf = new SparkConf().setMaster("local[1]").
      setAppName("TakeSample")
    //3.使用conf对象构建SparkContet对象
    val sc = new SparkContext(conf)
    //5.创建RDD对象
    val rdd: RDD[Int] = sc.parallelize(List(5, 2, 1, 4, 3, 8, 6, 1))
    //6.第一个参数为true表示被抽出的元素返回,所以抽样的元素个数可以超过rdd.count() 元素个数
    val result: Array[Int] = rdd.takeSample(true, 10, 2)
    println(result.mkString(","))
    //7.第一个参数为false表示被抽取出的数据不放回,如果数量>rdd.count()
    println(rdd.takeSample(false,10,2).mkString(","))
    //8.三个参数的值都一样时,多次抽取结果
    println(rdd.takeSample(false,5,3).mkString(","))
    println(rdd.takeSample(false,5,3).mkString(","))
    //4.关闭sc对象
    sc.stop()
  }
}

在这里插入图片描述


http://www.kler.cn/a/473132.html

相关文章:

  • 2025新春烟花代码(二)HTML5实现孔明灯和烟花效果
  • Springboot SAP Docker 镜像打包问题
  • Linux 正则表达式 ⑪
  • 51单片机——中断(重点)
  • 【机器学习:四、多输入变量的回归问题】
  • doris:远程存储
  • 标题统计C++
  • Spring Security使用指南
  • 代码随想录 链表 test 6
  • 东京大学联合Adobe提出基于指令的图像编辑模型InstructMove,可通过观察视频中的动作来实现基于指令的图像编辑。
  • Selenium 进行网页自动化操作的一个示例,绕过一些网站的自动化检测。python编程
  • 『 Linux 』高级IO (三) - Epoll模型的封装与EpollEchoServer服务器
  • C#里对已经存在的文件进行压缩生成ZIP文件
  • FPGA车牌识别
  • 基于需求文档、设计文档、测试用例的测试答疑助手
  • 用Portainer实现对Docker容器的管理(四)
  • 深度学习与计算机视觉 (博士)
  • 【JAVA基础】Collections方法的具体使用方法
  • 计算机网络 笔记 物理层
  • 【递归与分治】Leetcode23:合并K个升序链表
  • Redis--20--大Key问题解析
  • Mono里运行C#脚本26—CEE_ADD/MONO_CEE_ADD/OP_IADD/X86_ADD的转换过程
  • java项目学科竞赛管理系统的设计与实现源码(springboot+mysql+vue)
  • 预训练语言模型——BERT
  • 【免费】2000-2019年各省技术市场成交额数据
  • 字玩FontPlayer开发笔记9 Tauri2打包应用