Achieve concurrency when saving to a partitioned parquet file


Problem Description

When writing a dataframe to parquet using partitionBy:

df.write.partitionBy("col1","col2","col3").parquet(path)

I would expect each partition to be written independently by a separate task, in parallel up to the number of workers assigned to the current Spark job.

However, there is actually only one worker/task running at a time when writing the parquet. That one worker cycles through each of the partitions and writes out the .parquet files serially. Why would this be the case, and is there a way to force concurrency in this spark.write.parquet operation?

The CPU usage I observe during the write is not what I want to see (it should be 700%+).

From this other post I also tried adding a repartition in front:

Spark parquet partitioning: Large number of files

df.repartition("col1","col2","col3").write.partitionBy("col1","col2","col3").parquet(path)

Unfortunately this had no effect: still only one worker.

Note: I am running in local mode with local[8] and have seen other Spark operations run with as many as eight concurrent workers, using up to 750% of the CPU.
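
For reference, a minimal sketch of the kind of local-mode session described in the note (the app name is illustrative, not taken from the question):

from pyspark.sql import SparkSession

# Hypothetical session setup: local mode with 8 worker threads,
# matching the local[8] master mentioned above.
spark = (
    SparkSession.builder
    .master("local[8]")
    .appName("partitioned-parquet-write")  # illustrative app name
    .getOrCreate()
)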

Answer

In short, writing multiple output files from a single task is not parallelized, but assuming you have multiple tasks (multiple input splits), each of those gets its own core on a worker.
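
A quick way to see this in practice is to check how many tasks the write will use (one per DataFrame partition) and, if needed, repartition to a higher count before writing. This is only a sketch; the partition count of 8 is an arbitrary example:

# The write runs one task per DataFrame partition, so the degree of
# parallelism of the write is bounded by this number:
print(df.rdd.getNumPartitions())

# If the data arrived in a single partition (e.g. from one small file),
# spreading it across more partitions gives the write more tasks to run
# concurrently. The count of 8 is an arbitrary example.
df.repartition(8) \
  .write.partitionBy("col1", "col2", "col3") \
  .parquet(path)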

The goal of writing out partitioned data isn't to parallelize your write operation. Spark already does that by writing out multiple tasks at once. The goal is just to optimize future read operations where you only want one partition of the saved data.
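
For example, a later read that filters on one of the partition columns only has to scan the matching directories. This is a sketch; the filter value is illustrative:

from pyspark.sql.functions import col

# Partition pruning: only the col1=someValue/... directories are scanned;
# files under the other partition directories are skipped entirely.
subset = spark.read.parquet(path).where(col("col1") == "someValue")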

The logic for writing partitions in Spark is designed to read all of the records from the previous stage only once when writing them out to their destination. I believe part of the design choice is also to protect against the case where a partition key has many, many values.

Spark 2.x approach

In Spark 2.x, each task sorts its records by their partition keys, then iterates through them, writing to one file handle at a time. I assume this is done to ensure that a huge number of file handles is never opened when there are many distinct values in the partition keys.
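
Roughly, in the same Python pseudo-code style as the 1.x sketch further down (input_split, partition_keys, and determine_output_path are placeholders, not real Spark APIs), the per-task behaviour looks like this:

# Per task: sort by the output partition path, then write sequentially,
# so only one file handle is ever open at a time.
rows = sorted(input_split, key=lambda row: determine_output_path(row, partition_keys))

current_path, handle = None, None
for row in rows:
    partition_path = determine_output_path(row, partition_keys)
    if partition_path != current_path:
        # Partition key changed: close the previous file, open the next one.
        if handle is not None:
            handle.close()
        handle = open(partition_path, 'w')
        current_path = partition_path
    handle.write(row)

if handle is not None:
    handle.close()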

For reference, here is the sorting:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L121

Scroll down a little and you will see it calling write(iter.next()), looping through each row.

And here is the actual writing (one file/partition key at a time):

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L121

There you can see that it holds only one file handle open at a time.

Spark 1.x approach

What Spark 1.x does for a given task is loop through all of the records, opening a new file handle whenever it encounters an output partition it hasn't seen before in that task. It then immediately writes the record to that file handle and moves on to the next record. This means that at any given time, while processing a single task, it can have up to N file handles open just for that one task, where N is the maximum number of output partitions. To make this clearer, here is some Python pseudo-code to show the general idea:

# To write out the records in a single InputSplit/Task.
# input_split, partition_keys, and determine_output_path stand in for Spark internals.
handles = {}
for row in input_split:
    partition_path = determine_output_path(row, partition_keys)
    if partition_path not in handles:
        # First time this output partition appears in this task: open a handle for it.
        handles[partition_path] = open(partition_path, 'w')

    handles[partition_path].write(row)

# Close every handle once the task's records are exhausted.
for handle in handles.values():
    handle.close()

There is a caveat to the above strategy for writing out records. In Spark 1.x, the setting spark.sql.sources.maxConcurrentWrites put an upper limit on the maximum number of file handles that could be open per task. After that was reached, Spark would instead sort the data by the partition key, so it could iterate through the records writing out one file at a time.
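
In the same pseudo-code style, the caveat amounts to something like the sketch below. The limit value and the deferred-rows mechanism are my own illustration of the described behaviour, not the actual implementation:

MAX_CONCURRENT_WRITES = 50   # illustrative; controlled by spark.sql.sources.maxConcurrentWrites

handles, deferred = {}, []
for row in input_split:
    partition_path = determine_output_path(row, partition_keys)
    if partition_path not in handles and len(handles) >= MAX_CONCURRENT_WRITES:
        deferred.append(row)   # too many files already open; handle this row later
        continue
    if partition_path not in handles:
        handles[partition_path] = open(partition_path, 'w')
    handles[partition_path].write(row)

for handle in handles.values():
    handle.close()

# The deferred rows are then sorted by partition key and written out
# one file at a time, much like the 2.x approach above.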
