How to define a join condition in stream-batch streaming join?


Problem Description

I am using spark-sql-2.4.1v with Java 1.8, and the Kafka connector spark-sql-kafka-0-10_2.11_2.4.3.

I am trying to join a static DataFrame (i.e. metadata) with a streaming DataFrame, as below:

Dataset<Row> streamingDs = // read from a Kafka topic
Dataset<Row> staticDf = // read from an Oracle metadata table

Dataset<Row> joinDf = streamingDs.as("c").join(
    staticDf.as("i"),
    "c.code = i.industry_code"
);
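For context, the two elided reads might look roughly like the sketch below. This is only an assumption about the setup: the bootstrap server, topic name, JDBC URL, table name and credentials are all placeholders, not values from the question.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
    .appName("stream-static-join")
    .getOrCreate();

// Streaming side: subscribe to a Kafka topic (server/topic are placeholders).
Dataset<Row> streamingDs = spark.readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "transactions")
    .load();

// Static side: load the metadata table over JDBC
// (URL, table and credentials are placeholders).
Dataset<Row> staticDf = spark.read()
    .format("jdbc")
    .option("url", "jdbc:oracle:thin:@//db-host:1521/ORCL")
    .option("dbtable", "INDUSTRY_META")
    .option("user", "db_user")
    .option("password", "db_password")
    .load();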

Even though the respective columns are present in both DataFrames, it gives the error below.

线程"main"中的异常org.apache.spark.sql.AnalysisException:无法在联接的左侧解析USING列c.code = i.industry_code.左侧列:[id,tranasctionDate,companyName,code];

Exception in thread "main" org.apache.spark.sql.AnalysisException: USING column c.code = i.industry_code cannot be resolved on the left side of the join. The left-side columns: [id, tranasctionDate, companyName,code];

I then tried the following:

Dataset<Row> joinDf = streamingDs.as("c").join(
    staticDf.as("i"),
    "c.code = i.industry_code",
    "inner"
);

This produces the following compile-time error:

The method join(Dataset, String) in the type Dataset is not applicable for the arguments (Dataset, String, String)

Recommended Answer

tl;dr c.code = i.industry_code is treated as the name of a single column to join on (a USING column), not as a join expression.
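To illustrate, a USING-style join(right, usingColumn) only resolves when both sides share an identically named column. A sketch of that variant, assuming you are willing to rename the static column (the withColumnRenamed call is my addition, not part of the question):

// Rename so both sides expose a column literally named "code",
// then join USING that shared column name.
Dataset<Row> joinDf = streamingDs.join(
    staticDf.withColumnRenamed("industry_code", "code"),
    "code" // a column name shared by both sides, not a boolean expression
);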

Change the code as follows:

streamingDs.as("c").join(staticDf.as("i")) // INNER JOIN is the default
  .where("c.code = i.industry_code")
