Pyspark - df.cache().count() taking forever to run


Problem description

I'm trying to force eager evaluation for PySpark, using the count methodology I read online:

spark_df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)

spark_df.cache().count()

However, when I try running the code, the cache-count part takes forever to run. My data size is relatively small (2.7 GB, 15 million rows), but after 28 minutes of running I decided to kill the job. For comparison, when I read the data with pandas.read_sql(), it took only 6 minutes 43 seconds.

The machine I'm running the code on is pretty powerful (20 vCPU, 160 GB RAM, Windows OS). I believe I'm missing a step to speed up the count statement.

Any help or suggestions are appreciated.

Recommended answer

When you read with pandas, it will use as much of the machine's available memory as it needs (all 160 GB in your case, which is far larger than the data itself, ~3 GB).

However, it's not the same with Spark. When you start your Spark session, you typically have to specify upfront how much memory each executor (and the driver, and the application manager if applicable) should use, and if you don't specify it, it defaults to 1 GB according to the latest Spark documentation. So the first thing you want to do is give more memory to your executors and driver.
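A minimal sketch of raising those limits when building the session yourself; the 32g values and the app name are illustrative assumptions, not recommendations, and spark.driver.memory generally only takes effect if it is set before the driver JVM starts (for example via spark-submit or spark-defaults.conf):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("jdbc-eager-read")              # hypothetical app name
    .config("spark.driver.memory", "32g")    # defaults to 1g if unset
    .config("spark.executor.memory", "32g")  # defaults to 1g if unset
    .getOrCreate()
)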

Second, reading from JDBC with Spark is tricky, because how slow it is depends on the number of executors (and tasks), those numbers depend on how many partitions the RDD read from the JDBC connection has, and the number of partitions depends on your table, your query, columns, conditions, etc. One way to force a change in behavior, to get more partitions, more tasks, and more executors, is via these configurations: numPartitions, partitionColumn, lowerBound, and upperBound (see the sketch after the list below).

  • numPartitions is the number of partitions (and hence how many executors will be used)
  • partitionColumn is an integer-type column that Spark uses to split the read into partitions
  • lowerBound is the min value of partitionColumn that you want to read
  • upperBound is the max value of partitionColumn that you want to read
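A sketch of the partitioned JDBC read with those options; the column name "id" and the bound values are assumptions, so substitute a real integer column and its actual min/max from your table:

spark_df = spark.read.jdbc(
    url=jdbcUrl,
    table=pushdown_query,
    column="id",           # partitionColumn: an integer column assumed to exist
    lowerBound=1,          # smallest value of the partition column
    upperBound=15000000,   # largest value of the partition column (~15M rows)
    numPartitions=20,      # e.g. one partition per vCPU
    properties=connectionProperties,
)

spark_df.cache().count()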

You can read more here: https://stackoverflow.com/a/41085557/3441510, but the basic idea is that you want a reasonable number of executors (defined by numPartitions), each processing an evenly distributed chunk of the data (defined by partitionColumn, lowerBound, and upperBound).

