当前位置: 首页 > article >正文

使用pyspark完成wordcount案例

本地运行+本地数据

import os
import re

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
"""
数据在本地
代码在本地
使用的是windows的资源

"""

if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'  
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'

    conf = SparkConf().setMaster("local[*]").setAppName("第一个spark项目")
    sc = SparkContext(conf=conf)
    fileRdd = sc.textFile("../data/wordcount/input")
    # split 默认是切空格的 假如是多个空格可以识别么
    fileRdd.filter(lambda line: len(line) > 0).flatMap(lambda line: line.strip().split()).map(lambda word: (word, 1)) \
        .reduceByKey(lambda sum, tmp: sum + tmp).saveAsTextFile("../data/wordcount/output3")

    # fileRdd.filter(lambda line: len(line) > 0).flatMap(lambda line: re.split("\s+",line.strip()) ).map(lambda word: (word, 1)) \
    #     .reduceByKey(lambda sum, tmp: sum + tmp).saveAsTextFile("../data/wordcount/output2")
    sc.stop()

本地运行+集群数据

import os
import re

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
"""
数据在hdfs
代码在本地
资源使用的是windows的

"""

if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'  
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'
    os.environ['HADOOP_USER_NAME'] = 'root'

    conf = SparkConf().setMaster("local[*]").setAppName("第一个spark项目")
    sc = SparkContext(conf=conf)
    fileRdd = sc.textFile("hdfs://bigdata01:9820/spark/wordcount/input")

    fileRdd.filter(lambda line: len(line) > 0).flatMap(lambda line: re.split("\s+",line.strip()) ).map(lambda word: (word, 1)) \
        .reduceByKey(lambda sum, tmp: sum + tmp).saveAsTextFile("hdfs://bigdata01:9820/spark/wordcount/output2")
    sc.stop()

外部传参+服务器模式

import os
import re
import sys

from pyspark.conf import SparkConf
from pyspark.context import SparkContext

"""
数据在hdfs
代码在本地
资源使用的是windows的

"""

if __name__ == '__main__':
    inputPath = sys.argv[1]
    outputPath = sys.argv[2]
    # 配置环境
    os.environ['JAVA_HOME'] = '/opt/installs/jdk'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = '/opt/installs/hadoop'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = '/opt/installs/anaconda3/bin/python3'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/installs/anaconda3/bin/python3'

    conf = SparkConf().setAppName("第一个spark项目")
    sc = SparkContext(conf=conf)
    fileRdd = sc.textFile(inputPath)

    fileRdd \
        .filter(lambda line: len(line) > 0) \
        .flatMap(lambda line: re.split("\s+", line.strip())) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda sum, tmp: sum + tmp) \
        .saveAsTextFile(outputPath)

    sc.stop()


http://www.kler.cn/a/429589.html

相关文章:

  • 【Vim Masterclass 笔记13】第 7 章:Vim 核心操作之——文本对象与宏操作 + S07L28:Vim 文本对象
  • Android SystemUI——使用Dagger2加载组件(四)
  • LSA更新、撤销
  • excel 整理表格,分割一列变成多列数据
  • pytorch张量分块投影示例代码
  • Python编程与在线医疗平台数据挖掘与数据应用交互性研究
  • Flutter 图片编辑板(二) 拖动位置和对齐线应用
  • 封闭式论文写作--全面掌握ChatGPT-4o的写作技能,掌握提示词使用技巧、文献检索与分析方法,帮助您选定研究方向,提炼学术论文题目
  • 软件漏洞印象
  • 网络安全 - Cross-site scripting
  • 刷leetcodehot100-7动态规划
  • 【RBF SBN READ】hadoop社区基于RBF的SBN READ请求流转
  • 产品经理的财会知识课:资产的减值测试
  • X推出新AI图像生成器Aurora:更接近真实的创作效果
  • Facebook与Web3的结合:去中心化社交的可能性
  • 【go】fmt包讲解与案例
  • C语言实例_27之删除字符串中指定字符
  • 出海服务器可以用国内云防护吗
  • React废弃componentWillMount和componentWillReceiveProps这两个生命周期方法
  • 【优选算法篇】:双指针算法--开启高效编码的两把“魔法指针”,练习题演练
  • Java环境变量配置
  • DP协议:缩略词
  • 工具推荐-js爬取工具
  • 【人工智能】GaussDB数据库技术及应用
  • Jmeter Address already in use: connect 解决
  • Debian11(pve) 使用.deb包 安装内核头文件