DataFrame filtering based on second DataFrame


Problem Description


Using Spark SQL, I have two DataFrames that were created from a single one, like this:

df = sqlContext.createDataFrame(...);
df1 = df.filter("value = 'abc'"); //[path, value]
df2 = df.filter("value = 'qwe'"); //[path, value]

I want to filter df1 so that only rows whose 'path' has a prefix that appears as a path in df2 are kept. So if df1 has a row with path 'a/b/c/d/e', I would check whether df2 contains a row whose path is 'a/b/c'. In SQL it would be something like

SELECT * FROM df1 WHERE udf(path) IN (SELECT path FROM df2)

where udf is a user-defined function that shortens the original path from df1. The naive solution is to use a JOIN and then filter the result (a sketch follows), but it is slow, since df1 and df2 each have more than 10 million rows.
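
For reference, a minimal sketch of what that naive variant might look like (a hypothetical illustration, assuming the shortenPath UDF shown in the edit below has been registered with the SQLContext; the column and variable names here are illustrative, not from the original code):

    // Hypothetical sketch of the slow JOIN-based variant, not the recommended approach.
    // Assumes: sqlContext.udf().register("shortenPath", shortenPath, DataTypes.StringType);
    DataFrame withShort = df1.withColumn("shortPath",
            functions.callUDF("shortenPath", df1.col("path")));
    DataFrame naive = withShort
            .join(df2, withShort.col("shortPath").equalTo(df2.col("path")))
            .select(withShort.col("path"), withShort.col("value"));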

I also tried the following code, but first I had to create a broadcast variable from df2:

static Broadcast<DataFrame> bdf;
bdf = sc.broadcast(df2); //variable 'sc' is JavaSparkContext 

sqlContext.createDataFrame(df1.javaRDD().filter(
         new Function<Row, Boolean>(){
             @Override
             public Boolean call(Row row) throws Exception {
                 String foo = shortenPath(row.getString(0));
                 return bdf.value().filter("path = '"+foo+"'").count()>0;
             }
          }
    ), myClass.class)

The problem I'm having is that Spark got stuck when the return statement was evaluated, i.e. when the filtering of df2 was performed.

I would like to know how to work with two DataFrames to do this. I really want to avoid a JOIN. Any ideas?


EDIT>>

In my original code, df1 has the alias 'first' and df2 'second'. This join is not Cartesian, and it does not use a broadcast either.

df1 = df1.as("first");
df2 = df2.as("second");

    df1.join(df2, df1.col("first.path").
                                lt(df2.col("second.path"))
                                      , "left_outer").
                    filter("isPrefix(first.path, second.path)").
                    na().drop("any");

isPrefix is a UDF:

UDF2<String, String, Boolean> isPrefix = new UDF2<String, String, Boolean>() {
        @Override
        public Boolean call(String p, String s) throws Exception {
            // true if s is p extended by exactly two segments
            // (single-character segments, as in the example records below)
            return p.length() + 4 == s.length() && s.contains(p);
        }};

shortenPath is a UDF - it cuts the last two segments off the path:

UDF1<String, String> shortenPath = new UDF1<String, String>() {
        @Override
        public String call(String s) throws Exception {
            // keep all but the last two "/"-separated segments
            String[] foo = s.split("/");
            String result = "";
            for (int i = 0; i < foo.length - 2; i++) {
                result += foo[i];
                if (i < foo.length - 3) result += "/";
            }
            return result;
        }
    };
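
For the filter("isPrefix(first.path, second.path)") call above to resolve by name, both UDFs presumably need to be registered with the SQLContext first; a minimal sketch (the return types are my assumption from the signatures):

    // Register the UDFs so they can be referenced by name in SQL/filter strings.
    sqlContext.udf().register("isPrefix", isPrefix, DataTypes.BooleanType);
    sqlContext.udf().register("shortenPath", shortenPath, DataTypes.StringType);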

Example records (path is unique):

a/a/a/b/c abc
a/a/a     qwe
a/b/c/d/e abc
a/b/c     qwe
a/b/b/k   foo
a/b/f/a   bar
...

So df1 consists of

a/a/a/b/c abc
a/b/c/d/e abc
...

and df2 consists of

a/a/a     qwe
a/b/c     qwe
...

Solution

As a possible way of implementing IN with a subquery, a LEFT SEMI JOIN can be used:

    // Imports assumed by this snippet (Spark 1.x DataFrame API):
    // import java.util.ArrayList;
    // import java.util.List;
    // import org.apache.spark.api.java.JavaSparkContext;
    // import org.apache.spark.sql.*;
    // import org.apache.spark.sql.types.*;

    JavaSparkContext javaSparkContext = new JavaSparkContext("local", "testApp");
    SQLContext sqlContext = new SQLContext(javaSparkContext);
    StructType schema = DataTypes.createStructType(new StructField[]{
            DataTypes.createStructField("path", DataTypes.StringType, false),
            DataTypes.createStructField("value", DataTypes.StringType, false)
    });
    // Prepare First DataFrame
    List<Row> dataForFirstDF = new ArrayList<>();
    dataForFirstDF.add(RowFactory.create("a/a/a/b/c", "abc"));
    dataForFirstDF.add(RowFactory.create("a/b/c/d/e", "abc"));
    dataForFirstDF.add(RowFactory.create("x/y/z", "xyz"));
    DataFrame df1 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForFirstDF), schema);
    // 
    df1.show();
    //
    // +---------+-----+
    // |     path|value|
    // +---------+-----+
    // |a/a/a/b/c|  abc|
    // |a/b/c/d/e|  abc|
    // |    x/y/z|  xyz|
    // +---------+-----+

    // Prepare Second DataFrame
    List<Row> dataForSecondDF = new ArrayList<>();
    dataForSecondDF.add(RowFactory.create("a/a/a", "qwe"));
    dataForSecondDF.add(RowFactory.create("a/b/c", "qwe"));
    DataFrame df2 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForSecondDF), schema);

    // Use left semi join to filter out df1 based on path in df2
    Column pathContains = functions.column("firstDF.path").contains(functions.column("secondDF.path"));
    DataFrame result = df1.as("firstDF").join(df2.as("secondDF"), pathContains, "leftsemi");

    //
    result.show();
    //
    // +---------+-----+
    // |     path|value|
    // +---------+-----+
    // |a/a/a/b/c|  abc|
    // |a/b/c/d/e|  abc|
    // +---------+-----+

The physical plan of such a query will look like this:

== Physical Plan ==
Limit 21
 ConvertToSafe
  LeftSemiJoinBNL Some(Contains(path#0, path#2))
   ConvertToUnsafe
    Scan PhysicalRDD[path#0,value#1]
   TungstenProject [path#2]
    Scan PhysicalRDD[path#2,value#3]

It will use LeftSemiJoinBNL for the actual join operation, which should broadcast values internally. For more details refer to the actual implementation in Spark - LeftSemiJoinBNL.scala (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala).
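
On a given Spark build, the chosen join strategy can be checked by printing the plan directly:

    // Prints the physical plan to stdout.
    result.explain();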

P.S. I didn't quite understand the need for removing the last two segments, but if that's needed - it can be done, as @zero323 proposed (using regexp_extract); a sketch follows.
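
A hedged sketch of that regexp_extract variant (the pattern is my assumption about the path layout - it keeps everything before the last two "/"-separated segments):

    // Drop the last two path segments, e.g. "a/a/a/b/c" -> "a/a/a".
    // Paths with fewer than three segments do not match the pattern,
    // and regexp_extract then returns an empty string.
    Column shortened = functions.regexp_extract(
            functions.column("path"), "^(.*)/[^/]+/[^/]+$", 1);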
