How can I use a function in dataframe withColumn function in Pyspark?


Question

I have some dictionaries and a function defined:

dict_TEMPERATURE = {(0, 70): 'Low', (70.01, 73.99): 'Normal-Low',(74, 76): 'Normal', (76.01, 80): 'Normal-High', (80.01, 300): 'High'}
...
hierarchy_dict = {'TEMP': dict_TEMPERATURE, 'PRESS': dict_PRESSURE, 'SH_SP': dict_SHAFT_SPEED, 'POI': dict_POI, 'TRIG': dict_TRIGGER}



def function_definition(valor, atributo):

    dict_atributo = hierarchy_dict[atributo]
    valor_generalizado = None

    if isinstance(valor, (int, float, complex)):  # `long` was merged into int in Python 3

        for key, value in dict_atributo.items():

            if isinstance(key, tuple):
                # inclusive bounds, so boundary values such as 0 or 70 still match
                if key[0] <= valor <= key[1]:
                    valor_generalizado = value

    else:  # if it is not numeric, look the value up directly
        valor_generalizado = dict_atributo.get(valor)

    return valor_generalizado

What this function basically does is check the value passed as an argument to "function_definition" and replace it according to the references in its dictionary.

So, if I call "function_definition(60, 'TEMP')" it will return 'Low'.
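As a quick Spark-free sanity check, the range lookup can be exercised in plain Python. This is only a sketch of the lookup step, restricted to the temperature table from the question, with bounds treated as inclusive; the helper name `generalize_temp` is hypothetical:

```python
# Standalone sketch of the range lookup performed by function_definition,
# restricted to the temperature dictionary from the question.
dict_TEMPERATURE = {(0, 70): 'Low', (70.01, 73.99): 'Normal-Low',
                    (74, 76): 'Normal', (76.01, 80): 'Normal-High',
                    (80.01, 300): 'High'}

def generalize_temp(valor):
    # Return the label whose (low, high) range contains `valor`.
    for (low, high), label in dict_TEMPERATURE.items():
        if low <= valor <= high:
            return label
    return None

print(generalize_temp(60))   # Low
print(generalize_temp(75))   # Normal
print(generalize_temp(90))   # High
```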

On the other hand, I have a dataframe with the following structure (this is an example):

+----+-----+-----+---+----+
|TEMP|SH_SP|PRESS|POI|TRIG|
+----+-----+-----+---+----+
|   0|    1|    2|  0|   0|
|   0|    2|    3|  1|   1|
|   0|    3|    4|  2|   1|
|   0|    4|    5|  3|   1|
|   0|    5|    6|  4|   1|
|   0|    1|    2|  5|   1|
+----+-----+-----+---+----+

What I want to do is replace the values of one column of the dataframe based on the function defined above, so I have this line of code:

dataframe_new = dataframe.withColumn(atribute_name, function_definition(dataframe[atribute_name], atribute_name))

But I get the following error message when executing it:

AssertionError: col should be Column

What is wrong in my code? How could I do that?

Answer

Your function_definition(valor, atributo) returns a single string (valor_generalizado) for a single valor.

AssertionError: col should be Column means that you are passing to withColumn(colName, col) an argument that is not a Column. So you have to transform your data in order to have a Column, for example as you can see below.
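Concretely, dataframe[atribute_name] is a pyspark Column object, not a number, so when the function is called eagerly it falls into the non-numeric branch and the dictionary lookup yields None; withColumn then receives None rather than a Column. A Spark-free sketch of that failure mode, with the signature simplified to the TEMP case and DummyColumn as a hypothetical stand-in for pyspark.sql.Column:

```python
# Sketch: calling the lookup function eagerly on a Column-like object.
class DummyColumn:           # hypothetical stand-in for pyspark.sql.Column
    pass

dict_TEMPERATURE = {(0, 70): 'Low', (80.01, 300): 'High'}  # abbreviated

def function_definition(valor):
    if isinstance(valor, (int, float, complex)):
        for (low, high), label in dict_TEMPERATURE.items():
            if low <= valor <= high:
                return label
        return None
    return dict_TEMPERATURE.get(valor)  # a Column is not a dict key -> None

result = function_definition(DummyColumn())
print(result)   # None -- not a Column, hence "col should be Column"
```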

An example dataframe (same structure as yours):

a = [(10.0,1.2),(73.0,4.0)] # like your dataframe, this is only an example

dataframe = spark.createDataFrame(a,["tp", "S"]) # tp and S are random names for these columns

dataframe.show()
+----+---+
|  tp|  S|
+----+---+
|10.0|1.2|
|73.0|4.0|
+----+---+

As you can see here:

udf creates a Column expression representing a user defined function (UDF).

Solution:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType  # returnType must be a DataType

attr = 'TEMP'
udf_func = udf(lambda x: function_definition(x, attr), returnType=StringType())

dataframe_new = dataframe.withColumn("newCol", udf_func(dataframe.tp))
dataframe_new.show()

+----+---+----------+
|  tp|  S|    newCol|
+----+---+----------+
|10.0|1.2|       Low|
|73.0|4.0|Normal-Low|
+----+---+----------+
