Spark find previous value on each iteration of RDD


Problem Description

I have the following code:

val rdd = sc.cassandraTable("db", "table")
  .select("id", "date", "gpsdt")
  .where("id=? and date=? and gpsdt>? and gpsdt<?", entry(0), entry(1), entry(2), entry(3))
val rddcopy = rdd.sortBy(row => row.get[String]("gpsdt"), false).zipWithIndex()
rddcopy.foreach { records =>
  {
    val previousRow = (records - 1)th row // pseudocode: how do I get the previous row?
    val currentRow = records
    // Some calculation based on both rows
  }
}

So, the idea is to get just the previous/next row on each iteration of the RDD. I want to calculate some field on the current row based on the value present in the previous row. Thanks.

Recommended Answer

EDIT II: The answer below misunderstood the question as asking for tumbling-window semantics, but a sliding window is needed. Considering that this is a sorted RDD,

import org.apache.spark.mllib.rdd.RDDFunctions._
sortedRDD.sliding(2)

should do the trick. Note, however, that this uses a DeveloperApi.
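
For instance, here is a rough sketch (not from the original answer) of how the windows returned by sliding(2) could be consumed; sortedRDD and the per-pair calculation are placeholders:

import org.apache.spark.mllib.rdd.RDDFunctions._

// each window is an Array of two adjacent rows: window(0) precedes window(1)
val paired = sortedRDD.sliding(2).map { window =>
  val previousRow = window(0)
  val currentRow = window(1)
  // some calculation based on both rows would go here
  (previousRow, currentRow)
}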

Or you could do:

val l = sortedRdd.zipWithIndex.map(kv => (kv._2, kv._1))     // key each row by its index
val r = sortedRdd.zipWithIndex.map(kv => (kv._2 - 1, kv._1)) // key each row by its index minus one
val sliding = l.join(r)                                      // pairs the row at index i with the row at index i + 1

RDD joins should be inner joins (IIRC), thus dropping the edge cases where the tuples would be partially null.
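
As an illustrative sketch (assumed, not part of the original answer), each value in the joined RDD pairs a row with the row that follows it, so it could be processed like this:

// values of the join are (row at index i, row at index i + 1) pairs
sliding.values.foreach { case (previousRow, currentRow) =>
  // some calculation based on both rows would go here
}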

Old stuff:

How do you identify the previous row? RDDs do not have any sort of stable ordering by themselves. If you have an incrementing dense key, you could add a new column calculated as if (k % 2 == 0) k / 2 else (k - 1) / 2; this gives you a key with the same value for two successive keys (e.g. keys 0 and 1 both map to 0, keys 2 and 3 both map to 1). Then you could just group by it.

But to reiterate, there is no really sensible notion of "previous" in most cases for RDDs (depending on partitioning, data source, etc.).

So now that you have a zipWithIndex and an ordering in your set, you can do what I mentioned above. You now have an RDD[(Int, YourData)] and can do:

rdd.map(kv => if (kv._1 % 2 == 0) (kv._1 / 2, kv._2) else ((kv._1 - 1) / 2, kv._2)).groupByKey.foreach(/* your stuff here */)

If you reduce at any point, consider using reduceByKey rather than groupByKey().reduce.
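
For example, a hedged sketch of the reduceByKey variant (the combine logic is a placeholder and not code from the original answer):

rdd.map(kv => if (kv._1 % 2 == 0) (kv._1 / 2, kv._2) else ((kv._1 - 1) / 2, kv._2))
  .reduceByKey { (first, second) =>
    // combine the two adjacent rows of each pair here instead of collecting full groups
    first // placeholder: keep whichever row (or combination) you actually need
  }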
