当前位置: 首页 > article >正文

SpringBoot操作spark处理hdfs文件

SpringBoot操作spark处理hdfs文件

  • 在这里插入图片描述

1、导入依赖

  • <!--        spark依赖-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>3.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>3.2.2</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_2.12</artifactId>
                <version>3.2.2</version>
            </dependency>
    

2、配置spark信息

  • 建立一个配置文件,配置spark信息
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//将文件交于spring管理
@Configuration
public class SparkConfig {

    //使用yml中的配置
    @Value("${spark.master}")
    private String sparkMaster;

    @Value("${spark.appName}")
    private String sparkAppName;

    @Value("${hdfs.user}")
    private String hdfsUser;

    @Value("${hdfs.path}")
    private String hdfsPath;
    @Bean
    public SparkConf sparkConf() {
        SparkConf conf = new SparkConf();
        conf.setMaster(sparkMaster);
        conf.setAppName(sparkAppName);
        // 添加HDFS配置
        conf.set("fs.defaultFS", hdfsPath);
        conf.set("spark.hadoop.hdfs.user",hdfsUser);
        return conf;
    }

    @Bean
    public SparkSession sparkSession() {
        return SparkSession.builder()
                .config(sparkConf())
                .getOrCreate();
    }
}

3、controller和service

  • controller类

    • import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.web.bind.annotation.GetMapping;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.RestController;
      import xyz.zzj.traffic_main_code.service.SparkService;
      
      @RestController
      @RequestMapping("/spark")
      public class SparkController {
      
          @Autowired
          private SparkService sparkService;
      
          @GetMapping("/run")
          public String runSparkJob() {
              //读取Hadoop HDFS文件
              String filePath = "hdfs://192.168.44.128:9000/subwayData.csv";
              sparkService.executeHadoopSparkJob(filePath);
              return "Spark job executed successfully!";
          }
      }
      
  • 处理地铁数据的service

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import xyz.zzj.traffic_main_code.service.SparkReadHdfs;

import java.io.IOException;
import java.net.URI;
import static org.apache.spark.sql.functions.*;

@Service
public class SparkReadHdfsImpl implements SparkReadHdfs {

    private final SparkSession spark;

    @Value("${hdfs.user}")
    private String hdfsUser;

    @Value("${hdfs.path}")
    private String hdfsPath;

    @Autowired
    public SparkReadHdfsImpl(SparkSession spark) {
        this.spark = spark;
    }

    /**
     * 读取HDFS上的CSV文件并上传到HDFS
     * @param filePath
     */
    @Override
    public void sparkSubway(String filePath) {
        try {
            // 设置Hadoop配置
            JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
            Configuration hadoopConf = jsc.hadoopConfiguration();
            hadoopConf.set("fs.defaultFS", hdfsPath);
            hadoopConf.set("hadoop.user.name", hdfsUser);

            // 读取HDFS上的文件
            Dataset<Row> df = spark.read()
                    .option("header", "true") // 指定第一行是列名
                    .option("inferSchema", "true") // 自动推断列的数据类型
                    .csv(filePath);

            // 显示DataFrame的所有数据
//            df.show(Integer.MAX_VALUE, false);

            // 对DataFrame进行清洗和转换操作
            // 检查缺失值
            df.select("number", "people", "dateTime").na().drop().show();
            // 对数据进行类型转换
            Dataset<Row> df2 = df.select(
                    col("number").cast(DataTypes.IntegerType),
                    col("people").cast(DataTypes.IntegerType),
                    to_date(col("dateTime"), "yyyy年MM月dd日").alias("dateTime")
            );
            // 去重
            Dataset<Row> df3 = df2.dropDuplicates();
            // 数据过滤,确保people列没有负数
            Dataset<Row> df4 = df3.filter(col("people").geq(0));
//            df4.show();
            // 数据聚合,按dateTime分组,统计每天的总客流量
            Dataset<Row> df6 = df4.groupBy("dateTime").agg(sum("people").alias("total_people"));
//            df6.show();
            sparkForSubway(df6,"/time_subwayData.csv");
            //数据聚合,获取每天人数最多的地铁number
            Dataset<Row> df7 = df4.groupBy("dateTime").agg(max("people").alias("max_people"));
            sparkForSubway(df7,"/everyday_max_subwayData.csv");
            //数据聚合,计算每天的客流强度:每天总people除以632840
            Dataset<Row> df8 = df4.groupBy("dateTime").agg(sum("people").divide(632.84).alias("strength"));
            sparkForSubway(df8,"/everyday_strength_subwayData.csv");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void sparkForSubway(Dataset<Row> df6, String hdfsPath) throws IOException {
        // 保存处理后的数据到HDFS
        df6.coalesce(1)
                .write().mode("overwrite")
                .option("header", "true")
                .csv("hdfs://192.168.44.128:9000/time_subwayData");
        // 创建Hadoop配置
        Configuration conf = new Configuration();
        // 获取FileSystem实例
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.44.128:9000"), conf);
        // 定义临时目录和目标文件路径
        Path tempDir = new Path("/time_subwayData");
        FileStatus[] files = fs.listStatus(tempDir);
        // 检查目标文件是否存在,如果存在则删除
        Path targetFile1 = new Path(hdfsPath);
        if (fs.exists(targetFile1)) {
            fs.delete(targetFile1, true); // true 表示递归删除
        }
        for (FileStatus file : files) {
            if (file.isFile() && file.getPath().getName().startsWith("part-")) {
                Path targetFile = new Path(hdfsPath);
                fs.rename(file.getPath(), targetFile);
            }
        }
        // 删除临时目录
        fs.delete(tempDir, true);
    }

}

4、运行

  • 项目运行完后,打开浏览器
    • spark处理地铁数据
      • http://localhost:8686/spark/dispose
  • 观察spark和hdfs
    • http://192.168.44.128:8099/
    • http://192.168.44.128:9870/explorer.html#/
      • image-20250109095551610

http://www.kler.cn/a/500328.html

相关文章:

  • MATLAB语言的多线程编程
  • Python实现windows自动关机
  • 《自动驾驶与机器人中的SLAM技术》ch1:自动驾驶
  • 相加交互效应函数发布—适用于逻辑回归、cox回归、glmm模型、gee模型
  • 第432场周赛:跳过交替单元格的之字形遍历、机器人可以获得的最大金币数、图的最大边权的最小值、统计 K 次操作以内得到非递减子数组的数目
  • IDEA中创建maven项目
  • Redis之秒杀活动
  • django基于Python的智能停车管理系统
  • 限制图层列表
  • (2025,Cosmos,世界基础模型 (WFM) 平台,物理 AI,数据处理,分词器,世界基础模型预训练/后训练,3D一致性)
  • 【JVM-1】深入解析JVM:Java虚拟机的核心原理与工作机制
  • 【MySQL学习笔记】MySQL视图View
  • 解决nginx多层代理后应用部署后访问发现css、js、图片等样式加载失败
  • CPU缓存架构详解与Disruptor高性能内存队列实战
  • 《零基础Go语言算法实战》【题目 2-5】函数参数的值传递和引用传递
  • 【python A* pygame 格式化 自定义起点、终点、障碍】
  • C++中的unordered_set,unordered_map,哈希表/散列表以及哈希表的模拟实现
  • SqlServer: An expression services limit has been reached异常处理
  • 如何让QPS提升20倍
  • 【学习路线】Python爬虫 详细知识点学习路径(附学习资源)
  • [程序设计]—代理模式
  • 单例模式-如何保证全局唯一性?
  • 【github】向右箭头文件打不开,下载也是空白
  • 【西北工业大学主办 | EI检索稳定 | 高H值专家与会报告】2025年航天航空工程与材料技术国际会议(AEMT 2025)