PySpark: how to resample frequencies


Problem description


Imagine a Spark DataFrame consisting of value observations from variables. Each observation has a specific timestamp, and those timestamps differ between variables. This is because a timestamp is generated and recorded whenever the value of a variable changes.

#Variable     Time                Value
#852-YF-007   2016-05-10 00:00:00 0
#852-YF-007   2016-05-09 23:59:00 0
#852-YF-007   2016-05-09 23:58:00 0


Problem: I would like to put all variables onto the same frequency (for instance 10 min) using forward fill. To visualize this, I copied a page from the book "Python for Data Analysis". Question: How to do that on a Spark DataFrame in an efficient way?
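(For context, the pattern from the book boils down to pandas resample plus forward fill. A minimal illustrative sketch using the sample rows above; the names here are illustrative, not part of the original question:)

import pandas as pd

# The sample observations for one variable, indexed by timestamp
pdf = pd.DataFrame(
    {"Value": [0, 0, 0]},
    index=pd.to_datetime([
        "2016-05-10 00:00:00", "2016-05-09 23:59:00", "2016-05-09 23:58:00"]))

# Regular 10-minute grid with forward fill -- this is what should happen on Spark
resampled = pdf.sort_index().resample("10min").ffill()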

Answer


Question: How to do that on a Spark Dataframe in an efficient way?


A Spark DataFrame is simply not a good choice for an operation like this one. In general, SQL primitives won't be expressive enough, and the PySpark DataFrame doesn't provide the low-level access required to implement it.


Resampling itself, however, can easily be expressed using epoch / timestamp arithmetic. With data like this:

from pyspark.sql.functions import col, max as max_, min as min_

# Assumes an existing SparkSession bound to the name `spark`
df = (spark
    .createDataFrame(
        [("2012-06-13", 0.694), ("2012-06-20", -2.669), ("2012-06-27", 0.245)],
        ["ts", "val"])
    .withColumn("ts", col("ts").cast("date").cast("timestamp")))


we can resample the input:

day = 60 * 60 * 24  # seconds per day

# Truncate each timestamp to the start of its day, in seconds since the epoch
epoch = (col("ts").cast("bigint") / day).cast("bigint") * day

with_epoch = df.withColumn("epoch", epoch)

min_epoch, max_epoch = with_epoch.select(min_("epoch"), max_("epoch")).first()

and join it with a reference range:

# Reference range 
ref = spark.range(
    min_epoch, max_epoch + 1, day
).toDF("epoch")

(ref
    .join(with_epoch, "epoch", "left")
    .orderBy("epoch")
    .withColumn("ts_resampled", col("epoch").cast("timestamp"))
    .show(15, False))

## +----------+---------------------+------+---------------------+   
## |epoch     |ts                   |val   |ts_resampled         |
## +----------+---------------------+------+---------------------+
## |1339459200|2012-06-13 00:00:00.0|0.694 |2012-06-12 02:00:00.0|
## |1339545600|null                 |null  |2012-06-13 02:00:00.0|
## |1339632000|null                 |null  |2012-06-14 02:00:00.0|
## |1339718400|null                 |null  |2012-06-15 02:00:00.0|
## |1339804800|null                 |null  |2012-06-16 02:00:00.0|
## |1339891200|null                 |null  |2012-06-17 02:00:00.0|
## |1339977600|null                 |null  |2012-06-18 02:00:00.0|
## |1340064000|2012-06-20 00:00:00.0|-2.669|2012-06-19 02:00:00.0|
## |1340150400|null                 |null  |2012-06-20 02:00:00.0|
## |1340236800|null                 |null  |2012-06-21 02:00:00.0|
## |1340323200|null                 |null  |2012-06-22 02:00:00.0|
## |1340409600|null                 |null  |2012-06-23 02:00:00.0|
## |1340496000|null                 |null  |2012-06-24 02:00:00.0|
## |1340582400|null                 |null  |2012-06-25 02:00:00.0|
## |1340668800|2012-06-27 00:00:00.0|0.245 |2012-06-26 02:00:00.0|
## +----------+---------------------+------+---------------------+


Using low-level APIs it is possible to fill data like this, as I've shown in my answer to Spark / Scala: forward fill with last observation. Using RDDs we could also avoid shuffling the data twice (once for the join, once for the reordering).
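Staying within the DataFrame API, the forward fill itself can also be sketched with a window function and last(..., ignorenulls=True). This is not the RDD approach from the linked answer, just a minimal sketch for illustration, and it only makes sense for small data because an unpartitioned ordered window pulls all rows into a single partition:

from pyspark.sql import Window
from pyspark.sql.functions import col, last

# Carry the last non-null observation forward across the resampled grid
w = Window.orderBy("epoch").rowsBetween(Window.unboundedPreceding, Window.currentRow)

filled = (ref
    .join(with_epoch, "epoch", "left")
    .withColumn("ts_resampled", col("epoch").cast("timestamp"))
    .withColumn("val_filled", last("val", ignorenulls=True).over(w)))

filled.orderBy("epoch").show(5, False)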


But there is a much more important problem here. Spark performs optimally when a problem can be reduced to element-wise or partition-wise computations. While forward fill happens to be a case where that is possible, as far as I am aware this is typically not true of commonly used time-series models, and if an operation requires sequential access then Spark won't provide any benefits at all.


So if you work with series which are large enough to require a distributed data structure, you'll probably want to aggregate them into some object that can easily be handled by a single machine and then use your favorite non-distributed tool to handle the rest.
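A minimal sketch of that hand-off, assuming the single series from the example above fits comfortably in driver memory (the 10-minute frequency from the question is used purely for illustration):

# Collect one (small) series to the driver and resample it there with pandas
pdf = (with_epoch
    .select("ts", "val")
    .orderBy("ts")
    .toPandas()
    .set_index("ts"))

resampled = pdf.resample("10min").ffill()  # regular 10-minute grid, forward-filled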


If you work with multiple time series, where each one can be handled in memory, then there is of course sparkts, but I know you're already aware of that.

