Warning "Joining two VertexPartitions with different indexes is slow" in Spark GraphX after unpersisting a graph

Problem description

Sorry about the long and inaccurate title. If you understand what I'm asking, please help me edit it. Thanks.

When I run the code below, I get this warning:

14/06/12 14:33:24 WARN ShippableVertexPartitionOps: Joining two VertexPartitions with different indexes is slow.

But if I comment out graph.unpersistVertices(blocking = false), the warning disappears. Why does unpersisting change the index of the Graph object?

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Test")
      .setMaster("local[4]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    val v: RDD[(VertexId, Int)] = sc.parallelize(Seq((0L,0),(1L,1),(2L,2)))
    val e: RDD[Edge[Int]] = sc.parallelize(Seq(Edge(0, 1, 0), Edge(0, 2, 0), Edge(1, 0, 0), Edge(2, 1, 0)))

    val g = Graph(v, e)

    def test(graph: Graph[Int, Int]) = {
      graph.cache()
      // Attach each vertex's out-degree; vertices without out-edges get 0.
      val ng = graph.outerJoinVertices(graph.outDegrees){
        (vid, vd, out) => (vd, out.getOrElse(0))
      }

      val f = ng.subgraph(epred = _.srcId != 0, vpred = (vid, vd) => vid != 0L)
      f.cache()
      // Commenting out this line makes the warning disappear.
      graph.unpersistVertices(blocking = false)
      f
    }

    val f1 = test(g)

    println(f1.numVertices)

  }
}

As far as I know, when you apply a transformation such as mapValues to a GraphX graph, the index of the underlying VertexRDD is reused to avoid recomputation. When you do something like subgraph, the indexes are still reused in a sense, with a bitmask applied on top of them. Is outerJoinVertices the same kind of transformation, since it only modifies the values of an RDD?
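
To make the index-reuse idea concrete, here is a toy model in plain Scala with no Spark involved. The names Index, Part, build and sharesIndexWith are hypothetical stand-ins for GraphX's internal vertex partitions, not its real classes. As far as I can tell, GraphX's own partition join compares the two index objects and logs the "different indexes" warning when they are not the same object, then falls back to a slower match-by-id path. The sketch only shows which operations keep the same index object and which produce a new one.

```scala
import scala.reflect.ClassTag

// Toy model (NOT GraphX classes): a vertex "partition" is an index of vertex
// ids, a value array aligned with it, and a visibility bitmask.
final class Index(val ids: Array[Long]) // only object identity matters here

final case class Part[V](index: Index, values: Array[V], mask: Array[Boolean]) {
  // mapValues-like: new values, but the very same Index object is reused.
  def mapValues[W: ClassTag](f: (Long, V) => W): Part[W] =
    Part(index, Array.tabulate(values.length)(i => f(index.ids(i), values(i))), mask)

  // subgraph-like: keep the Index, only clear bits in the mask.
  def filter(p: (Long, V) => Boolean): Part[V] =
    Part(index, values, Array.tabulate(mask.length)(i => mask(i) && p(index.ids(i), values(i))))

  // A fast join is only possible when both sides share one Index object
  // (reference equality); otherwise ids must be re-matched one by one.
  def sharesIndexWith(other: Part[_]): Boolean = index eq other.index

  // Number of vertices still visible through the mask.
  def size: Int = mask.count(identity)
}

object Part {
  // Building from raw data (as recomputing an uncached RDD would) creates a NEW Index.
  def build[V: ClassTag](data: Seq[(Long, V)]): Part[V] =
    Part(new Index(data.map(_._1).toArray), data.map(_._2).toArray, Array.fill(data.size)(true))
}

object IndexReuseDemo {
  def main(args: Array[String]): Unit = {
    val data = Seq(0L -> 0, 1L -> 1, 2L -> 2)
    val base = Part.build(data)
    val mapped = base.mapValues((_, v) => (v, 0))
    val sub = mapped.filter((id, _) => id != 0L)
    println(base.sharesIndexWith(mapped))  // true: mapValues reused the index
    println(base.sharesIndexWith(sub))     // true: filter only changed the mask
    println(sub.size)                      // 2
    val rebuilt = Part.build(data)         // same contents, fresh Index object
    println(base.sharesIndexWith(rebuilt)) // false: would take the slow join path
  }
}
```

In this model, anything that rebuilds a partition from raw data yields a fresh Index, even when its contents are identical.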

Moreover, I call cache() on the new graph before unpersisting the old one. I assumed the unpersist would not affect the new graph because it was already cached, but apparently I was wrong.

How do cache and unpersist work? Why do they affect the indexes when I'm not actually joining partitions?

Update: I looked into the code. numVertices is actually a map-reduce over the partitions, partitionsRDD.map(_.size).reduce(_ + _). So the join happens at this line.
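
The same deferral can be seen with an ordinary Scala view, offered here only as an analogy (this is not Spark code, and LazyDemo and run are hypothetical names). The map step is merely recorded, and it runs only when reduce, the counterpart of an action, is called.

```scala
object LazyDemo {
  // Returns (work done before the "action", the result, work done after it).
  def run(sizes: Seq[Int]): (Int, Int, Int) = {
    var work = 0
    // Like an RDD transformation: the view only records the map step.
    val mapped = sizes.view.map { n => work += 1; n }
    val before = work
    // Like an action (cf. partitionsRDD.map(_.size).reduce(_ + _)): forces the map.
    val total = mapped.reduce(_ + _)
    (before, total, work)
  }

  def main(args: Array[String]): Unit =
    println(run(Vector(3, 5, 2))) // prints (0,10,3)
}
```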

Recommended answer

You need to materialize the new graph before unpersisting the old one. RDD transformations are lazy: Spark does not actually compute them until it sees an action. Calling cache() only marks f to be stored the first time it is computed, so at the point where you unpersist graph, f has not been computed yet and still depends on graph's vertices. For more information, see "RDD Operations" in the Spark programming guide: https://spark.apache.org/docs/latest/programming-guide.html

So in your test function, adding one line right after f.cache() solves the issue:
f.vertices.count // this action forces Spark to compute f and cache it
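
Here is a toy model of why the order of cache(), the action, and unpersist matters. It is plain Scala, not Spark, and Lazy, IndexStub and CacheOrderDemo are hypothetical names. It assumes, as the warning suggests, that both inputs of the join inside outerJoinVertices are derived from graph.vertices (outDegrees being built against its index). Under that assumption, if graph.vertices is no longer cached when f is finally computed, it is recomputed separately for each side, and each recomputation builds a new index object.

```scala
// Toy stand-in for a cacheable, lazily evaluated dataset (NOT Spark's API).
final class Lazy[T](compute: () => T) {
  private var marked = false
  private var stored: Option[T] = None

  // Like RDD.cache(): only marks the data to be kept once it is computed.
  def cache(): this.type = { marked = true; this }

  // Like unpersist: drops any stored data and the mark.
  def unpersist(): Unit = { marked = false; stored = None }

  // Reading the value plays the role of an action.
  def get: T = stored.getOrElse {
    val v = compute()
    if (marked) stored = Some(v)
    v
  }
}

// Each computation of the vertices creates a new one; only identity matters.
final class IndexStub

object CacheOrderDemo {
  // Returns true when both sides of the join see the same index object
  // (fast path) and false when they see two different ones (slow path).
  def run(materializeFirst: Boolean): Boolean = {
    val vertices = new Lazy(() => new IndexStub).cache() // like graph.vertices
    // Assumption: both join inputs read the vertices' index independently.
    val joined = new Lazy(() => vertices.get eq vertices.get).cache() // like f
    if (materializeFirst) joined.get // like f.vertices.count
    vertices.unpersist()             // like graph.unpersistVertices(...)
    joined.get                       // like f1.numVertices
  }

  def main(args: Array[String]): Unit = {
    println(run(materializeFirst = false)) // false: index built twice
    println(run(materializeFirst = true))  // true: computed while cached
  }
}
```

With the extra action, f is computed while graph.vertices is still cached, so both sides see one index object. Unpersisting afterwards no longer matters for f, because f's own cached data is reused.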
