是否有一个RDD变换函数,着眼于相邻的元素呢? [英] Is there an RDD transform function that looks at neighboring elements?
问题描述
有谁知道,如果有一种方法时变换一下在排序RDD相邻的元素呢?我知道我可以收集,然后做这样的操作作为一个在下面的例子,但它那种违背了分布式系统的目的,我想利用这个事实,它的分配。
例如:
(字符串名称,诠释VAL)地图RDD到RDD(字符串名称,诠释VAL,诠释差异)
使得:
名称| VAL成为 - >名称| VAL |差异(电流 - 前)
A | 3 | 3 | 3
C | 6 C | 6 | 3
B | 4℃| 4 | -2
开发| 20 D | 20 | 16
也许最有效的方法简单的方法是一个RDD转换为数据帧和使用滞后:
案例类NameValue(名称:字符串,值:智力)
VAL RDD = sc.parallelize(
NameValue(一,3):: NameValue(B,6)::
NameValue(C,4):: NameValue(D,20)::无)VAL DF = sqlContext.createDataFrame(RDD)
df.registerTempTable(DF)
sqlContext.sql(选择名称,价值,
值 - 滞后(值)OVER(ORDER BY名称,值)滞后
从DF,)。秀
不幸的是在这一刻窗口功能,而 PARTITION BY
子句的所有数据移动到单个分区,所以如果你有大量的数据集是特别有用。
使用低级别的操作,您可以使用后跟 flatMap
和 groupByKey
zipWithIndex
code>:
案例类NameValueWithLag(名称:字符串,值:智力,滞后:智力)
VAL CNT = rdd.count() - 1RDD。
zipWithIndex。
flatMap {情况下(X,I)=> (0到1).MAP(滞后= GT(I - 滞后,(I,X)))}。
groupByKey。
过滤器{情况下(K,V)=> ķ!= CNT}。
值。
地图(丘壑= GT; {
VAL分类= vals.toArray.sortBy(_._ 1).MAP(_._ 2)
如果(sorted.length == 1){
NameValueWithLag(排序(0).name和排序(0).value的,排序(0).value的)
}其他{
NameValueWithLag(
排序(1).name和排序(1).value的,
排序(1)。价值 - 排序(0)。价值
)
}
})
编辑:
如果你不介意使用API的开发人员那里,你可以尝试 RDDFunctions.sliding
,但它需要手动处理
进口org.apache.spark.mllib.rdd.RDDFunctions._VAL第一= rdd.first匹配{
情况NameValue(名称,值)=> NameValueWithLag(名称,价值,价值)
}sc.parallelize(SEQ(第一))。工会(RDD
.sliding(2)
.MAP(一个= GT; NameValueWithLag(一(1).name和一(1)。价值,一(1)。价值 - 一个(0)。价值)))
Does anyone know if there is a way during a transform to look at neighboring elements in a sorted RDD? I know I can collect and then do such an operation as the one in the below example, however it kind of defeats the purpose of a distributed system and I'm trying to leverage the fact that it's distributed.
Example:
RDD of (string name, int val) map to RDD of (string name, int val, int diff)
such that:
name | val becomes -> name | val | diff (current - prior)
a | 3 a | 3 | 3
b | 6 b | 6 | 3
c | 4 c | 4 | -2
d | 20 d | 20 | 16
Probably the most efficient approach simplest approach is to convert a RDD to data frame and use lag:
case class NameValue(name: String, value: Int)
val rdd = sc.parallelize(
NameValue("a", 3) :: NameValue("b", 6) ::
NameValue("c", 4) :: NameValue("d", 20) :: Nil)
val df = sqlContext.createDataFrame(rdd)
df.registerTempTable("df")
sqlContext.sql("""SELECT name, value,
value - lag(value) OVER (ORDER BY name, value) lag
FROM df""").show
Unfortunately at this moment window functions without PARTITION BY
clause move all data to a single partition so it is particularly useful if you have large dataset.
Using low level operations you could use zipWithIndex
followed by flatMap
and groupByKey
:
case class NameValueWithLag(name: String, value: Int, lag: Int)
val cnt = rdd.count() - 1
rdd.
zipWithIndex.
flatMap{case (x, i) => (0 to 1).map(lag => (i - lag, (i, x)))}.
groupByKey.
filter{ case (k, v) => k != cnt}.
values.
map(vals => {
val sorted = vals.toArray.sortBy(_._1).map(_._2)
if (sorted.length == 1) {
NameValueWithLag(sorted(0).name, sorted(0).value, sorted(0).value)
} else {
NameValueWithLag(
sorted(1).name, sorted(1).value,
sorted(1).value - sorted(0).value
)
}
})
Edit:
If you don't mind using developers API there you can try RDDFunctions.sliding
but it requires manual processing
import org.apache.spark.mllib.rdd.RDDFunctions._
val first = rdd.first match {
case NameValue(name, value) => NameValueWithLag(name, value, value)
}
sc.parallelize(Seq(first)).union(rdd
.sliding(2)
.map(a => NameValueWithLag(a(1).name, a(1).value, a(1).value - a(0).value)))
这篇关于是否有一个RDD变换函数,着眼于相邻的元素呢?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!