Broadcast not happening while joining dataframes in Spark 1.6


Problem description

Below is the sample code I am running. When this Spark job runs, the DataFrame join is performed with SortMergeJoin instead of BroadcastJoin.

def joinedDf(sqlContext: SQLContext,
             txnTable: DataFrame,
             countriesDfBroadcast: Broadcast[DataFrame]): DataFrame = {
  txnTable.as("df1").join(
    countriesDfBroadcast.value.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries"),
    $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")
}

joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp")

The broadcast join is not happening even when I specify a broadcast() hint in the join statement.

The optimizer is hash-partitioning the DataFrame, and it is causing data skew.

Has anyone seen this behavior?

I am running this on YARN using Spark 1.6, with HiveContext as the SQLContext. The Spark job runs on 200 executors; txnTable is 240 GB and countriesDf is 5 MB.
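A quick way to confirm which strategy the planner actually chose is to print the physical plan before writing the output. This is a minimal sketch using the names from the question; a broadcast join would appear as `BroadcastHashJoin`, the behavior described here as `SortMergeJoin`:

```scala
// Inspect the physical plan for the join defined above.
// Look for "BroadcastHashJoin" vs "SortMergeJoin" in the output.
joinedDf(sqlContext, txnTable, countriesDfBroadcast).explain()
```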

Recommended answer

Both the way you broadcast the DataFrame and the way you access it are incorrect.

  • Standard broadcasts cannot be used to handle distributed data structures. If you want to perform a broadcast join on a DataFrame, you should use the broadcast function, which marks the given DataFrame for broadcasting:

import org.apache.spark.sql.functions.broadcast

val countriesDf: DataFrame = ???
val tmp: DataFrame = broadcast(
  countriesDf.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries")
) 

txnTable.as("df1").join(
  broadcast(tmp), $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")

Internally it will collect tmp without converting from the internal representation, and broadcast it afterwards.
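As a related aside (not part of the original answer): Spark 1.6 also broadcasts automatically when the smaller side is below spark.sql.autoBroadcastJoinThreshold, which defaults to 10 MB, so a 5 MB countriesDf read as a plain DataFrame would normally qualify even without the hint. The threshold can be adjusted through the SQLContext:

```scala
// Default is 10485760 bytes (10 MB); raise it if the small table is larger,
// or set it to -1 to disable automatic broadcast joins entirely.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString)
```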

join arguments are eagerly evaluated. Even if it were possible to use SparkContext.broadcast with a distributed data structure, the broadcast value would be evaluated locally before join is called. That's why your function works at all but doesn't perform a broadcast join.
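To make the contrast concrete, here is a sketch of what the question's approach amounts to (names assumed from the question, not runnable without a Spark cluster):

```scala
// SparkContext.broadcast serializes its argument on the driver. A DataFrame
// is only a description of a distributed computation, so broadcasting it
// ships the plan object, not the data. The join below therefore still
// evaluates countriesBc.value as an ordinary distributed plan, and the
// planner falls back to SortMergeJoin.
val countriesBc = sc.broadcast(countriesDf)
txnTable.join(countriesBc.value, txnTable("USER_CNTRY_ID") === countriesBc.value("CNTRY_ID"))
```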
