How to do a self join in Spark 2.3.0? What is the correct syntax?

Problem Description

I have the following code:

import org.apache.spark.sql.streaming.Trigger

// Read the Kafka topic as a streaming DataFrame and register it as a temp view
val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load()
jdf.createOrReplaceTempView("table")

// Self join the view on the Kafka offset column
val resultdf = spark.sql("select * from table as x inner join table as y on x.offset=y.offset")
resultdf.writeStream.outputMode("append").format("console").option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()

I get the following exception:

org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, x.timestamp, x.partition]; line 1 pos 50;
'Project [*]
+- 'Join Inner, ('x.offset = 'y.offset)
   :- SubqueryAlias x
   :  +- SubqueryAlias table
   :     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
   +- SubqueryAlias y
      +- SubqueryAlias table
         +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]

I have changed the code to this:

import org.apache.spark.sql.streaming.Trigger

// Read the same topic twice so the join is between two distinct streaming DataFrames
val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load()
val jdf1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load()

jdf.createOrReplaceTempView("table")
jdf1.createOrReplaceTempView("table1")

// Join the two views on the Kafka offset column
val resultdf = spark.sql("select * from table inner join table1 on table.offset=table1.offset")

resultdf.writeStream.outputMode("append").format("console").option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()

And this works. However, I don't believe it is the solution I am looking for. I want to be able to do a self join using raw SQL, without making an additional copy of the DataFrame as the code above does. So is there any other way?

Recommended Answer

This is a known issue and will be fixed in 2.4.0; see https://issues.apache.org/jira/browse/SPARK-23406. For now, you can simply avoid joining the same DataFrame object with itself, e.g. by creating a second DataFrame over the same source as in the code above.
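
Until the fix lands in 2.4.0, the same workaround can also be expressed with the DataFrame API instead of temp views. The following is a minimal sketch, assuming an existing SparkSession named spark (as in a spark-shell session); the readTopic helper and the x/y aliases are illustrative and not from the original code, while the broker, topic, and the Kafka source's offset column come from the question.

import org.apache.spark.sql.streaming.Trigger

// Assumes an existing SparkSession `spark`, as in the question (e.g. spark-shell)
import spark.implicits._

// Hypothetical helper: each call produces an independent streaming DataFrame
// over the same topic, so the join below is not a literal self join
def readTopic() = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "join_test")
  .option("startingOffsets", "earliest")
  .load()

val x = readTopic().as("x")
val y = readTopic().as("y")

// Same join as the SQL version: match rows with equal Kafka offsets
val resultdf = x.join(y, $"x.offset" === $"y.offset")

resultdf.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(1000))
  .start()

Once on 2.4.0, the original single-view self join from the question should work as written.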
