How to pass DataFrame as input to Spark UDF?

Problem description

I have a dataframe and I want to apply a function to each row. This function depends on other dataframes.

Simplified example. I have three dataframes like below:

df = sc.parallelize([
    ['a', 'b', 1],
    ['c', 'd', 3]
    ]).toDF(('feat1', 'feat2', 'value'))

df_other_1 = sc.parallelize([
        ['a', 0, 1, 0.0],
        ['a', 1, 3, 0.1],
        ['a', 3, 10, 1.0],
        ['c', 0, 10, 0.2],
        ['c', 10, 25, 0.5]
        ]).toDF(('feat1', 'lower', 'upper', 'score'))

df_other_2 = sc.parallelize([
        ['b', 0, 4, 0.1],
        ['b', 4, 20, 0.5],
        ['b', 20, 30, 1.0],
        ['d', 0, 5, 0.05],
        ['d', 5, 22, 0.9]
        ]).toDF(('feat1', 'lower', 'upper', 'score'))

For each row of df, I want to collect the unique upper values for feat1 and feat2 from df_other_1 and df_other_2, i.e. for the first row, the unique values are (1, 3, 10, 4, 20, 30). Then, I'll sort them in descending order like (30, 20, 10, 4, 3, 1) and prepend a number one above the largest, giving (31, 30, 20, 10, 4, 3, 1). The df would become like so:

df = sc.parallelize([
        ['a', 'b', 1, [31, 30, 20, 10, 4, 3, 1]],
        ['c', 'd', 3, [26, 25, 22, 10, 5]]
        ]).toDF(('feat1', 'feat2', 'value', 'lst'))
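
For reference, this lst column can be built with joins and array functions instead of per-row lookups. Below is a minimal sketch, assuming Spark 2.4+ for array_distinct, concat on arrays and element_at; uppers_1, uppers_2, merged and df_with_lst are illustrative names, not from the original post:

from pyspark.sql import functions as F

# Collect the distinct upper values per key from each lookup table
uppers_1 = df_other_1.groupBy('feat1').agg(F.collect_set('upper').alias('u1'))
uppers_2 = (df_other_2.groupBy('feat1').agg(F.collect_set('upper').alias('u2'))
            .withColumnRenamed('feat1', 'feat2'))  # df_other_2 is matched on df's feat2

# Merge both arrays, deduplicate, sort descending, then prepend
# one number above the largest value
merged = F.sort_array(F.array_distinct(F.concat('u1', 'u2')), asc=False)
df_with_lst = (df.join(uppers_1, 'feat1', 'left')
                 .join(uppers_2, 'feat2', 'left')
                 .withColumn('lst', F.concat(F.array(F.element_at(merged, 1) + 1),
                                             merged))
                 .drop('u1', 'u2'))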

Then, for each row of df and for each value in its lst, I want to calculate the sum of the scores from both df_other_1 and df_other_2 where that value falls between lower and upper. My goal is to find the lowest value in each lst whose total score is above some threshold (e.g. 1.4).

Here's how to calculate the total score. For the first row of df, the first value of lst is 31. In df_other_1 for feat1 ('a'), it is above the highest bucket, so it would get a score of 1. Same for df_other_2. So, the total score would be 1 + 1 = 2. For the value of 10 (again for the first row), the total score would be 1 + 0.5 = 1.5.
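
In plain Python, the scoring rule implied by these numbers looks like the helper below (an illustrative sketch only, not part of the post; note the edge case the expected output implies: a value equal to a key's topmost upper still takes that last bucket's score, e.g. 25 scores 0.5 for 'c', not 1.0):

def score_in_table(buckets, v):
    """buckets: (lower, upper, score) tuples for one key, sorted by upper,
    e.g. [(0, 10, 0.2), (10, 25, 0.5)] for 'c' in df_other_1."""
    for lower, upper, score in buckets:
        if lower <= v < upper:
            return score
    # A value equal to the topmost upper bound still takes the last bucket's score
    lower, upper, score = buckets[-1]
    if v == upper:
        return score
    return 1.0  # above every bucket; lst values never fall below the lowest bucket

# e.g. for the first row's value 10: 1.0 (from df_other_1) + 0.5 (from df_other_2) = 1.5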

Here's what df would look like at the end:

df = sc.parallelize([
            ['a', 'b', 1, [31, 30, 20, 10, 4, 3, 1], [2.0, 2.0, 2.0, 1.5, 1.5, 1.1, 0.2], 4],
            ['c', 'd', 3, [26, 25, 22, 10, 5], [2.0, 1.5, 1.4, 1.4, 1.1], 25]
            ]).toDF(('feat1', 'feat2', 'value', 'lst', 'total_scores', 'target_value'))

I'm actually looking to find these target values 4 and 25. The intermediate steps do not really matter.

==========================================================================

Here's what I tried so far:

from pyspark.sql.functions import col, lead, lag, when
from pyspark.sql.window import Window

def get_threshold_for_row(feat1, feat2, threshold):

    # Restrict each lookup table to the key for this row
    this_df_other_1 = df_other_1.filter(col('feat1') == feat1)
    this_df_other_2 = df_other_2.filter(col('feat1') == feat2)

    values_feat_1 = [i[0] for i in this_df_other_1.select('upper').collect()]
    values_feat_1.append(values_feat_1[-1] + 1)
    values_feat_2 = [i[0] for i in this_df_other_2.select('upper').collect()]
    values_feat_2.append(values_feat_2[-1] + 1)

    values = values_feat_1 + values_feat_2
    values = list(set(values))  # Keep unique values
    values.sort(reverse=True)   # Sort from largest to smallest

    df_1_score = df_2_score = 0
    prev_value = 10000  # Any large number
    prev_score = 10000

    for value in values:
        df_1_score = get_score_for_key(this_df_other_1, 'feat1', feat1, value)
        df_2_score = get_score_for_key(this_df_other_2, 'feat1', feat2, value)

        total_score = df_1_score + df_2_score

        # Walking from largest to smallest, return the last value whose
        # total score was still at or above the threshold
        if total_score < threshold and prev_score >= threshold:
            return prev_value

        prev_score = total_score
        prev_value = value


def is_dataframe_empty(df):
    return len(df.take(1)) == 0

def get_score_for_key(scores_df, grouping_key, this_id, value):

    if is_dataframe_empty(scores_df):
        return 0.0

    w = Window.partitionBy(grouping_key).orderBy(col('upper'))

    # Flag the last bucket per key: it has no following row in the window
    scores_df_tmp = scores_df.withColumn("next_value", lead(scores_df.upper).over(w))\
                        .withColumn("is_last", when(col('next_value').isNull(), 1).otherwise(0))\
                        .drop('next_value')

    # Flag the first bucket per key: it has no preceding row in the window
    scores_df_tmp = scores_df_tmp.withColumn("prev_value", lag(scores_df_tmp.upper).over(w))\
                        .withColumn("is_first", when(col('prev_value').isNull(), 1).otherwise(0))\
                        .drop('prev_value').cache()

    # The bucket bound columns are named 'lower' and 'upper' in the DataFrames above
    grouping_key_score = scores_df_tmp.filter((col(grouping_key) == this_id) & 
                              (((value >= col('lower')) & (value < col('upper'))) | 
                                ((value >= col('upper')) & (col('is_last') == 1)) |
                                ((value < col('lower')) & (col('is_first') == 1)) |
                                (col('lower').isNull()))) \
                    .withColumn('final_score', when(value <= col('upper'), col('score')).otherwise(1.0)) \
                    .collect()[0]['final_score']

    return grouping_key_score

df.rdd.map(lambda r: (r['feat1'], r['feat2'])) \
    .map(lambda v: (v[0], v[1], get_threshold_for_row(v[0], v[1], 1.4))) \
    .toDF(('feat1', 'feat2', 'target_value'))

But I get: AttributeError: 'Py4JError' object has no attribute 'message'

Sorry for the long post. Any ideas?

Answer

I have a dataframe and I want to apply a function to each row. This function depends on other dataframes.

tl;dr That is not possible in UDFs.

In the broadest sense, a UDF is a function (a Catalyst expression, actually) that accepts zero or more column values (as Column references).
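
As a minimal illustration (an assumed example, not from the original answer): a UDF receives plain column values from one row at a time, never another DataFrame:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# The lambda sees only the 'value' cell of each row as a Python int
add_one = udf(lambda x: x + 1, IntegerType())
df.withColumn('value_plus_one', add_one('value'))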

A UDF can only work on records, which in the broadest case could be an entire DataFrame if the UDF is a user-defined aggregate function (UDAF).

If you want to work on more than one DataFrame in a UDF, you have to join the DataFrames to have the columns you want to use for the UDF.
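
Here is a hedged sketch of what that join-based approach could look like for this problem, building on the df_with_lst sketch above; all intermediate names (cand, s1, s2, target) are illustrative, and the bucket logic assumes contiguous buckets as in the sample data:

from pyspark.sql import functions as F

# One row per (df row, candidate value from lst)
cand = df_with_lst.select('feat1', 'feat2', F.explode('lst').alias('v'))

in_bucket = (F.col('v') >= F.col('lower')) & (F.col('v') < F.col('upper'))
bucket_score = F.coalesce(
    F.max(F.when(in_bucket, F.col('score'))),                     # v inside a bucket
    F.max(F.when(F.col('v') == F.col('upper'), F.col('score'))),  # v on the topmost edge
    F.lit(1.0))                                                   # v above every bucket

s1 = (cand.join(df_other_1, 'feat1', 'left')
          .groupBy('feat1', 'feat2', 'v')
          .agg(bucket_score.alias('score_1')))
s2 = (cand.join(df_other_2.withColumnRenamed('feat1', 'feat2'), 'feat2', 'left')
          .groupBy('feat1', 'feat2', 'v')
          .agg(bucket_score.alias('score_2')))

# Lowest candidate value whose total score is strictly above the threshold
target = (s1.join(s2, ['feat1', 'feat2', 'v'])
            .where(F.col('score_1') + F.col('score_2') > 1.4)
            .groupBy('feat1', 'feat2')
            .agg(F.min('v').alias('target_value')))

For the sample data this should yield target_value 4 for ('a', 'b') and 25 for ('c', 'd').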

