spark (Scala) dataframe filtering (FIR)


Question


Let's say I have a DataFrame (stored in a Scala val as df) which contains the data from a CSV:

time,temperature
0,65
1,67
2,62
3,59


which I have no problem reading from file as a Spark DataFrame in Scala.


I would like to add a filtered column (by filter I mean signal-processing moving-average filtering); say I want to do (T[n]+T[n-1])/2.0:

time,temperature,temperatureAvg
0,65,(65+0)/2.0
1,67,(67+65)/2.0
2,62,(62+67)/2.0
3,59,(59+62)/2.0


(Actually, say for the first row, I want 32.5 instead of (65+0)/2.0. I wrote it that way to clarify the expected 2-time-step filtering output.)


So how can I achieve this? I am not familiar with Spark DataFrame operations that combine rows iteratively along a column...

Answer


If there is a natural way to partition your data, you can use window functions as follows:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.mean
import sqlContext.implicits._  // enables the $"..." column syntax and toDF

// Window covering the previous row and the current row, per id, ordered by time
val w = Window.partitionBy($"id").orderBy($"time").rowsBetween(-1, 0)

val df = sc.parallelize(Seq(
    (1L, 0, 65), (1L, 1, 67), (1L, 2, 62), (1L, 3, 59)
)).toDF("id", "time", "temperature")

df.select($"*", mean($"temperature").over(w).alias("temperatureAvg")).show

// +---+----+-----------+--------------+                             
// | id|time|temperature|temperatureAvg|
// +---+----+-----------+--------------+
// |  1|   0|         65|          65.0|
// |  1|   1|         67|          66.0|
// |  1|   2|         62|          64.5|
// |  1|   3|         59|          60.5|
// +---+----+-----------+--------------+


You can create windows with arbitrary weights using lead / lag functions:

 // lag must be evaluated over a window; requires import org.apache.spark.sql.functions.{lit, lag}
 lit(0.6) * $"temperature" +
 lit(0.3) * lag($"temperature", 1).over(w) +
 lit(0.2) * lag($"temperature", 2).over(w)
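For intuition about what such weights compute, here is a plain-Scala sketch of the same 3-tap weighted filter on the sample temperatures (treating missing history as 0.0, which mirrors what a `coalesce(lag(...), lit(0.0))` fallback would produce in Spark; the weights and the fallback-to-zero choice are illustrative assumptions, not part of the original answer):

```scala
// Sample temperatures from the question
val temps = Seq(65.0, 67.0, 62.0, 59.0)

// 3-tap FIR weights applied to T[n], T[n-1], T[n-2];
// missing history contributes 0.0 (illustrative choice)
val weights = Seq(0.6, 0.3, 0.2)

val filtered = temps.indices.map { n =>
  weights.zipWithIndex.map { case (wgt, k) =>
    if (n - k >= 0) wgt * temps(n - k) else 0.0
  }.sum
}
// filtered ≈ Vector(39.0, 59.7, 70.3, 67.4)
```

Note that in the Spark version, `lag` returns null where there is not enough history, so the first rows of each partition would be null unless you substitute a default.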


It is still possible without a partitionBy clause, but it will be extremely inefficient. If that is the case you won't be able to use DataFrames; instead you can use sliding over an RDD (see for example Operate on neighbor elements in RDD in Spark with Scala). There is also the spark-timeseries package you may find useful.
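As a local illustration of the sliding idea (the RDD version, e.g. `sliding` from spark-mllib's RDDFunctions, works analogously on a distributed collection), Scala collections already provide `sliding`, which reproduces the question's (T[n]+T[n-1])/2.0 column directly:

```scala
// Local sketch of the sliding approach: pair each element with its
// predecessor and average the pair
val temps = Seq(65, 67, 62, 59)

val avgs = temps.sliding(2).map(p => (p(0) + p(1)) / 2.0).toList
// avgs: List(66.0, 64.5, 60.5)
```

Note this produces one fewer value than the input, since the first row has no predecessor; how to handle that edge is up to you (the question suggests 32.5, i.e. padding with 0).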

