What's the performance impact of converting between `DataFrame`, `RDD` and back?

Question

While my first instinct is to use DataFrames for everything, it's just not possible -- some operations are clearly easier and / or better performing as RDD operations, not to mention certain APIs like GraphX only work on RDDs.

I seem to be spending a lot of time these days converting back and forth between DataFrames and RDDs -- so what's the performance hit? Take RDD.checkpoint -- there's no DataFrame equivalent, so what happens under the hood when I do:

val df = Seq((1,2),(3,4)).toDF("key","value")
val rdd = df.rdd.map(...)
val newDf = rdd.map(r => (r.getInt(0), r.getInt(1))).toDF("key","value")

Obviously, this is a trivially small example, but it would be great to know what happens behind the scenes in the conversion.
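
For context, a fuller version of the round trip described above might look like the sketch below. This is only an illustration: the checkpoint directory path and the doubling of the value column are hypothetical, and it assumes a SparkSession named spark with import spark.implicits._ in scope (on older versions you would use sc and sqlContext.implicits._ instead).

import spark.implicits._

spark.sparkContext.setCheckpointDir("/tmp/checkpoints")   // hypothetical path

val df = Seq((1, 2), (3, 4)).toDF("key", "value")

// Drop to the RDD API for the feature that has no DataFrame equivalent...
val rdd = df.rdd.map(r => (r.getInt(0), r.getInt(1) * 2))
rdd.checkpoint()

// ...then convert back, re-declaring the column names.
val newDf = rdd.toDF("key", "value")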

Answer

Let's look at df.rdd first. This is defined as:

lazy val rdd: RDD[Row] = {
  // use a local variable to make sure the map closure doesn't capture the whole DataFrame
  val schema = this.schema
  queryExecution.toRdd.mapPartitions { rows =>
    val converter = CatalystTypeConverters.createToScalaConverter(schema)
    rows.map(converter(_).asInstanceOf[Row])
  }
}

So firstly, it runs queryExecution.toRdd, which basically prepares the execution plan based on the operators used to build up the DataFrame, and computes an RDD[InternalRow] that represents the outcome of the plan.
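
If you want to see which plan that is, a small sketch using standard APIs (explain and toDebugString), reusing the df from the question:

// Prints the logical and physical plans that queryExecution will turn
// into an RDD[InternalRow].
df.explain(true)

// df.rdd forces that plan to be prepared and wraps the result in the
// Row-converting mapPartitions shown above; toDebugString shows the lineage.
println(df.rdd.toDebugString)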

Next, the InternalRows of that RDD (which are only for internal use) will be mapped to normal Rows. This entails the following for each row:

override def toScala(row: InternalRow): Row = {
  if (row == null) {
    null
  } else {
    val ar = new Array[Any](row.numFields)
    var idx = 0
    while (idx < row.numFields) {
      ar(idx) = converters(idx).toScala(row, idx)
      idx += 1
    }
    new GenericRowWithSchema(ar, structType)
  }
}

So it loops over all the fields, converts them to 'scala' space (from Catalyst space), and creates the final Row with them. toDF will pretty much do these things in reverse.
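
For the reverse direction, the user-facing entry points are rdd.toDF(...) (as in the question) or createDataFrame with an explicit schema. A minimal sketch, assuming a SparkSession named spark (sqlContext on older versions) and a hypothetical value + 1 transformation:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// The Row objects carry no compile-time type information, so the schema
// has to be declared again when going back to a DataFrame.
val schema = StructType(Seq(
  StructField("key", IntegerType, nullable = false),
  StructField("value", IntegerType, nullable = false)
))

// Each Row is converted field by field back into Catalyst's internal
// representation, mirroring the toScala loop above in the opposite direction.
val rowRdd = df.rdd.map(r => Row(r.getInt(0), r.getInt(1) + 1))
val newDf = spark.createDataFrame(rowRdd, schema)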

This all will indeed have some impact on your performance. How much depends on how complex these operations are compared to the things you do with the data. The bigger possible impact, however, will be that Spark's Catalyst optimizer can only optimize the operations between the conversions to and from RDDs, rather than optimizing the full execution plan as a whole. It would be interesting to see which operations you have trouble with; I find most things can be done using basic expressions or UDFs. Using modules that only work on RDDs is a very valid use case though!
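
As an illustration of staying inside the DataFrame API (and hence inside a single plan Catalyst can optimize), here is a small sketch of a per-row transformation expressed as a UDF instead of a round trip through df.rdd; the doubling logic and column names are just examples:

import org.apache.spark.sql.functions.{col, udf}

// The UDF body itself is a black box to Catalyst, but the surrounding
// projection and filter stay in one plan that the optimizer can work on.
val double = udf((x: Int) => x * 2)

val result = df
  .withColumn("doubled", double(col("value")))
  .filter(col("key") > 1)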
