Joining a large and a ginormous Spark dataframe


Problem Description


I have two dataframes: df1 has 6 million rows and df2 has 1 billion.

I have tried the standard df1.join(df2,df1("id")<=>df2("id2")), but it runs out of memory.

df1 is too large to be put into a broadcast join.

I have even tried a Bloom filter, but it was also too large to fit in a broadcast and still be useful.

The only thing I have tried that doesn't error out is to break df1 into 300,000-row chunks and join each chunk with df2 in a foreach loop. But this takes an order of magnitude longer than it probably should (likely because df1 is too large to persist, causing the split to be redone up to that point). Recombining the results also takes a while.
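For reference, a rough sketch of that chunked workaround (this is a reconstruction, not the asker's actual code: the hash-based chunk assignment, the chunk count, and the Spark 2.x-style union are assumptions; column names follow the join expression above):

    import org.apache.spark.sql.functions._

    // ~6 million rows in ~300,000-row chunks => about 20 chunks
    val numChunks = 20

    // Assign each row of df1 to a chunk (assumption: derive the chunk from a hash of id)
    val df1Chunked = df1.withColumn("chunk", pmod(hash(col("id")), lit(numChunks)))

    // Join one chunk at a time, then recombine -- avoids the OOM but is slow
    val pieces = (0 until numChunks).map { i =>
      val piece = df1Chunked.filter(col("chunk") === i).drop("chunk")
      piece.join(df2, piece("id") <=> df2("id2"))
    }
    val result = pieces.reduce(_ union _)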

How have you solved this issue?

A few notes:

df1 is a subset of df2: df1=df2.where("fin<1").selectExpr("id as id2").distinct(). I am interested in all rows in df2 whose id at some point has fin < 1, which means I can't do it as one step.

There are about 200 million unique ids in df2.

Here are some relevant Spark settings:

spark.cores.max=1000
spark.executor.memory=15G
spark.akka.frameSize=1024
spark.shuffle.consolidateFiles=false
spark.task.cpus=1
spark.driver.cores=1
spark.executor.cores=1
spark.memory.fraction=0.5
spark.memory.storageFraction=0.3
spark.sql.shuffle.partitions=10000
spark.default.parallelism=10000

The error I get is:

16/03/11 04:36:07 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(11,1,ResultTask,FetchFailed(BlockManagerId(68dcb91c-1b45-437d-ac47-8e8c1e4bc386-S199, mapr, 46487),3,176,4750,org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /tmp/mesos/work/slaves/68dcb91c-1b45-437d-ac47-8e8c1e4bc386-S199/frameworks/c754216b-bf80-4d84-97f1-2e907030365e-2545/executors/16/runs/5a5a01c5-205e-4380-94d3-7fa0f6421b85/blockmgr-ea345692-05bb-4f42-9ba1-7b93311fb9d4/0e/shuffle_3_340_0.index (No such file or directory)

and

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 465 in stage 6.3 failed 4 times, most recent failure: Lost task 465.3 in stage 6.3 (TID 114448, mapr): java.lang.OutOfMemoryError: Direct buffer memory

Solution

As I see it, you have a problem of partitions that are too large (probably due to the bigger dataset). You can try a few approaches:

  1. Try setting spark.sql.shuffle.partitions to 2048 or even more (the default is 200). There will be a shuffle when joining your DataFrames. Play with this parameter so that the total volume of the bigger table divided by this parameter lands around 64 MB-100 MB per partition (depending on the file format); for example, ~128 GB of shuffle data at 64 MB per partition gives 2048 partitions. In general, each task (one per partition) shown in the Spark UI should process a "normal" amount of data, 64 MB-100 MB at most.

  2. If the first approach doesn't work, I suggest doing this join with the RDD API. Convert your DataFrames to RDDs, then partition both RDDs with a HashPartitioner(numPartitions), where the number of partitions is computed as described above (a sketch is given after this list).

  3. Lately a new option was added by the Spark devs: you can bucket the ginormous table into N buckets (i.e. store it ready for the join). There are a few limitations, but it can eliminate shuffling the ginormous data completely. bucketBy is supported only with the saveAsTable API, not with save. After you've bucketed the data, on the next iteration you can load it as an external table while providing the bucketing spec (see https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-table.html):

    CREATE TABLE ginormous
      -- ...here you must specify the schema
      USING PARQUET
      CLUSTERED BY (a, b, c) INTO N BUCKETS
      LOCATION 'hdfs://your-path'

Then, once you've loaded the ginormous table as a bucketed one, you can load the big table, repartition it into the same number of buckets and by the same columns (df.repartition(N, a, b, c)), and join the two; see the sketch below.
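For what it's worth, a minimal sketch of point 2, the RDD-level join with an explicit HashPartitioner. The key type (Long), the column names, and the partition count are assumptions for illustration:

    import org.apache.spark.HashPartitioner

    // Number of partitions chosen as described in point 1 (total volume / 64-100 MB)
    val numPartitions = 10000

    // Key both sides by the join column and co-partition them with the same partitioner,
    // so each shuffled partition stays a manageable size
    val keyedSmall = df1.rdd.map(row => (row.getAs[Long]("id"), row))
      .partitionBy(new HashPartitioner(numPartitions))
    val keyedBig = df2.rdd.map(row => (row.getAs[Long]("id2"), row))
      .partitionBy(new HashPartitioner(numPartitions))

    // Because both RDDs share the same partitioner, the join does not reshuffle them
    val joinedRdd = keyedSmall.join(keyedBig)

And a sketch of point 3 plus the final repartition step, assuming a recent Spark (2.3+) and Parquet. The table name, the bucket count of 2048, and the columns a, b, c simply mirror the CREATE TABLE placeholder above:

    import org.apache.spark.sql.functions.col

    // One-time: persist the ginormous table bucketed by the join columns.
    // bucketBy only works with saveAsTable, not with save.
    df2.write
      .format("parquet")
      .bucketBy(2048, "a", "b", "c")
      .sortBy("a", "b", "c")
      .saveAsTable("ginormous")

    // Next iteration: read the bucketed table back, and repartition the big table
    // into the same number of partitions on the same columns before joining,
    // so the ginormous side is not shuffled again
    val ginormous = spark.table("ginormous")
    val big = df1.repartition(2048, col("a"), col("b"), col("c"))
    val joined = ginormous.join(big, Seq("a", "b", "c"))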
