如何使用Scala与我的比较器对DataFrame进行排序? [英] How to Sort DataFrame with my Comparator using Scala?

查看:316
本文介绍了如何使用Scala与我的比较器对DataFrame进行排序?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想用自己的比较器根据一列对DataFrame进行排序.可以在Spark SQL中执行此操作吗?

I would like to sort a DataFrame based on a column with my own comparator. It is possible to do this in Spark SQL?

例如,假设我有一个注册为"MyTable"的DataFrame,其"Day"列的类型为"string":

For example, let's suppose that I have a DataFrame registred as Table "MyTable" with a column "Day" which its type is "string":

id  | Day  
--------------------
1   | Fri           
2   | Mon           
3   | Sat           
4   | Sun           
5   | Thu           

我想执行此查询:

SELECT * FROM MyTable ORDER BY Day

我想用自己的比较器订购日"列.我考虑过使用UDF,但我不知道是否可能.请注意,我真的想在排序/排序依据"操作中使用我的比较器.我不想将String从Day列转换为Datetime或类似的内容.

I would like to order the column "Day" with my own comparator. I thought about using a UDF but I don't know if it is possible. Note that I really want to use my comparator in Sort/Order By operations. I don't want to convert String from column Day to Datetime or something similar.

推荐答案

在SparkSQL中,您别无选择,需要对一个或多个列使用orderBy.对于RDD,如果您愿意,可以使用类似Java的自定义比较器.实际上,这是RDDsortBy方法的签名(

In SparkSQL, you do not have a choice and need to use orderBy with one or more column(s). With RDDs, you can use a custom java-like comparator if you feel like it. Indeed, here is the signature of the sortBy method of an RDD (cf the scaladoc of Spark 2.4):

def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 

这意味着您可以提供一个选择的Ordering,它与Java Comparator完全一样(Ordering实际上是从Comparator继承的).

This means that you can provide an Ordering of your choice, which is exactly like a java Comparator (Ordering actually inherit from Comparator).

为简单起见,假设我要按列"x"的绝对值进行排序(这可以在没有比较器的情况下完成,但是假设我需要使用比较器).我首先在行上定义比较器:

For simplicity, let's say I want to sort by absolute value of a column 'x' (this can be done without a comparator, but let's assume I need to use a comparator). I start by defining my comparator on rows:

class RowOrdering extends Ordering[Row] {
    def compare(x : Row, y : Row): Int = x.getAs[Int]("x").abs - y.getAs[Int]("x").abs
}

现在让我们定义数据并对其进行排序:

Now let's define data and sort it:

val df = Seq( (0, 1),(1, 2),(2, 4),(3, 7),(4, 1),(5, -1),(6, -2),
    (7, 5),(8, 5), (9, 0), (10, -9)).toDF("id", "x")
val rdd = df.rdd.sortBy(identity)(new RowOrdering(), scala.reflect.classTag[Row])
val sorted_df = spark.createDataFrame(rdd, df.schema)
sorted_df.show
+---+---+
| id|  x|
+---+---+
|  9|  0|
|  0|  1|
|  4|  1|
|  5| -1|
|  6| -2|
|  1|  2|
|  2|  4|
|  7|  5|
|  8|  5|
|  3|  7|
| 10| -9|
+---+---+

另一种解决方案是定义隐式排序,这样您在排序时就不需要提供它.

Another solution is to define an implicit ordering so that you don't need to provide it when sorting.

implicit val ord = new RowOrdering()
df.rdd.sortBy(identity)

最后,请注意,df.rdd.sortBy(_.getAs[Int]("x").abs)将获得相同的结果.另外,您可以使用元组排序来执行更复杂的操作,例如按绝对值排序,如果相等,则将正值放在第一位:

Finally, note that df.rdd.sortBy(_.getAs[Int]("x").abs) would achive the same result. Also, you can use tuple ordering to do more complex things such as order by absolute values, and if equal, put the positive values first:

df.rdd.sortBy(x => (x.getAs[Int]("x").abs, - x.getAs[Int]("x"))) //RDD
df.orderBy(abs($"x"), - $"x") //dataframe

这篇关于如何使用Scala与我的比较器对DataFrame进行排序?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆