使用本网站的FIND_PEAKS检测绘制尖峰期间,从电光数据帧读取列表值的问题/错误 [英] Problem/bug with list values reading from Spark dataframe during plotting spikes detection using find_peaks from SciPy

查看:27
本文介绍了使用本网站的FIND_PEAKS检测绘制尖峰期间,从电光数据帧读取列表值的问题/错误的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我有以下 pandas 数据帧随时间包含valuedate

import pandas as pd

pdf = pd.DataFrame(data={'date':['2020-10-16','2020-10-17','2020-10-18','2020-10-19','2020-10-20','2020-10-21','2020-10-22','2020-10-23','2020-10-24','2020-10-25','2020-10-26','2020-10-27','2020-10-28','2020-10-29','2020-10-30','2020-10-31','2020-11-01','2020-11-02','2020-11-03','2020-11-04','2020-11-05','2020-11-06','2020-11-07','2020-11-08','2020-11-09','2020-11-10','2020-11-11','2020-11-12','2020-11-13','2020-11-14','2020-11-15'],
                        'value':[161967, 161270, 148508, 152442, 157504, 157118, 155674, 134522, 213384, 163242, 217415, 221502, 146267, 143621, 145875, 139488, 104466, 94825, 143686, 151952, 161074, 161417, 135042, 148768, 131428, 127816, 151905, 180498, 177899, 193950, 12]})
pdf

或者我有下面的电光数据框有类似的数据:

import pyspark.sql.types
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Row, SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, TimestampType, DateType

dict  = [ ('2020-10-16', 161967),
          ('2020-10-17', 161270),
          ('2020-10-18', 148508),
          ('2020-10-19', 152442),
          ('2020-10-20', 157504),
          ('2020-10-21', 157118),
          ('2020-10-22', 155674),
          ('2020-10-23', 134522),
          ('2020-10-24', 213384),
          ('2020-10-25', 163242),
          ('2020-10-26', 217415),
          ('2020-10-27', 221502),
          ('2020-10-28', 146267),
          ('2020-10-29', 143621),
          ('2020-10-30', 145875),
          ('2020-10-31', 139488),
          ('2020-11-01', 104466),
          ('2020-11-02', 94825),
          ('2020-11-03', 143686),
          ('2020-11-04', 151952),
          ('2020-11-05', 161074),
          ('2020-11-06', 161417),
          ('2020-11-07', 135042),
          ('2020-11-08', 148768),
          ('2020-11-09', 131428),
          ('2020-11-10', 127816),
          ('2020-11-11', 151905),
          ('2020-11-12', 180498),
          ('2020-11-13', 177899),
          ('2020-11-14', 193950),
          ('2020-11-15', 12),

  ]

schema = StructType([ 
    StructField("date",        StringType(),    True), 
    StructField("value",       IntegerType(),   True), 
  ])
 
#create a Spark dataframe
sc= SparkContext()
sqlContext = SQLContext(sc)
sdf = sqlContext.createDataFrame(data=dict,schema=schema)
sdf.printSchema()
sdf.sort('date').show(truncate = False)

我受此启发answer通过以下代码检测峰谷:

from scipy.signal import find_peaks
import numpy as np
import matplotlib.pyplot as plt

# Input signal from Pandas dataframe
t = pdf.date
x = pdf.value

# Set thresholds
# std calculated on 10-90 percentile data, without outliers is used for threshold
thresh_top    = np.median(x) + 1 * np.std(x)
thresh_bottom = np.median(x) - 1 * np.std(x)


# Find indices of peaks & of valleys (from inverting the signal)
peak_idx, _   = find_peaks(x,  height =  thresh_top)
valley_idx, _ = find_peaks(-x, height = -thresh_bottom)

# Plot signal
plt.figure(figsize=(14,12))
plt.plot(t, x   , color='b', label='data')
plt.scatter(t, x, s=10,c='b',label='value')

# Plot threshold
plt.plot([min(t), max(t)], [thresh_top, thresh_top],       '--',  color='r', label='peaks-threshold')
plt.plot([min(t), max(t)], [thresh_bottom, thresh_bottom], '--',  color='g', label='valleys-threshold')

# Plot peaks (red) and valleys (blue)
plt.plot(t[peak_idx],   x[peak_idx],   "x", color='r', label='peaks')
plt.plot(t[valley_idx], x[valley_idx], "x", color='g', label='valleys')


plt.xticks(rotation=45)
plt.ylabel('value')
plt.xlabel('timestamp')
plt.title(f'data over time')
plt.legend( loc='lower left')
plt.gcf().autofmt_xdate()
plt.show()

在创建电光数据帧之前,当我从 pandas 数据帧pdf读取数据时,它工作并绘制成功。一旦我创建了电光数据帧sdf,即使您在读取和输入 pandas 的笔记本上运行相同的单元格pdfpdf也不再起作用,并给出以下错误:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-12-1c79b34272c5> in <module>()
     23 
     24 # Plot threshold
---> 25 plt.plot([min(t), max(t)], [thresh_top, thresh_top],       '--',  color='r', label='peaks-threshold')
     26 plt.plot([min(t), max(t)], [thresh_bottom, thresh_bottom], '--',  color='g', label='valleys-threshold')
     27 

2 frames
/content/spark-3.1.2-bin-hadoop2.7/python/pyspark/sql/column.py in _to_java_column(col)
     47             "{0} of type {1}. "
     48             "For column literals, use 'lit', 'array', 'struct' or 'create_map' "
---> 49             "function.".format(col, type(col)))
     50     return jcol
     51 

TypeError: Invalid argument, not a string or column: 0     2020-10-16
1     2020-10-17
2     2020-10-18
3     2020-10-19
4     2020-10-20
5     2020-10-21
6     2020-10-22
7     2020-10-23
8     2020-10-24
9     2020-10-25
10    2020-10-26
11    2020-10-27
12    2020-10-28
13    2020-10-29
14    2020-10-30
15    2020-10-31
16    2020-11-01
17    2020-11-02
18    2020-11-03
19    2020-11-04
20    2020-11-05
21    2020-11-06
22    2020-11-07
23    2020-11-08
24    2020-11-09
25    2020-11-10
26    2020-11-11
27    2020-11-12
28    2020-11-13
29    2020-11-14
30    2020-11-15
Name: date, dtype: object of type <class 'pandas.core.series.Series'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

当我尝试使用以下命令直接从电光数据帧读取数据时:

# Input signal from Spark dataframe
t = [val.date  for val in sdf.select('date').collect()]
x = [val.value for val in sdf.select('value').collect()]

遗憾的是,绘图代码无法工作,并抛出以下错误:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-14-9cfede77d0fb> in <module>()
     31 # Find indices of peaks & of valleys (from inverting the signal)
     32 peak_idx, _   = find_peaks(x,  height =  thresh_top)
---> 33 valley_idx, _ = find_peaks(-x, height = -thresh_bottom)
     34 
     35 

TypeError: bad operand type for unary -: 'list'

我花了很多时间,但我无法修复此错误。如果我能适应代码并将其应用到电光数据帧上,我还可以使用其他非find_peaks()解决方案来绘制尖峰检测,比如这个数字answer。我已经尝试了很多东西,您可以在这个GoogleColab Notebook中查看,您可以随意运行/测试/编辑它以进行快速调试。

推荐答案

问题在于您使用的不是相同的对象。

当您处理 pandas 时,您会得到x = pdf.value,实际上得到的是Series对象。此对象可以将-放在前面,并且它知道必须将其中的值转换为负值。

但是,当您使用PySpark并收集值时,您会得到list对象,如果您将-放在前面,则会收到错误:

TypeError: bad operand type for unary -: 'list'

这告诉您它不知道如何处理它。

  • 所以首先要做的是,而不是:
valley_idx, _ = find_peaks(-x, height=-thresh_bottom)

您必须将值转换为负值,例如:

valley_idx, _ = find_peaks([-i for i in x], height=-thresh_bottom)
  • 下一步,find_peaks将返回ndarray,它也不能与中的list一起使用:
plt.plot(t[peak_idx], x[peak_idx], "x", color="r", label="peaks")

所以您必须手动完成,例如:

plt.plot(
        [t[i] for i in peak_idx],
        [x[i] for i in peak_idx],
        "x",
        color="r",
        label="peaks",
    )

我用以下代码(+计算PySpark中的medianstd_dev为例)再现了您的情节:

# data is the same
# ...

# create a Spark dataframe
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(data=data, schema=schema)
std_dev = sdf.select(F.stddev(F.col("value")).alias("std")).collect()[0]["std"]
median = (
    sdf.groupBy("value")
    .agg(F.expr("percentile_approx(value, 0.5)").alias("med"))
    .collect()[0]["med"]
)
thresh_top = median + 1 * std_dev
thresh_bottom = median - 1 * std_dev
t = sdf.select("date").rdd.flatMap(lambda x: x).collect()
x = sdf.select("value").rdd.flatMap(lambda x: x).collect()
peak_idx, _ = find_peaks(x, height=thresh_top)
valley_idx, _ = find_peaks([-i for i in x], height=-thresh_bottom)

plt.figure(figsize=(14, 12))
plt.plot(t, x, color="b", label="data")
plt.scatter(t, x, s=10, c="b", label="value")

# Plot threshold
plt.plot(
    [min(t), max(t)],
    [thresh_top, thresh_top],
    "--",
    color="r",
    label="peaks-threshold",
)
plt.plot(
    [min(t), max(t)],
    [thresh_bottom, thresh_bottom],
    "--",
    color="g",
    label="valleys-threshold",
)

# Plot peaks (red) and valleys (blue)
plt.plot(
    [t[i] for i in peak_idx],
    [x[i] for i in peak_idx],
    "x",
    color="r",
    label="peaks",
)
plt.plot(
    [t[i] for i in valley_idx],
    [x[i] for i in valley_idx],
    "x",
    color="g",
    label="valleys",
)

# ...

这篇关于使用本网站的FIND_PEAKS检测绘制尖峰期间,从电光数据帧读取列表值的问题/错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆