Process all columns / the entire row in a Spark UDF
Question
For a dataframe containing a mix of string and numeric datatypes, the goal is to create a new features column that is a minhash of all of them.
While this could be done by performing a dataframe.toRDD, that is expensive when the next step will simply be to convert the RDD back to a dataframe.
So is there a way to do a udf along the following lines:
val wholeRowUdf = udf( (row: Row) => computeHash(row))
Row is not a spark sql datatype of course - so this would not work as shown.
Update/clarification: I realize it is easy to create a full-row UDF that runs inside withColumn. What is not so clear is what can be used inside a spark sql statement:
val featurizedDf = spark.sql("select wholeRowUdf( what goes here? ) as features from mytable")
Answer
Row is not a spark sql datatype of course - so this would not work as shown.
I will show that you can use Row with the struct built-in function to pass all columns, or selected columns, to a udf function. First, I define a dataframe:
val df = Seq(
("a", "b", "c"),
("a1", "b1", "c1")
).toDF("col1", "col2", "col3")
// +----+----+----+
// |col1|col2|col3|
// +----+----+----+
// |a   |b   |c   |
// |a1  |b1  |c1  |
// +----+----+----+
Then I define a function to make all the elements in a row into one string separated by ", " (as you have a computeHash function):
import org.apache.spark.sql.Row
def concatFunc(row: Row) = row.mkString(", ")
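The asker's computeHash is not shown anywhere, so as a rough, hypothetical stand-in (a single MurmurHash3 over the joined values, not a real minhash), something along these lines would slot into the same pattern:

```scala
import scala.util.hashing.MurmurHash3

// Hypothetical stand-in for the asker's computeHash: hash the
// comma-joined string form of the row's values. (A real minhash
// would apply several hash functions and keep each minimum; this
// only illustrates the plumbing.)
def computeHash(values: Seq[Any]): Int =
  MurmurHash3.stringHash(values.mkString(", "))
```

Inside the udf below it could be called as computeHash(row.toSeq).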
Then I wrap it in a udf function:
import org.apache.spark.sql.functions._
def combineUdf = udf((row: Row) => concatFunc(row))
Finally I call the udf function using the withColumn function and the struct built-in function, combining the selected columns into one column and passing it to the udf function:
df.withColumn("concatenated", combineUdf(struct(col("col1"), col("col2"), col("col3")))).show(false)
// +----+----+----+------------+
// |col1|col2|col3|concatenated|
// +----+----+----+------------+
// |a   |b   |c   |a, b, c     |
// |a1  |b1  |c1  |a1, b1, c1  |
// +----+----+----+------------+
So you can see that Row can be used to pass the whole row as an argument. You can even pass all of the columns in a row at once:
val columns = df.columns
df.withColumn("contcatenated", combineUdf(struct(columns.map(col): _*)))
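A side note, as a sketch assuming the df defined above: struct(...) keeps the original column names as the field names of the nested Row, so inside the udf you can also read values by name with getAs rather than by position.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._

// The Row handed to the udf carries the struct's schema, so a
// field can be fetched by name instead of by ordinal position.
def firstColUdf = udf((row: Row) => row.getAs[String]("col1"))
df.withColumn("first", firstColUdf(struct(df.columns.map(col): _*)))
```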
Updated
You can achieve the same with sql queries too; you just need to register the udf function as:
df.createOrReplaceTempView("tempview")
sqlContext.udf.register("combineUdf", combineUdf)
sqlContext.sql("select *, combineUdf(struct(`col1`, `col2`, `col3`)) as concatenated from tempview")
It will give you the same result as above.
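On Spark 2.x the same registration can go through the SparkSession instead of the SQLContext; a minimal sketch, assuming a session named spark:

```scala
// Spark 2.x: register the UserDefinedFunction on the SparkSession
spark.udf.register("combineUdf", combineUdf)
spark.sql("select *, combineUdf(struct(`col1`, `col2`, `col3`)) as concatenated from tempview").show(false)
```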
Now if you don't want to hardcode the names of the columns, you can select the column names according to your desire and make them a string:
val columns = df.columns.map(x => "`"+x+"`").mkString(",")
sqlContext.sql(s"select *, combineUdf(struct(${columns})) as concatenated from tempview")
I hope the answer is helpful.