Perform a user-defined function on a column of a large PySpark DataFrame based on some columns of another PySpark DataFrame on Databricks


Problem description

My question is related to my previous one at How to efficiently join large pyspark dataframes and small python list for some NLP results on databricks.

I have worked out part of it and am now stuck on another problem.

I have a small pyspark dataframe like:

  df1: 

   +-----+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
   |topic|                                       termIndices|                                       termWeights|                                             terms|
   +-----+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
   |    0|      [3, 155, 108, 67, 239, 4, 72, 326, 128, 189]|[0.023463344607734377, 0.011772322769900843, 0....|[cell, apoptosis, uptake, loss, transcription, ...|
   |    1|      [16, 8, 161, 86, 368, 153, 18, 214, 21, 222]|[0.013057307487199429, 0.011453455929929763, 0....|[therapy, cancer, diet, lung, marker, sensitivi...|
   |    2|            [0, 1, 124, 29, 7, 2, 84, 299, 22, 90]|[0.03979063871841061, 0.026593954837078836, 0.0...|[group, expression, performance, use, disease, ...|
   |    3|   [204, 146, 74, 240, 152, 384, 55, 250, 238, 92]|[0.009305626056223443, 0.008840730657888991, 0....|[pattern, chemotherapy, mass, the amount, targe...|

It has fewer than 100 rows and is very small. Each term has a termWeight value in the "termWeights" column.

I also have another large pyspark dataframe (50+ GB) like:

  df2: 
  +------+--------------------------------------------------+
  |r_id|                                    tokens|
  +------+--------------------------------------------------+
  |     0|[The human KCNJ9, Kir, GIRK3, member, potassium...|
  |     1|[BACKGROUND, the treatment, breast, cancer, the...|
  |     2|[OBJECTIVE, the relationship, preoperative atri...|

For each row in df2, I need to find the best matching terms in df1 with the highest termWeights among all topics.

Finally, I need a df like:

 r_id tokens topic (the topic in df1 that has the highest sum of termWeights among all topics)

I have defined a UDF (based on df2), but it cannot access the columns of df1. I am thinking about how to use a "cross join" for df1 and df2, but I do not need to join each row of df2 with each row of df1. I only need to keep all columns of df2 and add one column, "topic", namely the topic whose terms have the highest sum of termWeights when matched against the tokens of each df2 row.

I am not sure how to implement this logic with pyspark.sql.functions.udf.
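For reference, a pyspark.sql.functions.udf can only use data captured in its Python closure, not the columns of another DataFrame, so a UDF-based route would first collect the small df1 into a plain dict and broadcast it. A minimal sketch under that assumption (names such as best_topic are illustrative):

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Collect the small df1 into a plain Python dict {topic: [(term, weight), ...]}
# and broadcast it so every executor can read it inside the UDF.
topic_terms = {
    str(row['topic']): list(zip(row['terms'], row['termWeights']))
    for row in df1.select('topic', 'terms', 'termWeights').collect()
}
bc_topic_terms = spark.sparkContext.broadcast(topic_terms)

@F.udf(StringType())
def best_topic(tokens):
    token_set = {t.lower() for t in tokens}
    best, best_sum = None, 0.0
    for topic, pairs in bc_topic_terms.value.items():
        s = sum(w for term, w in pairs if term in token_set)
        if s > best_sum:
            best, best_sum = topic, s
    return best

df_new = df2.withColumn('topic', best_topic('tokens'))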

Recommended answer

IIUC, you can try something like the following (I split the processing flow into 4 steps; Spark 2.4+ is required):

Step-1: convert all df2.tokens to lowercase so we can do text comparison:

from pyspark.sql.functions import expr, desc, row_number, broadcast

df2 = df2.withColumn('tokens', expr("transform(tokens, x -> lower(x))"))

Step-2: join df2 with df1 using arrays_overlap, broadcasting the small df1:

df3 = df2.join(broadcast(df1), expr("arrays_overlap(terms, tokens)"), "left")
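For reference, arrays_overlap(a, b) returns true when the two arrays share at least one non-null element, so each df2 row is only paired with the topics whose terms actually appear in its tokens; a quick illustrative check (assuming a running SparkSession named spark):

spark.sql("SELECT arrays_overlap(array('cell', 'human'), array('the', 'cell', 'kcnj9')) AS overlap").show()
# overlap = true because 'cell' appears in both arrays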

Step-3: use the aggregate function to sum up the termWeights of the terms that appear in each row's tokens:

df4 = df3.selectExpr(
    "r_id",
    "tokens",
    "topic",
    """
      aggregate(
        /* find all terms+termWeights which are shown in tokens array */
        filter(arrays_zip(terms,termWeights), x -> array_contains(tokens, x.terms)),
        0D,
        /* get the sum of all termWeights from the matched terms */
        (acc, y) -> acc + y.termWeights
      ) as matched_sum_of_weights
    """)

Step-4: for each r_id, use a Window function to find the row with the highest matched_sum_of_weights and keep only the rows having row_number == 1:

from pyspark.sql import Window
w1 = Window.partitionBy('r_id').orderBy(desc('matched_sum_of_weights'))

df_new = df4.withColumn('rn', row_number().over(w1)).filter('rn=1').drop('rn', 'matched_sum_of_weights')
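Putting the four steps together on a toy version of df1 and df2 (illustrative data only, assuming a running SparkSession) is a quick way to sanity-check the result:

# Toy data shaped like df1/df2 above (illustrative only)
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import expr, desc, row_number, broadcast

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame(
    [(0, [0.02, 0.01], ['cell', 'apoptosis']),
     (1, [0.03, 0.01], ['therapy', 'cancer'])],
    ['topic', 'termWeights', 'terms'])

df2 = spark.createDataFrame(
    [(0, ['The', 'cell', 'cancer']),
     (1, ['BACKGROUND', 'therapy', 'cancer'])],
    ['r_id', 'tokens'])

# Step-1 to Step-4 exactly as above
df2 = df2.withColumn('tokens', expr("transform(tokens, x -> lower(x))"))
df3 = df2.join(broadcast(df1), expr("arrays_overlap(terms, tokens)"), "left")
df4 = df3.selectExpr(
    "r_id", "tokens", "topic",
    """aggregate(
         filter(arrays_zip(terms, termWeights), x -> array_contains(tokens, x.terms)),
         0D,
         (acc, y) -> acc + y.termWeights
       ) as matched_sum_of_weights""")
w1 = Window.partitionBy('r_id').orderBy(desc('matched_sum_of_weights'))
df_new = df4.withColumn('rn', row_number().over(w1)).filter('rn = 1') \
            .drop('rn', 'matched_sum_of_weights')
df_new.show(truncate=False)
# r_id 0 -> topic 0 ('cell' weighs 0.02 vs 'cancer' at 0.01 in topic 1)
# r_id 1 -> topic 1 ('therapy' + 'cancer' = 0.04; topic 0 has no overlapping term)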

Alternative: if the size of df1 is not very large, this might be handled without join/Window.partitionBy etc. The code below only outlines the idea, which you should improve based on your actual data:

from pyspark.sql.functions import expr, when, coalesce, array_contains, lit, struct

# create a dict from df1 with topic as key and list of termWeights+terms as value
d = df1.selectExpr("string(topic)", "arrays_zip(termWeights,terms) as terms").rdd.collectAsMap()

# ignore this if text comparison are case-sensitive, you might do the same to df1 as well
df2 = df2.withColumn('tokens', expr("transform(tokens, x -> lower(x))"))

# save the column names of the original df2
cols = df2.columns

# iterate through all items of d(or df1) and update df2 with new columns from each 
# topic with the value a struct containing `sum_of_weights`, `topic` and `has_match`(if any terms is matched)
for x,y in d.items():
  df2 = df2.withColumn(x,
      struct(
        sum([when(array_contains('tokens', t.terms), t.termWeights).otherwise(0) for t in y]).alias('sum_of_weights'),
        lit(x).alias('topic'),
        coalesce(*[when(array_contains('tokens', t.terms),1) for t in y]).isNotNull().alias('has_match')
      )
  )

# create a new array containing all new columns(topics), and find array_max
# from items with `has_match == true`, and then retrieve the `topic` field
df_new = df2.selectExpr(
    *cols,
    f"array_max(filter(array({','.join(map('`{}`'.format,d.keys()))}), x -> x.has_match)).topic as topic"
)
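This works because Spark compares structs field by field, so array_max over the per-topic structs returns the one with the largest sum_of_weights (its first field), and .topic then extracts the winning topic; a quick illustrative check:

spark.sql("""
    SELECT array_max(array(
             named_struct('sum_of_weights', 0.02D, 'topic', '0'),
             named_struct('sum_of_weights', 0.04D, 'topic', '1')
           )).topic AS topic
""").show()
# returns '1', the struct with the larger sum_of_weights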
