Is there a way to checkpoint Apache Spark DataFrames?

Problem description

I'm looking for a way to checkpoint DataFrames. Checkpoint is currently an operation on RDD but I can't find how to do it with DataFrames. persist and cache (which are synonyms for each other) are available for DataFrame but they do not "break the lineage" and are thus unsuitable for methods that could loop for hundreds (or thousands) of iterations.

As an example, suppose that I have a list of functions whose signature is DataFrame => DataFrame. I want to have a way to compute the following even when myfunctions has hundreds or thousands of entries:

def foo(dataset: DataFrame, g: DataFrame => Unit) =
    myfunctions.foldLeft(dataset) {
        case (df, f) =>
            val nextDF = f(df)
            g(nextDF)
            nextDF
    }

Solution

TL;DR: For Spark versions up to 1.6, to actually get a "checkpointed DF", my suggested solution is based on another answer, but with one extra line:

df.rdd.checkpoint // mark the underlying RDD for checkpointing
df.rdd.count // action that materializes the checkpoint
val df2 = sqlContext.createDataFrame(df.rdd, df.schema)
// df2 is checkpointed
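
Note that this assumes a checkpoint directory has already been configured on the SparkContext; without one, the call to checkpoint fails at runtime. A one-line setup, with an illustrative path:

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints") // any reliable storage path works; this one is an example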


Explanation

Updated after further research.

As pointed out, checkpointing a DataFrame directly is not currently (Spark 1.6.1) possible, though there is an issue for it on Spark's Jira.
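
As an aside, Spark 2.1 and later expose checkpointing directly on Dataset/DataFrame, so the workaround below is only needed on 1.x. A minimal sketch on those versions (the path is illustrative):

spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints") // illustrative path
val checkpointedDF = df.checkpoint() // eager by default: materializes the data and truncates lineage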

So, a possible workaround is the one suggested in another answer:

df.rdd.checkpoint // Assuming the checkpoint dir has already been set
df.count // An action to compute the checkpoint

However, with this approach, only the df.rdd object will be checkpointed. This can be verified by calling toDebugString on df.rdd:

 scala> df.rdd.toDebugString
 (32) MapPartitionsRDD[1] at rdd at <console>:38 []
  |   ReliableCheckpointRDD[2] at count at <console>:38 []

Calling toDebugString after a quick transformation on df, however (note that I created my DataFrame from a JDBC source), returns the following:

scala> df.withColumn("new_column", lit(0)).rdd.toDebugString
res4: String =
(32) MapPartitionsRDD[5] at rdd at <console>:38 []
 |   MapPartitionsRDD[4] at rdd at <console>:38 []
 |   JDBCRDD[3] at rdd at <console>:38 []

df.explain also shows a hint:

scala> df.explain
== Physical Plan ==
Scan JDBCRelation (...)

So, to actually achieve a "checkpointed" DataFrame, I can only think of creating a new one from the checkpointed RDD:

val newDF = sqlContext.createDataFrame(df.rdd, df.schema)
// or
val newDF = df.rdd.map { 
  case Row(val1: Int, ..., valN: Int) => (val1, ..., valN)
}.toDF("col1", ..., "colN")

Then we can verify that the new DataFrame is "checkpointed":

1) newDF.explain:

scala> newDF.explain
== Physical Plan ==
Scan PhysicalRDD[col1#5, col2#6, col3#7]

2) newDF.rdd.toDebugString:

scala> newDF.rdd.toDebugString
res7: String =
(32) MapPartitionsRDD[10] at rdd at <console>:40 []
 |   MapPartitionsRDD[8] at createDataFrame at <console>:37 []
 |   MapPartitionsRDD[1] at rdd at <console>:38 []
 |   ReliableCheckpointRDD[2] at count at <console>:38 []

3) With a transformation:

scala> newDF.withColumn("new_column", lit(0)).rdd.toDebugString
res9: String =
(32) MapPartitionsRDD[12] at rdd at <console>:40 []
 |   MapPartitionsRDD[11] at rdd at <console>:40 []
 |   MapPartitionsRDD[8] at createDataFrame at <console>:37 []
 |   MapPartitionsRDD[1] at rdd at <console>:38 []
 |   ReliableCheckpointRDD[2] at count at <console>:38 []

Also, I tried some more complex transformations, and in practice I was able to verify that the newDF object remained checkpointed.

Therefore, the only way I found to reliably checkpoint a DataFrame was to checkpoint its associated RDD and create a new DataFrame object from it.
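
Packaged as a small helper, the technique reads as follows. This is a sketch under the same Spark 1.6 assumptions (a sqlContext in scope, checkpoint directory already set); checkpointDF is a name introduced here for illustration, not a Spark API:

import org.apache.spark.sql.DataFrame

// Relies on DataFrame.rdd being a lazy val in Spark 1.6, so every access
// below refers to the same underlying RDD instance.
def checkpointDF(df: DataFrame): DataFrame = {
  df.rdd.checkpoint() // mark the underlying RDD for checkpointing
  df.rdd.count()      // action that materializes the checkpoint
  sqlContext.createDataFrame(df.rdd, df.schema) // rebuild the DataFrame on top of it
}

Applied to the foldLeft from the question, one could then truncate the lineage periodically, e.g. every 100 iterations (the interval is arbitrary and should be tuned to the workload):

def fooCheckpointed(dataset: DataFrame, g: DataFrame => Unit): DataFrame =
  myfunctions.zipWithIndex.foldLeft(dataset) {
    case (df, (f, i)) =>
      val nextDF = f(df)
      g(nextDF)
      if ((i + 1) % 100 == 0) checkpointDF(nextDF) else nextDF
  }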

I hope it helps. Cheers.
