Apache Spark: How to structure code of a Spark Application (especially when using Broadcasts)

Problem description

I have a generic question about how to structure code in Java Spark applications. I want to separate the implementation of Spark transformations from the code that calls them on RDDs, so that the application's source code stays clear even when using many transformations that contain many lines of code.

I'll give you a short example first. In this scenario, the implementation of a flatMap transformation is provided as an anonymous inner class. This is a simple application that reads an RDD of integers and then multiplies each element by every value of an integer array that was previously broadcast to all worker nodes:

public static void main(String[] args) {

    SparkConf conf = new SparkConf().setMaster("local").setAppName("MyApp");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<Integer> result = sc.parallelize(Arrays.asList(5, 8, 9));

    final Broadcast<int[]> factors = sc.broadcast(new int[] { 1, 2, 3 });

    result = result.flatMap(new FlatMapFunction<Integer, Integer>() {
        public Iterable<Integer> call(Integer t) throws Exception {
            int[] values = factors.value();
            LinkedList<Integer> result = new LinkedList<Integer>();
            for (int value : values) result.add(t * value);
            return result;
        }
    });

    System.out.println(result.collect());   // [5, 10, 15, 8, 16, 24, 9, 18, 27]

    sc.close();
}

In order to structure the code, I have extracted the implementation of the Spark functions into a separate class. The class SparkFunctions provides the implementation of the flatMap transformation and has a setter method that receives a reference to the broadcast variable (... in my real-world scenario there would be many operations in this class, all of which access the broadcast data).

I have found that a method representing a Spark transformation can be static as long as it does not access a Broadcast variable or an Accumulator variable. Why? Static methods can only access static attributes, and a static reference to a Broadcast variable is always null (probably because it is not serialized when Spark sends the class SparkFunctions to the worker nodes).
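
To illustrate this pitfall, here is a minimal sketch (not part of the original question; the class and member names StaticPitfallSketch, staticFactors, instanceFactors, workingFunction and brokenFunction are made up). A static field belongs to the class and is not part of the serialized object graph, so on a real cluster it stays null in the executor JVMs, whereas an instance field is shipped together with the function object inside the task closure:

import java.io.Serializable;
import java.util.LinkedList;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;

@SuppressWarnings("serial")
public class StaticPitfallSketch implements Serializable {

    // Never serialized with an instance: it is only set in the driver JVM,
    // so it remains null in the executor JVMs of a real cluster.
    private static Broadcast<int[]> staticFactors;

    // Serialized together with the function object that is shipped in the task,
    // so the reference is available on the executors.
    private Broadcast<int[]> instanceFactors;

    public static void setStaticFactors(Broadcast<int[]> factors) {
        staticFactors = factors;
    }

    public void setInstanceFactors(Broadcast<int[]> factors) {
        this.instanceFactors = factors;
    }

    // Works on executors: reads the instance field that travelled with the closure
    // (provided setInstanceFactors() was called on the driver before use).
    public final FlatMapFunction<Integer, Integer> workingFunction = new FlatMapFunction<Integer, Integer>() {
        public Iterable<Integer> call(Integer t) throws Exception {
            LinkedList<Integer> result = new LinkedList<Integer>();
            for (int value : instanceFactors.value()) result.add(t * value);
            return result;
        }
    };

    // Fails on executors with a NullPointerException; it only appears to work in
    // local mode, where driver and executor share a single JVM.
    public static final FlatMapFunction<Integer, Integer> brokenFunction = new FlatMapFunction<Integer, Integer>() {
        public Iterable<Integer> call(Integer t) throws Exception {
            LinkedList<Integer> result = new LinkedList<Integer>();
            for (int value : staticFactors.value()) result.add(t * value);
            return result;
        }
    };
}

With that pitfall in mind, here is the class SparkFunctions: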

@SuppressWarnings("serial")
public class SparkFunctions implements Serializable {

    private Broadcast<int[]> factors;

    public SparkFunctions() {
    }

    public void setFactors(Broadcast<int[]> factors) {
        this.factors = factors;
    }

    public final FlatMapFunction<Integer, Integer> myFunction = new FlatMapFunction<Integer, Integer>() {
        public Iterable<Integer> call(Integer t) throws Exception {
            int[] values = factors.value();
            LinkedList<Integer> result = new LinkedList<Integer>();
            for (int value : values) result.add(t * value);
            return result;
        }
    };

}

This is the second version of the application using the class SparkFunctions:

public static void main(String[] args) {

    SparkConf conf = new SparkConf().setMaster("local").setAppName("MyApp");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<Integer> result = sc.parallelize(Arrays.asList(5, 8, 9));

    final Broadcast<int[]> factors = sc.broadcast(new int[] { 1, 2, 3 });

    // 1) Initializing
    SparkFunctions functions = new SparkFunctions();

    // 2) Pass reference of broadcast variable
    functions.setFactors(factors);

    // 3) Implementation is now in the class SparkFunctions
    result = result.flatMap(functions.myFunction);

    System.out.println(result.collect());   // [5, 10, 15, 8, 16, 24, 9, 18, 27]

    sc.close();
}

Both versions of the application work (locally and in a cluster setup), but I am wondering whether they are equally efficient.

Question 1: In my opinion, Spark serializes the class SparkFunctions, including the Broadcast variable, and sends it to the worker nodes so that the nodes can use the function in their tasks. Is the data sent to the worker nodes twice, first via the broadcast using the SparkContext and then again when the class SparkFunctions is serialized? Or is it even sent once per element (plus once for the broadcast)?

Question 2: Can you give me suggestions on how the source code might be structured differently?

Please don't suggest solutions that avoid the Broadcast altogether. I have a real-world application which is much more complex.

Similar questions that I have found which were not really helpful:

  • Spark Java Code Structure
  • BroadCast Variables In Spark
  • Spark: passing broadcast variable to executors

Thanks in advance for your help!

Recommended answer

This relates to Question 1.

When a Spark job is submitted, the job is divided into stages and then into tasks. The tasks actually carry out the execution of the transformations and actions on the worker nodes. The driver's submitTask() will serialize the functions and the metadata about the broadcast variable and send them to all nodes.

Anatomy of how broadcasting works:

The driver creates a local directory to store the data to be broadcast and launches an HttpServer with access to that directory. The data is actually written into the directory when the broadcast is created (val bdata = sc.broadcast(data)). At the same time, the data is also written into the driver's block manager with a storage level of memory + disk, and the block manager assigns the data a blockId (of type BroadcastBlockId).

The real data is fetched only when an executor deserializes the task it has received; at that point it also gets the broadcast variable's metadata in the form of a Broadcast object. It then calls the readObject() method of the metadata object (the bdata variable). This method first checks the local block manager to see whether a local copy already exists. If not, the data is fetched from the driver. Once fetched, the data is stored in the local block manager for subsequent use.
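
A rough way to check this from the driver side (this sketch is not from the original answer; the class name ClosureSizeSketch and the payload size are illustrative, and it reuses the SparkFunctions class from the question) is to serialize the function object with plain Java serialization, which is essentially what Spark's default closure serializer does, and compare the result with the size of the broadcast payload. Assuming the Broadcast implementation keeps its actual payload in a transient field, the serialized closure stays tiny even for a large broadcast array, which supports the answer to Question 1: the data travels once via the broadcast mechanism (and is then cached per executor), not again with every serialized SparkFunctions closure.

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class ClosureSizeSketch {

    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("ClosureSizeSketch");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Deliberately large payload: roughly 40 MB of integers.
        Broadcast<int[]> factors = sc.broadcast(new int[10 * 1000 * 1000]);

        // Same wiring as in the question.
        SparkFunctions functions = new SparkFunctions();
        functions.setFactors(factors);

        // Serialize the function object the way a task closure would be serialized.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        out.writeObject(functions.myFunction);   // writes SparkFunctions plus the Broadcast handle
        out.close();

        // Expected to be on the order of a few hundred bytes to a few kilobytes,
        // far below the ~40 MB payload, because only the broadcast metadata is written.
        System.out.println("Serialized closure size: " + bytes.size() + " bytes");

        sc.close();
    }
}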
