Spark: How to collect a large amount of data without running out of memory

Problem Description

I have the following problem:

I run a SQL query over a set of Parquet files on HDFS and then call collect to get the result.

The problem is that when there are many rows, I get an out-of-memory error.

The query requires shuffling, so I cannot simply run it on each file separately.

One possible solution is to iterate over the values of a column and save each partial result to disk:

df <- SparkR::sql("original query goes here")
# data <- collect(df)   # <- out of memory
createOrReplaceTempView(df, "t")
for (c in cities) {
  x <- collect(SparkR::sql(paste0("SELECT * FROM t WHERE city = '", c, "'")))
  # append x to a file on disk
}

As far as I know, this will make the program take far too long, because the query is executed once per city.

What is the best way to do this?

Recommended Answer

As @cricket_007 said, I would not collect() your data from Spark just to append it to a file in R. It also doesn't make sense to iterate over a list of SparkR::distinct() cities and then select everything for each of them just to append it to some output dataset. The only time you would want to do that is if you need to perform another operation within each group based on some conditional logic, or to apply an operation to each group using a function that is not available in SparkR.
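
For that last case only, a minimal sketch using SparkR::gapply (available since Spark 2.0) could look like the following. The data frame name df, the column names (city, sales) and the per-group summary are hypothetical placeholders, not something taken from your query:

# Hypothetical schema of the rows returned for each group
schema <- structType(structField("city", "string"),
                     structField("total_sales", "double"))

perCity <- SparkR::gapply(
  df,        # the SparkDataFrame from your original query (name assumed)
  "city",    # grouping column
  function(key, localDF) {
    # key[[1]] is the city value; localDF is a plain R data.frame for that city
    data.frame(city = key[[1]],
               total_sales = sum(localDF$sales),  # 'sales' is a made-up column
               stringsAsFactors = FALSE)
  },
  schema)

Only the per-group summaries come back as a SparkDataFrame, so nothing the size of the full dataset is pulled onto the driver. If no such per-group R logic is needed, the plain SQL approach below is simpler.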

I think you are trying to get a data frame (either Spark or R) with the observations grouped in such a way that everything looks tidy when you view it. To do that, add a GROUP BY city clause to your first SQL query. From there, just write the data back out to HDFS or some other output directory. From what I understand of your question, something like this may help:

sdf <- SparkR::sql('SELECT SOME GREAT QUERY FROM TABLE GROUP BY city')

SparkR::write.parquet(sdf, path="path/to/desired/output/location", mode="append")

This writes all of your data to a single output location, grouped by city, which I believe is what you were trying to get with the second query in your question.

You can confirm the output is what you want via:

newsdf <- SparkR::read.parquet("path/to/desired/output/location/")
# head() only pulls the requested number of rows back to the driver
View(head(newsdf, num = 200))

Good luck, hopefully this helps.

