Delaying Kafka Streams consuming

Question

I'm trying to use Kafka Streams (i.e. not a simple Kafka Consumer) to read from a retry topic with events that have previously failed to process. I wish to consume from the retry topic, and if processing still fails (for example, if an external system is down), I wish to put the event back on the retry topic. Thus I don't want to keep consuming immediately, but instead wait a while before consuming, in order to not flood the systems with messages that are temporarily unprocessable.

Simplified, the code currently does this, and I wish to add a delay to it.

fun createTopology(topic: String): Topology {
    val streamsBuilder = StreamsBuilder()

    streamsBuilder.stream<String, ArchivalData>(topic, Consumed.with(Serdes.String(), ArchivalDataSerde()))
        .peek { key, msg -> logger.info("Received event for key $key : $msg") }
        .map { key, msg -> enrich(msg) }
        .foreach { key, enrichedMsg -> archive(enrichedMsg) }

    return streamsBuilder.build()
}

I have tried to use Window Delay to set this up, but have not managed to get it to work. I could of course do a sleep inside a peek, but that would leave a thread hanging and does not sound like a very clean solution.

The exact details of how the delay would work are not terribly important to my use case. For example, all of these would work fine:

  1. All events that arrived on the topic in the past x seconds are consumed at once. After the stream begins or finishes consuming, it waits x seconds before consuming again.
  2. Every event is processed x seconds after being put on the topic.
  3. The stream consumes messages with a delay of x seconds between events.

I would be very grateful if someone could provide a few lines of Kotlin or Java code that would accomplish any of the above.

Recommended answer

You cannot really pause reading from the input topic using Kafka Streams—the only way to "delay" would be to call a "sleep", but as you mentioned, that blocks the whole thread and is not a good solution.

However, what you can do is use a stateful processor, e.g., process() with an attached state store, instead of foreach(). If the retry fails, you don't put the record back into the input topic; instead you put it into the store and register a punctuation with the desired retry delay. When the punctuation fires, you retry. If the retry succeeds, you delete the entry from the store and cancel the punctuation; otherwise, you wait until the punctuation fires again.
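
A minimal sketch of that idea in Kotlin, under some assumptions: it targets Kafka Streams 3.3 or later (where KStream.process() accepts the newer processor.api.ProcessorSupplier), record keys are non-null, and the names RETRY_STORE, RetryProcessor, retryTopology and the action parameter are hypothetical, introduced only for illustration. As a simplification of the description above, it registers one recurring wall-clock punctuation in init() instead of registering and cancelling one per failure; schedule() returns a Cancellable that could be kept if cancelling is wanted.

```kotlin
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.processor.PunctuationType
import org.apache.kafka.streams.processor.api.Processor
import org.apache.kafka.streams.processor.api.ProcessorContext
import org.apache.kafka.streams.processor.api.ProcessorSupplier
import org.apache.kafka.streams.processor.api.Record
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.state.Stores
import java.time.Duration

// Hypothetical store name, chosen for this sketch.
const val RETRY_STORE = "retry-store"

// Tries `action` on each record; records that fail are parked in the state
// store and retried every `retryDelay` by a wall-clock punctuation.
class RetryProcessor<V : Any>(
    private val retryDelay: Duration,
    private val action: (V) -> Unit
) : Processor<String, V, Void, Void> {

    private lateinit var store: KeyValueStore<String, V>

    override fun init(context: ProcessorContext<Void, Void>) {
        store = context.getStateStore<KeyValueStore<String, V>>(RETRY_STORE)
        // Simplification: one recurring punctuation rather than one per failure.
        context.schedule(retryDelay, PunctuationType.WALL_CLOCK_TIME) { _ -> retryParked() }
    }

    override fun process(record: Record<String, V>) {
        // Caveat: a later failure with the same key overwrites the parked entry.
        if (!attempt(record.value())) store.put(record.key(), record.value())
    }

    private fun retryParked() {
        val succeeded = mutableListOf<String>()
        // The iterator must be closed, hence use {}.
        store.all().use { entries ->
            for (kv in entries) {
                if (attempt(kv.value)) succeeded.add(kv.key)
            }
        }
        // Delete after iteration to avoid mutating the store while scanning it.
        succeeded.forEach { store.delete(it) }
    }

    // Treats any exception from `action` as "retry later".
    private fun attempt(value: V): Boolean =
        try { action(value); true } catch (e: Exception) { false }
}

fun <V : Any> retryTopology(
    topic: String,
    valueSerde: Serde<V>,
    retryDelay: Duration,
    action: (V) -> Unit
): Topology {
    val builder = StreamsBuilder()
    builder.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(RETRY_STORE), Serdes.String(), valueSerde
        )
    )
    builder.stream(topic, Consumed.with(Serdes.String(), valueSerde))
        .process(
            ProcessorSupplier<String, V, Void, Void> { RetryProcessor(retryDelay, action) },
            RETRY_STORE
        )
    return builder.build()
}
```

With the types from the question, this would presumably be called with ArchivalDataSerde() as valueSerde and a lambda that enriches and archives the message as action. Because failed records live in the state store rather than being written back to the retry topic, the stream thread keeps polling and is never put to sleep.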
