When is it not performance-practical to use persist() on a Spark DataFrame?


Problem Description

While working on improving code performance, after having many jobs fail (aborted), I thought about using the persist() function on a Spark DataFrame whenever I need to use that same dataframe in many other operations. When doing so and following the jobs and stages in the Spark application UI, I felt it is not always optimal to do this; it depends on the number of partitions and the data size. I wasn't sure until a job got aborted because of a failure in the persist stage.

I'm questioning whether the best practice of using persist() whenever many operations will be performed on the dataframe is always valid. If not, when is it not, and how do I judge?

To be more concrete, I will present my code and the details of the aborted job:

# create a dataframe from another one, df_transf_1, on which I made a lot of transformations but no actions
spark_df = df_transf_1.select('user_id', 'product_id').dropDuplicates()
# persist it because it will be reused below and in later joins
spark_df.persist()
products_df = spark_df[['product_id']].distinct()
# assign a sequential index to each distinct product_id
df_products_indexed = products_df.rdd.map(lambda r: r.product_id).zipWithIndex().toDF(['product_id', 'product_index'])

You may ask why I persisted spark_df? It's because I'm going to use it multiple times, as with products_df, and also in joins (e.g. spark_df = spark_df.join(df_products_indexed, "product_id")).

Details of the failure reason in Stage 3:

Job aborted due to stage failure: Task 40458 in stage 3.0 failed 4 times, most recent failure: Lost task 40458.3 in stage 3.0 (TID 60778, xx.xx.yyyy.com, executor 91): ExecutorLostFailure (executor 91 exited caused by one of the running tasks) Reason: Slave lost Driver stacktrace:

The size of the input data (4 TB) is huge. Before persisting, is there a way to check the size of the data? Is it a parameter in choosing whether to persist or not? Also, the number of partitions (tasks) for the persist stage is > 100,000.
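(Aside, not part of the original question: a minimal sketch of how the partition count of spark_df can be checked up front; the cached size itself only becomes visible in the Spark UI's Storage tab after an action materializes the persisted data.)

num_partitions = spark_df.rdd.getNumPartitions()   # each partition becomes one task when the cache is materialized
print(num_partitions)
# the actual cached size shows up under the "Storage" tab of the Spark UI
# only after an action (e.g. spark_df.count()) materializes the persisted data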

Answer

Here are two cases for using persist():

  • After using repartition, in order to avoid shuffling your data again and again as the dataframe is used by the following steps. This is useful only when you call more than one action on the persisted dataframe/RDD, since persist is a transformation and hence lazily evaluated. In general, use it whenever you have multiple actions on the same dataframe/RDD (see the sketch after this list).

  • Iterative computations, for instance when you want to query a dataframe inside a for loop. With persist, Spark saves the intermediate results and avoids reevaluating the same operations on every action call. Another example would be appending new columns with a join, as discussed here.
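
A minimal PySpark sketch of the two cases above, assuming a SparkSession named spark and a hypothetical input path; the column names mirror the question's dataframe, but everything else is illustrative:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("persist-cases").getOrCreate()

# Case 1: repartition + persist, then more than one action on the same dataframe.
df = (spark.read.parquet("/path/to/events")   # hypothetical input with user_id, product_id columns
      .repartition("user_id")                 # expensive shuffle we do not want to repeat
      .persist())                             # lazy: nothing is cached yet

df.count()                                    # first action materializes the cache
df.groupBy("user_id").count().show()          # second action reuses the cached partitions

# Case 2: iterative computation; without persist, each loop iteration would
# re-execute the full lineage of df (read + shuffle) from scratch.
for pid in ["p1", "p2", "p3"]:                # hypothetical product ids
    df.filter(F.col("product_id") == pid).count()

df.unpersist()                                # release the cached blocks when done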

