Create a new column in pyspark dataframe by applying a udf on another column from this dataframe

Problem Description

My data is the diamonds dataset:

+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
| 0.23|  Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
| 0.21|Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
| 0.23|   Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
| 0.29|Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
| 0.31|   Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+

I have created a function which reads the column carat and returns an interval for every value. I need to form a new column with these intervals.

The result should be:

carat carat_bin
0.23    (0,1)
1.5      (1,2)

My code so far is:

def carat_bin(size) :
  if ((df['size'] >0) & (df['size'] <= 1)):
    return '[0,1)'
  if ((df['size'] >1) & (df['size'] <= 2)):
    return '[1,2)'
  if ((df['size'] >2) & (df['size'] <= 3)):
    return '[2,3)'
  if ((df['size'] >3) & (df['size'] <= 4)):
    return '[3,4)'
  if ((df['size'] >4) & (df['size'] <= 5)):
    return '[4,5)'
  elif df['size'] :
    return '[5, 6)'
spark.udf.register('carat_bin', carat_bin)
tst = diamonds.withColumn("carat_bin", carat_bin(diamonds['carat']))

But what I get is:

Cannot resolve column name "size" among (carat, cut, color, clarity, depth, table, price, x, y, z);

What am I missing here?

Recommended Answer

Modifying your solution

Your problem is that your udf is explicitly looking for the globally defined df and is not using its size parameter in any way.

Try this:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

@F.udf(StringType())
def bin_carat(s):
    if 0 < s <= 1:
        return '[0,1)'
    if 1 < s <= 2:
        return '[1,2)'
    if 2 < s <= 3:
        return '[2,3)'
    if 3 < s <= 4:
        return '[3,4)'
    if 4 < s <= 5:
        return '[4,5)'
    elif s:
        return '[5, 6)'

diamonds.withColumn("carat_bin", bin_carat(diamonds['carat'])).show()

This results in (I modified your inputs slightly so that one can see the different cases):

+-----+-------+-----+-------+-----+-----+-----+----+----+----+---------+
|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|carat_bin|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+---------+
| 0.23|  Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|    [0,1)|
| 1.34|Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|    [1,2)|
| 2.45|   Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|    [2,3)|
| 3.12|Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|    [3,4)|
|  5.6|   Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|   [5, 6)|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+---------+

For your dataframe, just as expected. There does seem to be a fundamental difference with your spark.udf.register('carat_bin', carat_bin) approach: registering makes the function available to SQL, but your last line then calls the plain Python function, not a udf, on the column, which always led to an error.
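
As a side note, here is a minimal sketch (an addition, not part of the original answer) of how spark.udf.register is meant to be wired up. In Spark 2.3+ it accepts an already-wrapped udf, exposes it to SQL under the given name, and returns a wrapper that, unlike the plain Python function, is safe to apply to a Column. Assuming the bin_carat udf defined above and a SparkSession named spark:

# register() exposes bin_carat to SQL as carat_bin and returns a wrapper
# that can be applied directly to a Column:
bin_carat_sql = spark.udf.register('carat_bin', bin_carat)

diamonds.withColumn('carat_bin', bin_carat_sql(diamonds['carat'])).show()

# Equivalently, via a SQL query against a temporary view:
diamonds.createOrReplaceTempView('diamonds')
spark.sql('SELECT carat, carat_bin(carat) AS carat_bin FROM diamonds').show()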

If you use pyspark 2.3 and above, there is an even simpler way to achieve this using pandas udfs. Just have a look at the following:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StringType

@pandas_udf(StringType(), PandasUDFType.SCALAR)
def cut_to_str(s):
    # s arrives as a pandas Series holding a whole batch of carat values
    return pd.cut(s, bins=[0, 1, 2, 3, 4, 5],
                  labels=['[0,1)', '[1,2)', '[2,3)', '[3,4)', '[4,5)']).astype(str)

Use this in the same fashion as the previously defined udf:

diamonds.withColumn("carat_bin", cut_to_str(diamonds['carat'])).show()

And it will result in the exact same dataframe as shown above.
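
One closing usage note, an addition beyond the original answer: on Spark 3.x, PandasUDFType.SCALAR is deprecated in favor of Python type hints, so the same pandas udf would typically be written as:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

# Spark infers the scalar evaluation mode from the Series -> Series hints.
@pandas_udf(StringType())
def cut_to_str(s: pd.Series) -> pd.Series:
    return pd.cut(s, bins=[0, 1, 2, 3, 4, 5],
                  labels=['[0,1)', '[1,2)', '[2,3)', '[3,4)', '[4,5)']).astype(str)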
