Spark createDataFrame(df.rdd, df.schema) vs checkPoint for breaking lineage

Problem description

I am currently using

val df = longLineageCalculation(....)
val newDf = sparkSession.createDataFrame(df.rdd, df.schema)
newDf.join......

in order to save time when calculating plans. However, the docs say that checkpointing is the suggested way to "cut" the lineage, but I don't want to pay the price of saving the RDD to disk.

My process is a batch job which is not-so-long and can be restarted without issues, so checkpointing is not a benefit for me (I think).

What problems could arise from using "my" method? (The docs suggest checkpointing, which is more expensive, instead of this one for breaking lineage, and I would like to know the reason.)

The only thing I can guess is that if some node fails after my "lineage breaking", maybe my process will fail while the checkpointed one would have worked correctly? (And what if the DF is cached instead of checkpointed?)

Thanks!

From SMaZ's answer, my own knowledge and the article he provided: using createDataframe (which is a Developer API, so use at "my"/your own risk) keeps the lineage in memory. That is not a problem for me, since I don't have memory issues and the lineage is not big.

With this, it looks (not 100% tested) like Spark should be able to rebuild whatever is needed if something fails.

As I'm not using the data in the following executions, I'll go with cache + createDataframe rather than checkpointing (which, if I'm not wrong, is actually cache + saveToHDFS + "createDataFrame").
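
A minimal sketch of that cache + createDataFrame combination, assuming an active SparkSession named spark (e.g. in spark-shell); the folded transformations below are just a stand-in for my real long-lineage computation:

// Stand-in for the real long-lineage computation.
val df = (1 to 50).foldLeft(spark.range(1000).toDF("v")) { (acc, i) =>
  acc.withColumn("v", acc("v") + i)
}

df.cache()   // keep the computed rows around for the downstream joins
df.count()   // force materialization once

// Developer API: re-wrap the cached data so later plans start from a fresh scan.
val newDf = spark.createDataFrame(df.rdd, df.schema)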

My process is not that critical if it crashes, since a user is always expecting the result and launches it manually, so if it gives problems they can relaunch it (+ Spark will relaunch it) or call me. Either way I can take some risk, but I'm 99% sure there is no risk :)

Answer

Let me start with the line that creates the dataframe:

val newDf = sparkSession.createDataFrame(df.rdd, df.schema)

If we take a close look into the SparkSession class, this method is annotated with @DeveloperApi. To understand what this annotation means, take a look at the following lines from the DeveloperApi class:

A lower-level, unstable API intended for developers.

Developer API's might change or be removed in minor versions of Spark.

So it is not advised to use this method for production solutions; in the open-source world this is what is called a use-at-your-own-risk implementation.

However, let's dig deeper into what happens when we call createDataframe from an RDD. It calls the internalCreateDataFrame private method and creates a LogicalRDD.

A LogicalRDD is created when:

  • the Dataset is requested to checkpoint
  • SparkSession is requested to create a DataFrame from an RDD of internal binary rows

So it is nothing but the same as the checkpoint operation, without saving the dataset physically. It just creates a DataFrame from an RDD of internal binary rows and a schema. This might truncate the lineage in memory, but not at the physical level.
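
A quick way to observe this (a sketch, assuming an active SparkSession named spark) is to compare what explain prints before and after the re-wrap: the new DataFrame's plan collapses to a single LogicalRDD / "Scan ExistingRDD" node.

// Build a DataFrame with a deliberately long lineage.
val df = (1 to 20).foldLeft(spark.range(100).toDF("v")) { (acc, i) =>
  acc.filter(acc("v") > i)
}

df.explain(true)     // prints the full chain of Filter nodes

val newDf = spark.createDataFrame(df.rdd, df.schema)
newDf.explain(true)  // prints a single LogicalRDD / "Scan ExistingRDD" scan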

So I believe it is just the overhead of creating another RDD, and it cannot be used as a replacement for checkpoint.

Now, checkpointing is the process of truncating the lineage graph and saving it to a reliable distributed/local file system.
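
In DataFrame terms that looks roughly like the sketch below (assuming an active SparkSession named spark; the checkpoint directory and the input are placeholders):

// Checkpoint data is written to a reliable file system (HDFS, S3, ...).
spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")   // placeholder path

val df = spark.range(1000000).toDF("v")   // stand-in for a heavy computation
val checkpointed = df.checkpoint()        // eager by default: writes the data and returns a DataFrame with truncated lineage
// df.checkpoint(eager = false)           // lazy variant: materialized on the first action
// df.localCheckpoint()                   // executor-local storage: faster, but not fault tolerant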

Why checkpoint?

  • If the computation takes a long time, the lineage is too long, or it depends on too many RDDs

  • Keeping heavy lineage information comes at the cost of memory.

  • The checkpoint files will not be deleted automatically even after the Spark application terminates, so they can be used by some other process (a related cleanup setting is sketched below)
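
As far as I know, Spark also has a cleaner flag that removes checkpoint files for data that goes out of scope while the application is still running (a hedged sketch; files still referenced at the end of the job remain on disk):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("checkpoint-cleanup-sketch")
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")   // default is false
  .getOrCreate()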

What are the problems which can arise using "my" method? (Docs suggest checkpointing, which is more expensive, instead of this one for breaking lineages, and I would like to know the reason)

This article will give detailed information on cache and checkpoint. IIUC, your question is more about where we should use checkpointing. Let's discuss some practical scenarios where checkpointing is helpful:

  1. Let's take a scenario where we have one dataset on which we want to perform 100 iterative operations, where each iteration takes the last iteration's result as input (a Spark MLlib use case). During this iterative process the lineage keeps growing. Checkpointing the dataset at a regular interval (say every 10 iterations) ensures that, in case of any failure, we can restart from the last checkpoint instead of from the beginning (see the sketch after this list).
  2. Let's take a batch example. Imagine we have a batch job which creates one master dataset with heavy lineage or complex computations. Then, at regular intervals, we get some data that should use the earlier calculated master dataset. Here, if we checkpoint our master dataset, it can be reused by all subsequent processes from different SparkSessions.
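
A minimal sketch of scenario 1 (assuming an active SparkSession named spark; the loop body is just a placeholder for the real per-iteration work):

spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")   // placeholder path

var current = spark.range(1000).toDF("v")   // stand-in for the initial dataset

for (i <- 1 to 100) {
  current = current.withColumn("v", current("v") + 1)   // stand-in for one iteration's work
  if (i % 10 == 0) {
    // Cut the lineage every 10 iterations so the plan (and any recovery) stays bounded.
    current = current.checkpoint()
  }
}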

My process is a batch process which is not-so-long and can be restarted without issues, so checkpointing is not a benefit for me (I think).

That's correct. If your process is not heavy-computation/big-lineage, then there is no point in checkpointing. The rule of thumb is: if your dataset is not used multiple times, and it can be rebuilt faster than the time taken and the resources used for checkpointing/caching, then we should avoid it. That will leave more resources for your process.
