Spark GraphX Aggregation Summation


Question

I'm trying to compute the sum of node values in a Spark GraphX graph. In short, the graph is a tree, and the top node (the root) should sum the values of all its children and their children. The tree looks like this, and the expected total at the root is 1850:

                                     +----+
                     +--------------->    |  VertexID 14
                     |               |    |  Value: 1000
                 +---+--+            +----+
    +------------>      | VertexId 11
    |            |      | Value:     +----+
    |            +------+ Sum of 14 & 24  |  VertexId 24
+---++                +-------------->    |  Value: 550
|    | VertexId 20                   +----+
|    | Value:
+----++Sum of 11 & 911
      |
      |           +-----+
      +----------->     | VertexId 911
                  |     | Value: 300
                  +-----+

My first stab at it looks like this:

import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD

val vertices: RDD[(VertexId, Int)] =
  sc.parallelize(Array(
    (20L, 0),
    (11L, 0),
    (14L, 1000),
    (24L, 550),
    (911L, 300)
  ))

// Note that the last value in each edge is a factor (positive or negative)
val edges: RDD[Edge[Int]] =
  sc.parallelize(Array(
    Edge(14L, 11L, 1),
    Edge(24L, 11L, 1),
    Edge(11L, 20L, 1),
    Edge(911L, 20L, 1)
  ))

val dataItemGraph = Graph(vertices, edges)

// Each message is a tuple (marker, value, factor); the tuple is passed explicitly to sendToDst
val sum: VertexRDD[(Int, BigDecimal, Int)] = dataItemGraph.aggregateMessages[(Int, BigDecimal, Int)](
  sendMsg = { triplet => triplet.sendToDst((1, BigDecimal(triplet.srcAttr), 1)) },
  mergeMsg = { (a, b) => (a._1, a._2 * a._3 + b._2 * b._3, 1) }
)

sum.collect.foreach(println)

This returns the following:

(20,(1,300,1))
(11,(1,1550,1))

It computes the sum for vertex 11, but the result does not roll up to the root node (vertex 20). What am I missing, or is there a better way of doing this? The tree can be of arbitrary size, and each vertex can have an arbitrary number of child edges.

Recommended answer

Given that the graph is directed (as it seems to be in your example), it should be possible to write a Pregel program that does what you're asking for:

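import org.apache.spark.graphx.EdgeDirection

val result =
  dataItemGraph.pregel(0, activeDirection = EdgeDirection.Out)(
    (_, vd, msg) => msg + vd,            // vprog: add the incoming sum to the vertex value
    t => Iterator((t.dstId, t.srcAttr)), // sendMsg: pass the child's current value to its parent
    (x, y) => x + y                      // mergeMsg: add up messages bound for the same parent
  )

result.vertices.collect().foreach(println)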

// Output is:
// (24,550)
// (20,1850)
// (14,1000)
// (11,1550)
// (911,300)
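
To see where 1850 comes from, the rounds of message passing can be traced by hand. The following is only a plain-Scala sketch of that trace, not GraphX itself: the object name `PregelTrace` and its loop are made up for illustration, and the model is simplified to "vertices that received a message in the previous round send along their out-edges".

```scala
object PregelTrace {
  // Child -> parent edges and initial values, copied from the question
  val edges: Seq[(Long, Long)] = Seq(14L -> 11L, 24L -> 11L, 11L -> 20L, 911L -> 20L)
  val initial: Map[Long, Int] =
    Map(20L -> 0, 11L -> 0, 14L -> 1000, 24L -> 550, 911L -> 300)

  def run(): Map[Long, Int] = {
    var values = initial
    var active: Set[Long] = initial.keySet // every vertex may send in the first round
    var done = false
    while (!done) {
      // "sendMsg" on out-edges of active vertices, "mergeMsg" sums per destination
      val messages: Map[Long, Int] = edges
        .filter { case (src, _) => active(src) }
        .groupBy(_._2)
        .map { case (dst, es) => dst -> es.map { case (src, _) => values(src) }.sum }
      if (messages.isEmpty) done = true
      else {
        // "vprog": add the merged message to the current vertex value
        values = values.map { case (id, v) => id -> (v + messages.getOrElse(id, 0)) }
        active = messages.keySet // only receivers may send in the next round
      }
    }
    values
  }

  def main(args: Array[String]): Unit =
    run().toSeq.sortBy(_._1).foreach(println)
}
```

In this trace, round one delivers 1550 to vertex 11 and 300 to vertex 20, round two delivers 1550 from vertex 11 to vertex 20, and round three has nothing left to send, so vertex 20 ends at 1850.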

I'm using EdgeDirection.Out so that messages are sent only from the bottom up (otherwise we would get into an endless loop). With the default, EdgeDirection.Either, an edge also runs again whenever its destination received a message, so a parent would keep receiving its child's value.
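
One caveat, based on tracing the supersteps by hand rather than on anything stated in the answer: each message carries the child's cumulative value, so a parent that hears from the same child in two different rounds appears to count the earlier amount twice. That does not happen in the example tree, where inner vertices start at 0 and first send 0. In an unbalanced tree such as 1 -> 2 -> 3 -> 4 with an extra leaf 5 -> 3 (values 10 and 3 on vertices 1 and 5, 0 elsewhere), the hand trace gives 16 at vertex 4 instead of 13. A possible workaround is to forward only the newly received amount. The sketch below is an assumption-laden illustration, not part of the original answer: `DeltaRollup`, `rollUp`, and the `State` tuple are hypothetical names, and edge factors are ignored.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId}

object DeltaRollup {
  // Hypothetical per-vertex state: (running total, amount not yet forwarded, initialised?)
  type State = (Int, Int, Boolean)

  def rollUp(graph: Graph[Int, Int]): Graph[Int, Int] = {
    val start: Graph[State, Int] = graph.mapVertices((_, v) => (v, v, false))
    val summed = start.pregel(0, activeDirection = EdgeDirection.Out)(
      (_, s, msg) =>
        if (!s._3) (s._1, s._1, true) // initial superstep: the vertex's own value is pending
        else (s._1 + msg, msg, true), // later: add the delta and mark only the delta as pending
      t =>
        if (t.srcAttr._2 != 0) Iterator((t.dstId, t.srcAttr._2)) // forward the delta only
        else Iterator.empty,
      _ + _ // sum the deltas arriving at the same parent
    )
    summed.mapVertices((_, s) => s._1)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("DeltaRollup"))
    // Hypothetical unbalanced tree: 1 -> 2 -> 3 -> 4, plus leaf 5 -> 3
    val vertices = sc.parallelize(Seq[(VertexId, Int)](
      (1L, 10), (2L, 0), (3L, 0), (4L, 0), (5L, 3)))
    val edges = sc.parallelize(Seq(
      Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 4L, 1), Edge(5L, 3L, 1)))
    rollUp(Graph(vertices, edges)).vertices.collect().sortBy(_._1).foreach(println)
    sc.stop()
  }
}
```

Traced by hand, this variant should leave vertex 4 at 13 for the unbalanced tree and vertex 20 at 1850 for the tree in the question. It also lets inner vertices carry their own non-zero values without those being counted more than once.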
