Multiple writeStream with Spark Streaming

Question

I am working with Spark Streaming and am having trouble running multiple write streams. Here is my code:

DataWriter.writeStreamer(firstTableData,"parquet",CheckPointConf.firstCheckPoint,OutputConf.firstDataOutput)
DataWriter.writeStreamer(secondTableData,"parquet",CheckPointConf.secondCheckPoint,OutputConf.secondDataOutput)
DataWriter.writeStreamer(thirdTableData,"parquet", CheckPointConf.thirdCheckPoint,OutputConf.thirdDataOutput)

where writeStreamer is defined as follows:

def writeStreamer(input: DataFrame, checkPointFolder: String, output: String) = {

  val query = input
                .writeStream
                .format("orc")
                .option("checkpointLocation", checkPointFolder)
                .option("path", output)
                .outputMode(OutputMode.Append)
                .start()

  query.awaitTermination()
}

The problem is that only the first table is written by writeStream; nothing happens for the other tables. Do you have any idea why?

Recommended answer

query.awaitTermination() should be called only after the last stream has been created. Because it blocks, calling it inside writeStreamer means the first call never returns, so the second and third queries are never started.

The writeStreamer function can be modified to return a StreamingQuery instead of calling awaitTermination at that point:

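import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

// Starts the query and returns its handle immediately; it does not block.
def writeStreamer(input: DataFrame, checkPointFolder: String, output: String): StreamingQuery = {
  input
    .writeStream
    .format("orc")
    .option("checkpointLocation", checkPointFolder)
    .option("path", output)
    .outputMode(OutputMode.Append)
    .start()
}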

You can then start all three queries and block only after the last one has been created:

val query1 = DataWriter.writeStreamer(...)
val query2 = DataWriter.writeStreamer(...)
val query3 = DataWriter.writeStreamer(...)

query3.awaitTermination()
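
Note that query3.awaitTermination() returns only when the third query terminates, so it does not react if the first or second query stops. A possible variation, offered here as a minimal sketch rather than as part of the original answer, is to block on spark.streams.awaitAnyTermination(), which returns (or rethrows the failure) as soon as any query on the session terminates. The object name MultiStreamSketch, the helper startAll, the temporary directories, and the built-in rate source used as input are all assumptions made so the example is self-contained; they stand in for the asker's own tables and configuration.

```scala
import java.nio.file.Files

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

object MultiStreamSketch {

  // Same shape as the answer's writeStreamer: start the query, return the handle, do not block.
  def writeStreamer(input: DataFrame, checkPointFolder: String, output: String): StreamingQuery =
    input.writeStream
      .format("orc")
      .option("checkpointLocation", checkPointFolder)
      .option("path", output)
      .outputMode(OutputMode.Append)
      .start()

  // Hypothetical helper: starts three independent queries under `base`.
  // The rate source is an assumption standing in for the real input tables.
  def startAll(spark: SparkSession, base: String): Seq[StreamingQuery] =
    Seq("first", "second", "third").map { name =>
      val input = spark.readStream
        .format("rate")
        .option("rowsPerSecond", "1")
        .load()
      // Each query gets its own checkpoint and output directory.
      writeStreamer(input, s"$base/checkpoint/$name", s"$base/output/$name")
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("multi-writestream-sketch")
      .master("local[*]")
      .getOrCreate()

    val base = Files.createTempDirectory("multi-writestream").toString
    startAll(spark, base)

    // Blocks until any one of the queries terminates; if that query failed,
    // its exception is rethrown here.
    spark.streams.awaitAnyTermination()
  }
}
```

The sketch keeps the three-parameter signature from the definition above. The calls in the question pass an extra format argument ("parquet") that this signature does not accept, so one of the two would need to change for the original code to compile.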
