Spark 2.0 Dataset vs DataFrame
Question

Starting out with Spark 2.0.1 I have some questions. I read a lot of documentation but so far could not find sufficient answers:

- What is the difference between
df.select("foo")
and
df.select($"foo")?
- Is
myDataSet.map(foo.someVal)
type-safe? Will it stay in the Dataset representation rather than converting to an RDD, with no additional overhead (performance-wise, for 2.0.0)?
- Why should I use a UDF/UDAF instead of map (assuming map stays in the Dataset representation)?
Answer
- The difference between df.select("foo") and df.select($"foo") is the signature. The former takes at least one String, the latter zero or more Columns. Beyond that there is no practical difference.
- myDataSet.map(foo.someVal) type-checks, but since any Dataset operation uses an RDD of objects, there is significant overhead compared to DataFrame operations. Let's take a look at a simple example:
case class FooBar(foo: Int, bar: String)
val ds = Seq(FooBar(1, "x")).toDS
ds.map(_.foo).explain
== Physical Plan ==
*SerializeFromObject [input[0, int, true] AS value#123]
+- *MapElements <function1>, obj#122: int
   +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar
      +- LocalTableScan [foo#117, bar#118]
As you can see, this execution plan requires access to all fields and has to DeserializeToObject.

No. In general, other methods are not syntactic sugar and generate significantly different execution plans. For example:
ds.select($"foo").explain
== Physical Plan ==
LocalTableScan [foo#117]
Compared to the plan shown before, it can access the column directly. This is not so much a limitation of the API as a result of the difference in operational semantics.
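To make the signature difference from the first point concrete, here is a minimal sketch in plain Scala (the `Column` case class and both `select` stand-ins are illustrative only, not Spark's actual API):

```scala
// Illustrative stand-ins only; this Column is not Spark's Column class.
case class Column(name: String)

// Mirrors select(col: String, cols: String*): at least one name is required.
def selectByName(col: String, cols: String*): Seq[String] = col +: cols

// Mirrors select(cols: Column*): zero or more columns are accepted.
def selectByColumn(cols: Column*): Seq[String] = cols.map(_.name)

println(selectByName("foo", "bar"))   // both names are kept
println(selectByColumn())             // compiles: an empty projection is legal
println(selectByColumn(Column("foo")))
```

Calling `selectByName()` with no arguments would not compile, which is exactly the difference between the two Spark overloads.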
How could I make df.select("foo") type-safe without a map statement?

There is no such option. While typed columns allow you to transform a statically typed Dataset into another statically typed Dataset:

ds.select($"bar".as[Int])

this is not type safe (the column name is only resolved at runtime). There are some other attempts to include type-safe optimized operations, like typed aggregations, but that API is experimental.
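The compile-time versus runtime distinction can be illustrated without Spark at all; in this sketch `FooBar` mirrors the case class from the example above, and `selectColumn` is a hypothetical helper that mimics string-based column resolution:

```scala
case class FooBar(foo: Int, bar: String)

val rows = Seq(FooBar(1, "x"), FooBar(2, "y"))

// Typed access, as in myDataSet.map(_.foo): the compiler verifies that
// `foo` exists and infers that the result is Seq[Int].
val foos: Seq[Int] = rows.map(_.foo)

// String-based access, as in df.select("foo"): the name is resolved at
// runtime, so a typo only fails when the query actually runs.
def selectColumn(row: FooBar, name: String): Any = name match {
  case "foo" => row.foo
  case "bar" => row.bar
  case other => throw new IllegalArgumentException(s"No such column: $other")
}

println(foos)
println(selectColumn(rows.head, "bar"))
```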
why should I use a UDF / UDAF instead of a map

It is completely up to you. Each distributed data structure in Spark has its own advantages and disadvantages (see for example Spark UDAF with ArrayType as bufferSchema performance issues).
Personally, I find the statically typed Dataset to be the least useful:

- It doesn't provide the same range of optimizations as Dataset[Row] (although they share the storage format and some execution plan optimizations, it doesn't fully benefit from code generation or off-heap storage), nor access to all the analytical capabilities of the DataFrame.
- Typed transformations are black boxes, and effectively create an analysis barrier for the optimizer. For example, selections (filters) cannot be pushed over a typed transformation:
ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain
== Physical Plan ==
*Filter (foo#133 = 1)
+- *Filter <function1>.apply
   +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
      +- Exchange hashpartitioning(foo#133, 200)
         +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
            +- LocalTableScan [foo#133, bar#134]
Compared to:
ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain
== Physical Plan ==
*HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
+- Exchange hashpartitioning(foo#133, 200)
   +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
      +- *Filter (foo#133 = 1)
         +- LocalTableScan [foo#133, bar#134]
This impacts features like predicate pushdown or projection pushdown.
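The "analysis barrier" can be sketched with a toy rule-based optimizer in plain Scala (the `Plan` nodes and `pushDown` rule are hypothetical simplifications, not Spark's Catalyst internals): a filter can be pushed below a projection whose columns are known, but a typed transformation is an opaque function the rule cannot look into.

```scala
// Hypothetical mini plan nodes for illustration; not Spark internals.
sealed trait Plan
case class Scan(table: String) extends Plan
case class Project(cols: Seq[String], child: Plan) extends Plan
case class Filter(col: String, value: Any, child: Plan) extends Plan
case class TypedMap(f: Any => Any, child: Plan) extends Plan

// Push a Filter below a Project when the filtered column survives the
// projection. A TypedMap is a black box, so the Filter stays above it.
def pushDown(plan: Plan): Plan = plan match {
  case Filter(c, v, Project(cols, child)) if cols.contains(c) =>
    Project(cols, pushDown(Filter(c, v, child)))
  case other => other
}

val declarative = Filter("foo", 1, Project(Seq("foo", "bar"), Scan("t")))
val opaque      = Filter("foo", 1, TypedMap(identity, Scan("t")))

println(pushDown(declarative)) // Filter moved below the Project
println(pushDown(opaque))      // unchanged: the rule cannot inspect TypedMap
```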
- They are not as flexible as RDDs, with only a small subset of types supported natively.

Related questions:
- Perform a typed join in Scala with Spark Datasets
- Spark 2.0 DataSets groupByKey and divide operation and type safety