How to handle errors and not commit when using the Kafka Streams DSL


Problem description

For Kafka Streams, if we use the lower-level Processor API, we can control whether or not to commit. So if a problem happens in our code, we can decide not to commit the message; in that case, Kafka will redeliver the message repeatedly until the problem is fixed.

But how do we control whether a message is committed when using the higher-level Streams DSL API?

Resources:

http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html

Recommended answer

Your statement is not completely true. You cannot "control to commit or not" -- at least not directly (neither in the Processor API nor in the DSL). You can only use ProcessorContext#commit() to request additional commits. After a call to #commit(), Streams tries to commit as soon as possible, but it is not an immediate commit. Furthermore, Streams commits automatically even if you never call #commit(). You can control the commit interval via the Streams configuration commit.interval.ms (cf. http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-application).
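If you want commits to happen more often than the default, you can lower commit.interval.ms in the Streams configuration. A minimal sketch using plain java.util.Properties; the application.id and bootstrap.servers values are placeholders, and the Kafka-specific objects (topology, KafkaStreams instance) are omitted so the snippet stays self-contained:

```java
import java.util.Properties;

public class CommitConfig {
    public static Properties streamsProps() {
        Properties props = new Properties();
        // Placeholder identifiers -- replace with your own.
        props.put("application.id", "my-streams-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Lower the automatic commit interval from the default of 30000 ms.
        // Streams commits on this schedule regardless of #commit() calls,
        // which only request an *additional* commit as soon as possible.
        props.put("commit.interval.ms", "100");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("commit.interval.ms"));
    }
}
```

These properties would then be passed to the KafkaStreams constructor as usual.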

In case of a "problem", how to respond depends on the type of problem you have:

  • If you detect a problem you cannot recover from, you can only throw an exception and "stop the world" (cf. below).
  • If you have a recoverable error, you need to "loop" within your own code (e.g., within Processor#process() or KeyValueMapper#apply()) until the problem is resolved and the current message can be processed successfully. Note that you might run into a timeout, i.e., an exception, using this strategy; cf. the consumer configs heartbeat.interval.ms and, for 0.10.1, session.timeout.ms [KIP-62].
  • An alternative would be to put records that cannot be processed right now into a StateStore and process them later. However, this is hard to get right and also breaks a few Streams assumptions (e.g., processing order). It is not recommended, and if you do use it, you must think very carefully about the implications.
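The "loop within your own code" strategy from the second bullet can be sketched as a generic retry helper. This is plain Java with hypothetical names (retryUntilSuccess, backoffMs), not a Kafka Streams API; in practice you would call it from inside Processor#process() or KeyValueMapper#apply(), and keep the total blocking time below the consumer timeouts mentioned above:

```java
import java.util.function.Supplier;

public class RetryLoop {
    // Blocks until the action succeeds or attempts run out. Keep the total
    // time below session.timeout.ms / max.poll.interval.ms, or the consumer
    // behind the StreamThread will be kicked out of the group.
    public static <T> T retryUntilSuccess(Supplier<T> action, long backoffMs, int maxAttempts) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ie);
                }
            }
        }
        // Treat exhaustion as unrecoverable: let the exception propagate
        // and "stop the world" (the StreamThread dies, nothing is committed).
        throw last;
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        String result = retryUntilSuccess(() -> {
            if (++calls[0] < 3) throw new RuntimeException("transient failure");
            return "processed";
        }, 10, 5);
        System.out.println(result + " after " + calls[0] + " attempts");
        // -> processed after 3 attempts
    }
}
```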

If there is an uncaught exception, the StreamThread will die and no commit happens. You can register an exception handler to get notified about this: http://docs.confluent.io/current/streams/developer-guide.html#using-kafka-streams-within-your-application-code. If all your StreamThreads have died, you will need to create a new instance of KafkaStreams to restart your application.
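Registering such a handler can be sketched as follows. In this API version, KafkaStreams#setUncaughtExceptionHandler accepts a standard java.lang.Thread.UncaughtExceptionHandler, so the callback itself is JDK-only; since a running KafkaStreams instance is out of scope here, the sketch fires the handler on a throwaway thread instead:

```java
public class HandlerDemo {
    public static String runDemo() throws InterruptedException {
        StringBuilder log = new StringBuilder();
        // The same handler type you would pass to
        // streams.setUncaughtExceptionHandler(handler) on a KafkaStreams instance.
        Thread.UncaughtExceptionHandler handler = (thread, throwable) ->
            log.append("StreamThread ").append(thread.getName())
               .append(" died: ").append(throwable.getMessage());

        // Simulate a StreamThread dying from an uncaught exception.
        Thread t = new Thread(() -> {
            throw new IllegalStateException("unrecoverable");
        }, "demo-thread");
        t.setUncaughtExceptionHandler(handler);
        t.start();
        t.join(); // the handler has run once join() returns
        return log.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo());
        // -> StreamThread demo-thread died: unrecoverable
    }
}
```

Note the handler only notifies you; it does not resurrect the dead thread, which is why a new KafkaStreams instance is needed to restart the application.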

You must not return from user code before a message is processed successfully, because if you do, Streams assumes the message was processed successfully (and might therefore commit the corresponding offset). With regard to bullet point (3), putting a record into a special StateStore for later processing counts as "successful" processing of that record.

