PySpark: how to resample frequencies


Problem description


Imagine a Spark DataFrame consisting of value observations from variables. Each observation has a specific timestamp, and those timestamps are not the same across variables, because a timestamp is generated whenever a variable's value changes and is recorded.

#Variable     Time                Value
#852-YF-007   2016-05-10 00:00:00 0
#852-YF-007   2016-05-09 23:59:00 0
#852-YF-007   2016-05-09 23:58:00 0

Problem: I would like to resample all variables to the same frequency (for instance 10 minutes) using forward fill. To visualize this, I copied a page from the book "Python for Data Analysis". Question: How to do that on a Spark DataFrame in an efficient way?

Solution

Question: How to do that on a Spark DataFrame in an efficient way?

A Spark DataFrame is simply not a good choice for an operation like this one. In general, SQL primitives won't be expressive enough, and the PySpark DataFrame API doesn't provide the low-level access required to implement it.

That said, re-sampling itself can be expressed easily using epoch / timestamp arithmetic. With data like this:

from pyspark.sql.functions import col, max as max_, min as min_

df = (spark  
    .createDataFrame([
        ("2012-06-13", 0.694), ("2012-06-20", -2.669), ("2012-06-27", 0.245)],   
        ["ts", "val"])        
   .withColumn("ts", col("ts").cast("date").cast("timestamp")))

we can re-sample the input:

day = 60 * 60 * 24
epoch = (col("ts").cast("bigint") / day).cast("bigint") * day

with_epoch = df.withColumn("epoch", epoch)

min_epoch, max_epoch = with_epoch.select(min_("epoch"), max_("epoch")).first()

and join it with a reference range:

# Reference range 
ref = spark.range(
    min_epoch, max_epoch + 1, day
).toDF("epoch")

(ref
    .join(with_epoch, "epoch", "left")
    .orderBy("epoch")
    .withColumn("ts_resampled", col("epoch").cast("timestamp"))
    .show(15, False))

## +----------+---------------------+------+---------------------+   
## |epoch     |ts                   |val   |ts_resampled         |
## +----------+---------------------+------+---------------------+
## |1339459200|2012-06-13 00:00:00.0|0.694 |2012-06-12 02:00:00.0|
## |1339545600|null                 |null  |2012-06-13 02:00:00.0|
## |1339632000|null                 |null  |2012-06-14 02:00:00.0|
## |1339718400|null                 |null  |2012-06-15 02:00:00.0|
## |1339804800|null                 |null  |2012-06-16 02:00:00.0|
## |1339891200|null                 |null  |2012-06-17 02:00:00.0|
## |1339977600|null                 |null  |2012-06-18 02:00:00.0|
## |1340064000|2012-06-20 00:00:00.0|-2.669|2012-06-19 02:00:00.0|
## |1340150400|null                 |null  |2012-06-20 02:00:00.0|
## |1340236800|null                 |null  |2012-06-21 02:00:00.0|
## |1340323200|null                 |null  |2012-06-22 02:00:00.0|
## |1340409600|null                 |null  |2012-06-23 02:00:00.0|
## |1340496000|null                 |null  |2012-06-24 02:00:00.0|
## |1340582400|null                 |null  |2012-06-25 02:00:00.0|
## |1340668800|2012-06-27 00:00:00.0|0.245 |2012-06-26 02:00:00.0|
## +----------+---------------------+------+---------------------+

In Spark >= 3.1 replace

col("epoch").cast("timestamp")

with

from pyspark.sql.functions import timestamp_seconds

timestamp_seconds("epoch")

Using low-level APIs it is possible to fill data like this, as I've shown in my answer to Spark / Scala: forward fill with last observation. Using RDDs we could also avoid shuffling the data twice (once for the join, once for reordering).
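The linked answer does this with RDDs. As a rough alternative, the fill can also be sketched with DataFrame window functions (last with ignorenulls=True), applied to a resampled grid such as the resampled DataFrame from the sketch above; note that a window without a partitioning key would pull all data into a single partition.

from pyspark.sql import Window
from pyspark.sql.functions import col, last

# carry the last non-null value forward within each variable, ordered by time
w = (Window
    .partitionBy("variable")  # drop for a single, small series
    .orderBy("epoch")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow))

filled = resampled.withColumn("val_filled", last(col("val"), ignorenulls=True).over(w))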

But there is a much more important problem here. Spark performs best when a problem can be reduced to element-wise or partition-wise computations. While forward fill is a case where that is possible, as far as I am aware this is typically not true of commonly used time-series models, and if an operation requires sequential access then Spark won't provide any benefit at all.

So if you work with a series that is large enough to require a distributed data structure, you'll probably want to aggregate it into some object that can be handled easily by a single machine and then use your favorite non-distributed tool to handle the rest, as in the sketch below.
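A minimal sketch of that workflow, assuming a single series small enough to collect to the driver; the 10-minute rule comes from the question, and pandas does the actual resampling and forward fill:

single_series = (df
    .select("ts", "val")
    .toPandas()      # collects to the driver; only safe for reasonably small data
    .set_index("ts")
    .sort_index())

resampled_pdf = single_series.resample("10min").ffill()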

If you work with multiple time series where each can be handled in memory then there is of course sparkts, but I know you're already aware of that.

