How to GroupBy Spark DataFrame with my Equality Comparators?

Question

I would like to use the groupBy operator on a DataFrame with my own equality comparators.

Let's assume that I want to execute something like:

df.groupBy("Year","Month").sum("Counter")

on this DataFrame:

Year    | Month      | Counter
------- | ---------- | -------
2012    | Jan        | 100
12      | January    | 200
12      | Janu       | 300
2012    | Feb        | 400
13      | Febr       | 500

I have to implement two comparators:

1) For column Year: e.g. "2012" == "12"

2) For column Month: e.g. "Jan" == "January" == "Janu"

Let's assume that I already implemented these two comparators. How can I invoke them? As in this example, I already know that I have to convert my DataFrame into an RDD to make it possible to use my comparators.

I have thought about using RDD GroupBy.

Note that I really need to do this using comparators. I can't use UDFs, change the data, or create new columns. The future idea is to have ciphertext columns, with functions that allow me to check whether two ciphertexts are the same. I want to use those functions in my comparators.

At the moment, I am trying to do this with only one column, like:

df.groupBy("Year").sum("Counter")

I have a wrapper class:

class ExampleWrapperYear (val year: Any) extends Serializable {
      // override hashCode and equals methods here
}

Then, I do:

val rdd = df.rdd.keyBy(a => new ExampleWrapperYear(a(0))).groupByKey()

My question here is how to do the "sum", and how to use keyBy with multiple columns to use ExampleWrapperYear and ExampleWrapperMonth.
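For reference, a minimal sketch of how that could look end to end follows; the normalization rules inside equals/hashCode are my assumptions (the question leaves them out), and Counter is assumed to be an integer column:

class ExampleWrapperYear(val year: Any) extends Serializable {
  private def canon: Int = 2000 + year.toString.toInt % 100       // assumed rule: "12" and "2012" -> 2012
  override def hashCode(): Int = canon.hashCode
  override def equals(that: Any): Boolean = that match {
    case w: ExampleWrapperYear => w.canon == canon
    case _                     => false
  }
}

class ExampleWrapperMonth(val month: Any) extends Serializable {
  private def canon: String = month.toString.take(3).toLowerCase  // assumed rule: "January", "Janu" -> "jan"
  override def hashCode(): Int = canon.hashCode
  override def equals(that: Any): Boolean = that match {
    case w: ExampleWrapperMonth => w.canon == canon
    case _                      => false
  }
}

// Key by a pair of wrappers (tuple equality delegates to the wrappers), then sum:
val sums = df.rdd
  .keyBy(r => (new ExampleWrapperYear(r.getAs[Any]("Year")),
               new ExampleWrapperMonth(r.getAs[Any]("Month"))))
  .mapValues(_.getAs[Int]("Counter"))
  .reduceByKey(_ + _)

Using reduceByKey rather than groupByKey avoids materializing each group just to sum it.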

Answer

This solution should work. Here are the case classes (we can call these the comparators) which implement hashCode and equals.

You can modify/update hashCode and equals based on different ciphertexts.
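For the ciphertext idea from the question, a hypothetical shape could be the following; both helper functions are placeholders for the future crypto layer, not a real API. The important constraint is that hash-based grouping needs hashCode to be consistent with equals, so equal ciphertexts must also hash to the same value:

object CipherCompare {
  // Placeholder: would call the real "same plaintext?" check on two ciphertexts.
  def ciphertextsEqual(a: String, b: String): Boolean = a == b
  // Placeholder: would derive a deterministic token so equal ciphertexts hash alike.
  def canonicalToken(a: String): String = a
}

case class CipherText(value: String) {
  override def equals(that: Any): Boolean = that match {
    case CipherText(other) => CipherCompare.ciphertextsEqual(value, other)
    case _                 => false
  }
  override def hashCode(): Int = CipherCompare.canonicalToken(value).hashCode
}

For this question's data, the comparators are: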

  case class Year(var year: Int) {

    // Normalizes the stored value to the canonical four-digit year as a side effect,
    // so the key that survives grouping carries the standard value (see the note below).
    override def hashCode(): Int = {
      this.year = this.year match {
        case 12 => 2012
        case 13 => 2013
        case _  => this.year
      }
      this.year.hashCode()
    }

    // Two years are equal when their last two digits match, e.g. 12 == 2012.
    override def equals(that: Any): Boolean =
      2000 + that.asInstanceOf[Year].year % 100 == 2000 + this.year % 100
  }

  case class Month(var month: String) {

    // Maps long and abbreviated month names to one short form.
    private def canonical(m: String): String = m match {
      case "January" | "Janu"  => "Jan"
      case "February" | "Febr" => "Feb"
      case _                   => m
    }

    // Normalizes the stored value to the short form as a side effect (see the note below).
    override def hashCode(): Int = {
      this.month = canonical(this.month)
      this.month.hashCode
    }

    // "Jan", "January" and "Janu" all normalize to "Jan" and therefore compare equal.
    override def equals(that: Any): Boolean =
      canonical(this.month) == canonical(that.asInstanceOf[Month].month)
  }
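A quick sanity check of these comparators (my addition, not part of the original answer):

  assert(Year(12) == Year(2012))
  assert(Year(12).hashCode == Year(2012).hashCode)        // grouping relies on consistent hashing
  assert(Month("Janu") == Month("January"))
  assert(Month("Febr").hashCode == Month("Feb").hashCode)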

Here is the important comparator for the grouped key, which simply uses the individual column comparators:

  case class Key(var year: Year, var month: Month) {

    // Combines the column hash codes, so equal keys hash equally.
    override def hashCode(): Int =
      this.year.hashCode() + this.month.hashCode()

    // A key matches when both column comparators match.
    override def equals(that: Any): Boolean =
      this.year.equals(that.asInstanceOf[Key].year) &&
        this.month.equals(that.asInstanceOf[Key].month)
  }

  case class Record(year: Int, month: String, counter: Int)

  // spark is the active SparkSession; the implicits are needed for .as[Record] and .toDS().
  import spark.implicits._

  val df = spark.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("data.csv").as[Record]

  // Group by the custom Key (its hashCode/equals define the equality),
  // then sum the counters within each group and rebuild one Record per group.
  df.rdd
      .groupBy((record: Record) => Key(Year(record.year), Month(record.month)))
      .map { case (key, records) => Record(key.year.year, key.month.month, records.map(_.counter).sum) }
      .toDS().show()

which gives

+----+-----+-------+
|year|month|counter|
+----+-----+-------+
|2012|  Feb|    800|
|2013|  Feb|    500|
|2012|  Jan|    700|
+----+-----+-------+

for this input in data.csv:

Year,Month,Counter
2012,February,400
2012,Jan,100
12,January,200
12,Janu,300
2012,Feb,400
13,Febr,500
2012,Jan,100

Please note that, for the case classes Year and Month, the stored value is also updated to the standard value (otherwise it is unpredictable which of the equal values the group key picks).
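If the mutation inside hashCode feels surprising, an alternative sketch (my variation, not part of the original answer) keeps the comparators pure and canonicalizes explicitly when building the result. Note that this only works while a canonical form is computable, which may not hold for ciphertexts:

  def canonYear(y: Int): Int = 2000 + y % 100
  def canonMonth(m: String): String = m match {
    case "January" | "Janu"  => "Jan"
    case "February" | "Febr" => "Feb"
    case other               => other
  }

  df.rdd
      .groupBy((r: Record) => (canonYear(r.year), canonMonth(r.month)))
      .map { case ((y, m), rs) => Record(y, m, rs.map(_.counter).sum) }
      .toDS().show()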
