Count on Spark Dataframe is extremely slow

Problem description

I'm creating a new DataFrame with a handful of records from a Join.

val joined_df = first_df.join(second_df,
  first_df.col("key") === second_df.col("key") && second_df.col("key").isNull,
  "left_outer")
joined_df.repartition(1)
joined_df.cache()
joined_df.count()

Everything is fast (under one second) except the count operation. The RDD conversion kicks in and literally takes hours to complete. Is there any way to speed things up?

INFO MemoryStore: Block rdd_63_140 stored as values in memory (estimated size 16.0 B, free 829.3 MB)
INFO BlockManagerInfo: Added rdd_63_140 in memory on 192.168.8.52:36413 (size: 16.0 B, free: 829.8 MB)
INFO Executor: Finished task 140.0 in stage 10.0 (TID 544). 4232 bytes result sent to driver
INFO TaskSetManager: Starting task 142.0 in stage 10.0 (TID 545, localhost, executor driver, partition 142, PROCESS_LOCAL, 6284 bytes)
INFO Executor: Running task 142.0 in stage 10.0 (TID 545)
INFO TaskSetManager: Finished task 140.0 in stage 10.0 (TID 544) in 16 ms on localhost (executor driver) (136/200)
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms

Answer

Everything is fast (under one second) except the count operation.

This is expected: all the operations before the count are transformations, and this type of Spark operation is lazy, i.e. no computation is performed until an action is called (count in your example).
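
As a minimal sketch (reusing the first_df/second_df names from your question), the transformations return immediately and only the action launches a job:

// Transformations: Spark only builds the logical plan here; nothing runs yet.
val joined_df = first_df.join(
  second_df,
  first_df.col("key") === second_df.col("key") && second_df.col("key").isNull,
  "left_outer")

// cache() is also lazy: it only marks the DataFrame to be cached.
joined_df.cache()

// count() is an action: the join, the shuffle and the caching all happen here,
// which is why this is the line that appears slow.
joined_df.count()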

The second problem is the repartition(1):

Keep in mind that you'll lose all the parallelism offered by Spark and your computation will run in a single executor (a single core if you are in standalone mode), so you should remove this step or change 1 to a number proportional to your number of CPU cores (standalone mode) or number of executors (cluster mode).
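
A sketch of what that could look like (assuming spark is your SparkSession and that you still want to control the partition count at all):

// Hypothetical alternative: match the partition count to the available
// parallelism instead of collapsing everything onto a single partition.
val numPartitions = spark.sparkContext.defaultParallelism
val repartitioned_df = joined_df.repartition(numPartitions)

// Note that repartition returns a new DataFrame; the original snippet discarded it.
repartitioned_df.cache()
repartitioned_df.count()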

The RDD conversion kicks in and literally takes hours to complete.

If I understand correctly, you are converting the DataFrame to an RDD. This is really bad practice in Spark and you should avoid it whenever you can. The reason is that data in a DataFrame or Dataset is encoded with Spark's own encoders (the Tungsten format, if I remember correctly), which take much less memory than JVM serialization. Converting means Spark has to turn the data from its own representation (which is compact and lets Spark optimize many computations by working directly on the encoded data, instead of serializing it for processing and deserializing it afterwards) into JVM data types, and this is why DataFrames and Datasets are much more powerful than RDDs.
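
A small sketch of the difference, keeping the count in the DataFrame API instead of dropping down to the RDD API:

// Stays inside the DataFrame/Dataset world: data remains in Spark's compact
// encoded form and the query is optimized by Catalyst.
val rowCount = joined_df.count()

// Converting to an RDD first forces the rows to be deserialized into JVM
// objects, so a call like the following should be avoided when a DataFrame
// operation can do the same job:
// val rowCount = joined_df.rdd.count()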

Hope this helps.
