Efficient rolling trimmed mean with Python

Problem description

What's the most efficient way to calculate a rolling (aka moving window) trimmed mean with Python?

For example, for a data set of 50K rows and a window size of 50, for each row I need to take the last 50 rows, remove the top and bottom 3 values (5% of the window size, rounded up), and get the average of the remaining 44 values.

Currently for each row I'm slicing to get the window, sorting the window and then slicing to trim it. It works, slowly, but there has to be a more efficient way.
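
A straightforward baseline for this (the same one benchmarked in the answer below) is pandas' rolling apply combined with scipy.stats.trim_mean; with a 50-value window, proportiontocut=0.06 trims int(0.06 * 50) = 3 values from each end. A minimal sketch, assuming df holds the data in a 'value' column (the result column name is illustrative):

from scipy.stats import trim_mean

# Trim 3 values from each end of every 50-value window, average the rest.
df['rolling_trimmed_mean'] = df['value'].rolling(50).apply(
    lambda w: trim_mean(w, 0.06), raw=True)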

Example

[10,12,8,13,7,18,19,9,15,14] # data used for the example; in reality it's a 50K-row DataFrame

For a window size of 5: for each row we look at the last 5 rows, sort them, and discard 1 top and 1 bottom value (5% of 5 = 0.25, rounded up to 1). Then we average the remaining middle values.

Code to generate this example set as a DataFrame:

import numpy as np
import pandas as pd

pd.DataFrame({
    'value': [10, 12, 8, 13, 7, 18, 19, 9, 15, 14],
    'window_of_last_5_values': [
        np.nan, np.nan, np.nan, np.nan, '10,12,8,13,7', '12,8,13,7,18',
        '8,13,7,18,19', '13,7,18,19,9', '7,18,19,9,15', '18,19,9,15,14'
    ],
    'values that are counting for average': [
        np.nan, np.nan, np.nan, np.nan, '10,12,8', '12,8,13', '8,13,18',
        '13,18,9', '18,9,15', '18,15,14'
    ],
    'result': [
        np.nan, np.nan, np.nan, np.nan, 10.0, 11.0, 13.0, 13.333333333333334,
        14.0, 15.666666666666666
    ]
})

Sample code for the naive implementation:

window_size = 5
outliers_to_remove = 1

# O(n * w log w): sort each window from scratch, trim both ends, average.
for index in range(window_size - 1, len(df)):
    current_window = df.iloc[index - window_size + 1:index + 1]
    trimmed_mean = current_window.sort_values('value')[
        outliers_to_remove:window_size - outliers_to_remove]['value'].mean()
    # save the result and the window content somewhere

A note on DataFrame vs. list vs. NumPy array

Just by moving the data from a DataFrame to a list, I'm getting a 3.5x speed boost with the same algorithm. Interestingly, using a NumPy array also gives almost the same speed boost. Still, there must be a better way to implement this and achieve an orders-of-magnitude boost.
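
For reference, a minimal sketch of what that NumPy-array variant of the same naive algorithm could look like (variable names are illustrative):

import numpy as np

values = df['value'].to_numpy()
window_size = 5
outliers_to_remove = 1

results = np.full(len(values), np.nan)
for index in range(window_size - 1, len(values)):
    # Same algorithm as above, but without per-row pandas overhead.
    window = np.sort(values[index - window_size + 1:index + 1])
    results[index] = window[outliers_to_remove:window_size - outliers_to_remove].mean()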

Answer

One observation that could come in handy is that you do not need to sort all the values at each step. Rather, if you ensure that the window is always sorted, all you need to do is insert the new value at the relevant spot, and remove the old one from where it was, both of which are operations that can be done in O(log_2(window_size)) using bisect. In practice, this would look something like

import bisect
import numpy as np

def rolling_mean(data):
    # Maintain a sorted window: drop the outgoing value and insert the
    # incoming one, each in O(log window_size) via bisect.
    x = sorted(data[:49])
    res = np.repeat(np.nan, len(data))
    for i in range(49, len(data)):
        if i != 49:
            del x[bisect.bisect_left(x, data[i - 50])]
        bisect.insort_right(x, data[i])
        res[i] = np.mean(x[3:47])
    return res

Now, the additional benefit in this case turns out to be less than what is gained by the vectorization that scipy.stats.trim_mean relies on, and so in particular, this will still be slower than @ChrisA's solution, but it is a useful starting point for further performance optimization.

> from scipy.stats import trim_mean
> data = pd.Series(np.random.randint(0, 1000, 50000))
> %timeit data.rolling(50).apply(lambda w: trim_mean(w, 0.06))
727 ms ± 34.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
> %timeit rolling_mean(data.values)
812 ms ± 42.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Notably, Numba's jit, which is often useful in situations like these, also provides no benefit:

> from numba import jit
> rolling_mean_jit = jit(rolling_mean)
> %timeit rolling_mean_jit(data.values)
1.05 s ± 183 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

The following, seemingly far-from-optimal, approach outperforms both of the other approaches considered above:

def rolling_mean_np(data):
    res = np.repeat(np.nan, len(data))
    for i in range(len(data) - 49):
        x = np.sort(data[i:i + 50])  # re-sort the full window at every step
        res[i + 49] = x[3:47].mean()
    return res

Timing:

> %timeit rolling_mean_np(data.values)
564 ms ± 4.44 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

What is more, this time around, JIT compilation does help:

> rolling_mean_np_jit = jit(rolling_mean_np)
> %timeit rolling_mean_np_jit(data.values)
94.9 ms ± 605 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

While we're at it, let's just quickly verify that this actually does what we expect it to:

> np.all(rolling_mean_np_jit(data.values)[49:] == data.rolling(50).apply(lambda w: trim_mean(w, 0.06)).values[49:])
True

In fact, by helping out the sorter just a little bit, we can squeeze out another factor of 2, taking the total time down to 57 ms:

def rolling_mean_np_manual(data):
    x = np.sort(data[:50])
    res = np.repeat(np.nan, len(data))
    for i in range(50, len(data) + 1):
        res[i - 1] = x[3:47].mean()
        if i != len(data):
            # Overwrite the outgoing value with the incoming one, then
            # re-sort; sorting an almost-sorted array is cheap.
            idx_old = np.searchsorted(x, data[i - 50])
            x[idx_old] = data[i]
            x.sort()
    return res

> %timeit rolling_mean_np_manual(data.values)
580 ms ± 23 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
> rolling_mean_np_manual_jit = jit(rolling_mean_np_manual)
> %timeit rolling_mean_np_manual_jit(data.values)
57 ms ± 5.89 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
> np.all(rolling_mean_np_manual_jit(data.values)[49:] == data.rolling(50).apply(lambda w: trim_mean(w, 0.06)).values[49:])
True

Now, the "sorting" that is going on in this example of course just boils down to placing the new element in the right place, while shifting everything in between by one. Doing this by hand will make the pure Python code slower, but the jitted version gains another factor of 2, taking us below 30 ms:

def rolling_mean_np_shift(data):
    x = np.sort(data[:50])
    res = np.repeat(np.nan, len(data))
    for i in range(50, len(data) + 1):
        res[i - 1] = x[3:47].mean()
        if i != len(data):
            # Locate the slots of the outgoing and incoming values, then
            # shift the elements between them by one instead of sorting.
            idx_old, idx_new = np.searchsorted(x, [data[i - 50], data[i]])
            if idx_old < idx_new:
                x[idx_old:idx_new - 1] = x[idx_old + 1:idx_new]
                x[idx_new - 1] = data[i]
            elif idx_new < idx_old:
                x[idx_new + 1:idx_old + 1] = x[idx_new:idx_old]
                x[idx_new] = data[i]
            else:
                x[idx_new] = data[i]
    return res

> %timeit rolling_mean_np_shift(data.values)
937 ms ± 97.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
> rolling_mean_np_shift_jit = jit(rolling_mean_np_shift)
> %timeit rolling_mean_np_shift_jit(data.values)
26.4 ms ± 693 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
> np.all(rolling_mean_np_shift_jit(data.values)[49:] == data.rolling(50).apply(lambda w: trim_mean(w, 0.06)).values[49:])
True

At this point, most of the time is spent in np.searchsorted, so let us make the search itself JIT-friendly. Adapting the source code of bisect, we let

@jit
def binary_search(a, x):
    # JIT-friendly bisect_left; hi is the (hard-coded) window size of 50.
    lo = 0
    hi = 50
    while lo < hi:
        mid = (lo + hi) // 2
        if a[mid] < x:
            lo = mid + 1
        else:
            hi = mid
    return lo

@jit
def rolling_mean_np_jitted_search(data):
    x = np.sort(data[:50])
    res = np.repeat(np.nan, len(data))
    for i in range(50, len(data)+1):
        res[i-1] = x[3:47].mean()
        if i != len(data):
            idx_old = binary_search(x, data[i-50])
            idx_new = binary_search(x, data[i])
            if idx_old < idx_new:
                x[idx_old:idx_new-1] = x[idx_old+1:idx_new]
                x[idx_new-1] = data[i]
            elif idx_new < idx_old:
                x[idx_new+1:idx_old+1] = x[idx_new:idx_old]
                x[idx_new] = data[i]
            else:
                x[idx_new] = data[i]
    return res

This takes us down to 12 ms, a 60x improvement over the raw pandas + SciPy approach:

> %timeit rolling_mean_np_jitted_search(data.values)
12 ms ± 210 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
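
To put the fastest variant to use on the question's DataFrame, a minimal usage sketch (assuming the jitted function above is defined; the result column name is illustrative, and note that both the window size of 50 and the 3-from-each-end trim are hard-coded inside the function):

# Usage sketch: the first 49 entries are NaN by construction.
df['trimmed_mean_50'] = rolling_mean_np_jitted_search(df['value'].values)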
