SPARK DataFrame: How to efficiently split dataframe for each group based on same column values
Question
I have a DataFrame generated as follows:
df.groupBy($"Hour", $"Category")
  .agg(sum($"value").alias("TotalValue"))
  .sort($"Hour".asc, $"TotalValue".desc)
The result looks like:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
| 3| cat8| 35.6|
| ...| ....| ....|
+----+--------+----------+
I would like to make new dataframes based on every unique value of col("Hour"), i.e.
- for the group of Hour==0
- for the group of Hour==1
- for the group of Hour==2, and so on...
So the desired output would be:
df0 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
+----+--------+----------+
df1 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
+----+--------+----------+
Similarly,
df2 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
+----+--------+----------+
Any help is highly appreciated.
EDIT 1:
What I have tried:
df.foreach(
  row => splitHour(row)
)

def splitHour(row: Row) = {
  val Hour = row.getAs[Long]("Hour")
  val HourDF = sparkSession.createDataFrame(List((s"$Hour", 1)))
  val hdf = HourDF.withColumnRenamed("_1", "Hour_unique").drop("_2")
  val mydf: DataFrame = df.join(hdf, df("Hour") === hdf("Hour_unique"))
  mydf.write.mode("overwrite").parquet(s"/home/dev/shaishave/etc/myparquet/$Hour/")
}
Problem with this strategy: It took 8 hours when run on a dataframe df which had over 1 million rows, and the Spark job was given around 10 GB of RAM on a single node. So the join is turning out to be highly inefficient.
Caveat: I have to write each dataframe mydf as parquet, and it has a nested schema that must be maintained (not flattened).
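Since the goal is simply one output per distinct Hour, the per-row join can be replaced by a filter per hour value. Below is a hedged sketch of that alternative (the `spark` session variable is an assumption; the output path is taken from the attempt above):

```scala
// Sketch: split by distinct Hour values using filter instead of join.
// The distinct hours are collected to the driver (a small list, e.g. 0..23),
// then each slice is written out as parquet. Nested schemas are preserved
// because the rows are written through unmodified.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val hours = df.select($"Hour").distinct().as[Long].collect()

hours.foreach { h =>
  df.filter($"Hour" === h)
    .write
    .mode("overwrite")
    .parquet(s"/home/dev/shaishave/etc/myparquet/$h/")
}
```

Each pass still scans df once per hour, so this is mainly a sketch of avoiding the join; the partitioned write in the answer below remains the single-pass option.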
Answer
As noted in my comments, one potentially easy approach to this problem would be to use:
df.write.partitionBy("hour").saveAsTable("myparquet")
As noted, the folder structure would be myparquet/hour=1, myparquet/hour=2, ..., myparquet/hour=24, as opposed to myparquet/1, myparquet/2, ..., myparquet/24.
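Put together as plain DataFrame code, the recommended approach looks something like the following sketch (the output path is illustrative, not from the original post):

```scala
// Sketch: a single partitioned write replaces the per-hour loop entirely.
// Spark performs the split by Hour itself, writing one subfolder per
// distinct value, and any nested schema in the rows is kept intact.
df.write
  .partitionBy("Hour")
  .mode("overwrite")
  .parquet("/home/dev/shaishave/etc/myparquet")
// Produces: .../myparquet/Hour=0/, .../myparquet/Hour=1/, ...
```

One partitioned write is a single job over the data, rather than one join-and-write job per hour, which is where the 8-hour runtime went.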
To change the folder structure, you could:
- Potentially use the Hive configuration setting hcat.dynamic.partitioning.custom.pattern within an explicit HiveContext; more information at HCatalog DynamicPartitions.
- Alternatively, change the file system directly after you have executed the df.write.partitionBy.saveAsTable(...) command, with something like `for f in *; do mv $f ${f/${f:0:5}/}; done`, which would remove the Hour= text from the folder names.
It is important to note that by changing the naming pattern for the folders, when you run spark.read.parquet(...) on that folder, Spark will not automatically understand the dynamic partitions, since it is missing the partition key (i.e. Hour) information.
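Conversely, if the hour=N folder names are left as written, Spark's partition discovery restores the column on read. A minimal sketch, assuming the default layout produced by the partitionBy write above:

```scala
// Sketch: partition discovery. Reading the root folder reconstructs the
// "hour" column from the hour=N directory names, and filtering on that
// column prunes partitions instead of scanning every file.
val all = spark.read.parquet("myparquet")   // "hour" column recovered from folder names
val hour2 = all.filter($"hour" === 2)       // touches only the hour=2 subfolder
```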