Max number of tuple replays on Storm Kafka Spout


Problem Description

We’re using Storm with the Kafka Spout. When we fail messages, we’d like to replay them, but in some cases bad data or code errors will cause messages to always fail a Bolt, so we’ll get into an infinite replay cycle. Obviously we’re fixing errors when we find them, but would like our topology to be generally fault tolerant. How can we ack() a tuple after it’s been replayed more than N times?

Looking through the code for the Kafka Spout, I see that it was designed to retry with an exponential backoff timer and the comments on the PR state:

"The spout does not terminate the retry cycle (it is my conviction that it should not do so, because it cannot report context about the failure that happened to abort the request), it only handles delaying the retries. A bolt in the topology is still expected to eventually call ack() instead of fail() to stop the cycle."

I've seen StackOverflow responses that recommend writing a custom spout, but I'd rather not be stuck maintaining a custom patch of the internals of the Kafka Spout if there's a recommended way to do this in a Bolt.

What’s the right way to do this in a Bolt? I don’t see any state in the tuple that exposes how many times it’s been replayed.

Answer

Storm itself does not provide any support for your problem, so a customized solution is the only way to go. Even if you do not want to patch KafkaSpout, I think introducing a counter there and breaking the replay cycle would be the best approach. As an alternative, you could also inherit from KafkaSpout and keep the counter in your subclass. This is of course somewhat similar to a patch, but might be less intrusive and easier to implement.
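The counter can live outside the Storm API entirely. Below is a minimal sketch of that bookkeeping, assuming a KafkaSpout subclass whose fail(msgId) first asks shouldReplay() and then delegates to super.fail(...) or, once the limit is hit, to super.ack(...). All class and method names here are mine, not part of storm-kafka:

```java
import java.util.HashMap;
import java.util.Map;

// Tracks how often each message ID has failed and decides replay vs. drop.
// A KafkaSpout subclass would call shouldReplay() from its fail() override:
// replay (super.fail) while it returns true, otherwise give up (super.ack)
// and log the poisoned message.
public class ReplayLimiter {
    private final int maxRetries;
    private final Map<Object, Integer> failCounts = new HashMap<>();

    public ReplayLimiter(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Called on each failure; returns true if the tuple should be replayed. */
    public boolean shouldReplay(Object msgId) {
        int count = failCounts.merge(msgId, 1, Integer::sum);
        if (count > maxRetries) {
            failCounts.remove(msgId); // give up: caller acks and logs instead
            return false;
        }
        return true;
    }

    /** Called from ack(); forget the ID so the map does not grow unbounded. */
    public void onSuccess(Object msgId) {
        failCounts.remove(msgId);
    }
}
```

Clearing the entry both on success and when giving up keeps the map bounded by the number of currently failing messages.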

If you want to use a Bolt, you could do the following (which also requires some changes to the KafkaSpout or a subclass of it).

  • Assign a unique ID as an additional attribute to each tuple (maybe there is already a unique ID available; otherwise you could introduce a "counter ID", or use the whole tuple, i.e. all attributes, to identify each tuple).
  • Insert a bolt after KafkaSpout, connected via fieldsGrouping on the ID (to ensure that a replayed tuple is routed to the same bolt instance).
  • Within your bolt, use a HashMap<ID,Counter> that buffers all tuples and counts the number of (re-)tries. If the counter is smaller than your threshold value, forward the input tuple so it gets processed by the actual topology that follows (of course, you need to anchor the tuple appropriately). If the count exceeds your threshold, ack the tuple to break the cycle and remove its entry from the HashMap (you might also want to log all failed tuples).
  • In order to remove successfully processed tuples from the HashMap, each time a tuple is acked in KafkaSpout you need to forward the tuple ID to the bolt so that it can remove the tuple from the HashMap. Just declare a second output stream for your KafkaSpout subclass and override Spout.ack(...) (of course, you also need to call super.ack(...) to ensure KafkaSpout gets the ack, too).
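The bolt-side bookkeeping from the steps above can be sketched independently of the Storm API. The class and method names are hypothetical, and the execute() wiring (anchoring, emitting, acking, distinguishing the data stream from the spout's extra ack stream) is omitted:

```java
import java.util.HashMap;
import java.util.Map;

// Per-ID attempt counting for the filtering bolt described above.
// In execute(), a tuple from the data stream maps to onTuple(), and a
// message on the spout's second ("ack") stream maps to onUpstreamAck().
public class RetryFilterState {
    /** FORWARD: emit the tuple anchored; DROP: just ack it to break the cycle. */
    public enum Action { FORWARD, DROP }

    private final int threshold;
    private final Map<Object, Integer> attempts = new HashMap<>();

    public RetryFilterState(int threshold) {
        this.threshold = threshold;
    }

    /** A (possibly replayed) tuple arrived on the data stream. */
    public Action onTuple(Object id) {
        int seen = attempts.merge(id, 1, Integer::sum);
        if (seen > threshold) {
            attempts.remove(id); // give up on this tuple; caller acks it
            return Action.DROP;
        }
        return Action.FORWARD;
    }

    /** The spout acked this ID on the second stream: processing succeeded. */
    public void onUpstreamAck(Object id) {
        attempts.remove(id);
    }
}
```

Because fieldsGrouping routes every replay of an ID to the same bolt instance, this map never needs to be shared across tasks.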

This approach might consume a lot of memory though. As an alternative to having an entry for each tuple in the HashMap, you could also use a third stream (connected to the bolt like the other two) and forward a tuple ID whenever a tuple fails (i.e. in Spout.fail(...)). Each time the bolt receives a "fail" message on this third stream, the counter is increased. As long as no entry is in the HashMap (or the threshold is not reached), the bolt simply forwards the tuple for processing. This should reduce the memory used but requires some more logic to be implemented in your spout and bolt.

Both approaches have the disadvantage that each acked tuple results in an additional message to your newly introduced bolt (thus increasing network traffic). For the second approach, it might seem that you only need to send an "ack" message to the bolt for tuples that failed before. However, you do not know which tuples failed and which did not. If you want to get rid of this network overhead, you could introduce a second HashMap in KafkaSpout that buffers the IDs of failed messages. Then you only send an "ack" message to the bolt if a failed tuple was replayed successfully. Of course, this third approach makes the logic to be implemented even more complex.
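Since this second structure only needs to remember which IDs have failed, a Set is enough. A small sketch of that spout-side state, assuming a subclass that calls onFail() from its fail() override and ackNeedsNotification() from ack() (the names and the Storm wiring are mine):

```java
import java.util.HashSet;
import java.util.Set;

// Spout-side state for the third variant: remember which message IDs ever
// failed, and tell the downstream bolt to clear its counter only when one
// of those previously-failed tuples is finally acked.
public class FailedIdTracker {
    private final Set<Object> failed = new HashSet<>();

    /** Record in fail(), before replaying the tuple. */
    public void onFail(Object msgId) {
        failed.add(msgId);
    }

    /**
     * Call in ack(); returns true if an "ack" message must be emitted to the
     * bolt. Tuples that never failed produce no extra network traffic.
     */
    public boolean ackNeedsNotification(Object msgId) {
        return failed.remove(msgId); // true only if this ID had failed before
    }
}
```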

Without modifying KafkaSpout to some extent, I see no solution for your problem. I personally would patch KafkaSpout, or would use the third approach with a HashMap in the KafkaSpout subclass plus the bolt (because it consumes little memory and, compared to the first two solutions, does not put much additional load on the network).
