是否有一个RDD变换函数,着眼于相邻的元素呢? [英] Is there an RDD transform function that looks at neighboring elements?

查看:131
本文介绍了是否有一个RDD变换函数,着眼于相邻的元素呢?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

有谁知道,如果有一种方法时变换一下在排序RDD相邻的元素呢?我知道我可以收集,然后做这样的操作作为一个在下面的例子,但它那种违背了分布式系统的目的,我想利用这个事实,它的分配。

例如:

(字符串名称,诠释VAL)地图RDD到RDD(字符串名称,诠释VAL,诠释差异)

使得:

 名称| VAL成为 - >名称| VAL |差异(电流 - 前)
A | 3 | 3 | 3
C | 6 C | 6 | 3
B | 4℃| 4 | -2
开发| 20 D | 20 | 16


解决方案

也许最有效的方法简单的方法是一个RDD转换为数据帧和使用滞后:

 案例类NameValue(名称:字符串,值:智力)
VAL RDD = sc.parallelize(
    NameValue(一,3):: NameValue(B,6)::
    NameValue(C,4):: NameValue(D,20)::无)VAL DF = sqlContext.createDataFrame(RDD)
df.registerTempTable(DF)
sqlContext.sql(选择名称,价值,
                  值 - 滞后(值)OVER(ORDER BY名称,值)滞后
                  从DF,)。秀

不幸的是在这一刻窗口功能,而 PARTITION BY 子句的所有数据移动到单个分区,所以如果你有大量的数据集是特别有用。

使用低级别的操作,您可以使用后跟 flatMap groupByKey zipWithIndex code>:

 案例类NameValueWithLag(名称:字符串,值:智力,滞后:智力)
VAL CNT = rdd.count() - 1RDD。
    zipWithIndex。
    flatMap {情况下(X,I)=> (0到1).MAP(滞后= GT(I - 滞后,(I,X)))}。
    groupByKey。
    过滤器{情况下(K,V)=> ķ!= CNT}。
    值。
    地图(丘壑= GT; {
        VAL分类= vals.toArray.sortBy(_._ 1).MAP(_._ 2)
        如果(sorted.length == 1){
            NameValueWithLag(排序(0).name和排序(0).value的,排序(0).value的)
        }其他{
            NameValueWithLag(
               排序(1).name和排序(1).value的,
               排序(1)。价值 - 排序(0)。价值
            )
        }
    })

编辑:

如果你不介意使用API​​的开发人员那里,你可以尝试 RDDFunctions.sliding ,但它需要手动处理

 进口org.apache.spark.mllib.rdd.RDDFunctions._VAL第一= rdd.first匹配{
  情况NameValue(名称,值)=> NameValueWithLag(名称,价值,价值)
}sc.parallelize(SEQ(第一))。工会(RDD
  .sliding(2)
  .MAP(一个= GT; NameValueWithLag(一(1).name和一(1)。价值,一(1)。价值 - 一个(0)。价值)))

Does anyone know if there is a way during a transform to look at neighboring elements in a sorted RDD? I know I can collect and then do such an operation as the one in the below example, however it kind of defeats the purpose of a distributed system and I'm trying to leverage the fact that it's distributed.

Example:

RDD of (string name, int val) map to RDD of (string name, int val, int diff)

such that:

name | val     becomes ->      name | val | diff (current - prior)
a    | 3                       a    | 3   | 3
b    | 6                       b    | 6   | 3
c    | 4                       c    | 4   | -2
d    | 20                      d    | 20  | 16

解决方案

Probably the most efficient approach simplest approach is to convert a RDD to data frame and use lag:

case class NameValue(name: String, value: Int)
val rdd = sc.parallelize(
    NameValue("a", 3) ::  NameValue("b", 6) :: 
    NameValue("c", 4) ::  NameValue("d", 20) :: Nil)

val df = sqlContext.createDataFrame(rdd)
df.registerTempTable("df")
sqlContext.sql("""SELECT name, value,
                  value - lag(value) OVER (ORDER BY name, value) lag
                  FROM df""").show

Unfortunately at this moment window functions without PARTITION BY clause move all data to a single partition so it is particularly useful if you have large dataset.

Using low level operations you could use zipWithIndex followed by flatMap and groupByKey:

case class NameValueWithLag(name: String, value: Int, lag: Int)
val cnt = rdd.count() - 1

rdd.
    zipWithIndex.
    flatMap{case (x, i) => (0 to 1).map(lag => (i - lag, (i, x)))}.
    groupByKey.
    filter{ case (k, v) => k != cnt}.
    values.
    map(vals => {
        val sorted = vals.toArray.sortBy(_._1).map(_._2)
        if (sorted.length == 1) {
            NameValueWithLag(sorted(0).name, sorted(0).value, sorted(0).value)
        } else {
            NameValueWithLag(
               sorted(1).name, sorted(1).value,
               sorted(1).value - sorted(0).value
            )
        }
    })

Edit:

If you don't mind using developers API there you can try RDDFunctions.sliding but it requires manual processing

import org.apache.spark.mllib.rdd.RDDFunctions._

val first = rdd.first match {
  case NameValue(name, value) => NameValueWithLag(name, value, value)
}

sc.parallelize(Seq(first)).union(rdd
  .sliding(2)
  .map(a => NameValueWithLag(a(1).name, a(1).value, a(1).value - a(0).value)))

这篇关于是否有一个RDD变换函数,着眼于相邻的元素呢?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆