Performance impact of RDD API vs UDFs mixed with DataFrame API


Question

(Scala-specific question.)

While the Spark docs encourage use of the DataFrame API where possible, if the DataFrame API is insufficient the choice is usually between falling back to the RDD API or using UDFs. Is there an inherent performance difference between these two alternatives?

RDDs and UDFs are similar in that neither can benefit from Catalyst and Tungsten optimizations. Is there any other overhead, and if so, does it differ between the two approaches?

To give a specific example, say I have a DataFrame containing a column of text data in a custom format (not amenable to regexp matching). I need to parse that column and add a new vector column containing the resulting tokens.
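As a rough sketch of the UDF route for this case (hedged: the `|`-delimited format, the `tokenize` helper, and the column names are illustrative assumptions, not from the question), the parsing logic can live in a plain Scala function and be wrapped in a UDF:

```scala
// Hypothetical parser for a custom token format such as "a|b| c ";
// the delimiter is an assumption made for illustration.
def tokenize(s: String): Seq[String] =
  s.split('|').map(_.trim).filter(_.nonEmpty).toSeq

// Spark wiring (needs a live SparkSession; shown as comments for context):
// import org.apache.spark.sql.functions.udf
// val tokenizeUdf = udf(tokenize _)
// val withTokens = df.withColumn("tokens", tokenizeUdf($"text"))
```

Keeping the parser a plain function makes it unit-testable independently of Spark, while the UDF wrapper stays a one-liner.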

Answer

neither of them can benefit from Catalyst and Tungsten optimizations

This is not exactly true. While UDFs don't benefit from Tungsten optimizations (arguably, simple SQL transformations don't get a huge boost there either), you can still benefit from the execution-plan optimizations provided by Catalyst. Let's illustrate that with a simple example (note: Spark 2.0 and Scala; don't extrapolate this to earlier versions, especially with PySpark):

// Assumes the usual setup:
//   import spark.implicits._
//   import org.apache.spark.sql.functions.{udf, sum}
val f = udf((x: String) => x == "a")  // filter predicate
val g = udf((x: Int) => x + 1)        // simple increment

val df = Seq(("a", 1), ("b", 2)).toDF

df
  .groupBy($"_1")
  .agg(sum($"_2").as("_2"))
  .where(f($"_1"))            // UDF-based filter
  .withColumn("_2", g($"_2")) // result is dropped by the select below
  .select($"_1")
  .explain

// == Physical Plan ==
// *HashAggregate(keys=[_1#2], functions=[])
// +- Exchange hashpartitioning(_1#2, 200)
//    +- *HashAggregate(keys=[_1#2], functions=[])
//       +- *Project [_1#2]
//          +- *Filter UDF(_1#2)
//             +- LocalTableScan [_1#2, _2#3]

The execution plan shows us a couple of things:

  • Selection has been pushed down before the aggregation.
  • Projection has been pushed down before the aggregation, effectively removing the second UDF call.

Depending on the data and the pipeline, this can provide a substantial performance boost almost for free.

That being said, both RDDs and UDFs require migrations between the safe and unsafe representations, with the latter being significantly less flexible. Still, if the only thing you need is simple map-like behavior without initializing expensive objects (like database connections), then a UDF is the way to go.
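A minimal sketch of that trade-off (the `increment` function is a stand-in for any cheap per-row transformation; the Spark wiring appears as comments since it needs a live session):

```scala
// Cheap per-row logic with no expensive setup: a good fit for a UDF.
def increment(x: Int): Int = x + 1

// UDF route: stays inside the DataFrame API, so Catalyst can still
// rearrange the plan around it.
// val incUdf = udf(increment _)
// df.withColumn("_2", incUdf($"_2"))

// RDD route: the same logic, but every row is deserialized out of
// Tungsten's unsafe format and back, and plan optimization stops here.
// df.rdd.map(row => (row.getString(0), increment(row.getInt(1))))
```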

In slightly more complex scenarios, you can easily drop down to the generic Dataset API and reserve RDDs for cases where you really need access to low-level features like custom partitioning.
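A hedged sketch of that middle ground (the `Record` case class and its field names are assumptions; the Dataset/RDD lines are comments because they need a SparkSession):

```scala
// A typed record matching the two-column example above; the field
// names are illustrative assumptions.
case class Record(key: String, value: Int)

def bump(r: Record): Record = r.copy(value = r.value + 1)

// Typed Dataset route: typed lambdas, but still running on the SQL engine.
// val ds = spark.createDataset(Seq(Record("a", 1), Record("b", 2)))
// ds.map(bump)

// RDD route, reserved for genuinely low-level needs such as custom
// partitioning:
// ds.rdd.keyBy(_.key).partitionBy(new org.apache.spark.HashPartitioner(8))
```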
