Passing nullable columns as parameter to Spark SQL UDF


Question

Here is a Spark UDF that I'm using to compute a value using a few columns.

def spark_udf_func(s: String, i: Int): Boolean = {
    // I'm returning true regardless of the parameters passed to it.
    true
}

val spark_udf = org.apache.spark.sql.functions.udf(spark_udf_func _)

val df = sc.parallelize(Array[(Option[String], Option[Int])](
  (Some("Rafferty"), Some(31)), 
  (null, Some(33)), 
  (Some("Heisenberg"), Some(33)),  
  (Some("Williams"), null)
)).toDF("LastName", "DepartmentID")

df.withColumn("valid", spark_udf(df.col("LastName"), df.col("DepartmentID"))).show()

+----------+------------+-----+
|  LastName|DepartmentID|valid|
+----------+------------+-----+
|  Rafferty|          31| true|
|      null|          33| true|
|Heisenberg|          33| true|
|  Williams|        null| null|
+----------+------------+-----+

Can anyone explain why the value for column valid is null for the last row?

When I checked the Spark plan I was able to figure out that it has a case condition which says that if column 2 (DepartmentID) is null, the expression has to return null.

== Physical Plan ==

*Project [_1#699 AS LastName#702, _2#700 AS DepartmentID#703, if (isnull(_2#700)) null else UDF(_1#699, _2#700) AS valid#717]
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), assertnotnull(input[0, scala.Tuple2, true])._1), true) AS _1#699, unwrapoption(IntegerType, assertnotnull(input[0, scala.Tuple2, true])._2) AS _2#700]
   +- Scan ExternalRDDScan[obj#698]

Why do we have such behaviour in Spark?
Why only Integer columns?
What is it that I'm doing wrong here, and what is the proper way to handle nulls within a UDF when the UDF parameter is null?

Answer

The issue is that null is not a valid value for a Scala Int (which is the backing type), while it is a valid value for a String. Int is equivalent to the Java int primitive and must always have a value. This means the UDF can't be called when the value is null, and therefore null remains.
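
To see why, here is a plain Scala sketch (independent of Spark) of what the compiler allows:

val s: String = null            // compiles: String is a reference type
// val i: Int = null            // does not compile: Int maps to the JVM int primitive
val j: java.lang.Integer = null // compiles: java.lang.Integer is the boxed reference type

Because the UDF's second parameter is declared as Int, Spark wraps the call in the null check you can see in the physical plan rather than ever invoking the function with a null.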

There are two ways to solve this:

  1. Change the function to accept java.lang.Integer (which is an object and can be null).
  2. If you can't change the function, use when/otherwise to do something special when the value is null, e.g. when(col("int col").isNull, someValue).otherwise(the original call); see the sketch below.
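
Here is a minimal sketch of both options, assuming the same df and spark_udf as above (the name spark_udf_boxed and the choice of false as the fallback value are illustrative, not part of the original answer):

import org.apache.spark.sql.functions.{col, lit, udf, when}

// Option 1: declare the parameter as java.lang.Integer, a reference type that can be null.
val spark_udf_boxed = udf((s: String, i: java.lang.Integer) => {
  // Handle the null case explicitly inside the function.
  i != null
})

df.withColumn("valid", spark_udf_boxed(col("LastName"), col("DepartmentID"))).show()

// Option 2: keep the original UDF and guard the null case with when/otherwise.
df.withColumn("valid",
  when(col("DepartmentID").isNull, lit(false)) // value to use when DepartmentID is null
    .otherwise(spark_udf(col("LastName"), col("DepartmentID")))
).show()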

A good explanation of this can be found here
