Process all columns / the entire row in a Spark UDF

Question
For a dataframe containing a mix of string and numeric datatypes, the goal is to create a new features column that is a minhash of all of them.

While this could be done by performing dataframe.toRDD, that is expensive when the next step is simply to convert the RDD back to a dataframe.

So is there a way to write a udf along the following lines:
val wholeRowUdf = udf( (row: Row) => computeHash(row))
Row is not a spark sql datatype of course - so this would not work as shown.
Update/clarification: I realize it is easy to create a full-row UDF that runs inside withColumn. What is not so clear is what can be used inside a spark sql statement:

val featurizedDf = spark.sql("select wholeRowUdf( what goes here? ) as features from mytable")
Row is not a spark sql datatype of course - so this would not work as shown.
I am going to show that you can use Row to pass all the columns, or selected columns, to a udf function by using the struct inbuilt function.
First I define a dataframe
val df = Seq(
("a", "b", "c"),
("a1", "b1", "c1")
).toDF("col1", "col2", "col3")
// +----+----+----+
// |col1|col2|col3|
// +----+----+----+
// |a |b |c |
// |a1 |b1 |c1 |
// +----+----+----+
Then I define a function that concatenates all the elements in a row into one string separated by ", " (standing in for your computeHash function):
import org.apache.spark.sql.Row
def concatFunc(row: Row) = row.mkString(", ")
Then I use it in a udf function:
import org.apache.spark.sql.functions._
def combineUdf = udf((row: Row) => concatFunc(row))
Finally I call the udf function using the withColumn function, with the struct inbuilt function combining the selected columns into one column that is passed to the udf function:
df.withColumn("concatenated", combineUdf(struct(col("col1"), col("col2"), col("col3")))).show(false)
// +----+----+----+------------+
// |col1|col2|col3|concatenated|
// +----+----+----+------------+
// |a   |b   |c   |a, b, c     |
// |a1  |b1  |c1  |a1, b1, c1  |
// +----+----+----+------------+
So you can see that Row can be used to pass the whole row as an argument.
You can even pass all columns in a row at once
val columns = df.columns
df.withColumn("concatenated", combineUdf(struct(columns.map(col): _*)))
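To connect this back to the original minhash goal: computeHash can be any function of the Row. As a rough pure-Scala sketch (the hash choice here, MurmurHash3 over the row's string form, is an illustrative assumption and not a true minhash):

```scala
import scala.util.hashing.MurmurHash3

// Hypothetical computeHash: reduce a row's values to one stable Int.
// With Spark it would take a Row instead, e.g.:
//   def computeHash(row: Row): Int = MurmurHash3.stringHash(row.mkString("|"))
// and be wired up exactly like combineUdf above:
//   val hashUdf = udf((row: Row) => computeHash(row))
//   df.withColumn("features", hashUdf(struct(df.columns.map(col): _*)))
def computeHash(values: Seq[Any]): Int =
  MurmurHash3.stringHash(values.mkString("|"))

// Deterministic: the same row values always yield the same hash.
val h = computeHash(Seq("a", "b", 3))
```

The "|" separator is only there to keep ("ab", "c") and ("a", "bc") from colliding through mkString.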
Updated
You can achieve the same with SQL queries too; you just need to register the udf function:
df.createOrReplaceTempView("tempview")
sqlContext.udf.register("combineUdf", combineUdf)
sqlContext.sql("select *, combineUdf(struct(`col1`, `col2`, `col3`)) as concatenated from tempview")
It will give you the same result as above
Now, if you don't want to hardcode the column names, you can select whichever column names you want and build them into a string:
val columns = df.columns.map(x => "`"+x+"`").mkString(",")
sqlContext.sql(s"select *, combineUdf(struct(${columns})) as concatenated from tempview")
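The string interpolation above is plain Scala and can be checked without a Spark session; simulating df.columns for the three-column example dataframe:

```scala
// Simulated df.columns for the example dataframe above.
val columns = Array("col1", "col2", "col3")

// Backtick-quote each name so names containing dots or spaces survive in SQL.
val quoted = columns.map(x => "`" + x + "`").mkString(",")

// The query string that would be passed to sqlContext.sql.
val query = s"select *, combineUdf(struct($quoted)) as concatenated from tempview"
```

(The intermediate value is named quoted here just for the sketch; the answer's code reuses the name columns.)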
I hope the answer is helpful