Best way to have nested tuples or nested columns in Spark and filter by or group by nested column


Question

I'm having an issue grouping by a nested column.

My application's Scala version is 2.11.7, and these are my sbt dependencies:

libraryDependencies ++= {
  val akkaVersion = "2.4.10"
  val sparkVersion = "2.1.1"

  Seq(
    "com.typesafe.akka" %% "akka-actor"                           % akkaVersion,
    "com.typesafe"      %  "config"                               % "1.3.0" ,
    "org.apache.spark"  %%  "spark-core"                          % sparkVersion,
    "org.apache.spark"  %%  "spark-sql"                           % sparkVersion,
    "com.typesafe.akka" %% "akka-slf4j"                           % akkaVersion,
    "org.apache.spark"  %% "spark-streaming"                      % sparkVersion
  )
}

This is my sample data (1 row):

124567893|254887452|52448796|2017-02-22 00:00:02|1|4|0014551233548|N|0|0|2||2|44|4||1|1|||2|-1||1|USD|||1457784114521||7|[1~26.927900~0.390200][4~0.000000~0.000000][8~0.000000~0.000000][9~0.000000~0.000000][11~0.000000~0.000000][12~0.000000~0.000000][13~0.000000~0.000000][71~0.000000~0.000000][91~0.000000~0.000000][111~0.000000~0.000000][131~0.000000~0.000000][251~0.000000~0.000000][311~0.000000~0.000000][331~0.000000~0.000000][451~0.000000~0.000000][3~0.000000~0.000000]|[323~4517.702200~0.390200][384~5310.000000~0.000000][443~4296.000000~0.000000][463~0.000000~0.000000][1024~10.535400~0.390200][1343~57.980000~0.000000][783~0.000000~0.000000][303~0.000000~0.000000][403~10.535400~0.390200][523~13790.000000~0.000000][1143~0.000000~0.000000][763~0.000000~0.000000]|
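
The last two non-empty fields are lists of [id~value1~value2] groups. As a point of reference, here is a minimal standalone sketch (not part of the original question; the value name balanceField is illustrative) of how one such field breaks down:

// Hypothetical example: parse one balance-style field into (id, value1, value2) tuples.
val balanceField = "[1~26.927900~0.390200][4~0.000000~0.000000][3~0.000000~0.000000]"

val parsed: List[(Int, Double, Double)] = balanceField
  .drop(1)                 // strip the leading '['
  .dropRight(1)            // strip the trailing ']'
  .split("\\]\\[")         // "...][..." -> one "id~value1~value2" group per element
  .map(_.split('~'))
  .filter(p => p.length > 2 && p(2).nonEmpty && p(2).toDouble != 0)
  .map(p => (p(0).toInt, p(1).toDouble, p(2).toDouble))
  .toList

// parsed == List((1, 26.9279, 0.3902))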

This is my mapper case class:

case class SampleMap(
                   id: Long, //1
                   a_id_1: Long, //2
                   b_id_2: Long, //3
                   date_time: String, //4
                   subscriber_type: Int, //5
                   x_type: Int, //6
                   sub_id_2: String, //7
                   account_type: Int, //11
                   master_sub_id: String, //12
                   application_id: Int, //13
                   sup_type_id: Int, //14
                   unit_type_id: Int, //15
                   usage_amount: Long, //16
                   type_of_charge: String, //17
                   identity_id: Int, //18
                   group_id: String, //19
                   charge_code: String, //20
                   content_type: Int, //21
                   fund_usage_type: Int, //24
                   msc_id: String, //28
                   circle_id: Int, //29
                   sp_id: Int, //30
                   balance: List[(Int, Double, Double)], //31
                   z_info: List[(Int, Double, Double)] //33
                 )

I have written the code to filter and map:

 private def mappingSparkLoadedSMSData(sparkRdd: Dataset[String]): Dataset[SampleMap] = {

    import SparkFactory.spark.implicits._
    sparkRdd
      .map(_.split("\\|",-1))
      .filter(_.length==33)       // split with -1 keeps trailing empty fields, so a valid row has 33 fields
      .map(
      data =>
        SampleMap(

          {if(data(0).nonEmpty) data(0).toLong else 0 },
          {if(data(1).nonEmpty) data(1).toLong else 0 },
          {if(data(2).nonEmpty) data(2).toLong else 0 },
          data(3),
          {if(data(4).nonEmpty) data(4).toInt else 0 },
          {if(data(5).nonEmpty) data(5).toInt else 0 },
          data(6),
          {if(data(10).nonEmpty) data(10).toInt else 0 },
          data(11),
          {if(data(12).nonEmpty) data(12).toInt else 0 },
          {if(data(13).nonEmpty) data(13).toInt else 0 },
          {if(data(14).nonEmpty) data(14).toInt else 0 },
          {if(data(15).nonEmpty) data(15).toLong else 0 },
          data(16),
          {if(data(17).nonEmpty) data(17).toInt else 0 },
          data(18),
          data(19),
          {if(data(20).nonEmpty) data(20).toInt else 0 },
          {if(data(23).nonEmpty) data(23).toInt else 0 },
          data(27),
          {if(data(28).nonEmpty) data(28).toInt else 0 },
          {if(data(29).nonEmpty) data(29).toInt else 0 },

          data(30)
            .drop(1)
            .dropRight(1)
            .split("\\]\\[")
            .map(_.split('~'))
            .filter(data =>  data.length > 2 && data(2).nonEmpty &&  data(2).toDouble != 0)
            .map(data => (data(0).toInt, data(1).toDouble, data(2).toDouble))
            .toList,

          data(31)
            .drop(1)
            .dropRight(1)
            .split("\\]\\[")
            .map(_.split('~'))
            .filter(data =>  data.length > 2 && data(2).nonEmpty &&  data(2).toDouble != 0)
            .map(data => (data(0).toInt, data(1).toDouble, data(2).toDouble))
            .toList


        )
    )
  }
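
For context, here is a minimal sketch (not in the original post) of how this mapper might be wired up; the file path is a placeholder and SparkFactory.spark is assumed to be the SparkSession referenced above:

import org.apache.spark.sql.Dataset
import SparkFactory.spark                      // assumption: the SparkSession used elsewhere

// Each line of the pipe-delimited file becomes one element of the Dataset[String].
val rawLines: Dataset[String] = spark.read.textFile("/path/to/sample_data.txt")   // hypothetical path

val formattedRDD: Dataset[SampleMap] = mappingSparkLoadedSMSData(rawLines)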

Then I create a temp view and try to query it like this:

formattedRDD.createOrReplaceTempView("temp_table")  // formattedRDD is the val returned by the mapping above

spark.sql(
      s" select balance from temp_table group by balance"
    ).collectAsList()

When you look at the field balance: List[(Int, Double, Double)], //31, the first element of each tuple is bal_id (Int), the second is change_balance (Double), and the third is accumulated (Double), and one row can contain more than one such set.

Now I want to group by bal_id and get the sum of change_balance, but I can't do that (of course not, since each entry is a value inside a list rather than a column).

I had the idea to separate the balance field (balance: List[(Int, Double, Double)], //31) into a different dataset/table and then map and group it, but to do that we would need to add an auto-increment id or some other unique row identifier to both datasets/tables so they can be joined (note that id can be duplicated).
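
For illustration only, a minimal sketch of that idea (not part of the original question), assuming Spark's built-in monotonically_increasing_id to tag each row and explode to flatten the balance list into its own table; column names are illustrative:

import org.apache.spark.sql.functions.{col, explode, monotonically_increasing_id}

// Tag every parsed row with a surrogate key, since the business id can repeat.
val withRowId = formattedRDD.toDF().withColumn("row_id", monotonically_increasing_id())

// One row per (row_id, balance entry); the tuple fields surface as _1/_2/_3.
val balanceTable = withRowId
  .select(col("row_id"), explode(col("balance")).as("bal"))
  .select(
    col("row_id"),
    col("bal._1").as("bal_id"),
    col("bal._2").as("change_balance"),
    col("bal._3").as("accumulated")
  )

// balanceTable can now be grouped by bal_id, and joined back to withRowId on row_id if needed.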

I'm really confused by this. Can anyone please help me? Thanks in advance.

Answer

If you separate the balance column into three different columns, it will be easy for you to groupBy on bal_id and sum change_balance. You can create these three separate columns at the initial stage. Here's a solution based on my understanding of your question:

You need to include the three new column names in your case class:

case class SampleMap(
                      id: Long, //1
                      a_id_1: Long, //2
                      b_id_2: Long, //3
                      date_time: String, //4
                      subscriber_type: Int, //5
                      x_type: Int, //6
                      sub_id_2: String, //7
                      account_type: Int, //11
                      master_sub_id: String, //12
                      application_id: Int, //13
                      sup_type_id: Int, //14
                      unit_type_id: Int, //15
                      usage_amount: Long, //16
                      type_of_charge: String, //17
                      identity_id: Int, //18
                      group_id: String, //19
                      charge_code: String, //20
                      content_type: Int, //21
                      fund_usage_type: Int, //24
                      msc_id: String, //28
                      circle_id: Int, //29
                      sp_id: Int, //30
                      balance: List[(Int, Double, Double)], //31
                      bal_id: Int,              //added by Ramesh
                      change_balance: Double,   //added by Ramesh
                      accumulated: Double,      //added by Ramesh
                      z_info: List[(Int, Double, Double)] //33
                    )

You have to split those three values into separate columns while creating the dataframe/dataset. The following is an improved version of your code:

import scala.util.Try
import SparkFactory.spark.implicits._   // needed for .toDS()

val formattedRDD = sparkRdd.map(_.split("\\|",-1))
      .filter(_.length==33)       // split with -1 keeps trailing empty fields, so a valid row has 33 fields
      .map( data => {
        val balance = Try(data(30)
          .drop(1)
          .dropRight(1)
          .split("\\]\\[")
          .map(_.split('~'))
          .filter(data => data.length > 2 && data(2).nonEmpty && data(2).toDouble != 0)
          .map(data => (data(0).toInt, data(1).toDouble, data(2).toDouble))
          .toList) getOrElse List((0, 0.0, 0.0))

        SampleMap(
          Try(data(0).toLong) getOrElse 0,
          Try(data(1).toLong) getOrElse 0,
          Try(data(2).toLong) getOrElse 0,
          Try(data(3).toString) getOrElse "",
          Try(data(4).toInt) getOrElse 0,
          Try(data(5).toInt) getOrElse 0,
          Try(data(6).toString) getOrElse "",
          Try(data(10).toInt) getOrElse 0,       // account_type
          Try(data(11).toString) getOrElse "",
          Try(data(12).toInt) getOrElse 0,
          Try(data(13).toInt) getOrElse 0,
          Try(data(14).toInt) getOrElse 0,
          Try(data(15).toLong) getOrElse 0,
          Try(data(16).toString) getOrElse "",
          Try(data(17).toInt) getOrElse 0,
          Try(data(18).toString) getOrElse "",
          Try(data(19).toString) getOrElse "",
          Try(data(20).toInt) getOrElse 0,
          Try(data(23).toInt) getOrElse 0,
          Try(data(27).toString) getOrElse "",
          Try(data(28).toInt) getOrElse 0,
          Try(data(29).toInt) getOrElse 0,
          balance,               // field 31 (index 30): the full balance list
          balance(0)._1,         // bal_id of the first balance entry
          balance(0)._2,         // change_balance of the first balance entry
          balance(0)._3,         // accumulated of the first balance entry

          Try(data(31)
            .drop(1)
            .dropRight(1)
            .split("\\]\\[")
            .map(_.split('~'))
            .filter(data => data.length > 2 && data(2).nonEmpty && data(2).toDouble != 0)
            .map(data => (data(0).toInt, data(1).toDouble, data(2).toDouble))
            .toList) getOrElse List.empty
        )
      }
    )
  .toDS()

Now all you need to do is call an aggregator:

import org.apache.spark.sql.functions.sum
formattedRDD.groupBy("bal_id").agg(sum("change_balance")).show
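
If you prefer the temp-view style from the question, the same aggregation can also be expressed in SQL (a sketch, assuming the dataset above is registered as temp_table):

formattedRDD.createOrReplaceTempView("temp_table")
spark.sql("select bal_id, sum(change_balance) as total_change from temp_table group by bal_id").show()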

I hope this is the solution you were looking for.
