How to send each group at a time to the Spark executors?


Problem description

I'm unable to send each group of the dataframe at a time to an executor.

I have data as below in the company_model_vals_df dataframe.

 -------------------------------------------------------------------------------------
 | model_id | fiscal_year | fiscal_quarter | col1 | col2 | col3 | col4 | col5 | col6 |
 -------------------------------------------------------------------------------------
 |    1     |    2018     |       1        |  r1  |  r2  |  r3  |  r4  |  r5  |  r6  |
 |    1     |    2018     |       2        |  r1  |  r2  |  r3  |  r4  |  r5  |  r6  |
 |    1     |    2018     |       1        |  r1  |  r2  |  r3  |  r4  |  r5  |  r6  |
 |    1     |    2018     |       2        |  r1  |  r2  |  r3  |  r4  |  r5  |  r6  |
 |    1     |    2018     |       1        |  r1  |  r2  |  r3  |  r4  |  r5  |  r6  |
 |    2     |    2017     |       3        |  r1  |  r2  |  r3  |  r4  |  r5  |  r6  |
 |    2     |    2017     |       1        |  r1  |  r2  |  r3  |  r4  |  r5  |  r6  |
 |    2     |    2017     |       3        |  r1  |  r2  |  r3  |  r4  |  r5  |  r6  |
 |    2     |    2017     |       3        |  r1  |  r2  |  r3  |  r4  |  r5  |  r6  |
 |    2     |    2017     |       1        |  r1  |  r2  |  r3  |  r4  |  r5  |  r6  |
 -------------------------------------------------------------------------------------

I want to send each grouped set of data to an executor, to process one group at a time.

To do this, my approach is as follows:

var dist_company_model_vals_df =  company_model_vals_df.select("model_id","fiscal_quarter","fiscal_year").distinct()

// Want to send each group at a time to be written by the executors.

dist_company_model_vals_df.foreach(rowDf => {
  writeAsParquet(rowDf, parquet_file)    // this simply writes the data as a parquet file
})

Error:

This throws a NullPointerException because rowDf is not found on the executor side. What is the correct way to handle this in spark-sql using Scala 2.11?
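For context, the closure passed to foreach runs on the executors, where each rowDf is just a Row and neither the SparkSession nor other DataFrames are available, which is what produces the NullPointerException. A minimal sketch of the driver-side alternative, collecting the small set of distinct keys first, would be:

// Bring the small set of distinct keys back to the driver,
// then loop over them in plain Scala; DataFrame work stays on the driver.
val keyRows = dist_company_model_vals_df.collect()
keyRows.foreach(keyRow => println(keyRow))   // replace println with the per-group processing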

Part 2: Question

When I do company_model_vals_df.groupBy("model_id","fiscal_quarter","fiscal_year"), a lot of data spills to disk even after I increased the memory. That is, company_model_vals_df is a huge dataframe, and a lot of spilling happens during the groupBy.
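One commonly used knob for this kind of shuffle spill (an assumption on my side, not something from the original post) is to raise the number of shuffle partitions before the groupBy, so each task processes a smaller slice:

// More shuffle partitions => smaller partitions per task, less spill per task.
// 2000 is an arbitrary illustrative value, to be tuned for the actual data size.
spark.conf.set("spark.sql.shuffle.partitions", "2000")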

The same thing happens with partitionBy, i.e.:

company_model_vals_df.write.partitionBy("model_id","fiscal_quarter","fiscal_year")
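If the spill comes from every task writing to every partition directory at once, one common mitigation (a sketch under that assumption, not part of the original post) is to repartition by the same columns before writing:

import org.apache.spark.sql.functions.col

// Repartition by the partition columns first, so each task only holds rows
// for a few output folders; "path/to/save" is a placeholder location.
company_model_vals_df
  .repartition(col("model_id"), col("fiscal_quarter"), col("fiscal_year"))
  .write
  .partitionBy("model_id", "fiscal_quarter", "fiscal_year")
  .parquet("path/to/save")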

PSEUDO CODE: So, to avoid this, first I would build the tuples of groups:

val groups = company_model_vals_df.select("model_id","fiscal_quarter","fiscal_year").distinct().collect()

groups.foreach { group =>
  // Prepare a child dataframe for each group from company_model_vals_df
  // ($"..." column syntax requires import spark.implicits._)
  val child_df = company_model_vals_df
    .where($"model_id" === group.getAs[Int]("model_id") &&
           $"fiscal_quarter" === group.getAs[Int]("fiscal_quarter") &&
           $"fiscal_year" === group.getAs[Int]("fiscal_year"))

  // write this child_df to a file, i.e. child_df.write.parquet(path)
}

Is there any way to do this? Are there any Spark functions or APIs that would be useful here? Please suggest a way to resolve this.

Recommended answer

If I understand your question correctly, you want to manipulate the data separately for each combination of "model_id", "fiscal_quarter", "fiscal_year".

If that's correct, you would do it with a groupBy(), for example:

company_model_vals_df.groupBy("model_id","fiscal_quarter","fiscal_year").agg(avg($"col1") as "average")
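A slightly fuller sketch of the same pattern (the aggregation functions here are placeholders chosen for illustration, not something from the question):

import org.apache.spark.sql.functions.{count, first, max}

// One output row per (model_id, fiscal_quarter, fiscal_year) group.
val summary = company_model_vals_df
  .groupBy("model_id", "fiscal_quarter", "fiscal_year")
  .agg(
    count("col1") as "col1_count",
    max("col2")   as "col2_max",
    first("col3") as "col3_first"
  )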

If what you're looking for is to write each logical group into a separate folder, you can do that by writing:

company_model_vals_df.write.partitionBy("model_id","fiscal_quarter","fiscal_year").parquet("path/to/save")
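This produces one nested folder per group, so an individual group can later be read back on its own (paths are illustrative):

// Layout on disk (example values):
//   path/to/save/model_id=1/fiscal_quarter=1/fiscal_year=2018/part-*.parquet
// Reading a single group back:
val one_group = spark.read.parquet("path/to/save/model_id=1/fiscal_quarter=1/fiscal_year=2018")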

