Count on Spark Dataframe is extremely slow


Problem description

I'm creating a new DataFrame with a handful of records from a join.

val joined_df = first_df.join(second_df,
  first_df.col("key") === second_df.col("key") && second_df.col("key").isNull,
  "left_outer")
joined_df.repartition(1)
joined_df.cache()
joined_df.count()

Everything is fast (under one second) except the count operation. The RDD conversion kicks in and literally takes hours to complete. Is there any way to speed things up?

INFO MemoryStore: Block rdd_63_140 stored as values in memory (estimated size 16.0 B, free 829.3 MB)
INFO BlockManagerInfo: Added rdd_63_140 in memory on 192.168.8.52:36413 (size: 16.0 B, free: 829.8 MB)
INFO Executor: Finished task 140.0 in stage 10.0 (TID 544). 4232 bytes result sent to driver
INFO TaskSetManager: Starting task 142.0 in stage 10.0 (TID 545, localhost, executor driver, partition 142, PROCESS_LOCAL, 6284 bytes)
INFO Executor: Running task 142.0 in stage 10.0 (TID 545)
INFO TaskSetManager: Finished task 140.0 in stage 10.0 (TID 544) in 16 ms on localhost (executor driver) (136/200)
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms

Recommended answer

Everything is fast (under one second) except the count operation.

This is expected: all operations before the count are transformations, and this kind of Spark operation is lazy, i.e. no computation is performed until an action is called (count in your example).
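A minimal sketch of that behaviour (the variable names are illustrative only): the join and the cache return immediately because they only build the execution plan, and all of the actual work runs inside count.

val joined = first_df.join(second_df, first_df.col("key") === second_df.col("key"), "left_outer") // lazy: only builds the plan
val cached = joined.cache()   // still lazy: just marks the result for caching
val n      = cached.count()   // action: the join, the caching and the count all execute here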

The second problem is the repartition(1):

Keep in mind that you will lose all the parallelism offered by Spark and your computation will run in a single executor (a single core in standalone mode), so you should either remove this step or change the 1 to a number proportional to the number of your CPU cores (standalone mode) or executors (cluster mode).
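As a hedged sketch of that suggestion (assuming a SparkSession named spark; defaultParallelism is just one reasonable stand-in for the number of cores or executor slots available):

// Repartition to a number proportional to the available parallelism instead of 1.
// Note that repartition returns a new DataFrame, so the result must be kept.
val parallelism   = spark.sparkContext.defaultParallelism
val repartitioned = joined_df.repartition(parallelism)
repartitioned.cache()
repartitioned.count()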

The RDD conversion kicks in and literally takes hours to complete.

If I understand correctly, you are converting the DataFrame to an RDD. That is really a bad practice in Spark and you should avoid such conversions as much as possible. The data in a DataFrame or Dataset is encoded with Spark's special encoders (Tungsten, if I remember correctly), which take much less memory than JVM serialization. Such a conversion means Spark has to move your data out of its own representation (which is far more compact and lets Spark optimize many computations by working directly on the encoded data, instead of serializing and deserializing JVM objects) into JVM data types, and this is why DataFrames and Datasets are much more powerful than RDDs.
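To illustrate that point only (not a diagnosis of this particular job), the difference is between counting through the DataFrame API and counting after an explicit conversion to an RDD:

joined_df.count()      // stays in Spark's encoded (Tungsten) representation
joined_df.rdd.count()  // converts to RDD[Row], forcing JVM object (de)serialization; avoid where possible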

Hope this helps.

