Performance impact of RDD API vs UDFs mixed with DataFrame API


Question

(Scala-specific question.)

While the Spark docs encourage the use of the DataFrame API where possible, if the DataFrame API is insufficient, the choice is usually between falling back to the RDD API or using UDFs. Is there an inherent performance difference between these two alternatives?

RDDs and UDFs are similar in that neither of them can benefit from Catalyst and Tungsten optimizations. Is there any other overhead, and if there is, does it differ between the two approaches?

To give a specific example, let's say I have a DataFrame that contains a column of text data with custom formatting (not amenable to regexp matching). I need to parse that column and add a new vector column that contains the resulting tokens.
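For concreteness, the parsing step could be an ordinary Scala function lifted into a UDF. A minimal sketch, where the delimiter, the function name `parseTokens`, and the column names are assumptions for illustration (the real custom format is unspecified):

```scala
// Hypothetical parser for the custom format: split on ';', trim,
// and drop empty tokens. The real format is not regexp-friendly,
// so imagine this body being arbitrarily complex.
def parseTokens(raw: String): Array[String] =
  raw.split(';').map(_.trim).filter(_.nonEmpty)

// With a SparkSession in scope, the same function could then be
// wrapped as a UDF and applied column-wise, e.g.:
//   import org.apache.spark.sql.functions.udf
//   val parseTokensUdf = udf(parseTokens _)
//   val tokenized = df.withColumn("tokens", parseTokensUdf($"text"))
```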

Answer

neither of them can benefit from Catalyst and Tungsten optimizations

This is not exactly true. While UDFs don't benefit from Tungsten optimization (arguably, simple SQL transformations don't get a huge boost there either), you may still benefit from the execution plan optimizations provided by Catalyst. Let's illustrate that with a simple example (note: Spark 2.0 and Scala; don't extrapolate this to earlier versions, especially with PySpark):

import org.apache.spark.sql.functions.{sum, udf}
import spark.implicits._  // for toDF and the $-column syntax

val f = udf((x: String) => x == "a")
val g = udf((x: Int) => x + 1)

val df = Seq(("a", 1), ("b", 2)).toDF

df
  .groupBy($"_1")
  .agg(sum($"_2").as("_2"))
  .where(f($"_1"))
  .withColumn("_2", g($"_2"))
  .select($"_1")
  .explain

// == Physical Plan ==
// *HashAggregate(keys=[_1#2], functions=[])
// +- Exchange hashpartitioning(_1#2, 200)
//    +- *HashAggregate(keys=[_1#2], functions=[])
//       +- *Project [_1#2]
//          +- *Filter UDF(_1#2)
//             +- LocalTableScan [_1#2, _2#3]

The execution plan shows us a couple of things:

  • Selection has been pushed down before the aggregation.
  • Projection has been pushed down before the aggregation, effectively removing the second UDF call.

Depending on the data and the pipeline, this can provide a substantial performance boost almost for free.

That being said, both RDDs and UDFs require migrations between safe and unsafe representations, with the latter being significantly less flexible. Still, if the only thing you need is simple map-like behavior without initializing expensive objects (like database connections), then a UDF is the way to go.
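The "expensive object" caveat refers to the fact that a UDF runs once per row, while `rdd.mapPartitions` lets you initialize a resource once per partition and reuse it across the iterator. The pattern can be sketched in plain Scala without a Spark dependency; `expensiveInit` is a hypothetical stand-in for something like opening a database connection:

```scala
// Counts how many times the "expensive" resource is created.
var initCount = 0
def expensiveInit(): String = { initCount += 1; "conn" }

val rows = Iterator(1, 2, 3, 4) // stands in for one partition's rows

// mapPartitions-style: initialize once, then reuse across the whole
// iterator. A per-row UDF would instead pay the init cost on every
// element.
val out: List[String] = {
  val conn = expensiveInit()
  rows.map(x => s"$conn:$x")
}.toList
```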

In slightly more complex scenarios you can easily drop down to the generic Dataset API and reserve RDDs for cases where you really require access to low-level features such as custom partitioning.

