Join two Spark DataFrames using a nested column and update one of the columns


Problem description

I am working on a requirement where I receive a small table in the form of a CSV file, with the following schema:

root
 |-- ACCT_NO: string (nullable = true)
 |-- SUBID: integer (nullable = true)
 |-- MCODE: string (nullable = true)
 |-- NewClosedDate: timestamp (nullable = true)

We also have a very big external Hive table, stored as Avro in HDFS, with the following schema:

root
 |-- accountlinks: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- account: struct (nullable = true)
 |    |    |    |-- acctno: string (nullable = true)
 |    |    |    |-- subid: string (nullable = true)
 |    |    |    |-- mcode: string (nullable = true)
 |    |    |    |-- openeddate: string (nullable = true)
 |    |    |    |-- closeddate: string (nullable = true)

Now, the requirement is to look up the external Hive table based on the three columns from the CSV file: ACCT_NO, SUBID and MCODE. If there is a match, update accountlinks.account.closeddate with NewClosedDate from the CSV file.

I have already written the following code to explode the required columns and join them with the small table, but I am not sure how to update the closeddate field (currently null for all account holders) with NewClosedDate, because closeddate is a nested column and I cannot easily use withColumn to populate it. In addition, the schema and column names cannot be changed, since these files are linked to an external Hive table.

 val df = spark.sql("select * from db.table where archive='201711'")

val ExtractedColumn = df
  .coalesce(150)
  .withColumn("ACCT_NO", explode($"accountlinks.account.acctno"))
  .withColumn("SUBID", explode($"accountlinks.account.subid"))
  .withColumn("MCODE", explode($"accountlinks.account.mcode"))

val ReferenceData = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("file.csv")

val FinalData = ExtractedColumn.join(ReferenceData, Seq("ACCT_NO","SUBID","MCODE") , "left")
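
For context on why withColumn is not enough here: calling it with the nested path does not modify the field in place, it simply adds a new top-level column whose name happens to contain dots. A minimal sketch of the pitfall (the literal date is only an illustration, and lit comes from org.apache.spark.sql.functions):

// Does NOT update the nested field; it adds a new top-level column
// literally named "accountlinks.account.closeddate"
val notWhatYouWant = df.withColumn("accountlinks.account.closeddate", lit("2017-11-30"))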

Recommended answer

All you need to do is explode the accountlinks array and then join the two dataframes like this:

val explodedDF = df.withColumn("account", explode($"accountlinks"))
val joinCondition = $"ACCT_NO" === $"account.acctno" && $"SUBID" === $"account.subid" && $"MCODE" === $"account.mcode"
val joinDF  = explodedDF.join(ReferenceData, joinCondition, "left")
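
For reference, the snippets in this answer assume the usual imports for the DataFrame DSL (not shown in the original post):

import org.apache.spark.sql.functions._   // explode, struct, collect_list, col
import spark.implicits._                  // enables the $"..." column syntax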

Now you can update the account struct column as shown below, and use collect_list to get back the array structure:

val FinalData = joinDF.withColumn("account", 
                                  struct($"account.acctno", $"account.subid", $"account.mcode", 
                                         $"account.openeddate", $"NewClosedDate".alias("closeddate")
                                        )
                                 )
                        .groupBy().agg(collect_list($"account").alias("accountlinks"))
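
One practical note: groupBy() with no keys collapses the whole dataframe into a single row. In practice you would group by whatever top-level column(s) identify the original record; a sketch, assuming a hypothetical row key column named rowid:

val regrouped = joinDF
  .withColumn("account",
    struct($"account.acctno", $"account.subid", $"account.mcode",
           $"account.openeddate", $"NewClosedDate".alias("closeddate")))
  .groupBy($"rowid")   // hypothetical row identifier, not in the original schema
  .agg(collect_list($"account").alias("accountlinks"))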

The idea is to create a new struct with all of the fields from account, except for closeddate, which you take from the NewClosedDate column.

If the struct contains many fields, you can use a for-comprehension to gather all the fields except closeddate, to avoid having to type them all out.
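
For example, a minimal sketch of that for-comprehension, assuming the field names shown in the schema above:

// Build the struct fields programmatically, swapping NewClosedDate in
// for closeddate, so the fields do not have to be typed out by hand
val accountFields = joinDF.select($"account.*").columns
val rebuiltFields = for (name <- accountFields) yield {
  if (name == "closeddate") $"NewClosedDate".alias("closeddate")
  else col(s"account.$name")
}
val updated = joinDF.withColumn("account", struct(rebuiltFields: _*))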

