How to join two DataFrames and change a column for the missing values?

Question

// In spark-shell (Spark 2.x+), sc and spark.implicits._ (needed for toDF) are already in scope.
val df1 = sc.parallelize(Seq(
   ("a1",10,"ACTIVE","ds1"),
   ("a1",20,"ACTIVE","ds1"),
   ("a2",50,"ACTIVE","ds1"),
   ("a3",60,"ACTIVE","ds1"))
).toDF("c1","c2","c3","c4")

val df2 = sc.parallelize(Seq(
   ("a1",10,"ACTIVE","ds2"),
   ("a1",20,"ACTIVE","ds2"),
   ("a1",30,"ACTIVE","ds2"),
   ("a1",40,"ACTIVE","ds2"),
   ("a4",20,"ACTIVE","ds2"))
).toDF("c1","c2","c3","c5")


df1.show()

// +---+---+------+---+
// | c1| c2|    c3| c4|
// +---+---+------+---+
// | a1| 10|ACTIVE|ds1|
// | a1| 20|ACTIVE|ds1|
// | a2| 50|ACTIVE|ds1|
// | a3| 60|ACTIVE|ds1|
// +---+---+------+---+

df2.show()
// +---+---+------+---+
// | c1| c2|    c3| c5|
// +---+---+------+---+
// | a1| 10|ACTIVE|ds2|
// | a1| 20|ACTIVE|ds2|
// | a1| 30|ACTIVE|ds2|
// | a1| 40|ACTIVE|ds2|
// | a4| 20|ACTIVE|ds2|
// +---+---+------+---+

My requirement: I need to join both DataFrames. The output DataFrame should contain all the records from df1, plus the records from df2 that are not in df1, but only for c1 values that also appear in df1. The records pulled from df2 should have column c3 set to INACTIVE.

In this example, the only matching value of c1 is a1, so I need to pull the c2=30 and c2=40 records from df2 and mark them INACTIVE.
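To pin the requirement down, here is a sketch in plain Scala collections rather than Spark. It assumes each row is modelled as a `(c1, c2, c3, c4)` tuple and that a df2 record counts as "already in df1" when its `(c1, c2)` pair appears there; `df1Rows`, `df2Rows`, `matchingKeys`, `df1Pairs` and `toPull` are hypothetical names.

```scala
// Rows modelled as (c1, c2, c3, c4/c5) tuples; plain Scala, no Spark needed.
val df1Rows = Seq(
  ("a1", 10, "ACTIVE", "ds1"),
  ("a1", 20, "ACTIVE", "ds1"),
  ("a2", 50, "ACTIVE", "ds1"),
  ("a3", 60, "ACTIVE", "ds1"))

val df2Rows = Seq(
  ("a1", 10, "ACTIVE", "ds2"),
  ("a1", 20, "ACTIVE", "ds2"),
  ("a1", 30, "ACTIVE", "ds2"),
  ("a1", 40, "ACTIVE", "ds2"),
  ("a4", 20, "ACTIVE", "ds2"))

// c1 values present in both inputs: only these keys are eligible.
val matchingKeys: Set[String] = df1Rows.map(_._1).toSet.intersect(df2Rows.map(_._1).toSet)

// (c1, c2) pairs already in df1 (assumption: this is what "in df1" means here).
val df1Pairs: Set[(String, Int)] = df1Rows.map(r => (r._1, r._2)).toSet

// df2 rows with a matching c1 that df1 does not already contain.
val toPull = df2Rows.filter(r => matchingKeys.contains(r._1) && !df1Pairs.contains((r._1, r._2)))

println(matchingKeys)     // Set(a1)
println(toPull.map(_._2)) // List(30, 40)
```

This only identifies which rows qualify; relabelling them INACTIVE and appending them to df1 is the remaining work.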

Here is the expected output:

df_output.show()

// +---+---+--------+---+
// | c1| c2|    c3  | c4|
// +---+---+--------+---+
// | a1| 10|ACTIVE  |ds1|
// | a1| 20|ACTIVE  |ds1|
// | a2| 50|ACTIVE  |ds1|
// | a3| 60|ACTIVE  |ds1|
// | a1| 30|INACTIVE|ds1|
// | a1| 40|INACTIVE|ds1|
// +---+---+--------+---+

Can anyone help me with this?

Answer

First, a small thing. I use different names for the columns in df2:

val df2 = sc.parallelize(...).toDF("d1","d2","d3","d4")

No big deal, but this made things easier for me to reason about.

Now for the fun stuff. I am going to be a bit verbose for the sake of clarity:

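import org.apache.spark.sql.functions.lit  // $"..." syntax comes from spark.implicits._ (auto-imported in spark-shell)

val join = df1
  .join(df2, df1("c1") === df2("d1"), "inner")       // keep df2 rows whose d1 matches some c1
  .select($"d1", $"d2", $"d3", lit("ds1").as("d4"))  // df2 columns, with ds2 replaced by ds1
  .dropDuplicates()                                  // each df2 a1 row matched both df1 a1 rows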

Here I do the following:

  • Inner join between df1 and df2 on the c1 and d1 columns
  • Select the df2 columns and simply "hardcode" ds1 in the last column to replace ds2
  • Drop duplicates

This effectively filters out every row of df2 whose d1 value has no matching key in df1's c1 column.
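The join step can be modelled with plain Scala collections to show why the deduplication is needed: each of df2's four `a1` rows pairs with both of df1's `a1` rows, so the projection produces every row twice. A semi-join, which keeps a df2 row once if its key merely exists in df1 (Spark offers this as the `left_semi` join type), yields the same rows without creating duplicates. This is a sketch assuming tuple rows; `innerJoinProjected` and `semiJoined` are hypothetical names.

```scala
val df1Rows = Seq(
  ("a1", 10, "ACTIVE", "ds1"),
  ("a1", 20, "ACTIVE", "ds1"),
  ("a2", 50, "ACTIVE", "ds1"),
  ("a3", 60, "ACTIVE", "ds1"))

val df2Rows = Seq(
  ("a1", 10, "ACTIVE", "ds2"),
  ("a1", 20, "ACTIVE", "ds2"),
  ("a1", 30, "ACTIVE", "ds2"),
  ("a1", 40, "ACTIVE", "ds2"),
  ("a4", 20, "ACTIVE", "ds2"))

// Inner join on c1 == d1, keep df2's columns, hardcode "ds1" in the last one.
val joinedWithDups = for {
  l <- df1Rows
  r <- df2Rows
  if l._1 == r._1
} yield (r._1, r._2, r._3, "ds1")

// Equivalent of dropDuplicates().
val innerJoinProjected = joinedWithDups.distinct

// Semi-join: keep each df2 row once if its key exists in df1.
val df1Keys = df1Rows.map(_._1).toSet
val semiJoined = df2Rows
  .filter(r => df1Keys.contains(r._1))
  .map(r => (r._1, r._2, r._3, "ds1"))

println(joinedWithDups.size)              // 8
println(innerJoinProjected == semiJoined) // true
```

In Spark itself row order is not guaranteed, so the two results would be compared as sets rather than as ordered sequences.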

Next, I compute the difference:

val diff = join
  .except(df1)                                            // rows of join not in df1 (columns matched by position)
  .select($"d1", $"d2", lit("INACTIVE").as("d3"), $"d4")  // mark those rows INACTIVE

This is a basic set operation that finds everything in join that is not in df1. These are the items to deactivate, so I select all the columns but replace the third with a hardcoded INACTIVE value.
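The `except` step is a set difference on whole rows: a row of `join` survives only if an identical row is absent from df1, and, like Spark's `except` (documented as equivalent to SQL `EXCEPT DISTINCT`), the result contains no duplicates. Below is a plain-Scala sketch, assuming tuple rows and taking the join output to be df2's four `a1` rows with `ds1` in the last column; `joined` and `deactivated` are hypothetical names.

```scala
val df1Rows = Seq(
  ("a1", 10, "ACTIVE", "ds1"),
  ("a1", 20, "ACTIVE", "ds1"),
  ("a2", 50, "ACTIVE", "ds1"),
  ("a3", 60, "ACTIVE", "ds1"))

// Output of the join step: df2's a1 rows with the last column rewritten to "ds1".
val joined = Seq(
  ("a1", 10, "ACTIVE", "ds1"),
  ("a1", 20, "ACTIVE", "ds1"),
  ("a1", 30, "ACTIVE", "ds1"),
  ("a1", 40, "ACTIVE", "ds1"))

// Distinct set difference on whole rows, then overwrite the third column.
val df1Set = df1Rows.toSet
val deactivated = joined.distinct
  .filterNot(df1Set.contains)
  .map { case (c1, c2, _, c4) => (c1, c2, "INACTIVE", c4) }

println(deactivated)
// List((a1,30,INACTIVE,ds1), (a1,40,INACTIVE,ds1))
```

Because the comparison uses every column, a df2 row differing from a df1 row only in c3 would also be treated as missing; that matches what `except` does here.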

All that's left is to put them all together:

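val df_output = df1.union(diff)  // union matches columns by position, so the result keeps df1's names c1..c4
df_output.show()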

This simply combines df1 with the table of deactivated values we calculated earlier to produce the final result:

+---+---+--------+---+
| c1| c2|      c3| c4|
+---+---+--------+---+
| a1| 10|  ACTIVE|ds1|
| a1| 20|  ACTIVE|ds1|
| a2| 50|  ACTIVE|ds1|
| a3| 60|  ACTIVE|ds1|
| a1| 30|INACTIVE|ds1|
| a1| 40|INACTIVE|ds1|
+---+---+--------+---+

And again, you don't need all these intermediate values; I was just verbose to help trace through the process.
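To illustrate that the intermediate values are optional, here is the whole pipeline folded into one function over plain Scala collections. It is only a model of the Spark steps above (semi-join on the key, hardcode `ds1`, distinct difference against df1, relabel INACTIVE, append), assuming tuple rows; `Rec` and `deactivateMissing` are hypothetical names.

```scala
// Hypothetical row type: (c1, c2, c3, c4).
type Rec = (String, Int, String, String)

def deactivateMissing(df1: Seq[Rec], df2: Seq[Rec]): Seq[Rec] = {
  val keys = df1.map(_._1).toSet
  val existing = df1.toSet
  val deactivated = df2
    .filter(r => keys.contains(r._1))                     // keep df2 rows whose key is in df1
    .map { case (c1, c2, c3, _) => (c1, c2, c3, "ds1") }  // hardcode ds1, as in the answer
    .distinct
    .filterNot(existing.contains)                         // drop rows already in df1
    .map { case (c1, c2, _, c4) => (c1, c2, "INACTIVE", c4) }
  df1 ++ deactivated
}

val result = deactivateMissing(
  Seq(("a1", 10, "ACTIVE", "ds1"), ("a1", 20, "ACTIVE", "ds1"),
      ("a2", 50, "ACTIVE", "ds1"), ("a3", 60, "ACTIVE", "ds1")),
  Seq(("a1", 10, "ACTIVE", "ds2"), ("a1", 20, "ACTIVE", "ds2"),
      ("a1", 30, "ACTIVE", "ds2"), ("a1", 40, "ACTIVE", "ds2"),
      ("a4", 20, "ACTIVE", "ds2")))

result.foreach(println)
```

The six printed rows correspond to the final table above: df1's four ACTIVE rows followed by the two INACTIVE a1 rows.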
