Can't write ordered data to parquet in spark

Question

I am working with Apache Spark to generate parquet files. I can partition them by date with no problems, but internally I can not seem to lay out the data in the correct order.

The order seems to get lost during processing, which means the parquet metadata is not right (specifically I want to ensure that the parquet row groups are reflecting sorted order so that queries specific to my use case can filter efficiently via the metadata).

Consider the following example:

// note: hbase source is a registered temp table generated from hbase
val transformed = sqlContext.sql(s"SELECT  id, sampleTime, ... , toDate(sampleTime) as date FROM hbaseSource")

// Repartition the input set by the date column (in my source there should be 2 distinct dates)
val sorted = transformed.repartition($"date").sortWithinPartitions("id", "sampleTime")

sorted.coalesce(1).write.partitionBy("date").parquet(s"/outputFiles")

With this approach, I do get the right parquet partition structure (by date). And even better, for each date partition, I see a single large parquet file.

 /outputFiles/date=2018-01-01/part-00000-4f14286c-6e2c-464a-bd96-612178868263.snappy.parquet

However, when I query the file I see the contents out of order. To be specific, "out of order" seems more like several ordered data-frame partitions have been merged into the file.

The parquet row group metadata shows that the sorted fields are actually overlapping (a specific id, for example, could be located in many row groups):

id:             :[min: 54, max: 65012, num_nulls: 0]
sampleTime:     :[min: 1514764810000000, max: 1514851190000000, num_nulls: 0]
id:             :[min: 827, max: 65470, num_nulls: 0]
sampleTime:     :[min: 1514764810000000, max: 1514851190000000, num_nulls: 0]
id:             :[min: 1629, max: 61412, num_nulls: 0]
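
For context, row-group statistics like the ones above can be dumped programmatically with parquet-mr, the library Spark itself writes through. The following is only a sketch, assuming parquet-hadoop is on the classpath; it uses the older readFooter API (deprecated in recent releases), and the file path is a placeholder for one of the generated part files.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader
import scala.collection.JavaConverters._

// Print the min/max statistics of every column chunk in every row group.
// The path below is a placeholder for one of the written part files.
val footer = ParquetFileReader.readFooter(
  new Configuration(),
  new Path("/outputFiles/date=2018-01-01/part-00000.snappy.parquet"),
  ParquetMetadataConverter.NO_FILTER)

footer.getBlocks.asScala.zipWithIndex.foreach { case (block, rg) =>
  block.getColumns.asScala.foreach { col =>
    println(s"RG $rg ${col.getPath}: ${col.getStatistics}")
  }
}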

I want the data to be properly ordered inside each file so the metadata min/max in each row group are non-overlapping.

For example, this is the pattern I want to see:

RG 0: id:             :[min: 54, max: 100, num_nulls: 0]
RG 1: id:             :[min: 100, max: 200, num_nulls: 0]

... where RG = "row group". If I wanted id = 75, the query could find it in one row group.
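
As an illustration (not from the original post), this is the kind of lookup that benefits: with non-overlapping row groups, the parquet reader can skip every row group whose id range cannot contain 75. The sketch assumes a SparkSession named spark and parquet filter pushdown left at its default (enabled).

import spark.implicits._

// Illustrative point lookup: with sorted, non-overlapping row groups the
// min/max statistics let the reader skip most row groups for this predicate.
spark.read.parquet("/outputFiles")
  .filter($"date" === "2018-01-01" && $"id" === 75)
  .show()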

I have tried many variations of the above code. For example, with and without coalesce (I know coalesce is bad, but my idea was to use it to prevent shuffling). I have also tried sort instead of sortWithinPartitions (sort should create a total ordered sort, but will result in many partitions). For example:

val sorted = transformed.repartition($"date").sort("id", "sampleTime") 
sorted.write.partitionBy("date").parquet(s"/outputFiles")

Gives me 200 files, which is too many, and they are still not sorted correctly. I can reduce the file count by adjusting the shuffle size, but I would have expected sort to be processed in order during the write (I was under the impression that writes did not shuffle the input). The order I see is as follows (other fields omitted for brevity):

+----------+----------------+
|        id|      sampleTime|
+----------+----------------+
|     56868|1514840220000000|
|     57834|1514785180000000|
|     56868|1514840220000000|
|     57834|1514785180000000|
|     56868|1514840220000000|

Which looks like it's interleaved sorted partitions. So I think repartition buys me nothing here, and sort seems to be incapable of preserving order on the write step.
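
(Side note, not from the original post: the 200 files line up with the default of spark.sql.shuffle.partitions, which is 200; the "shuffle size" knob referred to above would be set before the repartition/sort, along these lines.)

// Hypothetical tweak: fewer shuffle partitions means fewer output files
// from the sort, at the cost of larger tasks.
spark.conf.set("spark.sql.shuffle.partitions", "2")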

I've read that what I want to do should be possible. I've even tried the approach outlined in the presentation "Parquet performance tuning: The missing guide" by Ryan Blue (unfortunately it is behind the O'Reilly paywall). That involves using insertInto. In that case, spark seemed to use an old version of parquet-mr which corrupted the metadata, and I am not sure how to upgrade it.

I am not sure what I am doing wrong. My feeling is that I am misunderstanding the way repartition($"date") and sort work and/or interact.

I would appreciate any ideas. Apologies for the essay. :)

Edit: Also note that if I do a show(n) on transformed.sort("id", "sampleTime") the data is sorted correctly. So it seems like the problem occurs during the write stage. As noted above, it does seem like the output of the sort is shuffled during the write.

Answer

The problem is that when saving in a file format, Spark requires a certain ordering of the data; if that ordering is not satisfied, Spark sorts the data itself during the save to meet the requirement and discards your sort. To be more specific, Spark requires this ordering (taken directly from the Spark source code of Spark 2.4.4):

val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns

where partitionColumns are the columns by which you partition the data. You are not using bucketing, so bucketIdExpression and sortColumns are not relevant in this example and the requiredOrdering will be only the partitionColumns. So if this is your code:

val sorted = transformed.repartition($"date").sortWithinPartitions("id", "sampleTime")

sorted.write.partitionBy("date").parquet(s"/outputFiles")

Spark will check whether the data is sorted by date, which it is not, so Spark will discard your sort and sort it by date instead. On the other hand, if you instead do it like this:

val sorted = transformed.repartition($"date").sortWithinPartitions("date", "id", "sampleTime")

sorted.write.partitionBy("date").parquet(s"/outputFiles")

Spark will check again whether the data is sorted by date, and this time it is (the requirement is satisfied), so Spark will preserve this order and will not introduce any additional sorts while saving the data. So I believe it should work this way.
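
For completeness (this goes beyond the original answer): if bucketing were in play, the bucketIdExpression and sortColumns parts of the required ordering above would be supplied through the writer's bucketBy/sortBy. A rough sketch, assuming a metastore is available, since bucketed writes must go through saveAsTable; the table name is a placeholder:

// Rough sketch of the bucketed variant (not part of the original answer).
transformed.write
  .partitionBy("date")
  .bucketBy(8, "id")
  .sortBy("id", "sampleTime")
  .format("parquet")
  .saveAsTable("samples_bucketed")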
