How to create a custom Transformer from a UDF?
Problem description
I was trying to create and save a Pipeline with custom stages. I need to add a column to my DataFrame by using a UDF. Therefore, I was wondering if it is possible to convert a UDF, or a similar operation, into a Transformer.
My custom UDF looks like this, and I'd like to learn how to use it as a custom Transformer.
def getFeatures(n: String): Seq[String] = {
  val NUMBER_FEATURES = 4
  // Use only the first whitespace-delimited token, lowercased
  val name = n.split(" +")(0).toLowerCase
  // Collect the suffixes of length 1 to NUMBER_FEATURES
  (1 to NUMBER_FEATURES)
    .filter(size => size <= name.length)
    .map(size => name.substring(name.length - size))
}

val tokenizeUDF = sqlContext.udf.register("tokenize", (name: String) => getFeatures(name))
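As a quick sanity check of the logic itself, independent of Spark, the function simply collects the 1- to 4-character suffixes of the first whitespace-delimited token (the sample inputs below are illustrative):

```scala
// Standalone copy of the suffix-extraction logic, runnable without Spark.
def getFeatures(n: String): Seq[String] = {
  val NUMBER_FEATURES = 4
  val name = n.split(" +")(0).toLowerCase
  (1 to NUMBER_FEATURES)
    .filter(size => size <= name.length)
    .map(size => name.substring(name.length - size))
}

println(getFeatures("John Smith")) // suffixes of "john": n, hn, ohn, john
println(getFeatures("ab"))         // shorter than 4 chars: b, ab
```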
Recommended answer
It is not a fully featured solution, but you can start with something like this:
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{ArrayType, DataType, StringType}

class NGramTokenizer(override val uid: String)
  extends UnaryTransformer[String, Seq[String], NGramTokenizer] {

  def this() = this(Identifiable.randomUID("ngramtokenizer"))

  override protected def createTransformFunc: String => Seq[String] = {
    getFeatures _
  }

  override protected def validateInputType(inputType: DataType): Unit = {
    require(inputType == StringType)
  }

  override protected def outputDataType: DataType = {
    new ArrayType(StringType, true)
  }
}
A quick check:
val df = Seq((1L, "abcdef"), (2L, "foobar")).toDF("k", "v")
val transformer = new NGramTokenizer().setInputCol("v").setOutputCol("vs")
transformer.transform(df).show
// +---+------+------------------+
// | k| v| vs|
// +---+------+------------------+
// | 1|abcdef|[f, ef, def, cdef]|
// | 2|foobar|[r, ar, bar, obar]|
// +---+------+------------------+
You can even try to generalize it to something like this:
import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor
import scala.reflect.runtime.universe._

class UnaryUDFTransformer[T : TypeTag, U : TypeTag](
    override val uid: String,
    f: T => U
  ) extends UnaryTransformer[T, U, UnaryUDFTransformer[T, U]] {

  override protected def createTransformFunc: T => U = f

  override protected def validateInputType(inputType: DataType): Unit =
    require(inputType == schemaFor[T].dataType)

  override protected def outputDataType: DataType = schemaFor[U].dataType
}
val transformer = new UnaryUDFTransformer("featurize", getFeatures)
.setInputCol("v")
.setOutputCol("vs")
If you want to use a UDF rather than the wrapped function, you'll have to extend Transformer directly and override the transform method. Unfortunately, the majority of the useful classes are private, so it can be rather tricky.
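A minimal sketch of that approach, wrapping the UDF inside an overridden transform (Spark 2.x API; the class name UDFTransformer and the hard-coded column names are illustrative, and input/output columns would normally be Params instead):

```scala
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

class UDFTransformer(override val uid: String) extends Transformer {

  def this() = this(Identifiable.randomUID("udftransformer"))

  // The UDF is created once and applied inside transform.
  private val featurize = udf(getFeatures _)

  override def transform(dataset: Dataset[_]): DataFrame =
    dataset.withColumn("vs", featurize(col("v")))

  override def transformSchema(schema: StructType): StructType =
    schema.add(StructField("vs", ArrayType(StringType, true)))

  override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
}
```

This avoids the private helper traits entirely, at the cost of writing transformSchema by hand.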
Alternatively, you can register the UDF:
spark.udf.register("getFeatures", getFeatures _)
and use SQLTransformer:
import org.apache.spark.ml.feature.SQLTransformer
val transformer = new SQLTransformer()
.setStatement("SELECT *, getFeatures(v) AS vs FROM __THIS__")
transformer.transform(df).show
// +---+------+------------------+
// | k| v| vs|
// +---+------+------------------+
// | 1|abcdef|[f, ef, def, cdef]|
// | 2|foobar|[r, ar, bar, obar]|
// +---+------+------------------+
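Finally, since the original goal was to save a Pipeline: a custom stage also has to be writable before the pipeline's save will accept it. In Spark 2.x this is typically done by mixing DefaultParamsWritable into the transformer and adding a companion object extending DefaultParamsReadable. A minimal sketch, reusing the getFeatures function from above (the name PersistableNGramTokenizer and the save path are illustrative):

```scala
import org.apache.spark.ml.{Pipeline, PipelineModel, UnaryTransformer}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.types.{ArrayType, DataType, StringType}

// Same transformer as above, with persistence mixed in (Spark 2.x API).
class PersistableNGramTokenizer(override val uid: String)
  extends UnaryTransformer[String, Seq[String], PersistableNGramTokenizer]
  with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("ngramtokenizer"))

  override protected def createTransformFunc: String => Seq[String] = getFeatures _
  override protected def validateInputType(inputType: DataType): Unit =
    require(inputType == StringType)
  override protected def outputDataType: DataType = ArrayType(StringType, true)
}

// Companion object required so that PipelineModel.load can reconstruct the stage.
object PersistableNGramTokenizer extends DefaultParamsReadable[PersistableNGramTokenizer]

val pipeline = new Pipeline().setStages(Array(
  new PersistableNGramTokenizer().setInputCol("v").setOutputCol("vs")))
pipeline.fit(df).write.overwrite().save("/tmp/ngram-pipeline")
val reloaded = PipelineModel.load("/tmp/ngram-pipeline")
```

Note that this works because the stage only uses simple Params; stages with non-trivial state need a custom MLWriter/MLReader.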