Persist slower than non-persist calls

Problem Description

My settings are: Spark 2.1 on a 3-node YARN cluster with 160 GB of memory and 48 vcores. Dynamic allocation is turned on. spark.executor.memory=6G, spark.executor.cores=6.

First, I am reading Hive tables: orders (329 MB) and lineitems (1.43 GB) and doing a left outer join. Next, I apply 7 different filter conditions based on the joined dataset (something like var line1 = joinedDf.filter("l_linenumber=1"), var line2 = joinedDf.filter("l_linenumber=2"), etc.). Because I'm filtering on the joined dataset multiple times, I thought doing a persist (MEMORY_ONLY) would help here, as the joined dataset fits fully in memory.
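
For reference, here is a minimal sketch of the pipeline being described. The use of spark.table and the join keys (orderkey/l_orderkey) are assumptions for illustration; the question only shows the filter calls:

```scala
import org.apache.spark.storage.StorageLevel

// Hive tables mentioned in the question.
val orders    = spark.table("orders")      // ~329 MB
val lineitems = spark.table("lineitems")   // ~1.43 GB

// Left outer join; the join condition here is a placeholder.
val joinedDf = orders.join(lineitems,
  orders("orderkey") === lineitems("l_orderkey"), "left_outer")

joinedDf.persist(StorageLevel.MEMORY_ONLY)

// Seven separate filters over the same joined dataset.
var line1 = joinedDf.filter("l_linenumber = 1")
var line2 = joinedDf.filter("l_linenumber = 2")
// ... and so on up to l_linenumber = 7
```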

  1. I noticed that with persist, the Spark application takes longer to run than without persist (3.5 minutes vs. 3.3 minutes). With persist, the DAG shows that a single stage was created for the persist, and other downstream jobs are waiting for it to complete. Does that mean persist is a blocking call? Or do stages in other jobs start processing as soon as persisted blocks become available?

  2. In the non-persist case, different jobs create different stages to read the same data. The data is read multiple times in different stages, but this still turns out to be faster than the persist case.

  3. With larger datasets, persist actually causes executors to run out of memory (Java heap space). Without persist, the Spark jobs complete just fine. I looked at some other suggestions here: Spark java.lang.OutOfMemoryError: Java heap space. I tried increasing/decreasing executor cores, persisting with disk only, increasing partitions, and modifying the storage ratio (roughly as sketched below), but nothing seems to help with the executor memory issues.
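
For concreteness, the knobs mentioned in point 3 map roughly to the following settings and calls (a sketch with placeholder values, not tuning recommendations; each was presumably a separate attempt, not all applied together):

```scala
import org.apache.spark.storage.StorageLevel

// "Persisting with disk only": keep cached blocks off the Java heap.
joinedDf.persist(StorageLevel.DISK_ONLY)

// "Increasing partitions": more, smaller partitions mean smaller cached blocks.
val finerDf = joinedDf.repartition(400)

// "Modifying the storage ratio" and executor cores are startup settings, e.g.:
//   spark-submit --conf spark.memory.storageFraction=0.6 \
//                --conf spark.executor.cores=4 ...
```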

I would appreciate it if someone could explain how persist works, in what cases it is faster than not persisting, and, more importantly, how to go about troubleshooting the out-of-memory issues.

Recommended Answer

I'd recommend reading up on the difference between transformations and actions in Spark. I must admit that I've been bitten by this myself on multiple occasions.

Data in Spark is evaluated lazily, which essentially means nothing happens until an "action" is performed. The .filter() function is a transformation, so nothing actually happens when your code reaches that point, except to add a step to the transformation pipeline. A call to .persist() behaves in the same way.
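
A small, self-contained illustration of this laziness (the data here is hypothetical; in spark-shell, spark is the predefined SparkSession):

```scala
import org.apache.spark.storage.StorageLevel

val df = spark.range(1000000L).toDF("id")

val filtered = df.filter("id % 7 = 0")       // transformation: nothing runs yet
filtered.persist(StorageLevel.MEMORY_ONLY)   // also lazy: just marks the plan

filtered.count()                             // action: only now is the data
                                             // computed and the cache filled
```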

If your code downstream of the .persist() call has multiple actions that can be triggered simultaneously, then it's quite likely that you are actually "persisting" the data for each action separately and eating up memory (the "Storage" tab in the Spark UI will tell you the % cached of the dataset; if it's more than 100% cached, then you are seeing what I describe here). Worse, you may never actually be using the cached data.

Generally, if you have a point in your code where the dataset forks into two separate transformation pipelines (each of the separate .filter()s in your example), a .persist() is a good idea to prevent multiple reads of your data source, and/or to save the result of an expensive transformation pipeline before the fork.

Many times it's a good idea to trigger a single action right after the .persist() call (before the data forks) to ensure that later actions (which may run simultaneously) read from the persisted cache, rather than evaluate (and uselessly cache) the data independently.

TL;DR:

Do a joinedDF.count() after your .persist(), but before your .filter()s.
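
Applied to the question's pipeline, that looks roughly like this (same assumed names as in the sketch above):

```scala
import org.apache.spark.storage.StorageLevel

joinedDF.persist(StorageLevel.MEMORY_ONLY)

// Materialize the cache ONCE, before the pipeline forks into the seven filters.
joinedDF.count()

// These now read from the cached data instead of re-running the join each time.
val line1 = joinedDF.filter("l_linenumber = 1")
val line2 = joinedDF.filter("l_linenumber = 2")
// ... and so on for the remaining filters
```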
