Error including a column in a join between spark dataframes

Problem description

I have a join between cleanDF and sentiment_df using array_contains that works fine (from solution 61687997), and I need to include in the Result df a new column ('Year') from cleanDF.

This is the join:

from pyspark.sql.functions import col, expr, first, collect_list, mean

Result = cleanDF.join(sentiment_df, expr("""array_contains(MeaningfulWords,word)"""), how='left')\
                .groupBy("ID")\
                .agg(first("MeaningfulWords").alias("MeaningfulWords")\
                  ,collect_list("score").alias("ScoreList")\
                  ,mean("score").alias("MeanScore"))

This is the Result structure:

Result.show(5)

#+------------------+--------------------+--------------------+-----------------+
#|                ID|     MeaningfulWords|           ScoreList|        MeanScore|
#+------------------+--------------------+--------------------+-----------------+
#|a0U3Y00000p1IzjUAE|[buen, servicio, ...|        [6.39, 1.82]|            4.105|
#|a0U3Y00000p1KhGUAU|              [mala]|              [2.02]|             2.02|
#|a0U3Y00000p1M1oUAE|[cliente, content...|        [6.39, 8.41]|              7.4|
#|a0U3Y00000p1OnTUAU|[positivo, trato,...|               [8.2]|             8.19|
#|a0U3Y00000p1R5DUAU|[momento, sido, g...|               [6.0]|              6.0|
#+------------------+--------------------+--------------------+-----------------+

I added a .select (from solution 36132322) to include the column Year from cleanDF:

Result1 = cleanDF.alias('a').join(sentiment_df.alias('b'), expr("""array_contains(a.MeaningfulWords,b.word)"""), how='left')\
                .select(col('a.ID'),col('a.Year'),col('a.MeaningfulWords'),col('b.word'),col('b.score'))\
                .groupBy("ID")\
                .agg(first("a.MeaningfulWords").alias("MeaningfulWords")\
                  ,collect_list("score").alias("ScoreList")\
                  ,mean("score").alias("MeanScore"))

But I get the same columns in Result1 as in Result:

display(Result1)

#DataFrame[ID: string, MeaningfulWords: array<string>, ScoreList: array<double>, MeanScore: double]

When I try to include Year in the .agg function:

Result2 = cleanDF.join(sentiment_df, expr("""array_contains(MeaningfulWords,word)"""), how='left')\
                .groupBy("ID")\
                .agg(first("MeaningfulWords").alias("MeaningfulWords"),first("Year").alias("Year")\
                  ,collect_list("score").alias("ScoreList")\
                  ,mean("score").alias("MeanScore"))

Result2.show()

Py4JJavaError: An error occurred while calling o3205.showString.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:146)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:343)
    ...
    ...
    ...
        Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$1: (string) => array<string>)
            at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1066)
            at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:109)
            at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:107)
            at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1063)
    ...
    ...
    Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 411.0 failed 1 times, most recent failure: Lost task 2.0 in stage 411.0 (TID 9719, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$5: (array<string>) => array<string>)
        at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1066)
        at org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction$class.eval(higherOrderFunctions.scala:208)
        at org.apache.spark.sql.catalyst.expressions.ArrayFilter.eval(higherOrderFunctions.scala:296)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    ...
    ...
    ... 20 more
    Caused by: java.lang.NullPointerException

I'm using pyspark on Spark 2.4.5.

Thanks in advance for your help.

Answer

The Year column might have null values, and because of that the job fails with the Caused by: java.lang.NullPointerException exception. Filter all null values from the Year column.
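
A minimal sketch of that fix, reusing the Result2 query from the question (cleanDF and sentiment_df as defined there), drops the null Year rows before the join:

from pyspark.sql.functions import col, expr, first, collect_list, mean

# Sketch: drop rows with a null Year before joining, then run the same
# aggregation as Result2 above, carrying Year through with first().
Result2 = cleanDF.filter(col("Year").isNotNull())\
                .join(sentiment_df, expr("array_contains(MeaningfulWords, word)"), how='left')\
                .groupBy("ID")\
                .agg(first("MeaningfulWords").alias("MeaningfulWords"),
                     first("Year").alias("Year"),
                     collect_list("score").alias("ScoreList"),
                     mean("score").alias("MeanScore"))

An equivalent alternative is cleanDF.na.drop(subset=["Year"]), which removes the same rows.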
