Spark: Fastest way to look up an element in an RDD


Problem description

I have a custom class E which has, among others, a field word. I have a large es: RDD[E] with several hundred thousand elements and a doc: Seq[String] with typically a few hundred entries. In es, every element's word field value is unique.

My task is to look up the element in es for each of the strings in doc. It is, however, not guaranteed that such an element exists. My naive Scala/Spark implementation is thus:

def word2E(words: Seq[String]): Seq[E] = {
  words.map(lookupWord(_, es))  // one RDD lookup per word
    .filter(_.isDefined)        // drop words with no matching element
    .map(_.get)
}

The method lookupWord() is defined as follows:

def lookupWord(w: String, es: RDD[E]): Option[E] = {
  val lookup = es.filter(_.word.equals(w))  // lazy; nothing runs yet

  if (lookup.isEmpty) None     // isEmpty triggers a Spark job
  else Some(lookup.first)      // first triggers a second job
}

When I look at the Spark stages overview, lookupWord() seems to be the bottleneck. In particular, the isEmpty() calls in lookupWord take relatively long (up to 2 s) in some cases.

I have already persisted the es RDD. Is there any other leverage for optimizing such a task, or is this as good as it gets when operating on such a dataset?

I have noticed the lookup() method in PairRDDFunctions and considered constructing a PairRDD in which the word field would serve as the key. Might that help? Drawing any conclusions experimentally here is quite hard because there are so many factors involved.
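
For reference, a minimal sketch of that idea (the names esByWord and lookupKeyed are illustrative, not from the question): once the RDD is keyed by word and given an explicit partitioner, lookup() runs a job on only the single partition the key hashes to, rather than scanning the whole RDD.

import org.apache.spark.HashPartitioner

// Key the RDD by word once and co-locate equal keys;
// persist so the keyed RDD is built only once.
val esByWord = es.map(e => (e.word, e))
  .partitionBy(new HashPartitioner(es.partitions.length))
  .persist()

def lookupKeyed(w: String): Option[E] =
  esByWord.lookup(w).headOption  // lookup returns Seq[E]; empty if w is absent

Note that each lookup() call is still a separate Spark job, so looking up a few hundred words still launches a few hundred jobs.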

Answer

The problem with your implementation is that, for each word in words, you trigger a complete traversal of your RDD and then collect the elements. One way to solve your problem is to join the sequence of words with your RDD: a single join shuffles each dataset once, so the cost no longer grows with the number of words looked up:

import org.apache.spark.{SparkConf, SparkContext}

case class E(word: String, value: Int)

object App {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)

    val entries = sc.parallelize(List(E("a", 1), E("b", 2), E("c", 3), E("c", 3)))

    val words = Seq("a", "a", "c")

    // Pair each word with itself so it can serve as a join key.
    val wordsRDD = sc.parallelize(words).map(x => (x, x))

    val matchingEntries = entries
      .map(x => (x.word, x))  // key the entries by their word field
      .join(wordsRDD)         // inner join: keeps only entries whose word occurs in words
      .map{
        case (_, (entry, _)) => entry
      }
      .collect

    println(matchingEntries.mkString("\n"))
  }
}

The output is:

E(a,1)
E(a,1)
E(c,3)
E(c,3)
