Cartesian product between the vertices of a GraphX graph

Question

I would like to compute a Cartesian product between the nodes of a graph, because I want to build their distance matrix. Maybe this is not a very good approach, so any suggestion is welcome.

This is my code. It's not working: I don't get any warning or exception, it just doesn't work. I think it may be because I'm trying to make a Cartesian product of an RDD with itself, but I don't know how to fix it, or how to write a nested loop or something else that could help me compute this matrix.

val indexes1 = graph.vertices.map(_._1)
val indexes2 = graph.vertices.map(_._1)

val cartesian = indexes1.cartesian(indexes2).cache()
cartesian.map(pair => matrix.updated(pair._1, shortPathBetween(pair._1, pair._2)))

def shortPathBetween(v1:VertexId, v2:VertexId) : Int = {
    val path = ShortestPaths.run(graph, Seq(v2))
    val shortestPath = path.vertices.filter({case (vId, _ ) => vId == v1})
        .first()
        ._2
        .get(v2)

    shortestPath.getOrElse(-1)
}

Recommended answer

The way I would approach this is with the Pregel API, which lets you traverse the graph from every node in parallel. If you keep track of the distances and update them with the edge weights while traversing, you end up with vertices that hold their distance to every other (reachable) vertex.

Take, for example, a small directed graph with five vertices (0 to 4) and ten weighted edges. You can initialize it in Spark GraphX like this:

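import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell

// One row per weighted edge: (edge id, source vertex, destination vertex, distance)
val graphData = List(
  (0, 0, 1, 10.0),
  (1, 0, 2, 5.0),
  (2, 1, 2, 2.0),
  (3, 1, 3, 1.0),
  (4, 2, 1, 3.0),
  (5, 2, 3, 9.0),
  (6, 2, 4, 2.0),
  (7, 3, 4, 4.0),
  (8, 4, 0, 7.0),
  (9, 4, 3, 5.0)
).toDF("id", "from", "to", "distance")

// Vertices: every id that appears in "from" or "to"; the vertex attribute is the id itself
val vertexRDD: RDD[(Long, Int)] = graphData.flatMap(_.getValuesMap[Int](List("to", "from")).values).distinct().map(i => (i.toLong, i)).rdd
// Edges: from -> to, weighted by distance
val edgeRDD: RDD[Edge[Double]] = graphData.map(x => Edge(x.getInt(1), x.getInt(2), x.getDouble(3))).rdd
val graph: Graph[Int, Double] = Graph(vertexRDD, edgeRDD)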

The pregel call takes three functions:

  • vprog to set each vertex's value from the message it receives: first the initial message (in this case an empty Map[VertexId, Double] to keep track of distances), then the merged messages of later iterations
  • sendMsg, an update step applied to every edge triplet on each iteration (in this case it updates the distances by adding the weight of the edge, and returns an Iterator with the messages to send out to the next iteration)
  • mergeMsg to merge two messages addressed to the same vertex (two Map[VertexId, Double]s into one, keeping the shortest distance)
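
A plain-Scala sketch of what mergeMsg does, runnable without Spark: it takes the union of two distance maps and keeps the smaller value for each key. VertexId is redefined as Long here so the snippet stands on its own (GraphX defines the same alias), and the two input maps are hypothetical messages that vertex 0 might receive along its edges 0 → 1 and 0 → 2.

```scala
object MergeMsgDemo {
  // GraphX defines VertexId as an alias for Long; redefined here so this runs without Spark
  type VertexId = Long

  // Same rule as the answer's mergeMsg: union of both maps, keeping the smaller distance per key
  def mergeMsg(a: Map[VertexId, Double], b: Map[VertexId, Double]): Map[VertexId, Double] =
    (a.toList ++ b.toList).groupBy(_._1).map { case (id, v) => id -> v.map(_._2).min }

  // Hypothetical messages: distances offered to vertex 0 via edge 0 -> 1 and via edge 0 -> 2
  val viaVertex1: Map[VertexId, Double] = Map(0L -> 0.0, 1L -> 10.0, 2L -> 12.0, 3L -> 11.0)
  val viaVertex2: Map[VertexId, Double] = Map(0L -> 0.0, 1L -> 8.0, 2L -> 5.0, 3L -> 14.0, 4L -> 7.0)

  def main(args: Array[String]): Unit = {
    val merged = mergeMsg(viaVertex1, viaVertex2)
    // prints (0,0.0), (1,8.0), (2,5.0), (3,11.0), (4,7.0)
    println(merged.toSeq.sortBy(_._1).mkString(", "))
  }
}
```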

In code this could look like:

def vprog(id: VertexId, orig: Map[VertexId, Double], newly: Map[VertexId, Double]): Map[VertexId, Double] = newly

def mergeMsg(a: Map[VertexId, Double], b: Map[VertexId, Double]): Map[VertexId, Double] = (a.toList ++ b.toList).groupBy(_._1).map{ // mapValues is not serializable :-(
    case (id, v) => id -> v.map(_._2).min // keep shortest distance in case of duplicate
}

def sendMsg(trip: EdgeTriplet[Map[VertexId, Double], Double]): Iterator[(VertexId, Map[VertexId, Double])] = {
    val w = trip.attr // weight of edge from src -> dst
    // update collected distances at dst + edge weight (map instead of mapValues, which is a lazy view and deprecated in Scala 2.13)
    val distances = trip.dstAttr.map { case (k, d) => k -> (d + w) } +
      (trip.srcId -> 0.0) + (trip.dstId -> w) // set distance to src to 0 and to dst the edge weight

    // Only send if src does not know all of these vertices yet (compares key sets, not distance values)
    if (trip.srcAttr.keySet.intersect(distances.keySet).size != distances.keySet.size)
      Iterator((trip.srcId, distances))
    else
      Iterator.empty
}
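
The stop condition at the end of sendMsg looks only at which vertices the source already knows, not at how far away they are. The plain-Scala sketch below copies the same logic, with the EdgeTriplet fields passed in as ordinary arguments (an assumption so it runs without Spark), and calls it for the edge 1 → 3 (weight 1.0) with two hypothetical states for vertex 1.

```scala
object SendMsgCheckDemo {
  type VertexId = Long
  type Dist = Map[VertexId, Double]

  // Same logic as the answer's sendMsg, with the triplet fields passed as plain arguments
  def sendMsg(srcId: VertexId, srcAttr: Dist, dstId: VertexId, dstAttr: Dist, w: Double): Iterator[(VertexId, Dist)] = {
    // Shift every distance known at dst by the edge weight, then set src -> 0 and dst -> w
    val distances = dstAttr.map { case (k, d) => k -> (d + w) } + (srcId -> 0.0) + (dstId -> w)
    // Send only if src is missing at least one of these keys (distance values are not compared)
    if (srcAttr.keySet.intersect(distances.keySet).size != distances.keySet.size)
      Iterator((srcId, distances))
    else
      Iterator.empty
  }

  // Hypothetical state of vertex 3, the destination of edge 1 -> 3
  val dstState: Dist = Map(3L -> 0.0, 4L -> 4.0)

  def main(args: Array[String]): Unit = {
    // Vertex 1 has not heard of vertex 4 yet, so a message is sent
    println(sendMsg(1L, Map(1L -> 0.0, 3L -> 1.0), 3L, dstState, 1.0).toList)
    // Vertex 1 already holds 4 -> 9.0, so nothing is sent, although 1 -> 3 -> 4 costs only 5.0
    println(sendMsg(1L, Map(1L -> 0.0, 3L -> 1.0, 4L -> 9.0), 3L, dstState, 1.0).toList)
  }
}
```

In the second call the shorter distance to vertex 4 is dropped because the key 4 is already present in vertex 1's map.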

Then run pregel, collect the vertices, and pivot the maps to get a distance matrix.

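import org.apache.spark.sql.functions.{explode, min}

val initMap = Map.empty[VertexId, Double]

// Every vertex starts with an empty distance map; afterwards each (id, map) row is exploded
// into (id, key, value) rows and the keys are pivoted into columns
val result = graph
    .mapVertices((_, _) => initMap)
    .pregel(
      initialMsg = initMap,
      activeDirection = EdgeDirection.Out
    )(vprog, sendMsg, mergeMsg)
    .vertices
    .toDF("id", "map")
    .select($"id", explode($"map"))
    .groupBy("id")
    .pivot("key")
    .agg(min("value"))
    .orderBy("id")

result.show(false)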

The result looks like this (rows are source vertices, columns are destination vertices):

+---+----+----+----+----+---+
|id |0   |1   |2   |3   |4  |
+---+----+----+----+----+---+
|0  |0.0 |8.0 |5.0 |11.0|7.0|
|1  |11.0|0.0 |2.0 |1.0 |4.0|
|2  |9.0 |3.0 |0.0 |4.0 |2.0|
|3  |11.0|21.0|16.0|0.0 |4.0|
|4  |7.0 |15.0|12.0|5.0 |0.0|
+---+----+----+----+----+---+

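Maybe there are other/better ways, but this seems computationally less intense than calculating the shortest path between every pair of nodes from a Cartesian product ;-)

Note that two entries in this table are longer than the actual shortest distances in the example graph: 0 → 3 is 9.0 via 0 → 2 → 1 → 3 (not 11.0), and 3 → 1 is 19.0 via 3 → 4 → 0 → 2 → 1 (not 21.0). The cause is the stop condition in sendMsg: it only sends when the source is missing some key, so once a vertex knows a distance to every other vertex, a shorter path found later is never passed on to it.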
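
If exact shortest distances matter, one option is a Bellman-Ford style relaxation: vprog keeps the smallest distance seen so far instead of only the latest message, and sendMsg offers the source only distances that are strictly shorter than what it already holds. The sketch below is a single-machine simulation of that message-passing loop in plain Scala, not GraphX code: each superstep runs sendMsg on every edge, combines the messages per vertex with the answer's mergeMsg, and stops when no message is sent. The modified vprog and sendMsg are assumptions of this sketch, not the answer's code. On the example graph it gives 9.0 for 0 → 3 and 19.0 for 3 → 1.

```scala
object PregelDistanceSketch {
  type VertexId = Long
  type Dist = Map[VertexId, Double]

  // Weighted edges (src, dst, weight) of the example graph
  val edges: Seq[(VertexId, VertexId, Double)] = Seq(
    (0L, 1L, 10.0), (0L, 2L, 5.0), (1L, 2L, 2.0), (1L, 3L, 1.0), (2L, 1L, 3.0),
    (2L, 3L, 9.0), (2L, 4L, 2.0), (3L, 4L, 4.0), (4L, 0L, 7.0), (4L, 3L, 5.0)
  )

  // Unchanged from the answer: union of two maps, keeping the shorter distance per key
  def mergeMsg(a: Dist, b: Dist): Dist =
    (a.toList ++ b.toList).groupBy(_._1).map { case (id, v) => id -> v.map(_._2).min }

  // Modified: merge the new message into the old state and always keep id -> 0.0
  def vprog(id: VertexId, orig: Dist, newly: Dist): Dist =
    mergeMsg(orig, newly) + (id -> 0.0)

  // Modified: shift dst's distances by w (dst -> 0.0 becomes dst -> w) and offer src
  // only the entries that beat what src already holds
  def sendMsg(srcId: VertexId, srcAttr: Dist, dstAttr: Dist, w: Double): Option[(VertexId, Dist)] = {
    val viaEdge = dstAttr.map { case (k, d) => k -> (d + w) }
    val shorter = viaEdge.filter { case (k, d) => srcAttr.get(k).forall(d < _) }
    if (shorter.nonEmpty) Some(srcId -> shorter) else None
  }

  def run(): Map[VertexId, Dist] = {
    val vertices = edges.flatMap { case (s, d, _) => Seq(s, d) }.distinct
    // Initial step: every vertex runs vprog on an empty initial message
    var state: Map[VertexId, Dist] = vertices.map(v => v -> vprog(v, Map.empty, Map.empty)).toMap
    var active = true
    while (active) {
      // One superstep: sendMsg on every edge, then combine the messages per target vertex
      val messages: Map[VertexId, Dist] = edges
        .flatMap { case (s, d, w) => sendMsg(s, state(s), state(d), w) }
        .groupBy(_._1)
        .map { case (v, msgs) => v -> msgs.map(_._2).reduce(mergeMsg) }
      active = messages.nonEmpty
      state = state.map { case (v, dist) => v -> messages.get(v).fold(dist)(m => vprog(v, dist, m)) }
    }
    state
  }

  def main(args: Array[String]): Unit = {
    val dist = run()
    // row 0 prints 0: 0.0 8.0 5.0 9.0 7.0
    for (v <- dist.keys.toSeq.sorted)
      println(s"$v: " + dist(v).toSeq.sortBy(_._1).map(_._2).mkString(" "))
  }
}
```

Passing functions like these to GraphX's pregel would also need an activeDirection that reruns an edge when its destination changed, such as EdgeDirection.In or the default EdgeDirection.Either; that GraphX variant has not been run here.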
