How to join two DataFrames and change column for missing values?


Problem description

val df1 = sc.parallelize(Seq(
   ("a1",10,"ACTIVE","ds1"),
   ("a1",20,"ACTIVE","ds1"),
   ("a2",50,"ACTIVE","ds1"),
   ("a3",60,"ACTIVE","ds1"))
).toDF("c1","c2","c3","c4")

val df2 = sc.parallelize(Seq(
   ("a1",10,"ACTIVE","ds2"),
   ("a1",20,"ACTIVE","ds2"),
   ("a1",30,"ACTIVE","ds2"),
   ("a1",40,"ACTIVE","ds2"),
   ("a4",20,"ACTIVE","ds2"))
).toDF("c1","c2","c3","c5")


df1.show()

// +---+---+------+---+
// | c1| c2|    c3| c4|
// +---+---+------+---+
// | a1| 10|ACTIVE|ds1|
// | a1| 20|ACTIVE|ds1|
// | a2| 50|ACTIVE|ds1|
// | a3| 60|ACTIVE|ds1|
// +---+---+------+---+

df2.show()
// +---+---+------+---+
// | c1| c2|    c3| c5|
// +---+---+------+---+
// | a1| 10|ACTIVE|ds2|
// | a1| 20|ACTIVE|ds2|
// | a1| 30|ACTIVE|ds2|
// | a1| 40|ACTIVE|ds2|
// | a4| 20|ACTIVE|ds2|
// +---+---+------+---+

I need to join both DataFrames. The output should contain all the records from df1, plus the records from df2 that are not in df1, but only for values of "c1" that appear in both. The records pulled from df2 should have column "c3" set to INACTIVE.

In this example the only matching value of "c1" is a1, so I need to pull the c2=30 and c2=40 records from df2 and mark them INACTIVE.

Here is the expected output:

df_output.show()

// +---+---+--------+---+
// | c1| c2|    c3  | c4|
// +---+---+--------+---+
// | a1| 10|ACTIVE  |ds1|
// | a1| 20|ACTIVE  |ds1|
// | a2| 50|ACTIVE  |ds1|
// | a3| 60|ACTIVE  |ds1|
// | a1| 30|INACTIVE|ds1|
// | a1| 40|INACTIVE|ds1|
// +---+---+--------+---+

Can anyone help me do this?

Recommended answer

First, a small thing: I use different names for the columns in df2:

val df2 = sc.parallelize(...).toDF("d1","d2","d3","d4")

No big deal, but this made things easier for me to reason about.

Now for the fun stuff. I am going to be a bit verbose for the sake of clarity:

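import org.apache.spark.sql.functions.lit  // needed for lit(...)

val join = df1
  .join(df2, df1("c1") === df2("d1"), "inner")        // pair rows whose c1/d1 keys match
  .select($"d1", $"d2", $"d3", lit("ds1").as("d4"))   // df2 columns, with ds2 replaced by ds1
  .dropDuplicates()                                    // collapse the repeats the join produces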

Here I do the following:

  • Inner join between df1 and df2 on the c1 and d1 columns
  • Select the df2 columns and simply "hardcode" ds1 in the last column to replace ds2
  • Drop duplicates

This basically just filters out every row of df2 whose key has no match in the c1 column of df1.

Next, I take the difference:

val diff = join
  .except(df1)                                             // rows of join that do not appear in df1
  .select($"d1", $"d2", lit("INACTIVE").as("d3"), $"d4")   // overwrite the status column

This is a basic set operation that finds every row in join that is not in df1. These are the items to deactivate, so I select all the columns but replace the third with a hardcoded INACTIVE value.
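As a side note, the "rows in one table but not in another" step can also be written with joins instead of except. The following is only a sketch of that alternative, not the method used in this answer. It assumes the question's original column names, treats a df2 row as "already in df1" when its (c1, c2) pair occurs there, and sets up a local SparkSession purely for illustration; the name toDeactivate is made up for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

// Assumption: a local session for illustration; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .appName("anti-join-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Sample data from the question, with its original column names.
val df1 = Seq(
  ("a1", 10, "ACTIVE", "ds1"),
  ("a1", 20, "ACTIVE", "ds1"),
  ("a2", 50, "ACTIVE", "ds1"),
  ("a3", 60, "ACTIVE", "ds1")
).toDF("c1", "c2", "c3", "c4")

val df2 = Seq(
  ("a1", 10, "ACTIVE", "ds2"),
  ("a1", 20, "ACTIVE", "ds2"),
  ("a1", 30, "ACTIVE", "ds2"),
  ("a1", 40, "ACTIVE", "ds2"),
  ("a4", 20, "ACTIVE", "ds2")
).toDF("c1", "c2", "c3", "c5")

// left_semi: keep df2 rows whose c1 also occurs in df1 (this drops a4).
// left_anti: then drop the rows whose (c1, c2) pair is already in df1.
val toDeactivate = df2
  .join(df1.select("c1").distinct(), Seq("c1"), "left_semi")
  .join(df1.select("c1", "c2"), Seq("c1", "c2"), "left_anti")
  .select($"c1", $"c2", lit("INACTIVE").as("c3"), lit("ds1").as("c4"))

toDeactivate.show()
```

Unlike except, which compares entire rows and removes duplicates, the anti join compares only the listed key columns and keeps duplicates, so the two approaches can differ on data other than this sample.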

All that's left is to put everything together:

df1.union(diff)

This simply combines df1 with the table of deactivated values we calculated earlier to produce the final result:

+---+---+--------+---+
| c1| c2|      c3| c4|
+---+---+--------+---+
| a1| 10|  ACTIVE|ds1|
| a1| 20|  ACTIVE|ds1|
| a2| 50|  ACTIVE|ds1|
| a3| 60|  ACTIVE|ds1|
| a1| 30|INACTIVE|ds1|
| a1| 40|INACTIVE|ds1|
+---+---+--------+---+

And again, you don't need all these intermediate values. I was just being verbose to make the process easier to trace.
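As a sketch of what dropping the intermediate values might look like, the steps can be chained into a single expression. This assumes df2 with the renamed columns d1 to d4 and a local SparkSession; the session setup and the name result are illustrative and not part of the original answer.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

// Assumption: a local session for illustration; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .appName("deactivate-missing-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val df1 = Seq(
  ("a1", 10, "ACTIVE", "ds1"),
  ("a1", 20, "ACTIVE", "ds1"),
  ("a2", 50, "ACTIVE", "ds1"),
  ("a3", 60, "ACTIVE", "ds1")
).toDF("c1", "c2", "c3", "c4")

// Same data as the question's df2, with the renamed columns used in the answer.
val df2 = Seq(
  ("a1", 10, "ACTIVE", "ds2"),
  ("a1", 20, "ACTIVE", "ds2"),
  ("a1", 30, "ACTIVE", "ds2"),
  ("a1", 40, "ACTIVE", "ds2"),
  ("a4", 20, "ACTIVE", "ds2")
).toDF("d1", "d2", "d3", "d4")

// Inner join, relabel as ds1, de-duplicate, subtract df1, mark INACTIVE, then append to df1.
// except and union both match columns by position, so the result keeps df1's column names.
val result = df1.union(
  df1.join(df2, df1("c1") === df2("d1"), "inner")
    .select($"d1", $"d2", $"d3", lit("ds1").as("d4"))
    .dropDuplicates()
    .except(df1)
    .select($"d1", $"d2", lit("INACTIVE").as("d3"), $"d4")
)

result.show()
```

The row order of the appended INACTIVE records is not guaranteed, so add an explicit orderBy if the order matters.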
