Spark dataframe is not ordered after sort

Problem description

I'm processing a JSON file to generate two JSON files using Spark (version 1.6.1). The size of the input file is about 30~40 GB (100M records). Of the generated files, the bigger one is about 10~15 GB (30M records) and the smaller one is about 500~750 MB (1.5M records). Both result files exhibit the following problem:

I invoked the "sort" method on the dataframe and then performed a "repartition" to merge the results into a single file. When I checked the generated file, I found that the records are ordered within intervals, but the file is not ordered globally. For example, the key (constructed from 3 columns) of the last record (line 1.9M) in the file is "(ou7QDj48c, 014, 075)", but the key of a record in the middle of the file (line 375K) is "(pzwzh5vm8, 003, 023)":

pzwzh5vm8 003 023
...
ou7QDj48c 014 075

When I tested the code locally with a relatively small input source (an input file of 400K lines), this doesn't happen at all.

My concrete code is shown below:

big_json = big_json.sort($"col1", $"col2", $"col3", $"col4")
big_json.repartition(1).write.mode("overwrite").json("filepath")

Could anyone give me some advice? Thank you.

(I've also noticed that this thread discussed a similar problem, but there is no good solution so far. If this phenomenon really results from the repartition operation, could anyone help me efficiently transform the dataframe into a single JSON file without converting it into an RDD, while keeping the sorted order? Thanks)

Solution:

Many thanks for the help from @manos, @eliasah and @pkrishna. I had thought about using coalesce after reading your comments, but after investigating its performance I gave up on that idea.

The final solution is: sort the dataframe and write it out as JSON, without any repartition or coalesce. After the whole job is done, call the HDFS command below:

hdfs dfs -getmerge /hdfs/file/path/part* ./local.json

This command works far better than I imagined. It takes neither too much time nor too much space, and it gives me a single good file. I just used head and tail on the huge result file and it appears to be totally ordered.
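
For clarity, here is a minimal Scala sketch of this approach, reusing the big_json dataframe and column names from the question (the HDFS output path is illustrative). It works because a global sort range-partitions the data, so each part file covers a contiguous key range, and getmerge concatenates the part files in file-name order (part-00000, part-00001, ...), preserving the global order:

import sqlContext.implicits._  // Spark 1.6: enables the $"colName" column syntax

// Sort globally and write directly -- no repartition or coalesce, so the
// range-partitioned output tasks each write one already-ordered part file.
val sorted = big_json.sort($"col1", $"col2", $"col3", $"col4")
sorted.write.mode("overwrite").json("/hdfs/file/path")

// Then, outside Spark:
//   hdfs dfs -getmerge /hdfs/file/path/part* ./local.json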

Answer

What's happening is that you are repartitioning after your sort operation.

repartition reshuffles the data in the RDD randomly to create either more or fewer partitions and balances it across them. This always shuffles all data over the network.

Under the hood, it uses coalesce and shuffle to redistribute data. This is why your data isn't sorted anymore.
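
For reference, in the Spark source, RDD.repartition is just coalesce with shuffling forced on (paraphrased below from RDD.scala; the withScope wrapper is omitted):

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
  coalesce(numPartitions, shuffle = true)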

You can check the code for reference.
