Spark structured streaming consistency across sinks


Question

I'd like to understand better the consistency model of Spark 2.2 structured streaming in the following case :

  • a single source (Kinesis)
  • 2 queries from this single source, going to 2 different sinks: one file sink for archival purposes (S3), and another sink to process the data (DB or file, not yet decided)

I'd like to understand if there's any consistency guarantee across sinks, at least under certain circumstances :

  • Can one sink be ahead of the other? Or do they consume data from the source at the same speed (since it is the same source)? Can they be kept in sync?
  • If I stop the streaming application (gracefully), will the data in the two sinks be consistent?

The reason is I'd like to build a Kappa-like processing app, with the ability to suspend/shut down the streaming part when I want to reprocess some history, and, when I resume the streaming, avoid reprocessing something that has already been processed (because it is in the history) or missing some data (e.g. data that has not been committed to the archive and is then skipped as already processed when the streaming resumes).

Answer

One important thing to keep in mind is that the 2 sinks will be used from 2 distinct queries, each reading independently from the source, so checkpointing is done per query.

Whenever you call start on a DataStreamWriter, a new query is created, and if you set checkpointLocation, each query will have its own checkpointing to track offsets for its sink.

val input = spark.readStream....

// Query 1: archive to Parquet, with its own checkpoint directory
val query1 = input.select('colA, 'colB)
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir1")
  .start("/path1")

// Query 2: same input, but a different sink and a separate checkpoint directory
val query2 = input.select('colA, 'colB)
  .writeStream
  .format("csv")
  .option("checkpointLocation", "path/to/checkpoint/dir2")
  .start("/path2")

So each query reads from the source and tracks offsets independently. This also means each query can be at a different offset of the input stream, and you can restart either or both without impacting the other.
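
For instance, a minimal sketch of stopping and later resuming just the archive query (reusing the query1 and input values defined above) might look like this:

// Stop only the archive query; query2 keeps consuming from the source.
query1.stop()

// Restarting with the same checkpointLocation resumes from the offsets that
// query1 had already committed, regardless of how far query2 has progressed.
val query1Resumed = input.select('colA, 'colB)
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir1")
  .start("/path1")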

UPDATE

I wanted to make another suggestion now that Databricks Delta has been open sourced. A common pattern I've used is landing data from upstream sources directly into an append-only Delta table. Then, with Structured Streaming, you can efficiently subscribe to the table and process the new records incrementally. Delta's internal transaction log is more efficient than the S3 file listings required by the basic file source. This ensures you have a consistent source of data across multiple queries, as opposed to each query pulling from S3 or Kinesis directly.
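
A rough sketch of that pattern, assuming the open-source Delta Lake package is on the classpath and reusing the input stream from the earlier snippet (table paths and query names here are made up), could look like this:

// Land the upstream stream into an append-only Delta table.
input.writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoints/landing")
  .start("/delta/landing")

// Downstream queries subscribe to the Delta table as a streaming source.
// Delta's transaction log identifies new files, so no S3 directory listing is
// needed, and every query reads the same committed records.
val archiveQuery = spark.readStream
  .format("delta")
  .load("/delta/landing")
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "/checkpoints/archive")
  .start("/archive")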
