Passing multiple columns in Pandas UDF PySpark


Problem Description

I want to calculate the Jaro Winkler distance between two columns of a PySpark DataFrame. The Jaro Winkler distance is available through the pyjarowinkler package on all nodes.

pyjarowinkler works as follows:

from pyjarowinkler import distance
distance.get_jaro_distance("A", "A", winkler=True, scaling=0.1)

Output:

1.0

I am trying to write a Pandas UDF that passes two columns as Series and calculates the distance using a lambda function. Here's how I am doing it:

@pandas_udf("float", PandasUDFType.SCALAR)
def get_distance(col1, col2):
    import pandas as pd
    distance_df  = pd.DataFrame({'column_A': col1, 'column_B': col2})
    distance_df['distance'] = distance_df.apply(lambda x: distance.get_jaro_distance(str(distance_df['column_A']), str(distance_df['column_B']), winkler = True, scaling = 0.1))
    return distance_df['distance']

temp = temp.withColumn('jaro_distance', get_distance(temp.x, temp.x))

I should be able to pass any two string columns to the above function. I am getting the following output:

+---+---+---+-------------+
|  x|  y|  z|jaro_distance|
+---+---+---+-------------+
|  A|  1|  2|         null|
|  B|  3|  4|         null|
|  C|  5|  6|         null|
|  D|  7|  8|         null|
+---+---+---+-------------+

Expected output:

+---+---+---+-------------+
|  x|  y|  z|jaro_distance|
+---+---+---+-------------+
|  A|  1|  2|          1.0|
|  B|  3|  4|          1.0|
|  C|  5|  6|          1.0|
|  D|  7|  8|          1.0|
+---+---+---+-------------+
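
For reference, a minimal version of the temp DataFrame could be constructed as follows (a hypothetical sketch, not part of the original post; it assumes an active SparkSession named spark and string-typed columns):

# Hypothetical reconstruction of the example DataFrame; the values
# mirror the tables shown above, assuming all columns are strings.
temp = spark.createDataFrame(
    [("A", "1", "2"), ("B", "3", "4"), ("C", "5", "6"), ("D", "7", "8")],
    ["x", "y", "z"],
)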

I suspect this might be because str(distance_df['column_A']) is not correct: it contains the concatenated string of all row values.

While this code works for me:

@pandas_udf("float", PandasUDFType.SCALAR)
def get_distance(col):
    return col.apply(lambda x: distance.get_jaro_distance(x, "A", winkler = True, scaling = 0.1))

temp = temp.withColumn('jaro_distance', get_distance(temp.x))

Output:

+---+---+---+-------------+
|  x|  y|  z|jaro_distance|
+---+---+---+-------------+
|  A|  1|  2|          1.0|
|  B|  3|  4|          0.0|
|  C|  5|  6|          0.0|
|  D|  7|  8|          0.0|
+---+---+---+-------------+

Is there a way to do this with a Pandas UDF? I'm dealing with millions of records, so a UDF will be expensive but still acceptable if it works. Thanks.

Recommended Answer

The error comes from the function in your df.apply method; adjusting it to the following should fix it:

@pandas_udf("float", PandasUDFType.SCALAR)
def get_distance(col1, col2):
    import pandas as pd
    distance_df = pd.DataFrame({'column_A': col1, 'column_B': col2})
    # Fix: reference the row `x` inside the lambda (not the whole DataFrame)
    # and pass axis=1 so that apply runs row-wise instead of column-wise.
    distance_df['distance'] = distance_df.apply(lambda x: distance.get_jaro_distance(x['column_A'], x['column_B'], winkler=True, scaling=0.1), axis=1)
    return distance_df['distance']

However, the Pandas df.apply method is not vectorised, which defeats the purpose of using pandas_udf over udf in PySpark. A faster, lower-overhead solution is to use a list comprehension to create the returned pd.Series (see this link for more discussion of Pandas df.apply and its alternatives):

from pandas import Series

@pandas_udf("float", PandasUDFType.SCALAR)
def get_distance(col1, col2):
    # Zip the two input Series and compute the distance pair by pair.
    return Series([distance.get_jaro_distance(c1, c2, winkler=True, scaling=0.1) for c1, c2 in zip(col1, col2)])

df.withColumn('jaro_distance', get_distance('x', 'y')).show()

+---+---+---+-------------+
|  x|  y|  z|jaro_distance|
+---+---+---+-------------+
| AB| 1B|  2|         0.67|
| BB| BB|  4|          1.0|
| CB| 5D|  6|          0.0|
| DB|B7F|  8|         0.61|
+---+---+---+-------------+
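
As a side note, on Spark 3.0 and later the same scalar Pandas UDF can be written with Python type hints instead of PandasUDFType.SCALAR, which is deprecated there. A minimal sketch, assuming Spark >= 3.0:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyjarowinkler import distance

# Equivalent scalar pandas UDF using the Spark 3.x type-hint API;
# the return type ("float") is inferred per batch of rows.
@pandas_udf("float")
def get_distance(col1: pd.Series, col2: pd.Series) -> pd.Series:
    return pd.Series([
        distance.get_jaro_distance(c1, c2, winkler=True, scaling=0.1)
        for c1, c2 in zip(col1, col2)
    ])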

