How to filter a RDD according to a function based another RDD in Spark?
I am a beginner with Apache Spark. I want to filter out all groups whose sum of weights is larger than a constant value in an RDD. The "weight" map is also an RDD. Here is a small-size demo; the groups to be filtered are stored in "groups", and the constant value is 12:
val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
val wm = weights.toArray.toMap
def isheavy(inp: String): Boolean = {
val allw = inp.split(",").map(wm(_)).sum
allw > 12
}
val result = groups.filter(isheavy)
When the input data is very large, > 10GB for example, I always encounter a "java heap out of memory" error. I suspected it was caused by "weights.toArray.toMap", because it converts a distributed RDD into a Java object in the driver JVM. So I tried to filter with the RDD directly:
val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
def isheavy(inp: String): Boolean = {
val items = inp.split(",")
val wm = items.map(x => weights.filter(_._1 == x).first._2)
wm.sum > 12
}
val result = groups.filter(isheavy)
When I ran result.collect after loading this script into the spark shell, I got a "java.lang.NullPointerException" error. Someone told me that when an RDD is manipulated inside another RDD's operation, there will be a null pointer exception, and suggested that I put the weights into Redis.
So how can I get the "result" without converting "weights" to a Map, or putting it into Redis? Is there a solution to filter an RDD based on another map-like RDD without the help of an external datastore service? Thanks!
The "java out of memory" error occurs because Spark uses its spark.default.parallelism property to determine the number of splits, which by default is the number of cores available:
// From CoarseGrainedSchedulerBackend.scala
override def defaultParallelism(): Int = {
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}
When the input becomes large and you have limited memory, you should increase the number of splits.
You can do something as follows:
val input = List("a,b,c,d", "b,c,e", "a,c,d", "e,g")
val splitSize = 10000 // specify some number of elements that fit in memory.
val numSplits = (input.size / splitSize) + 1 // has to be > 0.
val groups = sc.parallelize(input, numSplits) // specify the # of splits.
val weights = Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)).toMap
def isHeavy(inp: String) = inp.split(",").map(weights(_)).sum > 12
val result = groups.filter(isHeavy)
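If building the full weights map on the driver is acceptable (it is small relative to the groups data), a broadcast variable is a sketch of how to avoid shipping the map inside every task closure. This assumes `sc` is the usual SparkContext from the spark shell:

```scala
// Sketch: broadcast the weights map so each executor receives one read-only
// copy, instead of the map being serialized into every task closure.
// Assumes the map comfortably fits in driver and executor memory.
val weights = Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)).toMap
val bWeights = sc.broadcast(weights)
def isHeavy(inp: String) = inp.split(",").map(bWeights.value(_)).sum > 12
val result = groups.filter(isHeavy)
```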
You may also consider increasing the executor memory size using spark.executor.memory.
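For completeness, the question's stricter constraint (no driver-side Map at all) can also be met by keeping the weights as an RDD and joining. A sketch, with the caveat that it uses each group string as its own key, so two identical input lines would be merged into one:

```scala
// Sketch: fully distributed filtering via join; no driver-side map needed.
// 1. Explode each group into (item, group) pairs.
// 2. Join with the (item, weight) RDD on the item.
// 3. Sum the weights per group and keep the heavy ones.
val pairs = groups.flatMap(g => g.split(",").map(item => (item, g)))
val totals = pairs.join(weights)          // (item, (group, weight))
  .map { case (_, (g, w)) => (g, w) }
  .reduceByKey(_ + _)                     // (group, totalWeight)
val result = totals.filter { case (_, total) => total > 12 }.keys
```

The join shuffles both RDDs by item, so nothing ever has to fit on a single machine, at the cost of more network traffic than the broadcast approach.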