Persist slower than non-persist calls

Question

My settings are: Spark 2.1 on a 3-node YARN cluster with 160 GB and 48 vcores, dynamic allocation turned on, spark.executor.memory=6G and spark.executor.cores=6.

First, I am reading two Hive tables, orders (329MB) and lineitems (1.43GB), and doing a left outer join. Next, I apply 7 different filter conditions to the joined dataset (something like var line1 = joinedDf.filter("linenumber=1"), var line2 = joinedDf.filter("l_linenumber=2"), etc.). Because I'm filtering the joined dataset multiple times, I thought a persist (MEMORY_ONLY) would help here, since the joined dataset fits fully in memory.

  1. I noticed that with persist, the Spark application takes longer to run than without persist (3.5 mins vs 3.3 mins). With persist, the DAG shows that a single stage was created for persist and other downstream jobs are waiting for the persist to complete. Does that mean persist is a blocking call? Or do stages in other jobs start processing when persisted blocks become available?

In the non-persist case, different jobs create different stages to read the same data. The data is read multiple times in different stages, but this still turns out to be faster than the persist case.

With larger data sets, persist actually causes executors to run out of memory (Java heap space). Without persist, the Spark jobs complete just fine. I looked at the suggestions in "Spark java.lang.OutOfMemoryError: Java heap space". I tried increasing/decreasing executor cores, persisting with disk only, increasing partitions and modifying the storage ratio, but nothing seems to help with the executor memory issues.

I would appreciate it if someone could explain how persist works, in which cases it is faster than not persisting and, more importantly, how to go about troubleshooting out-of-memory issues.

Answer

I'd recommend reading up on the difference between transformations and actions in Spark. I must admit that I've been bitten by this myself on multiple occasions.

Data in Spark is evaluated lazily, which essentially means nothing happens until an "action" is performed. The .filter() function is a transformation, so nothing actually happens when your code reaches that point, except that a step is added to the transformation pipeline. A call to .persist() behaves in the same way.
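Here is a toy, single-threaded model in plain Scala that makes the transformation/action distinction concrete. It does not use Spark. The classes Source and ToyDataset are hypothetical and only mimic the shape of the DataFrame API; this is not how Spark is implemented. The idea it tries to show is that filter() and persist() only build up a recipe, and the data is read only when an action such as count() runs.

```scala
// Toy model of Spark-style lazy evaluation; NOT how Spark is implemented.
final class Source(rows: Vector[Int]) {
  var scans = 0                                   // how many times the data was actually read
  def read(): Vector[Int] = { scans += 1; rows }
}

// A dataset is only a recipe (a function); building one does no work.
final class ToyDataset(recipe: () => Vector[Int]) {
  private var persistRequested = false
  private var cache: Option[Vector[Int]] = None

  // Transformation: stacks a new step on top of this recipe; nothing is evaluated here.
  def filter(p: Int => Boolean): ToyDataset = new ToyDataset(() => rows().filter(p))

  // Like persist(): only records the request; the first action fills the cache.
  def persist(): ToyDataset = { persistRequested = true; this }

  // Action: forces evaluation of the whole chain of recipes.
  def count(): Int = rows().size

  private def rows(): Vector[Int] = cache.getOrElse {
    val r = recipe()
    if (persistRequested) cache = Some(r)
    r
  }
}

val src = new Source((1 to 100).toVector)
val joined = new ToyDataset(() => src.read()).persist() // nothing read yet
val line1 = joined.filter(_ % 7 == 1)                   // transformation: still nothing read
val scansBeforeAction = src.scans                       // 0
val line1Count = line1.count()                          // action: the pipeline runs now
val scansAfterAction = src.scans                        // 1
println(s"scans before action: $scansBeforeAction, after: $scansAfterAction, rows: $line1Count")
```

In this model, the source is read zero times until count() is called, and exactly once afterwards.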

If your code downstream of the .persist() call has multiple actions that can be triggered simultaneously, then it's quite likely that you are actually "persisting" the data for each action separately, and eating up memory. (The Storage tab in the Spark UI shows what percentage of the dataset is cached; if it is more than 100% cached, you are seeing what I describe here.) Worse, you may never actually be using the cached data.

Generally, if you have a point in code where the data set forks into two or more separate transformation pipelines (each of the separate .filter()s in your example), a .persist() is a good idea to prevent multiple readings of your data source, and/or to save the result of an expensive transformation pipeline before the fork.
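A rough plain-Scala analogy may help here. It uses no Spark, and the data and the name expensiveJoin are made up. An unpersisted DataFrame behaves somewhat like a def, which is re-evaluated every time a branch uses it. A persisted one behaves more like a lazy val, which is computed on first use and then reused. The counter records how often the upstream "join" runs when seven branches fork off it.

```scala
// Analogy only (no Spark): count how often the "expensive join" runs.
var joinRuns = 0
def expensiveJoin(): Vector[(Int, Int)] = {
  joinRuns += 1
  (1 to 1000).toVector.map(i => (i, i % 7 + 1)) // made-up (orderKey, lineNumber) pairs
}

// Without persist: every branch re-runs the whole upstream pipeline.
def joinedNoPersist: Vector[(Int, Int)] = expensiveJoin()
val countsNoPersist = (1 to 7).map(n => joinedNoPersist.count(_._2 == n))
val runsWithoutPersist = joinRuns

// "Persisted": computed once on first use, then shared by every branch.
joinRuns = 0
lazy val joinedPersisted: Vector[(Int, Int)] = expensiveJoin()
val countsPersisted = (1 to 7).map(n => joinedPersisted.count(_._2 == n))
val runsWithPersist = joinRuns

println(s"join runs without persist: $runsWithoutPersist, with persist: $runsWithPersist")
```

With the def, the join runs once per branch (seven times); with the lazy val, it runs once. The analogy deliberately ignores the cost of storing the cached data (memory, serialization, eviction). It therefore says nothing about whether persisting pays off in a particular job.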

Many times it's a good idea to trigger a single action right after the .persist() call (before the data forks) to ensure that later actions (which may run simultaneously) read from the persisted cache, rather than evaluate (and uselessly cache) the data independently.
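The reasoning above can be illustrated with a toy model that uses plain Scala and JVM threads, with no Spark involved. ToyPersisted and runBranches are hypothetical names. The model assumes, as this answer suggests, that a cache entry is written only after a computation finishes. It uses a CountDownLatch to force the worst case, in which all seven branches miss the cache before any of them completes. It does not model Spark's actual block manager, so treat it as an illustration of the argument rather than a description of Spark internals.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger

// Toy model (NOT Spark internals): a "persisted" dataset whose cache is only
// written after a computation finishes, shared by branches that may run at once.
final class ToyPersisted(compute: () => Vector[Int]) {
  @volatile private var cache: Option[Vector[Int]] = None

  def rows(beforeCompute: () => Unit = () => ()): Vector[Int] = cache match {
    case Some(r) => r            // cache hit: no recomputation
    case None =>
      beforeCompute()            // hook used below to force the branches to overlap
      val r = compute()
      cache = Some(r)            // cache is filled only once the work is done
      r
  }
}

// Returns how many times the underlying data was scanned.
def runBranches(materializeFirst: Boolean, branches: Int = 7): Int = {
  val scans = new AtomicInteger(0)
  val joined = new ToyPersisted(() => { scans.incrementAndGet(); (1 to 1000).toVector })

  // Mimics calling joinedDF.count() right after .persist(), before the fork.
  if (materializeFirst) joined.rows()

  // Every branch that misses the cache waits here until all branches have missed it,
  // i.e. the worst case of actions launched simultaneously on an unfilled cache.
  val allMissed = new CountDownLatch(branches)
  val threads = (1 to branches).map { n =>
    new Thread(() => {
      val rows = joined.rows(() => { allMissed.countDown(); allMissed.await() })
      rows.count(_ % branches == n % branches) // the per-branch "filter + action"
      ()
    })
  }
  threads.foreach(_.start())
  threads.foreach(_.join())
  scans.get
}

println(s"no count() first: ${runBranches(materializeFirst = false)} scans, " +
  s"count() first: ${runBranches(materializeFirst = true)} scans")
```

Under this forced overlap, runBranches(materializeFirst = false) scans the data seven times. runBranches(materializeFirst = true), which plays the role of the single action triggered right after .persist(), scans it once. The branches that find the cache already filled never reach the latch, so that run cannot block.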

TL;DR:

Do a joinedDF.count() after your .persist(), but before your .filter()s.