Count on Spark Dataframe is extremely slow
Question
I'm creating a new DataFrame with a handful of records from a join.
```scala
val joined_df = first_df.join(
  second_df,
  first_df.col("key") === second_df.col("key") && second_df.col("key").isNull,
  "left_outer")
joined_df.repartition(1)
joined_df.cache()
joined_df.count()
```
Everything is fast (under one second) except the count operation. The RDD conversion kicks in and literally takes hours to complete. Is there any way to speed things up?
```
INFO MemoryStore: Block rdd_63_140 stored as values in memory (estimated size 16.0 B, free 829.3 MB)
INFO BlockManagerInfo: Added rdd_63_140 in memory on 192.168.8.52:36413 (size: 16.0 B, free: 829.8 MB)
INFO Executor: Finished task 140.0 in stage 10.0 (TID 544). 4232 bytes result sent to driver
INFO TaskSetManager: Starting task 142.0 in stage 10.0 (TID 545, localhost, executor driver, partition 142, PROCESS_LOCAL, 6284 bytes)
INFO Executor: Running task 142.0 in stage 10.0 (TID 545)
INFO TaskSetManager: Finished task 140.0 in stage 10.0 (TID 544) in 16 ms on localhost (executor driver) (136/200)
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
```
Answer
Everything is fast (under one second) except the count operation.
This is expected: all the operations before the `count` are called transformations, and transformations in Spark are lazy, i.e. no computation is performed until an action (`count` in your example) is called.
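Laziness can be illustrated without Spark at all. Below is a minimal plain-Scala analogy using a lazy view: the `map` builds a pipeline but does no work until a terminal operation forces it, just as Spark's join and cache do nothing until `count` runs. This is an illustration of the same evaluation model, not Spark itself.

```scala
// Analogy only: a Scala lazy view stands in for a Spark transformation chain.
object LazyDemo {
  def main(args: Array[String]): Unit = {
    var evaluated = 0
    // Like a join/filter: declares work, runs nothing yet.
    val pipeline = (1 to 10).view.map { x => evaluated += 1; x * 2 }
    assert(evaluated == 0) // no element has been touched

    // Like calling count(): forcing the view triggers the actual computation.
    val total = pipeline.sum
    assert(evaluated == 10) // now every element was processed
    println(s"total=$total, evaluated=$evaluated")
  }
}
```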
The second problem is the `repartition(1)`: keep in mind that you will lose all the parallelism offered by Spark, and your computation will run in a single executor (a single core in standalone mode). You should either remove this step or change the `1` to a number proportional to your number of CPU cores (standalone mode) or executors (cluster mode).
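A hedged sketch of that fix: the helper name `suggestedPartitions` is my own, and the 2-3 tasks-per-core rule of thumb comes from the Spark tuning guide, neither is part of the original answer. The Spark call itself is shown as a comment, since it needs a live SparkSession.

```scala
// Sketch: pick a partition count proportional to available parallelism
// instead of hard-coding 1.
object PartitionCount {
  // ~2x tasks per core; the Spark tuning guide suggests 2-3 per CPU core.
  def suggestedPartitions(coresPerExecutor: Int, executors: Int): Int =
    math.max(1, coresPerExecutor * executors * 2)

  def main(args: Array[String]): Unit = {
    val localCores = Runtime.getRuntime.availableProcessors()
    println(s"local cores: $localCores, suggested: ${suggestedPartitions(localCores, 1)}")
    // In Spark (sketch), reassign the result -- repartition returns a NEW
    // DataFrame and does not modify joined_df in place:
    //   val better_df = joined_df.repartition(suggestedPartitions(cores, executors))
  }
}
```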
The RDD conversion kicks in and literally takes hours to complete.
If I understand correctly, you are converting the DataFrame to an RDD. This is really bad practice in Spark, and you should avoid such conversions as much as possible. The data in a DataFrame or Dataset is encoded using Spark's own encoders (the Tungsten binary format), which take much less memory than JVM serialization. Converting to an RDD therefore means Spark must deserialize its compact encoded representation into JVM objects in order to work with them (and serialize them back again afterwards), giving up both the memory savings and the optimizations Spark can apply by operating directly on the encoded data. This is why DataFrames and Datasets are far more efficient than RDDs.
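A rough, self-contained illustration of the encoding point: generic JVM serialization of boxed objects carries per-object overhead that a packed binary layout avoids. This only mimics the idea behind Tungsten with standard-library tools; it is not Tungsten's actual format, and the exact byte counts are only indicative.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Compare generic Java serialization of boxed Integers against a packed
// 4-bytes-per-Int layout (a stand-in for a compact columnar encoding).
object EncodingDemo {
  def javaSerializedSize(xs: Array[java.lang.Integer]): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(xs)
    out.close()
    bytes.size()
  }

  // A fixed-width binary encoding needs exactly 4 bytes per 32-bit Int.
  def packedSize(xs: Array[Int]): Int = 4 * xs.length

  def main(args: Array[String]): Unit = {
    val boxed = Array.tabulate(1000)(i => java.lang.Integer.valueOf(i))
    val plain = Array.tabulate(1000)(identity)
    val generic = javaSerializedSize(boxed)
    val packed = packedSize(plain)
    println(s"Java serialization: $generic bytes, packed: $packed bytes")
    assert(generic > packed) // per-object headers and class metadata add up
  }
}
```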
Hope this helps.