Compare rows of two dataframes to find the matching column count of 1's


Question

I have 2 dataframes with the same schema. I need to compare the rows of the dataframes and keep a count of rows where at least one column has the value 1 in both dataframes.

Right now I am making a list of the rows and then comparing the 2 lists to find whether even one value is equal in both lists and equal to 1.

# Collect each row of the original dataframe into a dict of column -> value
rowOgList = []
for row in cat_og_df.rdd.toLocalIterator():
    rowOgDict = {}
    for cat in categories:
        rowOgDict[cat] = row[cat]
    rowOgList.append(rowOgDict)

# Do the same for the prediction dataframe
rowPredList = []
for row in prob_df.rdd.toLocalIterator():
    rowPredDict = {}
    for cat in categories:
        rowPredDict[cat] = row[cat]
    rowPredList.append(rowPredDict)

But here the function rdd.toLocalIterator gives me a heap space error when I try it on a huge dataset. For example, this is the 1st dataframe:

+-------+-------+-------+-------+
|column1|column2|column3|column4|
+-------+-------+-------+-------+
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      1|      0|      0|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      0|      1|      0|      0|
|      0|      0|      0|      0|
|      0|      0|      1|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      1|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
|      1|      0|      0|      0|
|      0|      0|      1|      0|
|      0|      0|      0|      0|
|      0|      0|      0|      0|
+-------+-------+-------+-------+

and this is the 2nd dataframe:

+-------+-------+-------+-------+
|column1|column2|column3|column4|
+-------+-------+-------+-------+
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      0|      0|      1|      1|
|      0|      0|      1|      1|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      0|      0|      1|      1|
|      1|      0|      1|      0|
|      0|      0|      1|      1|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
|      1|      0|      1|      0|
+-------+-------+-------+-------+

Here rows 9, 11, 17, 18 have at least one column with the same value, and that value is 1, so here the count = 4.
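The matching rule can be sketched in plain Python (a minimal illustration of the counting logic only, not the Spark solution; `row_match` is a hypothetical helper name):

```python
def row_match(row_a, row_b):
    """True if at least one column is 1 in both rows."""
    return any(a == 1 and b == 1 for a, b in zip(row_a, row_b))

# Rows 9, 11, 17, 18 and row 1 (1-based) from the two dataframes above:
pairs = [
    ([0, 0, 1, 0], [0, 0, 1, 1]),  # row 9:  column3 is 1 in both -> match
    ([0, 0, 0, 1], [0, 0, 1, 1]),  # row 11: column4 is 1 in both -> match
    ([1, 0, 0, 0], [1, 0, 1, 0]),  # row 17: column1 is 1 in both -> match
    ([0, 0, 1, 0], [1, 0, 1, 0]),  # row 18: column3 is 1 in both -> match
    ([0, 0, 0, 0], [1, 0, 1, 0]),  # row 1:  no column is 1 in both -> no match
]
count = sum(row_match(a, b) for a, b in pairs)  # -> 4
```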

Can this be done in any optimized way? Thanks.

Answer

Note: as mentioned by pault, this will work better if you have unique row indices that connect both dataframes. Otherwise, row order is not guaranteed by some Spark operations.

(1) Set up the environment and some sample data.

import numpy as np

from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as F

# `spark` is an existing SparkSession (created automatically in the pyspark shell)
df1 = spark.createDataFrame([
    (0, 0, 1),
    (1, 0, 0),
    (0, 0, 1)
], ["column1", "column2", "column3"])

df2 = spark.createDataFrame([
    (0, 0, 0),
    (1, 0, 1),
    (0, 0, 1)
], ["column1", "column2", "column3"])

(2) Collect all columns into a Spark vector.

assembler = VectorAssembler(
    inputCols=["column1", "column2", "column3"],
    outputCol="merged_col")

df1_merged = assembler.transform(df1)
df2_merged = assembler.transform(df2)

df1_merged.show()

+-------+-------+-------+-------------+
|column1|column2|column3|   merged_col|
+-------+-------+-------+-------------+
|      0|      0|      1|[0.0,0.0,1.0]|
|      1|      0|      0|[1.0,0.0,0.0]|
|      0|      0|      1|[0.0,0.0,1.0]|
+-------+-------+-------+-------------+

(3) Get the row and column indices of the non-zero elements, using numpy.nonzero() on the RDD of Spark vectors.

def get_nonzero_index(args):
    (row, index) = args
    np_arr = np.array(row.merged_col)
    return (index, np_arr.nonzero()[0].tolist())

df1_ind_rdd = df1_merged.rdd.zipWithIndex().map(get_nonzero_index)
df2_ind_rdd = df2_merged.rdd.zipWithIndex().map(get_nonzero_index)

df1_ind_rdd.collect()
[(0, [2]), (1, [0]), (2, [2])]

df2_ind_rdd.collect()
[(0, []), (1, [0, 2]), (2, [2])]

(4) You can then do your comparison on these 2 Python lists easily.
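For instance, a minimal local sketch of that comparison, counting the rows whose non-zero index lists share at least one index (using the `collect()` output shown above):

```python
# Output of collect() on the two RDDs: (row_index, non-zero column indices)
df1_ind = [(0, [2]), (1, [0]), (2, [2])]
df2_ind = [(0, []), (1, [0, 2]), (2, [2])]

# A row matches when the two index sets intersect,
# i.e. some column is non-zero (here: 1) in both dataframes.
matches = {
    i for (i, cols1), (_, cols2) in zip(df1_ind, df2_ind)
    if set(cols1) & set(cols2)
}
count = len(matches)  # rows 1 and 2 match -> 2
```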

Note that this method will not be efficient (due to the collect) if the number of rows is very large. In that case, you will want to do all the processing in Spark by doing a join on the 2 dataframes.

(5) To do the matching purely in Spark, you can try the method below, which relies on a join on row index.

df1_index = spark.createDataFrame(df1_ind_rdd, ["row_index_1", "column_index_1"])
df2_index = spark.createDataFrame(df2_ind_rdd, ["row_index_2", "column_index_2"])

df_joined = df1_index.join(df2_index, df1_index.row_index_1 == df2_index.row_index_2)

Then explode the lists so that we get one element per row.

df_exploded = df_joined.withColumn("column_index_exp_1", F.explode(df_joined.column_index_1))\
                            .withColumn("column_index_exp_2", F.explode(df_joined.column_index_2))

Check for a match between the two columns, and finally cast to integer for summing.

df_match_bool = df_exploded.withColumn("match_bool", df_exploded.column_index_exp_1 == df_exploded.column_index_exp_2)

df_match_int = df_match_bool.withColumn("match_integer", df_match_bool.match_bool.cast("long"))

df_match_bool.show()
+-----------+--------------+-----------+--------------+------------------+------------------+----------+
|row_index_1|column_index_1|row_index_2|column_index_2|column_index_exp_1|column_index_exp_2|match_bool|
+-----------+--------------+-----------+--------------+------------------+------------------+----------+
|          1|           [0]|          1|        [0, 2]|                 0|                 0|      true|
|          1|           [0]|          1|        [0, 2]|                 0|                 2|     false|
|          2|           [2]|          2|           [2]|                 2|                 2|      true|
+-----------+--------------+-----------+--------------+------------------+------------------+----------+

df_match_int.groupBy().sum("match_integer").collect()[0][0]
2
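One caveat (an observation, not part of the original answer): the sum above counts matching (row, column) pairs, while the question asks for a count of rows. The two agree here only because no row matches on more than one column. A local sketch of the difference, with a hypothetical set of matched pairs:

```python
# Hypothetical (row_index, column_index) matches; row 1 matches on two columns
pairs = [(1, 0), (1, 2), (2, 2)]

pair_count = len(pairs)                 # what sum("match_integer") computes -> 3
row_count = len({r for r, _ in pairs})  # rows with at least one match -> 2
```

In Spark, counting rows instead of pairs would mean aggregating distinct row indices over the matched rows, e.g. something like `df_match_bool.filter("match_bool").agg(F.countDistinct("row_index_1"))`.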

