How to remove duplicates from a Spark data frame while retaining the latest?

Problem Description

I'm using Spark to load JSON files from Amazon S3. I would like to remove duplicates based on two columns of the data frame, retaining the newest (I have a timestamp column). What would be the best way to do it? Please note that the duplicates may be spread across partitions. Can I remove duplicates, retaining the last record, without shuffling? I'm dealing with 1 TB of data.

I was thinking of partitioning the data frame by those two columns in such a way that all duplicate records would be "consistently hashed" into the same partition, so that a partition-level sort followed by drop duplicates would eliminate all duplicates, keeping just one. I don't know if that's possible. Any information is appreciated.
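
For reference, here is a minimal sketch of that idea, using the same hypothetical column names as the answer below (c1 = timestamp, c2 and c3 = key columns). Note the caveat in the comments: dropDuplicates() does not guarantee which row of a group it keeps, which is why the row_number() approach in the answer is the safer choice.

from pyspark.sql import functions as F

# hash-partition on the two key columns so all duplicates of a key land in
# the same partition, then sort within each partition, newest timestamp first
df_sorted = df.repartition('c2', 'c3') \
              .sortWithinPartitions('c2', 'c3', F.col('c1').desc())

# caveat: dropDuplicates() makes no guarantee about which row of a duplicate
# group it keeps, so the sort alone does not reliably retain the latest record
df_deduped = df_sorted.dropDuplicates(['c2', 'c3'])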

Recommended Answer

Using the row_number() window function is probably easier for your task; below, c1 is the timestamp column, and c2, c3 are the columns used to partition your data:

from pyspark.sql import Window, functions as F

# create a win spec which is partitioned by c2, c3 and ordered by c1 in descending order
win = Window.partitionBy('c2', 'c3').orderBy(F.col('c1').desc())

# set rn with F.row_number() and filter the result by rn == 1
df_new = df.withColumn('rn', F.row_number().over(win)).where('rn = 1').drop('rn')
df_new.show()
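
As a quick illustration with made-up data (a SparkSession named spark is assumed):

# hypothetical sample: the first two rows share the key (c2, c3) = ('a', 'x')
df = spark.createDataFrame(
    [(1, 'a', 'x'), (3, 'a', 'x'), (2, 'b', 'y')],
    ['c1', 'c2', 'c3'])

# the snippet above keeps (3, 'a', 'x') -- the newest row for the ('a', 'x')
# key -- and (2, 'b', 'y'), which has no duplicate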

If you instead need only the duplicated rows, dropping the unique ones, add another field:

from pyspark.sql import Window, functions as F

# create a win spec which is partitioned by c2, c3 and ordered by c1 in descending order
win = Window.partitionBy('c2', 'c3').orderBy(F.col('c1').desc())

# window to cover all rows in the same partition
win2 = Window.partitionBy('c2', 'c3') \
             .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# set new columns: rn, cnt and filter the result by rn == 1 and cnt > 1
df_new = df.withColumn('rn', F.row_number().over(win)) \
           .withColumn('cnt', F.count('c1').over(win2)) \
           .where('rn = 1 and cnt > 1') \
           .drop('rn', 'cnt')
df_new.show()
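
With the same made-up data as above, this variant returns only (3, 'a', 'x'): the ('a', 'x') key occurs twice (cnt = 2), while ('b', 'y') is unique (cnt = 1) and gets dropped. Note that win2 deliberately has no ordering, so its frame spans the entire key group and cnt is the total number of rows sharing (c2, c3).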
