当前位置: 首页 > article >正文

大数据技术原理与应用期末复习-代码

RDD:

// 导入SparkConf和SparkContext类,用于配置和创建Spark上下文
import org.apache.spark.{SparkConf, SparkContext}

// 定义一个名为TopN的对象
object TopN {
  def main(args: Array[String]): Unit = {
    // 创建一个新的SparkConf对象,并设置应用程序名称为"TopN",主节点为"local"
    val conf = new SparkConf().setAppName("TopN").setMaster("local")
    val sc = new SparkContext(conf)
    // 设置日志级别为ERROR,以减少输出的信息量
    sc.setLogLevel("ERROR")
    // 从HDFS读取数据,使用2个分区
    val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/spark/mycode/rdd/examples", 2)  
    // 初始化计数器
    var num = 0
    
    // 对读取的数据进行一系列转换操作:
    // 1. 过滤掉空行或长度为0的行
    // 2. 过滤掉不能被逗号分割成4部分的行
    // 3. 将每一行按逗号分割成两部分
    // 4. 将分割后的第二部分转换为整数
    // 5. 按照第二部分(整数)降序排序
    // 6. 取排序后的前5个元素
    // 7. 遍历这5个元素,打印其索引和值
    val result = lines.filter(line => (line.trim.length > 0) && (line.split(",").length == 4))
                      .map(_.split(",")(2))
                      .map(x => (x.toInt,""))
                      .sortByKey(false)
                      .map(x => x._1)
                      .take(5)
                      .foreach(x => {
                        num = num + 1
                        println(num + "\t" + x)
                      })
  }
}


import org.apache.spark.{SparkConf, SparkContext}
object MaxAndMin {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MaxAndMin").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
   
    val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/spark/chapter5", 2)

    // 对读取的数据进行一系列转换操作:
    // 1. 过滤掉空行或长度为0的行
    // 2. 将每一行按逗号分割成两部分:键(key)和值(value)
    // 3. 按键分组
    // 4. 计算每个键对应的最大值和最小值
    // 5. 收集结果并打印
    val result = lines.filter(_.trim.length > 0)
                      .map(line => ("key", line.trim.toInt))
                      .groupByKey()
                      .map(x => {
                        var min = Integer.MAX_VALUE
                        var max = Integer.MIN_VALUE
                        for (num <- x._2) {
                          if (num > max) {
                            max = num
                          }
                          if (num < min) {
                            min = num
                          }
                        }
                        (max, min)
                      })
                      .collect()
                      .foreach(x => {
                        println("max\t" + x._1)
                        println("min\t" + x._2)
                      })
  }
}

案例3:文件排序

任务描述:

有多个输入文件,每个文件中的每一行内容均为一个整数。要求读取所有文件中的整数,进行排序后,输出到一个新的文件中,输出的内容个数为每行两个整数,第一个整数为第二个整数的排序位次,第二个整数为原待排序的整数。

                                    


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.HashPartitioner
object FileSort {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FileSort")
    val sc = new SparkContext(conf)
    
    // 设置数据文件路径
    val dataFile = "file:///usr/local/spark/mycode/rdd/data"
    
    // 从指定路径读取数据文件,使用3个分区
    val lines = sc.textFile(dataFile, 3)
    
    // 初始化索引变量
    var index = 0
    
    // 对读取的数据进行一系列转换操作:
    // 1. 过滤掉空行或长度为0的行
    // 2. 将每一行按逗号分割成两部分:键(key)和值(value)
    // 3. 使用HashPartitioner进行分区
    // 4. 按键排序
    // 5. 添加索引并重新组合结果
    val result = lines.filter(_.trim.length > 0)
                      .map(n => (n.trim.toInt, ""))
                      .partitionBy(new HashPartitioner(1))
                      .sortByKey()
                      .map(t => {
                        index += 1
                        (index, t._1)
                      })
    
    // 将处理后的结果保存到指定路径
    result.saveAsTextFile("file:///usr/local/spark/mycode/rdd/examples/result")
  }
}

案例4:二次排序

任务要求:

对于一个给定的文件(数据如file1.txt所示),请对数据进行排序,首先根据第1列数据降序排序,如果第1列数据相等,则根据第2列数据降序排序。

// 定义一个SecondarySortKey类,用于实现自定义排序逻辑
package cn.edu.xmu.spark

class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {
  // 实现compare方法,用于比较两个SecondarySortKey对象
  def compare(other: SecondarySortKey): Int = {
    if (this.first - other.first != 0) {
      this.first - other.first
    } else {
      this.second - other.second
    }
  }
}

// 定义一个SecondarySortApp对象,用于执行主程序
object SecondarySortApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local")
    // 创建SparkContext上下文
    val sc = new SparkContext(conf)    
    // 读取文件数据
    val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/examples/file1.txt", 1)  
    // 将每行数据转换为SecondarySortKey对象
    val pairWithSortKey = lines.map(line => new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt))
    // 按照SecondarySortKey进行排序
    val sorted = pairWithSortKey.sortByKey(false)
    // 提取排序后的结果
    val sortedResult = sorted.map(sortedLine => sortedLine._2)
    // 打印排序结果
    sortedResult.collect().foreach(println)
  }
}

案例五:连接操作

任务描述:在推荐领域有一个著名的开放测试集,下载链接是:http://grouplens.org/datasets/movielens/,该测试集包含三个文件,分别是ratings.dat、sers.dat、movies.dat,具体介绍可阅读:README.txt。请编程实现:通过连接ratings.dat和movies.dat两个文件得到平均得分超过4.0的电影列表,采用的数据集是:ml-1m

import org.apache.spark._
import org.apache.spark.SparkContext._

object SparkJoin {
  def main(args: Array[String]): Unit = {

    // 检查命令行参数的数量是否正确,确保提供三个参数:评分文件路径、电影文件路径、输出路径
    if (args.length != 3) {
      println("usage is SparkJoin <rating> <movie> <output>")
      return
    }
    val conf = new SparkConf().setAppName("SparkJoin").setMaster("local")
    // 创建Spark上下文对象
    val sc = new SparkContext(conf)

    try {
      // 从HDFS文件系统读取评分数据
      val textFile = sc.textFile(args(0))

      // 提取(movieId, rating)键值对
      val ratings = textFile.map(line => {
        val fields = line.split("::")
        (fields(1).toInt, fields(2).toDouble)
      })

      // 计算每个电影的平均评分
      val movieScores = ratings
        .groupByKey() // 将相同电影ID的评分组合在一起
        .map(data => { // 对每个电影ID的评分组计算平均值
          val avg = data._2.sum / data._2.size
          (data._1, avg)
        })

      // 从HDFS文件系统读取电影数据
      val movies = sc.textFile(args(1))
      // 提取(MovieID, MovieName)键值对,并基于MovieID创建键值对
      val moviesKey = movies.map(line => {
        val fields = line.split("::")
        (fields(0).toInt, fields(1)) // (MovieID, MovieName)
      }).keyBy(tup => tup._1)

      // 通过join操作合并电影评分和电影信息,过滤出平均评分大于4.0的电影,并格式化输出
      val result = movieScores
        .keyBy(tup => tup._1) // 基于电影ID创建键值对
        .join(moviesKey) // 将评分与电影信息进行连接
        .filter(f => f._2._1._2 > 4.0) // 过滤出平均评分大于4.0的电影
        .map(f => (f._1, f._2._1._2, f._2._2._2)) // 格式化为 (MovieID, AverageRating, MovieName)

      // 将结果保存到指定的输出路径
      result.saveAsTextFile(args(2))

    } finally {
      // 确保在程序结束时停止Spark上下文
      sc.stop()
    }
  }
}

wordcount两道:

MapReduce实现wordcount

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    // 默认构造函数,无参数。
    public WordCount() {
    }

    public static void main(String[] args) throws Exception {
        // 创建一个配置对象,用于读取命令行参数和配置文件。
        Configuration conf = new Configuration();
        
        // 解析命令行参数,并将非Hadoop通用选项的参数分离出来。
        String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
        
        // 检查输入参数是否正确。至少需要两个参数:一个或多个输入路径和一个输出路径。
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        
        // 初始化一个新的MapReduce作业,并设置其名称为"word count"。
        Job job = Job.getInstance(conf, "word count");
        
        // 设置包含main方法的类作为作业的主类,以便找到相关的Mapper、Reducer和其他资源。
        job.setJarByClass(WordCount.class);
        
        // 设置Mapper类,它负责处理输入数据并生成中间键值对。
        job.setMapperClass(TokenizerMapper.class);
        
        // 设置Combiner类(可选),它在映射阶段后立即对中间结果进行局部聚合,以减少传输的数据量。
        job.setCombinerClass(IntSumReducer.class);
        
        // 设置Reducer类,它负责接收来自Mapper的中间键值对,并执行最终的聚合操作。
        job.setReducerClass(IntSumReducer.class);
        
        // 定义作业的输出格式,指定键和值的类型。
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 添加所有提供的输入路径到作业中。最后一个参数总是作为输出路径。
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        
        // 设置输出目录,该目录必须不存在。
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

        // 提交作业并等待完成,成功返回0,失败返回1。
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    // Reducer类用于汇总每个单词出现的次数。
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            
            // 遍历所有的IntWritable值,累加它们以计算总和。
            for (IntWritable val : values) {
                sum += val.get(); // 将IntWritable转换为原始int类型
            }
            
            // 设置sum到result中,以便可以序列化。
            result.set(sum);
            
            // 输出<key, result>对到context,即单词及其出现的次数。
            context.write(key, result);
        }
    }

    // Mapper类负责将输入文本拆分为单词,并为每个单词生成一个计数为1的键值对。
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1); // 代表每个单词出现一次
        private Text word = new Text(); // 用于存储当前处理的单词

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // 使用StringTokenizer来分割文本行中的单词。
            StringTokenizer itr = new StringTokenizer(value.toString());
            
            // 对于每一个单词,创建一个<单词, 1>键值对并写入到context中。
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken()); // 设置当前单词
                context.write(word, one); // 写入键值对到context中
            }
        }
    }
}

Spark SQL实现wordcount

package com.ht.final.wordcount

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.SparkConf

object WordCountSparkSQL {

  def main(args: Array[String]): Unit = {
    // 初始化Spark配置,并创建一个本地模式的SparkSession。
    // local[*]表示使用所有可用的处理器核心来运行任务。
    val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val spark = SparkSession.builder.config(sparkConf).getOrCreate()
    
    // 设置日志级别为警告,减少控制台输出的日志信息量。
    spark.sparkContext.setLogLevel("WARN")
    
    import spark.implicits._ // 导入隐式转换,用于支持DataFrame/Dataset操作。

    try {
      // 读取文本文件内容到Dataset中,每行作为一个字符串元素。
      val fileDS: Dataset[String] = spark.read.textFile("D:\\Document\\temp\\wordcount\\input.txt")

      // 将每一行按照制表符('\t')分割成多个单词,形成一个新的包含单个单词的Dataset。
      val wordDS: Dataset[String] = fileDS.flatMap(_.split("\t"))

      // 注册临时视图(类似数据库表),以便能够通过SQL查询访问数据。
      wordDS.createOrReplaceTempView("word_count")

      // 定义SQL查询语句,计算每个单词出现的次数,并按出现次数降序排列。
      val sqlQuery = "SELECT value AS word, COUNT(*) AS counts FROM word_count GROUP BY word ORDER BY counts DESC"

      // 执行SQL查询并获取结果作为DataFrame。
      val resultDF: DataFrame = spark.sql(sqlQuery)

      // 展示查询结果的前20行,默认情况下。
      resultDF.show()

    } finally {
      // 确保在程序结束时关闭资源。
      spark.stop()
    }
  }
}


http://www.kler.cn/a/453058.html

相关文章:

  • WebXR
  • JS面试题|[2024-12-26]
  • STM32 高级 谈一下IPV4/默认网关/子网掩码/DNS服务器/MAC
  • 【LLM论文日更】| 训练大型语言模型在连续潜在空间中进行推理
  • information_schema是什么?
  • 全局流量管理:提升用户体验与保障服务稳定性
  • 深度学习camp-第J5周:DenseNet+SE-Net实战
  • 定位方式:css
  • 选择排序 冒泡排序 MySQL 架构
  • [python SQLAlchemy数据库操作入门]-08.ORM删除不再需要的股票记录
  • C项目 天天酷跑(下篇)
  • ZCC5090EA适用于TYPE-C接口,集成30V OVP功能, 最大1.5A充电电流,带NTC及使能功能,双节锂电升压充电芯片替代CS5090EA
  • 开源智能工业软件技术发展分析
  • “黄师日报”平安小程序springboot+论文源码调试讲解
  • Spring的注解@Autowired 是什么意思?
  • 【每日学点鸿蒙知识】长时任务、profiler allocation、事件订阅、getTagInfo、NativeWindow
  • 重温设计模式--状态模式
  • 基于Spring Boot的中国戏曲文化传播系统
  • Android 中的生产者-消费者模式实现
  • kubeadm 安装最新 k8s 集群
  • Ubuntu20.4 VPN+Docker代理配置
  • 正则表达式优化之实际应用场景优化
  • HBU深度学习实验17-优化算法比较和分析
  • 数据结构的基础与应用
  • 【贪吃蛇小游戏 - JavaIDEA】基于Java实现的贪吃蛇小游戏导入IDEA教程
  • HarmonyOS NEXT 实战之元服务:静态案例效果---查看国内航班服务