Spark Structured Streaming File Source Starting Offset


Problem Description

Is there a way to specify the starting offset for the Spark Structured Streaming file source?

I am trying to stream Parquet files from HDFS:

spark.sql("SET spark.sql.streaming.schemaInference=true")

spark.readStream
  .parquet("/tmp/streaming/")
  .writeStream
  .option("checkpointLocation", "/tmp/streaming-test/checkpoint")
  .format("parquet")
  .option("path", "/tmp/parquet-sink")
  .trigger(Trigger.ProcessingTime(1.minutes))
  .start()

As I see it, the first run processes all files detected in the path, then saves the offsets to the checkpoint location and afterwards processes only new files, i.e. files that pass the age check and are not yet present in the seen-files map.

I'm looking for a way to specify a starting offset or timestamp, or some option, so that not all of the available files are processed on the first run.

Is there a way to do this?

Recommended Answer

Thanks @jayfah. As far as I found, we can simulate Kafka's 'latest' starting offset with the following trick:

  1. Run a warm-up stream with option("latestFirst", true) and option("maxFilesPerTrigger", "1"), a checkpoint location, a dummy sink, and a huge processing-time trigger interval. This way, the warm-up stream saves the latest file timestamp to the checkpoint.

  2. Run the real stream with option("maxFileAge", "0") and the real sink, using the same checkpoint location. In this case the stream will process only newly available files; see the sketch after this list.
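
Putting the two phases together: the following is a minimal sketch, assuming Spark 3.x (whose built-in noop sink can play the dummy-sink role; on 2.x a memory or foreach sink would do) and reusing the paths from the question. The warm-up wait logic and the warmUp name are illustrative, not part of the original answer.

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

// Phase 1: warm-up. latestFirst=true with maxFilesPerTrigger=1 makes the
// source read just the newest file; the noop sink discards the rows. The
// huge trigger interval keeps the query idle after the first micro-batch,
// whose commit records the latest file timestamp in the checkpoint.
val warmUp = spark.readStream
  .option("latestFirst", "true")
  .option("maxFilesPerTrigger", "1")
  .parquet("/tmp/streaming/")
  .writeStream
  .option("checkpointLocation", "/tmp/streaming-test/checkpoint")
  .format("noop")
  .trigger(Trigger.ProcessingTime(1.hour))
  .start()

// Wait for the first micro-batch to commit, then stop the warm-up query.
// A crude poll is enough for a sketch; production code would be stricter.
while (warmUp.lastProgress == null) Thread.sleep(1000)
warmUp.stop()

// Phase 2: the real stream over the same checkpoint. maxFileAge=0 keeps the
// source from picking up files older than the latest one already recorded,
// so only newly arriving files are processed.
spark.readStream
  .option("maxFileAge", "0")
  .parquet("/tmp/streaming/")
  .writeStream
  .option("checkpointLocation", "/tmp/streaming-test/checkpoint")
  .format("parquet")
  .option("path", "/tmp/parquet-sink")
  .trigger(Trigger.ProcessingTime(1.minutes))
  .start()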

Most probably this is not something you'd want in production, and there are better ways, e.g. reorganizing the data paths, but at least this is the answer I found to my question.

