Max number of tuple replays on Storm Kafka Spout

Question

We’re using Storm with the Kafka Spout. When we fail messages, we’d like to replay them, but in some cases bad data or code errors will cause messages to always fail a Bolt, so we’ll get into an infinite replay cycle. Obviously we’re fixing errors when we find them, but would like our topology to be generally fault tolerant. How can we ack() a tuple after it’s been replayed more than N times?

Looking through the code for the Kafka Spout, I see that it was designed to retry with an exponential backoff timer, and the comments on the PR state:

"The spout does not terminate the retry cycle (it is my conviction that it should not do so, because it cannot report context about the failure that happened to abort the reqeust), it only handles delaying the retries. A bolt in the topology is still expected to eventually call ack() instead of fail() to stop the cycle."

I've seen StackOverflow responses that recommend writing a custom spout, but I'd rather not be stuck maintaining a custom patch of the internals of the Kafka Spout if there's a recommended way to do this in a Bolt.

What’s the right way to do this in a Bolt? I don’t see any state in the tuple that exposes how many times it’s been replayed.

Answer

Storm itself does not provide any support for your problem. Thus, a customized solution is the only way to go. Even if you do not want to patch KafkaSpout, I think introducing a counter there and breaking the replay cycle would be the best approach. As an alternative, you could also inherit from KafkaSpout and put a counter in your subclass. This is of course somewhat similar to a patch, but might be less intrusive and easier to implement.
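To make the subclass idea concrete, here is a minimal sketch written against the old storm-kafka module (Storm 1.x package names). The class name, the MAX_RETRIES threshold, and the choice to call super.ack() when giving up are illustrative assumptions, not an official API; check how your KafkaSpout version commits offsets before relying on this:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;

// Hypothetical subclass: count fail() calls per message id and stop
// replaying after MAX_RETRIES, treating the tuple as acked instead.
public class MaxRetryKafkaSpout extends KafkaSpout {

    private static final int MAX_RETRIES = 5; // your threshold N
    private final Map<Object, Integer> failCounts = new HashMap<>();

    public MaxRetryKafkaSpout(SpoutConfig config) {
        super(config);
    }

    @Override
    public void fail(Object msgId) {
        int count = failCounts.merge(msgId, 1, Integer::sum);
        if (count <= MAX_RETRIES) {
            super.fail(msgId); // let KafkaSpout schedule another replay
        } else {
            failCounts.remove(msgId);
            // Give up: log the poisoned message here, then treat it as acked
            // (assumption: super.ack() advances the offset in your version).
            super.ack(msgId);
        }
    }

    @Override
    public void ack(Object msgId) {
        failCounts.remove(msgId); // success: forget the tuple's history
        super.ack(msgId);
    }
}
```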

If you want to use a Bolt, you could do the following (which also requires some changes to the KafkaSpout or a subclass of it).

  • Assign a unique ID as an additional attribute to each tuple (maybe there is already a unique ID available; otherwise, you could introduce a "counter-ID" or just use the whole tuple, i.e., all attributes, to identify each tuple).
  • Insert a bolt after KafkaSpout via fieldsGrouping on the ID (to ensure that a tuple that is replayed is streamed to the same bolt instance).
  • Within your bolt, use a HashMap<ID,Counter> that buffers all tuples and counts the number of (re-)tries (a sketch of such a bolt follows this list). If the counter is smaller than your threshold value, forward the input tuple so it gets processed by the actual topology that follows (of course, you need to anchor the tuple appropriately). If the count is larger than your threshold, ack the tuple to break the cycle and remove its entry from the HashMap (you might also want to log all failed tuples).
  • In order to remove successfully processed tuples from the HashMap, each time a tuple is acked in KafkaSpout you need to forward the tuple ID to the bolt so that it can remove the tuple from the HashMap. Just declare a second output stream for your KafkaSpout subclass and override Spout.ack(...) (of course, you need to call super.ack(...) to ensure KafkaSpout gets the ack, too).
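Such a gatekeeper bolt might look like the sketch below. The stream name "acks" and the field names "id" and "payload" are assumptions of mine (your spout has to emit them); the imports use the org.apache.storm packages of Storm 1.x and later, older releases use backtype.storm instead:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical gatekeeper bolt: forwards tuples until an id's (re)try
// count exceeds MAX_TRIES, then swallows the tuple to break the cycle.
public class RetryLimitBolt extends BaseRichBolt {

    private static final int MAX_TRIES = 5; // your threshold N
    private Map<Object, Integer> tries;
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.tries = new HashMap<>();
    }

    @Override
    public void execute(Tuple input) {
        Object id = input.getValueByField("id");
        if ("acks".equals(input.getSourceStreamId())) {
            tries.remove(id); // downstream processing succeeded: clean up
        } else {
            int count = tries.merge(id, 1, Integer::sum);
            if (count <= MAX_TRIES) {
                // Forward anchored, so a downstream fail() replays through us again.
                collector.emit(input, new Values(id, input.getValueByField("payload")));
            } else {
                tries.remove(id); // give up; log the poisoned tuple here
            }
        }
        collector.ack(input); // the spout sees the ack once all anchored children succeed
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "payload"));
    }
}
```

When wiring the topology, both the regular stream and the "acks" stream need a fieldsGrouping on "id" so that replays and notifications reach the same bolt task, e.g. (component names made up):

```java
builder.setBolt("retry-gate", new RetryLimitBolt(), 4)
       .fieldsGrouping("kafka-spout", new Fields("id"))
       .fieldsGrouping("kafka-spout", "acks", new Fields("id"));
```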

This approach might consume a lot of memory though. As an alternative to keeping an entry for each tuple in the HashMap, you could also use a third stream (connected to the bolt like the other two) and forward a tuple ID if a tuple fails (i.e., in Spout.fail(...)). Each time the bolt receives a "fail" message from this third stream, the counter is incremented. As long as no entry is in the HashMap (or the threshold is not reached), the bolt simply forwards the tuple for processing. This should reduce the memory used but requires some more logic to be implemented in your spout and bolt.
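The spout side of this fail-notification variant could be sketched as follows. Again, the class and stream names are mine, and the message id must be something Storm can serialize onto a stream (the old KafkaSpout's partition/offset id may need converting, e.g. to a string):

```java
import java.util.Map;

import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Hypothetical subclass that notifies the bolt about every fail() and
// ack() via two extra streams instead of buffering tuples in the bolt.
public class NotifyingKafkaSpout extends KafkaSpout {

    private SpoutOutputCollector collector;

    public NotifyingKafkaSpout(SpoutConfig config) {
        super(config);
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector; // keep a handle for use in ack()/fail()
        super.open(conf, context, collector);
    }

    @Override
    public void fail(Object msgId) {
        collector.emit("fails", new Values(msgId)); // a replay is coming: count it
        super.fail(msgId);                          // KafkaSpout schedules the replay
    }

    @Override
    public void ack(Object msgId) {
        collector.emit("acks", new Values(msgId));  // tell the bolt to clear its counter
        super.ack(msgId);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        super.declareOutputFields(declarer);        // the regular message stream
        declarer.declareStream("fails", new Fields("id"));
        declarer.declareStream("acks", new Fields("id"));
    }
}
```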

Both approaches have the disadvantage that each acked tuple results in an additional message to your newly introduced bolt (thus increasing network traffic). For the second approach, it might seem that you only need to send an "ack" message to the bolt for tuples that failed before. However, you do not know which tuples failed and which did not. If you want to get rid of this network overhead, you could introduce a second HashMap in KafkaSpout that buffers the IDs of failed messages. Thus, you only send an "ack" message if a failed tuple was replayed successfully. Of course, this third approach makes the logic to be implemented even more complex.
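Sketched as a standalone variant of the NotifyingKafkaSpout above (only fail() and ack() differ), with a Set of ids standing in for the "second HashMap", since only the ids need buffering; as before, every name here is illustrative:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Hypothetical "third approach" spout: remember which ids failed and only
// send "acks" notifications for those, so clean tuples cost no message.
public class SelectiveNotifyingKafkaSpout extends KafkaSpout {

    private SpoutOutputCollector collector;
    private final Set<Object> everFailed = new HashSet<>(); // buffered failed ids

    public SelectiveNotifyingKafkaSpout(SpoutConfig config) {
        super(config);
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        super.open(conf, context, collector);
    }

    @Override
    public void fail(Object msgId) {
        everFailed.add(msgId);                      // the bolt now holds a counter
        collector.emit("fails", new Values(msgId));
        super.fail(msgId);
    }

    @Override
    public void ack(Object msgId) {
        if (everFailed.remove(msgId)) {             // notify only if a counter exists
            collector.emit("acks", new Values(msgId));
        }
        super.ack(msgId);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        super.declareOutputFields(declarer);
        declarer.declareStream("fails", new Fields("id"));
        declarer.declareStream("acks", new Fields("id"));
    }
}
```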

Without modifying KafkaSpout to some extent, I see no solution for your problem. I personally would patch KafkaSpout, or would use the third approach with a HashMap in the KafkaSpout subclass and the bolt (because it consumes little memory and does not put a lot of additional load on the network compared to the first two solutions).
