Spark - I get a java.lang.UnsupportedOperationException when I invoke a custom function from a map
Problem description
I have a DataFrame with a structure similar to:
root
|-- NPAData: struct (nullable = true)
| |-- NPADetails: struct (nullable = true)
| | |-- location: string (nullable = true)
| | |-- manager: string (nullable = true)
| |-- service: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- serviceName: string (nullable = true)
| | | |-- serviceCode: string (nullable = true)
|-- NPAHeader: struct (nullable = true)
| |-- npaNumber: string (nullable = true)
| |-- date: string (nullable = true)
What I am trying to do is:
- Group the records that have the same npaNumber into a list.
- Inside each list, order the elements by their date.
- Once the elements are grouped and ordered, merge them by applying some logic. To perform this last step I decided to use a map.
Here is what I tried so far:
val toUpdate = sourceDF
  .withColumn("count", count($"NPAHeader").over(Window.partitionBy("NPAHeader.npaNumber").orderBy($"NPAHeader.date".desc)))
  .filter($"count" > 1)
val groupedNpa = toUpdate
  .groupBy($"NPAHeader.npaNumber")
  .agg(collect_list(struct($"NPAData", $"NPAHeader")).as("npa"))
// This is a simplified version of my logic.
def pickOne(list: Seq[Row]): Row = {
  println("First element: " + list.head)
  list.head
}
val mergedNpa = groupedNpa.map(row => (row.getAs[String]("npaNumber"), pickOne(row.getAs[Seq[Row]]("npa"))))
An example of a Row after the groupBy would be:
[1234,WrappedArray([npaNew,npaOlder,... npaOldest])]
But I am getting an exception when I try to invoke the function from the map.
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
- field (class: "org.apache.spark.sql.Row", name: "_2")
- root class: "scala.Tuple2"
As far as I understand, I cannot invoke the function pickOne() from the map (or at least not in the way I am trying to). But I don't know what I am doing wrong.
Why am I getting this exception?
Thanks for your time!
Note: I know there are easier ways to pick one element from the list without invoking a custom function. But I do have to invoke it, because in the next step I need to put far more complex logic there to merge the rows.
After applying Mahesh Chand Kandpal's suggestion:
import org.apache.spark.sql.catalyst.encoders.RowEncoder
grouped.map(row => "emdNumber: "+row.getAs[String]("emdNumber"))
val mergedNpa = groupedNpa.map(row => (row.getAs[String]("npaNumber"),pickOne(row.getAs[Seq[Row]]("npa"))(RowEncoder(row.schema))))
I get the following error:
type mismatch; found : org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[org.apache.spark.sql.Row] required: Int
How should I apply the Encoder instead?
Recommended answer
When you use map on a DataFrame, you need to provide an Encoder for the type that the function returns.
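In Spark 2.x, the signature of Dataset[Row].map is ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]. The Encoder[T] belongs to the second parameter list of map, not to the call of pickOne: writing pickOne(...)(RowEncoder(row.schema)) applies the encoder to the Row that pickOne returns, and Row.apply expects an Int index, hence the type mismatch.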
import org.apache.spark.sql.catalyst.encoders.RowEncoder
// schema is the StructType of the Rows returned by the function passed to map
implicit val encoder = RowEncoder(schema)
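To connect this with the code in the question, here is a minimal sketch of passing the encoder in the second parameter list of map. It assumes a Spark 2.x style API where RowEncoder(schema) is available, with Scala 2.11 or 2.12. The object EncoderSketch, the helpers sampleGrouped, mergeToRows and mergeToPairs, the elementSchema value and the flat toy data are all hypothetical stand-ins for the nested NPAHeader/NPAData structure, and pickOne contains placeholder logic only. Because the question's map returns a (String, Row) tuple, the second variant builds the tuple encoder from its parts with Encoders.tuple.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions.{collect_list, struct}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object EncoderSketch {
  // Placeholder merge logic (an assumption): keep the element with the latest date.
  // Field 1 of each element is the date string.
  def pickOne(rows: Seq[Row]): Row = rows.maxBy(_.getString(1))

  // Schema of the Rows that pickOne returns, i.e. of the structs inside "npa".
  val elementSchema: StructType = StructType(Seq(
    StructField("npaNumber", StringType),
    StructField("date", StringType)))

  // Toy data: flat columns stand in for the nested structs of the question.
  def sampleGrouped(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(("1234", "2018-01-02"), ("1234", "2018-01-01"), ("5678", "2018-03-01"))
      .toDF("npaNumber", "date")
      .groupBy($"npaNumber")
      .agg(collect_list(struct($"npaNumber", $"date")).as("npa"))
  }

  // Variant 1: map returns a Row, so an Encoder[Row] goes in the second parameter list.
  def mergeToRows(groupedNpa: DataFrame): Dataset[Row] =
    groupedNpa.map(row => pickOne(row.getAs[Seq[Row]]("npa")))(RowEncoder(elementSchema))

  // Variant 2: map returns (String, Row), so the tuple encoder is built from its parts.
  def mergeToPairs(groupedNpa: DataFrame): Dataset[(String, Row)] = {
    val tupleEncoder: Encoder[(String, Row)] =
      Encoders.tuple(Encoders.STRING, RowEncoder(elementSchema))
    groupedNpa.map(row =>
      (row.getAs[String]("npaNumber"), pickOne(row.getAs[Seq[Row]]("npa"))))(tupleEncoder)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("encoder-sketch").getOrCreate()
    val grouped = sampleGrouped(spark)
    mergeToRows(grouped).show()
    mergeToPairs(grouped).show()
    spark.stop()
  }
}
```

Passing the encoder explicitly avoids relying on implicit resolution. Declaring it as an implicit val, as in the answer, should work as well when map returns a plain Row.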