Efficient PairRDD operations on DataFrame with Spark SQL GROUP BY

Question

This question is about the duality between DataFrame and RDD when it comes to aggregation operations. In Spark SQL one can use table-generating UDFs for custom aggregations, but creating one of those is typically noticeably less user-friendly than using the aggregation functions available for RDDs, especially if table output is not required.
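
To make that verbosity concrete, here is a minimal sketch of a custom SQL-side aggregation. The question mentions table-generating UDFs; the closest standard mechanism in Spark 1.6 is UserDefinedAggregateFunction, used here purely as an illustration (a plain per-group sum) of how much boilerplate the SQL side requires compared with a one-line RDD aggregation:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Illustrative only: a sum over a Double column, written as a UDAF
object SumValue extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("sum", DoubleType) :: Nil)
  def dataType: DataType = DoubleType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0.0 }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getDouble(0) + input.getDouble(0)
  }
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
  }
  def evaluate(buffer: Row): Any = buffer.getDouble(0)
}

// usage: df.groupBy($"key").agg(SumValue($"value"))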

Is there an efficient way to apply pair RDD operations such as aggregateByKey to a DataFrame which has been grouped using GROUP BY or ordered using ORDER BY?

Normally, one would need an explicit map step to create key-value tuples, e.g. dataFrame.rdd.map(row => (row.getString(row.fieldIndex("category")), row)).aggregateByKey(...). Can this be avoided?
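
For reference, a runnable sketch of that explicit map step; the DataFrame, the "category"/"amount" columns, and the per-key sum are illustrative assumptions, not part of the question:

import sqlContext.implicits._  // Spark 1.6: sqlContext assumed in scope (e.g. spark-shell)

val dataFrame = sc.parallelize(Seq(
  ("books", 10.0), ("music", 5.0), ("books", 2.5)
)).toDF("category", "amount")

val totals = dataFrame.rdd
  // explicit map step: key each Row by its "category" column
  .map(row => (row.getString(row.fieldIndex("category")), row))
  // seqOp folds a Row into the partition-local sum, combOp merges partial sums
  .aggregateByKey(0.0)(
    (acc, row) => acc + row.getDouble(row.fieldIndex("amount")),
    _ + _
  )

totals.collect()  // per-category totals, e.g. (books,12.5), (music,5.0)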

Answer

Not really. While DataFrames can be converted to RDDs and vice versa, this is a relatively complex operation, and methods like DataFrame.groupBy don't have the same semantics as their counterparts on RDDs.

The closest thing you can get is the new Dataset API introduced in Spark 1.6.0. It provides much closer integration with DataFrames, and its GroupedDataset class comes with its own set of methods, including reduce, cogroup, and mapGroups:

import sqlContext.implicits._  // needed for toDF and as[Record]

case class Record(id: Long, key: String, value: Double)

val df = sc.parallelize(Seq(
    (1L, "foo", 3.0), (2L, "bar", 5.6),
    (3L, "foo", -1.0), (4L, "bar", 10.0)
)).toDF("id", "key", "value")

// Convert to a typed Dataset and, for each key, reduce to the record
// with the lowest id
val ds = df.as[Record]
ds.groupBy($"key").reduce((x, y) => if (x.id < y.id) x else y).show

// +-----+-----------+
// |   _1|         _2|
// +-----+-----------+
// |[bar]|[2,bar,5.6]|
// |[foo]|[1,foo,3.0]|
// +-----+-----------+

In some specific cases it is possible to leverage Orderable semantics to group and process data using structs or arrays. You'll find an example in SPARK DataFrame: select the first row of each group
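
As a sketch of that trick, applied to the df defined above: structs compare field by field, so taking min over a struct whose first field is id pulls out the whole lowest-id row per group, matching the Dataset version's result:

import org.apache.spark.sql.functions.{min, struct}

df.groupBy($"key")
  // structs are Orderable: min picks the struct with the smallest id per key
  .agg(min(struct($"id", $"key", $"value")).alias("first"))
  .select($"first.id", $"first.key", $"first.value")
  .show

// keeps (1, foo, 3.0) and (2, bar, 5.6), as in the output above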
