DataFrame filtering based on a second DataFrame


Question

Using Spark SQL, I have two DataFrames, both created from a single one, such as:

df = sqlContext.createDataFrame(...);
df1 = df.filter("value = 'abc'"); //[path, value]
df2 = df.filter("value = 'qwe'"); //[path, value]

I want to filter df1, keeping a row if part of its 'path' equals any path in df2. So if df1 has a row with path 'a/b/c/d/e', I want to find out whether df2 has a row whose path is 'a/b/c'. In SQL it would look like:

SELECT * FROM df1 WHERE udf(path) IN (SELECT path FROM df2)

where udf is a user-defined function that shortens the original path from df1. The naive solution is to use a JOIN and then filter the result, but that is slow, since df1 and df2 each have more than 10 million rows.

I also tried the following code, but first I had to create a broadcast variable from df2:

static Broadcast<DataFrame> bdf;
bdf = sc.broadcast(df2); // variable 'sc' is JavaSparkContext

sqlContext.createDataFrame(df1.javaRDD().filter(
         new Function<Row, Boolean>() {
             @Override
             public Boolean call(Row row) throws Exception {
                 String foo = shortenPath(row.getString(0));
                 // This triggers a nested DataFrame action on an executor,
                 // which is where the job gets stuck.
                 return bdf.value().filter("path = '" + foo + "'").count() > 0;
             }
          }
    ), myClass.class);

The problem I'm having is that Spark got stuck when the return value was evaluated, i.e. when the filtering of df2 was performed.

I would like to know how to do this with two DataFrames. I really want to avoid a JOIN. Any ideas?

EDIT >>

In my original code df1 has the alias 'first' and df2 'second'. This join is not Cartesian, and it also does not use a broadcast:

df1 = df1.as("first");
df2 = df2.as("second");

    df1.join(df2, df1.col("first.path").
                                lt(df2.col("second.path"))
                                      , "left_outer").
                    filter("isPrefix(first.path, second.path)").
                    na().drop("any");

isPrefix is a UDF:

UDF2<String, String, Boolean> isPrefix = new UDF2<String, String, Boolean>() {
        @Override
        public Boolean call(String p, String s) throws Exception {
            // true if (p.length() + 4 == s.length()) and s.contains(p)
            return (p.length() + 4 == s.length()) && s.contains(p);
        }
    };
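For the filter("isPrefix(first.path, second.path)") expression above to resolve, the function presumably has to be registered with the SQLContext first; a minimal sketch, assuming the Spark 1.x Java UDF registration API:

    // Hypothetical registration; the name must match the one used in filter().
    sqlContext.udf().register("isPrefix", isPrefix, DataTypes.BooleanType);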

shortenPath removes the last two segments from the path:

UDF1<String, String> shortenPath = new UDF1<String, String>() {
        @Override
        public String call(String s) throws Exception {
            // e.g. "a/b/c/d/e" -> "a/b/c"
            String[] parts = s.split("/");
            StringBuilder result = new StringBuilder();
            for (int i = 0; i < parts.length - 2; i++) {
                result.append(parts[i]);
                if (i < parts.length - 3) result.append("/");
            }
            return result.toString();
        }
    };
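Likewise, for the SELECT ... WHERE udf(path) IN ... query above to work, shortenPath would presumably need to be registered as well; a sketch under the same Spark 1.x assumption:

    // Hypothetical registration; "shortenPath" is the name the SQL would use
    // in place of the generic udf(path).
    sqlContext.udf().register("shortenPath", shortenPath, DataTypes.StringType);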

Example records (paths are unique):

a/a/a/b/c abc
a/a/a     qwe
a/b/c/d/e abc
a/b/c     qwe
a/b/b/k   foo
a/b/f/a   bar
...

So df1 contains

a/a/a/b/c abc
a/b/c/d/e abc
...

and df2 contains

a/a/a     qwe
a/b/c     qwe
...

Answer

As a possible way of implementing IN with a subquery, a LEFT SEMI JOIN can be used:

    JavaSparkContext javaSparkContext = new JavaSparkContext("local", "testApp");
    SQLContext sqlContext = new SQLContext(javaSparkContext);
    StructType schema = DataTypes.createStructType(new StructField[]{
            DataTypes.createStructField("path", DataTypes.StringType, false),
            DataTypes.createStructField("value", DataTypes.StringType, false)
    });
    // Prepare First DataFrame
    List<Row> dataForFirstDF = new ArrayList<>();
    dataForFirstDF.add(RowFactory.create("a/a/a/b/c", "abc"));
    dataForFirstDF.add(RowFactory.create("a/b/c/d/e", "abc"));
    dataForFirstDF.add(RowFactory.create("x/y/z", "xyz"));
    DataFrame df1 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForFirstDF), schema);
    // 
    df1.show();
    //
    // +---------+-----+
    // |     path|value|
    // +---------+-----+
    // |a/a/a/b/c|  abc|
    // |a/b/c/d/e|  abc|
    // |    x/y/z|  xyz|
    // +---------+-----+

    // Prepare Second DataFrame
    List<Row> dataForSecondDF = new ArrayList<>();
    dataForSecondDF.add(RowFactory.create("a/a/a", "qwe"));
    dataForSecondDF.add(RowFactory.create("a/b/c", "qwe"));
    DataFrame df2 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForSecondDF), schema);

    // Use a left semi join to keep only df1 rows whose path contains a path from df2
    Column pathContains = functions.column("firstDF.path").contains(functions.column("secondDF.path"));
    DataFrame result = df1.as("firstDF").join(df2.as("secondDF"), pathContains, "leftsemi");

    //
    result.show();
    //
    // +---------+-----+
    // |     path|value|
    // +---------+-----+
    // |a/a/a/b/c|  abc|
    // |a/b/c/d/e|  abc|
    // +---------+-----+

The physical plan of such a query will look like this:

== Physical Plan ==
Limit 21
 ConvertToSafe
  LeftSemiJoinBNL Some(Contains(path#0, path#2))
   ConvertToUnsafe
    Scan PhysicalRDD[path#0,value#1]
   TungstenProject [path#2]
    Scan PhysicalRDD[path#2,value#3]

It will use LeftSemiJoinBNL for the actual join operation, which should broadcast the values internally. For more details, refer to the actual implementation in Spark - LeftSemiJoinBNL.scala
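One caveat: contains matches the second path anywhere inside the first, while the question only asks about prefixes. A hedged sketch of a tighter join condition, assuming Column.startsWith(Column) from the Spark 1.x Column API:

    // Keep only rows where secondDF.path is a prefix of firstDF.path,
    // rather than a substring anywhere in it.
    Column pathStartsWith = functions.column("firstDF.path")
            .startsWith(functions.column("secondDF.path"));
    DataFrame prefixResult = df1.as("firstDF").join(df2.as("secondDF"), pathStartsWith, "leftsemi");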

P.S. I didn't quite understand the need for removing the last two segments, but if that's needed, it can be done, as @zero323 proposed (using regexp_extract).
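For completeness, a minimal sketch of that regexp_extract idea, assuming Spark 1.5+ (where functions.regexp_extract is available); the column name and regex here are illustrative:

    // Hypothetical: derive a column holding the path minus its last two
    // segments, e.g. "a/b/c/d/e" -> "a/b/c".
    Column shortened = functions.regexp_extract(df1.col("path"), "^(.*)/[^/]+/[^/]+$", 1);
    df1.withColumn("shortPath", shortened).show();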
