当前位置: 首页 > article >正文

SparkSQL和Spark常用语句

Spark SQL 常用语句
读取数据
scala
val df = spark.read.option(“header”, “true”).csv(“path/to/csvfile.csv”)
val df = spark.read.parquet(“path/to/parquetfile.parquet”)
val df = spark.read.json(“path/to/jsonfile.json”)

展示数据
scala
df.show()
df.show(5) // 展示前5条记录

打印模式(Schema)
scala
df.printSchema()

选择列
scala
df.select(“column1”, “column2”).show()
df.select($“column1”, $“column2” + 1).show()

过滤数据
scala
df.filter($“column1” > 100).show()
df.filter(“column1 > 100”).show()

分组和聚合
scala
df.groupBy(“column1”).avg(“column2”).show()
df.groupBy(“column1”).agg(Max(“column2”), Sum(“column3”)).show()

排序
scala
df.orderBy($“column1”.desc).show()
df.orderBy(“column1”, “column2”.asc).show()

重命名列
scala
df.withColumnRenamed(“oldName”, “newName”).show()

添加新列
scala
df.withColumn(“newColumn”, $“column1” + $“column2”).show()

删除列
scala
df.drop(“column1”).show()

SQL 查询
scala
df.createOrReplaceTempView(“table_name”)
spark.sql(“SELECT * FROM table_name WHERE column1 > 100”).show()

连接(Join)
scala
val df1 = …
val df2 = …
df1.join(df2, df1(“id”) === df2(“id”)).show()
df1.join(df2, Seq(“id”)).show() // 适用于相同列名的简单连接

写入数据
scala
df.write.option(“header”, “true”).csv(“path/to/output.csv”)
df.write.parquet(“path/to/output.parquet”)

Spark 常用语句(Spark Core)
初始化 SparkConf 和 SparkContext
scala
val conf = new SparkConf().setAppName(“AppName”).setMaster(“local[*]”)
val sc = new SparkContext(conf)

读取数据
scala
val data = sc.textFile(“path/to/textfile.txt”)

展示数据
scala
data.take(10).foreach(println)

映射(Map)
scala
val mappedData = data.map(line => line.split(",")(0))

过滤(Filter)
scala
val filteredData = data.filter(line => line.contains(“keyword”))

flatMap
scala
val flatMappedData = data.flatMap(line => line.split(" "))

行动操作(例如:collect, count, reduce)
scala
data.collect()
data.count()
val reducedData = data.reduce(_ + _)

键值对操作
scala
val pairs = data.map(line => (line.split(",")(0), line.split(",")(1)))
val groupedByKey = pairs.groupByKey()
val reducedByKey = pairs.reduceByKey(_ + _)

排序
scala
val sortedData = data.sortBy(line => line.length)

联合(Union)
scala
val rdd1 = …
val rdd2 = …
val unionedRDD = rdd1.union(rdd2)

缓存(Cache)
scala
data.cache()

保存数据
scala
data.saveAsTextFile(“path/to/output”)


http://www.kler.cn/a/315608.html

相关文章:

  • 《Python网络安全项目实战》项目5 编写网站扫描程序
  • Python高级编程模式和设计模式
  • Linux——GPIO输入输出裸机实验
  • 2024 年 Apifox 和 Postman 对比介绍详细版
  • win11 新建一个批处理,双击查看本机的IP地址
  • Qt 和 WPF(Windows Presentation Foundation)
  • Go语言并发编程:从理论到实践
  • QT widgets 窗口缩放,自适应窗口大小进行布局
  • 【鸿蒙OH-v5.0源码分析之 Linux Kernel 部分】003 - vmlinux.lds 链接脚本文件源码分析
  • 第k个排列 - 华为OD统一考试(E卷)
  • 跟着问题学12——GRU详解
  • Lucene详解介绍以及底层原理说明
  • 如何在Linux Centos7系统中挂载群晖共享文件夹
  • 心理辅导平台的构建:Spring Boot技术解析
  • 深度学习-从零基础快速入门到项目实践,这本书上市了!!!
  • 828华为云征文|部署知识库问答系统 MaxKB
  • 【文献阅读】基于原型的自适应方法增强未见到的构音障碍者的语音识别
  • 分布式消息中间件kafka
  • Google深度学习的图像生成大模型Imagen
  • Java接口和抽象类的区别
  • calibre-web报错:File type isn‘t allowed to be uploaded to this server
  • Ubuntu20.04配置NVIDIA+CUDA12.2+CUDNN【附所有下载资源】【亲测有效】【非常详细】
  • 设计模式-依赖注入
  • Mac剪贴板历史全记录!
  • 单片机的信号线都需要差分布放吗?
  • turtle实现贪吃蛇小游戏