Scala Spark DataFrame: modify column with UDF return value


Problem Description

I have a Spark dataframe which has a timestamp field, and I want to convert it to the long datatype. I used a UDF and the standalone code works fine, but when I plug it into a generic logic where any timestamp needs to be converted, I am not able to get it working. The issue is how I can assign the return value from the UDF back to the dataframe column.

Below is the code snippet:

    import org.apache.spark.sql.SparkSession

    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Test3").getOrCreate()
    import org.apache.spark.sql.functions._
    val sqlContext = spark.sqlContext
    val df2 = sqlContext.jsonRDD(spark.sparkContext.parallelize(Array(
      """{"year":2012, "make": "Tesla", "model": "S", "comment": "No Comment", "blank": "","manufacture_ts":"2017-10-16 00:00:00"}""",
      """{"year":1997, "make": "Ford", "model": "E350", "comment": "Get one", "blank": "","manufacture_ts":"2017-10-16 00:00:00"}""",
      """{"year":2015, "make": "Chevy", "model": "Volt", "comment": "", "blank": "","manufacture_ts":"2017-10-16 00:00:00"}"""
    )))

      // UDF converting a java.sql.Timestamp to epoch milliseconds
      val convertTimeStamp = udf { (manTs: java.sql.Timestamp) =>
        manTs.getTime
      }

      df2.withColumn("manufacture_ts", convertTimeStamp(df2("manufacture_ts"))).show

      +-----+----------+-----+--------------+-----+----+
      |blank|   comment| make|manufacture_ts|model|year|
      +-----+----------+-----+--------------+-----+----+
      |     |No Comment|Tesla| 1508126400000|    S|2012|
      |     |   Get one| Ford| 1508126400000| E350|1997|
      |     |          |Chevy| 1508126400000| Volt|2015|
      +-----+----------+-----+--------------+-----+----+

Now I want to invoke this generically from a dataframe, so that it is called on all the columns which need to be converted to long:

    object Test4 extends App {

      import org.apache.spark.sql.{Row, SparkSession}
      import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

      val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Test").getOrCreate()
      import spark.implicits._

      val long: Long = "1508299200000".toLong

      val data = Seq(Row("10000020_LUX_OTC", long, "2020-02-14"))

      val schema = List(StructField("rowkey", StringType, true),
                        StructField("order_receipt_dt", LongType, true),
                        StructField("maturity_dt", StringType, true))

      val dataDF = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(schema))

      val modifiedDf2 = schema.foldLeft(dataDF) { case (newDF, StructField(name, dataType, flag, metadata)) =>
        newDF.withColumn(name, DataTypeUtil.transformLong(newDF, name, dataType.typeName))
      }
      modifiedDf2.show
    }


    object DataTypeUtil {

      import org.apache.spark.sql.{Column, DataFrame}
      import org.apache.spark.sql.functions.udf

      val convertTimeStamp = udf { (manTs: java.sql.Timestamp) =>
        manTs.getTime
      }

      def transformLong(dataFrame: DataFrame, name: String, fieldType: String): Column = {
        fieldType.toLowerCase match {
          case "timestamp" => convertTimeStamp(dataFrame(name))
          case _ => dataFrame.col(name)
        }
      }
    }


Answer

Maybe your UDF crashed because the timestamp is null. You can do the following:


  • use unix_timestamp instead of the UDF, or make your UDF null-safe (see the sketch after this list)
  • only apply it to the fields which need to be converted
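
For the null-safe option, a minimal sketch of the question's UDF wrapped in Option (the name convertTimeStampSafe is illustrative, not from the original post); returning an Option makes Spark write null instead of throwing a NullPointerException on null input:

import org.apache.spark.sql.functions.udf

// Option(manTs) turns a null timestamp into None, which Spark renders
// as null in the result column instead of crashing on .getTime
val convertTimeStampSafe = udf { (manTs: java.sql.Timestamp) =>
  Option(manTs).map(_.getTime)
}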

Given the data:

import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType
import java.sql.Timestamp
import java.time.LocalDateTime

val df = Seq(
  (1L, Timestamp.valueOf(LocalDateTime.now()), Timestamp.valueOf(LocalDateTime.now()))
).toDF("id", "ts1", "ts2")

You can do:

// For every TimestampType column, overwrite it with its unix time (seconds since epoch)
val newDF = df.schema.fields.filter(_.dataType == TimestampType).map(_.name)
  .foldLeft(df)((df, field) => df.withColumn(field, unix_timestamp(col(field))))

newDF.show()

This gives:

+---+----------+----------+
| id|       ts1|       ts2|
+---+----------+----------+
|  1|1589109282|1589109282|
+---+----------+----------+
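
Note that unix_timestamp returns whole seconds since the epoch, while the getTime-based UDF in the question returns milliseconds (compare 1589109282 here with 1508126400000 earlier). If millisecond precision matters, a possible variant, reusing the same df as above, is to cast instead of calling a UDF:

// Casting a timestamp to double yields fractional epoch seconds;
// scaling by 1000 and truncating back to long preserves milliseconds.
// Null timestamps simply stay null under the casts.
val msDF = df.schema.fields.filter(_.dataType == TimestampType).map(_.name)
  .foldLeft(df)((acc, field) =>
    acc.withColumn(field, (col(field).cast("double") * 1000).cast("long")))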

