Why is predicate pushdown not used in typed Dataset API (vs untyped DataFrame API)?


Problem description

I always thought the Dataset and DataFrame APIs were the same, and that the only difference is that the Dataset API gives you compile-time safety. Right?

So I have a very simple case:

 case class Player (playerID: String, birthYear: Int)

 val playersDs: Dataset[Player] = session.read
  .option("header", "true")
  .option("delimiter", ",")
  .option("inferSchema", "true")
  .csv(PeopleCsv)
  .as[Player]

 // Let's try to find players born in 1999. 
 // This will work, you have compile time safety... but it will not use predicate pushdown!!!
 playersDs.filter(_.birthYear == 1999).explain()

 // This will work as expected and use predicate pushdown!!!
 // But you can't have compile time safety with this :(
 playersDs.filter('birthYear === 1999).explain()

The explain output of the first example shows that it is NOT doing predicate pushdown (notice the empty PushedFilters):

== Physical Plan ==
*(1) Filter <function1>.apply
+- *(1) FileScan csv [...] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:People.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<playerID:string,birthYear:int,birthMonth:int,birthDay:int,birthCountry:string,birthState:s...

While the second example does it correctly (notice the PushedFilters):

== Physical Plan ==
*(1) Project [.....]
+- *(1) Filter (isnotnull(birthYear#11) && (birthYear#11 = 1999))
   +- *(1) FileScan csv [...] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:People.csv], PartitionFilters: [], PushedFilters: [IsNotNull(birthYear), EqualTo(birthYear,1999)], ReadSchema: struct<playerID:string,birthYear:int,birthMonth:int,birthDay:int,birthCountry:string,birthState:s...

So the question is: how can I use the Dataset API, keep compile-time safety, and still have predicate pushdown work as expected?

Is it possible? If not, does this mean that the Dataset API gives you compile-time safety at the cost of performance? (The DataFrame API would be much faster in this case, especially when processing large Parquet files.)

Solution

This is the line in your physical plan that you should remember in order to understand the real difference between Dataset[T] and DataFrame (which is Dataset[Row]):

Filter <function1>.apply

I keep saying that people should stay away from the typed Dataset API and keep using the untyped DataFrame API, because the Scala code becomes a black box to the optimizer in too many places. You've just hit one of those places. Also think about the deserialization of all the objects that Spark SQL keeps off the JVM heap to avoid GC. Every time you touch those objects you literally ask Spark SQL to deserialize them and load them onto the JVM heap, which puts a lot of pressure on the GC (and the GC will get triggered more often with the typed Dataset API than with the untyped DataFrame API).
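
To make that cost concrete, here is a minimal sketch of both filter variants side by side, reusing the Player case class and playersDs from the question (the val names are just for illustration):

// Typed filter: the lambda needs a real Player object, so Spark has to
// deserialize every row onto the JVM heap just to call the function, and
// the predicate cannot be pushed down to the source.
val viaLambda: Dataset[Player] =
  playersDs.filter((p: Player) => p.birthYear == 1999)

// Column-based filter: evaluated as a Catalyst expression on the internal
// binary rows, no Player objects are allocated, and the predicate can be
// pushed down to the file scan. Note that it still returns Dataset[Player].
val viaColumn: Dataset[Player] =
  playersDs.filter('birthYear === 1999)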

See UDFs are Blackbox — Don’t Use Them Unless You’ve Got No Choice.
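
The same black-box effect shows up with an explicit UDF, which is roughly what the typed filter degenerates into. A sketch, assuming the same playersDs as above (bornIn1999 is just an illustrative name):

import org.apache.spark.sql.functions.udf

// A UDF is just as opaque to Catalyst as the Dataset lambda above:
// explain() will again show an empty PushedFilters list for this filter.
val bornIn1999 = udf((year: Int) => year == 1999)
playersDs.filter(bornIn1999('birthYear)).explain()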


Quoting Reynold Xin after I asked the very same question on the dev@spark.apache.org mailing list:

The UDF is a black box so Spark can't know what it is dealing with. There are simple cases in which we can analyze the UDFs byte code and infer what it is doing, but it is pretty difficult to do in general.

There is a JIRA ticket for such cases, SPARK-14083 (Analyze JVM bytecode and turn closures into Catalyst expressions), but as someone said (I think it was Adam B. on Twitter) it'd be a kind of joke to expect it any time soon.

One big advantage of the Dataset API is the type safety, at the cost of performance due to heavy reliance on user-defined closures/lambdas. These closures are typically slower than expressions because we have more flexibility to optimize expressions (known data types, no virtual function calls, etc). In many cases, it's actually not going to be very difficult to look into the byte code of these closures and figure out what they are trying to do. If we can understand them, then we can turn them directly into Catalyst expressions for more optimized executions.
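
Until something like that lands, one rough way to check in code whether a predicate actually got pushed down is to grep the rendered physical plan for PushedFilters. This is just a debugging sketch (seemsPushedDown is a made-up helper and the string check is a heuristic), not an official API:

import org.apache.spark.sql.Dataset

def seemsPushedDown(ds: Dataset[_]): Boolean = {
  // FileScan nodes render their pushed filters in the plan string,
  // e.g. "PushedFilters: [IsNotNull(birthYear), EqualTo(birthYear,1999)]";
  // an empty list renders as "PushedFilters: []".
  val plan = ds.queryExecution.executedPlan.toString
  plan.contains("PushedFilters: [") && !plan.contains("PushedFilters: []")
}

seemsPushedDown(playersDs.filter(_.birthYear == 1999))   // false: the lambda blocks pushdown
seemsPushedDown(playersDs.filter('birthYear === 1999))   // true: the expression is pushed down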


// Let's try to find players born in 1999. 
// This will work, you have compile time safety... but it will not use predicate pushdown!!!
playersDs.filter(_.birthYear == 1999).explain()

The above code is equivalent to the following:

val someCodeSparkSQLCannotDoMuchOutOfIt = (p: Player) => p.birthYear == 1999
playersDs.filter(someCodeSparkSQLCannotDoMuchOutOfIt).explain()

someCodeSparkSQLCannotDoMuchOutOfIt is exactly where you set the optimizer aside: Spark SQL cannot look inside that closure, so it has no choice but to skip it (including predicate pushdown).
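
In other words, the practical compromise today is a trade-off rather than a way around it: keep the predicate in the untyped Column world so it can be pushed down, and stay typed on either side of it. Only the column name inside the predicate loses compile-time checking; the result is still a Dataset[Player]. A sketch (players1999 and ids are just illustrative names):

// The predicate itself is untyped (a typo in 'birthYear would only fail at
// runtime), but it is pushed down to the file scan...
val players1999: Dataset[Player] = playersDs.filter('birthYear === 1999)

// ...and everything after the filter is typed again with compile-time safety
// (though a typed transformation like this map will itself deserialize the
// objects, as described above).
val ids: Dataset[String] = players1999.map(_.playerID)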
