pyspark use dataframe inside udf


Question

I have two DataFrames, df1:

+---+---+----------+
|  n|val| distances|
+---+---+----------+
|  1|  1|0.27308652|
|  2|  1|0.24969208|
|  3|  1|0.21314497|
+---+---+----------+

and df2:

+---+---+----------+
| x1| x2|         w|
+---+---+----------+
|  1|  2|0.03103427|
|  1|  4|0.19012526|
|  1| 10|0.26805446|
|  1|  8|0.26825935|
+---+---+----------+

I want to add a new column to df1 called gamma, which will contain the sum of the w values from df2 when df1.n == df2.x1 OR df1.n == df2.x2.
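
For reference, the two DataFrames can be rebuilt like this (a minimal sketch, assuming an active SparkSession named spark):

df1 = spark.createDataFrame(
    [(1, 1, 0.27308652), (2, 1, 0.24969208), (3, 1, 0.21314497)],
    ["n", "val", "distances"])
df2 = spark.createDataFrame(
    [(1, 2, 0.03103427), (1, 4, 0.19012526), (1, 10, 0.26805446), (1, 8, 0.26825935)],
    ["x1", "x2", "w"])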

I tried to use a udf, but apparently selecting from a different DataFrame inside the udf will not work, because the values should be determined before the calculation:

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Fails: a DataFrame (df2) cannot be referenced from inside a udf
gamma_udf = udf(lambda n: float(df2.filter("x1 = %d OR x2 = %d" % (n, n)).groupBy().sum('w').collect()[0][0]), FloatType())
df1.withColumn('gamma1', gamma_udf('n'))

Is there any way of doing this with join or groupby, without using loops?

Answer

You can't reference a DataFrame inside of a udf. As you alluded to, this problem is best solved using a join.

IIUC, you are looking for something like:

from pyspark.sql import Window
import pyspark.sql.functions as F

df1.alias("L").join(df2.alias("R"), (df1.n == df2.x1) | (df1.n == df2.x2), how="left")\
    .select("L.*", F.sum("w").over(Window.partitionBy("n")).alias("gamma"))\
    .distinct()\
    .show()
#+---+---+----------+----------+
#|  n|val| distances|     gamma|
#+---+---+----------+----------+
#|  1|  1|0.27308652|0.75747334|
#|  3|  1|0.21314497|      null|
#|  2|  1|0.24969208|0.03103427|
#+---+---+----------+----------+

Or if you're more comfortable with pyspark-sql syntax, you can register temp tables and do:

df1.registerTempTable("df1")
df2.registerTempTable("df2")

sqlCtx.sql(
    "SELECT DISTINCT L.*, SUM(R.w) OVER (PARTITION BY L.n) AS gamma "
    "FROM df1 L LEFT JOIN df2 R ON L.n = R.x1 OR L.n = R.x2"
).show()
#+---+---+----------+----------+
#|  n|val| distances|     gamma|
#+---+---+----------+----------+
#|  1|  1|0.27308652|0.75747334|
#|  3|  1|0.21314497|      null|
#|  2|  1|0.24969208|0.03103427|
#+---+---+----------+----------+
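
Note that on Spark 2.x and later, registerTempTable is deprecated in favor of createOrReplaceTempView, and the query would be run with spark.sql rather than sqlCtx.sql.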

Explanation

In both cases we are doing a left join of df1 to df2. This keeps all the rows in df1 regardless of whether there's a match.

The join clause is the condition that you specified in your question, so all rows in df2 where either x1 or x2 equals n will be joined.

Next we select all of the rows from the left table, partition by n, and sum the values of w. For each value of n, this gets the sum over all rows that matched the join condition.

Finally we return only the distinct rows, to eliminate the duplicates introduced by the join.
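
Since the question also asks about groupby: as a sketch not taken from the original answer, the same result can be produced without the window function or the distinct call by stacking x1 and x2 into a single key column, aggregating, and left-joining back onto df1 (this assumes no row has x1 == x2 equal to the same n, which the union would count twice while the OR join counts once):

import pyspark.sql.functions as F

# Unpivot x1/x2 into one key column, then sum w per key
gamma = df2.select(F.col("x1").alias("n"), "w")\
    .union(df2.select(F.col("x2").alias("n"), "w"))\
    .groupBy("n")\
    .agg(F.sum("w").alias("gamma"))

# Left join keeps every row of df1; gamma is null where nothing matched
df1.join(gamma, on="n", how="left").show()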
