jdbc source and spark structured streaming
Question
I've been using Spark structured streaming and am quite happy with it. I'm currently performing an ETL-type activity. I have a table in PostgreSQL containing metadata-type information that I'm looking to merge with the streaming data frame.
metadataDf = spark \
    .read \
    .jdbc(url=jdbcUrl,
          table=query,
          properties=connectionProperties)
streamDF = spark \
    .readStream \
    .option("maxFilesPerTrigger", 10) \
    .option("latestFirst", True) \
    .schema(sensorSchema) \
    .json(sensorPath)

joined_metadata = streamDF \
    .join(metadataDf, ["uid"], "left")
write_query = joined_metadata \
    .writeStream \
    .trigger(processingTime=arbitrarytime) \
    .format("json") \
    .option("checkpointLocation", chkploc) \
    .option("path", write_path) \
    .start()
The metadata table in PostgreSQL may get updated once every couple of days. I was wondering: do I need to accommodate refreshing the table in Spark with some kind of while loop, or does Spark's lazy evaluation take care of that particular scenario?
Thanks
Answer
Spark will take care of it as long as the program is running. If you don't specify a trigger interval, Spark will process this stream continuously (each batch starts once the last one has finished).
To specify a trigger interval, see df.trigger() in the docs.
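As a minimal sketch of what passing a trigger could look like: the `trigger_kwargs` helper below is hypothetical (not from the original question), but `processingTime` and `once` are real keyword arguments of `DataStreamWriter.trigger()`, and `processingTime` expects an interval string such as `"2 minutes"` rather than a number.

```python
# Hypothetical helper (not from the original question) that builds the
# keyword arguments for DataStreamWriter.trigger().
def trigger_kwargs(interval=None, once=False):
    """Return kwargs for writeStream.trigger().

    interval: a processing-time string such as "2 minutes"
    once:     process all available data in a single micro-batch, then stop
    """
    if once:
        return {"once": True}
    if interval is not None:
        # fire a micro-batch every `interval` of processing time
        return {"processingTime": interval}
    # no trigger specified: Spark starts each micro-batch as soon as the
    # previous one finishes
    return {}

# Usage against the question's query (assumes joined_metadata, chkploc,
# and write_path exist as in the question):
# write_query = joined_metadata.writeStream \
#     .trigger(**trigger_kwargs("2 minutes")) \
#     .format("json") \
#     .option("checkpointLocation", chkploc) \
#     .option("path", write_path) \
#     .start()
```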
:)