【Spark中创建RDD的两种方式】Spark中如何获取sc对象、以及创建RDD的两种方式
文章目录
- 一、Spark如何获取sc对象
- 1、windons 本地模式获取sc对象
- 2、linux 集群模式获取sc对象
- 二、创建RDD的两种方式
- 1、并行化一个已存在的集合
- 2、读取外部共享存储系统
一、Spark如何获取sc对象
不论是本地测试还是集群模式,都需要指定 JAVA_HOME 和 HADOOP_HOME 路径
最好下载Anaconda,使用Anaconda 进行Python 的部署
如果下载了 Anaconda ,需要指定其下面的 python 环境的路径
需要在 Anaconda 中下载 pyspark
1、windons 本地模式获取sc对象
import os
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
if __name__ == '__main__':
# 配置环境
os.environ['JAVA_HOME'] = 'D:/devs/javajdk/jdk8'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/learn_tools/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = 'D:/learn_apps/anaconda/python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'D:/learn_apps/anaconda/python.exe'
# 获取 conf 对象
# setMaster 按照什么模式运行,local bigdata01:7077 yarn
# local[2] 使用2核CPU * 你本地资源有多少核就用多少核
# appName 任务的名字
conf = SparkConf().setMaster("local[*]").setAppName("第一个Spark程序")
# 假如我想设置压缩
# conf.set("spark.eventLog.compression.codec","snappy")
# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
sc = SparkContext(conf=conf)
# 可以打印sc验证是否获取成功
print(sc)
# 使用完后,记得关闭
sc.stop()
2、linux 集群模式获取sc对象
集群模式需要额外指定 Master 所在位置
import os
import time
from pyspark import SparkContext, SparkConf
import sys
if __name__ == '__main__':
# 配置环境
os.environ['JAVA_HOME'] = '/opt/installs/jdk'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = '/opt/installs/hadoop'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = '/opt/installs/anaconda3/bin/python3' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/installs/anaconda3/bin/python3'
# 获取sc 对象
conf = SparkConf().setMaster("spark://node01:7077").setAppName("wordcount单词统计")
sc = SparkContext(conf=conf)
print(sc)
# 关闭sc
sc.stop()
二、创建RDD的两种方式
1、并行化一个已存在的集合
方法:parallelize (并行的意思)
将一个集合转换为RDD
# 方式一:将一个已存在的集合转换为RDD
# 创建一个列表:会在Driver内存中构建
data = [1,2,3,4,5,6,7,8,9,10]
# 将列表转换为RDD:将在多个Executor内存中实现分布式存储, numSlices用于指定分区数,所谓的分区就是分为几份,每一份放在一台电脑上
list_rdd = sc.parallelize(data,numSlices=2)
# 打印这个RDD的内容
list_rdd.foreach(lambda x: print(x))
2、读取外部共享存储系统
方法:textFile、wholeTextFile、newAPIHadoopRDD等
读取外部存储系统的数据转换为RDD
# 方式二:读取外部系统
# 读取文件的数据变成RDD,minPartitions用于指定最小分区数
file_rdd =sc.textFile("../datas/function_data/filter.txt", minPartitions=2)
# 输出文件的内容
file_rdd.foreach(lambda line: print(line))