当前位置: 首页 > article >正文

5_SparkGraphX讲解

SparkGraphX讲解

1、为何使用SparkGraphiX图处理?

许多大数据以大规模图网络的形式呈现,尤其是许多的非图结构的大数据,常会被转换为图模型进行分析。
图数据结构能够很好地表达数据之间的关联性

2、图——基本术语认知

概念:图是由顶点集合(vertex)及顶点间的关系集合(边edge)组成的一种网状数据结构,可以对事物之间的关系建模。通常表示为二元组:Graph =(V,E)

图中基本知识点

  • 图可以分为有向图无向图有环图无环图等。
  • 度:指与这个顶点相关联的边的数量

    对于有向图而言:

    • 出度:指从当前顶点指向其他顶点的边的数量
      入度:其他顶点指向当前顶点的边的数量
    • 度 = 出度+入度

3、SparkGraphX简介

3.1:基本概念

Spark GraphX 是一个分布式图处理框架,为图计算和图挖掘提供了简洁易用且丰富多彩的接口。

3.2:Graph X 特点

  • 基于内存实现了数据的复用与快速读取

  • Resilient Distributed Property Graph(弹性分布式属性图)

    通过弹性分布式属性图统一了图视图(Graph)与表视图(Table)

    • GraphX的核心抽象是Resilient Distributed Property Graph(弹性分布式属性图),是一种顶点和边都带属性的有向多重图。

    • 其扩展了Spark RDD的抽象,一份物理存储,两种视图(Table和Graph)

    • 两种视图都有自己独有的操作符,从而获得了灵活操作和执行效率。

  • GraphX能与Spark Streaming、Spark sql 和 Spark MLib等无缝衔接

3.3:Graph X中有重要的概念

1Vertices(顶点):对应的RDD名称为VertexRDD【RDD[Vertex]】,包括顶点ID(VertexId)和顶点属性(VD,Vertex Data)

2Edges(边):对应的RDD名称为EdgeRDD【RDD[Edge[(String,Int)]]】,包括源顶点ID(srcId)、目标顶点ID(dstId)和边属性(ED,Edge Data)

3Triplets(三元组):对应的是关系RDD【RDD[EdgeTriplet]】,包括源顶点ID【srcId:VertexId】、源顶点属性【srcAttr:(String,Int,String)】、边属性【attr:(String,Int)】、目标顶点ID【dstId:VertexId】、目标顶点属性【dstAttr:(String,Int,String)】

4度(Degree):包括inDegrees(每个顶点入度)、outDegrees(每个顶点的出度)、degrees(每个顶点的度)

3.4:存储模式——分区切割策略

分区切割策略(PartitionStrategy):
    1、EdgeCut => 保证点在同一分区
        EdgePartition1D => 同点边(出入度)同分区
        EdgePartition2D => 邻接矩阵:与顶点关联的边最多被分配到(2 * sqrt(分区总数))个分区中
    2、VertexCut => 保证边在同一分区(Spark优选切割策略,因为边较为复杂)
        RandomVertexCut => 两同点同方向(同边)同分区
        CanonicalRandomVertexCut => 两同点边同分区

4、常用API

class Graph[VD,ED]{
  // 图信息
  val numEdges: Long					//   边数
  val numVertices: Long					//   点数
  val inDegrees: VertexRDD[Int]			//   入度
  val outDegrees: VertexRDD[Int]		//   出度
  val degrees: VertexRDD[Int]			//   度数

  // 属性算子
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]

  // 结构算子
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]

  // 关联算子
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2]
    (table: RDD[(VertexId, U)])
    (map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}


class GraphOps[VD, ED](graph : org.apache.spark.graphx.Graph[VD, ED]){
  // 👉同分区👈内相同边(顶点ID顺序值都相同)合并
  def groupEdges(merge : scala.Function2[ED, ED, ED]) : 
    org.apache.spark.graphx.Graph[VD, ED]
}

5、算法

页面等级(Page Rank)

用于评估网页链接的质量和数量,以确定该网页的重要性和权威性的相对分数,范围为0到10
从本质上讲,Page Rank是找出图中顶点(网页链接)的重要性(值越大越重要)

过程简单讲解:尽可能的在分散的点中选取几个,然后这几个点进行拉取周围的点形成多个圈,在这几个圈内部会去寻找绝对中心点。依次类推,达到tol阈值时会停止迭代。

def pageRank(
tol : scala.Double, 		// 收敛最小误差,越小越好,确定迭代是否结束的参数
   resetProb : scala.Double	// 随机【重复概率】
) : org.apache.spark.graphx.Graph[scala.Double, scala.Double]

案列:顶点质量

graph.pageRank(0.0001,0.2)
  .vertices
  .foreach(println)
-----------------------------
(1,0.6533816355047241)
(4,0.5437786042062481)
(6,0.8615213520717165)
(3,1.2889169066237742)
(2,1.0530712359694594)
(5,1.5993302656240773)
-----------------------------
三角数量

计算经过每个顶点的三角形数量
用于评估:社区、团体、机构、组织内部关系的紧密程度或稳定性,三角形越少越松散

def triangleCount() 
: org.apache.spark.graphx.Graph[scala.Int, ED]

案例:稳定性

graph.triangleCount()
  .vertices
  .foreach(println) // (顶点id,参与的三角形数量)
-------------------------
(3,5)
(1,7)
(5,5)
(6,3)
(2,3)
(4,7)
-------------------------
连通分量

连通分量是一个子图,其中任何两个顶点通过一条边或一系列边相互连接,其顶点是原始图顶点集的子集,其边是原始图边集的子集

用于反应图中顶点间的连通性

def connectedComponents() : org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId, ED]
def stronglyConnectedComponents(numIter : scala.Int) : org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId, ED]

案例

// 强连通图(每两个顶点之间都存在路径)
graph.connectedComponents()
 .vertices
 .foreach(println)
-----------------------------
(5,1) // 顶点 5 属于连通分量 1
(4,1) // 顶点 4 属于连通分量 1
(1,1) // 顶点 1 属于连通分量 1
(2,1) // 顶点 2 属于连通分量 1
(3,1) // 顶点 3 属于连通分量 1
(6,1) // 顶点 6 属于连通分量 1
-----------------------------
PREGEL

是Google提出的用于大规模分布式图计算框架

  • 图遍历(BFS)
  • 单源最短路径(SSSP)
  • Page Rank计算

Pregel的计算由一系列迭代组成,称为 super steps

  • 每个顶点从上一个 super step 接收入站消息
  • 计算顶点新的属性值
  • 在下一个 super step 中向相邻的顶点发送消息
  • 当没有剩余消息时,迭代结束
def pregel[A](
    initialMsg: A, 
    maxIterations: Int, 
    activeDirection: EdgeDirection
)(
    vprog: (VertexID, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
    mergeMsg: (A, A) => A
) : Graph[VD, ED]

案例:求最短路径(srcId为1L,到其他点的最短路径)

// 1、初始化操作
val srcId = 1L
// 初始化(点):对顶点重新定义(VertexId,Distance),保留边
// 除了 srcId本身点是(1,0),其他点为(VertexId,2100000000)
val initialVertices: Graph[Long, Int] = graph.mapVertices({
  // 点重新定义,边依旧是原来的边
  // 偏函数+模式匹配
  case (vertexId, t2) if (vertexId == srcId) => 0L // 若指向自己,则为0
  case _ => Long.MaxValue // 指向其他人距离为最大值
})

// 2、生成结果图
val vertexCount: Int = initialVertices.vertices.count().toInt
val rstGraph: Graph[Long, Int] = initialVertices.pregel[Long](
  Int.MaxValue, // 初始化值
  vertexCount, // 最大迭代次数(按照顶点数量)
  EdgeDirection.Out
)(
  // 初始化:每一轮迭代初始化节点值
  // VD:上一轮计算的值, A:新值
  (VertexId, VD, A) => {
    val min: Long = Math.min(VD, A)
    println(s"$VertexId ($VD,$A) = $min")
    min
  },
  // 发送消息:EdgeTriplet(srcId,srcAttr,attr,dstId,dstAttr)
  e => {
    // (VertexId,距离)
    // srcAttr:前一个顶点的距离 -- attr:边长 --> dstAttr:后一个顶点的距离
    // srcAttr + attr < dstAttr 则发消息进行替换,否则不发送消息
    val distance: Long = e.srcAttr + e.attr
    if (distance < e.dstAttr) {
      // 发送消息进行替换
      println(s"sendMsg from ${e.srcId} to ${e.dstId} for ($distance,${e.dstAttr}) = $distance")
      Iterator((e.dstId, distance))
    } else {
      // 不发消息
      Iterator.empty
    }
  },
  // 合并
  (a, b) => {
    val min: Long = Math.min(a, b)
    println(s"merge($a,$b),最终选择$min")
    min
  }
)
rstGraph.vertices.foreach(println) // (到xx点,距离)
----------------------------------
(2,1)
(4,3)
(5,5)
(6,2)
(1,0)
(3,3)
----------------------------------

6、应用场景

在地图应用中寻找最短路径
社交网络关系
网页间超链接关系

7、小型案例

7.1.准备工作

数据模拟
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Set
import scala.util.Random
import scala.util.control.Breaks.{breakable,break}

val buffer:ArrayBuffer[Edge[Int]] = ArrayBuffer()
val rand = new Random()
val array = Array(1L,2L,3L,4L,5L,6L) // 六个点
val set:Set[(Long,Long)] = Set() // 去重
var cnt = 0

while(cnt<21){
    array.foreach(one=>{ // 点1
        breakable({
            while (true){
                val two = array(rand.nextInt(array.size)) // 点2
                if(one != two){
                    val relation = (one,two) // 构成关系
                    if(!set.contains(relation)){
                        buffer.append(Edge(relation._1,relation._2,rand.nextInt(6)))
                        set.add(relation)
                        cnt += 1
                        break() // 跳出循环
                    }
                }
            }
        })
    })
}
------------- 输出结果形式 ------------
Edge(1,5,0)
Edge(2,1,2)
Edge(3,6,3)
Edge(4,2,1)
Edge(5,2,4)
-------------------------------------
创建图对象
// 边数据:描述点与点间关系【有方向】
val es = Seq(
    Edge(1L,6L,4),
    Edge(1L,6L,4),
    Edge(2L,5L,4),
    Edge(3L,6L,4),
    Edge(4L,5L,3),
    Edge(5L,1L,5),
    Edge(6L,2L,5),
    //Edge(5L,1L,5),
    //Edge(6L,2L,5),
    Edge(1L,5L,1),
    Edge(2L,3L,2),
    Edge(3L,4L,0),
    Edge(4L,2L,4),
    Edge(5L,2L,4),
    //Edge(2L,3L,2),
    Edge(6L,5L,3),
    Edge(1L,2L,1),
    Edge(2L,1L,0),
    Edge(3L,5L,3),
    //Edge(6L,5L,3),
    Edge(4L,6L,0),
    //Edge(2L,1L,0),
    Edge(5L,3L,0),
    Edge(6L,3L,4),
    Edge(1L,3L,5),
    Edge(2L,6L,1),
    Edge(3L,1L,1),
    //Edge(2L,6L,1),
    //Edge(3L,1L,1),
    Edge(4L,1L,5),
    Edge(5L,6L,3),
    //Edge(5L,6L,3),
    Edge(6L,1L,3)
)
// 点数据
val vs = Seq(
    (1L,("jack",22)),
    (2L,("pola",12)),
    (3L,("mike",26)),
    (4L,("wang",17)),
    (5L,("liu",19)),
    (6L,("frimiku",18)),
)

val conf: SparkConf = new SparkConf()
    .setAppName("graphx-01")
    .setMaster("local[4]")
val sc = new SparkContext(conf)

// 点:描述的是人信息(L来表示点)
val vertex: RDD[(Long, (String, Int))] = sc.makeRDD(vs)
// 边
val edge: RDD[Edge[Int]] = sc.makeRDD(es)
// 构建图 (源码 => 基础:Graph,高阶:GraphOps)
val graph: Graph[(String, Int), Int] = Graph(vertex, edge)

7.2.实际演示

遍历
// 1、查看所有顶点
graph.vertices.foreach(println)
-----------------------------
(1,(jack,22))
(2,(pola,12))
(4,(wang,17))
(3,(mike,26))
(5,(liu,19))
(6,(frimiku,18))
-----------------------------

// 2、查看所有的边【简略】
graph.edges.foreach(println)
-----------------------------
Edge(4,6,0)
Edge(4,1,5)
Edge(1,2,1)
Edge(1,3,5)
-----------------------------

// 3、查看所有的边(将点带入)【完整】
graph.triplets.foreach(println)
------------------------------------
((5,(liu,19)),(6,(frimiku,18)),3)
((4,(wang,17)),(5,(liu,19)),3)
((3,(mike,26)),(4,(wang,17)),0)
((4,(wang,17)),(2,(pola,12)),4)
------------------------------------

// 4、入度:指向自己的个数
graph.inDegrees.foreach(println) // (id,入度)
---------------------------
(4,1)
(3,4)
(2,4)
(5,5)
(6,5)
(1,5)
---------------------------

// 5、出度:指向别人的个数
graph.outDegrees.foreach(println) // (id,出度)
---------------------------
(6,4)
(5,4)
(3,4)
(4,4)
(1,4)
(2,4)
---------------------------

// 4,5组合:将入度和出度一起显示
graph
	.inDegrees
	.join(graph.outDegrees)
	.foreach(println) // (id,(入度,出度))
---------------------------
(4,(1,4))
(3,(4,4))
(2,(4,4))
(1,(5,4))
(6,(5,4))
(5,(5,4))
---------------------------

// 6、度:入度+出度
graph.degrees.foreach(println)
---------------------------
(3,8)
(4,5)
(5,9)
(6,9)
(2,8)
(1,9)
---------------------------
案例一:长一岁
// 长一岁:每个顶点中的人物的年龄长一岁
graph.mapVertices((id,t2)=>(id,(t2._1,t2._2+1))) // (id,(名字,年龄))
      .vertices
      .foreach(println)
--------------------------
(5,(5,(liu,20)))
(4,(4,(wang,18)))
(6,(6,(frimiku,19)))
(1,(1,(jack,23)))
(2,(2,(pola,13)))
(3,(3,(mike,27)))
--------------------------
案例二:找好友
val old: RDD[((VertexId, VertexId), Int)] = graph.edges.map(e => ((e.srcId, e.dstId), e.attr))
val cur: RDD[((VertexId, VertexId), Int)] = graph.edges.map(e => ((e.dstId, e.srcId), e.attr))
old.join(cur)
  .filter(t=>{
    val a1: Int = t._2._1
    val a2: Int = t._2._2
    a1>=3 && a2>=3 && Math.abs(a1-a2)<=1
  })
  // ((源顶点(srcId),目标顶点(dstId)),(srcId至dstId的关系,dstId至srcId的关系))
  .foreach(println)
----------------------------
((6,3),(4,4))
((1,6),(4,3))
((6,1),(3,4))
((5,2),(4,4))
----------------------------
案例三:合并重复边(限同分区内)
// 合并重复边:【同分区】内相同边(顶点ID顺序值都相同【如:(1,2),(1,3)】)合并
graph.partitionBy(PartitionStrategy.EdgePartition2D,1).groupEdges((a, b)=>{
  val c: Int = a + b
  println(s"$a + $b = $c")
  c
}).triplets.foreach(println)
---------------------------
原有:
	Edge(1L,6L,4),
    Edge(1L,6L,4),
合并后:
	Edge(1,6,8)
---------------------------

http://www.kler.cn/a/459299.html

相关文章:

  • aws(学习笔记第二十二课) 复杂的lambda应用程序(python zip打包)
  • Redis(二)value 的五种常见数据类型简述
  • 信息科技伦理与道德1:研究方法
  • 【文献精读笔记】Explainability for Large Language Models: A Survey (大语言模型的可解释性综述)(三)
  • 【HAProxy】如何在Ubuntu下配置HAProxy服务器
  • 卸载干净 IDEA(图文讲解)
  • 职场中哪些话中话,弦外之音
  • word中插入zotero引用
  • QT写的动态正弦曲线图显示并打印
  • 多模态机器人
  • 24.小R的随机播放顺序<字节青训营-中等题>
  • 实战指南:Shiro、CAS打造完美单点登录体验
  • 运行python程序报错 undefined symbol: ffi_type_uint32 的参考解决方法
  • 马原复习笔记
  • AWS K8s 部署架构
  • 在云服务器中编译IDF(ESP32库)
  • 2024年个人总结
  • 使用 PyInstaller 和 hdiutil 打包 Tkinter 应用为 macOS 可安装的 DMG 文件
  • 统计颜色Count Color(POJ2777)题解
  • 【UE5 C++课程系列笔记】16——DeveloperSettings(开发者设置)的基本使用——创建配置文件
  • 【linux进程】进程终止进程等待
  • CSS(层叠样式表)基础选择器,文字控制属性
  • SpringBoot发邮件(带附件)
  • 《Vue进阶教程》第二十九课:立即执行的回调
  • OpenTK 光照与材质详解
  • 瓷砖缺陷检测数据集,使用yolo,coco json,pasical voc xml格式标注,可识别边缘崩裂,破洞,裂缝等缺陷,一共7992张原始图