Delaying Kafka Streams consuming
Question
I'm trying to use Kafka Streams (i.e. not a simple Kafka Consumer) to read from a retry topic with events that have previously failed to process. I wish to consume from the retry topic, and if processing still fails (for example, if an external system is down), I wish to put the event back on the retry topic. Thus I don't want to keep consuming immediately, but instead wait a while before consuming, in order to not flood the systems with messages that are temporarily unprocessable.
Simplified, the code currently does this, and I wish to add a delay to it.
fun createTopology(topic: String): Topology {
    val streamsBuilder = StreamsBuilder()
    streamsBuilder.stream<String, ArchivalData>(topic, Consumed.with(Serdes.String(), ArchivalDataSerde()))
        .peek { key, msg -> logger.info("Received event for key $key : $msg") }
        .map { key, msg -> enrich(msg) }
        .foreach { key, enrichedMsg -> archive(enrichedMsg) }
    return streamsBuilder.build()
}
I have tried to use Window Delay to set this up, but have not managed to get it to work. I could of course do a sleep inside a peek, but that would leave a thread hanging and does not sound like a very clean solution.
The exact details of how the delay would work are not terribly important to my use case. For example, all of these would work fine:
- All events put on the topic in the past x seconds are consumed at once. After it begins or finishes consuming, the stream waits x seconds before consuming again.
- Every event is processed x seconds after being put on the topic.
- The stream consumes messages with a delay of x seconds between every event.
I would be very grateful if someone could provide a few lines of Kotlin or Java code that would accomplish any of the above.
Recommended answer
You cannot really pause reading from the input topic using Kafka Streams. The only way to "delay" would be to call a "sleep", but as you mentioned, that blocks the whole thread and is not a good solution.
However, what you can do is use a stateful processor, e.g., process() (with an attached state store) instead of foreach(). If the retry fails, you don't put the record back into the input topic; instead you put it into the store and register a punctuation with the desired retry delay. When the punctuation fires, you retry. If the retry succeeds, you delete the entry from the store and cancel the punctuation; otherwise, you wait until the punctuation fires again.
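The approach above could be sketched roughly as follows. This is a minimal sketch, not a definitive implementation, and it rests on several assumptions: Kafka Streams 3.3 or later (where KStream.process() accepts the org.apache.kafka.streams.processor.api types), non-null record keys, and that ArchivalData, ArchivalDataSerde, enrich() and archive() exist as in the question, with archive() throwing an exception on failure and enrich() returning the value to archive. The names RETRY_STORE, RetryingArchiver and retryDelay are made up for illustration. As a simplification, it schedules one recurring wall-clock punctuation in init() that retries everything in the store, rather than registering and cancelling a punctuation per failure.

```kotlin
import java.time.Duration
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.processor.PunctuationType
import org.apache.kafka.streams.processor.api.Processor
import org.apache.kafka.streams.processor.api.ProcessorContext
import org.apache.kafka.streams.processor.api.ProcessorSupplier
import org.apache.kafka.streams.processor.api.Record
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.state.Stores

// Assumed to exist in the asker's project: ArchivalData, ArchivalDataSerde, enrich(), archive().
// RETRY_STORE is a hypothetical store name chosen for this sketch.
const val RETRY_STORE = "retry-store"

class RetryingArchiver(private val retryDelay: Duration) :
    Processor<String, ArchivalData, Void, Void> {

    private lateinit var store: KeyValueStore<String, ArchivalData>

    override fun init(context: ProcessorContext<Void, Void>) {
        store = context.getStateStore<KeyValueStore<String, ArchivalData>>(RETRY_STORE)
        // One recurring wall-clock punctuation: every retryDelay, retry whatever is parked in the store.
        context.schedule(retryDelay, PunctuationType.WALL_CLOCK_TIME) { _ -> retryStored() }
    }

    override fun process(record: Record<String, ArchivalData>) {
        // On failure, park the record in the store instead of writing it back to the topic.
        if (!tryArchive(record.value())) {
            store.put(record.key(), record.value())
        }
    }

    // Assumption: archive() signals failure by throwing.
    private fun tryArchive(msg: ArchivalData): Boolean =
        try {
            archive(enrich(msg))
            true
        } catch (e: Exception) {
            false
        }

    private fun retryStored() {
        // Collect the keys that succeeded first, then delete after the iterator is closed.
        val succeeded = mutableListOf<String>()
        store.all().use { entries ->
            entries.forEach { entry ->
                if (tryArchive(entry.value)) succeeded += entry.key
            }
        }
        succeeded.forEach { store.delete(it) }
    }
}

fun createTopology(topic: String, retryDelay: Duration): Topology {
    val streamsBuilder = StreamsBuilder()
    // The store must be registered with the builder and named in process() to be attached.
    streamsBuilder.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(RETRY_STORE), Serdes.String(), ArchivalDataSerde()
        )
    )
    streamsBuilder.stream<String, ArchivalData>(topic, Consumed.with(Serdes.String(), ArchivalDataSerde()))
        .process(
            ProcessorSupplier<String, ArchivalData, Void, Void> { RetryingArchiver(retryDelay) },
            RETRY_STORE
        )
    return streamsBuilder.build()
}
```

Two limitations of this sketch are worth noting. Because the store is keyed by the record key, a second failed record with the same key overwrites the first; if every failed event must be retried, a key that includes the offset or a timestamp would be needed. The retries also run on the stream thread, so a slow external system still delays processing while the punctuation executes, although no thread sleeps between attempts.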