How to filter an RDD according to a function based on another RDD in Spark?

Problem Description

I am a beginner with Apache Spark. I want to filter out all groups in an RDD whose sum of weights is larger than a constant value. The "weight" map is also an RDD. Here is a small-size demo; the groups to be filtered are stored in "groups", and the constant value is 12:

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
val wm = weights.toArray.toMap // collects the distributed weights RDD into a local Map on the driver
def isheavy(inp: String): Boolean = {
  val allw = inp.split(",").map(wm(_)).sum
  allw > 12
}
val result = groups.filter(isheavy)

When the input data is very large, > 10GB for example, I always encounter a "java heap out of memory" error. I suspect it is caused by "weights.toArray.toMap", because it converts a distributed RDD into a Java object in the JVM. So I tried to filter with the RDD directly:

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
def isheavy(inp: String): Boolean = {
  val items = inp.split(",")
  val wm = items.map(x => weights.filter(_._1 == x).first._2) // references the weights RDD inside a transformation on groups
  wm.sum > 12
}
val result = groups.filter(isheavy)

When I ran result.collect after loading this script into the spark shell, I got a "java.lang.NullPointerException" error. Someone told me that when an RDD is manipulated inside another RDD's operation, there will be a NullPointerException, and suggested that I put the weights into Redis.

So how can I get the "result" without converting "weights" to a Map, or putting it into Redis? Is there a solution for filtering an RDD based on another map-like RDD without the help of an external datastore service? Thanks!

Solution

The "java out of memory" error is coming because spark uses its spark.default.parallelism property while determining number of splits, which by default is number of cores available.

// From CoarseGrainedSchedulerBackend.scala
override def defaultParallelism(): Int = {
  conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}

When the input becomes large and you have limited memory, you should increase the number of splits.

You can do something as follows:

val input = List("a,b,c,d", "b,c,e", "a,c,d", "e,g")
val splitSize = 10000 // specify some number of elements that fit in memory.

val numSplits = (input.size / splitSize) + 1 // has to be > 0.
val groups = sc.parallelize(input, numSplits) // specify the # of splits.

val weights = Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)).toMap

def isHeavy(inp: String) = inp.split(",").map(weights(_)).sum > 12
val result = groups.filter(isHeavy)
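
If the weights map is large, you can also ship it to the executors once as a broadcast variable instead of capturing it in every task closure. A minimal sketch along those lines, reusing the weights map and groups defined above (the weightsBc / isHeavyBc names are just for illustration):

// Sketch: broadcast the local weights map once so each executor keeps a single
// read-only copy instead of receiving it with every serialized task.
val weightsBc = sc.broadcast(weights)

def isHeavyBc(inp: String) = inp.split(",").map(weightsBc.value(_)).sum > 12
val resultBc = groups.filter(isHeavyBc)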

You may also consider increasing executor memory size using spark.executor.memory.
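
For reference, a minimal sketch of setting both options up front, assuming a standalone application rather than the spark shell (the app name and the values 64 and "4g" are placeholders, not recommendations); both must be set before the SparkContext is created:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: raise the default split count and the executor heap size.
val conf = new SparkConf()
  .setAppName("group-filter")
  .set("spark.default.parallelism", "64")
  .set("spark.executor.memory", "4g")
val sc = new SparkContext(conf)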
