Calculate new column in spark Dataframe, crossing a tokens list column in df1 with a text column in df2 with pyspark

Question

I am using Spark 2.4.5 and I need to calculate a sentiment score from the token list column (MeaningfulWords) of df1, according to the words in df2 (a Spanish sentiment dictionary). In df1 I must create a new column with the list of token scores, and another column with the mean of the scores (sum of scores / word count) for each record. If a token in the list (df1) is not in the dictionary (df2), it scores zero.

The dataframes look like this:

df1.select("ID","MeaningfulWords").show(truncate=True, n=5)
+------------------+------------------------------+
|                ID|               MeaningfulWords|
+------------------+------------------------------+
|abcde00000qMQ00001|[casa, alejado, buen, gusto...|
|abcde00000qMq00002|[clientes, contentos, servi...|
|abcde00000qMQ00003|                 [resto, bien]|
|abcde00000qMQ00004|[mal, servicio, no, antiend...|
|abcde00000qMq00005|[gestion, adecuada, proble ...|
+------------------+------------------------------+

df2.show(5)
+-----+----------+
|score|      word|
+-----+----------+
| 1.68|abandonado|
| 3.18|    abejas|
|  2.8|    aborto|
| 2.46| abrasador|
| 8.13|    abrazo|
+-----+----------+
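
For reproducibility, here is a minimal, hypothetical sketch of how sample frames with these shapes could be built (assuming an active SparkSession named spark; the rows are abbreviated from the output above):

# Hypothetical reconstruction of the sample data, abbreviated.
df1 = spark.createDataFrame(
    [("abcde00000qMQ00001", ["casa", "alejado", "buen", "gusto"]),
     ("abcde00000qMQ00003", ["resto", "bien"])],
    ["ID", "MeaningfulWords"])

df2 = spark.createDataFrame(
    [(1.68, "abandonado"), (3.18, "abejas"), (2.8, "aborto")],
    ["score", "word"])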

The new columns to add to df1 should look like this:

+------------------+---------------------+
|         MeanScore|            ScoreList|
+------------------+---------------------+
|              2.95|[3.10, 2.50, 1.28,...|
|              2.15|[1.15, 3.50, 2.75,...|
|              2.75|[4.20, 1.00, 1.75,...|
|              3.25|[3.25, 2.50, 3.20,...|
|              3.15|[2.20, 3.10, 1.28,...|
+------------------+---------------------+

I have reviewed some options using .join, but joining columns of different data types gives an error. I have also tried converting the DataFrames to RDDs and calling a function:

def map_words_to_values(review_words, dict_df):
    return [dict_df[word] for word in review_words if word in dict_df]

RDD1=swRemoved.rdd.map(list) 
RDD2=Dict_df.rdd.map(list)

reviewsRDD_dict_values = RDD1.map(lambda tuple: (tuple[0], map_words_to_values(tuple[1], RDD2)))
reviewsRDD_dict_values.take(3)

But with this option I get the error:

PicklingError: Could not serialize object: Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
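
The root cause is that RDD2 (an RDD) is captured inside RDD1.map, which is exactly what SPARK-5063 forbids. As a minimal sketch of the usual workaround, assuming the dictionary is small enough to collect to the driver, one could turn it into a plain Python dict and broadcast it instead of referencing the RDD:

# Collect the small dictionary to the driver and broadcast it to the executors.
score_dict = {row["word"]: row["score"] for row in Dict_df.collect()}
b_scores = spark.sparkContext.broadcast(score_dict)

def map_words_to_values(review_words):
    # Look each token up in the broadcast dict; words absent from it are skipped.
    return [b_scores.value[w] for w in review_words if w in b_scores.value]

reviewsRDD_dict_values = swRemoved.rdd.map(
    lambda row: (row["ID"], map_words_to_values(row["MeaningfulWords"])))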

I have found some examples that score text using the afinn library, but it doesn't work with Spanish text.

If possible, I want to use PySpark's native functions instead of UDFs, to avoid hurting performance. But I'm a beginner in Spark and I would like to find the Spark way to do this.

Answer

You could do this by first joining on array_contains(MeaningfulWords, word), then using groupBy with first, collect_list, and mean aggregations (Spark 2.4+).

Welcome to SO.

df1.show()

#+------------------+----------------------------+
#|ID                |MeaningfulWords             |
#+------------------+----------------------------+
#|abcde00000qMQ00001|[casa, alejado, buen, gusto]|
#|abcde00000qMq00002|[clientes, contentos, servi]|
#|abcde00000qMQ00003|[resto, bien]               |
#+------------------+----------------------------+

df2.show()

#+-----+---------+
#|score|     word|
#+-----+---------+
#| 1.68|     casa|
#|  2.8|  alejado|
#| 1.03|     buen|
#| 3.68|    gusto|
#| 0.68| clientes|
#|  2.1|contentos|
#| 2.68|    servi|
#| 1.18|    resto|
#| 1.98|     bien|
#+-----+---------+


from pyspark.sql import functions as F

# Left-join each review to every dictionary word contained in its token array,
# then re-group by ID, keeping the token list and aggregating the scores.
(df1.join(df2, F.expr("array_contains(MeaningfulWords, word)"), "left")
    .groupBy("ID")
    .agg(F.first("MeaningfulWords").alias("MeaningfulWords"),
         F.collect_list("score").alias("ScoreList"),
         F.mean("score").alias("MeanScore"))
    .show(truncate=False))

#+------------------+----------------------------+-----------------------+------------------+
#|ID                |MeaningfulWords             |ScoreList              |MeanScore         |
#+------------------+----------------------------+-----------------------+------------------+
#|abcde00000qMQ00003|[resto, bien]               |[1.18, 1.98]           |1.58              |
#|abcde00000qMq00002|[clientes, contentos, servi]|[0.68, 2.1, 2.68]      |1.8200000000000003|
#|abcde00000qMQ00001|[casa, alejado, buen, gusto]|[1.68, 2.8, 1.03, 3.68]|2.2975            |
#+------------------+----------------------------+-----------------------+------------------+
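
One caveat: with the array_contains left join above, a token that is missing from the dictionary simply produces no matching row, so it is dropped from ScoreList and from the mean rather than being counted as zero, as the question requires. A minimal sketch of a variant that scores absent tokens as zero, by exploding the tokens and joining on word equality (still only native functions, no UDFs):

from pyspark.sql import functions as F

(df1.withColumn("word", F.explode("MeaningfulWords"))       # one row per token
    .join(df2, "word", "left")                              # score is null for unknown words
    .withColumn("score", F.coalesce("score", F.lit(0.0)))   # unknown token -> 0
    .groupBy("ID")
    .agg(F.first("MeaningfulWords").alias("MeaningfulWords"),
         F.collect_list("score").alias("ScoreList"),
         F.mean("score").alias("MeanScore"))
    .show(truncate=False))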
