Pyspark: count on pyspark.sql.dataframe.DataFrame takes long time


Problem description

I have a pyspark.sql.dataframe.DataFrame as shown below:

df.show()
+--------------------+----+----+---------+----------+---------+----------+---------+
|                  ID|Code|bool|      lat|       lon|       v1|        v2|       v3|
+--------------------+----+----+---------+----------+---------+----------+---------+
|5ac52674ffff34c98...|IDFA|   1|42.377167| -71.06994|17.422535|1525319638|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37747|-71.069824|17.683573|1525319639|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37757| -71.06942|22.287935|1525319640|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37761| -71.06943|19.110023|1525319641|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.377243| -71.06952|18.904774|1525319642|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.378254| -71.06948|20.772903|1525319643|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37801| -71.06983|18.084948|1525319644|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.378693| -71.07033| 15.64326|1525319645|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.378723|-71.070335|21.093477|1525319646|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37868| -71.07034|21.851894|1525319647|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.378716| -71.07029|20.583202|1525319648|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37872| -71.07067|19.738768|1525319649|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.379112| -71.07097|20.480911|1525319650|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37952|  -71.0708|20.526752|1525319651| 44.93808|
|5ac52674ffff34c98...|IDFA|   1| 42.37902| -71.07056|20.534052|1525319652| 44.93808|
|5ac52674ffff34c98...|IDFA|   1|42.380203|  -71.0709|19.921381|1525319653| 44.93808|
|5ac52674ffff34c98...|IDFA|   1| 42.37968|-71.071144| 20.12599|1525319654| 44.93808|
|5ac52674ffff34c98...|IDFA|   1|42.379696| -71.07114|18.760069|1525319655| 36.77853|
|5ac52674ffff34c98...|IDFA|   1| 42.38011| -71.07123|19.155525|1525319656| 36.77853|
|5ac52674ffff34c98...|IDFA|   1| 42.38022|  -71.0712|16.978994|1525319657| 36.77853|
+--------------------+----+----+---------+----------+---------+----------+---------+
only showing top 20 rows

If I try to count:

%%time
df.count()

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 28.1 s

30241272

Now if I take a subset of df, the time to count is even longer.

id0 = df.first().ID  ## First ID
tmp = df.filter( (df['ID'] == id0) )

%%time
tmp.count()

CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 1min 33s
Out[6]:
3299

Answer

Your question is very interesting and tricky.

I tested with a large dataset in order to reproduce your behavior.

I tested the following two cases on that large dataset:

# Case 1
df.count() # Execution time: 37secs

# Case 2
df.filter((df['ID'] == id0)).count() #Execution time: 1.39 min

Explanation

Let's see the physical plan with only .count():

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#38L])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#41L])
      +- *(1) FileScan csv [] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

Let's see the physical plan with .filter() and then .count():

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#61L])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#64L])
      +- *(1) Project
         +- *(1) Filter (isnotnull(ID#11) && (ID#11 = Muhammed MacIntyre))
            +- *(1) FileScan csv [ID#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...], PartitionFilters: [], PushedFilters: [IsNotNull(ID), EqualTo(ID,Muhammed MacIntyre)], ReadSchema: struct<_c1:string>

Generally, when Spark counts the number of rows, it maps each row to count=1 and then reduces over all the mappers to produce the final row count.
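
As an illustration only (this is not what DataFrame.count() runs internally; that is the HashAggregate plan shown above), the same map-then-reduce idea can be written at the RDD level:

# Map every row to 1, then reduce the partial results into the final row count.
# Typically slower than df.count() because rows are deserialized into Python.
n = df.rdd.map(lambda _: 1).reduce(lambda a, b: a + b)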

In Case 2, Spark first has to filter, then create the partial counts for every partition, and then has another stage to sum them up. So, for the same rows, in the second case Spark also does the filtering, which affects the computation time on large datasets. Spark is a framework for distributed processing and doesn't have indexes like Pandas, which can filter extremely fast without passing over all the rows.
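
If the filtered subset is going to be reused, one common mitigation (not part of the original answer) is to cache it, so the full file scan and filter are paid only once:

# Cache the filtered subset; the first action materializes it,
# later actions on tmp read from memory instead of rescanning the file.
tmp = df.filter(df['ID'] == id0).cache()
tmp.count()   # pays the scan + filter cost once
tmp.count()   # served from the cached data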

In this simple case there isn't much you can do to improve the execution time. You can try your application with different configuration settings (e.g. spark.sql.shuffle.partitions, spark.default.parallelism, the number of executors, the executor memory, etc.).
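
For example, a minimal sketch of setting those options when building the session (the values below are placeholders, not tuned recommendations for this dataset):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("count-benchmark")                     # hypothetical app name
    .config("spark.sql.shuffle.partitions", "200")  # partitions used after shuffles
    .config("spark.default.parallelism", "200")     # default RDD parallelism
    .config("spark.executor.instances", "4")        # number of executors
    .config("spark.executor.memory", "4g")          # memory per executor
    .getOrCreate()
)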

