Scala UDF returning 'Schema for type Unit is not supported'


Problem Description

I want to make changes to a column in a dataframe. The column is an array of integers. I want to replace elements of the array, taking an index from a second array and replacing the element at that index with an element from a third array. Example: I have three columns C1, C2, C3, all arrays. I want to replace elements in C3 as follows.

C3[C2[i]] = C1[i].

I wrote the following UDF:

def UpdateHist = udf((CRF_count: Seq[Long], Day: Seq[String], History: Seq[Int]) =>
  for (i <- 0 to Day.length - 1) {
    History.updated(Day(i).toInt - 1, CRF_count(i).toInt)
  })

And ran the following:

histdate3.withColumn("History2", UpdateHist2(col("CRF_count"), col("Day"), col("History"))).show()

But it returned the following error:

scala> histdate3.withColumn("History2", UpdateHist2(col("CRF_count"), col("Day"), col("History"))).show()

java.lang.UnsupportedOperationException: Schema for type Unit is not supported at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:733) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:671) at org.apache.spark.sql.functions$.udf(functions.scala:3100) at UpdateHist2(:25) ... 48 elided

I think I'm returning some different type, a View type, which is not supported. Please help me figure out how to solve this.
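The error message points at the actual cause: in Scala, a `for` comprehension without `yield` desugars to `foreach` and evaluates to `Unit`, so the UDF's inferred return type is `Unit`, which Spark cannot map to a schema. A minimal plain-Scala illustration (no Spark needed):

```scala
val history = Seq(11, 12)

// A `for` without `yield` desugars to foreach and has type Unit;
// every `updated` copy is built and immediately discarded.
val loopResult: Unit = for (i <- history.indices) {
  history.updated(i, 0)
}

// With `yield` a value is produced, though here it is a Seq of
// independent copies, not the single cumulatively updated Seq needed.
val yieldResult = for (i <- history.indices) yield history.updated(i, 0)
// yieldResult: Seq[Seq[Int]] = Vector(List(0, 12), List(11, 0))
```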

Recommended Answer

Your for loop returns Unit, hence the error message. You could use for-yield to return values, but since the Seq should be updated successively, a simple foldLeft works better:

import org.apache.spark.sql.functions._
// Note: outside of spark-shell, `toDF` also needs the session implicits:
// import spark.implicits._

val df = Seq(
  (Seq(101L, 102L), Seq("1", "2"), Seq(11, 12)),
  (Seq(201L, 202L, 203L), Seq("2", "3"), Seq(21, 22, 23))
).toDF("C1", "C2", "C3")
// +---------------+------+------------+
// |C1             |C2    |C3          |
// +---------------+------+------------+
// |[101, 102]     |[1, 2]|[11, 12]    |
// |[201, 202, 203]|[2, 3]|[21, 22, 23]|
// +---------------+------+------------+

def updateC3 = udf( (c1: Seq[Long], c2: Seq[String], c3: Seq[Int]) =>
  c2.foldLeft( c3 ){ (acc, day) =>
    val idx = day.toInt - 1          // 1-based day -> 0-based index
    acc.updated(idx, c1(idx).toInt)  // overwrite C3 at idx with C1 at idx
  }
)

df.withColumn("C3", updateC3($"C1", $"C2", $"C3")).show(false)
// +---------------+------+--------------+
// |C1             |C2    |C3            |
// +---------------+------+--------------+
// |[101, 102]     |[1, 2]|[101, 102]    |
// |[201, 202, 203]|[2, 3]|[21, 202, 203]|
// +---------------+------+--------------+
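One caveat worth flagging: the fold above reads `c1(idx)`, i.e. C1 at the same position that is being overwritten, whereas the question's original UDF used `CRF_count(i)` with `i` as the iteration index (C3[C2[i]-1] = C1[i]). When those two indices differ, the results differ. A sketch of the index-by-iteration variant as plain Scala (the function name is illustrative, not from the source):

```scala
// Variant matching the question's literal spec: C3(C2(i) - 1) = C1(i),
// where i is the iteration index over C2.
def updateC3ByIter(c1: Seq[Long], c2: Seq[String], c3: Seq[Int]): Seq[Int] =
  c2.zipWithIndex.foldLeft(c3) { case (acc, (day, i)) =>
    acc.updated(day.toInt - 1, c1(i).toInt) // 1-based day -> 0-based slot
  }

// On the second sample row the two variants diverge:
println(updateC3ByIter(Seq(201L, 202L, 203L), Seq("2", "3"), Seq(21, 22, 23)))
// List(21, 201, 202)  -- the fold above instead yields [21, 202, 203]
```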

