Handling missing data in Pyspark


Problem description

Using PySpark 1.6/Python 2.7

I have data in the following format, which is obtained from Hive into a dataframe:

date, stock, price
1388534400, GOOG, 50
1388534400, FB, 60
1388534400, MSFT, 55
1388620800, GOOG, 52
1388620800, FB, 61
1388620800, MSFT, 55

I would like to end up with a json of the following format:

GOOG.json:
{
  'symbol': 'GOOG',
  'first_epoch': 1388534400,
  'prices': [50, 52, ... ]
}

Where date is the epoch for midnight on that day, and we have data going back 10 years or so (800 million+ rows).

The 'prices' list corresponds to the value of the stock for each epoch, where each subsequent epoch is the previous epoch + 86400 (i.e. the next day).

The problem here is that there could potentially be missing data, so I need to identify such missing points and substitute None values.

In native Python, I built a generator of datetime objects and converted them to epochs, using this list as the valid epochs list. For each symbol, I iterated over the valid epochs and compared them against the available epochs for that symbol. When a missing epoch was detected, a None value was returned (resulting in something like 'prices': [50, 52, None, 49, 51, ... ]). This approach worked well but I'm not sure how to accomplish this in PySpark.
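For reference, a minimal sketch of that native-Python approach might look like the following (the available dict and the date range are hypothetical placeholders):

from datetime import datetime, timedelta

def valid_epochs(start, end):
    """Yield the epoch for midnight of each day from start to end."""
    day = start
    while day <= end:
        yield int((day - datetime(1970, 1, 1)).total_seconds())
        day += timedelta(days=1)

# Hypothetical per-symbol lookup built from the Hive rows: {epoch: price}
available = {1388534400: 50, 1388620800: 52}

prices = [available.get(epoch)  # .get returns None for missing epochs
          for epoch in valid_epochs(datetime(2014, 1, 1), datetime(2014, 1, 4))]
# -> [50, 52, None, None]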

Recommended answer

If you're missing entire epochs for some stocks then you can do the following:

  • Get a distinct table of epochs and a distinct table of stocks
  • Cross join those distinct lists - you now have a complete table of all possible values. Call this table B
  • Left (outer) join B against the original table, so every possible (epoch, stock) pair appears
  • All of the missing rows will have a missing stock price - impute these with Spark's NaFunctions (see the sketch after this list)
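A minimal PySpark sketch of those steps, assuming the Hive data is already loaded as a dataframe df with columns date, stock, and price:

# df: the original dataframe from Hive with columns (date, stock, price)

# Distinct epochs and distinct stocks
epochs = df.select('date').distinct()
stocks = df.select('stock').distinct()

# Cross join them into the complete table B of every (date, stock) pair
# (a join with no condition is a cartesian product; on Spark 2.x you may
# need crossJoin() or spark.sql.crossJoin.enabled)
B = epochs.join(stocks)

# Left join B against the original table: missing (date, stock)
# combinations come back with price = null
full = B.join(df, on=['date', 'stock'], how='left')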

Pyspark has an entire catalog of NA functions in the pyspark.sql.DataFrameNaFunctions class.

DataFrame.fillna() or DataFrame.na.fill() would work for this. You can then run a groupBy to group the data by epoch.
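Continuing the sketch above, the imputation and final grouping might look like this (grouping by symbol to match the target JSON is my reading of the question; collect_list requires a HiveContext on Spark 1.6):

from pyspark.sql import functions as F

# Impute missing prices; 0 is just a placeholder value -- na.fill
# cannot insert None, so skip this step if you want nulls preserved
filled = full.na.fill({'price': 0})

# Collect (date, price) pairs per symbol; collecting structs keeps
# null prices, whereas collect_list('price') alone would drop them
rows = (filled
        .groupBy('stock')
        .agg(F.min('date').alias('first_epoch'),
             F.collect_list(F.struct('date', 'price')).alias('series'))
        .collect())

# Sort each series by epoch on the driver and emit the JSON documents
for row in rows:
    doc = {'symbol': row['stock'],
           'first_epoch': row['first_epoch'],
           'prices': [price for _, price in sorted(row['series'])]}
    # write doc out as <symbol>.json

With 800 million+ rows you would likely write the grouped result out with Spark rather than collect() it on the driver; the collect() here just keeps the sketch short.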

