SparkX 下载本文

内容发布更新时间 : 2024/5/15 18:47:48星期一 下面是文章的全部内容请认真阅读。

1. 基础类

import org.apache.spark._

import org.apache.spark.graphx._ import org.apache.spark.rdd.RDD

2. 成员和方法

graph.vertices.filter { case (id, (name, pos)) => pos == \graph.edges.filter(e => e.srcId > e.dstId).count

graph.edges.filter{ case Edge(srcId, dstId, s) => srcId > dstId }.count

val facts: RDD[String] = graph.triplets.map(triplet => triplet.srcAttr._1 + \of \

facts.collect.foreach(println(_))

2.1. mapVertices,mapEdges,mapTriplets

class Graph[VD, ED] {

def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED] def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]

def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] }

以上三个方法修改节点或者边的属性,但是重用原Graph的结构。

2.2. Reverse,subgraph,mask,groupEdges

class Graph[VD, ED] {

// 反转图中所有边的方向 def reverse: Graph[VD, ED]

// 保留满足vpred条件的顶点。保留满足epred条件且满足vpred条件的边。 def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,

vpred: (VertexId, VD) => Boolean): Graph[VD, ED]

// 构建一个子图,子图中的顶点和边同时存在于当前图和输入图中。通常和subgraph结

合使用。

def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED] // 合并相同顶点的边,减少图的大小。

def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED] }

2.3. joinVertices,outerJoinVertices

class Graph[VD, ED] {

// 返回一个新图,新图的顶点的属性值通过map函数合并当前顶点属性值和输入值获得,输入图中没有顶点和当前图匹配的顶点属性值保持不变。

def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)

: Graph[VD, ED]

// 返回的新图中顶点的属性可以是一种新类型。如果输入RDD中没有匹配的顶点,可以为当前图中的顶点设置一个默认值,例子见下文。

def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)

: Graph[VD2, ED] }

注意:调用joinVertices时,如果输入RDD中某个顶点有多个值,只有一个值会被使用。使用如下方法可以排除输入RDD中的重复顶点,且对输入RDD进行的索引可以加快join。 // 顶点值不唯一的RDD

val nonUniqueCosts: RDD[(VertexId, Double)]

// 将输入RDD转变成只包含唯一顶点值的RDD,且创建了索引

val uniqueCosts: VertexRDD[Double] = graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b)

val joinedGraph = graph.joinVertices(uniqueCosts)((id, oldCost, extraCost) => oldCost + extraCost)

例子:

val outDegrees: VertexRDD[Int] = graph.outDegrees

val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) => outDegOpt match {

case Some(outDeg) => outDeg

case None => 0 // No outDegree means zero outDegree } }

2.4. aggregateMessages

class Graph[VD, ED] {

// 对每个edge triplet先调用sendMsg方法发送消息到目的顶点,然后调用mergeMsg方

法合并目的顶点上的所有消息。

def aggregateMessages[Msg: ClassTag](

sendMsg: EdgeContext[VD, ED, Msg] => Unit, mergeMsg: (Msg, Msg) => Msg,

tripletFields: TripletFields = TripletFields.All) : VertexRDD[Msg] }

sendMsg有EdgeContext对象,其中包含了源和目标顶点的属性,还有边的属性,还有sendToSrc和sendToDst方法。

mergeMsg合并sendMsg发送到相同顶点的消息。

返回结果是一个RDD包含顶点和消息对象,没有消息的顶点不包含在结果中。

tripletFields可以取值TripletFields.All、Src、Dst、EdgeOnly、None ,默认是All,这个参数会通知GraphX在EdgeContext中哪些属性可以使用,同时采用更有效的join策略。

2.5. inDegrees,outDegrees,degrees

inDegrees入度是汇入某个顶点的边的数量。

outDegrees出度是以某个顶点为起点的边的数量。 degrees是某个顶点的边的总数量。

// Define a reduce operation to compute the highest degree vertex def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = { if (a._2 > b._2) a else b }

// Compute the max degrees

val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max) val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max) val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)

2.6. collectNeighborIds,collectNeighbors

class GraphOps[VD, ED] {

// 收集某个顶点的所有相邻顶点的Id

def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] // 收集某个顶点的所有相邻顶点的Id和属性

def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ] }

这两个方法比较耗时,因为需要复制属性值,节点间大量交互。如果可能的话,使用aggregateMessages方法直接计算。