spark jdbc df limit... what is it doing?


Problem Description

I'm trying to learn how to get a feel for what is going on inside Spark, and here's my current confusion. I'm trying to read the first 200 rows from an Oracle table into Spark:

val jdbcDF = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:oracle:thin:...",
  "dbtable" -> "schema.table",
  "fetchSize" -> "5000",
  "partitionColumn" -> "my_row_id",
  "numPartitions" -> "16",
  "lowerBound" -> "0",
  "upperBound" -> "9999999"
  )).load()

jdbcDF.limit(200).count()

I would expect this to be fairly quick. A similar action on a table with 500K rows completes in a reasonable time. In this particular case the table is much bigger (hundreds of millions of rows), but limit(200) would, I'd think, make it fast? How do I go about figuring out where it is spending its time?

Recommended Answer

As a matter of fact, Spark isn't yet capable of pushing the limit predicate down to the JDBC source.

So what actually happens in this scenario is that Spark pulls all the data into the cluster and only then applies the limit and the count. What you need instead is to put the limit inside a subquery and pass that subquery as the table argument.

For example:

val jdbcDF = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:oracle:thin:...",
  // Oracle has no LIMIT clause; ROWNUM (or FETCH FIRST ... ROWS ONLY on 12c+)
  // does the limiting inside the database instead.
  "dbtable" -> "(select * from schema.table where rownum <= 200) t",
  "fetchSize" -> "5000",
  "partitionColumn" -> "my_row_id",
  "numPartitions" -> "16",
  "lowerBound" -> "0",
  "upperBound" -> "9999999"
  )).load()

So the time is mainly spent pulling all of the data into Spark.
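
One way to confirm this yourself (a minimal sketch, assuming the jdbcDF defined in the question) is to print the query plan: when the limit is not pushed down, it appears as a separate CollectLimit/GlobalLimit step sitting above the JDBC scan rather than inside the query sent to Oracle.

// Print the logical and physical plans; the JDBC scan node shows what is sent
// to the database, with the limit applied above it on the Spark side.
jdbcDF.limit(200).explain(true)

The SQL tab of the Spark UI shows the same plan for the running query, which is usually the quickest way to see where the time goes.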

You can also pass the limit dynamically in the subquery:

val n : Int = ???

val jdbcDF = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:oracle:thin:...",
  "dbtable" -> s"(select * from schema.table where rownum <= $n) t",
  "fetchSize" -> "5000",
  "partitionColumn" -> "my_row_id",
  "numPartitions" -> "16",
  "lowerBound" -> "0",
  "upperBound" -> "9999999"
  )).load()
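
With the limit inside the subquery, the row limiting happens on the database side (a quick usage sketch, assuming the jdbcDF just defined):

// Returns at most n, with the limiting done by Oracle rather than by Spark.
jdbcDF.count()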

There is a JIRA ticket (SPARK-10899) in progress to solve this issue, but it has been hanging for almost a year.

The issue in that JIRA was flagged as a duplicate, so you can continue tracking it under SPARK-12126. I hope this answers your question.
