How to GroupBy Spark DataFrame with my Equality Comparators?
Question
I would like to use the GroupBy operator on a DataFrame with my own equality comparators.
Let's assume that I want to execute something like:
df.groupBy("Year","Month").sum("Counter")
on this DataFrame:
Year | Month   | Counter
------------------------
2012 | Jan     | 100
12   | January | 200
12   | Janu    | 300
2012 | Feb     | 400
13   | Febr    | 500
I have to implement two comparators:
1) For column Year: e.g. "2012" == "12"
2) For column Month: e.g. "Jan" == "January" == "Janu"
Let's assume that I have already implemented these two comparators. How can I invoke them? As in this example, I already know that I have to convert my DataFrame into an RDD to be able to use my comparators.
I thought about using the RDD GroupBy.
Note that I really need to do this using comparators. I can't use UDFs, change the data, or create new columns. The future idea is to have ciphertext columns, with functions that allow me to compare whether two ciphertexts are the same; I want to use those in my comparators.
At the moment, I am trying to do this with only one column, like:
df.groupBy("Year").sum("Counter")
I have a wrapper class:
class ExampleWrapperYear(val year: Any) extends Serializable {
  // overrides hashCode and equals
}
Then, I do this:
val rdd = df.rdd.keyBy(a => new ExampleWrapperYear(a(0))).groupByKey()
My question here is how to do the "sum", and how to use keyBy with multiple columns so that I can use ExampleWrapperYear and ExampleWrapperMonth.
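One way to see how the "sum" falls out of keyBy/groupByKey is to run the same shape on a plain Scala collection. The sketch below is illustrative only: YearKey is a hypothetical stand-in for ExampleWrapperYear, and the local groupBy plays the role of Spark's keyBy + groupByKey (on the RDD, the last step would be a mapValues that sums the counters per key).

```scala
// Hypothetical stand-in for ExampleWrapperYear: equals/hashCode
// normalize two-digit years, so 12 and 2012 land in the same group.
final class YearKey(val raw: Int) extends Serializable {
  val canonical: Int = if (raw < 100) 2000 + raw else raw
  override def hashCode: Int = canonical.hashCode
  override def equals(other: Any): Boolean = other match {
    case y: YearKey => y.canonical == canonical
    case _          => false
  }
}

// (year, counter) rows from the question, grouped and summed locally;
// rdd.keyBy(r => new YearKey(...)).groupByKey() groups the same way.
val rows = Seq((2012, 100), (12, 200), (12, 300), (2012, 400), (13, 500))
val sums: Map[Int, Int] =
  rows.groupBy { case (year, _) => new YearKey(year) }
      .map { case (key, group) => key.canonical -> group.map(_._2).sum }
```

Because groupBy relies only on hashCode and equals of the key, the two 2012 spellings collapse into one group.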
Answer
This solution should work. Here are the case classes (we can call these comparators) which implement hashCode and equals.
You can modify/update hashCode and equals based on different ciphertexts.
case class Year(var year: Int) {
  override def hashCode(): Int = {
    // Normalize the stored value to a four-digit year before hashing.
    this.year = this.year match {
      case 12 => 2012
      case 13 => 2013
      case _  => this.year
    }
    this.year.hashCode()
  }
  override def equals(that: Any): Boolean = {
    // Compare on the last two digits, so 12 == 2012.
    val year1 = 2000 + that.asInstanceOf[Year].year % 100
    val year2 = 2000 + this.year % 100
    year1 == year2
  }
}
case class Month(var month: String) {
  private def normalize(m: String): String = m match {
    case "January" | "Janu"  => "Jan"
    case "February" | "Febr" => "Feb"
    case _                   => m
  }
  override def hashCode(): Int = {
    // Normalize the stored value to the short month name before hashing.
    this.month = normalize(this.month)
    this.month.hashCode
  }
  override def equals(that: Any): Boolean =
    normalize(this.month) == normalize(that.asInstanceOf[Month].month)
}
Here is the important comparator for the grouped key, which simply delegates to the individual column comparators:
case class Key(var year: Year, var month: Month) {
  override def hashCode(): Int =
    this.year.hashCode() + this.month.hashCode()
  override def equals(that: Any): Boolean =
    this.year.equals(that.asInstanceOf[Key].year) &&
      this.month.equals(that.asInstanceOf[Key].month)
}
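Side note on the pattern above: mutating the stored field inside hashCode works, but the same idea can be written without side effects by deriving a canonical form and making both hashCode and equals use it. A minimal, illustrative sketch (the CanonicalYear/CanonicalMonth/CanonicalKey names are hypothetical, and it assumes the first three letters identify a month, as they do in the sample data):

```scala
// Side-effect-free variant: hash and compare a derived canonical form
// instead of rewriting the stored value inside hashCode.
final case class CanonicalYear(raw: Int) {
  val canonical: Int = if (raw < 100) 2000 + raw else raw
  override def hashCode: Int = canonical.hashCode
  override def equals(that: Any): Boolean = that match {
    case y: CanonicalYear => y.canonical == canonical
    case _                => false
  }
}

final case class CanonicalMonth(raw: String) {
  // Assumes the first three letters identify a month, as in the sample data.
  val canonical: String = raw.take(3)
  override def hashCode: Int = canonical.hashCode
  override def equals(that: Any): Boolean = that match {
    case m: CanonicalMonth => m.canonical == canonical
    case _                 => false
  }
}

// The composite key's generated equals/hashCode delegate to the members,
// so equal canonical forms produce equal keys.
final case class CanonicalKey(year: CanonicalYear, month: CanonicalMonth)
```

With side-effect-free keys, the grouped output no longer depends on hashCode having been called to rewrite the stored values.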
case class Record(year: Int, month: String, counter: Int)

val df = spark.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("data.csv").as[Record]

df.rdd.groupBy[Key]((record: Record) => Key(Year(record.year), Month(record.month)))
  .map(x => Record(x._1.year.year, x._1.month.month, x._2.toList.map(_.counter).sum))
  .toDS().show()
which gives:
+----+-----+-------+
|year|month|counter|
+----+-----+-------+
|2012| Feb| 800|
|2013| Feb| 500|
|2012| Jan| 700|
+----+-----+-------+
for this input in data.csv:
Year,Month,Counter
2012,February,400
2012,Jan,100
12,January,200
12,Janu,300
2012,Feb,400
13,Febr,500
2012,Jan,100
Please note that, for the case classes Year and Month, the stored value is also updated to the standard form (otherwise it is unpredictable which representation gets picked for the output).