Creating an array per executor in Spark and combining into an RDD


Problem description

I am moving from MPI-based systems to Apache Spark. I need to do the following in Spark.

Suppose I have n vertices. I want to create an edge list from these n vertices. An edge is just a tuple of two integers (u, v); no attributes are required.

However, I want to create them in parallel, independently in each executor. Therefore, I want to create P edge arrays independently for the P Spark executors. Each array may have a different size and depends on the vertices, so I also need the executor id, from 0 to P-1. Next, I want to combine them into one global RDD of edges.

In MPI, I would create an array in each processor using the processor rank. How do I do that in Spark, especially using the GraphX library?
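For context, the closest Spark analogue of an MPI rank is the partition index, which mapPartitionsWithIndex exposes. A minimal sketch (not part of the original question), assuming a running SparkContext sc and a hypothetical count of 4 "ranks":

// Minimal sketch: the partition index plays the role of the MPI rank.
// Each partition independently builds its own local data, as each MPI
// process would using its rank.
val perRank = sc
  .parallelize(Seq.empty[Int], 4)  // 4 partitions stand in for 4 "ranks"
  .mapPartitionsWithIndex { (rank, _) =>
    Iterator.single((rank, Array.empty[(Long, Long)]))  // (rank, local edge array)
  }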

So my primary goal is to create an array of edges in each executor and combine them into a single RDD.

I am first trying a modified version of the Erdős–Rényi model. As parameters I only have the number of nodes n and a probability p.

Suppose executor i has to process nodes 101 to 200. For any node, say node 101, it will create an edge from 101 to each of 102 through n, each with probability p. After each executor creates its allocated edges, I would instantiate the GraphX EdgeRDD and VertexRDD. My plan, therefore, is to create the edge lists independently in each executor and merge them into an RDD.
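For reference, a single-machine sketch of this G(n, p) process (hypothetical, with example values for n and p) would be:

// Sequential G(n, p): each pair (u, v) with u < v becomes an edge
// independently with probability p. Example values only.
val rng = new scala.util.Random()
val n = 100L
val p = 0.05
val localEdges = for {
  u <- 0L until n
  v <- (u + 1) until n
  if rng.nextDouble() < p
} yield (u, v)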

Recommended answer

Let's start with some imports and the variables that will be required for downstream processing:

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.util.Random

val nPartitions: Int = ???  // number of partitions (one per executor)
val n: Long = ???           // number of vertices
val p: Double = ???         // edge probability
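The ??? placeholders are for the caller to fill in; for a small local test, hypothetical values might be:

// Hypothetical values for a small test run
val nPartitions: Int = 4
val n: Long = 10000L
val p: Double = 0.001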

Next we'll need an RDD of seed IDs which can be used to generate the edges. A naive way to handle this would be simply something like this:

sc.parallelize(0L until n)  // ids 0 .. n-1; `to` would give n + 1 ids

Since the number of generated edges depends on the node id, this approach would give a highly skewed load. We can do a little bit better with repartitioning:

sc.parallelize(0L until n)
  .map((_, None))                                // dummy values, needed only for partitionBy
  .partitionBy(new HashPartitioner(nPartitions)) // spread the ids across partitions
  .keys
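To see the effect, one can compare the per-partition work before and after hashing with a debug-only helper (a sketch, using the RDD import above; glom materializes each partition as an array on the executors):

// Debug-only sketch: total candidate edges, sum of (n - i - 1), per partition.
// Range-based layouts front-load the work; hashed ids are roughly balanced.
def workPerPartition(rdd: RDD[Long], n: Long): Array[Long] =
  rdd.glom().map(_.map(i => n - i - 1).sum).collect()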

but a much better approach is to start with an empty RDD and generate the ids in place. We'll need a small helper:

// Partition i receives every id congruent to i modulo nPartitions
def genNodeIds(nPartitions: Int, n: Long)(i: Int) = {
  (0L until n).filter(_ % nPartitions == i).toIterator
}

which can be used as follows:

val empty = sc.parallelize(Seq.empty[Int], nPartitions)  // only the partition count matters here
val ids = empty.mapPartitionsWithIndex((i, _) => genNodeIds(nPartitions, n)(i))
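The helper deals the ids out round-robin, which is what balances the downstream load; for example, with hypothetical values nPartitions = 4 and n = 10, partition 1 receives:

genNodeIds(4, 10L)(1).toList  // List(1, 5, 9), i.e. every id equal to 1 mod 4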

Just a quick sanity check (it is quite expensive, so don't use it in production):

require(ids.distinct.count == n) 

and we can generate the actual edges using two more helpers:

def genEdgesForId(p: Double, n: Long, random: Random)(i: Long) = {
  // For node i, keep each candidate edge (i, j) with j > i with probability p
  (i + 1 until n).filter(_ => random.nextDouble < p).map(j => Edge(i, j, ()))
}

def genEdgesForPartition(iter: Iterator[Long]) = {
  // It could be overkill, but better safe than sorry: seed each
  // partition's generator from a SecureRandom so tasks don't share seeds.
  // Depending on your requirements it could be worth considering
  // commons-math:
  // https://commons.apache.org/proper/commons-math/userguide/random.html
  val random = new Random(new java.security.SecureRandom())
  iter.flatMap(genEdgesForId(p, n, random))
}

val edges = ids.mapPartitions(genEdgesForPartition)
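As with the ids, a rough driver-side sanity check is possible (it counts all edges, so again avoid it in production): since only pairs u < v are considered, the expected edge count in G(n, p) is p * n * (n - 1) / 2.

// Debug-only: the actual count should be close to the expectation for large n
val expectedEdges = p * n * (n - 1) / 2.0
val actualEdges = edges.count()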

Finally, we can create a graph:

val graph = Graph.fromEdges(edges, ())
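One caveat worth knowing: Graph.fromEdges only materializes vertices that occur in at least one edge, so for small p, graph.numVertices can come out smaller than n. A quick inspection (both calls trigger computation):

println(s"vertices: ${graph.numVertices}, edges: ${graph.numEdges}")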
