Efficient way of joining multiple tables in Spark - No space left on device

Question

A similar question has been asked here, but it does not address my question properly. I have nearly 100 DataFrames, each with at least 200,000 rows, and I need to join them by doing a full join based on the column ID, thereby creating a DataFrame with columns ID, Col1, Col2, Col3, Col4, Col5, ..., Col102.

Just for illustration, here is the structure of my DataFrames -

df1 =                          df2 =            df3 =          .....  df100 =
+----+------+------+------+    +----+------+    +----+------+         +----+------+
|  ID|  Col1|  Col2|  Col3|    |  ID|  Col4|    |  ID|  Col5|         |  ID|Col102|
+----+------+------+------+    +----+------+    +----+------+         +----+------+
| 501|  25.1|  34.9| 436.9|    | 501| 22.33|    | 503| 22.33|         | 501|  78.1|
| 502|  12.2|3225.9|  46.2|    | 502| 645.1|    | 505| 645.1|         | 502|  54.9|
| 504| 754.5| 131.0| 667.3|    | 504| 547.2|    | 504| 547.2|         | 507|     0|
| 505|324.12| 48.93|  -1.3|    | 506|     2|    | 506|     2|         | 509| 71.57|
| 506| 27.51| 88.99|  67.7|    | 507| 463.7|    | 507| 463.7|         | 510|  82.1|
.
.
+----+------+------+------+    +----+------+    +----+------+         +----+------+

I started joining these DataFrames by doing a full join sequentially on all of them. Naturally, this is a computationally intensive procedure, and one must strive to reduce the number of shuffles across the different worker nodes. Therefore, I started by partitioning the DataFrame df1 based on ID using repartition(), which hash-partitions the DataFrame based on ID into 30 partitions -

df1 = df1.repartition(30, 'ID')

Now, I do a full join between df1 and df2.

df = df1.join(df2,['ID'],how='full')
df.persist()
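
For reference, this is roughly how the partition count and the physical plan can be inspected (a small diagnostic sketch, assuming the df1 and df defined above):

# Diagnostic sketch (assumes the df1/df defined above): check how many
# partitions each DataFrame has and whether the plan contains an
# Exchange (i.e. a shuffle) on ID.
print(df1.rdd.getNumPartitions())   # 30, from the repartition above
print(df.rdd.getNumPartitions())    # typically 200 = spark.sql.shuffle.partitions

df.explain()                        # look for "Exchange hashpartitioning(ID, ...)" nodes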

Since df1 was already hash-partitioned, I had expected that this join would skip the shuffle and maintain the partitioner of df1, but I notice that a shuffle did take place and that it increased the number of partitions on df to 200. Now, if I keep joining the subsequent DataFrames by calling them via a function like the one shown below, I get the error java.io.IOException: No space left on device -

def rev(df, num):
    # read the num-th CSV (`filename` is the common path prefix), cache it,
    # and full-join it onto the accumulated DataFrame on ID
    df_temp = spark.read.load(filename + str(num) + '.csv')
    df_temp.persist()
    df = df.join(df_temp, ['ID'], how='full')
    df_temp.unpersist()
    return df

df = rev(df,3)
df = rev(df,4)
.
.
df = rev(df,100)
# I get the ERROR here below, when I call the first action count() - 
print("Total number of rows: "+str(df.count()))
df.unpersist()  # Never reached this stage.

Update: The error message -

Py4JJavaError: An error occurred while calling o3487.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 42 in stage 255.0 failed 1 times, most recent failure: Lost task 42.0 in stage 255.0 (TID 8755, localhost, executor driver): java.io.IOException: No space left on device
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)

Questions: 1. Why was the partitioner of df1 not maintained when we did the first join?

2. How can I join these multiple tables efficiently and also avoid this No space left on device issue? User @silvio here suggests using .bucketBy(), but he also alluded to the fact that the partitioner would be maintained, which did not happen. So, I am not sure what an efficient way to join these multiple DataFrames would be.
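
For reference, the .bucketBy() approach mentioned above would look roughly like the sketch below; the table names and bucket count are only illustrative assumptions, not something from the original suggestion:

# Rough sketch of the bucketBy idea (illustrative table names and bucket count):
# write each DataFrame bucketed and sorted by ID, then join the bucketed tables.
df1.write.bucketBy(30, 'ID').sortBy('ID').mode('overwrite').saveAsTable('df1_bucketed')
df2.write.bucketBy(30, 'ID').sortBy('ID').mode('overwrite').saveAsTable('df2_bucketed')

# Joining tables bucketed on the same key with the same bucket count
# can avoid shuffling on the join key.
joined = spark.table('df1_bucketed').join(spark.table('df2_bucketed'), ['ID'], how='full')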

Any suggestions/hints would be much appreciated.

Answer

First, try to persist your big df every N iterations with a for loop (which you probably already have).

Second, try to control the shuffle partition count by setting sqlContext.sql("set spark.sql.shuffle.partitions=100") instead of the default of 200.
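
If a SparkSession is used directly, the same setting can also be applied through its config (an equivalent sketch):

# Two equivalent ways to lower the shuffle partition count (sketch):
sqlContext.sql("set spark.sql.shuffle.partitions=100")    # as suggested above
spark.conf.set("spark.sql.shuffle.partitions", "100")     # via the SparkSession config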

Your code should look like this:

persist_every = 10                    # persist the accumulated result every 10 joins
big_df = spark.createDataFrame(...)   # empty df

for i in range(1, 101):               # 100 joins in total
    big_df = big_df.join(df, ...)

    if i % persist_every == 0:
        big_df = big_df.persist()

Here I call persist every 10 iterations; you can of course adjust that number according to the behavior of your job.

In your case you are persisting the local df_temp inside the rev function, but not the whole DataFrame that contains all the previous joins (df in your case). Since that is only a local persist, it has no effect on the final execution plan. As for my suggestion, let's assume you need 100 joins in total; with the code above you would iterate through a loop [1..100] and persist the accumulated result every 10 iterations. After persisting the big DataFrame, the DAG contains fewer in-memory calculations, since the intermediate steps are stored and Spark knows how to restore them from storage instead of recalculating everything from scratch.
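
Adapted to the rev-style loop from the question, that suggestion might look roughly like this (a sketch: filename, the numbered CSV layout and the ID column are taken from the question, and the interval of 10 is just an example):

# Sketch: join the CSVs sequentially and persist the accumulated result every 10 joins.
persist_every = 10

df = spark.read.load(filename + '1.csv')
for i in range(2, 101):
    df_temp = spark.read.load(filename + str(i) + '.csv')
    df = df.join(df_temp, ['ID'], how='full')

    if i % persist_every == 0:
        df = df.persist()
        df.count()          # optional: force materialization of the persisted result

print("Total number of rows: " + str(df.count()))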
