Joining a large and a ginormous spark dataframe


Problem description


I have two dataframes: df1 has 6 million rows and df2 has 1 billion.

I have tried the standard df1.join(df2, df1("id") <=> df2("id2")), but it runs out of memory.

df1 is too large to be put into a broadcast join.

I have even tried a bloom filter, but it was also too large to fit in a broadcast and still be useful.

The only thing I have tried that doesn't error out is to break df1 into 300,000-row chunks and join each chunk with df2 in a foreach loop. But this takes an order of magnitude longer than it probably should (likely because df1 is too large to persist, causing the split to be recomputed up to that point). Recombining the results also takes a while.
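For reference, a minimal Scala sketch of that chunk-and-join workaround, using the column names from the question; randomSplit stands in for exact 300,000-row chunks, and the output path is a placeholder:

    import org.apache.spark.sql.{DataFrame, SparkSession}

    val spark = SparkSession.builder().appName("chunked-join").getOrCreate()

    val df1: DataFrame = spark.table("df1")   // ~6 million rows, join key "id2"
    val df2: DataFrame = spark.table("df2")   // ~1 billion rows, join key "id"

    // Split the small side into ~20 roughly equal pieces (~300,000 rows each).
    val numChunks = 20
    val chunks = df1.randomSplit(Array.fill(numChunks)(1.0))

    // Join each chunk with df2 separately, then recombine the results with union.
    val joined = chunks
      .map(chunk => df2.join(chunk, df2("id") <=> chunk("id2")))
      .reduce(_ union _)

    joined.write.parquet("hdfs://your-path/joined")   // placeholder output location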

How have you solved this issue?

A few notes:

df1 is a subset of df2: df1 = df2.where("fin<1").selectExpr("id as id2").distinct(). I am interested in all rows in df2 that have an id which at some point has fin<1, which means I can't do it in one step.

There are about 200 million unique ids in df2.

Here are some relevant Spark settings:

spark.cores.max=1000
spark.executor.memory=15G
spark.akka.frameSize=1024
spark.shuffle.consolidateFiles=false
spark.task.cpus=1
spark.driver.cores=1
spark.executor.cores=1
spark.memory.fraction=0.5
spark.memory.storageFraction=0.3
spark.sql.shuffle.partitions=10000
spark.default.parallelism=10000

The error I get is:

16/03/11 04:36:07 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(11,1,ResultTask,FetchFailed(BlockManagerId(68dcb91c-1b45-437d-ac47-8e8c1e4bc386-S199, mapr, 46487),3,176,4750,org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /tmp/mesos/work/slaves/68dcb91c-1b45-437d-ac47-8e8c1e4bc386-S199/frameworks/c754216b-bf80-4d84-97f1-2e907030365e-2545/executors/16/runs/5a5a01c5-205e-4380-94d3-7fa0f6421b85/blockmgr-ea345692-05bb-4f42-9ba1-7b93311fb9d4/0e/shuffle_3_340_0.index (No such file or directory)

and

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 465 in stage 6.3 failed 4 times, most recent failure: Lost task 465.3 in stage 6.3 (TID 114448, mapr): java.lang.OutOfMemoryError: Direct buffer memory

Solution

As I see it, you have a problem of partitions that are too large (probably due to the size of your data). You can try a few approaches:

  1. Try setting spark.sql.shuffle.partitions to something like 2048 or even more (the default is 200). There will be a shuffle when joining your DataFrames. Play with this parameter so that the total volume of the bigger data divided by this parameter comes to approximately 64 MB-100 MB (depending on the file format). In general, you should see in the Spark UI that each task (one per partition) processes a "normal" amount of data (64 MB-100 MB at most).

  2. If the first approach does not work, I suggest doing this join with the RDD API. Convert your DataFrames into RDDs, then partition both RDDs with the same HashPartitioner(numberOfPartitions), where the number of partitions should be computed as described above (see the first sketch at the end of this answer).

  3. Lately a new option was added by the Spark devs: you can bucket the ginormous table into N buckets (i.e. store it ready for the join). There are a few limitations, but it can eliminate shuffling the ginormous data completely. bucketBy is supported only with the saveAsTable API, not with save. After you have bucketed the data with bucketBy, on the next iteration you can load it as an external table while providing the bucketing spec (see https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-table.html, and the second sketch at the end of this answer):

    CREATE TABLE ginormous
      -- ...here you must specify the schema
    USING PARQUET
    CLUSTERED BY (a, b, c) INTO N BUCKETS
    LOCATION 'hdfs://your-path'

Then, when you have loaded the ginormous table as a bucketed one, you can load the big table and repartition it to the same number of buckets and by the same columns (df.repartition(N, a, b, c)).
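A minimal sketch of option 2, with the partition count computed roughly the way option 1 suggests. The total data size, the Long key type, and the column names are assumptions, not values from the question:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("rdd-join").getOrCreate()

    // Option 1: aim for ~64-100 MB per shuffle partition. The 1 TB total below is
    // only an assumed size for the bigger dataset; plug in your real numbers.
    val biggerDataBytes = 1024L * 1024 * 1024 * 1024      // assumed ~1 TB
    val targetPartitionBytes = 100L * 1024 * 1024         // ~100 MB per partition
    val numPartitions = (biggerDataBytes / targetPartitionBytes).toInt
    spark.conf.set("spark.sql.shuffle.partitions", numPartitions.toString)

    // Option 2: drop down to the RDD API and co-partition both sides with the same
    // HashPartitioner before joining (the join key is assumed to be a Long).
    val partitioner = new HashPartitioner(numPartitions)

    val rdd1 = spark.table("df1").rdd
      .map(row => (row.getAs[Long]("id2"), row))
      .partitionBy(partitioner)

    val rdd2 = spark.table("df2").rdd
      .map(row => (row.getAs[Long]("id"), row))
      .partitionBy(partitioner)

    val joined = rdd2.join(rdd1)   // RDD[(Long, (Row, Row))]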
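And a sketch of option 3. For simplicity it writes the bucketed table with saveAsTable and reads it back with spark.table rather than the external CREATE TABLE shown above; the bucket count, the table names, and the single join column "id" are assumptions:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder()
      .appName("bucketed-join")
      .enableHiveSupport()   // a persistent catalog (e.g. a Hive metastore) is assumed
      .getOrCreate()

    val numBuckets = 10000   // assumed bucket count

    // One-off step: store the ginormous table bucketed by the join key.
    // Remember that bucketBy only works with saveAsTable, not with save.
    spark.table("df2")
      .write
      .format("parquet")
      .bucketBy(numBuckets, "id")
      .sortBy("id")
      .saveAsTable("ginormous_bucketed")

    // Subsequent runs: load the bucketed table, repartition the other side to the
    // same number of partitions on the same join column, then join.
    val ginormous = spark.table("ginormous_bucketed")
    val small = spark.table("df1").repartition(numBuckets, col("id2"))

    val joined = ginormous.join(small, ginormous("id") === small("id2"))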
