How to use Analytic/Window Functions in Spark Java?


Problem Description

I'm trying to use the analytical/window function last_value in Spark Java.

select sno, name, addr1, addr2, run_dt, 
last_value(addr1 ignore nulls) over (partition by sno, name, addr1, addr2, run_dt order by beg_ts , end_ts rows between unbounded preceding and unbounded following  ) as last_addr1
from daily

We want to implement this query in Spark Java (without using HiveSQLContext):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.execution.WindowFunctionFrame;

    SparkConf conf = new SparkConf().setMaster("local").setAppName("Agg");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);


    JavaRDD<Stgdailydtl> daily = sc.textFile("C:\\Testing.txt").map(
              new Function<String, Stgdailydtl>() {
                  private static final long serialVersionUID = 1L;
                public Stgdailydtl call(String line) throws Exception {
                  String[] parts = line.split(",");

                  Stgdailydtl daily = new Stgdailydtl();
                  daily.setSno(Integer.parseInt(parts[0].trim()));
                  .....

                  return daily;
                }
              });
DataFrame schemaDailydtl = sqlContext.createDataFrame(daily, Stgdailydtl.class);
schemaDailydtl.registerTempTable("daily");
WindowSpec ws = Window.partitionBy("sno, name, addr1, addr2, run_dt").orderBy("beg_ts , end_ts").rowsBetween(0, 100000);
DataFrame df = sqlContext.sql("select sno, name, addr1, addr2, run_dt "
            + "row_number() over(partition by mach_id, msrmt_gbl_id, msrmt_dsc, elmt_dsc, end_cptr_dt order by beg_cptr_ts, end_cptr_ts) from daily ");

}

}

Exception in thread "main" java.lang.RuntimeException: [1.110] failure: ``union'' expected but `(' found

select stg.mach_id, stg.msrmt_gbl_id, stg.msrmt_dsc, stg.elmt_dsc, stg.elmt_dsc_grp_concat, row_number() over(partition by mach_id, msrmt_gbl_id, msrmt_dsc, elmt_dsc, end_cptr_dt order by beg_cptr_ts, end_cptr_ts) from stgdailydtl stg 
                                                                                                             ^
    at scala.sys.package$.error(package.scala:27)

I could not understand how to use the WindowSpec/Window object. Please suggest on this. Thanks for your help.

Answer

You're mixing DataFrame syntax and SQL syntax: specifically, you created a WindowSpec but then didn't use it.

Import org.apache.spark.sql.functions to get the row_number function, then create the column that you're trying to select:

Column rowNum = functions.row_number().over(ws);

Then select it using the DataFrame API:

df.select(each, column, you, want, rowNum)

My syntax may be slightly off, since I'm used to Scala or Python, but the gist is something like that.
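
Putting those pieces together, here is a minimal sketch of how the corrected version might look with the DataFrame API, using the column names from the question. It assumes the schemaDailydtl DataFrame from the question's code and a Spark build where functions.row_number and Window are available to the DataFrame API; exact signatures can differ between releases.

import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.functions;

// Partition and order columns are passed as separate arguments,
// not as a single comma-separated string.
WindowSpec ws = Window
        .partitionBy("sno", "name", "addr1", "addr2", "run_dt")
        .orderBy("beg_ts", "end_ts");

// Build the window column with the functions API instead of embedding it in SQL.
Column rowNum = functions.row_number().over(ws);

// Select the plain columns plus the window column through the DataFrame API,
// avoiding window-function syntax inside sqlContext.sql(...).
DataFrame result = schemaDailydtl.select(
        functions.col("sno"),
        functions.col("name"),
        functions.col("addr1"),
        functions.col("addr2"),
        functions.col("run_dt"),
        rowNum.as("rn"));

result.show();

For the original last_value goal, the analogous approach would be functions.last over the same spec with an explicit frame (for example ws.rowsBetween(Long.MIN_VALUE, Long.MAX_VALUE)); whether nulls can be ignored that way depends on the Spark release.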

