加入 Spark Dataframe 时出现意外失败的断言错误 - 发现重复的重写属性 [英] I'm getting an unexpected failed assertion error when joining Spark Dataframe - Found duplicate rewrite attributes

查看:229
本文介绍了加入 Spark Dataframe 时出现意外失败的断言错误 - 发现重复的重写属性的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当我运行下面的代码时,出现错误java.lang.AssertionError:断言失败:发现重复的重写属性.在更新我们的数据块运行时之前,它运行顺利.

When I run the code below, I get the error java.lang.AssertionError: assertion failed: Found duplicate rewrite attributes. Prior to updating our databricks runtime this ran smoothly.

  1. top10_df 是列表groups中具有唯一键的数据的数据框.

  1. top10_df is a dataframe of data with unique keys in the list groups.

res_df 是 top10_df 中具有最小和最大日期的唯一键的聚合.

res_df is an aggregation of the unique keys in top10_df with min and max dates.

一旦 res_df 被创建并持久化,它就会重新加入到组中唯一键的 top10_df 中.

once res_df is created and persisted it is joined back into the top10_df on the unique keys in groups.

groups = ['col1','col2','col3','col4']
min_date_created = fn.min('date_created').alias('min_date_created')
max_date_created = fn.max('date_created').alias('max_date_created')

res_df = (top10_df
            .groupBy(groups)
            .agg(min_date_created
            ,max_date_created
            )
         )
res_df.persist()
print(res_df.count())

score_rank = fn.row_number().over(w.partitionBy(groups).orderBy(fn.desc('score')))
unique_issue_id = fn.row_number().over(w.orderBy(groups))

out_df = (top10_df.alias('t10')
                    .join(res_df.alias('res'),groups,'left')
                    .where(fn.col('t10.date_created')==fn.col('res.max_date_created'))
                    .drop(fn.col('t10.date_created'))
                    .drop(fn.col('t10.date_updated'))
                    .withColumn('score_rank',score_rank)
                    .where(fn.col('score_rank')==1)
                    .drop('score_rank'
                          ,'latest_revision_complete_hash'
                          ,'latest_revision_durable_hash'
                         )
                    .withColumn('unique_issue_id',unique_issue_id)
                   .withColumnRenamed('res.id','resource_id')
                  )

out_df.persist()
print(out_df.count())

推荐答案

代替:out_df = (top10_df.alias('t10').join(res_df.alias('res'),groups,'left')

Instead of: out_df = (top10_df.alias('t10') .join(res_df.alias('res'),groups,'left')

在加入之后,选择右侧 df 中的所有列并为其添加别名,以消除重复属性的歧义:

right after the join, select and alias all columns in your right-hand-side df, to disambiguate the duplicate attributes:

out_df = (top10_df.alias('t10')
.join(res_df.alias('res')
.select(fn.col('groups').alias('groups'),
fn.col('min_date_created').alias('min_date_created'),
fn.col('max_date_created').alias('max_date_created')),
groups,'left')

这篇关于加入 Spark Dataframe 时出现意外失败的断言错误 - 发现重复的重写属性的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆