Spark: Parallelizing creation of multiple DataFrames


Question

I'm currently generating DataFrames based on a list of IDs - each query based on one ID gives back a manageable subset of a very large PostgreSQL table. I then partition that output based on the file structure I need to write out. The problem is that I'm hitting a speed limit and majorly under-utilizing my executor resources.

I’m not sure if this is a matter of rethinking my architecture or if there is some simple way to get around this, but basically I want to get more parallelization of tasks but am failing to keep all of my 16 executors busy while trying to do this ETL job as quickly as possible.

So...here’s what I thought I could do to speed this up:

  1. Parallelize the list.
  2. Each element of the list, on an executor, then selects a (relatively small) DataFrame over JDBC.
  3. Then, foreachPartition (of which there are necessarily few), I need to perform some actions (including an atomic write of each partition's data), and those per-partition actions can also branch out to the worker nodes/executors.

Current code looks something like this, but of course throws "py4j.Py4JException: Method __getnewargs__([]) does not exist" because the Spark session context can't be passed into the foreach closure that would allow this to stay on the executors:

spark = SparkSession \
    .builder \
    .appName
    ... etc

# the list, distributed to workers
idsAndRegionsToProcess = sc.parallelize(idList)

# the final thing that needs to be done with the data
# (each partition written to a file and sent somewhere)
def transformAndLoad(iterator, someField, someOtherField):
    for row in iterator:
        ...do stuff
    ...write a file to S3

# !! The issue is here (well, at least with my current approach)!!
# In theory these are the first operations that really need to be
# running on various nodes.
def dataMove(idAndRegion, spark):
    # now pull dataFrames from Postgres foreach id
    postgresDF = spark.read \
        .format("jdbc") \
        .option("url" …
        .option("dbtable", "(select id, someField, someOtherField from table_region_" + idAndRegion[1] + " where id = '" + idAndRegion[0] + "') as history") \
        … more setup

    postgresDF.repartition('someOtherField')
    postgresDF.persist(StorageLevel.MEMORY_AND_DISK)
    postgresDF.foreachPartition(lambda iterator: transformAndLoad(iterator, someField, someOtherField))

# invoking the problematic code on the parallelized list
idsAndRegionsToProcess.foreach(lambda idAndRegion: dataMove(idAndRegion, spark))

I get that this isn’t quite possible this way, but maybe I’m missing a subtlety that would make this possible? This seems a lot more efficient than selecting 1TB of data and then processing that, but maybe there is some underlying pagination that I don't know about.
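One way to sidestep the closure problem entirely is to keep the SparkSession on the driver (where it has to live) and drive the per-ID pipelines from a thread pool instead of from an RDD foreach: each call still runs as a normal Spark job, but jobs submitted from separate driver threads can run concurrently, so several per-ID reads and writes get scheduled across the executors at once. Below is a minimal sketch, not code from the question; jdbc_url, idList, someField, someOtherField, and transformAndLoad stand in for the elided setup, and the pool size is arbitrary:

from concurrent.futures import ThreadPoolExecutor

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DataMove").getOrCreate()

def data_move(id_and_region):
    # same per-ID subquery as in the question; jdbc_url is a placeholder
    query = ("(select id, someField, someOtherField from table_region_"
             + id_and_region[1] + " where id = '" + id_and_region[0] + "') as history")
    df = (spark.read.format("jdbc")
          .option("url", jdbc_url)
          .option("dbtable", query)
          .load())
    # transformAndLoad is the question's per-partition write to S3
    df.repartition("someOtherField") \
      .foreachPartition(lambda it: transformAndLoad(it, someField, someOtherField))

# Jobs submitted from separate driver threads can run at the same time,
# so the scheduler can keep more of the 16 executors busy.
with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(data_move, idList))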

I have a working version that is very similar: a regular loop over the collected list, otherwise using almost exactly this code. But it was painfully slow and came nowhere close to utilizing the executors.

For some extra context, I'm on EMR and YARN, and my spark-submit (from the master node) looks like this:

spark-submit --packages org.postgresql:postgresql:9.4.1207.jre7 --deploy-mode cluster --num-executors 16 --executor-memory 3g --master yarn DataMove.py

Also, selecting these DataFrames is not problematic as the result is a small subset of the data and the database is indexed correctly, but selecting each entire table seems like it would be absolutely impossible as there could be up to a TB of data in some of them. Also, the repartition divides it out by what needs to be written into each (individual and specifically-named) file going to s3.
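If the goal is to keep the reads themselves on the executors, another option is the predicates argument of DataFrameReader.jdbc: each predicate becomes one partition of a single DataFrame, and the partitions are fetched from Postgres in parallel by the executors, so no session ever needs to enter a closure. This is only a rough sketch under the question's naming; jdbc_url, idsAndRegions, someField, someOtherField, and transformAndLoad are assumed from the elided setup, and each region table gets its own read since the IDs live in different tables:

from itertools import groupby
from operator import itemgetter

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DataMove").getOrCreate()

# one jdbc() call per region table, with one WHERE predicate per ID;
# each predicate becomes one partition, read in parallel on the executors
for region, group in groupby(sorted(idsAndRegions, key=itemgetter(1)), key=itemgetter(1)):
    predicates = ["id = '" + row_id + "'" for row_id, _ in group]
    regionDF = spark.read.jdbc(
        url=jdbc_url,                                   # placeholder
        table="table_region_" + region,
        predicates=predicates,
        properties={"driver": "org.postgresql.Driver"},
    )
    regionDF.foreachPartition(
        lambda it: transformAndLoad(it, someField, someOtherField))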

I would be open to any suggestions, even if it just means using my working code and somehow getting it to kick off as many jobs as it can while other things are still running from the last. But first and foremost, can my approach here work?

Answer

You could look into running your data workload as separate jobs / applications on your Spark cluster as described here:

https://spark.apache.org/docs/latest/submitting-applications.html

But your comment about storing the data in multiple partitions should also greatly help to reduce the memory needed to process it. You may be able to avoid splitting it up into separate jobs that way.
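If you do go the separate-applications route, one very simple way to do it from the master node is to spark-submit one application per region (or per batch of IDs) and let YARN run them side by side. The snippet below is only meant to illustrate the idea; the region list, the split of executors, and the assumption that DataMove.py accepts a region argument are all hypothetical:

import subprocess

regions = ["us", "eu", "ap"]   # hypothetical region list

procs = [
    subprocess.Popen([
        "spark-submit",
        "--packages", "org.postgresql:postgresql:9.4.1207.jre7",
        "--deploy-mode", "cluster",
        "--num-executors", "5",       # split the 16 executors across the apps
        "--executor-memory", "3g",
        "--master", "yarn",
        "DataMove.py", region,        # hypothetical: pass the region to the script
    ])
    for region in regions
]

for p in procs:
    p.wait()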

The Spark UI at:

http://localhost:4040

is your friend in figuring out what steps your job is creating in Spark internally and what resources it consumes. Based on those insights you can optimize it and reduce the amount of memory needed or improve the processing speed.
