How to send each group at a time to the Spark executors?


Problem description

I'm unable to send each group of the DataFrame to an executor one at a time.

I have data like the following in the company_model_vals_df DataFrame:

 -------------------------------------------------------------------------------------
 | model_id | fiscal_year | fiscal_quarter | col1 | col2 | col3 | col4 | col5 | col6 |
 -------------------------------------------------------------------------------------
 |    1     |    2018     |       1        |  r1  |  r2  |  r3  |  r4  |  r5  |  r6  |
 |    1     |    2018     |       2        |  r1  |  r2  |  r3  |  r4  |  r5  |  r6  |
 |    1     |    2018     |       1        |  r1  |  r2  |  r3  |  r4  |  r5  |  r6  |
 |    1     |    2018     |       2        |  r1  |  r2  |  r3  |  r4  |  r5  |  r6  |
 |    1     |    2018     |       1        |  r1  |  r2  |  r3  |  r4  |  r5  |  r6  |
 |    2     |    2017     |       3        |  r1  |  r2  |  r3  |  r4  |  r5  |  r6  |
 |    2     |    2017     |       1        |  r1  |  r2  |  r3  |  r4  |  r5  |  r6  |
 |    2     |    2017     |       3        |  r1  |  r2  |  r3  |  r4  |  r5  |  r6  |
 |    2     |    2017     |       3        |  r1  |  r2  |  r3  |  r4  |  r5  |  r6  |
 |    2     |    2017     |       1        |  r1  |  r2  |  r3  |  r4  |  r5  |  r6  |
 -------------------------------------------------------------------------------------

I want to send each grouped set of data to an executor, to process each group one at a time.

For that, I am doing the following:

var dist_company_model_vals_df =  company_model_vals_df.select("model_id","fiscal_quarter","fiscal_year").distinct()

// Want to send each group at a time, to be written out by the executors.
dist_company_model_vals_df.foreach(rowDf => {
  writeAsParquet(rowDf, parquet_file)   // this simply writes the data as a parquet file
})

Error:

This throws a NullPointerException, because rowDf is not available on the executor side: the DataFrame and SparkSession APIs exist only on the driver, so they cannot be used inside a function that Spark ships to the executors. What is the correct way to handle this in spark-sql using Scala 2.11?

Part 2: The problem

When I do company_model_vals_df.groupBy("model_id","fiscal_quarter","fiscal_year"), the data spills heavily to disk, even after I increased the memory. That is, company_model_vals_df is a huge DataFrame, and a lot of spilling happens during the groupBy.

The same is the case below, i.e. with partitionBy:

company_model_vals_df.write.partitionBy("model_id","fiscal_quarter","fiscal_year")

PSEUDO CODE: So, to avoid this, first I would collect the distinct group keys as tuples:

val groups = company_model_vals_df.select("model_id","fiscal_quarter","fiscal_year").distinct().collect()

groups.foreach { group =>
  // Prepare a child DataFrame for each group from company_model_vals_df.
  val child_df = company_model_vals_df.where(
    $"model_id" === group.getAs[Int]("model_id") &&
    $"fiscal_quarter" === group.getAs[Int]("fiscal_quarter") &&
    $"fiscal_year" === group.getAs[Int]("fiscal_year"))

  // Write this child_df to a file, i.e. save it under a per-group path.
  child_df.write.parquet(path)
}
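
One caveat with this collect-and-filter loop: each per-group where() rescans company_model_vals_df from the source unless the DataFrame is cached first. A minimal sketch of caching up front, assuming the data fits across combined executor memory and local disk:

import org.apache.spark.storage.StorageLevel

// Cache once so the per-group filters reuse the cached copy instead of
// re-reading the source for every group.
company_model_vals_df.persist(StorageLevel.MEMORY_AND_DISK)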

Is there any way to do this? Are there any Spark functions or APIs that would be useful here? Please suggest a way to resolve this.

Answer

If I understand your question correctly, you want to manipulate the data separately for each ("model_id", "fiscal_quarter", "fiscal_year") group.

If that's correct, you can do it with a groupBy(), for example:

company_model_vals_df.groupBy("model_id","fiscal_quarter","fiscal_year").agg(avg($"col1") as "average")
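
For reference, a minimal self-contained version of that one-liner (the session setup and the choice of avg($"col1") as the aggregate are illustrative; avg assumes col1 holds numeric values):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.avg

val spark = SparkSession.builder().appName("per-group-agg").getOrCreate()
import spark.implicits._

// One output row per (model_id, fiscal_quarter, fiscal_year) group,
// carrying the average of col1 within that group.
val perGroup = company_model_vals_df
  .groupBy("model_id", "fiscal_quarter", "fiscal_year")
  .agg(avg($"col1") as "average")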

If what you're looking for is to write each logical group into a separate folder, you can do that with:

company_model_vals_df.write.partitionBy("model_id","fiscal_quarter","fiscal_year").parquet("path/to/save")
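
Each group then lands in its own subdirectory named after its partition values, so a later job can read one group back without scanning the others. A small sketch of what that looks like, reusing the spark session and implicits from the sketch above (the paths are illustrative):

// partitionBy lays the data out as one directory per key combination:
//   path/to/save/model_id=1/fiscal_quarter=1/fiscal_year=2018/part-*.parquet
//   path/to/save/model_id=2/fiscal_quarter=3/fiscal_year=2017/part-*.parquet

// Filtering on the partition columns when reading back only touches the
// matching directories (partition pruning):
val oneGroup = spark.read.parquet("path/to/save")
  .where($"model_id" === 1 && $"fiscal_quarter" === 1 && $"fiscal_year" === 2018)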
