How does Storm know when a message is "fully processed"?


Question

(Also a couple of questions about timeouts and maxSpoutPending)

I see a lot of references in the Storm documentation about messages being fully processed. But how does my KafkaSpout know when a message is fully processed?

Hopefully it is cognizant of the way my bolts are connected so when the final bolt in my Stream acks a tuple, the spout knows when my message is processed?

Otherwise, I would imagine that after the timeout period expires, the ack-ed state of a message is checked, and it is considered processed if indicated by the acking/anchoring XORs. But I hope this is not the case?

I also have related questions about maxTuplesPending and timeout configuration.

If I set the maxTuplePending to 10k, then am I correct in thinking each spout instance will continue to emit tuples until that spout instance is tracking 10k tuples in flight, 10k tuples that have not been fully processed? And then a new tuple is emitted when a currently in flight message is fully processed?

And finally, does this relate to the timeout configuration? Do the spouts wait in any fashion for the configured timeout to occur before emitting new messages? Or does the timeout configuration only come into play if a message is stalled/slow in being processed, resulting in it getting failed due to timeout?

More succinctly (or hopefully more clearly): is there any effect to setting my timeout to 30 mins, other than that messages won't be failed unless they are ack-ed by the final Bolt within 30 mins? Or are there other impacts, such as the timeout configuration affecting the emission rate of the spouts?

Sorry for the long, rambling question(s). Thanks in advance for any response.

*Edited to clarify further

The reason this is a concern for me, is because my messages don't necessarily run through the entire Stream.

Say I have Bolts A, B, C, D. Most of the time messages will get passed from A->B->C->D. But I have some messages that will intentionally stop at Bolt A. A will ack them but not emit them (because of my business logic; in those cases I do not want further processing of the messages).

So will my KafkaSpout know that the message which is ack-ed but not emitted from A is fully processed? As I would like another message to be emitted from the spout as soon as Bolt A is done with it, in this case.

Answer

Storm tracks tuples throughout the whole topology via the anchoring mechanism that the UDF code must use. This anchoring results in a so-called tuple tree, where the root of the tree is the tuple emitted by the spout, and all other nodes (connected in a tree structure) represent the tuples emitted by bolts that used their input tuples as anchors (this is only a logical model, though, and is not implemented this way in Storm).

For example, say a spout emits a sentence tuple that is split into words by the first bolt, some words are filtered out by the second bolt, and a word count is applied by the third bolt. Finally, a sink bolt writes the result to a file. The tree would look like this:

"this is an example sentence" -+-> "this" 
                               +-> "is" 
                               +-> "an"
                               +-> "example" -> "example",1 -> "example",1
                               +-> "sentence" -> "sentence",1 -> "sentence",1

The initial sentence is emitted by the spout, used as an anchor by bolt1 for all tokens that are emitted, and acked by bolt1. Bolt2 filters out "this", "is", and "an", and just acks those three tuples. "example" and "sentence" are simply forwarded: each is used as an anchor for the output tuple and acked afterwards. The same happens in bolt3, and the final sink bolt just acks all incoming tuples.

Furthermore, Storm tracks all acks of all tuples, i.e., from intermediate bolts as well as sink bolts. First, the spout sends the ID of the output tuple to an acker task. Each time a tuple is used as an anchor, the acker also gets a message containing the anchor tuple ID and the output tuple ID (which is auto-generated by Storm). The acks from the bolts also go to the same acker task, which XORs them all together. Once all acks have been received -- i.e., for the spout tuple and all recursively anchored output tuples, at which point the XOR result is zero -- the acker sends a message to the spout that the tuple is fully processed, and the callback to Spout.ack(MessageId) happens (i.e., the callback is made immediately when the tuple is fully processed). Furthermore, the ackers regularly check whether any tuple has been registered with the acker for longer than the timeout. If so, the tuple ID is dropped by the acker and a message is sent to the spout that the tuple failed (resulting in a call to Spout.fail(MessageId)).
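The XOR bookkeeping described above can be sketched in plain Java. This is a standalone simulation of the idea, not Storm's actual acker code; the class and method names are illustrative only:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Standalone sketch of the acker's XOR ledger: one 64-bit value per spout tuple.
// Every tuple ID is XORed in when it is anchored and XORed out when it is acked;
// the ledger reaches zero exactly when every tuple in the tree has been acked.
public class AckerSketch {
    private final Map<Long, Long> ledger = new HashMap<>(); // root msgId -> XOR of pending tuple IDs
    private final Random rng = new Random();

    /** Spout emits the root tuple; its random ID is XORed into the ledger. */
    public long spoutEmit(long rootMsgId) {
        long tupleId = rng.nextLong();
        ledger.merge(rootMsgId, tupleId, (a, b) -> a ^ b);
        return tupleId;
    }

    /** A bolt acks its input and anchors numOutputs new tuples in one update. */
    public long[] boltAck(long rootMsgId, long inputId, int numOutputs) {
        long update = inputId; // acking XORs the input ID back out
        long[] outputs = new long[numOutputs];
        for (int i = 0; i < numOutputs; i++) {
            outputs[i] = rng.nextLong();
            update ^= outputs[i]; // anchoring XORs each output ID in
        }
        ledger.merge(rootMsgId, update, (a, b) -> a ^ b);
        return outputs;
    }

    /** Fully processed exactly when the XOR for the root is back to zero. */
    public boolean fullyProcessed(long rootMsgId) {
        return ledger.getOrDefault(rootMsgId, 0L) == 0L;
    }

    public static void main(String[] args) {
        AckerSketch acker = new AckerSketch();
        long root = acker.spoutEmit(42L);            // spout emits the sentence tuple
        long[] words = acker.boltAck(42L, root, 2);  // bolt1 splits it into two anchored words
        acker.boltAck(42L, words[0], 0);             // a bolt acks a word without emitting
        System.out.println(acker.fullyProcessed(42L)); // false: one word still pending
        acker.boltAck(42L, words[1], 0);
        System.out.println(acker.fullyProcessed(42L)); // true: XOR is back to zero
    }
}
```

Note that acking without anchoring any output (numOutputs = 0) simply terminates that branch of the tree, which is exactly the "Bolt A acks but does not emit" case from the question: the message still counts as fully processed.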

Furthermore, the spouts keep a count of all tuples in flight and stop calling Spout.nextTuple() if this count exceeds the maxTuplesPending parameter. As far as I know, the parameter is applied globally, i.e., the local counts of all spout tasks are summed up and the global count is compared to the parameter (not sure how this is implemented in detail, though).
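This throttling logic can be sketched as follows. Again, this is a simplified standalone model under the assumption of a single pending-count (it is not Storm's executor code, and the names are hypothetical):

```java
import java.util.HashSet;
import java.util.Set;

// Simplified sketch of max-spout-pending throttling: nextTuple() may only be
// called while the number of in-flight tuples is below the configured cap;
// acks and fails both shrink the pending set and free a slot immediately.
public class PendingThrottle {
    private final int maxSpoutPending;
    private final Set<Long> pending = new HashSet<>();

    public PendingThrottle(int maxSpoutPending) {
        this.maxSpoutPending = maxSpoutPending;
    }

    /** Returns true if nextTuple() may be called, registering the new tuple. */
    public boolean tryEmit(long msgId) {
        if (pending.size() >= maxSpoutPending) return false; // throttled: cap reached
        pending.add(msgId);
        return true;
    }

    public void onAck(long msgId)  { pending.remove(msgId); } // ack frees a slot at once
    public void onFail(long msgId) { pending.remove(msgId); } // a timeout/fail also frees a slot
}
```

In this model the spout never waits for the timeout before emitting: a new tuple goes out as soon as an ack (or fail) frees a slot, and the timeout only matters as the point at which a stalled tuple is failed and its slot reclaimed.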

So the timeout parameter is independent from maxTuplesPending.

