Spark DataFrame: does groupBy after orderBy maintain that order?

Problem description

I have a Spark 2.0 dataframe example with the following structure:

id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.

It contains 24 entries for each id (one for each hour of the day) and is ordered by id, hour using the orderBy function.

I have created an Aggregator groupConcat:

  import org.apache.spark.sql.{Encoder, Encoders, Row}
  import org.apache.spark.sql.expressions.Aggregator

  // Concatenates the values of one column (selected by index) into a single
  // separator-delimited string per group.
  def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable {
    override def zero: String = ""

    // Every reduce step prepends the separator; finish() strips the leading one.
    override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)

    // Each partial buffer already starts with a separator, so plain
    // concatenation keeps the delimiters consistent.
    override def merge(b1: String, b2: String) = b1 + b2

    // Drops the leading separator (assumes a one-character separator).
    override def finish(b: String) = b.substring(1)

    override def bufferEncoder: Encoder[String] = Encoders.STRING

    override def outputEncoder: Encoder[String] = Encoders.STRING
  }.toColumn

It helps me concatenate columns into strings to obtain this final dataframe:

id, hourly_count
id1, 12:55:..:44
id2, 12:89:..:34
etc.
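
Concretely, I invoke the aggregator on the ordered DataFrame like this (column index 2 refers to the count column):

example.orderBy($"id", $"hour")
  .groupBy("id")
  .agg(groupConcat(":", 2) as "hourly_count")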

My question is, if I do example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count"), does that guarantee that the hourly counts will be ordered correctly in their respective buckets?

I read that this is not necessarily the case for RDDs (see Spark sort by key and then group by to get ordered iterable?), but maybe it's different for DataFrames?

If not, how can I work around it?

Answer

groupBy after orderBy doesn't maintain order, as others have pointed out: groupBy triggers a shuffle, which makes no guarantee about row order within each group. What you want instead is a Window function: partition by id and order by hour. You can collect_list over this window and then take the max (largest) of the resulting lists, since they grow cumulatively (i.e. the first hour's list contains only itself, the second hour's list has 2 elements, and so on), so the longest list is the complete one in hour order.

Full example code:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val data = Seq(
  ("id1", 0, 12),
  ("id1", 1, 55),
  ("id1", 23, 44),
  ("id2", 0, 12),
  ("id2", 1, 89),
  ("id2", 23, 34)).toDF("id", "hour", "count")

// Joins the collected counts into a single ":"-separated string.
val mergeList = udf { (strings: Seq[String]) => strings.mkString(":") }

data.withColumn("collected", collect_list($"count")
      .over(Window.partitionBy("id").orderBy("hour"))) // cumulative, hour-ordered lists per id
    .groupBy("id")
    .agg(max($"collected").as("collected"))            // the longest list is the complete one
    .withColumn("hourly_count", mergeList($"collected"))
    .select("id", "hourly_count").show

This keeps us within the DataFrame world. I also simplified the UDF code the OP was using.
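
If you would rather not rely on taking the max of the cumulative lists, a possible alternative (my sketch, not part of the original answer; sortAndConcat is a hypothetical helper name) is to collect (hour, count) structs and sort them explicitly inside the UDF, so the result never depends on the order in which collect_list sees the rows:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import spark.implicits._

// Hypothetical helper: sorts the collected (hour, count) structs by hour,
// then joins the counts with ":". Assumes hour and count are integers.
val sortAndConcat = udf { (entries: Seq[Row]) =>
  entries.sortBy(_.getInt(0)).map(_.getInt(1)).mkString(":")
}

data.groupBy("id")
  .agg(collect_list(struct($"hour", $"count")).as("entries"))
  .withColumn("hourly_count", sortAndConcat($"entries"))
  .select("id", "hourly_count").show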

Output:

+---+------------+
| id|hourly_count|
+---+------------+
|id1|    12:55:44|
|id2|    12:89:34|
+---+------------+
