Spark: Fastest way to look up an element in an RDD
Question
I have a custom class E which has, among others, a field word. I have a large es: RDD[E] with several hundred thousand elements and a doc: Seq[String] with typically a few hundred entries. In es, every element's word field value is unique.
My task is to look up the element in es for each of the strings in doc. It is, however, not guaranteed that such an element exists. My naive Scala/Spark implementation is thus:
def word2E(words: Seq[String]): Seq[E] = {
  words.map(lookupWord(_, es))
    .filter(_.isDefined)
    .map(_.get)
}
The method lookupWord() is defined as follows:
def lookupWord(w: String, es: RDD[E]): Option[E] = {
  val lookup = es.filter(_.word.equals(w))
  if (lookup.isEmpty) None
  else Some(lookup.first)
}
When I look at the Spark stages overview, lookupWord() seems to be the bottleneck. In particular, the isEmpty() calls in lookupWord take relatively long (up to 2 s) in some cases.
I have already persisted the es RDD. Is there any other leverage for optimizing such a task, or is this as good as it gets when operating on such a dataset?
I have noticed the lookup() method in PairRDDFunctions and considered constructing a PairRDD in which the word field would serve as the key. Might that help? Drawing any conclusions experimentally here is quite hard because there are so many factors involved.
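For what it's worth, the keyed-RDD idea can be sketched as follows. This is a minimal illustration, not the accepted answer's approach: the object name, partitioner count, and sample data are made up for the example. The point is that es is keyed by word once up front, so each lookup() only scans the partition the key hashes to instead of filtering the whole RDD:

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

case class E(word: String, value: Int)

object LookupSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("LookupSketch").setMaster("local[2]"))
    val es: RDD[E] = sc.parallelize(List(E("a", 1), E("b", 2)))

    // Key by word once, assign a partitioner, and cache the result;
    // lookup() can then route each query to a single partition.
    val byWord = es
      .map(e => (e.word, e))
      .partitionBy(new HashPartitioner(4))
      .cache()

    // lookup(w) returns all values for the key; words are unique in es,
    // so headOption yields at most one element.
    def lookupWord(w: String): Option[E] = byWord.lookup(w).headOption

    println(lookupWord("a")) // Some(E(a,1))
    println(lookupWord("x")) // None
    sc.stop()
  }
}
```

This still issues one job per word, though, so for a few hundred words per document the join in the answer below amortizes better.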
Answer
The problem with your implementation is that for each word in words you trigger a complete traversal of your RDD and then collect the elements. One way to solve your problem is to join the sequence of words with your RDD:
import org.apache.spark.{SparkConf, SparkContext}

case class E(word: String, value: Int)

object App {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)

    val entries = sc.parallelize(List(E("a", 1), E("b", 2), E("c", 3), E("c", 3)))
    val words = Seq("a", "a", "c")
    val wordsRDD = sc.parallelize(words).map(x => (x, x))

    // Key the entries by word, join against the words, and keep only
    // the matching entries; words with no matching entry simply drop out.
    val matchingEntries = entries
      .map(x => (x.word, x))
      .join(wordsRDD)
      .map { case (_, (entry, _)) => entry }
      .collect()

    println(matchingEntries.mkString("\n"))
  }
}
The output is:
E(a,1)
E(a,1)
E(c,3)
E(c,3)