Does Spark predicate pushdown work with JDBC?


Problem Description

According to this (https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html):

Catalyst applies logical optimizations such as predicate pushdown. The optimizer can push filter predicates down into the data source, enabling the physical execution to skip irrelevant data.

Spark supports push down of predicates to the data source. Is this feature also available / expected for JDBC?

(From inspecting the DB logs I can see this is not the default behavior right now - the full query is passed to the DB, even if it is later limited by Spark filters.)

Details

Running Spark 1.5 with PostgreSQL 9.4

Code snippet:

from pyspark import SQLContext, SparkContext, Row, SparkConf
from data_access.data_access_db import REMOTE_CONNECTION  # local module holding the DB connection settings

sc = SparkContext()
sqlContext = SQLContext(sc)

# Build the JDBC URL from the stored connection parameters
url = 'jdbc:postgresql://{host}/{database}?user={user}&password={password}'.format(**REMOTE_CONNECTION)
sql = "dummy"  # table name

df = sqlContext.read.jdbc(url=url, table=sql)
df = df.limit(1)  # expected to show up as LIMIT 1 on the DB side
df.show()

SQL trace:

< 2015-09-15 07:11:37.718 EDT >LOG:  execute <unnamed>: SET extra_float_digits = 3                                                                                                                      
< 2015-09-15 07:11:37.771 EDT >LOG:  execute <unnamed>: SELECT * FROM dummy WHERE 1=0                                                                                                                   
< 2015-09-15 07:11:37.830 EDT >LOG:  execute <unnamed>: SELECT c.oid, a.attnum, a.attname, c.relname, n.nspname, a.attnotnull OR (t.typtype = 'd' AND t.typnotnull), pg_catalog.pg_get_expr(d.adbin, d.a
drelid) LIKE '%nextval(%' FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON (c.relnamespace = n.oid) JOIN pg_catalog.pg_attribute a ON (c.oid = a.attrelid) JOIN pg_catalog.pg_type t ON (a.a
tttypid = t.oid) LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = a.attrelid AND d.adnum = a.attnum) JOIN (SELECT 15218474 AS oid , 1 AS attnum UNION ALL SELECT 15218474, 3) vals ON (c.oid = vals.oid
 AND a.attnum = vals.attnum)                                                                                                                                                                            
< 2015-09-15 07:11:40.936 EDT >LOG:  execute <unnamed>: SET extra_float_digits = 3                                                                                                                      
< 2015-09-15 07:11:40.964 EDT >LOG:  execute <unnamed>: SELECT "id","name" FROM dummy                                                                                                                   

I would expect the last SELECT to include a LIMIT 1 clause - but it doesn't.
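
For completeness, one more way to cross-check (a sketch of mine, not part of the original question) is to print Spark's own query plan next to the PostgreSQL log; it assumes the same url and dummy table as above:

# Show the extended logical and physical plans; read together with the DB log,
# this indicates whether the limit ever reaches the JDBC source.
df = sqlContext.read.jdbc(url=url, table="dummy")
df.limit(1).explain(True)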

Recommended Answer

Spark DataFrames support predicate pushdown with JDBC sources, but the term predicate is used in its strict SQL meaning: it covers only the WHERE clause. Moreover, it looks like it is limited to logical conjunctions (no IN or OR, I'm afraid) and simple predicates.
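
As an illustration (a minimal sketch of mine, not part of the original answer), a simple conjunction of column comparisons is the kind of predicate that can be pushed into the generated JDBC query, while limit() is not; it assumes the url and the dummy table (columns id, name) from the question:

df = sqlContext.read.jdbc(url=url, table="dummy")

# A plain WHERE-style predicate built from simple comparisons joined by AND -
# a pushdown candidate, so PostgreSQL can filter before rows reach Spark.
pushed = df.where((df.id > 10) & (df.name == "foo"))

# limit() is not a WHERE predicate, so it is applied on the Spark side.
not_pushed = df.limit(1)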

Everything else, like limits, counts, ordering, grouping and conditions, is processed on the Spark side. One caveat, already covered on SO, is that df.count() or sqlContext.sql("SELECT COUNT(*) FROM df") is translated to SELECT 1 FROM df and requires both substantial data transfer and processing in Spark.
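
To illustrate the caveat (my own example, assuming the same dummy table), the innocuous-looking call below still makes Spark pull a row marker per table row over JDBC and do the counting itself:

df = sqlContext.read.jdbc(url=url, table="dummy")
df.count()  # per the answer above, effectively SELECT 1 FROM dummy, with the counting done in Spark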

Does that mean it is a lost cause? Not exactly. It is possible to use an arbitrary subquery as a table argument. It is less convenient than predicate pushdown, but otherwise it works pretty well:

n = ... # Number of rows to take
sql = "(SELECT * FROM dummy LIMIT {0}) AS tmp".format(int(n))
df = sqlContext.read.jdbc(url=url, table=sql)
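
The same trick works for aggregates; a hedged extension of the snippet above (mine, assuming the dummy table from the question) that lets PostgreSQL do the counting instead of Spark:

# Wrap the aggregate in a derived table so the database computes it
# and only a single row travels back over JDBC.
count_sql = "(SELECT COUNT(*) AS cnt FROM dummy) AS tmp"
count_df = sqlContext.read.jdbc(url=url, table=count_sql)
n_rows = count_df.collect()[0].cnt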

