pyspark, Compare two rows in dataframe

Problem description

I'm attempting to compare one row in a dataframe with the next to see the difference between their timestamps. Currently the data looks like:

 itemid | eventid | timestamp
 ----------------------------
 134    | 30      | 2016-07-02 12:01:40
 134    | 32      | 2016-07-02 12:21:23
 125    | 30      | 2016-07-02 13:22:56
 125    | 32      | 2016-07-02 13:27:07
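
For reproduction, this sample could be built roughly as follows. This is only a sketch: it assumes Spark 2.x's SparkSession (on 1.x, sqlContext.createDataFrame works the same way), and it names the timestamp column 'stamp' because that's how the code below refers to it, even though the table above labels it timestamp.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# hypothetical reconstruction of the sample data shown above;
# the timestamp column is named 'stamp' to match the code further down
df = spark.createDataFrame(
    [(134, 30, '2016-07-02 12:01:40'),
     (134, 32, '2016-07-02 12:21:23'),
     (125, 30, '2016-07-02 13:22:56'),
     (125, 32, '2016-07-02 13:27:07')],
    ['itemid', 'eventid', 'stamp']
).withColumn('stamp', col('stamp').cast('timestamp'))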

I've tried mapping a function onto the dataframe to allow for comparisons like this (note: I'm trying to get rows with a difference greater than 4 hours):

items = df.limit(10)\
          .orderBy('itemid', desc('stamp'))\
          .map(lambda x,y: (x.stamp - y.stamp) > 14400).collect()

But I'm getting the following error:

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.collectAndServe

I believe this is due to my using the map function incorrectly. Help with using map, or a different solution, would be appreciated.
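
For illustration, one such different solution is a window function with lag, which compares each row to the previous one for the same itemid without any hand-rolled mapping. A rough sketch, assuming Spark 1.6+ window support and a timestamp column named 'stamp' as in the snippet above:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# previous event per itemid, ordered by timestamp
w = Window.partitionBy('itemid').orderBy('stamp')

gaps = (df
        .withColumn('prev_stamp', F.lag('stamp').over(w))
        .withColumn('gap_seconds',
                    F.unix_timestamp('stamp') - F.unix_timestamp('prev_stamp'))
        .filter(F.col('gap_seconds') > 14400))  # > 4 hours

Rows surviving the filter are those occurring more than 4 hours after the previous event for their item (the first event of each item has no previous row and is dropped).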

UPDATE: @zero323's answer was informative on my improper use of mapping, however the system I'm using is running a Spark version before 2.02 and I'm working with data in Cassandra.

I managed to solve it with mapPartitions. See my answer below.

UPDATE(2017/03/27): Since originally marking the answer on this post my understanding of Spark has improved significantly. I've updated my answer below to show my current solution.

Answer

The comment by @ShuaiYuan on the original answer is correct. Over the last year I've developed a much better understanding of how Spark works and have actually rewritten the program I was working on for this post.

新答案(2017年3月27日)
为了比较数据帧的两行,我最终使用了RDD.我按键(在本例中为项目ID)对数据进行分组,并忽略eventid,因为它在此等式中无关紧要.然后,我将lambda函数映射到行上,返回键的元组以及包含事件间隔的开始和结束的元组列表,这些元组列表是从对链接的值列表(排序的时间戳)进行迭代的"findGaps"函数派生的每个键.完成此操作后,我将筛选出没有时间间隔的键,然后筛选出flatMapValues以将数据返回为类似sql的格式.这是通过以下代码完成的:

NEW ANSWER (2017/03/27)
To compare two rows of the dataframe I ended up using an RDD. I group the data by key (in this case the item id) and ignore eventid, as it's irrelevant in this equation. I then map a lambda function onto the rows, returning a tuple of the key and a list of tuples containing the start and end of each event gap. Those gap tuples come from a "findGaps" function that iterates over the sorted list of timestamps linked to each key. Once this is complete, I filter out keys with no time gaps and then flatMapValues to return the data to a more SQL-like format. This is done with the following code:

# Find time gaps in a sorted list of datetimes where consecutive
# firings are further apart than the given duration (in seconds).
def findGaps(dates, duration):
    result = []
    length = len(dates)

    for index, item in enumerate(dates):
        if index < length - 1 and (dates[index + 1] - item).total_seconds() > duration:
            # build outage tuple and append to list
            # format (start, stop, duration)
            result.append((item, dates[index + 1], dates[index + 1] - item))
    return result

outage_list = outage_join_df.rdd\
                            .groupByKey()\
                            .map(lambda row: (
                                     row[0],
                                     findGaps(
                                         sorted(list(row[1])), 
                                         limit
                                     )
                                  )
                            )\
                            .filter(lambda row: len(row[1]) > 0)\
                            .flatMapValues(lambda row: row)\
                            .map(lambda row: (
                                 row[0]['itemid'],     # itemid
                                 row[1][0].date(),     # date
                                 row[1][0],            # start
                                 row[1][1],            # stop
                                 row[1][2]             # duration
                            ))\
                            .collect()
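
Note that outage_join_df itself isn't shown here (it comes out of the Cassandra-backed data mentioned in the question), and limit is presumably the gap threshold in seconds passed to findGaps (14400 for the 4 hours in the question). From the way the data is consumed above, the assumption is that each row behaves like a (key, timestamp) pair, where the key can be indexed by 'itemid' (e.g. a Row) and the timestamps are Python datetimes. A hypothetical stand-in built from the sample dataframe could look like this, with outage_join_rdd then taking the place of outage_join_df.rdd in the pipeline above:

from pyspark.sql import Row

# hypothetical: key is a Row carrying itemid, value is the event datetime
# ('stamp' is assumed to be a TimestampType column, so r['stamp'] is a datetime)
outage_join_rdd = df.rdd.map(lambda r: (Row(itemid=r['itemid']), r['stamp']))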

ORIGINAL ANSWER (WRONG)
I managed to solve it using mapPartitions:

def findOutage(items):
    outages = []

    lastStamp = None
    for item in items:
        if lastStamp and (lastStamp - item.stamp).total_seconds() > 14400:
            outages.append({"item": item.itemid, 
                            "start": item.stamp.isoformat(),
                            "stop": lastStamp.isoformat()})
        lastStamp = item.stamp
    return iter(outages)

items = df.limit(10).orderBy('itemid', desc('stamp'))

outages = items.mapPartitions(findOutage).collect()

Thanks everyone for the help!
