More than one hour to execute pyspark.sql.DataFrame.take(4)

Problem description

I am running Spark 1.6 on 3 VMs (i.e. 1 master, 2 slaves), all with 4 cores and 16 GB of RAM.

I can see the workers registered on the Spark master web UI.

I want to retrieve data from my Vertica database to work on it. Since I didn't manage to run complex queries, I tried dummy queries first to understand what was going on; the task considered here is a simple one.

My code is:

df = sqlContext.read.format('jdbc').options(url='xxxx', dbtable='xxx', user='xxxx', password='xxxx').load()
four = df.take(4)

And the output is (note: I replaced the slave VM IP:Port with @IPSLAVE):

16/03/08 13:50:41 INFO SparkContext: Starting job: take at <stdin>:1
16/03/08 13:50:41 INFO DAGScheduler: Got job 0 (take at <stdin>:1) with 1 output partitions
16/03/08 13:50:41 INFO DAGScheduler: Final stage: ResultStage 0 (take at <stdin>:1)
16/03/08 13:50:41 INFO DAGScheduler: Parents of final stage: List()
16/03/08 13:50:41 INFO DAGScheduler: Missing parents: List()
16/03/08 13:50:41 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1), which has no missing parents
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 5.4 KB, free 5.4 KB)
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.6 KB, free 7.9 KB)
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB)
16/03/08 13:50:41 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/03/08 13:50:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1)
16/03/08 13:50:41 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/03/08 13:50:41 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, @IPSLAVE, partition 0,PROCESS_LOCAL, 1922 bytes)
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB)
16/03/08 15:02:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4299240 ms on @IPSLAVE (1/1)
16/03/08 15:02:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/03/08 15:02:20 INFO DAGScheduler: ResultStage 0 (take at <stdin>:1) finished in 4299.248 s
16/03/08 15:02:20 INFO DAGScheduler: Job 0 finished: take at <stdin>:1, took 4299.460581 s

As you can see, it takes a really long time. My table is actually quite big (around 220 million rows, 11 fields each), but such a query would be executed almost instantly with "normal" SQL (e.g. via pyodbc).
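
For comparison, here is a minimal sketch of the pyodbc path I have in mind; the DSN name and credentials are placeholders, not actual values from my setup:

# Minimal pyodbc sketch for comparison; DSN name and credentials are placeholders.
import pyodbc

conn = pyodbc.connect('DSN=VerticaDSN;UID=xxxx;PWD=xxxx')
cursor = conn.cursor()
cursor.execute('SELECT * FROM xxx LIMIT 4')  # the database applies the LIMIT itself
rows = cursor.fetchall()                     # returns almost immediately
conn.close()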

I guess I am misunderstanding/misusing Spark. Do you have any ideas or advice to make it work better?

Recommended answer

While Spark supports limited predicate pushdown over JDBC, all other operations, such as limit, group, and aggregations, are performed internally. Unfortunately, this means that take(4) will fetch the data first and then apply the limit. In other words, your database will execute (assuming no projections and filters) something equivalent to:

SELECT * FROM table 

and the rest will be handled by Spark. There are some optimizations involved (in particular, Spark evaluates partitions iteratively to obtain the number of records requested by LIMIT), but it is still quite an inefficient process compared to database-side optimization.
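
As a rough illustration of that iterative strategy (a simplified sketch of the idea, not Spark's actual implementation):

# Simplified illustration of the iterative LIMIT strategy described above.
# Not Spark's real code: each element of `partitions` stands for a callable
# that scans one partition and returns its rows.
def take_iteratively(partitions, n):
    collected = []
    for scan_partition in partitions:
        collected.extend(scan_partition())   # scan the next partition
        if len(collected) >= n:              # stop once enough rows were gathered
            break
    return collected[:n]

With a plain JDBC read like the one above the DataFrame has a single partition anyway (the log shows 1 output partition and 1 task), so even the first iteration scans the entire table.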

If you want to push the limit down to the database, you'll have to do it statically, using a subquery as the dbtable parameter (Python first, then the equivalent Scala):

(sqlContext.read.format('jdbc')
    .options(url='xxxx', dbtable='(SELECT * FROM xxx LIMIT 4) tmp', ....))

sqlContext.read.format("jdbc").options(Map(
  "url"     -> "xxxx",
  "dbtable" -> "(SELECT * FROM xxx LIMIT 4) tmp",
))

Please note that the alias in the subquery is mandatory.
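
Putting it together, a minimal end-to-end sketch in Python (the 'xxxx'/'xxx' placeholders stand for the real URL, credentials and table, as in the question):

# The LIMIT is pushed into the subquery, so the database returns at most
# 4 rows instead of streaming the whole table to Spark.
df_limited = (sqlContext.read.format('jdbc')
    .options(url='xxxx',
             user='xxxx',
             password='xxxx',
             dbtable='(SELECT * FROM xxx LIMIT 4) tmp')  # the alias "tmp" is required
    .load())
four = df_limited.take(4)  # now only touches the 4 pre-limited rows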

Note:

This behavior may be improved in the future, once Data Source API v2 is ready.
