How to write spark dataframe in a single file in local system without using coalesce


Problem description

I want to generate an Avro file from a PySpark dataframe, and currently I am doing coalesce as below:

df = df.coalesce(1)
df.write.format('avro').save('file:///mypath')

But this is now leading to memory issues, because all the data is fetched into memory before writing, and my data size keeps growing every day. So I want to write the data partition by partition, so that it is written to disk in chunks and does not raise OOM issues. I found that toLocalIterator helps achieve this, but I am not sure how to use it. I tried the usage below and it returns all rows:

it = df.toLocalIterator()
for row in it:
    print('writing some data')
    # write the row's data into disk/file

The iterator iterates over each row rather than each partition. How should I do this?

Recommended answer

When you do df = df.coalesce(1), all the data is collected into one of the worker nodes. If that node cannot handle that much data due to its resource constraints, the job will fail with an OOM error.

As per the Spark documentation, toLocalIterator returns an iterator that contains all rows in the Dataset, and the maximum memory it can consume is equivalent to the largest partition in the Dataset.

How does toLocalIterator work?

The first partition is sent to the driver. If you continue to iterate and reach the end of the first partition, the second partition is sent to the driver node, and so on until the last partition. That is why the maximum memory it can occupy equals the largest partition, so make sure your driver node has sufficient RAM and disk.

The toLocalIterator.next() method makes sure that the next partition's records are fetched only after the previous partition has been processed.
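
Because the memory bound is the largest partition, it can help to check and, if needed, reduce the partition sizes before iterating. A minimal sketch (the repartitioning step and the target of 200 partitions are illustrative assumptions, not part of the original answer):

    # check how many partitions the dataframe currently has
    print(df.rdd.getNumPartitions())

    # optionally repartition so that no single partition is too large for the driver
    # (200 is an arbitrary example target)
    df = df.repartition(200)

    # the driver now only ever holds one (smaller) partition at a time
    for row in df.toLocalIterator():
        pass  # process the row here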

What you can do is batch the rows as you iterate and write each batch out, e.g.:

    # collect rows in batches, e.g. 1000 per batch
    batch, batch_size = [], 1000
    for row in df.toLocalIterator():
        batch.append(row)
        if len(batch) == batch_size:
            # serialize the batch and append it to a file on the local filesystem
            batch = []
    # remember to also write out the last, partially filled batch
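
Since the question asks for an Avro file, here is a minimal sketch of that idea using the third-party fastavro package (the package choice, the schema, the column names, and the output path are assumptions, not something given in the question or answer). fastavro's writer consumes the records iterable and writes it out in blocks, so memory use should stay roughly bounded by the largest partition that toLocalIterator pulls to the driver:

    from fastavro import writer, parse_schema

    # hypothetical schema -- adjust the fields to match your dataframe's columns
    schema = parse_schema({
        "type": "record",
        "name": "MyRecord",
        "fields": [
            {"name": "id", "type": "long"},
            {"name": "value", "type": ["null", "string"]},
        ],
    })

    def records():
        # toLocalIterator() pulls one partition at a time to the driver
        for row in df.toLocalIterator():
            yield row.asDict()

    # hypothetical local output path
    with open('/mypath/output.avro', 'wb') as out:
        writer(out, schema, records())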

Note: make sure to cache your parent DataFrame, otherwise in some scenarios every partition will be recomputed.
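
For completeness, a one-line sketch of that caching step (MEMORY_AND_DISK is just one possible storage level; plain df.cache() works too):

    from pyspark import StorageLevel

    # persist the parent dataframe so toLocalIterator does not recompute
    # every partition from the original source on each fetch
    df = df.persist(StorageLevel.MEMORY_AND_DISK)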

