Does Spark predicate pushdown work with JDBC?

Problem description

According to this:

Catalyst applies logical optimizations such as predicate pushdown. The optimizer can push filter predicates down into the data source, enabling the physical execution to skip irrelevant data.

Spark supports pushdown of predicates to the data source. Is this feature also available / expected for JDBC?

(From inspecting the DB logs I can see this is not the default behavior right now: the full query is passed to the DB, even if it is later limited by Spark filters.)

More details

Running Spark 1.5 with PostgreSQL 9.4

Code snippet:

from pyspark import SparkContext
from pyspark.sql import SQLContext  # SQLContext lives in pyspark.sql, not the top-level pyspark package
from data_access.data_access_db import REMOTE_CONNECTION

sc = SparkContext()
sqlContext = SQLContext(sc)

url = 'jdbc:postgresql://{host}/{database}?user={user}&password={password}'.format(**REMOTE_CONNECTION)
sql = "dummy"  # table name passed to the JDBC reader

df = sqlContext.read.jdbc(url=url, table=sql)
df = df.limit(1)  # applied on the Spark side, not pushed to the database
df.show()

SQL trace:

< 2015-09-15 07:11:37.718 EDT >LOG:  execute <unnamed>: SET extra_float_digits = 3
< 2015-09-15 07:11:37.771 EDT >LOG:  execute <unnamed>: SELECT * FROM dummy WHERE 1=0
< 2015-09-15 07:11:37.830 EDT >LOG:  execute <unnamed>: SELECT c.oid, a.attnum, a.attname, c.relname, n.nspname, a.attnotnull OR (t.typtype = 'd' AND t.typnotnull), pg_catalog.pg_get_expr(d.adbin, d.adrelid) LIKE '%nextval(%' FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON (c.relnamespace = n.oid) JOIN pg_catalog.pg_attribute a ON (c.oid = a.attrelid) JOIN pg_catalog.pg_type t ON (a.atttypid = t.oid) LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = a.attrelid AND d.adnum = a.attnum) JOIN (SELECT 15218474 AS oid , 1 AS attnum UNION ALL SELECT 15218474, 3) vals ON (c.oid = vals.oid AND a.attnum = vals.attnum)
< 2015-09-15 07:11:40.936 EDT >LOG:  execute <unnamed>: SET extra_float_digits = 3
< 2015-09-15 07:11:40.964 EDT >LOG:  execute <unnamed>: SELECT "id","name" FROM dummy

I would expect the last SELECT to include a LIMIT 1 clause, but it doesn't.

Recommended answer

Spark DataFrames support predicate pushdown with JDBC sources, but the term predicate is used in its strict SQL meaning: it covers only the WHERE clause. Moreover, it looks to be limited to logical conjunction (no IN or OR, I am afraid) and simple predicates.
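For illustration, a minimal sketch of a filter that should qualify for pushdown, reusing sqlContext and url from the snippet above and the dummy table with its id column (the exact generated SQL may vary by Spark version):

# A simple comparison filter is the kind of predicate that can be pushed down.
df = sqlContext.read.jdbc(url=url, table="dummy")
df.filter(df.id == 1).show()
# In the DB log, expect something like:
#   SELECT "id","name" FROM dummy WHERE id = 1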

Everything else, such as limits, counts, ordering, groups and conditions, is processed on the Spark side. One caveat, already covered on SO, is that df.count() or sqlContext.sql("SELECT COUNT(*) FROM df") is translated to SELECT 1 FROM df and requires both substantial data transfer and processing in Spark.
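To see the caveat in action, a sketch again reusing sqlContext and url from above; the counting happens in Spark, not in the database:

# Not pushed down: Spark issues roughly `SELECT 1 FROM dummy` and counts
# the returned rows itself, so every row still travels over JDBC.
df = sqlContext.read.jdbc(url=url, table="dummy")
n_rows = df.count()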

Does that mean it is a lost cause? Not exactly. It is possible to use an arbitrary subquery as the table argument. It is less convenient than predicate pushdown, but otherwise it works pretty well:

n = ... # Number of rows to take
sql = "(SELECT * FROM dummy LIMIT {0}) AS tmp".format(int(n))
df = sqlContext.read.jdbc(url=url, table=sql)
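The same trick also covers the caveats above. A sketch under the same assumptions (url and the dummy table from the question): the aggregate and an IN predicate are evaluated inside PostgreSQL, so only the matching rows cross the JDBC boundary:

# Push the aggregate into the database: a single row comes back to Spark.
count_sql = "(SELECT COUNT(*) AS cnt FROM dummy) AS tmp"
total = sqlContext.read.jdbc(url=url, table=count_sql).collect()[0].cnt

# Predicates that are not pushed down automatically (IN, OR) can be
# written directly into the subquery instead.
in_sql = "(SELECT * FROM dummy WHERE id IN (1, 2, 3)) AS tmp"
df_in = sqlContext.read.jdbc(url=url, table=in_sql)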

Note:

This behavior may be improved in the future, once Data Source API v2 is ready.
