Process all columns / the entire row in a Spark UDF


Problem description

For a dataframe containing a mix of string and numeric datatypes, the goal is to create a new features column that is a minhash of all of them.

While this could be done by performing a dataframe.toRDD, it is expensive to do that when the next step would simply be to convert the RDD back to a dataframe.

So is there a way to do a udf along the following lines:

val wholeRowUdf = udf((row: Row) => computeHash(row))

Row is not a spark sql datatype of course - so this would not work as shown.

Update/clarification: I realize it is easy to create a full-row UDF that runs inside withColumn. What is not so clear is what can be used inside a spark sql statement:

val featurizedDf = spark.sql("""select wholeRowUdf( what goes here? ) as features
                                from mytable""")

Answer

Row is not a spark sql datatype of course - so this would not work as shown.

I will show how you can use Row with the struct inbuilt function to pass all the columns, or selected columns, to a udf function.

First I define a dataframe:

val df = Seq(
  ("a", "b", "c"),
  ("a1", "b1", "c1")
).toDF("col1", "col2", "col3")
//    +----+----+----+
//    |col1|col2|col3|
//    +----+----+----+
//    |a   |b   |c   |
//    |a1  |b1  |c1  |
//    +----+----+----+
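
For completeness, the snippets below assume a SparkSession named spark with its implicits in scope; a minimal setup sketch (the appName and master values are illustrative assumptions, not from the original answer):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("row-udf-example")   // illustrative assumption
  .master("local[*]")           // illustrative assumption
  .getOrCreate()
import spark.implicits._        // enables Seq(...).toDF(...)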

Then I define a function to make all the elements in a row into one string separated by , (standing in for your computeHash function):

import org.apache.spark.sql.Row
def concatFunc(row: Row) = row.mkString(", ")
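
The question's computeHash itself is not shown; purely as a hypothetical stand-in (MD5 via java.security.MessageDigest over the concatenated string, not a real minhash), it could look like:

import java.security.MessageDigest

// Hypothetical sketch only: MD5 over the row's concatenated string form.
// A real minhash of mixed-type columns would be computed differently.
def computeHash(row: Row): String =
  MessageDigest.getInstance("MD5")
    .digest(concatFunc(row).getBytes("UTF-8"))
    .map("%02x".format(_))
    .mkString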

Then I use it in a udf function:

import org.apache.spark.sql.functions._
def combineUdf = udf((row: Row) => concatFunc(row))

Finally I call the udf function using the withColumn function, with the struct inbuilt function combining the selected columns into one column that is passed to the udf function:

df.withColumn("concatenated", combineUdf(struct(col("col1"), col("col2"), col("col3")))).show(false)
//    +----+----+----+------------+
//    |col1|col2|col3|concatenated|
//    +----+----+----+------------+
//    |a   |b   |c   |a, b, c     |
//    |a1  |b1  |c1  |a1, b1, c1  |
//    +----+----+----+------------+

So you can see that Row can be used to pass the whole row as an argument.

You can even pass all the columns of a row at once:

val columns = df.columns
df.withColumn("concatenated", combineUdf(struct(columns.map(col): _*)))
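
One caveat: row.mkString renders null fields as the literal string "null". If that matters for your hash, a null-safe variant is a small change (replacing nulls with the empty string here is an assumption, not part of the original answer):

// Null-safe sketch: normalize null fields to "" before joining.
def concatFuncNullSafe(row: Row): String =
  row.toSeq.map(v => Option(v).map(_.toString).getOrElse("")).mkString(", ")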

Updated

You can achieve the same with sql queries too; you just need to register the udf function as follows:

df.createOrReplaceTempView("tempview")
sqlContext.udf.register("combineUdf", combineUdf)
sqlContext.sql("select *, combineUdf(struct(`col1`, `col2`, `col3`)) as concatenated from tempview")
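
If you are using the Spark 2.x SparkSession entry point instead of sqlContext, the equivalent calls (same combineUdf as above; registering a UserDefinedFunction directly is supported since Spark 2.2) would be:

spark.udf.register("combineUdf", combineUdf)
spark.sql("select *, combineUdf(struct(`col1`, `col2`, `col3`)) as concatenated from tempview").show(false)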

It will give you the same result as above.

Now, if you don't want to hardcode the column names, you can select the column names you need and build the string dynamically:

val columns = df.columns.map(x => "`"+x+"`").mkString(",")
sqlContext.sql(s"select *, combineUdf(struct(${columns})) as concatenated from tempview")
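
Tying this back to the question's original sketch: once you have a hashing function (such as the hypothetical computeHash above), you can register it under the wholeRowUdf name from the question and use the sql form the asker wanted:

// wholeRowUdf/computeHash are the question's names; computeHash is the sketch above.
spark.udf.register("wholeRowUdf", udf((row: Row) => computeHash(row)))
val featurizedDf = spark.sql(s"select wholeRowUdf(struct(${columns})) as features from tempview")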

I hope the answer is helpful.
