如何在Pyspark滑动窗口在时间序列数据转换数据 [英] How to transform data with sliding window over time series data in Pyspark

查看:1369
本文介绍了如何在Pyspark滑动窗口在时间序列数据转换数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图提取的基础上在时间序列数据滑动窗口的功能。
在Scala中,好像有一个滑动功能基于这个帖子和<一个href=\"http://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.rdd.RDDFunctions\"相对=nofollow>文档

I am trying to extract features based on sliding window over time series data. In Scala, it seems like there is a sliding function based on this post and the documentation

import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 100, 10)
  .sliding(3)
  .map(curSlice => (curSlice.sum / curSlice.size))
  .collect()

我的问题是有没有PySpark类似的功能?或者,我们如何实现类似的滑动窗口的转换,如果没有这样的功能了吗?

My questions is there similar functions in PySpark? Or how do we achieve similar sliding window transformations if there is no such function yet?

推荐答案

据我可以告诉滑动功能不可用Python和 SlidingRDD 是一个私有类,不能被外界 MLlib 访问。

As far as I can tell sliding function is not available from Python and SlidingRDD is a private class and cannot be accessed outside MLlib.

如果您使用滑动上的现有RDD可以创建穷人的滑动是这样的:

If you to use sliding on an existing RDD you can create poor man's sliding like this:

def sliding(rdd, n):
    assert n > 0
    def gen_window(xi, n):
        i, x = xi
        return [(i - offset, x) for offset in xrange(n)]

    return (
        rdd.
        zipWithIndex(). # Add index
        flatMap(lambda xi: gen_window(xi, n)). # Generate pairs with offset
        groupBy(lambda ix: ix[0]). # Group to create windows
        # Sort values to ensure order inside window and drop indices
        mapValues(lambda vals: [x for (i, x) in sorted(vals)]).
        sortByKey(). # Sort to makes sure we keep original order
        values(). # Get values
        filter(lambda x: len(x) == n)) # Drop beginning and end

另外,您可以尝试这样的事情(使用 toolz

from toolz.itertoolz import sliding_window, concat

def sliding2(rdd, n):
    assert n > 1

    def get_last_el(i, iter):
        """Return last n - 1 elements from the partition"""
        return  [(i, [x for x in iter][(-n + 1):])]

    def slide(i, iter):
        """Prepend previous items and return sliding window"""
        return sliding_window(n, concat([last_items.value[i - 1], iter]))

    def clean_last_items(last_items):
        """Adjust for empty or to small partitions"""
        clean = {-1: [None] * (n - 1)}
        for i in range(rdd.getNumPartitions()):
            clean[i] = (clean[i - 1] + list(last_items[i]))[(-n + 1):]
        return {k: tuple(v) for k, v in clean.items()}

    last_items = sc.broadcast(clean_last_items(
        rdd.mapPartitionsWithIndex(get_last_el).collectAsMap()))

    return rdd.mapPartitionsWithIndex(slide)

这篇关于如何在Pyspark滑动窗口在时间序列数据转换数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆