Compare two dataframes Pyspark

Problem description

I'm trying to compare two data frames that have the same number of columns, i.e. 4 columns, with id as the key column in both data frames.

df1 = spark.read.csv("/path/to/data1.csv")
df2 = spark.read.csv("/path/to/data2.csv")

Now I want to append a new column to DF2, i.e. column_names, which is the list of the columns whose values differ from df1.

df2.withColumn("column_names", udf())

DF1:

+------+-------+------+---------+
|   id | name  | sal  | Address |
+------+-------+------+---------+
|    1 | ABC   | 5000 | US      |
|    2 | DEF   | 4000 | UK      |
|    3 | GHI   | 3000 | JPN     |
|    4 | JKL   | 4500 | CHN     |
+------+-------+------+---------+

DF2:

+------+-------+------+---------+
|   id | name  | sal  | Address |
+------+-------+------+---------+
|    1 | ABC   | 5000 | US      |
|    2 | DEF   | 4000 | CAN     |
|    3 | GHI   | 3500 | JPN     |
|    4 | JKL_M | 4800 | CHN     |
+------+-------+------+---------+

Now I want DF3:

DF3:

+------+-------+------+---------+--------------+
|   id | name  | sal  | Address | column_names |
+------+-------+------+---------+--------------+
|    1 | ABC   | 5000 | US      | []           |
|    2 | DEF   | 4000 | CAN     | [address]    |
|    3 | GHI   | 3500 | JPN     | [sal]        |
|    4 | JKL_M | 4800 | CHN     | [name,sal]   |
+------+-------+------+---------+--------------+

I saw this SO question, How to compare two dataframe and print columns that are different in scala. I tried that, however the result is different.

I'm thinking of going with a UDF, passing a row from each data frame to the udf, comparing column by column, and returning the list of differing columns. However, for that both data frames would need to be in sorted order so that rows with the same id are sent to the udf together. Sorting is a costly operation here. Any solution?
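
Roughly, this sketch is what I have in mind (diff_columns is a hypothetical UDF of my own; here the matching rows are lined up with a join on id, though I'm not sure that is the right way to pair them up):

from pyspark.sql.functions import struct, udf
from pyspark.sql.types import ArrayType, StringType

cols = [c for c in df1.columns if c != "id"]

# hypothetical UDF: compares the two row structs field by field
# and returns the names of the columns whose values differ
@udf(ArrayType(StringType()))
def diff_columns(left, right):
    return [c for c in cols if left[c] != right[c]]

df3 = (df1.alias("l").join(df2.alias("r"), "id")
          .select("id",
                  *["r." + c for c in cols],
                  diff_columns(struct(*["l." + c for c in cols]),
                               struct(*["r." + c for c in cols])).alias("column_names")))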

Recommended answer

Assuming that we can use id to join these two datasets, I don't think there is a need for a UDF. This could be solved just by using an inner join together with the array and array_remove functions, among others.

First let's create the two datasets:

df1 = spark.createDataFrame([
  [1, "ABC", 5000, "US"],
  [2, "DEF", 4000, "UK"],
  [3, "GHI", 3000, "JPN"],
  [4, "JKL", 4500, "CHN"]
], ["id", "name", "sal", "Address"])

df2 = spark.createDataFrame([
  [1, "ABC", 5000, "US"],
  [2, "DEF", 4000, "CAN"],
  [3, "GHI", 3500, "JPN"],
  [4, "JKL_M", 4800, "CHN"]
], ["id", "name", "sal", "Address"])

First we do an inner join between the two datasets, then we generate the condition df1[col] != df2[col] for each column except id. When the columns aren't equal we return the column name, otherwise an empty string. These conditions form an array from which we finally remove the empty items:

from pyspark.sql.functions import col, array, when, array_remove, lit

# get conditions for all columns except id
conditions_ = [when(df1[c]!=df2[c], lit(c)).otherwise("") for c in df1.columns if c != 'id']

select_expr =[
                col("id"), 
                *[df2[c] for c in df2.columns if c != 'id'], 
                array_remove(array(*conditions_), "").alias("column_names")
]

df1.join(df2, "id").select(*select_expr).show()

# +---+-----+----+-------+------------+
# | id| name| sal|Address|column_names|
# +---+-----+----+-------+------------+
# |  1|  ABC|5000|     US|          []|
# |  3|  GHI|3500|    JPN|       [sal]|
# |  2|  DEF|4000|    CAN|   [Address]|
# |  4|JKL_M|4800|    CHN| [name, sal]|
# +---+-----+----+-------+------------+
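
Two caveats worth noting about this join-based approach: the inner join silently drops ids that exist in only one of the frames (a full outer join on id can keep them), and df1[c] != df2[c] evaluates to NULL when either side is NULL, so such a difference ends up as an empty string rather than the column name. If NULLs can occur, a null-safe variant based on eqNullSafe is a small change (a sketch, not part of the original answer):

from pyspark.sql.functions import col, array, when, array_remove, lit

# eqNullSafe treats NULL <=> NULL as equal, and NULL vs. a value as different,
# so a value missing on one side is reported instead of being silently skipped
conditions_ = [
    when(~df1[c].eqNullSafe(df2[c]), lit(c)).otherwise("")
    for c in df1.columns if c != 'id'
]

df1.join(df2, "id").select(
    col("id"),
    *[df2[c] for c in df2.columns if c != 'id'],
    array_remove(array(*conditions_), "").alias("column_names")
).show()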
