Problems running Spark GraphX algorithms on generated graphs

Problem Description

I have created a graph in Spark GraphX using the following code (see my question and solution):

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.util.Random

// Generates an Erdos-Renyi G(n, p) random graph: n vertices, with each
// possible edge (i, j), i < j, included independently with probability p.
object SparkER {

  val nPartitions: Int = 4
  val n: Long = 100
  val p: Double = 0.1

  // Vertex ids handled by partition i: every id congruent to i mod nPartitions.
  def genNodeIds(nPartitions: Int, n: Long)(i: Int) = {
    (0L until n).filter(_ % nPartitions == i).toIterator
  }

  // For vertex i, sample an edge to each higher-numbered vertex with probability p.
  def genEdgesForId(p: Double, n: Long, random: Random)(i: Long) = {
    (i + 1 until n).filter(_ => random.nextDouble < p).map(j => Edge(i, j, ()))
  }

  // One RNG per partition, seeded from SecureRandom.
  def genEdgesForPartition(iter: Iterator[Long]) = {
    val random = new Random(new java.security.SecureRandom())
    iter.flatMap(genEdgesForId(p, n, random))
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark ER").setMaster("local[4]")
    val sc = new SparkContext(conf)

    // Spread the vertex ids across nPartitions empty partitions.
    val empty = sc.parallelize(Seq.empty[Int], nPartitions)
    val ids = empty.mapPartitionsWithIndex((i, _) => genNodeIds(nPartitions, n)(i))

    val edges = ids.mapPartitions(genEdgesForPartition)
    val vertices: VertexRDD[Unit] = VertexRDD(ids.map((_, ())))

    val graph = Graph(vertices, edges)

    val cc = graph.connectedComponents().vertices // throws the exceptions below

    println("Stopping Spark Context")
    sc.stop()
  }
}

Now, I can access the graph and see the degrees of the nodes, for example as in the sketch below.
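A minimal sketch of that kind of inspection (using the graph value built above; graph.degrees is GraphX's standard degree accessor, and take(10) is just an arbitrary sample size):

// Print a few vertex degrees; graph.degrees is a VertexRDD[Int].
graph.degrees.take(10).foreach {
  case (id, deg) => println(s"vertex $id has degree $deg")
}

But when I try to compute some measures, such as connected components, I get the following exceptions: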

15/12/22 12:12:57 ERROR Executor: Exception in task 3.0 in stage 6.0 (TID 19)
java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap$mcJI$sp.apply$mcJI$sp(GraphXPrimitiveKeyOpenHashMap.scala:64)
    at org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:91)
    at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
    at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:99)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
15/12/22 12:12:57 ERROR Executor: Exception in task 1.0 in stage 6.0 (TID 17)
java.lang.ArrayIndexOutOfBoundsException: -1
    (same stack trace as above)

Why am I unable to perform these operations on the generated graph using GraphX?

Answer

I found that the exception does not occur if I do the following:

val graph = Graph(vertices, edges).partitionBy(PartitionStrategy.RandomVertexCut)

Apparently, some GraphX algorithms require the graph to be explicitly repartitioned, but the purpose is not entirely clear to me.
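For completeness, a minimal sketch of the flow that works for me (RandomVertexCut is one of GraphX's built-in PartitionStrategy values, alongside EdgePartition1D, EdgePartition2D and CanonicalRandomVertexCut; the component count at the end is added only for illustration):

// Repartition the edges before running the algorithm.
val partitioned = Graph(vertices, edges)
  .partitionBy(PartitionStrategy.RandomVertexCut)

// connectedComponents labels each vertex with the smallest vertex id
// in its component; no exception is thrown on the repartitioned graph.
val cc = partitioned.connectedComponents().vertices

// For illustration: count the distinct components.
val nComponents = cc.map { case (_, comp) => comp }.distinct().count()
println(s"connected components: $nComponents")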
