Handling missing data in Pyspark
Question
Using PySpark 1.6/Python 2.7
I have data in the following format, which is obtained from Hive into a dataframe:
date, stock, price
1388534400, GOOG, 50
1388534400, FB, 60
1388534400, MSFT, 55
1388620800, GOOG, 52
1388620800, FB, 61
1388620800, MSFT, 55
I would like to end up with a json of the following format:
GOOG.json:
{
'symbol': 'GOOG',
'first_epoch': 1388534400,
'prices': [50, 52, ... ]
}
Where date is the epoch for midnight on that day, and we have data going back 10 years or so (800 million+ rows).
数据"列表对应于每个时期的库存值,其中每个随后的时期是前一个时期+ 86400(即第二天).
The 'data' list corresponds to the value of the stock for each epoch, where each subsequent epoch is previous epoch + 86400 (i.e. for the next day).
The problem here is that there could potentially be missing data, so I need to identify such missing points and substitute None values.
In native Python, I built a generator of datetime objects and converted them to epochs, using this list as the valid-epoch list. For each symbol, I iterated over the valid epochs and compared them against the available epochs for that symbol. When a missing epoch was detected, a None value was returned (resulting in something like 'prices': [50, 52, None, 49, 51, ... ]). This approach worked well, but I'm not sure how to accomplish this in PySpark.
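For reference, a condensed sketch of what that native-Python approach can look like; the available dict and its values are hypothetical, standing in for one symbol's epoch-to-price data:

def epoch_range(start, end, step=86400):
    # Yield every expected epoch from start to end, one day apart
    current = start
    while current <= end:
        yield current
        current += step

# Hypothetical epoch -> price data for one symbol (one day is missing)
available = {1388534400: 50, 1388620800: 52, 1388793600: 49}

# dict.get returns None for absent epochs, which marks the gaps
prices = [available.get(e) for e in epoch_range(min(available), max(available))]
# -> [50, 52, None, 49]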
Answer
If you're missing entire epochs for some stocks then you can do the following:
- Get a table of distinct epochs and a table of distinct stocks
- Cross join those distinct lists; you now have a complete table of every possible (epoch, stock) combination. Call this table B
- Left outer join the original table onto B, with the original table on the right
- Every missing row will then have a null stock price; impute these with Spark's NaFunctions (see the sketch below)
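A minimal sketch of those steps, assuming the Hive data sits in a dataframe df with columns date, stock and price as in the question (crossJoin exists from Spark 2.1 onward; on 1.6, a join with no condition gives the same cartesian product):

epochs = df.select('date').distinct()    # every epoch seen for any stock
stocks = df.select('stock').distinct()   # every symbol

# Table B: one row per possible (date, stock) combination
b = epochs.crossJoin(stocks)

# Left outer join the original data onto B; days with no quote
# for a symbol come back with price = null
full = b.join(df, on=['date', 'stock'], how='left')

One caveat: building the epoch list from distinct dates only covers days that appear for at least one stock; a day missing for every stock would need a generated calendar instead.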
Pyspark has an entire catalog of NA functions in the pyspark.sql.DataFrameNaFunctions class, accessible through the DataFrame.na attribute.
DataFrame.fillna() or DataFrame.na.fill() would work for this. You can then run a groupBy to group the data by epoch.
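A sketch of that final step, continuing from the full dataframe built above. Two caveats: collect_list silently drops nulls, so the gaps are filled with a sentinel here and would be mapped back to None/null when the JSON is written; and sort_array over structs plus the series.price field access assume Spark 2.x, so on 1.6 this grouping is easier done on the RDD side with a HiveContext:

from pyspark.sql import functions as F

# Sentinel for the missing days (a plain null would vanish from collect_list)
filled = full.na.fill({'price': -1})    # equivalently: full.fillna(-1, subset=['price'])

result = (filled
    .groupBy('stock')                   # one output row per symbol
    .agg(F.min('date').alias('first_epoch'),
         F.sort_array(F.collect_list(F.struct('date', 'price'))).alias('series'))
    .select(F.col('stock').alias('symbol'),
            'first_epoch',
            F.col('series.price').alias('prices')))

result.toJSON()   # one JSON document per symbol, ready to split into GOOG.json etc.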