Scala's collect inefficient in Spark?
I am currently starting to learn to use Spark with Scala. The problem I am working on requires me to read a file, split each line on a certain character, filter the lines where one of the columns matches a predicate, and finally remove a column. So the basic, naive implementation is a map, then a filter, then another map.
This means going through the collection three times, which seemed quite unreasonable to me. So I tried replacing them with a single collect (the collect that takes a partial function as an argument). Much to my surprise, this made it run much slower. I tried locally on regular Scala collections; as expected, the latter approach is much faster.
So why is that? My idea is that the map, filter, and map are not applied sequentially, but rather fused into one operation; in other words, when an action forces evaluation, every element of the list is checked and the pending operations are executed. Is that right? But even so, why does collect perform so badly?
EDIT: a code example to show what I want to do:
The naive way:
sc.textFile(...).map(l => {
  val s = l.split(" ")
  (s(0), s(1))
}).filter(_._2.contains("hello")).map(_._1)
The collect way:
sc.textFile(...).collect {
  case s if s.split(" ")(1).contains("hello") => s.split(" ")(0)
}
The answer lies in the implementation of collect:
/**
 * Return an RDD that contains all matching values by applying `f`.
 */
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  filter(cleanF.isDefinedAt).map(cleanF)
}
As you can see, it's the same filter -> map sequence, but less efficient in your case.
In Scala, both the isDefinedAt and the apply methods of a PartialFunction evaluate the if guard. So, in your collect example, the split in the guard is performed twice for each matching input element: once when isDefinedAt checks the guard, and once more when apply re-evaluates it before producing the result.