scala spark dataframe explode is slow - so, alternate method - create columns and rows from arrays in a column

Problem Description

Scala 2.11.8, Spark 2.0.1

The explode function is very slow, so I am looking for an alternate method. I think it is possible with RDDs and flatMap - any help is greatly appreciated.

I have a udf that returns a List of (String, String, String, Int) tuples of varying length. For each row in the dataframe, I want to create multiple rows and multiple columns.

import org.apache.spark.sql.functions.{explode, udf}
import spark.implicits._  // spark is the SparkSession (as in spark-shell)

// Returns a list of tuples whose length depends on the input value.
val Udf = udf { (s: String) =>
  if (s == "1") List(("a", "b", "c", 0), ("a1", "b1", "c1", 1), ("a2", "b2", "c2", 2))
  else List(("a", "b", "c", 0))
}

val df = Seq(("a", "1"), ("b", "2")).toDF("A", "B")
val df1 = df.withColumn("C", Udf($"B"))    // array column produced by the udf
val df2 = df1.select($"A", explode($"C"))  // one row per array element, named "col"
val df3 = df2.withColumn("D", $"col._1").withColumn("E", $"col._2")
  .withColumn("F", $"col._3").withColumn("G", $"col._4")

/// dataframe after applying the udf
+---+---+--------------------+
|  A|  B|                   C|
+---+---+--------------------+
|  a|  1|[[a,b,c,0], [a1,b...|
|  b|  2|         [[a,b,c,0]]|
+---+---+--------------------+

/// Final dataframe
+---+------------+---+---+---+---+
|  A|         col|  D|  E|  F|  G|
+---+------------+---+---+---+---+
|  a|   [a,b,c,0]|  a|  b|  c|  0|
|  a|[a1,b1,c1,1]| a1| b1| c1|  1|
|  a|[a2,b2,c2,2]| a2| b2| c2|  2|
|  b|   [a,b,c,0]|  a|  b|  c|  0|
+---+------------+---+---+---+---+
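
As a side note, the four withColumn calls above can be collapsed into a single select over the struct fields. A minimal sketch, assuming explode's default output column name "col":

// Equivalent to the withColumn chain above, done in one projection.
val df3 = df2.select(
  $"A",
  $"col._1".as("D"), $"col._2".as("E"),
  $"col._3".as("F"), $"col._4".as("G"))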

This is very slow on many millions of rows - it takes over 12 hours.

Answer

Here is another simple example:

// An RDD of (Int, String, Double, Array[String]) tuples.
val ds = sc.parallelize(Seq((0, "Lorem ipsum dolor", 1.0, Array("prp1", "prp2", "prp3"))))

An alternative way of exploding arrays is to use flatMap:

// Emit one output tuple per element of the array in the fourth field.
ds.flatMap { t =>
  t._4.map { prp => (t._1, t._2, t._3, prp) }
}.collect.foreach(println)

Result:

(0,Lorem ipsum dolor,1.0,prp1)
(0,Lorem ipsum dolor,1.0,prp2)
(0,Lorem ipsum dolor,1.0,prp3)

I tried it with your dataset, but I am not sure if it is the optimal way of doing it.

df1.show(false)

+---+---+------------------------------------------------+
|A  |B  |C                                               |
+---+---+------------------------------------------------+
|a  |1  |[[a, b, c, 0], [a1, b1, c1, 1], [a2, b2, c2, 2]]|
|b  |2  |[[a, b, c, 0]]                                  |
+---+---+------------------------------------------------+


// Flatten the array column, carrying along A and B, then unpack each struct.
df1.rdd.flatMap { t: Row =>
    t.getSeq[Row](2).map { row => (t.getString(0), t.getString(1), row) }
  }.map {
    case (col1: String, col2: String, col3: Row) =>
      (col1, col2, col3.getString(0), col3.getString(1), col3.getString(2), col3.getInt(3))
  }.collect.foreach(println)

Result:

(a,1,a,b,c,0)
(a,1,a1,b1,c1,1)
(a,1,a2,b2,c2,2)
(b,2,a,b,c,0)  
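
If a DataFrame is needed downstream rather than printed tuples, the same pipeline can end in toDF instead of collect. A minimal sketch, with column names chosen to mirror the question's desired output:

// Assumes spark.implicits._ is in scope for toDF on an RDD of tuples.
val out = df1.rdd.flatMap { t: Row =>
    t.getSeq[Row](2).map { row => (t.getString(0), t.getString(1), row) }
  }.map {
    case (a, b, r) => (a, b, r.getString(0), r.getString(1), r.getString(2), r.getInt(3))
  }.toDF("A", "B", "D", "E", "F", "G")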

Hope this helps!!
