Is consumer offset committed even when failing to post to output topic in Kafka Streams?


Problem Description


If I have a Kafka Streams application that fails to post to a topic (because the topic does not exist), does it commit the consumer offset and continue, or will it loop on the same message until it can resolve the output topic? From what I can observe, the application merely prints an error and otherwise runs fine.

An example of the error when trying to post to topic:

Error while fetching metadata with correlation id 80 : {super.cool.test.topic=UNKNOWN_TOPIC_OR_PARTITION}

In my mind it would just spin on the same message until the issue is resolved, in order not to lose data? I could not find a clear answer on what the default behavior is. We haven't set auto-commit to off or anything like that; most of the settings are at their defaults.

I am asking because we don't want to end up in a situation where the health check is fine (the application is running while printing errors to the log) and we are just throwing away tons of Kafka messages.

Solution

Kafka Streams will not commit the offsets in this case, as it provides at-least-once processing guarantees (in fact, it is not even possible to reconfigure Kafka Streams to anything weaker -- only the stronger exactly-once guarantee is available). Also, Kafka Streams always disables auto-commit on the consumer (and does not allow you to enable it), because Kafka Streams manages committing offsets itself.
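For reference, the only guarantee-related knob is processing.guarantee, and it can only be strengthened. A minimal sketch, assuming a Kafka Streams version (0.11+) where exactly-once is available; the application id and broker address are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // hypothetical application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker

// The default is at-least-once; the only alternative is the stronger exactly-once guarantee.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
```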

If you run with default settings, the producer should actually throw an exception and the corresponding thread should die -- you can get a callback when a thread dies by registering a handler via KafkaStreams#setUncaughtExceptionHandler().
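A minimal sketch of registering such a handler; the topology and topic names here are illustrative (only super.cool.test.topic comes from the question):

```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // hypothetical application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker

StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic").to("super.cool.test.topic");           // hypothetical input topic

KafkaStreams streams = new KafkaStreams(builder.build(), props);

// Invoked when a stream thread dies with an unhandled exception,
// e.g. the producer giving up on a write to a missing output topic.
streams.setUncaughtExceptionHandler((thread, throwable) ->
        System.err.println("Stream thread " + thread.getName() + " died: " + throwable));

streams.start();
```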

You can also observe KafkaStreams#state() (or register a callback via KafkaStreams#setStateListener()). The state will go to DEAD if all threads are dead (note, there was a bug in older versions for which the state was still RUNNING in this case: https://issues.apache.org/jira/browse/KAFKA-5372).
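A sketch of wiring that state into a health check, assuming the streams instance from above. The exact terminal state name varies across versions (newer releases report ERROR or NOT_RUNNING rather than DEAD), so this check conservatively treats anything other than RUNNING or REBALANCING as unhealthy:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.streams.KafkaStreams;

AtomicBoolean healthy = new AtomicBoolean(true);

// Flip a flag that a health-check endpoint can expose, instead of
// only checking that the JVM process is alive.
streams.setStateListener((newState, oldState) -> {
    boolean ok = newState == KafkaStreams.State.RUNNING
            || newState == KafkaStreams.State.REBALANCING;
    healthy.set(ok);
});
```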

Hence, the application should not be in a healthy state: Kafka Streams will not retry the input message but stop processing, and you would need to restart the client. On restart, it would re-read the failed input message and retry to write to the output topic.

If you want Kafka Streams to retry, you need to increase the producer config retries so that the producer retries the write internally instead of throwing an exception. This may eventually "block" further processing if the producer's write buffer becomes full.
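A sketch of passing that producer config through Kafka Streams, using the producer-prefix helper; the value chosen here is an illustrative assumption, not from the answer:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();

// Forward producer-level settings through Kafka Streams via the "producer." prefix.
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), Integer.MAX_VALUE);
```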
