Spark Build Custom Column Function, user defined function

Question

I'm using Scala and want to build my own DataFrame function. For example, I want to treat a column like an array, iterate through each element and perform a calculation.

To start off, I’m trying to implement my own getMax method. So column x would have the values [3,8,2,5,9], and the expected output of the method would be 9.

Here is what it looks like in Scala:

def getMax(inputArray: Array[Int]): Int = {
   var maxValue = inputArray(0)
   for (i <- 1 until inputArray.length if inputArray(i) > maxValue) {
     maxValue = inputArray(i)
   }
   maxValue
}

This is what I have so far, and I get this error

"value length is not a member of org.apache.spark.sql.column", 

and I don't know how else to iterate through the column.

def getMax(col: Column): Column = {
  var maxValue = col(0)
  for (i <- 1 until col.length if col(i) > maxValue) {
    maxValue = col(i)
  }
  maxValue
}

Once I am able to implement my own method, I will create a column function:

val value_max: org.apache.spark.sql.Column = getMax(df.col("value")).as("value_max")

And then I hope to be able to use this in a SQL statement, for example:

val sample = sqlContext.sql("SELECT value_max(x) FROM table")

and the expected output would be 9, given input column [3,8,2,5,9]

I am following an answer from another thread, Spark Scala - How do I iterate rows in dataframe, and add calculated values as new columns of the data frame, where they create a private method for standard deviation. The calculations I will do will be more complex than this (e.g. I will be comparing each element in the column). Am I going in the right direction, or should I be looking more into User Defined Functions?

Answer

In a Spark DataFrame, you can't iterate through the elements of a Column using the approaches you thought of because a Column is not an iterable object.

However, to process the values of a column, you have some options and the right one depends on your task:

1) Use the existing built-in functions

Spark SQL already has plenty of useful functions for processing columns, including aggregation and transformation functions. Most of them you can find in the functions package (documentation here). Some others (binary functions in general) you can find directly in the Column object (documentation here). So, if you can use them, it's usually the best option. Note: don't forget the Window Functions.
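
For instance, the asker's getMax can be expressed with built-ins alone. A minimal sketch, assuming a numeric column named "value" on a DataFrame df (both names are placeholders, not from the question):

import org.apache.spark.sql.functions.max

// Aggregate the whole column down to its maximum value
val maxDF = df.agg(max("value").as("value_max"))
maxDF.show()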

2) Create a UDF

If you can't complete your task with the built-in functions, you may consider defining a UDF (User Defined Function). They are useful when you can process each item of a column independently and you expect to produce a new column with the same number of rows as the original one (not an aggregated column). This approach is quite simple: first you define a simple function, then you register it as a UDF, then you use it. Example:

def myFunc: (String => String) = { s => s.toLowerCase }

import org.apache.spark.sql.functions.udf
val myUDF = udf(myFunc)

val newDF = df.withColumn("newCol", myUDF(df("oldCol")))

For more information, here's a nice article.
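
Tying this back to the question: a UDF can also be registered by name so it is callable from SQL, which is what value_max(x) would need. A hedged sketch, assuming column x holds an array of integers and that the table from the question is already registered (the names come from the question or are placeholders):

// Register a UDF that returns the max of an array-typed column
val arrayMax: Seq[Int] => Int = xs => xs.max  // assumes non-empty arrays

sqlContext.udf.register("value_max", arrayMax)

val sample = sqlContext.sql("SELECT value_max(x) AS value_max FROM table")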

3) Use a UDAF

If your task is to create aggregated data, you can define a UDAF (User Defined Aggregation Function). I don't have a lot of experience with this, but I can point you to a nice tutorial:

https://ragrawal.wordpress.com/2015/11/03/spark-custom-udaf-example/
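
For reference, here is a rough sketch of what a max UDAF could look like with the UserDefinedAggregateFunction API of that era (it was later deprecated in Spark 3.x); treat it as an illustration, not the tutorial's code:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Illustrative aggregate that returns the max of an integer column
object IntMax extends UserDefinedAggregateFunction {
  def inputSchema: StructType = new StructType().add("value", IntegerType)
  def bufferSchema: StructType = new StructType().add("max", IntegerType)
  def dataType: DataType = IntegerType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = Int.MinValue

  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) buffer(0) = math.max(buffer.getInt(0), input.getInt(0))

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = math.max(buffer1.getInt(0), buffer2.getInt(0))

  def evaluate(buffer: Row): Int = buffer.getInt(0)
}

// Usage (column name assumed): df.agg(IntMax(df("value")).as("value_max"))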

4) Fall back to RDD processing

If you really can't use the options above, or if your processing task depends on different rows to process a single one and it's not an aggregation, then I think you have to select the column you want and process it using the corresponding RDD. Example:

val singleColumnDF = df.select("column")  // select() keeps it a DataFrame; df("column") alone is just a Column

val myRDD = singleColumnDF.rdd  // RDD[Row]

// process myRDD
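
As a concrete illustration of that last step, the maximum could be computed from the RDD like this (a sketch, assuming a non-empty integer column named "value"):

val maxValue = df.select("value")
  .rdd
  .map(_.getInt(0))
  .reduce((a, b) => math.max(a, b))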

So, those were the options I could think of. I hope it helps.
