Pyspark - df.cache().count() taking forever to run


Question

I'm trying to force eager evaluation in PySpark, using the count method I read about online:

spark_df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)

spark_df.cache().count()

However, when I run the code, the cache-and-count step takes forever. My data is relatively small (2.7 GB, 15 million rows), but after 28 minutes of running I decided to kill the job. For comparison, reading the same data with pandas.read_sql() took only 6 minutes 43 seconds.

The machine I'm running the code on is fairly powerful (20 vCPU, 160 GB RAM, Windows OS). I believe I'm missing a step that would speed up the count.

Any help or advice is appreciated.

Answer

When you read with pandas, it can use as much of the machine's available memory as it needs (you mentioned 160 GB, which is far larger than the data itself, ~3 GB).

However, Spark works differently. When you start a Spark session, you typically have to state up front how much memory each executor (and the driver, and the application manager, if applicable) may use; if you don't specify it, the default is 1 GB according to the latest Spark documentation. So the first thing to do is give more memory to your executors and driver, as in the sketch below.
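A minimal sketch of setting those memory options when creating the session; the 16g values and the app name are illustrative assumptions, not tuned recommendations:

from pyspark.sql import SparkSession

# Illustrative values only -- size them to your machine or cluster.
spark = (
    SparkSession.builder
        .appName("jdbc-eager-load")
        .config("spark.driver.memory", "16g")    # default is 1g if unset
        .config("spark.executor.memory", "16g")  # default is 1g if unset
        .getOrCreate()
)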

Second, reading from JDBC with Spark is tricky, because the speed depends on the number of executors (and tasks), those numbers depend on how many partitions your RDD (the one read from the JDBC connection) has, and the number of partitions depends on your table, your query, columns, conditions, etc. One way to force a different behavior, i.e. more partitions, more tasks, more executors, is through these options (see the sketch after the list below): numPartitions, partitionColumn, lowerBound, and upperBound.

  • numPartitions is the number of partitions (and hence how many executors/tasks will be used)
  • partitionColumn is an integer-type column that Spark uses to split the read into partitions
  • lowerBound is the minimum value of partitionColumn that you want to read
  • upperBound is the maximum value of partitionColumn that you want to read
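
Putting that together with the original read, a sketch could look like the following; the "id" column, its bounds, and the 20 partitions are assumptions for illustration and should be matched to an indexed integer column in your table:

# Sketch only: "id", the bounds, and 20 partitions are illustrative assumptions.
spark_df = spark.read.jdbc(
    url=jdbcUrl,
    table=pushdown_query,
    column="id",              # the partition column: an integer column in the table
    lowerBound=1,             # smallest "id" used to compute partition ranges
    upperBound=15_000_000,    # largest "id" used to compute partition ranges
    numPartitions=20,         # e.g. one partition per vCPU
    properties=connectionProperties,
)

spark_df.cache().count()

Note that lowerBound and upperBound only determine the partition stride; rows outside that range are not filtered out, they just end up in the first or last partition.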

You can read more here: https://stackoverflow.com/a/41085557/3441510, but the basic idea is that you want a reasonable number of partitions (defined by numPartitions), so that each executor processes an evenly distributed chunk of the data (defined by partitionColumn, lowerBound, and upperBound).

