Scala's collect inefficient in Spark?

Problem Description

I am currently starting to learn to use Spark with Scala. The problem I am working on requires me to read a file, split each line on a certain character, filter the lines where one of the columns matches a predicate, and finally remove a column. So the basic, naive implementation is a map, then a filter, then another map.

This meant going through the collection 3 times, which seemed quite unreasonable to me. So I tried replacing them with a single collect (the collect that takes a partial function as an argument). And much to my surprise, this made it run much slower. I tried it locally on regular Scala collections; as expected, the latter way of doing things is much faster.

So why is that? My idea is that the map, filter, and map are not applied sequentially, but rather mixed into one operation; in other words, when an action forces evaluation, every element of the list will be checked and the pending operations will be executed. Is that right? But even so, why does collect perform so badly?

EDIT: a code example to show what I want to do:

The naive way:

sc.textFile(...).map(l => {
  val s = l.split(" ")  // split once per line
  (s(0), s(1))          // keep the first two columns
}).filter(_._2.contains("hello")).map(_._1)  // filter on the second column, then drop it

The collect way:

sc.textFile(...).collect {
  // guard: filter on the second column; body: keep the first column
  case l if l.split(" ")(1).contains("hello") => l.split(" ")(0)
}

Solution

The answer lies in the implementation of collect:

/**
 * Return an RDD that contains all matching values by applying `f`.
 */
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  filter(cleanF.isDefinedAt).map(cleanF)
}

As you can see, it's the same sequence of filter->map, but less efficient in your case.
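
Concretely, hand-inlining the partial function from the example above as pf (a hypothetical expansion on my part, just to make the two evaluations visible), the collect call behaves like:

val pf: PartialFunction[String, String] = {
  case l if l.split(" ")(1).contains("hello") => l.split(" ")(0)
}
sc.textFile(...).filter(pf.isDefinedAt)  // first evaluation of the guard (one split per line)
  .map(pf)                               // second evaluation of the guard, then the body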

In Scala, both the isDefinedAt and apply methods of a PartialFunction evaluate the if guard.
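
This is easy to see in plain Scala, without Spark (a minimal demonstration; the counter is only there to make the double evaluation visible):

var evaluations = 0
val pf: PartialFunction[String, String] = {
  case s if { evaluations += 1; s.contains("hello") } => s.toUpperCase
}
pf.isDefinedAt("hello world")  // evaluates the guard: evaluations == 1
pf("hello world")              // evaluates the guard again: evaluations == 2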

So, in your "collect" example, the split inside the guard will be performed twice for each input element: once when filter calls isDefinedAt, and once more when map calls apply (which re-evaluates the guard before running the body). The naive chain, by contrast, splits each line only once, since Spark pipelines these narrow transformations into a single pass over each partition.
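
If you want to keep a single transformation without paying for the guard twice, one option (a sketch on my part, assuming whitespace-separated columns as in the question) is to split once inside a flatMap and emit an Option:

sc.textFile(...).flatMap { l =>
  val s = l.split(" ")  // split exactly once per line
  if (s.length > 1 && s(1).contains("hello")) Some(s(0)) else None
}

This still runs in one pipelined pass over each partition, but each line is split only once.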
