Left outer join not emitting null values when joining two streams in Spark Structured Streaming 2.3.0


Question

A left outer join on two streams is not emitting the null outputs. It just waits for a record to be added to the other stream. I am using socket streams to test this. In our case, we want to emit, with null values, the records that do not match on id and/or do not fall within the time range condition.

Details of the watermarks and intervals:

import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.streaming.OutputMode

// Left stream: event-time watermark of 10 seconds
val ds1Map = ds1
  .selectExpr("Id AS ds1_Id", "ds1_timestamp")
  .withWatermark("ds1_timestamp", "10 seconds")

// Right stream: event-time watermark of 20 seconds
val ds2Map = ds2
  .selectExpr("Id AS ds2_Id", "ds2_timestamp")
  .withWatermark("ds2_timestamp", "20 seconds")

// Left outer join on id, with ds2 events within 1 minute after the ds1 event
val output = ds1Map.join(
  ds2Map,
  expr("""
    ds1_Id = ds2_Id AND
    ds2_timestamp >= ds1_timestamp AND
    ds2_timestamp <= ds1_timestamp + interval 1 minutes
    """),
  "leftOuter")

val query = output.select("*")
  .writeStream
  .outputMode(OutputMode.Append)
  .format("console")
  .option("checkpointLocation", "./spark-checkpoints/")
  .start()

query.awaitTermination()

Thanks.

Recommended answer

Hi Jack, and thanks for the response. The question/issue was from a year and a half ago, and it took some time to recover what I did last year :). I ran a stream-to-stream join on two topics, one of them with more than 10K messages per second, on a Spark cluster with 4.67 TB of total memory and 1614 vcores in total.

The implementation was a simple Structured Streaming stream-to-stream join, as in the official Spark documentation:

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)

It ran for a few hours until it hit an OOM. After investigating, I found the issue with Spark's state cleanup in HDFSBackedStateStoreProvider and the open Spark Jira:

https://issues.apache.org/jira/browse/SPARK-23682

Memory issue with Spark structured streaming

This is why I moved back and implemented the stream-to-stream join in Spark Streaming 2.1.1 with mapWithState.

Thx
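
On the original question, the Structured Streaming guide notes that outer NULL results are generated with a delay that depends on the watermark delay and the time range condition, and that in the micro-batch engine a new batch (and so the outer output) is only triggered when new data arrives. The sketch below is a simplified plain-Scala model of that timing, not Spark code and not the engine's actual implementation. It assumes the global watermark is the minimum of the per-stream watermarks, and that a ds1 row can be emitted with nulls only once that watermark has passed ds1_timestamp plus the 1 minute join range. The object and function names are hypothetical, and times are plain seconds.

```scala
// Simplified model of outer-join timing; NOT Spark API, names are hypothetical.
object OuterJoinWatermarkSketch {
  val Ds1DelaySec = 10L   // withWatermark("ds1_timestamp", "10 seconds")
  val Ds2DelaySec = 20L   // withWatermark("ds2_timestamp", "20 seconds")
  val JoinRangeSec = 60L  // ds2_timestamp <= ds1_timestamp + interval 1 minutes

  // Assumption: global watermark = min over streams of (max event time seen - delay).
  def globalWatermark(maxDs1Ts: Long, maxDs2Ts: Long): Long =
    math.min(maxDs1Ts - Ds1DelaySec, maxDs2Ts - Ds2DelaySec)

  // Assumption: an unmatched ds1 row may be emitted with nulls only after the
  // watermark has passed the latest ds2 timestamp that could still match it.
  def canEmitNull(ds1Ts: Long, watermark: Long): Boolean =
    watermark > ds1Ts + JoinRangeSec

  def main(args: Array[String]): Unit = {
    val ds1Ts = 100L
    // ds2 has stalled at t=100: watermark = min(160, 80) = 80, so no null row yet.
    println(canEmitNull(ds1Ts, globalWatermark(170L, 100L))) // false
    // Both streams reached t=200: watermark = min(190, 180) = 180 > 160.
    println(canEmitNull(ds1Ts, globalWatermark(200L, 200L))) // true
  }
}
```

Under these assumptions, a socket-stream test that stops sending input to either stream would keep the watermark from advancing, which is consistent with the join appearing to wait for more records.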
