How to checkpoint DataFrames?


Question


I'm looking for a way to checkpoint DataFrames. Checkpoint is currently an operation on RDD but I can't find how to do it with DataFrames. persist and cache (which are synonyms for each other) are available for DataFrame but they do not "break the lineage" and are thus unsuitable for methods that could loop for hundreds (or thousands) of iterations.


As an example, suppose that I have a list of functions whose signature is DataFrame => DataFrame. I want to have a way to compute the following even when myfunctions has hundreds or thousands of entries:

// `myfunctions` is a Seq[DataFrame => DataFrame] in scope;
// `g` is applied to each intermediate result for its side effects.
def foo(dataset: DataFrame, g: DataFrame => Unit): DataFrame =
    myfunctions.foldLeft(dataset) {
        case (df, f) =>
            val nextDF = f(df)
            g(nextDF)
            nextDF
    }
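To make the lineage problem concrete, here is a sketch (the helper name `foldWithCheckpoints` and the `interval` parameter are mine, not from the original question) of how such a loop could truncate the lineage every few iterations, using the recipe from the accepted answer below. It assumes a `sqlContext` is in scope and `sc.setCheckpointDir` has already been called:

```scala
// Hypothetical sketch: break the lineage every `interval` iterations
// (Spark <= 1.6; assumes sqlContext in scope and a checkpoint dir set).
def foldWithCheckpoints(dataset: DataFrame,
                        fns: Seq[DataFrame => DataFrame],
                        interval: Int = 10): DataFrame =
  fns.zipWithIndex.foldLeft(dataset) { case (df, (f, i)) =>
    val next = f(df)
    if ((i + 1) % interval == 0) {
      next.rdd.checkpoint()  // mark the underlying RDD for checkpointing
      next.rdd.count()       // action to actually materialize the checkpoint
      sqlContext.createDataFrame(next.rdd, next.schema)  // fresh, short lineage
    } else next
  }
```

Without the periodic lineage break, the plan grows with every iteration and can eventually overwhelm the driver.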

Answer


TL;DR: For Spark versions up to 1.6, to actually get a "checkpointed DF", my suggested solution is based on another answer, but with one extra line:

df.rdd.checkpoint // assumes sc.setCheckpointDir has already been called
df.rdd.count      // an action on the RDD to materialize the checkpoint
val df2 = sqlContext.createDataFrame(df.rdd, df.schema)
// df2 is checkpointed
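The three lines above can be wrapped in a small reusable helper (a sketch; the name `checkpointDF` is mine, not a Spark API):

```scala
// Sketch of a helper for the recipe above (Spark <= 1.6).
def checkpointDF(df: DataFrame)(implicit sqlContext: SQLContext): DataFrame = {
  df.rdd.checkpoint()  // assumes sc.setCheckpointDir has been called
  df.rdd.count()       // force materialization of the checkpoint
  sqlContext.createDataFrame(df.rdd, df.schema)  // rebuild with short lineage
}
```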


Explanation

Updated after further research.


As pointed out, checkpointing a DataFrame directly is not currently (Spark 1.6.1) possible, though there is an issue for it on Spark's Jira.


So, a possible workaround is the one suggested on another answer:

df.rdd.checkpoint // Assuming the checkpoint dir has already been set
df.rdd.count      // An action on the RDD itself to materialize the checkpoint
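For completeness, the checkpoint directory mentioned in the comment is set once on the `SparkContext` (the path below is just an example; on a real cluster it should be a reliable store such as HDFS):

```scala
// Set once before any checkpoint calls; use an HDFS path in production.
sc.setCheckpointDir("/tmp/spark-checkpoints")
```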


However, with this approach, only the df.rdd object will be checkpointed. This can be verified by calling toDebugString on df.rdd:

scala> df.rdd.toDebugString
(32) MapPartitionsRDD[1] at rdd at <console>:38 []
 |   ReliableCheckpointRDD[2] at count at <console>:38 []


and then calling toDebugString after a quick transformation of df (note that I created my DataFrame from a JDBC source) returns the following:

scala> df.withColumn("new_column", lit(0)).rdd.toDebugString
res4: String =
(32) MapPartitionsRDD[5] at rdd at <console>:38 []
 |   MapPartitionsRDD[4] at rdd at <console>:38 []
 |   JDBCRDD[3] at rdd at <console>:38 []

Also, df.explain shows a hint:

scala> df.explain
== Physical Plan ==
Scan JDBCRelation (...)


So, to actually achieve a "checkpointed" DataFrame, I can only think of creating a new one from the checkpointed RDD:

val newDF = sqlContext.createDataFrame(df.rdd, df.schema)
// or, by pattern matching on the rows (requires import sqlContext.implicits._):
val newDF = df.rdd.map {
  case Row(val1: Int, ..., valN: Int) => (val1, ..., valN)
}.toDF("col1", ..., "colN")
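As a concrete (hypothetical) instance of the second variant, for a DataFrame whose schema is `(col1: Int, col2: String)` it would read:

```scala
// Hypothetical concrete schema: (col1: Int, col2: String).
import sqlContext.implicits._  // for .toDF on the resulting RDD of tuples

val newDF = df.rdd.map {
  case Row(val1: Int, val2: String) => (val1, val2)  // extract typed values
}.toDF("col1", "col2")
```

The first variant (`createDataFrame(df.rdd, df.schema)`) is preferable in practice, since it reuses the existing schema and needs no per-column pattern match.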


Then we can verify that the new DataFrame is "checkpointed":

1) newDF.explain:

scala> newDF.explain
== Physical Plan ==
Scan PhysicalRDD[col1#5, col2#6, col3#7]

2) newDF.rdd.toDebugString:

scala> newDF.rdd.toDebugString
res7: String =
(32) MapPartitionsRDD[10] at rdd at <console>:40 []
 |   MapPartitionsRDD[8] at createDataFrame at <console>:37 []
 |   MapPartitionsRDD[1] at rdd at <console>:38 []
 |   ReliableCheckpointRDD[2] at count at <console>:38 []

3) With a transformation:

scala> newDF.withColumn("new_column", lit(0)).rdd.toDebugString
res9: String =
(32) MapPartitionsRDD[12] at rdd at <console>:40 []
 |   MapPartitionsRDD[11] at rdd at <console>:40 []
 |   MapPartitionsRDD[8] at createDataFrame at <console>:37 []
 |   MapPartitionsRDD[1] at rdd at <console>:38 []
 |   ReliableCheckpointRDD[2] at count at <console>:38 []


Also, I tried some more complex transformations and I was able to check, in practice, that the newDF object was checkpointed.


Therefore, the only way I found to reliably checkpoint a DataFrame was by checkpointing its associated RDD and creating a new DataFrame object from it.

Hope it helps. Cheers.

