Controlled/manual error/recovery handling in stream-based applications

Problem description

I am working on an application based on Apache Flink, which makes use of Apache Kafka for input and output. Possibly this application will be ported to Apache Spark, so I have added that as a tag as well, and the question remains the same.

I have the requirement that all incoming messages received via Kafka must be processed in order, as well as safely stored in a persistence layer (database), and no message may be lost.

The streaming part of this application is rather trivial/small, as the main logic boils down to something like:

environment.addSource(consumer)    // 1) DataStream[Option[Elem]]
  .filter(_.isDefined)             // 2) discard unparsable messages
  .map(_.get)                      // 3) unwrap Option
  .map(InputEvent.fromXml(_))      // 4) convert from XML to internal representation
  .keyBy(_.id)                     // 5) assure in-order processing on logical-key level
  .map(new DBFunction)             // 6) database lookup, store or update, and additional enrichment
  .map(InputEvent.toXml(_))        // 7) convert back to XML
  .addSink(producer)               // 8) attach kafka producer sink

Now, during this pipeline, several error situations could occur:

  • The database becomes unavailable (shut down, tablespace full, ...)
  • Changes cannot be stored because of logical errors (e.g. column format)
  • The Kafka producer cannot send a message because the broker is unavailable

There are probably other situations as well.

Now my question is: how can I ensure the consistency described above in those situations, when in fact I would have to do something like the following (see the sketch after the list):

  1. Stream operator 6) detects a problem (DB unavailable)
  2. The DB connection of the DBFunction object must be recovered, which might only succeed after some minutes
  3. This means that overall processing must be suspended, at best for the whole pipeline, so that incoming messages are not loaded into memory
  4. Resume processing after the database has been recovered. Processing must resume exactly with the message that encountered the problem at 1)
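For illustration only, one pragmatic way to approximate steps 1-4 is to block and retry inside the operator itself: because Flink propagates backpressure, a blocked operator stalls the upstream Kafka source instead of buffering messages without bound, and on success processing continues with the very element that failed. In the following sketch, DbClient, connect() and lookupAndStore() are placeholders, not a real API.

import org.apache.flink.api.common.functions.RichMapFunction

// Placeholder database client, only to make the sketch self-contained (assumption, not a real API).
trait DbClient { def lookupAndStore(e: InputEvent): InputEvent }
object DbClient { def connect(): DbClient = ??? }

// Sketch of what the DBFunction used at step 6) could look like with a retry loop.
class DBFunction extends RichMapFunction[InputEvent, InputEvent] {
  @transient private var db: DbClient = _

  override def map(event: InputEvent): InputEvent = {
    var backoffMs = 1000L
    while (true) {
      try {
        if (db == null) db = DbClient.connect()     // (re-)establish the connection lazily
        return db.lookupAndStore(event)             // lookup, store/update, enrich
      } catch {
        case _: java.sql.SQLException =>            // DB unavailable: back off and retry the same event
          db = null
          Thread.sleep(backoffMs)
          backoffMs = math.min(backoffMs * 2, 60000L)
      }
    }
    event                                           // unreachable, satisfies the return type
  }
}

Note that logical errors (e.g. a bad column format) should not be retried this way; they would need separate handling, such as routing to a dead-letter topic.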

Now I know that there are at least two tools regarding failure handling (a short sketch of how they fit together follows the list):

  1. Kafka consumer offsets
  2. Apache Flink checkpoints
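To make the relationship between the two concrete, here is a minimal sketch (broker address, group id, topic name, and the plain-string deserialization are assumptions): with checkpointing enabled, the FlinkKafkaConsumer snapshots its offsets into Flink's checkpoints, which are the source of truth on recovery, while committing offsets back to Kafka is informational only.

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.enableCheckpointing(60000L)                     // offsets are snapshotted with each checkpoint

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")    // assumption
props.setProperty("group.id", "flink-pipeline")             // assumption

val consumer = new FlinkKafkaConsumer[String]("input-topic", new SimpleStringSchema(), props)
consumer.setCommitOffsetsOnCheckpoints(true)                 // commit to Kafka for monitoring only;
                                                             // recovery uses the offsets stored in the checkpoint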

However, searching the docs, I fail to see how either of those could be used in the middle of stream processing from within a single operator.

So, what would be the recommended strategies for fine-grained error handling and recovery in a streaming application?

Recommended answer

A few points:

The keyBy is not going to help ensure in-order processing. If anything, it may interleave events from different Kafka partitions (which may have been in-order within each partition), thereby creating out-of-orderness where it didn't previously exist. It's hard to comment more specifically about how you might guarantee in-order processing without understanding how many FlinkKafkaConsumer instances you intend to use, how many partitions each one will be consuming from, how the keys are distributed across the Kafka partitions, and why you think a keyBy is necessary -- but if you set things up correctly, preserving order may be achievable. reinterpretAsKeyedStream can be helpful here, but this feature is difficult to understand, and tricky to use correctly.
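For completeness, a rough sketch of the reinterpretAsKeyedStream route, assuming the events are already partitioned by id on the Kafka side; this is an experimental feature, and the Scala entry point shown here should be verified against the experimental-features documentation of your Flink version:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.DataStreamUtils

// Declare the already-partitioned stream as keyed by id instead of re-shuffling with keyBy.
// Order is only preserved if the partitioning assumption actually holds.
val parsed: DataStream[InputEvent] = environment.addSource(consumer)
  .filter(_.isDefined)
  .map(_.get)
  .map(InputEvent.fromXml(_))

val keyed = new DataStreamUtils(parsed).reinterpretAsKeyedStream(_.id)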

You could use Flink's AsyncFunction to manage the connection to the external DB in a fault tolerant, exactly-once, manner.
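A minimal sketch of what such an AsyncFunction could look like with the Scala API, applied to the DB lookup/enrichment step; Db.enrich is a stand-in for the real database client, and orderedWait emits results in the same order as the inputs arrived:

import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

// Stand-in for the real database client (assumption).
object Db { def enrich(e: InputEvent): InputEvent = e }

class AsyncDBFunction extends AsyncFunction[InputEvent, InputEvent] {
  @transient implicit lazy val ec: ExecutionContext = ExecutionContext.global

  override def asyncInvoke(event: InputEvent, resultFuture: ResultFuture[InputEvent]): Unit =
    Future(Db.enrich(event)).onComplete {
      case Success(enriched) => resultFuture.complete(Iterable(enriched))
      case Failure(t)        => resultFuture.completeExceptionally(t)
    }
}

val events: DataStream[InputEvent] = ???   // stand-in for the stream after step 5) in the pipeline above

// orderedWait keeps the emitted results aligned with the input order.
val enriched: DataStream[InputEvent] =
  AsyncDataStream.orderedWait(events, new AsyncDBFunction, 30, TimeUnit.SECONDS, 100)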

Flink doesn't support fine-grained recovery in a systematic way -- its checkpoints are global snapshots of the state of the entire distributed cluster, and are designed to be used during recovery as a monolithic, self-consistent, snapshot. If your job fails, normally the only recourse is to restart from a checkpoint, which will involve rewinding input queues (to the offsets stored in the checkpoint), replaying the events since those offsets, re-issuing the DB lookups (which the async function will do automatically), and using kafka transactions to achieve end-to-end exactly once semantics. However, in the case of embarrassingly parallel jobs, it is sometimes possible to take advantage of fine-grained recovery.
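As a rough sketch of the checkpointing plus Kafka-transactions setup described above (topic name, broker address, and the plain-string serialization are assumptions; the real job would serialize the XML produced in step 7)):

import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets
import java.util.Properties
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord

val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE)   // checkpoints are the unit of recovery

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")                  // assumption
props.setProperty("transaction.timeout.ms", "900000")                     // must not exceed the broker's transaction.max.timeout.ms

// Writes each outgoing XML string to "output-topic" (name is an assumption).
val schema = new KafkaSerializationSchema[String] {
  override def serialize(element: String, timestamp: JLong): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord[Array[Byte], Array[Byte]]("output-topic", element.getBytes(StandardCharsets.UTF_8))
}

// With Semantic.EXACTLY_ONCE the producer writes inside Kafka transactions that are
// committed when a checkpoint completes, giving end-to-end exactly-once delivery to Kafka.
val producer = new FlinkKafkaProducer[String](
  "output-topic", schema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)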
