Pyspark simple re-partition and toPandas() fails to finish on just 600,000+ rows


Problem description

I have JSON data that I am reading into a data frame with several fields, repartitioning it based on two columns, and converting to Pandas.

This job keeps failing on EMR on just 600,000 rows of data with some obscure errors. I have also increased the memory settings of the Spark driver, and still don't see any resolution.

Here is my pyspark code:

enhDataDf = (
    sqlContext
    .read.json(sys.argv[1])
    )

enhDataDf = (
    enhDataDf
    .repartition('column1', 'column2')
    .toPandas()
    )
enhDataDf = sqlContext.createDataFrame(enhDataDf)
enhDataDf = (
    enhDataDf
    .toJSON()
    .saveAsTextFile(sys.argv[2])
    )

My spark settings are as follows:

conf = SparkConf().setAppName('myapp1')
conf.set('spark.yarn.executor.memoryOverhead', 8192)
conf.set('spark.executor.memory', 8192)
conf.set('spark.driver.memory', 8192)
sc = SparkContext(conf=conf)
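A side note on these settings, not specific to this failure: in YARN client mode spark.driver.memory is consumed when the driver JVM starts, so setting it in SparkConf inside the application typically has no effect; it is normally supplied at submit time instead. A hedged sketch of the same configuration with explicit units, keeping the question's values:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName('myapp1')
# spark.yarn.executor.memoryOverhead is documented in megabytes on this Spark generation.
conf.set('spark.yarn.executor.memoryOverhead', '8192')
# Memory sizes are normally given as strings with a unit suffix.
conf.set('spark.executor.memory', '8g')
# Driver memory usually has to be supplied when the driver JVM is launched,
# e.g.  spark-submit --driver-memory 8g myapp.py   (client mode)
sc = SparkContext(conf=conf)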

The errors I get are:

16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down.
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties].
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties].
16/10/01 19:57:11 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/10/01 19:57:11 ERROR ApplicationMaster: User application exited with status 143
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties].
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties].
16/10/01 19:57:56 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/10/01 19:57:56 ERROR ApplicationMaster: User application exited with status 143
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down.
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.

The code works fine on up to about 600,000 JSON lines, even though there is a ton of memory available. Beyond that, it keeps failing.

Any thoughts on what is going on and how to debug / fix this problem?

Solution

I believe that the problem comes from the following part of your code:

enhDataDf = (
    enhDataDf
    .repartition('column1', 'column2')
    .toPandas()
)

.toPandas() collects the data to the driver, so as the number of records grows it will eventually cause the driver to fail.
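For intuition: in pre-Arrow Spark versions, toPandas() behaves roughly like collecting every row to the driver and building a local pandas frame from it. A simplified sketch, not the library's exact implementation:

import pandas as pd

# Rough equivalent of enhDataDf.toPandas(): every row is shipped to the
# driver process first, so driver memory must hold the entire dataset.
rows = enhDataDf.collect()
local_pdf = pd.DataFrame.from_records(rows, columns=enhDataDf.columns)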

According to your comment, this is the exact pipeline you use. It means that the whole stage is not only obsolete but also incorrect. When data is collected and then parallelized again, there is no guarantee that the partitioning created by

.repartition('column1', 'column2')

will be preserved when you re-create the Spark DataFrame:

sqlContext.createDataFrame(enhDataDf)
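One way to see this on a small sample is to compare partition counts before and after the round trip (column1 and column2 are the question's placeholder names; run this only on data that fits on the driver):

partitioned = enhDataDf.repartition('column1', 'column2')
print(partitioned.rdd.getNumPartitions())   # typically spark.sql.shuffle.partitions

roundtrip = sqlContext.createDataFrame(partitioned.toPandas())
print(roundtrip.rdd.getNumPartitions())     # usually a different count, and the
                                            # column-based co-location is gone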

If you want to write data by column you can do it directly:

(sqlContext
    .read.json(sys.argv[1])
    .repartition('column1', 'column2')
    .write
    .json(sys.argv[2]))

skipping the intermediate toPandas and the conversion to an RDD.
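If the point of repartitioning by column was to control how the output is laid out on disk, DataFrameWriter.partitionBy may be closer to the intent. A sketch, assuming a Spark version whose JSON writer supports partitionBy and reusing the question's placeholder column names:

(sqlContext
    .read.json(sys.argv[1])
    .write
    .partitionBy('column1', 'column2')   # one output directory per column value pair
    .json(sys.argv[2]))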

Following your comments:

If toPandas serves a purpose then it will always remain the limiting factor in the pipeline, and the only direct solution is to scale up the driver node. Depending on the exact algorithms you apply to the collected data, you may consider alternative options:

  • Reimplementing the algorithms you use on top of Spark, if they are not already available there.
  • Considering an alternative framework with better SciPy stack interoperability, such as Dask (see the sketch below).
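For the Dask option, a minimal sketch of how a pandas-style computation can run out of core instead of collecting everything with toPandas (assumes a Dask release that ships dask.dataframe.read_json and newline-delimited JSON input; the paths and the aggregation are purely illustrative):

import dask.dataframe as dd

# Placeholder input path; assumes newline-delimited JSON.
df = dd.read_json('input/*.json', lines=True)

# Familiar pandas-like API, evaluated lazily and out of core;
# the groupby itself is just an illustrative stand-in for the driver-side analysis.
summary = df.groupby(['column1', 'column2']).size().compute()
print(summary.head())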
