How does Storm know when a message is "fully processed"?

Question

(Also a couple of questions about timeouts and maxSpoutPending)

I see a lot of references in the Storm documentation about messages being fully processed. But how does my KafkaSpout know when a message is fully processed?

Hopefully it is cognizant of the way my bolts are connected so when the final bolt in my Stream acks a tuple, the spout knows when my message is processed?

Otherwise, I would imagine that after the timeout period expires, the ack-ed state of a message is checked, and it is considered processed if indicated by the acking/anchoring XORs. But I hope this is not the case?

I also have related questions about maxTuplesPending and timeout configuration.

If I set the maxTuplePending to 10k, then am I correct in thinking each spout instance will continue to emit tuples until that spout instance is tracking 10k tuples in flight, 10k tuples that have not been fully processed? And then a new tuple is emitted when a currently in flight message is fully processed?

And finally, does this relate to the timeout configuration? Do the spouts wait in any fashion for the configured timeout to occur before emitting new messages? Or does the timeout configuration only come into play if a message is stalled/slow in being processed, resulting in it getting failed due to timeout?

More succinctly (or hopefully more clearly), is there any effect of setting my timeout to 30 mins other than that messages will only be failed if they are not ack-ed by the final Bolt within 30 mins? Or are there other impacts, such as the timeout configuration affecting the emission rate of the spouts?

Sorry for the long, rambling question(s). Thanks in advance for any response.

*Edit for further clarification

The reason this is a concern for me is that my messages don't necessarily run through the entire Stream.

Say I have Bolts A, B, C, D. Most of the time messages will get passed from A->B->C->D. But I have some messages that intentionally will stop on bolt A. A will ack them but not emit them (because of my business logic, in those cases I do not want further processing of the messages).

So will my KafkaSpout know that the message which is ack-ed but not emitted from A is fully processed? As I would like another message to be emitted from the spout as soon as Bolt A is done with it, in this case.

Recommended answer

Storm tracks tuples throughout the whole topology via the anchoring mechanism that the UDF code must use. This anchoring results in a so-called tuple tree, where the root of the tree is the tuple emitted by the spout and all other nodes (which are connected in a tree structure) represent the tuples emitted by bolts that used input tuples as anchors (this is only a logical model and is not implemented this way in Storm, though).

For example, a Spout emits a sentence tuple that is split into words by the first bolt, some words are filtered out by the second bolt, and a word count is applied by the third bolt. Finally, a sink bolt writes the result into a file. The tree would look like this:

"this is an example sentence" -+-> "this" 
                               +-> "is" 
                               +-> "an"
                               +-> "example" -> "example",1 -> "example",1
                               +-> "sentence" -> "sentence",1 -> "sentence",1

The initial sentence is emitted by the spout, used as the anchor by bolt1 for all tokens that are emitted, and gets acked by bolt1. Bolt2 filters out "this", "is" and "an" and just acks those three tuples. "example" and "sentence" are just forwarded: each is used as the anchor for the output tuple and acked afterwards. The same happens in bolt3, and the final sink bolt just acks all incoming tuples.
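The walk-through above can be replayed with a small logical model. This is only a sketch of the tuple-tree idea in plain Java, not Storm code: the class `TupleTreeModel`, its methods, and the `bolt1:`/`bolt2:` labels are hypothetical names made up for illustration. It assumes a bolt emits its anchored output before acking its input, so the tree is never empty while work is still outstanding.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical logical model of a tuple tree; this is not Storm code.
class TupleTreeModel {
    // Tuples that were emitted but not yet acked.
    private final Set<String> outstanding = new HashSet<>();

    // Called when the spout emits a root tuple or a bolt emits an anchored tuple.
    void emit(String tuple) { outstanding.add(tuple); }

    // Called when a bolt has finished with an input tuple.
    void ack(String tuple) { outstanding.remove(tuple); }

    // The tree is complete once nothing is outstanding.
    boolean fullyProcessed() { return outstanding.isEmpty(); }

    public static void main(String[] args) {
        TupleTreeModel tree = new TupleTreeModel();
        tree.emit("spout:sentence");

        // bolt1 splits: emit the anchored words first, then ack the input.
        for (String w : new String[] {"this", "is", "an", "example", "sentence"}) {
            tree.emit("bolt1:" + w);
        }
        tree.ack("spout:sentence");

        // bolt2 filters: three words are acked without any emit.
        for (String w : new String[] {"this", "is", "an"}) {
            tree.ack("bolt1:" + w);
        }

        // The remaining words flow through bolt2, bolt3 and the sink.
        for (String w : new String[] {"example", "sentence"}) {
            tree.emit("bolt2:" + w);
            tree.ack("bolt1:" + w);
            tree.emit("bolt3:" + w);
            tree.ack("bolt2:" + w);
            tree.ack("bolt3:" + w); // the sink acks and emits nothing
        }
        System.out.println("fully processed: " + tree.fullyProcessed());
    }
}
```

Note that a tuple which is acked without any anchored output (the filtered words here, or the messages that stop at bolt A in the question) simply leaves the tree; nothing has to reach the last bolt.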

Furthermore, Storm tracks all acks of all tuples, i.e., from intermediate bolts as well as sink bolts. First, the spout sends the ID of the output tuple to an acker task. Each time a tuple is used as an anchor, the acker also gets a message with the anchor tuple ID and the output tuple ID (which is auto-generated by Storm). The acks from the bolts also go to the same acker task, which XORs them. Once all acks have been received, i.e., for the spout tuple and all recursively anchored output tuples, the XOR result is zero; the acker then sends a message to the spout that the tuple is fully processed, and the callback to Spout.ack(MessageId) happens (i.e., the callback is made immediately when the tuple is fully processed). The ackers also check regularly whether any tuple has been registered with the acker for longer than the timeout. If so, the tuple ID is dropped by the acker and a message is sent to the spout that the tuple failed (resulting in a call to Spout.fail(MessageId)).
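To make the XOR bookkeeping concrete, here is a simplified sketch in plain Java. It is not Storm's acker: the class `XorAcker` and its method names are hypothetical, the tuple IDs are small hand-picked numbers rather than the random 64-bit IDs Storm generates, and it assumes each bolt reports its input ID together with the IDs of the tuples it anchored to that input in a single update.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified acker; real Storm ackers are more involved.
class XorAcker {
    // Root tuple ID -> XOR of every tuple ID reported so far for that tree.
    private final Map<Long, Long> pending = new HashMap<>();

    // The spout registers the tuple it just emitted.
    void init(long rootId, long tupleId) {
        pending.merge(rootId, tupleId, (a, b) -> a ^ b);
    }

    // A bolt acks inputId and reports the tuples it anchored to it.
    // Returns true when the whole tree is complete (XOR value is zero).
    boolean ack(long rootId, long inputId, long... anchoredIds) {
        long update = inputId;
        for (long id : anchoredIds) {
            update ^= id;
        }
        long value = pending.merge(rootId, update, (a, b) -> a ^ b);
        if (value == 0L) {
            pending.remove(rootId); // here the spout would get its ack callback
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        XorAcker acker = new XorAcker();

        // Case from the question: bolt A acks without emitting anything.
        acker.init(1L, 10L);
        System.out.println("stopped at A, complete: " + acker.ack(1L, 10L));

        // Usual case: A emits tuple 21 anchored to 20; a later bolt acks 21.
        acker.init(2L, 20L);
        System.out.println("after A: " + acker.ack(2L, 20L, 21L));
        System.out.println("after last bolt: " + acker.ack(2L, 21L));
    }
}
```

Every tuple ID enters the XOR once when the tuple is created and once when it is acked, so the value returns to zero exactly when each tuple in the tree has been both emitted and acked. That is why a message that bolt A acks without emitting counts as fully processed right away.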

Furthermore, the Spouts keep a count of all tuples in flight and stop calling Spout.nextTuple() once this count reaches the maxTuplesPending limit (in Storm's configuration this is topology.max.spout.pending, set via Config.setMaxSpoutPending). The limit applies to each spout task individually rather than globally, i.e., every spout task compares its own count of in-flight tuples to the parameter.

Thus, the timeout parameter is independent of maxTuplesPending.
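The interplay of the two settings can be sketched with a toy model in plain Java. This is not Storm's spout executor: the class `PendingLimiter`, its methods, and the explicit `now` timestamps are hypothetical, and it assumes the spout emits whenever the in-flight count is below the limit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of one spout task's pending-tuple bookkeeping.
class PendingLimiter {
    private final int maxPending;
    private final long timeoutMillis;
    // Message ID -> time (in ms) at which the tuple was emitted.
    private final Map<Long, Long> emittedAt = new HashMap<>();
    private long nextId = 0;

    PendingLimiter(int maxPending, long timeoutMillis) {
        this.maxPending = maxPending;
        this.timeoutMillis = timeoutMillis;
    }

    // Emits only while below the limit; returns the message ID or -1.
    long tryEmit(long now) {
        if (emittedAt.size() >= maxPending) {
            return -1;
        }
        long id = nextId++;
        emittedAt.put(id, now);
        return id;
    }

    // A fully processed tuple frees a slot immediately.
    void ack(long id) { emittedAt.remove(id); }

    // Fails tuples pending longer than the timeout; returns how many.
    int expire(long now) {
        int before = emittedAt.size();
        emittedAt.values().removeIf(t -> now - t > timeoutMillis);
        return before - emittedAt.size();
    }

    public static void main(String[] args) {
        PendingLimiter spout = new PendingLimiter(2, 30 * 60 * 1000L);
        long first = spout.tryEmit(0);
        spout.tryEmit(0);
        System.out.println("third emit while full: " + spout.tryEmit(0));
        spout.ack(first);
        System.out.println("emit right after ack: " + spout.tryEmit(1));
    }
}
```

In this model the timeout never delays emission: a slot frees up as soon as a tuple is acked, and the timeout only matters for tuples that are still pending when it elapses.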
