What is the best practice to collect a large data set from spark rdd?


Question

I am using pyspark to process my data, and at the very end I need to collect the data from the rdd using rdd.collect(). However, my spark job crashes due to memory problems. I tried a number of ways, but no luck. I am now running with the following code, processing a small chunk of data for each partition:

def make_part_filter(index):
    # Build a partition filter for mapPartitionsWithIndex that passes
    # through only the rows of the partition with the given index.
    def part_filter(split_index, iterator):
        if split_index == index:
            for el in iterator:
                yield el
    return part_filter


for part_id in range(rdd.getNumPartitions()):
    # Keep only one partition at a time, then pull it to the driver.
    part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
    myCollection = part_rdd.collect()
    for row in myCollection:
        pass  # Do something with each row

The new code I am currently using does not crash, but it seems to run forever.

Is there a better way to collect data from a large rdd?

Answer

I don't know if this is the best way, but it's the best way I've tried. Not sure if it's better or worse than yours. It's the same idea, splitting it into chunks, but you can be more flexible with the chunk size.

def rdd_iterate(rdd, chunk_size=1000000):
    # Pair every row with a stable index; cache so the repeated
    # filter().collect() passes below can reuse the indexed rows.
    indexed_rows = rdd.zipWithIndex().cache()
    count = indexed_rows.count()
    print("Will iterate through RDD of count {}".format(count))
    start = 0
    end = start + chunk_size
    while start < count:
        print("Grabbing new chunk: start = {}, end = {}".format(start, end))
        # collect() materializes this chunk before start/end are advanced,
        # so the lambda always sees the intended window.
        chunk = indexed_rows.filter(lambda r: start <= r[1] < end).collect()
        for row in chunk:
            yield row[0]
        start = end
        end = start + chunk_size
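
Worth noting about this approach: every chunk is fetched by its own Spark job (one filter().collect() per chunk, plus the initial count()), so a very small chunk_size trades driver memory for a lot of per-job overhead, and the .cache() on the indexed rows is what keeps those repeated passes from recomputing the RDD's full lineage each time.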

Example usage, where I want to append a huge RDD to a CSV file on disk without ever populating a Python list with the entire RDD:

def rdd_to_csv(fname, rdd):
    import csv
    import os
    # open() does not expand "~", so expand the user path explicitly.
    with open(os.path.expanduser(fname), "a") as f:
        writer = csv.writer(f)
        for row in rdd_iterate(rdd):  # with the abstraction above, iterates through the entire RDD
            writer.writerow(row)

rdd_to_csv("~/test.csv", my_really_big_rdd)
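
If you want to try rdd_iterate on something small first, a minimal self-contained sketch might look like the following (the local SparkContext, the toy (i, i * i) rows, and the tiny chunk_size are illustrative assumptions, not part of the original answer):

from pyspark import SparkContext

sc = SparkContext("local[*]", "rdd_iterate_demo")

# A tiny RDD of (i, i * i) rows spread over a few partitions.
demo_rdd = sc.parallelize([(i, i * i) for i in range(10)], 4)

# Stream rows back to the driver a chunk at a time instead of one big collect().
for row in rdd_iterate(demo_rdd, chunk_size=3):
    print(row)

sc.stop()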
