Spark SQL: how to explode without losing null values


Problem description

I have a DataFrame that I am trying to flatten. As part of the process, I want to explode it, so that if I have a column of arrays, each value of the array is used to create a separate row. For instance,

id | name | likes
_______________________________
1  | Luke | [baseball, soccer]

should become

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer

Here is my code:

private DataFrame explodeDataFrame(DataFrame df) {
    DataFrame resultDf = df;
    // Explode every array-typed column into one row per array element
    for (StructField field : df.schema().fields()) {
        if (field.dataType() instanceof ArrayType) {
            resultDf = resultDf.withColumn(field.name(),
                    org.apache.spark.sql.functions.explode(resultDf.col(field.name())));
            resultDf.show(); // debug output
        }
    }
    return resultDf;
}

The problem is that in my data, some of the array columns contain nulls. In that case, the entire row is deleted. So this DataFrame:

id | name | likes
_______________________________
1  | Luke | [baseball, soccer]
2  | Lucy | null

becomes

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer

instead of

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer
2  | Lucy | null

How can I explode my arrays so that I don't lose the null rows?

I am using Spark 1.5.2 and Java 8.

Recommended answer

Spark 2.2+

You can use the explode_outer function:

import org.apache.spark.sql.functions.explode_outer
import spark.implicits._  // for the $"..." column syntax (assumes a SparkSession named spark)

df.withColumn("likes", explode_outer($"likes")).show

// +---+----+--------+
// | id|name|   likes|
// +---+----+--------+
// |  1|Luke|baseball|
// |  1|Luke|  soccer|
// |  2|Lucy|    null|
// +---+----+--------+
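As a rough illustration outside Spark, the row-level behaviour of explode_outer can be sketched in plain Java (no Spark; the class and method names here are purely illustrative): a null or empty array still yields exactly one output row, with null in the exploded column.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ExplodeOuterSketch {

    // Mimics Spark's explode_outer at the row level: a null or empty
    // array still yields one output row, with null in the exploded slot,
    // instead of dropping the row as plain explode does.
    static List<String> explodeOuter(String id, String name, List<String> likes) {
        List<String> out = new ArrayList<>();
        if (likes == null || likes.isEmpty()) {
            out.add(id + " | " + name + " | " + null);   // row is preserved
        } else {
            for (String like : likes) {
                out.add(id + " | " + name + " | " + like); // one row per element
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        rows.addAll(explodeOuter("1", "Luke", Arrays.asList("baseball", "soccer")));
        rows.addAll(explodeOuter("2", "Lucy", null));
        rows.forEach(System.out::println);
        // 1 | Luke | baseball
        // 1 | Luke | soccer
        // 2 | Lucy | null
    }
}
```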

Spark <= 2.1

The examples below are in Scala, but the Java equivalent should be almost identical (to import individual functions, use import static).

import org.apache.spark.sql.functions.{array, col, explode, lit, when}
import spark.implicits._  // for toDF (assumes a SparkSession named spark)

val df = Seq(
  (1, "Luke", Some(Array("baseball", "soccer"))),
  (2, "Lucy", None)
).toDF("id", "name", "likes")

df.withColumn("likes", explode(
  when(col("likes").isNotNull, col("likes"))
    // If the column is null, explode an array<string> containing a single null
    .otherwise(array(lit(null).cast("string")))))
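The effect of this when/otherwise padding can be sketched in plain Java (no Spark; the names are illustrative): a strict explode produces no rows at all for a null array, while first replacing null with a one-element array containing null preserves the row.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class NullPaddingSketch {

    // Plain explode, as in Spark <= 2.1: a null array yields no rows at all.
    static List<String> explode(String name, List<String> likes) {
        List<String> out = new ArrayList<>();
        if (likes != null) {
            for (String like : likes) {
                out.add(name + " | " + like);
            }
        }
        return out;
    }

    // The when/otherwise trick: replace a null array with a one-element
    // array containing null (array(lit(null)) in Spark), so explode
    // still emits exactly one row for that record.
    static List<String> explodeKeepNulls(String name, List<String> likes) {
        List<String> padded = (likes == null)
                ? Collections.singletonList(null)
                : likes;
        return explode(name, padded);
    }

    public static void main(String[] args) {
        System.out.println(explode("Lucy", null));          // [] -- row is lost
        System.out.println(explodeKeepNulls("Lucy", null)); // [Lucy | null]
    }
}
```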

The idea here is basically to replace NULL with an array(NULL) of the desired type. For complex types (a.k.a. structs) you have to provide the full schema:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y")

val st = StructType(Seq(
  StructField("_1", IntegerType, false), StructField("_2", StringType, true)
))

dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast(st)))))

// Equivalently, using a DDL-formatted type string:
dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast("struct<_1:int,_2:string>")))))

Note:

If the array Column has been created with containsNull set to false, you should change this first (tested with Spark 2.1):

df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true)))

