Write spark dataframe to single parquet file

Problem description

I am trying to do something very simple and I'm having some very stupid struggles. I think it must have to do with a fundamental misunderstanding of what spark is doing. I would greatly appreciate any help or explanation.

I have a very large (~3 TB, ~300MM rows, 25k partitions) table, saved as parquet in s3, and I would like to give someone a tiny sample of it as a single parquet file. Unfortunately, this is taking forever to finish and I don't understand why. I have tried the following:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.coalesce(1).write.saveAsTable("db.tiny_table")

And when that didn't work I tried this, which I thought should be the same, but I wasn't sure. (I added the prints in an effort to debug.)

tiny = spark.table("db.big_table").limit(500).coalesce(1)
print(tiny.count())
print(tiny.show(10))
tiny.write.saveAsTable("db.tiny_table")

When I watch the Yarn UI, both print statements and the write are using 25k mappers. The count took 3 mins, the show took 25 mins, and the write took ~40 mins, although it finally did write the single file table I was looking for.

It seems to me like the first line should take the top 500 rows and coalesce them to a single partition, and then the other lines should happen extremely fast (on a single mapper/reducer). Can anyone see what I'm doing wrong here? I've been told maybe I should use sample instead of limit but as I understand it limit should be much faster. Is that right?

Thanks in advance for any thoughts!

Recommended answer

I’ll approach the print functions issue first, as it’s something fundamental to understanding spark. Then limit vs sample. Then repartition vs coalesce.

The reason the print functions take so long is that coalesce is a lazy transformation. Most transformations in spark are lazy and do not get evaluated until an action is called.

Actions are things that do stuff and (mostly) don't return a new dataframe as a result, like count and show. They return a number and some data, whereas coalesce returns a dataframe with 1 partition (sort of, see below).

What is happening is that you are rerunning the sql query and the coalesce call each time you call an action on the tiny dataframe. That's why each call uses the 25k mappers.
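
To make that concrete, here is a rough sketch of what each call in your second snippet triggers (same names as your code; the comments just describe the re-execution, not any new behaviour):

tiny = spark.table("db.big_table").limit(500).coalesce(1)  # lazy: nothing executes yet
print(tiny.count())                      # action: scans the 25k partitions, then limit + coalesce
print(tiny.show(10))                     # action: the whole plan runs again from scratch
tiny.write.saveAsTable("db.tiny_table")  # action: the plan runs a third time for the write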

To save time, add the .cache() method to the first line (for your print code anyway).

Then the dataframe transformations are actually executed on your first line and the result is persisted in memory on your spark nodes.

This won’t have any impact on the initial query time for the first line, but at least you’re not running that query 2 more times because the result has been cached, and the actions can then use that cached result.

To remove it from memory, use the .unpersist() method.
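
So, reusing your code, a rough sketch of the cached version (not tested against your table, but the shape is the point):

tiny = spark.table("db.big_table").limit(500).coalesce(1).cache()
print(tiny.count())                      # first action: runs the query once and fills the cache
print(tiny.show(10))                     # served from the cached result
tiny.write.saveAsTable("db.tiny_table")  # also reads from the cache
tiny.unpersist()                         # free the memory once the table is written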

Now for the actual query you're trying to run...

It really depends on how your data is partitioned. As in, is it partitioned on specific fields etc...

You mentioned it in your question, but sample might be the right way to go.

Why is that?

limit has to search for the first 500 rows. Unless your data is partitioned by row number (or some sort of incrementing id), the first 500 rows could be stored in any of the 25k partitions.

So spark has to go search through all of them until it finds all the correct values. Not only that, it has to perform an additional step of sorting the data to have the correct order.

sample just grabs 500 random values. Much easier to do as there’s no order/sorting of the data involved and it doesn’t have to search through specific partitions for specific rows.

While limit can be faster, it also has its, erm, limits. I usually only use it for very small subsets like 10/20 rows.
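
As a rough sketch of a sample-based version (the fraction below is an assumption, picked so that ~300MM rows gives back roughly 500; sample is approximate, so trim with a small limit afterwards if you need exactly 500):

big = spark.table("db.big_table")
# sample(withReplacement, fraction, seed): the fraction is approximate, not exact
tiny = big.sample(False, 500 / 300_000_000, seed=42)
tiny = tiny.limit(500)  # optional: cap the already-small sample at exactly 500 rows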

Now for the partitioning...

The problem with coalesce, I think, is that it only virtually changes the partitioning. Now I'm not sure about this, so take it with a pinch of salt.

According to the pyspark docs:

this operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions.

So your 500 rows will actually still sit across your 25k physical partitions that are considered by spark to be 1 virtual partition.

Causing a shuffle (usually bad) and persisting in spark memory with .repartition(1).cache() is possibly a good idea here. Because instead of having the 25k mappers looking at the physical partitions when you write, it should only result in 1 mapper looking at what is in spark memory. Then write becomes easy. You’re also dealing with a small subset, so any shuffling should (hopefully) be manageable.

Obviously this is usually bad practice, and doesn’t change the fact spark will probably want to run 25k mappers when it performs the original sql query. Hopefully sample takes care of that.
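
Putting those pieces together, one possible end-to-end sketch (same table names as your code; the sample fraction is the same assumption as above):

tiny = (
    spark.table("db.big_table")
         .sample(False, 500 / 300_000_000, seed=42)  # random ~500 rows, no ordering needed
         .repartition(1)                             # shuffle the tiny result into one partition
         .cache()                                    # keep that single partition in memory
)
tiny.count()                             # materialise the cache
tiny.write.saveAsTable("db.tiny_table")  # one mapper writes a single parquet file
tiny.unpersist()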

edit to clarify shuffling, repartition and coalesce

You have 2 datasets in 16 partitions on a 4 node cluster. You want to join them and write as a new dataset in 16 partitions.

Row 1 for data 1 might be on node 1, and row 1 for data 2 on node 4.

In order to join these rows together, spark has to physically move one, or both of them, then write to a new partition.

That’s a shuffle, physically moving data around a cluster.

It doesn't matter that everything is partitioned into 16; what matters is where the data is sitting on the cluster.

data.repartition(4) will physically move data from the 4 partitions on each node into 1 partition per node.

Spark might move all 4 partitions from node 1 over to the 3 other nodes, in a new single partition on those nodes, and vice versa.

I wouldn’t think it’d do this, but it’s an extreme case that demonstrates the point.

A coalesce(4) call, though, doesn't move the data; it's much more clever. Instead, it recognises: "I already have 4 partitions per node & 4 nodes in total... I'm just going to call all 4 of those partitions per node a single partition and then I'll have 4 total partitions!"

So it doesn’t need to move any data because it just combines existing partitions into a joined partition.
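
A tiny illustration of the difference in reported partition counts (a toy dataframe just to show the API; it says nothing about where the data physically sits):

df = spark.range(0, 1_000_000).repartition(16)   # toy data spread over 16 partitions
print(df.rdd.getNumPartitions())                 # 16
print(df.coalesce(4).rdd.getNumPartitions())     # 4: existing partitions merged, no shuffle
print(df.repartition(4).rdd.getNumPartitions())  # 4: full shuffle, data physically moved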
