How to GroupBy Spark DataFrame with my Equality Comparators?


Question

I would like to use GroupBy operator on a DataFrame with my own equality comparators.

Let's assume that I want to execute something like:

df.groupBy("Year","Month").sum("Counter")

In this DataFrame:

Year    | Month      | Counter
--------|------------|--------
2012    | Jan        | 100
12      | January    | 200
12      | Janu       | 300
2012    | Feb        | 400
13      | Febr       | 500

I have to implement two comparators:

1) For column Year: e.g. "2012" == "12"

2) For column Month: e.g. "Jan" == "January" == "Janu"

Let's assume that I already implemented these two comparators. How can I invoke them? As in this example, I already know that I have to convert my DataFrame into an RDD to make it possible to use my comparators.

Note that I really need to do this using comparators. I can't use UDFs, change the data or create new columns. The future idea is to have ciphertext columns, in which I have functions that allow me to compare if two ciphertexts are the same. I want to use them in my comparators.

At the moment, I am trying to do this with only one column, like:

df.groupBy("Year").sum("Counter")

I have a wrapper class:

class ExampleWrapperYear(val year: Any) extends Serializable {
      // override hashCode and equals methods here
}

Then, I am doing:

val rdd = df.rdd.keyBy(a => new ExampleWrapperYear(a(0))).groupByKey()

My question here is how to do the "sum", and how to use keyBy with multiple columns to use ExampleWrapperYear and ExampleWrapperMonth.
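To see the shape of the answer in isolation, here is a hypothetical pure-Scala analogue of keyBy(...).groupByKey() followed by a sum; the wrapper class (called YearKey here, a stand-in for ExampleWrapperYear) overrides hashCode and equals so that "12" and "2012" land in the same group:

```scala
// Stand-in wrapper key: hashCode/equals treat 12 and 2012 as the same year.
class YearKey(val year: Int) extends Serializable {
  def normalized: Int = if (year < 100) 2000 + year else year
  override def hashCode: Int = normalized.hashCode
  override def equals(that: Any): Boolean = that match {
    case other: YearKey => other.normalized == this.normalized
    case _              => false
  }
}

val rows = Seq((2012, "Jan", 100), (12, "January", 200), (13, "Febr", 500))

// groupBy plays the role of rdd.keyBy(...).groupByKey(); summing the third
// field per group is the "sum" part of the question.
val sums = rows
  .groupBy(r => new YearKey(r._1))
  .map { case (k, rs) => k.normalized -> rs.map(_._3).sum }

// sums == Map(2012 -> 300, 2013 -> 500)
```

On an actual RDD the same sum would be a mapValues over the Iterable produced by groupByKey.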

Answer

This solution should work. Here are the case classes (we can call them comparators) that implement hashCode and equals.

You can modify/update hashCode and equals based on different ciphertexts.

  case class Year(var year: Int) {

    // Normalizes the stored year to its 4-digit form as a side effect,
    // so the grouped key ends up carrying the canonical value.
    override def hashCode(): Int = {
      this.year = this.year match {
        case 12 => 2012
        case 13 => 2013
        case _  => this.year
      }
      this.year.hashCode()
    }

    // Two years are equal if they agree modulo 100 (e.g. 12 == 2012).
    override def equals(that: Any): Boolean = {
      val year1 = 2000 + that.asInstanceOf[Year].year % 100
      val year2 = 2000 + this.year % 100
      year1 == year2
    }
  }

  case class Month(var month: String) {

    // Maps the known spellings to one canonical short name.
    private def canonical(m: String): String = m match {
      case "January" | "Janu"  => "Jan"
      case "February" | "Febr" => "Feb"
      case other               => other
    }

    // Normalizes the stored month as a side effect, so the grouped key
    // ends up carrying the canonical value.
    override def hashCode(): Int = {
      this.month = canonical(this.month)
      this.month.hashCode
    }

    override def equals(that: Any): Boolean =
      canonical(this.month) == canonical(that.asInstanceOf[Month].month)
  }
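A quick check of how these comparators behave (functionally equivalent copies of the two classes are repeated here so the snippet runs on its own):

```scala
// Copy of the Year comparator from the answer.
case class Year(var year: Int) {
  override def hashCode(): Int = {
    this.year = this.year match {
      case 12 => 2012
      case 13 => 2013
      case _  => this.year
    }
    this.year.hashCode()
  }
  override def equals(that: Any): Boolean =
    2000 + that.asInstanceOf[Year].year % 100 == 2000 + this.year % 100
}

// Copy of the Month comparator from the answer.
case class Month(var month: String) {
  private def canonical(m: String): String = m match {
    case "January" | "Janu"  => "Jan"
    case "February" | "Febr" => "Feb"
    case other               => other
  }
  override def hashCode(): Int = {
    this.month = canonical(this.month)
    this.month.hashCode
  }
  override def equals(that: Any): Boolean =
    canonical(this.month) == canonical(that.asInstanceOf[Month].month)
}

// "12" and "2012" form one key; so do the three spellings of January.
assert(Year(12) == Year(2012))
assert(Year(12).hashCode == Year(2012).hashCode)
assert(Month("Jan") == Month("January") && Month("January") == Month("Janu"))
```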

Here is the important comparator for the grouped key, which simply delegates to the individual column comparators:

  case class Key(var year: Year, var month: Month) {

    override def hashCode(): Int =
      this.year.hashCode() + this.month.hashCode()

    override def equals(that: Any): Boolean = {
      val other = that.asInstanceOf[Key]
      this.year.equals(other.year) && this.month.equals(other.month)
    }
  }

  case class Record(year:Int,month:String,counter:Int)

  import spark.implicits._  // required for .as[Record] and .toDS() below

  val df = spark.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("data.csv").as[Record]

  df.rdd.groupBy[Key](
      (record:Record)=>Key(Year(record.year), Month(record.month)))
      .map(x=> Record(x._1.year.year, x._1.month.month, x._2.toList.map(_.counter).sum))
      .toDS().show()
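As a sketch of a variant (not part of the original answer): the same comparator keys also work with reduceByKey, which combines counters map-side instead of materializing every group's values, and produces the same result. This assumes the Record/Key/Year/Month classes above and a running SparkSession, so it is shown without output:

```scala
// Same aggregation, but reduceByKey avoids building the full Iterable per
// group; the custom Key hashCode/equals drive the shuffle exactly as before.
df.rdd
  .map(record => (Key(Year(record.year), Month(record.month)), record.counter))
  .reduceByKey(_ + _)
  .map { case (key, total) => Record(key.year.year, key.month.month, total) }
  .toDS().show()
```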

which gives

+----+-----+-------+
|year|month|counter|
+----+-----+-------+
|2012|  Feb|    800|
|2013|  Feb|    500|
|2012|  Jan|    700|
+----+-----+-------+

for this input in data.csv:

Year,Month,Counter
2012,February,400
2012,Jan,100
12,January,200
12,Janu,300
2012,Feb,400
13,Febr,500
2012,Jan,100

Please note that, in the case classes Year and Month, hashCode also updates the stored value to the standard form (otherwise it is unpredictable which variant ends up in the output).
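For example, with the Year comparator above (repeated here, in equivalent form, so the snippet runs standalone), a single hashCode call rewrites the stored value, which is why the output shows 2012 rather than 12:

```scala
// Copy of the Year comparator; hashCode normalizes the stored year.
case class Year(var year: Int) {
  override def hashCode(): Int = {
    this.year = this.year match {
      case 12 => 2012
      case 13 => 2013
      case _  => this.year
    }
    this.year.hashCode()
  }
  override def equals(that: Any): Boolean =
    2000 + that.asInstanceOf[Year].year % 100 == 2000 + this.year % 100
}

val y = Year(12)
y.hashCode()           // grouping calls this, normalizing the value...
assert(y.year == 2012) // ...so the result carries 2012, never 12
```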
