Best way to have nested tuples or nested columns in Spark and filter by or group by a nested column
Question
I'm having an issue grouping by a nested column.
My application's Scala version is 2.11.7, and these are my sbt dependencies:
libraryDependencies ++= {
  val akkaVersion = "2.4.10"
  val sparkVersion = "2.1.1"
  Seq(
    "com.typesafe.akka" %% "akka-actor" % akkaVersion,
    "com.typesafe" % "config" % "1.3.0",
    "org.apache.spark" %% "spark-core" % sparkVersion,
    "org.apache.spark" %% "spark-sql" % sparkVersion,
    "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
    "org.apache.spark" %% "spark-streaming" % sparkVersion
  )
}
This is my sample data (1 row):
124567893|254887452|52448796|2017-02-22 00:00:02|1|4|0014551233548|N|0|0|2||2|44|4||1|1|||2|-1||1|USD|||1457784114521||7|[1~26.927900~0.390200][4~0.000000~0.000000][8~0.000000~0.000000][9~0.000000~0.000000][11~0.000000~0.000000][12~0.000000~0.000000][13~0.000000~0.000000][71~0.000000~0.000000][91~0.000000~0.000000][111~0.000000~0.000000][131~0.000000~0.000000][251~0.000000~0.000000][311~0.000000~0.000000][331~0.000000~0.000000][451~0.000000~0.000000][3~0.000000~0.000000]|[323~4517.702200~0.390200][384~5310.000000~0.000000][443~4296.000000~0.000000][463~0.000000~0.000000][1024~10.535400~0.390200][1343~57.980000~0.000000][783~0.000000~0.000000][303~0.000000~0.000000][403~10.535400~0.390200][523~13790.000000~0.000000][1143~0.000000~0.000000][763~0.000000~0.000000]|
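The two bracketed fields near the end of the row decompose into `[id~change~accumulated]` segments. A minimal sketch of how one such field parses into tuples (the object and method names are illustrative, not from the post):

```scala
object BalanceParser {
  // Parse a field like "[1~26.927900~0.390200][4~0.000000~0.000000]"
  // into (id, change, accumulated) tuples.
  def parse(field: String): List[(Int, Double, Double)] =
    field
      .drop(1)          // strip the leading '['
      .dropRight(1)     // strip the trailing ']'
      .split("\\]\\[")  // split on the "][" boundaries between segments
      .toList
      .map(_.split('~'))
      .collect { case Array(id, change, acc) =>
        (id.toInt, change.toDouble, acc.toDouble)
      }
}
```

The `collect` silently drops segments that do not have exactly three `~`-separated parts, which is one way to tolerate malformed input.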
This is my mapping case class:
case class SampleMap(
  id: Long, //1
  a_id_1: Long, //2
  b_id_2: Long, //3
  date_time: String, //4
  subscriber_type: Int, //5
  x_type: Int, //6
  sub_id_2: String, //7
  account_type: Int, //11
  master_sub_id: String, //12
  application_id: Int, //13
  sup_type_id: Int, //14
  unit_type_id: Int, //15
  usage_amount: Long, //16
  type_of_charge: String, //17
  identity_id: Int, //18
  group_id: String, //19
  charge_code: String, //20
  content_type: Int, //21
  fund_usage_type: Int, //24
  msc_id: String, //28
  circle_id: Int, //29
  sp_id: Int, //30
  balance: List[(Int, Double, Double)], //31
  z_info: List[(Int, Double, Double)] //33
)
I have written this code to filter and map the data:
private def mappingSparkLoadedSMSData(sparkRdd: Dataset[String]): Dataset[SampleMap] = {
  import SparkFactory.spark.implicits._
  sparkRdd
    .map(_.split("\\|", -1))
    .filter(_.length == 33) // the trailing '|' yields a final empty field
    .map(data =>
      SampleMap(
        if (data(0).nonEmpty) data(0).toLong else 0L,
        if (data(1).nonEmpty) data(1).toLong else 0L,
        if (data(2).nonEmpty) data(2).toLong else 0L,
        data(3),
        if (data(4).nonEmpty) data(4).toInt else 0,
        if (data(5).nonEmpty) data(5).toInt else 0,
        data(6),
        if (data(10).nonEmpty) data(10).toInt else 0,
        data(11),
        if (data(12).nonEmpty) data(12).toInt else 0,
        if (data(13).nonEmpty) data(13).toInt else 0,
        if (data(14).nonEmpty) data(14).toInt else 0,
        if (data(15).nonEmpty) data(15).toLong else 0L,
        data(16),
        if (data(17).nonEmpty) data(17).toInt else 0,
        data(18),
        data(19),
        if (data(20).nonEmpty) data(20).toInt else 0,
        if (data(23).nonEmpty) data(23).toInt else 0,
        data(27),
        if (data(28).nonEmpty) data(28).toInt else 0,
        if (data(29).nonEmpty) data(29).toInt else 0,
        data(30)
          .drop(1)
          .dropRight(1)
          .split("\\]\\[")
          .map(_.split('~'))
          .filter(cols => cols.length > 2 && cols(2).nonEmpty && cols(2).toDouble != 0)
          .map(cols => (cols(0).toInt, cols(1).toDouble, cols(2).toDouble)) // (bal_id, change_balance, accumulated)
          .toList,
        data(31)
          .drop(1)
          .dropRight(1)
          .split("\\]\\[")
          .map(_.split('~'))
          .filter(cols => cols.length > 2 && cols(2).nonEmpty && cols(2).toDouble != 0)
          .map(cols => (cols(0).toInt, cols(1).toDouble, cols(2).toDouble))
          .toList
      )
    )
}
Then I'm creating a temp view and trying to query it like this:
formattedRDD.createOrReplaceTempView("temp_table") // formattedRDD is a val holding the mapped Dataset
spark.sql(
  "select balance from temp_table group by balance"
).collectAsList()
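Grouping by `balance` compares whole lists, so the per-element aggregation the question asks for (sum of `change_balance` per `bal_id`) never happens. The intended computation can be sketched on plain Scala collections (the sample values below are illustrative); in Spark the analogous first step is `org.apache.spark.sql.functions.explode`, which turns each list element into its own row:

```scala
// Each input row carries a list of (bal_id, change_balance, accumulated) entries.
val rows: List[List[(Int, Double, Double)]] = List(
  List((1, 26.9279, 0.3902), (4, 0.0, 0.0)),
  List((1, 10.0, 0.0), (323, 4517.7022, 0.3902))
)

// "Explode" every list into individual entries, then group by bal_id
// and sum change_balance within each group.
val sums: Map[Int, Double] =
  rows.flatten
    .groupBy(_._1)
    .map { case (balId, entries) => balId -> entries.map(_._2).sum }
```

In Spark the equivalent would be roughly `formattedRDD.select(explode($"balance").as("b")).groupBy($"b._1").agg(sum($"b._2"))` (untested here; tuple elements surface as struct fields `_1`, `_2`, `_3`).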
When you look at balance: List[(Int, Double, Double)], //31 — the first element is bal_id (Int), the second is change_balance (Double), the third is accumulated (Double), and each row has more than one such set.
Now I want to group by bal_id and get the sum of change_balance, but I can't do that (of course not, since to Spark each balance is a single value).
I had the idea of separating balance ( balance: List[(Int, Double, Double)], //31 ) into a different dataset/table and then mapping and grouping, but to separate it we would need to add an auto-increment id or some other unique row identifier to both datasets/tables for the join (note that id can be duplicated).
I'm really confused by this. Can anyone please help me? Thanks in advance.
Answer
If you separate the balance column into three different columns, it becomes easy to groupBy on bal_id and sum change_balance. You can create these three separate columns in the initial stage.

Here's the solution as I understood your question. You need to include the three column names in your case class:
case class SampleMap(
  id: Long, //1
  a_id_1: Long, //2
  b_id_2: Long, //3
  date_time: String, //4
  subscriber_type: Int, //5
  x_type: Int, //6
  sub_id_2: String, //7
  account_type: Int, //11
  master_sub_id: String, //12
  application_id: Int, //13
  sup_type_id: Int, //14
  unit_type_id: Int, //15
  usage_amount: Long, //16
  type_of_charge: String, //17
  identity_id: Int, //18
  group_id: String, //19
  charge_code: String, //20
  content_type: Int, //21
  fund_usage_type: Int, //24
  msc_id: String, //28
  circle_id: Int, //29
  sp_id: Int, //30
  balance: List[(Int, Double, Double)], //31
  bal_id: Int, //added by Ramesh
  change_balance: Double, //added by Ramesh
  accumulated: Double, //added by Ramesh
  z_info: List[(Int, Double, Double)] //33
)
You have to separate those three values into their own columns while creating the dataframe/dataset. The following is an improved version of your code:
import scala.util.Try

val formattedRDD = sparkRdd
  .map(_.split("\\|", -1))
  .filter(_.length == 33) // the trailing '|' yields a final empty field
  .map { data =>
    val balance = Try(
      data(30)
        .drop(1)
        .dropRight(1)
        .split("\\]\\[")
        .map(_.split('~'))
        .filter(cols => cols.length > 2 && cols(2).nonEmpty && cols(2).toDouble != 0)
        .map(cols => (cols(0).toInt, cols(1).toDouble, cols(2).toDouble))
        .toList
    ) getOrElse List((0, 0.0, 0.0))
    SampleMap(
      Try(data(0).toLong) getOrElse 0L,
      Try(data(1).toLong) getOrElse 0L,
      Try(data(2).toLong) getOrElse 0L,
      Try(data(3)) getOrElse "",
      Try(data(4).toInt) getOrElse 0,
      Try(data(5).toInt) getOrElse 0,
      Try(data(6)) getOrElse "",
      Try(data(10).toInt) getOrElse 0,
      Try(data(11)) getOrElse "",
      Try(data(12).toInt) getOrElse 0,
      Try(data(13).toInt) getOrElse 0,
      Try(data(14).toInt) getOrElse 0,
      Try(data(15).toLong) getOrElse 0L,
      Try(data(16)) getOrElse "",
      Try(data(17).toInt) getOrElse 0,
      Try(data(18)) getOrElse "",
      Try(data(19)) getOrElse "",
      Try(data(20).toInt) getOrElse 0,
      Try(data(23).toInt) getOrElse 0,
      Try(data(27)) getOrElse "",
      Try(data(28).toInt) getOrElse 0,
      Try(data(29).toInt) getOrElse 0,
      balance,                                     // the parsed balance column (field 31)
      balance.headOption.map(_._1).getOrElse(0),   // bal_id (guard against an empty list)
      balance.headOption.map(_._2).getOrElse(0.0), // change_balance
      balance.headOption.map(_._3).getOrElse(0.0), // accumulated
      Try(
        data(31)
          .drop(1)
          .dropRight(1)
          .split("\\]\\[")
          .map(_.split('~'))
          .filter(cols => cols.length > 2 && cols(2).nonEmpty && cols(2).toDouble != 0)
          .map(cols => (cols(0).toInt, cols(1).toDouble, cols(2).toDouble))
          .toList
      ) getOrElse List.empty
    )
  }
  .toDS()
Now all you need to do is call an aggregation:
import org.apache.spark.sql.functions.sum
formattedRDD.groupBy("bal_id").agg(sum("change_balance")).show
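The `Try(...) getOrElse` pattern used above differs from the original `if (nonEmpty)` checks in one useful way: it also absorbs malformed numbers and missing array indices, not just empty strings. A minimal illustration (the helper name is made up for this sketch):

```scala
import scala.util.Try

// Fall back to 0 for empty, malformed, or otherwise unparsable input
// instead of throwing NumberFormatException.
def toLongOrZero(s: String): Long = Try(s.toLong) getOrElse 0L
```

The same shape covers the `toInt` and `toDouble` fields.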
I hope this is the solution you required.