Spark DataFrame: does groupBy after orderBy maintain that order?

Question

I have a Spark 2.0 DataFrame example with the following structure:

id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.

It contains 24 entries for each id (one for each hour of the day) and is ordered by id, hour using the orderBy function.
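
For reference, a minimal way to reproduce this setup (a sketch: only the column names, the sample values, and the ordering come from the question; it assumes an existing SparkSession in scope named spark):

import spark.implicits._  // assumes a SparkSession named `spark`

// Hypothetical construction of the `example` DataFrame described above
val example = Seq(
  ("id1", 0, 12), ("id1", 1, 55), ("id1", 23, 44),
  ("id2", 0, 12), ("id2", 1, 89), ("id2", 23, 34)
).toDF("id", "hour", "count").orderBy($"id", $"hour")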

I created an aggregator groupConcat:

import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.expressions.Aggregator

def groupConcat(separator: String, columnToConcat: Int) =
  new Aggregator[Row, String, String] with Serializable {
    override def zero: String = ""

    // Prepend the separator to every value; finish() strips the leading one.
    override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)

    // Each buffer already carries its leading separator, so plain concatenation works.
    override def merge(b1: String, b2: String) = b1 + b2

    override def finish(b: String) = b.substring(1)

    override def bufferEncoder: Encoder[String] = Encoders.STRING

    override def outputEncoder: Encoder[String] = Encoders.STRING
  }.toColumn

It helps me concatenate columns into strings to obtain this final dataframe:

id, hourly_count
id1, 12:55:..:44
id2, 12:89:..:34
etc.

My question is, if I do example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count"), does that guarantee that the hourly counts will be ordered correctly in their respective buckets?

I read that this is not necessarily the case for RDDs (see Spark sort by key and then group by to get ordered iterable?), but maybe it's different for DataFrames?

If not, how can I work around it?

Answer

groupBy after orderBy doesn't maintain order, as others have pointed out. What you want to do is use a Window function, partitioned by id and ordered by hour. You can collect_list over this and then take the max (largest) of the resulting lists, since they grow cumulatively (i.e. the first hour only has itself in the list, the second hour has 2 elements in the list, and so on).

Full example code:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val data = Seq(
  ("id1", 0, 12),
  ("id1", 1, 55),
  ("id1", 23, 44),
  ("id2", 0, 12),
  ("id2", 1, 89),
  ("id2", 23, 34)).toDF("id", "hour", "count")

val mergeList = udf { (strings: Seq[String]) => strings.mkString(":") }

data.withColumn("collected",
    collect_list($"count").over(Window.partitionBy("id").orderBy("hour")))
  .groupBy("id")
  // The windowed lists grow with hour, so the longest (max) one is the
  // complete, hour-ordered list for each id.
  .agg(max($"collected").as("collected"))
  .withColumn("hourly_count", mergeList($"collected"))
  .select("id", "hourly_count")
  .show
This keeps us within the DataFrame world. I also simplified the UDF code the OP was using.
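
To see why taking the max works, here is the intermediate column before the groupBy (a sketch over the same data as above; row order across ids in the show output may vary):

data.withColumn("collected",
    collect_list($"count").over(Window.partitionBy("id").orderBy("hour")))
  .show(false)

// +---+----+-----+------------+
// |id |hour|count|collected   |
// +---+----+-----+------------+
// |id1|0   |12   |[12]        |
// |id1|1   |55   |[12, 55]    |
// |id1|23  |44   |[12, 55, 44]|
// ...

Because the window has an orderBy, its default frame runs from the start of the partition to the current row, so each list extends the previous one and max picks the complete, hour-ordered list per id.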

Output:

+---+------------+
| id|hourly_count|
+---+------------+
|id1|    12:55:44|
|id2|    12:89:34|
+---+------------+
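
As a side note, if you'd rather avoid the window-plus-max trick, you can collect first and sort inside the array afterwards. A sketch (this assumes Spark 2.4+ for transform and array_join, whereas the question targets 2.0):

data.groupBy("id")
  .agg(sort_array(collect_list(struct($"hour", $"count"))).as("pairs"))
  .withColumn("hourly_count",
    // sort_array orders the structs by their first field, i.e. hour
    array_join(expr("transform(pairs, p -> cast(p.count as string))"), ":"))
  .select("id", "hourly_count")
  .show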
