计算Spark Dataframe中的新列,将df1中的令牌列表列与pyspark中的df2中的文本列交叉 [英] Calculate new column in spark Dataframe, crossing a tokens list column in df1 with a text column in df2 with pyspark

查看:111
本文介绍了计算Spark Dataframe中的新列,将df1中的令牌列表列与pyspark中的df2中的文本列交叉的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用spark 2.4.5,我需要根据 df2 (西班牙语情感词典).在 df1 中,我必须创建一个包含令牌分数列表的新列,并创建一个具有每条记录的分数平均值(分数总和/计数字)的新列.如果列表( df1 )中的任何标记不在词典( df2 )中,则得分为零.

I am using spark 2.4.5 and I need to calculate the sentiment score from a token list column (MeaningfulWords column) of df1, according to the words in df2 (spanish sentiment dictionary). In df1 I must create a new column with the scores list of tokens and another column with the mean of scores (sum of scores / count words) of each record. If any token in the list (df1) is not in the dictionary (df2), zero is scored.

数据框如下所示:

df1.select("ID","MeaningfulWords").show(truncate=True, n=5)
+------------------+------------------------------+
|                ID|               MeaningfulWords|
+------------------+------------------------------+
|abcde00000qMQ00001|[casa, alejado, buen, gusto...|
|abcde00000qMq00002|[clientes, contentos, servi...|
|abcde00000qMQ00003|                 [resto, bien]|
|abcde00000qMQ00004|[mal, servicio, no, antiend...|
|abcde00000qMq00005|[gestion, adecuada, proble ...|
+------------------+------------------------------+

df2.show(5)
+-----+----------+
|score|      word|
+-----+----------+
| 1.68|abandonado|
| 3.18|    abejas|
|  2.8|    aborto|
| 2.46| abrasador|
| 8.13|    abrazo|
+-----+----------+

要在 df1 中添加的新列应如下所示:

The new columns to add in df1, should look like this:

+------------------+---------------------+
|         MeanScore|            ScoreList|
+------------------+---------------------+
|              2.95|[3.10, 2.50, 1.28,...|
|              2.15|[1.15, 3.50, 2.75,...|
|              2.75|[4.20, 1.00, 1.75,...|
|              3.25|[3.25, 2.50, 3.20,...|
|              3.15|[2.20, 3.10, 1.28,...|
+------------------+---------------------+

我已经使用 .join 查看了一些选项,但是使用具有不同数据类型的列会出错.我也尝试过将数据框转换为RDD并调用一个函数:

I have reviewed some options using .join, but using columns with different data types gives error. I have also tried converting the Dataframes to RDD and calling a function:

def map_words_to_values(review_words, dict_df):
return [dict_df[word] for word in review_words if word in dict_df]

RDD1=swRemoved.rdd.map(list) 
RDD2=Dict_df.rdd.map(list)

reviewsRDD_dict_values = RDD1.map(lambda tuple: (tuple[0], map_words_to_values(tuple[1], RDD2)))
reviewsRDD_dict_values.take(3)

但是使用此选项,我会收到错误消息:

But with this option I get the error:

PicklingError: Could not serialize object: Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

我发现了一些使用 afinn 库对文本进行评分的示例.但这不适用于西班牙文字.

I have found some examples to score text using afinn library. But it doesn't works with spanish text.

如果可能的话,我想尝试使用pyspark的本机功能,而不是使用udfs以避免影响性能.但是我是火花的初学者,我想找到火花的方法.

I wanna try to utilize native functions of pyspark instead of using udfs to avoid affect the performance, if it's possible. But I'm a begginer in spark and I would like to find the spark way to do that.

推荐答案

您可以先使用 array_contains 单词,然后使用 groupBy加入 first collect_list 平均值 .( spark2.4 + )

You could do this by first joining using array_contains word, then groupBy with aggregations of first, collect_list, and mean.(spark2.4+)

欢迎使用SO

df1.show()

#+------------------+----------------------------+
#|ID                |MeaningfulWords             |
#+------------------+----------------------------+
#|abcde00000qMQ00001|[casa, alejado, buen, gusto]|
#|abcde00000qMq00002|[clientes, contentos, servi]|
#|abcde00000qMQ00003|[resto, bien]               |
#+------------------+----------------------------+

df2.show()

#+-----+---------+
#|score|     word|
#+-----+---------+
#| 1.68|     casa|
#|  2.8|  alejado|
#| 1.03|     buen|
#| 3.68|    gusto|
#| 0.68| clientes|
#|  2.1|contentos|
#| 2.68|    servi|
#| 1.18|    resto|
#| 1.98|     bien|
#+-----+---------+


from pyspark.sql import functions as F
df1.join(df2, F.expr("""array_contains(MeaningfulWords,word)"""),'left')\
   .groupBy("ID").agg(F.first("MeaningfulWords").alias("MeaningfullWords")\
                      ,F.collect_list("score").alias("ScoreList")\
                      ,F.mean("score").alias("MeanScore"))\
                      .show(truncate=False)

#+------------------+----------------------------+-----------------------+------------------+
#|ID                |MeaningfullWords            |ScoreList              |MeanScore         |
#+------------------+----------------------------+-----------------------+------------------+
#|abcde00000qMQ00003|[resto, bien]               |[1.18, 1.98]           |1.58              |
#|abcde00000qMq00002|[clientes, contentos, servi]|[0.68, 2.1, 2.68]      |1.8200000000000003|
#|abcde00000qMQ00001|[casa, alejado, buen, gusto]|[1.68, 2.8, 1.03, 3.68]|2.2975            |
#+------------------+----------------------------+-----------------------+------------------+

这篇关于计算Spark Dataframe中的新列,将df1中的令牌列表列与pyspark中的df2中的文本列交叉的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆