当前位置: 首页 > article >正文

Spark实现PageRank算法

详细步骤:

1、创建Spark sql 环境

2、读取数据

3、数据切分 (分为page列,outLink列)形成表 pageDF

4、新增pr一列  (给定初始值)    形成表 initPrDF

5、新增avgPr一列(根据出链关系,求每个页面所分到的Pr)

6、分组聚合 (将outLink列explode炸开,在按照page分组,然后sum求和,这就是表 newPrDF

7、两表关联(newPrDF.join initPrDF),求pr列的差值平均数,形成列 avgDiffDF

8、if判断,取出avgDiffDF的值,与0.001比较,小于0.001循环结束

9、循环赋值,initPrDF=newPrDF,这样每次循环都能与上一次的进行比较

代码实现:

import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import java.math.BigDecimal
object PageRank {
  def main(args: Array[String]): Unit = {
    /**网页关系表示
     * A,B|D
     * B,C
     * C,A|B
     * D,B|C
     * */
    //网页排名,先了解每个网页的入链和出链,然后用数据表示各个网页之间的关系
    //再循环迭代,编程计算PageRank公式,得到最终的收敛值,就是排名
    // TODO: PR公式:(1-q)/N + q*∑(Pr/L)
    // TODO: q为阻尼系数(q=0.85),N为网页数量, L为出链数量,Pr为初始值(给定为1,后面每次循环都用上一次的Pr为初始值)

    //创建Spark sql 环境
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("pageRank")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    //读取数据
    val pageRankDF: DataFrame = spark.read
      .format("csv")
      .option("sep", ",")
      .schema("page STRING,outLink STRING")
      .load("spark/src/main/data/pageRank")

    import spark.implicits._
    import org.apache.spark.sql.functions._
    //进行数据切分
    val pageDF: DataFrame = pageRankDF
      .select($"page", split($"outLink", "\\|") as "outLink")

    //缓存
    pageDF.cache()

    //给定初始值
    var initPrDF: DataFrame = pageDF
      .withColumn("pr", expr("1.0"))
    var flag=true
    val q=0.85
    //页面数量
    val N: Long = pageDF.count()

  while(flag){
    val newPrDF: DataFrame = initPrDF
      //计算每个页面平均分到的pr值
      .withColumn("avgPr", $"pr" / size($"outLink"))
      //一行变多行,将外链的数据都炸开,保留outLink和avgPr,因为后面要分组聚合
      .select(explode($"outLink") as "page", $"avgPr")
      .groupBy($"page")
      //增加阻尼系数
      .agg(sum($"avgPr")* q + ( 1 - q ) / N as "pr")
      //关联原列表,为了求差值
      .join(pageDF, "page")

    //求该列表与上次一Pr的差值的平均值,最后如果小于0.001就收敛结束
    val avgDiffDF: DataFrame = newPrDF
      .as("a")
      .join(initPrDF.as("b"), "page")
      //abs是求绝对值
      .withColumn("diffPr", abs($"a.pr" - $"b.pr"))
      .agg(avg($"diffPr") as "avgDiff")

    //查看aggDiffDF的表结构,便于从列中取值
    avgDiffDF.printSchema()
    //取出差值平均值
    // TODO: 这个有个类型转换的问题,在没有修正的情况下,也就是不加阻尼系数时,
    //  会出现java.math.BigDecimal not cast to scala.math.BigDecimal,这里导一下java.BigDecimal的包就行
//    val row: Row = avgDiffDF.head()
//    val avgDiff: Double = row.getAs[BigDecimal]("avgDiff").doubleValue()

    // TODO: 加上阻尼系数后,就是正常的Double类型
    val row: Row = avgDiffDF.head()
    val avgDiff: Double = row.getAs[Double]("avgDiff").doubleValue()
    //收敛条件
    if(avgDiff<0.001){
      flag=false
    }
    //每次展示一下新的表
    newPrDF.show()
    //每次得到的新表作为初始表
    initPrDF=newPrDF

  }

  }

}


http://www.kler.cn/news/366081.html

相关文章:

  • MATLAB基础应用精讲-【数模应用】本量利分析(Cost-Volume-Profit Analysis)
  • 使用xml发送国际短信(smspro)【吉尔吉斯斯坦】
  • 机器学习与神经网络的当下与未来
  • HTTP和HTTPS基本概念,主要区别,应用场景
  • 淘宝API的实战应用:数据驱动增长,实时监控商品信息是关键
  • 智能园艺:Spring Boot植物健康系统
  • Java审计对比工具JaVers使用
  • CentOS 7 安装gcc编译环境
  • 解决selenium打开浏览器自动退出
  • k8s 查看 Secrets 的内容和详细信息
  • LCD手机屏幕高精度贴合
  • 密集向量(Dense Vectors):最大化机器学习中数据的潜力
  • 什么是埋点测试,app埋点测试怎么做?
  • 24.10.25 人工智能基础 Pandas库 函数 学习笔记
  • Spring中的循环依赖
  • Android BUG 之 Program type already present: MTT.ThirdAppInfoNew
  • 生信软件39 - GATK最佳实践流程重构,提高17倍分析速度的LUSH流程
  • PyTorch 保存和加载模型状态和优化器状态
  • win10系统家庭版.net framework 3.5sp1启动错误如何解决
  • idea 集成maven
  • Maven(解决思路)
  • TCP标志位在网络故障排查中的作用
  • C语言与C++语言对比:为何C语言不支持函数重载而C++支持?
  • 【图论】Kruskal重构树
  • 《探索 HarmonyOS NEXT(5.0):开启构建模块化项目架构奇幻之旅 —— 模块化基础篇》
  • golang中的函数和结构体