Spark multiple dataframe saves


Question


I have a Spark job that creates a data frame that I save down to HDFS. What I would like to do is save a subset of that data frame to another place, but I want to be performant about this.

The only action I have is the save itself; every other element of code in the Spark job is a transformation. I do not cache the data frame. I am concerned that deriving a new data frame from the old one via drops and then saving it will re-run all of the original data frame's transformations.

For example, I have something like:

val df = hiveContext.read.json("hdfs://HOSTNAME:PORT/user/spark/data/in/*")

val df2 = df.withColumn("new_column", some_udf("old_column")).drop("old_column")
.
.
.

val final_df = df10.withColumn("newest_column", another_udf("old_column2")).drop("old_column2")

val subset_df = final_df.drop("this_column")
                        .drop("that_column")
                        .drop("another_column)

final_df.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir)
subset_df.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir2)

But let's assume that some_udf is actually really compute-intensive, and I don't want it to run twice. Thus my question is:

Should I call final_df.cache() before declaring subset_df and calling the saves, to make sure that it doesn't perform the UDF transformation again?

Something like:

val df = hiveContext.read.json("hdfs://HOSTNAME:PORT/user/spark/data/in/*")

val df2 = df.withColumn("new_column", some_udf("old_column")).drop("old_column")
.
.
.

val final_df = df10.withColumn("newest_column", another_udf("old_column2")).drop("old_column2")

val subset_df = final_df.drop("this_column")
                        .drop("that_column")
                        .drop("another_column)

final_df.cache()  // This is the only new line

final_df.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir)
subset_df.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir2)

Solution

You should cache:

val final_df = df10.withColumn(...)
val subset_df = final_df.drop(...)
final_df.cache() 

before the first action; otherwise it will execute twice (as you suspect).
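
For reference, here is a minimal end-to-end sketch of the reordered pipeline (the intermediate df2 ... df10 steps from the question are collapsed into a single withColumn for brevity). It assumes Spark 1.x's HiveContext as in the question; some_udf is a hypothetical stand-in for the expensive UDF, and the output paths and column names are placeholders. The key point is that cache() is lazy: it only marks the plan, the first save materializes the cached data, and the second save reads from the cache instead of re-running the UDF.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.hive.HiveContext

object CachedSaves {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cached-saves"))
    val hiveContext = new HiveContext(sc)

    // Hypothetical stand-in for the expensive UDF in the question.
    val some_udf = udf((s: String) => s.toUpperCase)

    // Placeholder output paths; substitute real HDFS directories.
    val hdfs_dir = "hdfs://HOSTNAME:PORT/user/spark/data/out"
    val hdfs_dir2 = "hdfs://HOSTNAME:PORT/user/spark/data/out_subset"

    val df = hiveContext.read.json("hdfs://HOSTNAME:PORT/user/spark/data/in/*")

    val final_df = df.withColumn("new_column", some_udf(df("old_column"))).drop("old_column")
    val subset_df = final_df.drop("this_column").drop("that_column")

    // Lazy: marks final_df's plan for caching; nothing is computed yet.
    final_df.cache()

    // First action: runs some_udf once and materializes the cache while writing.
    final_df.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir)

    // Second action: subset_df is derived from the cached final_df, so this
    // reads the cached data instead of re-running some_udf.
    subset_df.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir2)

    // Free the cached blocks once both writes are done.
    final_df.unpersist()
  }
}

To verify the cache is actually used, subset_df.explain() after the first save should show an InMemoryTableScan over an InMemoryRelation in place of the UDF projection. If final_df may not fit in memory, persist(StorageLevel.MEMORY_AND_DISK) from org.apache.spark.storage.StorageLevel is a drop-in alternative to cache().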
