jdbc source and spark structured streaming

Problem Description

I've been using Spark Structured Streaming and am quite happy with it. I'm currently performing an ETL-type activity. I have a table in PostgreSQL that contains metadata-type information, which I'm looking to merge with the streaming data frame.

# Static DataFrame: metadata read from PostgreSQL over JDBC
metadataDf = spark \
    .read \
    .jdbc(url=jdbcUrl,
          table=query,
          properties=connectionProperties)

# Streaming DataFrame: JSON sensor files picked up from sensorPath
streamDF = spark \
    .readStream \
    .option("maxFilesPerTrigger", 10) \
    .option("latestFirst", True) \
    .schema(sensorSchema) \
    .json(sensorPath)

# Stream-static left join on the "uid" column
joined_metadata = streamDF \
    .join(metadataDf, ["uid"], "left")

# Write the joined stream out as JSON on a processing-time trigger
# (processingTime expects an interval string, e.g. "10 minutes")
write_query = joined_metadata \
    .writeStream \
    .trigger(processingTime=arbitarytime) \
    .format("json") \
    .option("checkpointLocation", chkploc) \
    .option("path", write_path) \
    .start()

The metadata table on PostgreSQL can get updated once every couple of days. I was wondering: do I need to accommodate the refresh of the table on Spark with some kind of while loop, or does Spark's lazy evaluation take care of that particular scenario?

Thanks

Recommended Answer

Spark will take care of it as long as the program is running. If you don't specify a trigger interval, Spark will process this stream continuously (each batch starts as soon as the last one has finished).

To specify a trigger interval, see df.writeStream.trigger() in the docs.
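
For reference, a minimal sketch of setting the trigger, re-using the names from the snippet in the question; the "10 minutes" interval is just an illustrative value:

# Without .trigger(), each micro-batch starts as soon as the previous one finishes.
# processingTime takes an interval string such as "30 seconds" or "10 minutes".
write_query = joined_metadata \
    .writeStream \
    .trigger(processingTime="10 minutes") \
    .format("json") \
    .option("checkpointLocation", chkploc) \
    .option("path", write_path) \
    .start()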

:)
