Spark 2.0 Dataset vs DataFrame


Problem description

Starting out with Spark 2.0.1, I have some questions. I have read a lot of documentation but so far could not find sufficient answers:

  • What is the difference between
    • df.select("foo")
    • df.select($"foo")
  • Is it correct that myDataSet.map(foo.someVal) is typesafe and will not convert into an RDD, but stays in the Dataset representation with no additional overhead (performance-wise, for 2.0.0)?
  • Why should I use a UDF / UDAF instead of map (assuming map stays in the Dataset representation)?

      Recommended answer

      1. The difference between df.select("foo") and df.select($"foo") is the signature. The former takes at least one String, the latter zero or more Columns. There is no practical difference beyond that.
      2. myDataSet.map(foo.someVal) type checks, but any Dataset operation uses an RDD of objects, and compared to DataFrame operations there is significant overhead. Let's take a look at a simple example:

      // spark-shell session (spark.implicits._ is already in scope for toDS and $"...")
      case class FooBar(foo: Int, bar: String)
      val ds = Seq(FooBar(1, "x")).toDS
      ds.map(_.foo).explain
      

      == Physical Plan ==
      *SerializeFromObject [input[0, int, true] AS value#123]
      +- *MapElements <function1>, obj#122: int
         +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar
            +- LocalTableScan [foo#117, bar#118]
      

      As you can see, this execution plan requires access to all fields and has to DeserializeToObject.

      In general, the other methods are not just syntactic sugar; they generate significantly different execution plans. For example:

      ds.select($"foo").explain
      

      == Physical Plan ==
      LocalTableScan [foo#117]
      

      Compared to the plan shown before, it can access the column directly. This is not so much a limitation of the API as a result of the difference in operational semantics.
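      Tying this back to point 1: both select overloads produce the same plan here; in practice the difference is only that the Column overload accepts expressions. A minimal sketch (assuming the ds defined above and spark-shell implicits; the expressions are made up for illustration):

      import org.apache.spark.sql.functions.col

      ds.select("foo").explain      // String overload: select(col: String, cols: String*)
      ds.select($"foo").explain     // Column overload: select(cols: Column*), same LocalTableScan [foo]
      ds.select(col("foo"))         // col("foo") and $"foo" build the same Column

      // Only the Column overload can take expressions:
      ds.select($"foo" + 1)         // Column arithmetic, fine
      // ds.select("foo" + 1)       // plain String concatenation ("foo1"), fails at analysis
      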

      How could I make df.select("foo") type-safe without a map statement?

      There is no such option. While typed columns allow you to transform a statically typed Dataset into another statically typed Dataset:

      ds.select($"bar".as[Int])
      

      they are not type safe. There have been other attempts to include type-safe optimized operations, like typed aggregations, but that API is experimental.
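      As a small sketch of what "not type safe" means here (using the ds from above; the misspelled column name is deliberately made up): the column name and target type are only checked when the plan is analysed at runtime, whereas a typo inside a typed map is caught by the compiler:

      val bars = ds.select($"bar".as[String])   // OK: bar really is a String column

      // Compiles fine, but only fails at runtime with an AnalysisException
      // ("cannot resolve 'barr'"):
      // ds.select($"barr".as[String])

      // By contrast, a typo in a typed map (e.g. _.barr) is a compile-time error:
      ds.map(_.bar)
      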

      Why should I use a UDF / UDAF instead of map?

      It is completely up to you. Each distributed data structure in Spark has its own advantages and disadvantages (see, for example, Spark UDAF with ArrayType as bufferSchema performance issues).
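      For instance, a sketch of the trade-off with the ds defined above (addOne and foo_plus_one are made up for illustration): a UDF is applied column by column inside the DataFrame API, so only the referenced column is touched, while map first deserializes whole FooBar objects:

      import org.apache.spark.sql.functions.udf

      val addOne = udf((i: Int) => i + 1)   // hypothetical UDF

      // Columns in, columns out: no DeserializeToObject / SerializeFromObject in the plan
      ds.select(addOne($"foo") as "foo_plus_one").explain()

      // Object path: the plan gains (De)SerializeToObject nodes around MapElements
      ds.map(fb => fb.foo + 1).explain()
      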

      Personally, I find statically typed Datasets to be the least useful:

      • They don't provide the same range of optimizations as Dataset[Row] (although they share the storage format and some execution plan optimizations, they don't fully benefit from code generation or off-heap storage), nor access to all the analytical capabilities of the DataFrame.

      • Typed transformations are black boxes and effectively create an analysis barrier for the optimizer. For example, selections (filters) cannot be pushed over a typed transformation:

      ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain
      

      == Physical Plan ==
      *Filter (foo#133 = 1)
      +- *Filter <function1>.apply
         +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
            +- Exchange hashpartitioning(foo#133, 200)
               +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
                  +- LocalTableScan [foo#133, bar#134]
      

      Compare to:

      ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain
      

      == Physical Plan ==
      *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
      +- Exchange hashpartitioning(foo#133, 200)
         +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
            +- *Filter (foo#133 = 1)
               +- LocalTableScan [foo#133, bar#134] 
      

      This impacts features like predicate pushdown or projection pushdown.

      • They are not as flexible as RDDs, with only a small subset of types supported natively.
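      For example, a minimal sketch of this encoder limitation (Unsupported is a made-up class): primitives, case classes, tuples and a few other types get native Encoders, while arbitrary classes need a Kryo or Java serialization encoder and are then stored as a single opaque binary column:

      class Unsupported(val x: Int)          // not a case class: no implicit Encoder is derived

      // Seq(new Unsupported(1)).toDS        // does not compile: "Unable to find encoder ..."

      // Falling back to Kryo works, but the data becomes one binary `value` column,
      // so columnar optimizations no longer apply:
      implicit val enc = org.apache.spark.sql.Encoders.kryo[Unsupported]
      val opaque = Seq(new Unsupported(1)).toDS
      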

      Related questions:

      • Perform a typed join in Scala with Spark Datasets
      • Spark 2.0 DataSets groupByKey and divide operation and type safety
