Scala Spark UDF: filter an array of structs
Question
I have a DataFrame with the schema:
root
|-- x: Long (nullable = false)
|-- y: Long (nullable = false)
|-- features: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- score: double (nullable = true)
For example, I have the following data:
+--------------------+--------------------+------------------------------------------+
| x | y | features |
+--------------------+--------------------+------------------------------------------+
|10 | 9 |[["f1", 5.9], ["ft2", 6.0], ["ft3", 10.9]]|
|11 | 0 |[["f4", 0.9], ["ft1", 4.0], ["ft2", 0.9] ]|
|20 | 9 |[["f5", 5.9], ["ft2", 6.4], ["ft3", 1.9] ]|
|18 | 8 |[["f1", 5.9], ["ft4", 8.1], ["ft2", 18.9]]|
+--------------------+--------------------+------------------------------------------+
I would like to keep only the features whose name starts with a particular prefix, say "ft", so that the result is:
+--------------------+--------------------+-----------------------------+
| x | y | features |
+--------------------+--------------------+-----------------------------+
|10 | 9 |[["ft2", 6.0], ["ft3", 10.9]]|
|11 | 0 |[["ft1", 4.0], ["ft2", 0.9] ]|
|20 | 9 |[["ft2", 6.4], ["ft3", 1.9] ]|
|18 | 8 |[["ft4", 8.1], ["ft2", 18.9]]|
+--------------------+--------------------+-----------------------------+
I'm not using Spark 2.4+ so I cannot use the solution provided here: Spark (Scala) filter array of structs without explode
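For readers who are on Spark 2.4 or later, the linked no-explode approach relies on the built-in SQL higher-order function filter(array, lambda). The sketch below assumes the column names from the schema above; the case classes Feature and Record and the sample rows are hypothetical stand-ins for the real data, and LIKE 'ft%' assumes the prefix contains no SQL wildcard characters.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.expr

// Hypothetical sample types matching the schema in the question.
case class Feature(name: String, score: Double)
case class Record(x: Long, y: Long, features: Seq[Feature])

object HigherOrderFilterExample {
  // Spark 2.4+ only: the SQL higher-order function filter(array, lambda) keeps the
  // elements for which the lambda is true, so neither a UDF nor explode is needed.
  // LIKE 'ft%' assumes the prefix contains no SQL wildcard characters (% or _).
  def keepFtFeatures(df: DataFrame): DataFrame =
    df.withColumn("features", expr("filter(features, f -> f.name LIKE 'ft%')"))

  // Two rows of the question's sample data (hypothetical stand-in for the real df).
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      Record(10, 9, Seq(Feature("f1", 5.9), Feature("ft2", 6.0), Feature("ft3", 10.9))),
      Record(11, 0, Seq(Feature("f4", 0.9), Feature("ft1", 4.0), Feature("ft2", 0.9)))
    ).toDF()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("hof-filter").getOrCreate()
    keepFtFeatures(sample(spark)).show(truncate = false)
    spark.stop()
  }
}
```

On Spark 3.0+, the Scala API offers the same thing without a SQL string: functions.filter(col("features"), f => f.getField("name").startsWith("ft")).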
I tried to use a UDF, but it still does not work. Here are my attempts. First, I defined a UDF:
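import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
def filterFeature: UserDefinedFunction =
  udf((features: Seq[Row]) =>
    // keep only the features whose name (field 0) starts with "ft"
    features.filter { x =>
      x.getString(0).startsWith("ft")
    }
  )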
But when I apply this UDF:
df.withColumn("filtered", filterFeature($"features"))
I get the error Schema for type org.apache.spark.sql.Row is not supported. I found that I can't return Row from a UDF. Then I tried:
def filterFeature: UserDefinedFunction =
  udf((features: Seq[Row]) =>
    features.filter { x =>
      x.getString(0).startsWith("ft")
    }, (StringType, DoubleType)
  )
Then I got this error:
error: type mismatch;
found : (org.apache.spark.sql.types.StringType.type, org.apache.spark.sql.types.DoubleType.type)
required: org.apache.spark.sql.types.DataType
}, (StringType, DoubleType)
^
I also tried a case class as suggested by some answers:
case class FilteredFeature(featureName: String, featureScore: Double)
def filterFeature: UserDefinedFunction =
  udf((features: Seq[Row]) =>
    features.filter { x =>
      x.getString(0).startsWith("ft")
    }, FilteredFeature
  )
But I got:
error: type mismatch;
found : FilteredFeature.type
required: org.apache.spark.sql.types.DataType
}, FilteredFeature
^
I also tried:
case class FilteredFeature(featureName: String, featureScore: Double)
def filterFeature: UserDefinedFunction =
  udf((features: Seq[Row]) =>
    features.filter { x =>
      x.getString(0).startsWith("ft")
    }, Seq[FilteredFeature]
  )
I got:
<console>:192: error: missing argument list for method apply in class GenericCompanion
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `apply _` or `apply(_)` instead of `apply`.
}, Seq[FilteredFeature]
^
Finally, I tried:
case class FilteredFeature(featureName: String, featureScore: Double)
def filterFeature: UserDefinedFunction =
  udf((features: Seq[Row]) =>
    features.filter { x =>
      x.getString(0).startsWith("ft")
    }, Seq[FilteredFeature](_)
  )
I got:
<console>:201: error: type mismatch;
found : Seq[FilteredFeature]
required: FilteredFeature
}, Seq[FilteredFeature](_)
^
What should I do in this case?
Answer
You have two options:
a) provide a schema to the UDF; this lets you return Seq[Row]
b) convert the Seq[Row] to a Seq of Tuple2 or of a case class; then you don't need to provide a schema (but the struct field names are lost if you use tuples!)
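Option b) can be sketched with a case class that mirrors the struct element. This assumes a hypothetical case class Feature(name: String, score: Double); unlike the question's FilteredFeature, it keeps the original field names. Spark derives the UDF's return type from Seq[Feature], so no schema argument is passed. Record and the sample rows are also hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical case class mirroring the struct<name, score> array element.
// Spark derives the UDF return type, array<struct<name:string,score:double>>,
// from Seq[Feature], so no schema argument is needed and field names are kept.
case class Feature(name: String, score: Double)
case class Record(x: Long, y: Long, features: Seq[Feature])

object CaseClassUdfExample {
  // Option b): read each struct as a Row, keep those whose name starts with "ft",
  // and convert the kept Rows into Feature instances.
  // Caveat: a null score would be unboxed to 0.0; use Option[Double] to keep nulls.
  val filterFeature: UserDefinedFunction = udf { (features: Seq[Row]) =>
    if (features == null) null // the features column is nullable
    else features
      .filter(r => r != null && Option(r.getAs[String]("name")).exists(_.startsWith("ft")))
      .map(r => Feature(r.getAs[String]("name"), r.getAs[Double]("score")))
  }

  // Two rows of the question's sample data (hypothetical stand-in for the real df).
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      Record(10, 9, Seq(Feature("f1", 5.9), Feature("ft2", 6.0), Feature("ft3", 10.9))),
      Record(11, 0, Seq(Feature("f4", 0.9), Feature("ft1", 4.0), Feature("ft2", 0.9)))
    ).toDF()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("case-class-udf").getOrCreate()
    sample(spark).withColumn("features", filterFeature(col("features"))).show(truncate = false)
    spark.stop()
  }
}
```

Using a Tuple2 instead of Feature would compile the same way, but the struct fields would then be named _1 and _2.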
For your case I would prefer option a), which also works well for structs with many fields:
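import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
val schema = df.schema("features").dataType // reuse the column's own array<struct<...>> type as the UDF's return type
val filterFeature = udf((features: Seq[Row]) => if (features == null) null else features.filter(_.getAs[String]("name").startsWith("ft")), schema)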
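The DataType that option a) expects can also be built by hand, which shows what the (StringType, DoubleType) in the question's second attempt should have been: a single ArrayType wrapping a StructType. This is a minimal sketch with hypothetical sample data (Feature, Record, and a row with a null array are assumptions). Note that Spark 3.0 rejects udf(AnyRef, DataType) by default unless spark.sql.legacy.allowUntypedScalaUDF is set to true; on Spark 2.x the setting is not needed.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{ArrayType, DataType, DoubleType, StringType, StructField, StructType}

// Hypothetical sample types matching the schema in the question.
case class Feature(name: String, score: Double)
case class Record(x: Long, y: Long, features: Seq[Feature])

object ExplicitSchemaUdfExample {
  // udf(f, dataType) expects ONE DataType describing the whole return value:
  // array<struct<name:string,score:double>>. A Scala tuple such as
  // (StringType, DoubleType) is not a DataType, hence the compile error.
  val featuresType: DataType = ArrayType(StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("score", DoubleType, nullable = true))))

  // Option a): return the kept Rows unchanged; Spark converts them using featuresType.
  // A def, so the UDF is only created once a SparkSession (and its config) exists.
  def filterFeature: UserDefinedFunction = udf(
    (features: Seq[Row]) =>
      if (features == null) null // the features column is nullable
      else features.filter(r => r != null && Option(r.getString(0)).exists(_.startsWith("ft"))),
    featuresType)

  // Sample rows, including a hypothetical row whose features array is null.
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      Record(10, 9, Seq(Feature("f1", 5.9), Feature("ft2", 6.0), Feature("ft3", 10.9))),
      Record(11, 0, Seq(Feature("f4", 0.9), Feature("ft1", 4.0), Feature("ft2", 0.9))),
      Record(30, 1, null)
    ).toDF()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("explicit-schema-udf")
      // Only needed on Spark 3.x, where udf(AnyRef, DataType) is disabled by default.
      .config("spark.sql.legacy.allowUntypedScalaUDF", "true")
      .getOrCreate()
    sample(spark).withColumn("features", filterFeature(col("features"))).show(truncate = false)
    spark.stop()
  }
}
```

Reading the type from df.schema instead avoids keeping a hand-written StructType in sync with the data as fields are added.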