将数据框列和外部列表传递给 withColumn 下的 udf [英] Passing a data frame column and external list to udf under withColumn

查看:23
本文介绍了将数据框列和外部列表传递给 withColumn 下的 udf的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个具有以下结构的 Spark 数据框.bodyText_token 具有标记(已处理/一组单词).我有一个已定义关键字的嵌套列表

I have a Spark dataframe with the following structure. The bodyText_token has the tokens (processed/set of words). And I have a nested list of defined keywords

root
 |-- id: string (nullable = true)
 |-- body: string (nullable = true)
 |-- bodyText_token: array (nullable = true)

keyword_list=[['union','workers','strike','pay','rally','free','immigration',],
['farmer','plants','fruits','workers'],['outside','field','party','clothes','fashions']]

我需要检查每个关键字列表下有多少标记,并将结果添加为现有数据框的新列.例如:如果 代币 =[成为"、农民"、集会"、工人"、学生"]结果将是 ->[1,2,0]

I needed to check how many tokens fall under each keyword list and add the result as a new column of the existing dataframe. Eg: if tokens =["become", "farmer","rally","workers","student"] the result will be -> [1,2,0]

以下功能按预期工作.

def label_maker_topic(tokens,topic_words):
    twt_list = []
    for i in range(0, len(topic_words)):
        count = 0
        #print(topic_words[i])
        for tkn in tokens:
            if tkn in topic_words[i]:
                count += 1
        twt_list.append(count)
    
    return twt_list

我在 withColumn 下使用 udf 来访问该函数,但出现错误.我认为这是将外部列表传递给 udf.有没有办法可以将外部列表和数据框列传递给 udf 并向我的数据框添加新列?

I used udf under withColumn to access the function and I get an error. I think it's about passing an external list to a udf. Is there a way I can pass the external list and the dataframe column to a udf and add a new column to my dataframe?

topicWord = udf(label_maker_topic,StringType())
myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token,keyword_list))

推荐答案

最简洁的解决方案是使用闭包传递额外的参数:

The cleanest solution is to pass additional arguments using closure:

def make_topic_word(topic_words):
     return udf(lambda c: label_maker_topic(c, topic_words))

df = sc.parallelize([(["union"], )]).toDF(["tokens"])

(df.withColumn("topics", make_topic_word(keyword_list)(col("tokens")))
    .show())

这不需要对 keyword_list 或您使用 UDF 包装的函数进行任何更改.您还可以使用此方法传递任意对象.例如,这可用于传递 sets 列表以进行高效查找.

This doesn't require any changes in keyword_list or the function you wrap with UDF. You can also use this method to pass an arbitrary object. This can be used to pass for example a list of sets for efficient lookups.

如果您想使用当前的 UDF 并直接传递 topic_words,则必须先将其转换为列文字:

If you want to use your current UDF and pass topic_words directly you'll have to convert it to a column literal first:

from pyspark.sql.functions import array, lit

ks_lit = array(*[array(*[lit(k) for k in ks]) for ks in keyword_list])
df.withColumn("ad", topicWord(col("tokens"), ks_lit)).show()

根据您的数据和要求,可以使用其他更高效的解决方案,这些解决方案不需要 UDF(分解 + 聚合 + 折叠)或查找(散列 + 向量运算).

Depending on your data and requirements there can alternative, more efficient solutions, which don't require UDFs (explode + aggregate + collapse) or lookups (hashing + vector operations).

这篇关于将数据框列和外部列表传递给 withColumn 下的 udf的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆