pyspark use dataframe inside udf
Problem description
I have two dataframes: df1
+---+---+----------+
| n|val| distances|
+---+---+----------+
| 1| 1|0.27308652|
| 2| 1|0.24969208|
| 3| 1|0.21314497|
+---+---+----------+
and df2
+---+---+----------+
| x1| x2| w|
+---+---+----------+
| 1| 2|0.03103427|
| 1| 4|0.19012526|
| 1| 10|0.26805446|
| 1| 8|0.26825935|
+---+---+----------+
I want to add a new column to df1 called gamma, which will contain the sum of the w values from df2 when df1.n == df2.x1 OR df1.n == df2.x2.
I tried to use a udf, but apparently selecting from a different dataframe does not work, because the values would have to be determined before the calculation:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

gamma_udf = udf(lambda n: float(df2.filter("x1 = %d OR x2 = %d" % (n, n)).groupBy().sum('w').rdd.map(lambda x: x).collect()[0]), FloatType())
df1.withColumn('gamma1', gamma_udf('n'))  # fails: a udf cannot reference df2
Is there any way of doing this with a join or groupBy, without using loops?
Answer
You can't reference a DataFrame inside of a udf. As you alluded to, this problem is best solved using a join.
IIUC, you are looking for something like:
from pyspark.sql import Window
import pyspark.sql.functions as F
df1.alias("L").join(df2.alias("R"), (df1.n == df2.x1) | (df1.n == df2.x2), how="left")\
.select("L.*", F.sum("w").over(Window.partitionBy("n")).alias("gamma"))\
.distinct()\
.show()
#+---+---+----------+----------+
#| n|val| distances| gamma|
#+---+---+----------+----------+
#| 1| 1|0.27308652|0.75747334|
#| 3| 1|0.21314497| null|
#| 2| 1|0.24969208|0.03103427|
#+---+---+----------+----------+
Or, if you're more comfortable with pyspark-sql syntax, you can register temp tables and do:
df1.registerTempTable("df1")
df2.registerTempTable("df2")
sqlCtx.sql(
"SELECT DISTINCT L.*, SUM(R.w) OVER (PARTITION BY L.n) AS gamma "
"FROM df1 L LEFT JOIN df2 R ON L.n = R.x1 OR L.n = R.x2"
).show()
#+---+---+----------+----------+
#| n|val| distances| gamma|
#+---+---+----------+----------+
#| 1| 1|0.27308652|0.75747334|
#| 3| 1|0.21314497| null|
#| 2| 1|0.24969208|0.03103427|
#+---+---+----------+----------+
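
Note that registerTempTable is deprecated as of Spark 2.0. On newer versions the equivalent is createOrReplaceTempView with SparkSession.sql; a minimal sketch, assuming a SparkSession named spark:

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")
spark.sql(
    "SELECT DISTINCT L.*, SUM(R.w) OVER (PARTITION BY L.n) AS gamma "
    "FROM df1 L LEFT JOIN df2 R ON L.n = R.x1 OR L.n = R.x2"
).show()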
Explanation
In both cases we are doing a left join of df1 to df2. This keeps all the rows in df1, regardless of whether there's a match.
The join clause is the condition you specified in your question, so all rows in df2 where either x1 or x2 equals n will be joined.
Next we select all of the rows from the left table, partition by n, and sum the values of w. For each value of n, this gives the sum over all rows that matched the join condition.
Finally, we return only distinct rows to eliminate the duplicates introduced by the join.
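
If you'd rather avoid the window function plus distinct, an explicit groupBy works too. Here is a minimal sketch, assuming the same df1/df2 as above and that x1 and x2 are never equal within a single row of df2 (otherwise that row's w would be counted twice): stack the two id columns into one, sum w per n, and left-join the per-n sums back onto df1.

import pyspark.sql.functions as F

# Treat x1 and x2 as a single id column, aggregate w per id, then join back
gamma = (
    df2.select(F.col("x1").alias("n"), "w")
       .union(df2.select(F.col("x2").alias("n"), "w"))
       .groupBy("n")
       .agg(F.sum("w").alias("gamma"))
)
df1.join(gamma, on="n", how="left").show()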