How to define a join condition in a stream-static join?
Problem Description
I am using spark-sql 2.4.1 with Java 1.8, and the Kafka connector spark-sql-kafka-0-10_2.11, version 2.4.3.
I am trying to join a static dataframe (i.e. meta-data) with a streaming dataframe, as below:
Dataset<Row> streamingDs = ... // read from a Kafka topic
Dataset<Row> staticDf = ...    // read from an Oracle meta-data table

Dataset<Row> joinDf = streamingDs.as("c").join(staticDf.as("i"),
        "c.code = i.industry_code");
Even though the respective columns exist in both dataframes, this gives the error below:
线程"main"中的异常org.apache.spark.sql.AnalysisException:无法在联接的左侧解析USING列c.code = i.industry_code
.左侧列:[id,tranasctionDate,companyName,code];
Exception in thread "main" org.apache.spark.sql.AnalysisException: USING column c.code = i.industry_code
cannot be resolved on the left side of the join. The left-side columns: [id, tranasctionDate, companyName,code];
I then tried the following:
Dataset<Row> joinDf = streamingDs.as("c").join(staticDf.as("i"),
        "c.code = i.industry_code",
        "inner");
This gives the following compile error:
The method join(Dataset, String) in the type Dataset is not applicable for the arguments (Dataset, String, String)
Recommended Answer
tl;dr The string "c.code = i.industry_code" is interpreted as the name of a single column to join on (a USING column), not as a join expression.
Change the code as follows:
streamingDs.as("c").join(staticDf.as("i")) // INNER JOIN is the default
.where("c.code = i.industry_code")
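Alternatively, Spark's Dataset API also has a join overload that takes a Column plus a join type, which lets you pass a real join expression directly. A minimal sketch, assuming the same aliases and column names as above:

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.expr;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Pass a Column (a join expression), not a String (a USING column name):
Dataset<Row> joinDf = streamingDs.as("c").join(
        staticDf.as("i"),
        expr("c.code = i.industry_code"), // join condition as a Column
        "inner");                         // join type as the third argument

// Equivalent, built without an SQL string:
Dataset<Row> joinDf2 = streamingDs.as("c").join(
        staticDf.as("i"),
        col("c.code").equalTo(col("i.industry_code")),
        "inner");
```

Both calls resolve to join(Dataset, Column, String), so the "not applicable for the arguments (Dataset, String, String)" compile error from earlier goes away; the two-argument join(Dataset, String) overload is reserved for USING-column joins on a shared column name.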