Poor weak scaling of Apache Spark join operation

Problem description

I run a join operation on Apache Spark and see that there is no weak scaling. I would be grateful if anyone could explain this.

I create two dataframes ("a", "b") and ("a", "c") and join them by the first column. I generate the dataframe values for a one-to-one join. Also, I use the same partitioner to avoid a shuffle.

Number of rows in the dataframes: 1024 * 1024 * 16 * cores_total (where cores_total is the total number of cores on which the program is launched). Column "a" consists of random Int values, all values of column "b" equal 1, and all values of column "c" equal 2.

Theoretically, with the increase of the data size and the number of cores by 64 times, the execution time should remain the same, but the execution time grows slightly. I obtain the following execution times:

Apache Spark version: 2.1.0. We use 8 cluster nodes equipped with 1 Gbit Ethernet; each node has 2x Intel Xeon E5-2630 and 64 GB RAM.

/* join perf */
import scala.io.Source
import scala.math._
import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.util.control.Breaks._
import scala.collection.mutable._
import org.apache.spark.rdd._
import org.apache.spark.sql._
import scala.util.Random
import org.apache.spark.util.SizeEstimator
import org.apache.spark.HashPartitioner

object joinPerf {

    // Generate an array of n random Int values (expands each seed element into a
    // block of rows for the test dataframes)
    def get_array(n: Int): Array[Int] = {
        var res = Array[Int]()
        for (x <- 1 to n) {
            res :+= Random.nextInt
        }

        return res
    }

    def main(args: Array[String]) {
        val start_time = System.nanoTime
        val conf = new SparkConf().setAppName("joinPerf")
        val sc = new SparkContext(conf)
        val cores_total = sc.getConf.get("spark.cores.max").toInt
        val partitions_total = sc.getConf.get("spark.default.parallelism").toInt
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        import sqlContext._
        import sqlContext.implicits._
        println("start")
        val elems_total = 1024 * 1024 * 16 * cores_total
        val start_cnt = 1024 * 1024
        Random.setSeed(785354)

        var vals = Vector[Int]()
        for (x <- 1 to start_cnt) {
            vals :+= Random.nextInt
        }

        var test_rdd = sc.parallelize(vals)
        println(test_rdd.count)
        test_rdd = test_rdd.flatMap(x => get_array(elems_total / start_cnt)).distinct

        println("test_rdd count = " + test_rdd.count)
        println("partitions count = " + test_rdd.getNumPartitions)

        // Build both DataFrames, co-partition them on the join column "a" and cache them
        var test_rdd1 = test_rdd.map(x => (x, 1)).toDF("a", "b").repartition(partitions_total, $"a").cache
        var test_rdd2 = test_rdd.map(x => (x, 2)).toDF("a", "c").repartition(partitions_total, $"a").cache

        println("test_rdd1 count = " + test_rdd1.count)
        println("test_rdd2 count = " + test_rdd2.count)

        // Time only the join; count forces execution of the joined DataFrame
        var start_test_time = System.nanoTime
        var test_res = test_rdd1.join(test_rdd2, test_rdd1("a") === test_rdd2("a"))
        println(test_res.count)
        print("join time = ")
        println((System.nanoTime - start_test_time) / 1e9d + " sec. ")

        print("all time = ")
        println((System.nanoTime - start_time) / 1e9d + " sec. ")
        sc.stop()
    }
}

Config parameters:

spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max  1024
spark.kryo.unsafe                true
spark.kryo.referenceTracking     false
spark.driver.memory              22g
spark.executor.memory            22g
spark.driver.maxResultSize       22g
spark.rpc.message.maxSize        2047
spark.memory.fraction            0.8
spark.memory.storageFraction     0.5
spark.executor.extraJavaOptions  "-XX:+UseParallelGC"

Partitions per core: 4.

Example of launching the program:

./bin/spark-submit --class "joinPerf" --conf spark.executor.cores=8 --conf spark.cores.max=64 --conf spark.default.parallelism=256 ./joinPerf.jar


Accepted answer


"Theoretically, with the increase of the data size and cores by 64 times, the execution time should remain the same, but the execution time slightly grows."

It shouldn't. While one could expect linear scalability, assuming no IO bottlenecks, when performing strictly local operations on uniformly distributed data, this is no longer the case once transformations require data exchange (RDD shuffles, Dataset Exchange). Among wide transformations, joins belong to the most expensive category (next to groupByKey-like operations), due to their non-reducing nature and their use of large local supporting collections.

Shuffles not only have higher-than-linear complexity (at least O(N log N) for sorting-based methods), but can also induce a non-uniform distribution of data and require significant disk and network IO.
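One way to quantify this shuffle traffic is sketched below, assuming the Spark 2.x listener API (an illustration, not part of the original answer): it sums the shuffle bytes written and read across tasks, so registering it before running the join shows how much data is actually exchanged.

// Sketch only: accumulate shuffle IO across tasks to see how much data the join moves.
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

val shuffleWrite = new AtomicLong(0)
val shuffleRead = new AtomicLong(0)
sc.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val m = taskEnd.taskMetrics
    if (m != null) {                       // metrics may be missing for failed tasks
      shuffleWrite.addAndGet(m.shuffleWriteMetrics.bytesWritten)
      shuffleRead.addAndGet(m.shuffleReadMetrics.totalBytesRead)
    }
  }
})
// ... run the join and an action on it ...
println(s"shuffle write = ${shuffleWrite.get} B, shuffle read = ${shuffleRead.get} B")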

This is even more severe in the case of your code, which shuffles the data twice: once to repartition the RDDs and once to join the Datasets (HashPartitioner for RDDs is not compatible with Dataset partitioning).
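To see where the shuffles happen, a minimal sketch (my illustration, reusing test_rdd1 and test_rdd2 from the program above) is to print the physical plan of the join; every Exchange operator in the output marks a shuffle boundary.

// Sketch only: inspect the physical plan; Exchange nodes correspond to shuffles.
val joined = test_rdd1.join(test_rdd2, test_rdd1("a") === test_rdd2("a"))
joined.explain()   // look for Exchange operators in the printed plan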

Finally, increasing the cluster size has its own performance impact, related to increased communication and synchronization overhead and decreased data locality.

Overall, you'll rarely see truly linear scalability, and even when you do, you can expect the slope to be < 1.
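As a worked illustration with hypothetical numbers, weak-scaling efficiency can be computed as the baseline time divided by the scaled-up time; perfect weak scaling gives 1, and the slight growth described in the question gives a value below 1.

// Sketch only: weak-scaling efficiency from two measured times (hypothetical values).
val tBase   = 10.0   // seconds on the baseline core count and data size
val tScaled = 13.0   // seconds on 64x cores and 64x data (hypothetical)
val efficiency = tBase / tScaled   // 1.0 would be perfect weak scaling
println(f"weak-scaling efficiency = $efficiency%.2f")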

On a side note, I wouldn't depend on the cache - count idiom when working with Datasets. It is likely to be unreliable.
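If full materialization is still wanted before timing, one possible alternative (a sketch of mine, not taken from the answer) is a no-op foreach, which runs an action that touches every row of the cached DataFrame:

// Sketch only: force evaluation of a cached DataFrame without relying on count.
test_rdd1.cache()
test_rdd1.foreach(_ => ())   // no-op action that touches every row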

See also Spark: Inconsistent performance number in scaling number of cores.
