如何一次将每个组发送到 Spark 执行器? [英] How to send each group at a time to the spark executors?
问题描述
我无法一次将每组数据帧发送给执行程序.
I'm unable to send each group of dataframe at a time to the executor.
我在 company_model_vals_df dataframe
中有如下数据.
I have a data as below in company_model_vals_df dataframe
.
----------------------------------------------------------------------------------------
| model_id | fiscal_year | fiscal_quarter | col1 | col2 | col3 | col4 | col5 | col6 |
----------------------------------------------------------------------------------------
| 1 | 2018 | 1 | r1 | r2 | r3 | r4 | r5 | r6 |
| 1 | 2018 | 2 | r1 | r2 | r3 | r4 | r5 | r6 |
| 1 | 2018 | 1 | r1 | r2 | r3 | r4 | r5 | r6 |
| 1 | 2018 | 2 | r1 | r2 | r3 | r4 | r5 | r6 |
| 1 | 2018 | 1 | r1 | r2 | r3 | r4 | r5 | r6 |
| 2 | 2017 | 3 | r1 | r2 | r3 | r4 | r5 | r6 |
| 2 | 2017 | 1 | r1 | r2 | r3 | r4 | r5 | r6 |
| 2 | 2017 | 3 | r1 | r2 | r3 | r4 | r5 | r6 |
| 2 | 2017 | 3 | r1 | r2 | r3 | r4 | r5 | r6 |
| 2 | 2017 | 1 | r1 | r2 | r3 | r4 | r5 | r6 |
----------------------------------------------------------------------------------------
我想把每个分组的数据发送给执行器,一次处理一个.
I want to send each grouped data to executor, to process each one at a time.
为此,我的做法如下:
var dist_company_model_vals_df = company_model_vals_df.select("model_id","fiscal_quarter","fiscal_year").distinct()
// Want to send each group at a time to write by executors.
dist_company_model_vals_df.foreach(rowDf => {
writeAsParquet(rowDf , parquet_file) // this simply writes the data as parquet file
})
错误:
这会引发 NullPointerException,因为在 Executor 端找不到 rowDf.使用 Scala 2.11 在 spark-sql 中处理此问题的正确方法是什么?
This throws a NullPointerException as rowDf is not found on the Executor side. What is the correct way to handle this in spark-sql using Scala 2.11?
第 2 部分:问题
当我执行 company_model_vals_df.groupBy("model_id","fiscal_quarter","fiscal_year") 时,即使我增加了内存,数据也会在磁盘上大量溢出.IE.company_model_vals_df 是巨大的数据框......在执行 groupBy 时发生了很多溢出.
When i do company_model_vals_df.groupBy("model_id","fiscal_quarter","fiscal_year") the data is spilling a lot on disk even after i increased the memory. I.e. company_model_vals_df is huge dataframe ... lot of spilling happening when doing groupBy.
下面的情况也是如此,即使用 partitionBy
Same is the case below i.e. with partitionBy
company_model_vals_df.write.partitionBy("model_id","fiscal_quarter","fiscal_year")
company_model_vals_df.write.partitionBy("model_id","fiscal_quarter","fiscal_year")
PSEDO 代码:所以为了避免首先我会做元组val groups = company_model_vals_df.groupBy("model_id","fiscal_quarter","fiscal_year").collect
PSEDO CODE : So in order to avoid is first I would do tuples of val groups = company_model_vals_df.groupBy("model_id","fiscal_quarter","fiscal_year").collect
groups.forEach{ group ->
// I want to prepare child dataframes for each group from company_model_vals_df
val child_df = company_model_vals_df.where(model_id= group.model_id && fiscal_quarter === group.fiscal_quarter && etc)
this child_df , i want wrote to a file i.e. saveAs(path)
}
有没有办法做到这一点.任何对我有用的火花函数或 API 在这里?请提出解决此问题的方法.
Is there anyway to do it. Any spark functions or API useful for me here? please suggest a way to resolve this.
推荐答案
如果我正确理解你的问题,你想分别操作每个"model_id","fiscal_quarter","fiscal_year"
.
If I understand your question correctly, you want to manipulate the data separately for each "model_id","fiscal_quarter","fiscal_year"
.
如果这是正确的,您可以使用 groupBy()
,例如:
If that's correct, you would do it with a groupBy()
, for example:
company_model_vals_df.groupBy("model_id","fiscal_quarter","fiscal_year").agg(avg($"col1") as "average")
如果您要查找的是将每个逻辑组写入一个单独的文件夹,您可以这样写:
If what you're looking for is to write each logical group into a separate folder, you can do that by writing:
company_model_vals_df.write.partitionBy("model_id","fiscal_quarter","fiscal_year").parquet("path/to/save")
这篇关于如何一次将每个组发送到 Spark 执行器?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!