Process all columns / the entire row in a Spark UDF


Problem description


For a dataframe containing a mix of string and numeric datatypes, the goal is to create a new features column that is a minhash of all of them.

While this could be done by performing dataframe.toRDD, that is expensive when the next step would simply convert the RDD back to a dataframe.

So is there a way to do a udf along the following lines:

val wholeRowUdf = udf( (row: Row) =>  computeHash(row))

Row is not a spark sql datatype of course - so this would not work as shown.

Update/clarification: I realize it is easy to create a full-row UDF that runs inside withColumn. What is not so clear is what can be used inside a Spark SQL statement:

val featurizedDf = spark.sql("select wholeRowUdf( what goes here? ) as features 
                              from mytable")

Solution

Row is not a spark sql datatype of course - so this would not work as shown.

I am going to show that you can use Row to pass all the columns, or a selection of columns, to a udf function by using the struct built-in function.

First I define a dataframe

val df = Seq(
  ("a", "b", "c"),
  ("a1", "b1", "c1")
).toDF("col1", "col2", "col3")
//    +----+----+----+
//    |col1|col2|col3|
//    +----+----+----+
//    |a   |b   |c   |
//    |a1  |b1  |c1  |
//    +----+----+----+

Then I define a function that joins all the elements of a row into one string separated by , (standing in for your computeHash function):

import org.apache.spark.sql.Row
def concatFunc(row: Row) = row.mkString(", ")

Then I wrap it in a udf function:

import org.apache.spark.sql.functions._
def combineUdf = udf((row: Row) => concatFunc(row))
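For the asker's actual goal, a hashing function can be dropped in for concatFunc with exactly the same wiring. A minimal sketch, assuming a computeHash that simply MurmurHash3-hashes the joined row values (the name and body here are illustrative assumptions, not a real minhash):

```scala
import scala.util.hashing.MurmurHash3

// Hypothetical stand-in for the asker's computeHash (name and body are
// assumptions): hash the comma-joined string form of the row's values.
// A real minhash would hash each value and keep the minimum signatures,
// but the UDF wiring is the same either way.
def computeHash(values: Seq[Any]): Int =
  MurmurHash3.stringHash(values.mkString(","))

// Wired into a UDF exactly like combineUdf above:
//   def hashUdf = udf((row: Row) => computeHash(row.toSeq))
//   df.withColumn("features", hashUdf(struct(df.columns.map(col): _*)))
```

Since the hash is computed from the row's values, the same struct(...) call used for combineUdf feeds it every column, mixed types included.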

Finally I call the udf function inside withColumn, using the struct built-in function to combine the selected columns into a single struct column that is passed to the udf:

df.withColumn("concatenated", combineUdf(struct(col("col1"), col("col2"), col("col3")))).show(false)
//    +----+----+----+------------+
//    |col1|col2|col3|concatenated|
//    +----+----+----+------------+
//    |a   |b   |c   |a, b, c     |
//    |a1  |b1  |c1  |a1, b1, c1  |
//    +----+----+----+------------+

So you can see that Row can be used to pass the whole row as an argument.

You can even pass all the columns of a row at once:

val columns = df.columns
df.withColumn("concatenated", combineUdf(struct(columns.map(col): _*)))
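The struct does not have to cover every column: df.columns is a plain Scala array, so you can filter it before building the struct. A small sketch (dropping col3 is just an assumed example):

```scala
// Keep every column except col3 (an arbitrary choice for illustration),
// then build the struct from the remaining names.
val selected = df.columns.filterNot(_ == "col3")
df.withColumn("concatenated", combineUdf(struct(selected.map(col): _*)))
```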

Updated

You can achieve the same with SQL queries too; you just need to register the udf function:

df.createOrReplaceTempView("tempview")
sqlContext.udf.register("combineUdf", combineUdf)
sqlContext.sql("select *, combineUdf(struct(`col1`, `col2`, `col3`)) as concatenated from tempview")
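On Spark 2.x, where SparkSession is the usual entry point, the same registration is typically written against spark instead of sqlContext (same behaviour, just the newer API):

```scala
// Equivalent registration via the SparkSession entry point (Spark 2.x+).
df.createOrReplaceTempView("tempview")
spark.udf.register("combineUdf", combineUdf)
spark.sql("select *, combineUdf(struct(`col1`, `col2`, `col3`)) as concatenated from tempview")
```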

It will give you the same result as above

Now, if you don't want to hardcode the column names, you can select whichever columns you need and build the column list as a string:

val columns = df.columns.map(x => "`"+x+"`").mkString(",")
sqlContext.sql(s"select *, combineUdf(struct(${columns})) as concatenated from tempview")

I hope the answer is helpful
