Pre-cogrouping tables on HDFS and reading in Spark with zero shuffling


Question

I have two tables that I am joining/cogrouping as part of my spark jobs, which incurs a large shuffle each time I run a job. I want to amortise the cost across all jobs by storing cogrouped data once, and use the already cogrouped data as part of my regular Spark runs to avoid the shuffle.

To try and achieve this, I have some data in HDFS stored in parquet format. I am using Parquet repeated fields to achieve the following schema

(date, [aRecords], [bRecords])

Where [aRecords] indicates an array of aRecord. I am also partitioning the data by date on HDFS using the usual write.partitionBy($"date").
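
As a concrete illustration, here is a minimal write-side sketch of that layout (the cogroupedDF name, the sample row, and the output path are illustrative, not taken from the original setup):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// One row per date, with the records from both source tables collected into arrays.
val cogroupedDF = Seq(
  (java.sql.Date.valueOf("2018-01-01"), Array(1, 2, 3), Array(4, 5, 6))
).toDF("date", "aRecords", "bRecords")

// Write as Parquet, partitioned by date on HDFS.
cogroupedDF.write.partitionBy("date").parquet("path/to/data")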

In this situation, aRecords and bRecords appear to be effectively cogrouped by date. I can perform operations like the following:

import java.sql.Date
import spark.implicits._  // needed for .as[CogroupedData] and the tuple encoders

case class CogroupedData(date: Date, aRecords: Array[Int], bRecords: Array[Int])

val cogroupedData = spark.read.parquet("path/to/data").as[CogroupedData]

// Dataset[(Date, Int)] where the Ints from the two sides are multiplied pairwise
val results = cogroupedData
    .flatMap(el => el.aRecords.zip(el.bRecords).map(pair => (el.date, pair._1 * pair._2)))

and get the same results I would get from the equivalent groupByKey operations on two separate tables of aRecords and bRecords keyed by date.
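
For reference, a rough sketch of the two-table formulation this replaces (the table paths and the "value" column are assumptions for illustration); joining the separate tables keyed by date shuffles both sides on every run:

// Hypothetical two-table layout: each table has a date column and a value column.
val aTable = spark.read.parquet("path/to/aRecords")
val bTable = spark.read.parquet("path/to/bRecords")

// This join hash-partitions (shuffles) both inputs by date every time it runs.
val shuffledJoin = aTable.join(bTable, Seq("date"))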

The difference between the two is that with the already cogrouped data I avoid a shuffle; the cost of cogrouping is amortised by persisting it on HDFS.

Now for the question. From the cogrouped dataset, I would like to derive the two grouped datasets so I can use standard Spark SQL operators (like cogroup, join etc) without incurring a shuffle. This seems possible since the first code example works, but Spark still insists on hashing/shuffling data when I join/groupByKey/cogroup etc.

Take the below code sample. I expect there is a way that we can run the below without incurring a shuffle when the join is performed.

val cogroupedData = spark.read.parquet("path/to/data").as[CogroupedData]

val aRecords = cogroupedData
    .flatMap(cog => cog.aRecords.map(a => (cog.date,a)))
val bRecords = cogroupedData
    .flatMap(cog => cog.bRecords.map(b => (cog.date,b)))

val joined = aRecords.join(bRecords,Seq("date"))

Looking at the literature, if cogroupedData has a known partitioner, then the operations that follow should not incur a shuffle since they can use the fact that the RDD is already partitioned and preserve the partitioner.

What I think I need to achieve this is to get a cogroupedData Dataset/rdd with a known partitioner without incurring a shuffle.
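
At the RDD level the partitioner argument is easy to demonstrate (a sketch only, reusing aRecords and bRecords from the snippet above; the partition count is arbitrary): once both sides share the same partitioner, the join itself needs no shuffle. The catch is that a partitioner lives only in memory; nothing about it is recorded when the data is written to HDFS, so it cannot simply be recovered by reading the Parquet files back.

import org.apache.spark.HashPartitioner

// Co-partition both pair RDDs of (date, value) with the same partitioner.
// Each partitionBy below shuffles once; subsequent operations can reuse the result.
val partitioner = new HashPartitioner(200)
val aByDate = aRecords.rdd.partitionBy(partitioner)
val bByDate = bRecords.rdd.partitionBy(partitioner)

// Both sides now expose the same known partitioner, so this join is shuffle-free.
val joinedRdd = aByDate.join(bByDate)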

Other things I have tried already:

  • Hive metadata - fine for simple joins, but it only optimises the initial join and not subsequent transformations. Hive also does not help with cogroups at all.

Anyone have any ideas?

Answer

You've made two mistakes here.

Conclusion: To have any opportunity to optimize you have to use metastore and bucketing.

In general Spark cannot optimize operations on "strongly typed" Datasets. For details see "Spark 2.0 Dataset vs DataFrame" and "Why is predicate pushdown not used in typed Dataset API (vs untyped DataFrame API)?".
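
A small illustration of the difference (a sketch reusing the CogroupedData schema from the question; the date literal is arbitrary):

import java.sql.Date

val data = spark.read.parquet("path/to/data")

// Typed API: the lambda is opaque JVM bytecode to Catalyst, so the date predicate
// cannot be pushed into the scan and every partition is still read.
val typed = data.as[CogroupedData].filter(_.date == Date.valueOf("2018-01-01"))

// Untyped/SQL API: the same predicate as a Column expression is visible to the
// optimizer and can be used for partition pruning and pushdown.
val untyped = data.filter($"date" === "2018-01-01")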

The correct way to do it is:

  • Use bucketing.

val n: Int = ???  // number of buckets; ??? is a placeholder, pick a value that fits the data
someDF.write.bucketBy(n, "date").saveAsTable("df")

  • Drop functional API in favor of SQL API:

    import org.apache.spark.sql.functions.explode

    // Read the bucketed table back via the metastore, which knows it is bucketed by date.
    val df = spark.table("df")

    // Explode each array back into one row per record, keeping the date key.
    val adf = df.select($"date", explode($"aRecords").alias("aRecords"))
    val bdf = df.select($"date", explode($"bRecords").alias("bRecords"))

    adf.join(bdf, Seq("date"))
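
One way to verify that the bucketing is picked up is to inspect the physical plan of the final join (a sketch, continuing from the snippet above): if the metastore bucketing is used, no Exchange (shuffle) on date should appear between the table scans and the join.

// Print the parsed/analyzed/optimized/physical plans and check for Exchange nodes.
adf.join(bdf, Seq("date")).explain(true)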
    
