New Dataframe column as a generic function of other rows (spark)


Question

How do I create a new column in a spark DataFrame as a generic function of the other rows?

This is a spark implementation of the problem I described here:

import pandas as pd  # used below to build the toy DataFrame

from nltk.metrics.distance import edit_distance as edit_dist
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

d = {
    'id': [1, 2, 3, 4, 5, 6],
    'word': ['cat', 'hat', 'hag', 'hog', 'dog', 'elephant']
}

# sqlCtx: an existing SQLContext (or SparkSession) created elsewhere
spark_df = sqlCtx.createDataFrame(pd.DataFrame(d))
# collect every word to the driver so the udf can compare against all of them
words_list = list(spark_df.select('word').collect())

# udf that counts how many *other* words are within edit distance 1 of the input
get_n_similar = udf(
    lambda word: len(
        [
            w for w in words_list if (w['word'] != word) and 
            (edit_dist(w['word'], word) < 2)
        ]
    ),
    IntegerType()
)

spark_df.withColumn('n_similar', get_n_similar(col('word'))).show()

Output:

+---+--------+---------+
|id |word    |n_similar|
+---+--------+---------+
|1  |cat     |1        |
|2  |hat     |2        |
|3  |hag     |2        |
|4  |hog     |2        |
|5  |dog     |1        |
|6  |elephant|0        |
+---+--------+---------+

The problem here is that I don't know a way to tell spark to compare the current row to the other rows in the Dataframe without first collecting the values into a list. Is there a way to apply a generic function of other rows without calling collect?

Answer

The problem here is that I don't know a way to tell spark to compare the current row to the other rows in the Dataframe without first collecting the values into a list.

UDF is not an option here (you cannot reference a distributed DataFrame inside a udf). The direct translation of your logic is a Cartesian product plus an aggregation:

from pyspark.sql.functions import levenshtein, col

result = (spark_df.alias("l")
    .crossJoin(spark_df.alias("r"))              # all pairs of rows
    .where(levenshtein("l.word", "r.word") < 2)  # keep pairs within edit distance 1
    .where(col("l.word") != col("r.word"))       # drop a word paired with itself
    .groupBy("l.id", "l.word")
    .count())
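As a quick sanity check on the sample data from the question, showing the aggregated result should reproduce the counts from the table above; the only difference is that elephant, which matches nothing, drops out entirely:

result.orderBy("id").show()

# Expected on the toy data (elephant has no word within edit distance 1,
# so no pair survives the two where clauses and it disappears in the groupBy):
#
# | id | word | count |
# |----|------|-------|
# |  1 | cat  |     1 |
# |  2 | hat  |     2 |
# |  3 | hag  |     2 |
# |  4 | hog  |     2 |
# |  5 | dog  |     1 |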

But in practice you should try to do something more efficient: Efficient string matching in Apache Spark.

Depending on the problem, you should try to find other approximations to avoid a full Cartesian product.
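The linked answer replaces the full pairwise comparison with an approximate similarity join using Spark ML's MinHashLSH over character n-grams. The following is a minimal sketch along those lines, adapted to the toy spark_df above; the column names (chars, ngrams, vectors, lsh), the n-gram size, the number of hash tables and the 0.8 threshold are all illustrative choices, and Jaccard distance over n-grams only approximates edit distance, so the counts can differ from the exact Cartesian version:

from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, NGram, HashingTF, MinHashLSH
from pyspark.sql.functions import col

model = Pipeline(stages=[
    # split each word into single characters
    RegexTokenizer(pattern="", inputCol="word", outputCol="chars", minTokenLength=1),
    # build character bigrams (n=2 is an arbitrary choice for such short words)
    NGram(n=2, inputCol="chars", outputCol="ngrams"),
    # hash the n-grams into sparse feature vectors
    HashingTF(inputCol="ngrams", outputCol="vectors"),
    # MinHash signatures for the approximate Jaccard join
    MinHashLSH(inputCol="vectors", outputCol="lsh", numHashTables=5)
]).fit(spark_df)

hashed = model.transform(spark_df)

# self-join on approximate Jaccard distance (< 0.8), drop self-pairs, then count
approx_counts = (model.stages[-1]
    .approxSimilarityJoin(hashed, hashed, 0.8, distCol="jaccard_dist")
    .where(col("datasetA.id") != col("datasetB.id"))
    .groupBy(col("datasetA.id").alias("id"), col("datasetA.word").alias("word"))
    .count())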

If you want to keep the data without matches, you can skip one filter:

(spark_df.alias("l")
    .crossJoin(spark_df.alias("r"))
    .where(levenshtein("l.word", "r.word") < 2)
    .groupBy("l.id", "l.word")
    .count()
    # every word matches itself (distance 0), so subtract the self-match
    .withColumn("count", col("count") - 1))

Or (slower, but more generic), join back with the reference:

(spark_df
    .select("id", "word")
    .distinct()
    .join(result, ["id", "word"], "left")  # words with no match get a null count
    .na.fill(0))                           # ...which is then replaced with 0
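For completeness, on the toy data this joined result should reproduce the question's table, with elephant kept and its count filled in as 0 (the column comes out named count rather than n_similar). A brief check, assuming the expression above has been bound to a name such as final:

# `final` is a hypothetical name for the join-back expression above
final.orderBy("id").show()

# Expected on the toy data (same counts as in the question):
#
# | id | word     | count |
# |----|----------|-------|
# |  1 | cat      |     1 |
# |  2 | hat      |     2 |
# |  3 | hag      |     2 |
# |  4 | hog      |     2 |
# |  5 | dog      |     1 |
# |  6 | elephant |     0 |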

