pyspark use dataframe inside udf


Question

I have two DataFrames, df1:

+---+---+----------+
|  n|val| distances|
+---+---+----------+
|  1|  1|0.27308652|
|  2|  1|0.24969208|
|  3|  1|0.21314497|
+---+---+----------+

and df2:

+---+---+----------+
| x1| x2|         w|
+---+---+----------+
|  1|  2|0.03103427|
|  1|  4|0.19012526|
|  1| 10|0.26805446|
|  1|  8|0.26825935|
+---+---+----------+

I want to add a new column to df1 called gamma, which will contain the sum of the w values from df2 whenever df1.n == df2.x1 OR df1.n == df2.x2.
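
For reference, a minimal reproducible setup (a sketch; the values come from the tables above, and a SparkSession named spark is assumed):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Recreate the two example DataFrames shown above
df1 = spark.createDataFrame(
    [(1, 1, 0.27308652), (2, 1, 0.24969208), (3, 1, 0.21314497)],
    ["n", "val", "distances"])
df2 = spark.createDataFrame(
    [(1, 2, 0.03103427), (1, 4, 0.19012526), (1, 10, 0.26805446), (1, 8, 0.26825935)],
    ["x1", "x2", "w"])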

I tried to use a udf, but apparently selecting from a different DataFrame does not work, because the values would have to be determined before the computation:

# Fails at runtime: df2 cannot be referenced from inside a udf
gamma_udf = udf(lambda n: float(df2.filter("x1 = %d OR x2 = %d" % (n, n)).groupBy().sum('w').rdd.map(lambda x: x).collect()[0]), FloatType())
df1.withColumn('gamma1', gamma_udf('n'))

Is there any way of doing this with a join or groupBy, without using loops?

Answer

You can't reference a DataFrame inside of a udf. As you alluded to, this problem is best solved using a join.

IIUC, you are looking for something like:

from pyspark.sql import Window
import pyspark.sql.functions as F

df1.alias("L").join(df2.alias("R"), (df1.n == df2.x1) | (df1.n == df2.x2), how="left")\
    .select("L.*", F.sum("w").over(Window.partitionBy("n")).alias("gamma"))\
    .distinct()\
    .show()
#+---+---+----------+----------+
#|  n|val| distances|     gamma|
#+---+---+----------+----------+
#|  1|  1|0.27308652|0.75747334|
#|  3|  1|0.21314497|      null|
#|  2|  1|0.24969208|0.03103427|
#+---+---+----------+----------+
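
As a side note, the same result can be reached without a window function or distinct (a sketch, assuming Spark 2.0+ and the same df1/df2 and imports as above): stack the two key columns of df2, pre-aggregate the w sums per key, then left join once on n:

# Sum w per key, treating x1 and x2 as interchangeable keys named n
w_sums = df2.select(F.col("x1").alias("n"), "w")\
    .union(df2.select(F.col("x2").alias("n"), "w"))\
    .groupBy("n")\
    .agg(F.sum("w").alias("gamma"))
df1.join(w_sums, "n", "left").show()

One caveat: unlike the OR join, this would count a df2 row twice if it ever had x1 == x2.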

Or if you're more comfortable with pyspark-sql syntax, you can register temp tables and do:

df1.registerTempTable("df1")
df2.registerTempTable("df2")

sqlCtx.sql(
    "SELECT DISTINCT L.*, SUM(R.w) OVER (PARTITION BY L.n) AS gamma "
    "FROM df1 L LEFT JOIN df2 R ON L.n = R.x1 OR L.n = R.x2"
).show()
#+---+---+----------+----------+
#|  n|val| distances|     gamma|
#+---+---+----------+----------+
#|  1|  1|0.27308652|0.75747334|
#|  3|  1|0.21314497|      null|
#|  2|  1|0.24969208|0.03103427|
#+---+---+----------+----------+
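
As a side note, registerTempTable and sqlCtx belong to the Spark 1.x API; on Spark 2.0+ the equivalent would be (a sketch, assuming a SparkSession named spark):

# createOrReplaceTempView replaces the deprecated registerTempTable
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")
spark.sql(
    "SELECT DISTINCT L.*, SUM(R.w) OVER (PARTITION BY L.n) AS gamma "
    "FROM df1 L LEFT JOIN df2 R ON L.n = R.x1 OR L.n = R.x2"
).show()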

Explanation

In both cases we are doing a left join of df1 to df2. This will keep all the rows in df1 regardless of whether there's a match.

The join clause is the condition that you specified in your question, so all rows in df2 where either x1 or x2 equals n will be joined.

Next we select all of the rows from the left table, and we group by (partition by) n and sum the values of w. This yields, for each value of n, the sum of w over all rows that matched the join condition.

Finally we return only the distinct rows, to eliminate duplicates.
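
For intuition (a sketch reusing the variables above), the duplicates exist because the window sum is attached to every row of the join result, and n = 1 matches four rows of df2:

joined = df1.alias("L").join(df2.alias("R"), (df1.n == df2.x1) | (df1.n == df2.x2), how="left")\
    .select("L.*", F.sum("w").over(Window.partitionBy("n")).alias("gamma"))
joined.count()             # 6 rows: four copies for n=1, one each for n=2 and n=3
joined.distinct().count()  # 3 rows, one per n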

