How to pick latest record in spark structured streaming join

Problem Description

I am using spark-sql version 2.4.x and the datastax-spark-cassandra-connector for Cassandra 3.x, along with Kafka.

I have currency rate metadata, a sample of which is below:

// imports needed outside the spark-shell (spark.implicits._ assumes a SparkSession named spark)
import org.apache.spark.sql.functions.to_date
import org.apache.spark.sql.types.{DateType, DoubleType}
import spark.implicits._

val ratesMetaDataDf = Seq(
     ("EUR","5/10/2019","1.130657","USD"),
     ("EUR","5/9/2019","1.13088","USD")
     ).toDF("base_code", "rate_date","rate_value","target_code")
.withColumn("rate_date", to_date($"rate_date" ,"MM/dd/yyyy").cast(DateType))
.withColumn("rate_value", $"rate_value".cast(DoubleType))

The sales records I receive from the Kafka topic are as below (sample):

val kafkaDf = Seq((15,2016, 4, 100.5,"USD","2021-01-20","EUR",221.4)
                                ).toDF("companyId", "year","quarter","sales","code","calc_date","c_code","prev_sales")

To calculate "prev_sales" , I need get its "c_code" 's respective "rate_value" which is nearest to the "calc_date" i.e. rate_date"

I am doing this as follows:

val w2 = Window.orderBy(col("rate_date") desc)
val rateJoinResultDf = kafkaDf.as("k").join(ratesMetaDataDf.as("e"))
                                   .where( ($"k.c_code" === $"e.base_code") &&
                                           ($"rate_date" < $"calc_date")
                                         ).orderBy($"rate_date" desc)
                                  .withColumn("row",row_number.over(w2))
                                  .where($"row" === 1).drop("row")
                                  .withColumn("prev_sales", (col("prev_sales") * col("rate_value")).cast(DoubleType))
                                  .select("companyId", "year","quarter","sales","code","calc_date","prev_sales")

In the above, to get the nearest record (i.e. "5/10/2019" from ratesMetaDataDf) for the given "rate_date", I am using the window and row_number functions and sorting the records in descending order.

But in spark-sql structured streaming it causes the error below:

"
Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;"

So how do I fetch the first record for the join above?

Solution

Replace your last code part with the code below. This code does a left join and calculates the date difference between calc_date and rate_date. Then, with a Window function, we pick the nearest date and calculate prev_sales using your same calculation.

Please note that I have added a filter condition, filter(col("diff") >= 0), which handles the case of calc_date < rate_date. I have also added a few more records to better illustrate this case.

scala> ratesMetaDataDf.show
+---------+----------+----------+-----------+
|base_code| rate_date|rate_value|target_code|
+---------+----------+----------+-----------+
|      EUR|2019-05-10|  1.130657|        USD|
|      EUR|2019-05-09|   1.12088|        USD|
|      EUR|2019-12-20|    1.1584|        USD|
+---------+----------+----------+-----------+


scala> kafkaDf.show
+---------+----+-------+-----+----+----------+------+----------+
|companyId|year|quarter|sales|code| calc_date|c_code|prev_sales|
+---------+----+-------+-----+----+----------+------+----------+
|       15|2016|      4|100.5| USD|2021-01-20|   EUR|     221.4|
|       15|2016|      4|100.5| USD|2019-06-20|   EUR|     221.4|
+---------+----+-------+-----+----+----------+------+----------+


scala>  val W = Window.partitionBy("companyId","year","quarter","sales","code","calc_date","c_code","prev_sales").orderBy(col("diff"))

scala>   val rateJoinResultDf= kafkaDf.alias("k").join(ratesMetaDataDf.alias("r"), col("k.c_code") === col("r.base_code"), "left")
                                         .withColumn("diff",datediff(col("calc_date"), col("rate_date")))
                                         .filter(col("diff") >= 0)
                                         .withColumn("closedate", row_number.over(W))
                                         .filter(col("closedate") === 1)
                                         .drop("diff", "closedate")
                                         .withColumn("prev_sales", (col("prev_sales") * col("rate_value")).cast("Decimal(14,5)"))
                                         .select("companyId", "year","quarter","sales","code","calc_date","prev_sales")

scala> rateJoinResultDf.show
+---------+----+-------+-----+----+----------+----------+
|companyId|year|quarter|sales|code| calc_date|prev_sales|
+---------+----+-------+-----+----+----------+----------+
|       15|2016|      4|100.5| USD|2021-01-20| 256.46976|
|       15|2016|      4|100.5| USD|2019-06-20| 250.32746|
+---------+----+-------+-----+----+----------+----------+ 
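
Since kafkaDf in the question is a streaming DataFrame, one common way to run this row_number-based selection is inside foreachBatch (available since Spark 2.4): each micro-batch arrives there as a plain, non-streaming DataFrame, so the windowed de-duplication above runs without the streaming sort restriction. A minimal sketch, using the hypothetical names kafkaStreamDf (the streaming equivalent of kafkaDf, already parsed into the same columns) and withLatestRate (the batch logic above wrapped in a helper); these names and the console output are assumptions, not part of the original answer:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// The answer's batch logic, wrapped so it can be applied to every micro-batch.
def withLatestRate(batchDf: DataFrame): DataFrame = {
  val W = Window.partitionBy("companyId","year","quarter","sales","code","calc_date","c_code","prev_sales")
                .orderBy(col("diff"))
  batchDf.alias("k").join(ratesMetaDataDf.alias("r"), col("k.c_code") === col("r.base_code"), "left")
    .withColumn("diff", datediff(col("calc_date"), col("rate_date")))
    .filter(col("diff") >= 0)
    .withColumn("closedate", row_number.over(W))
    .filter(col("closedate") === 1)
    .drop("diff", "closedate")
    .withColumn("prev_sales", (col("prev_sales") * col("rate_value")).cast("Decimal(14,5)"))
    .select("companyId", "year","quarter","sales","code","calc_date","prev_sales")
}

// kafkaStreamDf is assumed to be the streaming DataFrame read from the Kafka topic.
kafkaStreamDf.writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    withLatestRate(batchDf).show(false)   // or write each batch to Cassandra / any batch sink
  }
  .start()
  .awaitTermination()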
