New Dataframe column as a generic function of other rows (spark)
Problem description
How can I efficiently create a new column in a DataFrame in spark that is a function of the other rows?
This is a spark implementation of the problem I described here:
import pandas as pd
from nltk.metrics.distance import edit_distance as edit_dist
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

d = {
    'id': [1, 2, 3, 4, 5, 6],
    'word': ['cat', 'hat', 'hag', 'hog', 'dog', 'elephant']
}
spark_df = sqlCtx.createDataFrame(pd.DataFrame(d))

# Collect all words to the driver so the udf can close over them.
words_list = list(spark_df.select('word').collect())

# Count the other words within edit distance 1 of the given word.
get_n_similar = udf(
    lambda word: len([
        w for w in words_list
        if (w['word'] != word) and (edit_dist(w['word'], word) < 2)
    ]),
    IntegerType()
)

spark_df.withColumn('n_similar', get_n_similar(col('word'))).show()
Output:
+---+--------+---------+
|id |word |n_similar|
+---+--------+---------+
|1 |cat |1 |
|2 |hat |2 |
|3 |hag |2 |
|4 |hog |2 |
|5 |dog |1 |
|6 |elephant|0 |
+---+--------+---------+
The problem here is that I don't know a way to tell spark to compare the current row to the other rows in the Dataframe without first collecting the values into a list. Is there a way to apply a generic function of other rows without calling collect?
Recommended answer
The problem here is that I don't know a way to tell spark to compare the current row to the other rows in the Dataframe without first collecting the values into a list.
UDF is not an option here (you cannot reference a distributed DataFrame in a udf). A direct translation of your logic is a Cartesian product and an aggregate:
from pyspark.sql.functions import levenshtein, col

result = (spark_df.alias("l")
    .crossJoin(spark_df.alias("r"))               # all pairs of rows
    .where(levenshtein("l.word", "r.word") < 2)   # keep close pairs only
    .where(col("l.word") != col("r.word"))        # drop self-matches
    .groupBy("l.id", "l.word")
    .count())
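Note that this replaces the Python-level nltk edit_dist with Spark's built-in levenshtein function, so the distance computation stays inside the JVM instead of round-tripping through a udf.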
In practice, though, you should try to do something more efficient: Efficient string matching in Apache Spark
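A minimal sketch of that idea, assuming the pyspark.ml pipeline pieces (RegexTokenizer, NGram, HashingTF, MinHashLSH) and an illustrative Jaccard-distance threshold of 0.8: hash character bigrams, let MinHash LSH propose candidate pairs, and verify only those pairs exactly:

from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, MinHashLSH, NGram, RegexTokenizer
from pyspark.sql.functions import col, levenshtein

# Split words into characters, build character bigrams, hash them into
# sparse count vectors, and index the vectors with MinHash LSH.
model = Pipeline(stages=[
    RegexTokenizer(pattern="", inputCol="word", outputCol="chars",
                   minTokenLength=1),
    NGram(n=2, inputCol="chars", outputCol="ngrams"),
    HashingTF(inputCol="ngrams", outputCol="vectors"),
    MinHashLSH(inputCol="vectors", outputCol="lsh", numHashTables=5),
]).fit(spark_df)

hashed = model.transform(spark_df)

# approxSimilarityJoin returns only pairs whose estimated Jaccard distance
# is below the threshold (0.8 is an assumed value; tune it to your data).
# The exact levenshtein check then runs on this much smaller candidate set.
candidates = model.stages[-1].approxSimilarityJoin(hashed, hashed, 0.8,
                                                   distCol="jaccard")
result = (candidates
    .where(col("datasetA.word") != col("datasetB.word"))
    .where(levenshtein(col("datasetA.word"), col("datasetB.word")) < 2)
    .groupBy(col("datasetA.id"), col("datasetA.word"))
    .count())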
Depending on the problem, you should try to find other approximations to avoid full Cartesian product.
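One such approximation exploits the fact that an edit distance below 2 forces the two word lengths to differ by at most 1, so you can block on word length and give Spark an equi-join instead of a cross join. A sketch under that assumption (key, l_id, and l_word are illustrative names):

from pyspark.sql.functions import array, col, explode, length, levenshtein

# Replicate each left row under the keys len-1, len, len+1 so that any
# word within edit distance 1 shares a key with the right side's length.
left = (spark_df
    .withColumn("len", length("word"))
    .withColumn("key", explode(array(col("len") - 1, col("len"), col("len") + 1)))
    .select(col("id").alias("l_id"), col("word").alias("l_word"), "key"))

right = spark_df.select(col("word").alias("r_word"), length("word").alias("key"))

result = (left
    .join(right, "key")                            # ordinary equi-join
    .where((col("l_word") != col("r_word")) &
           (levenshtein("l_word", "r_word") < 2))  # exact check per block
    .groupBy("l_id", "l_word")
    .count())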
If you want to keep data without matches you can skip one filter:
(spark_df.alias("l")
    .crossJoin(spark_df.alias("r"))
    .where(levenshtein("l.word", "r.word") < 2)
    .groupBy("l.id", "l.word")
    .count()
    # every word is within distance 0 of itself, so remove the self-match
    .withColumn("count", col("count") - 1))
or (slower, but more generic), join the result back to the reference data:
(spark_df
    .select("id", "word")
    .distinct()
    .join(result, ["id", "word"], "left")   # words with no match get null counts
    .na.fill(0))
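On the example data this restores elephant with a count of 0, matching the n_similar column produced by the collect-based udf in the question.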