Poor weak scaling of Apache Spark join operation
Question
I run the "join" operation on Apache Spark and see that there is no weak scalability. I would be grateful if anyone could explain this.
I create two dataframes ("a", "b") and ("a", "c") and join them by the first column. The dataframe values are generated so that the join is one-to-one. I also use the same partitioner to avoid a shuffle.
Number of rows in each dataframe: 1024 * 1024 * 16 * cores_total (cores_total is the total number of cores the program is launched on). Column "a" consists of random Int values, all values of column "b" equal 1, and all values of column "c" equal 2.
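The one-to-one layout described above can be sketched in plain Scala (no Spark; the object name and sizes here are illustrative, not part of the original program):

```scala
import scala.util.Random

// Plain-Scala sketch of the data layout: distinct random Int keys,
// the left side carries column "b" = 1, the right side column "c" = 2,
// so every key matches exactly once — a strictly one-to-one join.
object OneToOneJoinSketch {
  def buildAndJoin(n: Int, seed: Long): Vector[(Int, Int, Int)] = {
    val rnd = new Random(seed)
    val keys = Vector.fill(n)(rnd.nextInt()).distinct
    val left = keys.map(k => (k, 1))           // ("a", "b")
    val rightMap = keys.map(k => (k, 2)).toMap // ("a", "c")
    // One-to-one join: look up each left key on the right exactly once.
    left.map { case (k, b) => (k, b, rightMap(k)) }
  }
}
```

In the Spark program the same shape is produced distributively, with `flatMap` expanding seed values and `distinct` removing duplicate keys.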
Theoretically, with a 64-fold increase in both data size and cores, the execution time should remain the same, but it grows slightly. I obtain the following execution times:
Apache Spark version: 2.1.0. We use 8 cluster nodes connected by 1 Gbit Ethernet; each node has 2x Intel Xeon E5-2630 and 64 GB RAM.
/* join perf */
import scala.util.Random
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object joinPerf {
  // Expand one seed element into n random Ints (used to blow the RDD up via flatMap).
  def get_array(n: Int): Array[Int] = Array.fill(n)(Random.nextInt)

  def main(args: Array[String]) {
    val start_time = System.nanoTime
    val conf = new SparkConf().setAppName("joinPerf")
    val sc = new SparkContext(conf)
    val cores_total = sc.getConf.get("spark.cores.max").toInt
    val partitions_total = sc.getConf.get("spark.default.parallelism").toInt
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    println("start")
    val elems_total = 1024 * 1024 * 16 * cores_total
    val start_cnt = 1024 * 1024
    Random.setSeed(785354)
    // Seed values are generated on the driver, then expanded in parallel.
    val vals = Vector.fill(start_cnt)(Random.nextInt)
    var test_rdd = sc.parallelize(vals)
    println(test_rdd.count)
    test_rdd = test_rdd.flatMap(x => get_array(elems_total / start_cnt)).distinct
    println("test_rdd count = " + test_rdd.count)
    println("partitions count = " + test_rdd.getNumPartitions)
    // Both sides are hash-partitioned by column "a" and cached before the join.
    val test_rdd1 = test_rdd.map(x => (x, 1)).toDF("a", "b").repartition(partitions_total, $"a").cache
    val test_rdd2 = test_rdd.map(x => (x, 2)).toDF("a", "c").repartition(partitions_total, $"a").cache
    println("test_rdd1 count = " + test_rdd1.count)
    println("test_rdd2 count = " + test_rdd2.count)
    val start_test_time = System.nanoTime
    val test_res = test_rdd1.join(test_rdd2, test_rdd1("a") === test_rdd2("a"))
    println(test_res.count)
    print("join time = ")
    println((System.nanoTime - start_test_time) / 1e9d + " sec. ")
    print("all time = ")
    println((System.nanoTime - start_time) / 1e9d + " sec. ")
    sc.stop()
  }
}
Config parameters:
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max 1024
spark.kryo.unsafe true
spark.kryo.referenceTracking false
spark.driver.memory 22g
spark.executor.memory 22g
spark.driver.maxResultSize 22g
spark.rpc.message.maxSize 2047
spark.memory.fraction 0.8
spark.memory.storageFraction 0.5
spark.executor.extraJavaOptions "-XX:+UseParallelGC"
Partitions per core: 4.
Example launch command:
./bin/spark-submit --class "joinPerf" --conf spark.executor.cores=8 --conf spark.cores.max=64 --conf spark.default.parallelism=256 ./joinPerf.jar
Answer
"Theoretically, with the increase of the data size and cores by 64 times, the execution time should remain the same, but the execution time slightly grows"
It shouldn't. While one could expect linear scalability when performing strictly local operations on uniformly distributed data, assuming no IO bottlenecks, this is no longer the case when transformations require data exchange (RDD shuffles, Dataset Exchange). Among wide transformations, joins belong to the most expensive category (next to groupByKey-like operations), due to their non-reducing nature and their use of large local supporting collections.
Shuffles not only have higher-than-linear complexity (at least O(N log N) for sorting-based methods), but can also induce non-uniform distribution of data and require significant disk and network IO.
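A back-of-the-envelope sketch in plain Scala shows how far the O(N log N) term alone pushes you away from flat weak scaling (this is an illustrative cost model, not a Spark measurement): scaling both the data size N and the core count P by the same factor leaves the per-core sort cost growing by log(factor * N0) / log(N0).

```scala
// Illustrative weak-scaling model for a sort-based shuffle:
// total work ~ N log N, spread over P cores, with N/P held constant.
object WeakScalingSketch {
  def perCoreCost(n: Double, p: Double): Double = n * math.log(n) / p

  // Ratio of per-core cost after scaling N and P by the same factor;
  // 1.0 would be perfectly flat weak scaling.
  def weakScalingRatio(n0: Double, p0: Double, factor: Double): Double =
    perCoreCost(n0 * factor, p0 * factor) / perCoreCost(n0, p0)
}
```

For example, with N0 = 1024 * 1024 * 16 = 2^24 rows and a 64x scale-up, the ratio is log(2^30) / log(2^24) = 30/24 = 1.25, i.e. even an ideal sort-based shuffle predicts roughly 25% growth in execution time before any communication or skew effects are counted.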
This is even more severe in the case of your code, which shuffles data twice - once to repartition the RDDs and once to join the Datasets (the HashPartitioner for RDDs is not compatible with Dataset partitioning).
Finally, increasing the cluster size has its own performance impact, related to increased communication and synchronization overhead and decreased data locality.
Overall, you'll rarely see truly linear scalability, and even when you do, you can expect the slope to be < 1.
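One way to put a number on that slope is to compare two runs where the per-core problem size is fixed, as in this benchmark. A small plain-Scala helper (the formulas are a standard scalability metric, not something from the original post):

```scala
// Empirical scaling metrics from two (cores, time) measurements
// of a weak-scaling experiment (per-core problem size held fixed).
object ScalingMetrics {
  // Weak-scaling efficiency: 1.0 means perfectly flat scaling;
  // the answer above predicts values below 1.
  def weakEfficiency(tBase: Double, tScaled: Double): Double = tBase / tScaled

  // Log-log slope of speedup vs. cores; 1.0 would be ideal linear scaling.
  // Under weak scaling the speedup at c2 cores relative to the c1-core base
  // run is (c2 / c1) * t1 / t2, since the problem also grew by c2 / c1.
  def slope(c1: Double, t1: Double, c2: Double, t2: Double): Double = {
    val speedup = (c2 / c1) * t1 / t2
    math.log(speedup) / math.log(c2 / c1)
  }
}
```

With perfectly flat weak scaling (t unchanged at 64x cores) the slope is exactly 1; any growth in execution time, such as the log-factor from sort-based shuffles, drops it below 1.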
On a side note, I wouldn't depend on the cache-count idiom when working with Datasets. It is likely to be unreliable.