Spark Build Custom Column Function, user defined function


Problem Description


I’m using Scala and want to build my own DataFrame function. For example, I want to treat a column like an array, iterate through each element and make a calculation.

To start off, I’m trying to implement my own getMax method. So column x would have the values [3,8,2,5,9], and the expected output of the method would be 9.

Here is what it looks like in Scala

def getMax(inputArray: Array[Int]): Int = {
   var maxValue = inputArray(0)
   for (i <- 1 until inputArray.length if inputArray(i) > maxValue) {
     maxValue = inputArray(i)
   }
   maxValue
}

This is what I have so far, and I get this error:

"value length is not a member of org.apache.spark.sql.column", 

and I don't know how else to iterate through the column.

def getMax(col: Column): Column = {
  var maxValue = col(0)
  for (i <- 1 until col.length if col(i) > maxValue) {
    maxValue = col(i)
  }
  maxValue
}

Once I am able to implement my own method, I will create a column function

val value_max:org.apache.spark.sql.Column=getMax(df.col("value")).as("value_max")

And then I hope to be able to use this in a SQL statement, for example

val sample = sqlContext.sql("SELECT value_max(x) FROM table")

and the expected output would be 9, given input column [3,8,2,5,9]

I am following an answer from another thread, "Spark Scala - How do I iterate rows in dataframe, and add calculated values as new columns of the data frame", where they create a private method for standard deviation. The calculations I will do will be more complex than this (e.g. I will be comparing each element in the column). Am I going in the correct direction, or should I be looking more into User Defined Functions?

Solution

In a Spark DataFrame, you can't iterate through the elements of a Column using the approaches you thought of because a Column is not an iterable object.

However, to process the values of a column, you have some options and the right one depends on your task:

1) Using the existing built-in functions

Spark SQL already has plenty of useful functions for processing columns, including aggregation and transformation functions. Most of them can be found in the functions package (org.apache.spark.sql.functions); some others (binary functions in general) are available directly on the Column class. So, if you can use them, it's usually the best option. Note: don't forget the Window Functions.
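For the getMax example in the question, the built-in max aggregate already covers it. A minimal sketch (the column name x and the registered table name follow the question):

import org.apache.spark.sql.functions.max

// Aggregate max over the whole column: returns 9 for the values [3,8,2,5,9]
val maxDF = df.agg(max("x"))

// Or directly in SQL, as the question intends
val sample = sqlContext.sql("SELECT max(x) FROM table")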

2) Creating a UDF

If you can't complete your task with the built-in functions, you may consider defining a UDF (User Defined Function). They are useful when you can process each item of a column independently and you expect to produce a new column with the same number of rows as the original one (not an aggregated column). This approach is quite simple: first, you define a simple function, then you register it as a UDF, then you use it. Example:

// A plain Scala function that processes one value at a time
def myFunc: (String => String) = { s => s.toLowerCase }

import org.apache.spark.sql.functions.udf
// Wrap the function as a UDF so it can be applied to a Column
val myUDF = udf(myFunc)

// Produces a new column with the same number of rows as oldCol
val newDF = df.withColumn("newCol", myUDF(df("oldCol")))
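Since the question wants to call the function from a SQL statement, the same kind of function can also be registered under a SQL name (a small sketch; the name lower_udf is arbitrary and not part of the original answer):

// Register the function so it can be called by name in SQL
sqlContext.udf.register("lower_udf", myFunc)
val lowered = sqlContext.sql("SELECT lower_udf(oldCol) FROM table")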

For more information, here's a nice article.

3) Using a UDAF

If your task is to create aggregated data, you can define a UDAF (User Defined Aggregate Function). I don't have a lot of experience with this, but I can point you to a nice tutorial:

https://ragrawal.wordpress.com/2015/11/03/spark-custom-udaf-example/
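Since the question's getMax is an aggregation (one value out of many rows), a UDAF is a close fit. Below is a minimal, hedged sketch of a max UDAF using the UserDefinedAggregateFunction API (available from Spark 1.5); the class name MaxUDAF and the SQL name value_max are only illustrative:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MaxUDAF extends UserDefinedAggregateFunction {
  // Input: a single integer column
  def inputSchema: StructType = StructType(StructField("value", IntegerType) :: Nil)
  // Buffer: the running maximum
  def bufferSchema: StructType = StructType(StructField("max", IntegerType) :: Nil)
  def dataType: DataType = IntegerType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Int.MinValue
  }

  // Fold one input row into the running maximum
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0) && input.getInt(0) > buffer.getInt(0)) {
      buffer(0) = input.getInt(0)
    }
  }

  // Combine partial maxima from different partitions
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    if (buffer2.getInt(0) > buffer1.getInt(0)) {
      buffer1(0) = buffer2.getInt(0)
    }
  }

  def evaluate(buffer: Row): Any = buffer.getInt(0)
}

// Register it and call it from SQL, as in the question
sqlContext.udf.register("value_max", new MaxUDAF)
val sample = sqlContext.sql("SELECT value_max(x) FROM table")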

4) Fall back to RDD processing

If you really can't use the options above, or if your processing task depends on different rows to process one row and it's not an aggregation, then I think you will have to select the column you want and process it using the corresponding RDD. Example:

// Select the single column as a DataFrame, then drop to its underlying RDD
val singleColumnDF = df.select("column")

val myRDD = singleColumnDF.rdd

// process myRDD
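For the question's getMax, the RDD processing step could look like this (a sketch, assuming the column holds non-null integers):

// Pull the Int out of each Row and take the maximum: 9 for [3,8,2,5,9]
val maxValue = myRDD.map(_.getInt(0)).max()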


So, these were the options I could think of. I hope it helps.
