How to broadcast a DataFrame?


Problem Description

I am using spark-sql-2.4.1. I am creating a broadcast variable as below:

Broadcast<Map<String,Dataset>> bcVariable = javaSparkContext.broadcast(//read dataset);

I am passing the bcVariable to a function:

Service.calculateFunction(sparkSession, bcVariable.getValue());


public static class Service {
    public static void calculateFunction(
      SparkSession sparkSession,
      Map<String, Dataset> dataSet ) {

        System.out.println("---> size : " + dataSet.size());  // prints size 1

        for( Entry<String, Dataset> aEntry : dataSet.entrySet() ) {
            System.out.println( aEntry.getKey());  // prints the key
            aEntry.getValue().show();              // throws NullPointerException
        }
    }
}

What is wrong here? How can I pass a dataset/dataframe to the function?

Attempt 2:

Broadcast<Dataset> bcVariable = javaSparkContext.broadcast(//read dataset);

I am passing the bcVariable to a function:

 Service.calculateFunction(sparkSession, bcVariable.getValue());

public static class Service {
    public static void calculateFunction(
      SparkSession sparkSession,
      Dataset dataSet ) {

        System.out.println("---> size : " + dataSet.size());  // throws NullPointerException
    }
}

What is wrong here? How can I pass a dataset/dataframe to the function?

Attempt 3:

Dataset metaData = //read dataset from oracle table i.e. meta-data.

I am passing the metaData to a function:

Service.calculateFunction(sparkSession, metaData);

public static class Service {
    public static void calculateFunction(
      SparkSession sparkSession,
      Dataset metaData ) {

        System.out.println("---> size : " + metaData.size());  // throws NullPointerException
    }
}

What is wrong here? How can I pass a dataset/dataframe to the function?

Recommended Answer

The value to be broadcast has to be any Scala object but not a DataFrame.

Service.calculateFunction(sparkSession, metaData) is executed on executors and hence metaData is null (as it was not serialized and sent over the wire from the driver to executors).

broadcast[T](value: T): Broadcast[T]

Broadcast a read-only variable to the cluster, returning a org.apache.spark.broadcast.Broadcast object for reading it in distributed functions. The variable will be sent to each cluster only once.
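
For reference, a minimal sketch of that API in the Java flavour used in the question. The lookup map and the parallelized list are made up for illustration; only broadcast and .value() come from the Spark API, and an existing javaSparkContext is assumed:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.broadcast.Broadcast;

// Broadcast a plain, serializable Map and read it on executors via .value().
Map<String, String> lookup = new HashMap<>();
lookup.put("A", "Apple");
lookup.put("B", "Banana");

Broadcast<Map<String, String>> bcLookup = javaSparkContext.broadcast(lookup);

javaSparkContext.parallelize(Arrays.asList("A", "B", "C"))
    .map(key -> bcLookup.value().getOrDefault(key, "unknown"))  // executor-side read
    .collect();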

Think of the DataFrame abstraction as a description of a distributed computation in a SQL-like language (the Dataset API or SQL). It simply does not make sense to have it anywhere but on the driver, where computations can be submitted for execution (as tasks on executors).

You simply have to "convert" the data this computation represents (in DataFrame terms) using DataFrame.collect.

Once you have collected the data, you can broadcast it and reference it using the .value method.

The code could look as follows:

Dataset<Row> dataset = // read dataset
Broadcast<List<Row>> bcVariable =
  javaSparkContext.broadcast(dataset.collectAsList());
Service.calculateFunction(sparkSession, bcVariable.getValue());

The only change compared to your code is the collect step (collectAsList in the Java API) before broadcasting.
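
A minimal sketch of how the receiving function might then look, assuming the broadcast value is the collected List<Row> rather than a Dataset. The nested static class mirrors the question's style, the usual Spark imports (org.apache.spark.sql.Row, org.apache.spark.sql.SparkSession) are assumed, and the row handling is illustrative, not from the original answer:

public static class Service {
    public static void calculateFunction(
      SparkSession sparkSession,
      List<Row> rows ) {

        // The rows are plain local objects now, so this no longer NPEs.
        System.out.println("---> size : " + rows.size());

        for (Row row : rows) {
            System.out.println(row.mkString(", "));  // print each collected row
        }
    }
}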
