Why is predicate pushdown not used in typed Dataset API (vs untyped DataFrame API)?


Question


I always thought that the Dataset and DataFrame APIs are the same, and that the only difference is that the Dataset API gives you compile-time safety. Right?

So I have a very simple case:

 import org.apache.spark.sql.Dataset
 import session.implicits._  // needed for .as[Player] and the 'birthYear column syntax below

 case class Player (playerID: String, birthYear: Int)

 val playersDs: Dataset[Player] = session.read
  .option("header", "true")
  .option("delimiter", ",")
  .option("inferSchema", "true")
  .csv(PeopleCsv)
  .as[Player]

 // Let's try to find players born in 1999. 
 // This will work, you have compile time safety... but it will not use predicate pushdown!!!
 playersDs.filter(_.birthYear == 1999).explain()

 // This will work as expected and use predicate pushdown!!!
 // But you can't have compile time safety with this :(
 playersDs.filter('birthYear === 1999).explain()

The explain output from the first example shows that it is NOT doing predicate pushdown (notice the empty PushedFilters):

== Physical Plan ==
*(1) Filter <function1>.apply
+- *(1) FileScan csv [...] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:People.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<playerID:string,birthYear:int,birthMonth:int,birthDay:int,birthCountry:string,birthState:s...

While the second example does it correctly (notice the PushedFilters):

== Physical Plan ==
*(1) Project [.....]
+- *(1) Filter (isnotnull(birthYear#11) && (birthYear#11 = 1999))
   +- *(1) FileScan csv [...] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:People.csv], PartitionFilters: [], PushedFilters: [IsNotNull(birthYear), EqualTo(birthYear,1999)], ReadSchema: struct<playerID:string,birthYear:int,birthMonth:int,birthDay:int,birthCountry:string,birthState:s...

So the question is: how can I use the Dataset API, have compile-time safety, and still have predicate pushdown work as expected?

Is it possible? If not, does this mean that the Dataset API gives you compile-time safety, but at the cost of performance? (A DataFrame would be much faster in this case, especially when processing large Parquet files.)

Solution

The following line in your physical plan is the one to remember when you want to know the real difference between Dataset[T] and DataFrame (which is Dataset[Row]):

Filter <function1>.apply

I keep saying that people should stay away from the typed Dataset API and keep using the untyped DataFrame API, because in too many places the Scala code becomes a black box to the optimizer. You have just hit one of those places. Also think about the deserialization of all the objects that Spark SQL keeps off the JVM heap to avoid GC: every time you touch those objects you literally ask Spark SQL to deserialize them and load them onto the JVM, which puts a lot of pressure on the GC (and the GC will be triggered more often with the typed Dataset API than with the untyped DataFrame API).

See UDFs are Blackbox — Don’t Use Them Unless You’ve Got No Choice.
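
To see this black-box effect concretely, here is a quick sketch (my addition, assuming a Spark 2.x session and the playersDs defined in the question) that compares the optimized logical plans of the two filters: the Scala closure survives as an opaque function node, while the Column version is rewritten into plain Catalyst predicates that the data source can act on.

import org.apache.spark.sql.functions.col

// Typed filter: Catalyst only sees an anonymous Scala function, so every row must be
// deserialized into a Player before the predicate can run, and nothing is pushed down.
val closurePlan = playersDs.filter(_.birthYear == 1999).queryExecution.optimizedPlan

// Column filter: the predicate is a Catalyst expression the optimizer can reason about.
val columnPlan = playersDs.filter(col("birthYear") === 1999).queryExecution.optimizedPlan

println(closurePlan)  // expect an opaque <function1>/TypedFilter-style node
println(columnPlan)   // expect Filter (isnotnull(birthYear#...) && (birthYear#... = 1999))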


Quoting Reynold Xin after I asked the very same question on the dev@spark.a.o mailing list:

The UDF is a black box so Spark can't know what it is dealing with. There are simple cases in which we can analyze the UDFs byte code and infer what it is doing, but it is pretty difficult to do in general.

There is a JIRA ticket for such cases, SPARK-14083 (Analyze JVM bytecode and turn closures into Catalyst expressions), but as someone said (I think it was Adam B. on Twitter) it would be a kind of joke to expect it any time soon.

One big advantage of the Dataset API is the type safety, at the cost of performance due to heavy reliance on user-defined closures/lambdas. These closures are typically slower than expressions because we have more flexibility to optimize expressions (known data types, no virtual function calls, etc). In many cases, it's actually not going to be very difficult to look into the byte code of these closures and figure out what they are trying to do. If we can understand them, then we can turn them directly into Catalyst expressions for more optimized executions.
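
Until something like SPARK-14083 lands, a practical middle ground (my sketch, not part of the quoted answer, reusing the playersDs from the question) is to write the predicate as a Catalyst expression yourself: Dataset.filter also accepts a Column, and that overload still returns a Dataset[Player], so you keep the typed result while the filter gets pushed down. The price is that the column name "birthYear" is only checked at runtime, not by the compiler.

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.col

// Column-based filter on a typed Dataset: the element type is preserved (still Dataset[Player])
// and the predicate is a Catalyst expression, so it shows up in PushedFilters.
val bornIn1999: Dataset[Player] = playersDs.filter(col("birthYear") === 1999)
bornIn1999.explain()
// expect: PushedFilters: [IsNotNull(birthYear), EqualTo(birthYear,1999)]

// Trade-off: a typo such as col("birthYaer") fails only at runtime (AnalysisException),
// whereas _.birthYear is checked at compile time.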


// Let's try to find players born in 1999. 
// This will work, you have compile time safety... but it will not use predicate pushdown!!!
playersDs.filter(_.birthYear == 1999).explain()

The above code is equivalent to the following:

val someCodeSparkSQLCannotDoMuchOutOfIt = (p: Player) => p.birthYear == 1999
playersDs.filter(someCodeSparkSQLCannotDoMuchOutOfIt).explain()

someCodeSparkSQLCannotDoMuchOutOfIt is exactly where you put optimizations aside and let the Spark optimizer skip it.
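
In other words, giving the predicate a name is not what matters; what matters is whether the value passed to filter is an opaque Scala function or a Catalyst expression. A final sketch for contrast (same assumptions as above):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

// Still a Scala Function1, opaque to the optimizer: no pushdown, same as the lambda above.
val asClosure: Player => Boolean = _.birthYear == 1999
playersDs.filter(asClosure).explain()     // Filter <function1>.apply, PushedFilters: []

// A Column is a Catalyst expression tree the optimizer can inspect and push down.
val asExpression: Column = col("birthYear") === 1999
playersDs.filter(asExpression).explain()  // PushedFilters: [IsNotNull(birthYear), EqualTo(birthYear,1999)]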
