spark jdbc df limit... what is it doing?


Problem description

I'm trying to get a feel for what is going on inside Spark, and here's my current confusion. I'm trying to read the first 200 rows from an Oracle table into Spark:

val jdbcDF = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:oracle:thin:...",
  "dbtable" -> "schema.table",
  "fetchSize" -> "5000",
  "partitionColumn" -> "my_row_id",
  "numPartitions" -> "16",
  "lowerBound" -> "0",
  "upperBound" -> "9999999"
  )).load()

jdbcDF.limit(200).count()

I would expect this to be fairly quick; a similar action on a table with 500K rows completes in a reasonable time. In this particular case the table is much bigger (hundreds of millions of rows), but I'd think limit(200) would make it fast? How do I go about figuring out where it is spending its time?

Recommended answer

As a matter of fact, Spark isn't yet capable of pushing the limit predicate down to the JDBC source.
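
One quick way to confirm this is to print the physical plan of the limited DataFrame from the question (a sketch; the exact plan text varies by Spark version):

// With a plain table name as "dbtable", the limit typically appears as a
// CollectLimit/LocalLimit step above the JDBC scan, and the scan itself shows
// no pushed limit -- Oracle is asked for the whole table and the 200-row cut
// happens on the Spark side.
jdbcDF.limit(200).explain()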

So what actually happens in this scenario is that Spark pulls all the data over and only then applies the limit and the count. What you need instead is to push the limit into the subquery you pass as the table argument (note that Oracle has no LIMIT clause, so the cap is expressed with ROWNUM below).

For example:

val jdbcDF = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:oracle:thin:...",
  "dbtable" -> "(select * from schema.table limit 200) as t",
  "fetchSize" -> "5000",
  "partitionColumn" -> "my_row_id",
  "numPartitions" -> "16",
  "lowerBound" -> "0",
  "upperBound" -> "9999999"
  )).load()

So the bulk of the time is spent pulling all the data into Spark.
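
With the limit folded into the table definition, the original check becomes cheap, because each partition query only reads the capped row set from Oracle (a usage sketch, assuming the jdbcDF defined just above):

// Only the capped rows ever leave Oracle, so this returns quickly.
jdbcDF.count()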

You can also pass the limit dynamically in the subquery:

val n : Int = ???

val jdbcDF = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:oracle:thin:...",
  "dbtable" -> s"(select * from schema.table limit $n) as t",
  "fetchSize" -> "5000",
  "partitionColumn" -> "my_row_id",
  "numPartitions" -> "16",
  "lowerBound" -> "0",
  "upperBound" -> "9999999"
  )).load()
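
If the database is Oracle 12c or later, the same cap can also be written with the ANSI FETCH FIRST clause instead of ROWNUM (a sketch under that assumption; everything except the dbtable string is unchanged):

val jdbcDF = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:oracle:thin:...",
  "dbtable" -> s"(select * from schema.table fetch first $n rows only) t",
  "fetchSize" -> "5000",
  "partitionColumn" -> "my_row_id",
  "numPartitions" -> "16",
  "lowerBound" -> "0",
  "upperBound" -> "9999999"
  )).load()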

There is a JIRA ticket (SPARK-10899) in progress to address this, but it has been sitting there for almost a year.

EDIT: The issue in the JIRA ticket above was flagged as a duplicate, so you can continue tracking it here: SPARK-12126. I hope this answers your question.
