Spark Structured Streaming Multiple WriteStreams to Same Sink


Problem Description

Two writeStream queries writing to the same database sink are not executing in sequence in Spark Structured Streaming 2.2.1. Please suggest how to make them execute in sequence.

val deleteSink = ds1.writeStream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

val UpsertSink = ds2.writeStream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

deleteSink.awaitTermination()
UpsertSink.awaitTermination()

Using the above code, deleteSink is executed after UpsertSink.

Answer

If you want to have the two streams running in parallel, you have to use

sparkSession.streams.awaitAnyTermination()

instead of

deleteSink.awaitTermination()
UpsertSink.awaitTermination()

In your case, UpsertSink will never start unless deleteSink is stopped or an exception is thrown, as the scaladoc says:

Waits for the termination of this query, either by query.stop() or by an exception. If the query has terminated with an exception, then the exception will be thrown. If the query has terminated, then all subsequent calls to this method will either return immediately (if the query was terminated by stop()), or throw the exception immediately (if the query has terminated with exception).
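Below is a minimal sketch of the parallel pattern the answer describes, under the same assumptions as the question: ds1 and ds2 are existing streaming Datasets, mydbsink is the asker's ForeachWriter, and sparkSession is the active SparkSession.

// Start both queries first; neither awaitTermination() call sits between them.
val deleteSink = ds1.writeStream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

val upsertSink = ds2.writeStream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

// Block until any active query on this SparkSession terminates, so both
// queries keep running concurrently instead of the second one waiting
// for the first to finish.
sparkSession.streams.awaitAnyTermination()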
