如何一次将每个小组发送给火花执行者? [英] How to send each group at a time to the spark executors?
问题描述
我无法一次将每组数据帧发送给执行者.
I'm unable to send each group of dataframe at a time to the executor.
我在company_model_vals_df dataframe
中有如下数据.
----------------------------------------------------------------------------------------
| model_id | fiscal_year | fiscal_quarter | col1 | col2 | col3 | col4 | col5 | col6 |
----------------------------------------------------------------------------------------
| 1 | 2018 | 1 | r1 | r2 | r3 | r4 | r5 | r6 |
| 1 | 2018 | 2 | r1 | r2 | r3 | r4 | r5 | r6 |
| 1 | 2018 | 1 | r1 | r2 | r3 | r4 | r5 | r6 |
| 1 | 2018 | 2 | r1 | r2 | r3 | r4 | r5 | r6 |
| 1 | 2018 | 1 | r1 | r2 | r3 | r4 | r5 | r6 |
| 2 | 2017 | 3 | r1 | r2 | r3 | r4 | r5 | r6 |
| 2 | 2017 | 1 | r1 | r2 | r3 | r4 | r5 | r6 |
| 2 | 2017 | 3 | r1 | r2 | r3 | r4 | r5 | r6 |
| 2 | 2017 | 3 | r1 | r2 | r3 | r4 | r5 | r6 |
| 2 | 2017 | 1 | r1 | r2 | r3 | r4 | r5 | r6 |
----------------------------------------------------------------------------------------
我想将每个分组的数据发送给执行程序,以一次处理每个数据.
I want to send each grouped data to executor, to process each one at a time.
为此,我正在执行以下操作:
For that I am doing as below:
var dist_company_model_vals_df = company_model_vals_df.select("model_id","fiscal_quarter","fiscal_year").distinct()
// Want to send each group at a time to write by executors.
dist_company_model_vals_df.foreach(rowDf => {
writeAsParquet(rowDf , parquet_file) // this simply writes the data as parquet file
})
错误:
这将引发NullPointerException,因为在执行器端未找到rowDf.使用Scala 2.11在spark-sql中处理此问题的正确方法是什么?
This throws a NullPointerException as rowDf is not found on the Executor side. What is the correct way to handle this in spark-sql using Scala 2.11?
第2部分:问题
当我执行company_model_vals_df.groupBy("model_id","fiscal_quarter","fiscal_year")时,即使我增加了内存,数据也会大量泄漏到磁盘上. IE. company_model_vals_df是巨大的数据框...在进行groupBy时会发生大量溢出.
When i do company_model_vals_df.groupBy("model_id","fiscal_quarter","fiscal_year") the data is spilling a lot on disk even after i increased the memory. I.e. company_model_vals_df is huge dataframe ... lot of spilling happening when doing groupBy.
以下情况也是如此,即partitionBy
Same is the case below i.e. with partitionBy
company_model_vals_df.write.partitionBy("model_id",财政季度",财政年度")
company_model_vals_df.write.partitionBy("model_id","fiscal_quarter","fiscal_year")
PSEDO代码: 所以为了避免是首先我会做元组 val组= company_model_vals_df.groupBy("model_id",财政季度",财政年度").collect
PSEDO CODE : So in order to avoid is first I would do tuples of val groups = company_model_vals_df.groupBy("model_id","fiscal_quarter","fiscal_year").collect
groups.forEach{ group ->
// I want to prepare child dataframes for each group from company_model_vals_df
val child_df = company_model_vals_df.where(model_id= group.model_id && fiscal_quarter === group.fiscal_quarter && etc)
this child_df , i want wrote to a file i.e. saveAs(path)
}
反正有这样做吗? 这里有任何对我有用的spark函数或API吗? 请提出解决此问题的方法.
Is there anyway to do it. Any spark functions or API useful for me here? please suggest a way to resolve this.
推荐答案
如果我正确理解了您的问题,则希望分别处理每个"model_id","fiscal_quarter","fiscal_year"
的数据.
If I understand your question correctly, you want to manipulate the data separately for each "model_id","fiscal_quarter","fiscal_year"
.
如果正确,则可以使用groupBy()
进行操作,例如:
If that's correct, you would do it with a groupBy()
, for example:
company_model_vals_df.groupBy("model_id","fiscal_quarter","fiscal_year").agg(avg($"col1") as "average")
如果您要查找的是将每个逻辑组写入一个单独的文件夹,则可以通过编写以下内容来做到这一点:
If what you're looking for is to write each logical group into a separate folder, you can do that by writing:
company_model_vals_df.write.partitionBy("model_id","fiscal_quarter","fiscal_year").parquet("path/to/save")
这篇关于如何一次将每个小组发送给火花执行者?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!