Spark dataframe is not ordered after sort


Problem description

I am processing one JSON file to produce two JSON files using Spark (version 1.6.1). The input file is about 30-40 GB (100M records). Of the generated files, the larger one is about 10-15 GB (30M records) and the smaller one about 500-750 MB (1.5M records). Both result files face the problem below:

I called the sort method on the dataframe, then performed a repartition to merge the result into a single file. Then I checked the generated file and found that within an interval the records are ordered, but the whole file is not ordered globally. For example, the key (constructed from 3 columns) of the last record (line no. 1.9M) in the file is "(ou7QDj48c, 014, 075)", but the key of a middle record in the file (line no. 375K) is "(pzwzh5vm8, 003, 023)":

pzwzh5vm8 003 023
...
ou7QDj48c 014 075

When I tested the code locally using a relatively small input source (an input file of 400K lines), this does not happen at all.

My concrete code is shown below:

big_json = big_json.sort($"col1", $"col2", $"col3", $"col4")
big_json.repartition(1).write.mode("overwrite").json("filepath")

Could anyone give any advice? Thank you.

(I've also noticed that this thread discussed a similar problem, but there is no good solution so far. If this phenomenon really results from the repartition operation, could anyone help me effectively transform the dataframe into a single JSON file without converting it to an RDD, while keeping the sorted order? Thanks)

Solution:

Really appreciate the help from @manos, @eliasah and @pkrishna. I had thought about using coalesce after reading your comments, but after investigating its performance I gave up the idea.

The final solution is: sort the dataframe and write it into JSON, without any repartition or coalesce. After the whole job is done, call the HDFS command below:

hdfs dfs -getmerge /hdfs/file/path/part* ./local.json

This command works far better than I imagined. It neither takes too much time nor too much space, and gives me a good single file. I just used head and tail on the huge result file and it seems totally ordered.
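For reference, here is a minimal sketch of that final approach, assuming big_json and the column names from the question; the output path is a placeholder:

// Spark 1.6: the $"col" syntax needs the implicits in scope.
import sqlContext.implicits._

// Sort globally and write directly, without repartition or coalesce.
// Spark writes one part file per partition, and after a global sort
// the part files come out in key order (part-00000 holds the
// smallest keys, and so on).
big_json
  .sort($"col1", $"col2", $"col3", $"col4")
  .write.mode("overwrite")
  .json("/hdfs/file/path")

// -getmerge concatenates the part files in filename order, so the
// merged local file preserves the global sort:
//   hdfs dfs -getmerge /hdfs/file/path/part* ./local.json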

Solution

What's happening is that you are repartitioning after your sort action.

repartition reshuffles the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

Under the hood, it uses coalesce and shuffle to redistribute data. This is why your data isn't sorted anymore.

You can check the code for reference.
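To illustrate the difference, a minimal sketch; sortedDf is a hypothetical stand-in for a dataframe that has already been sorted, not the asker's exact code:

// repartition(n) is equivalent to coalesce(n, shuffle = true) on the
// underlying RDD: it always shuffles, so the prior sort order is lost.
val shuffled = sortedDf.repartition(1)

// coalesce(n) alone does not shuffle; it only merges existing
// partitions. Note, however, that it can collapse the upstream
// computation into fewer partitions, which tends to hurt performance
// (likely why the asker rejected it after testing).
val merged = sortedDf.coalesce(1)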

