How to remove duplicates from a spark data frame while retaining the latest?


Problem description

I'm using Spark to load JSON files from Amazon S3. I would like to remove duplicates based on two columns of the data frame, retaining the newest row (I have a timestamp column). What would be the best way to do it? Please note that the duplicates may be spread across partitions. Can I remove duplicates, retaining the last record, without shuffling? I'm dealing with 1 TB of data.

I was thinking of partitioning the data frame by those two columns, so that all duplicate records would be "consistently hashed" into the same partition; a partition-level sort followed by dropping duplicates would then eliminate all duplicates and keep just one. I don't know if that's possible. Any information is appreciated.
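That repartition-based idea can be sketched in PySpark, for illustration only, assuming hypothetical column names c1 (the timestamp) and c2, c3 (the two key columns) and an existing DataFrame df: repartition co-locates all rows sharing a key in one partition, and a mapPartitions pass then keeps just the newest row per key, so the only shuffle is the repartition itself.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hash-partition by the key columns so rows sharing (c2, c3) land in the same partition
df_part = df.repartition('c2', 'c3')

def keep_latest(rows):
    # within one partition, keep the row with the largest c1 (timestamp) for each (c2, c3) key
    latest = {}
    for row in rows:
        key = (row['c2'], row['c3'])
        if key not in latest or row['c1'] > latest[key]['c1']:
            latest[key] = row
    return iter(latest.values())

# rebuild a DataFrame from the surviving rows, reusing the original schema
df_dedup = spark.createDataFrame(df_part.rdd.mapPartitions(keep_latest), schema=df.schema)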

Recommended answer

Using the row_number() Window function is probably the easiest way to do this. Below, c1 is the timestamp column, and c2 and c3 are the columns used to partition your data:

from pyspark.sql import Window, functions as F

# create a win spec which is partitioned by c2, c3 and ordered by c1 in descending order
win = Window.partitionBy('c2', 'c3').orderBy(F.col('c1').desc())

# set rn with F.row_number() and filter the result by rn == 1
df_new = df.withColumn('rn', F.row_number().over(win)).where('rn = 1').drop('rn')
df_new.show()
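As a quick sanity check (toy data made up for illustration, using the same assumed column names), the snippet below builds a tiny DataFrame and shows that only the newest row per (c2, c3) pair survives:

from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()

# toy data: c1 is the timestamp, (c2, c3) is the dedup key
df = spark.createDataFrame(
    [(3, 'a', 'x'), (1, 'a', 'x'), (2, 'b', 'y')],
    ['c1', 'c2', 'c3'])

win = Window.partitionBy('c2', 'c3').orderBy(F.col('c1').desc())
df.withColumn('rn', F.row_number().over(win)).where('rn = 1').drop('rn').show()
# expected: rows (3, a, x) and (2, b, y) remain; the older (1, a, x) is dropped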

If you only need the rows that have duplicates and want to drop the unique rows, then add another field:

from pyspark.sql import Window, functions as F

# create a win spec which is partitioned by c2, c3 and ordered by c1 in descending order
win = Window.partitionBy('c2', 'c3').orderBy(F.col('c1').desc())

# window to cover all rows in the same partition
win2 = Window.partitionBy('c2', 'c3') \
             .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# set new columns: rn, cnt and filter the result by rn == 1 and cnt > 1
df_new = df.withColumn('rn', F.row_number().over(win)) \
           .withColumn('cnt', F.count('c1').over(win2)) \
           .where('rn = 1 and cnt > 1') \
           .drop('rn', 'cnt')
df_new.show()

