How to broadcast a DataFrame?


Problem description

I am using spark-sql-2.4.1 version, creating a broadcast variable as below:

Broadcast<Map<String,Dataset>> bcVariable = javaSparkContext.broadcast(//read dataset);

I am passing the bcVariable to a function:

Service.calculateFunction(sparkSession, bcVariable.getValue());


public static class Service {
    public static void calculateFunction(
            SparkSession sparkSession,
            Map<String, Dataset> dataSet) {

        System.out.println("---> size : " + dataSet.size());  // printing size 1

        for (Entry<String, Dataset> aEntry : dataSet.entrySet()) {
            System.out.println(aEntry.getKey());  // printing key
            aEntry.getValue().show();             // throws NullPointerException
        }
    }
}

What is wrong here? How can I pass a dataset/dataframe to the function?

Try 2:

Broadcast<Dataset> bcVariable = javaSparkContext.broadcast(//read dataset);

I am passing the bcVariable to a function:

 Service.calculateFunction(sparkSession, bcVariable.getValue());

public static class Service {
    public static void calculateFunction(
            SparkSession sparkSession,
            Dataset dataSet) {

        System.out.println("---> size : " + dataSet.size());  // throws NullPointerException
    }
}

What is wrong here? How can I pass a dataset/dataframe to the function?

Try 3:

Dataset metaData = //read dataset from oracle table i.e. meta-data.

I am passing the metaData to a function:

Service.calculateFunction(sparkSession, metaData);

public static class Service {
    public static void calculateFunction(
            SparkSession sparkSession,
            Dataset metaData) {

        System.out.println("---> size : " + metaData.size());  // throws NullPointerException
    }
}

What is wrong here? How can I pass a dataset/dataframe to the function?

Answer

The value to be broadcast has to be a plain (serializable) object, not a DataFrame.

Service.calculateFunction(sparkSession, metaData) is executed on executors, and hence metaData is null (as it was not serialized and sent over the wire from the driver to the executors).
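The underlying constraint is ordinary Java serialization: whatever you broadcast must survive a serialize/deserialize round trip, which a plain map does but a DataFrame (essentially a driver-side query plan) does not. A minimal stdlib-only sketch of that round trip (the class name `BroadcastRoundTrip` is hypothetical, not part of Spark; it only simulates what happens to a broadcast value):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

public class BroadcastRoundTrip {

    // Simulates what Spark does to a broadcast value: Java-serialize it on the
    // driver, ship the bytes over the wire, and deserialize it on an executor.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // A plain map survives the round trip, so it is a safe thing to broadcast.
        HashMap<String, String> metaData = new HashMap<>();
        metaData.put("table", "meta-data");
        HashMap<String, String> onExecutor = roundTrip(metaData);
        System.out.println(onExecutor.get("table")); // prints meta-data
    }
}
```

A DataFrame fails this test conceptually: even if its object graph could be serialized, what arrives on the executor would have no live driver context behind it, which is why the broadcast value shows up as null.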

broadcast[T](value: T): Broadcast[T]

Broadcast a read-only variable to the cluster, returning an org.apache.spark.broadcast.Broadcast object for reading it in distributed functions. The variable will be sent to each cluster only once.

Think of the DataFrame data abstraction as representing a distributed computation that is described in a SQL-like language (the Dataset API or SQL). It simply does not make any sense to have it anywhere but on the driver, where the computation can be submitted for execution (as tasks on executors).

You simply have to "convert" the data this computation represents (in DataFrame terms) using DataFrame.collect.

Once you have collected the data, you can broadcast it and reference it using the .value method.

The code could look as follows:

Dataset<Row> dataset = // reading data
Broadcast<List<Row>> bcVariable =
    javaSparkContext.broadcast(dataset.collectAsList());
Service.calculateFunction(sparkSession, bcVariable.value());

The only change compared to your code is the collect step.
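On the receiving side, calculateFunction would then take the collected rows rather than a Dataset. A hedged sketch against the Spark 2.4 Java API (the function body is illustrative only; it requires spark-sql on the classpath, so it is not runnable standalone):

```java
// Sketch: assumes the spark-sql dependency is available.
import java.util.List;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class Service {
    // Receives plain collected rows, which serialize fine, instead of a Dataset.
    public static void calculateFunction(SparkSession sparkSession, List<Row> rows) {
        System.out.println("---> size : " + rows.size()); // a real local size, no NPE
        for (Row row : rows) {
            System.out.println(row); // each Row carries the collected data
        }
    }
}
```

Note that a collected List<Row> lives entirely in driver (and, after broadcast, executor) memory, so this pattern only makes sense for small lookup/metadata tables like the Oracle meta-data table in the question.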

