Confusion about Storm's acker and guaranteed message processing

Problem description

I am currently learning about Storm's guaranteed message processing and am confused by some of the concepts involved.

To guarantee that a message emitted by a spout is fully processed, Storm uses an acker. Each time a spout emits a tuple, the acker assigns an "ack val", initialized to 0, to store the state of the tuple tree. Each time a downstream bolt emits a new tuple or acks an "old" tuple, that tuple's ID is XORed into the "ack val". The acker only needs to check whether the "ack val" is 0 to know whether the tuple has been fully processed. Let's look at the code below:

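public class WordReader implements IRichSpout {
    ...
    // Excerpt from nextTuple(): emit one tuple per line of the input file
    while ((str = reader.readLine()) != null) {
        // 2nd argument is the message ID; here it is the line itself
        this.collector.emit(new Values(str), str);
        ...
    }
    ...
}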

The code above is a spout from the word count program in the "Getting Started with Storm" tutorial. In the emit method, the second parameter, "str", is the messageId. I am confused by this parameter:

1) As I understand it, each time a tuple (i.e., a message) is emitted, whether in a spout or in a bolt, Storm should be responsible for assigning a 64-bit messageId to that message. Is that correct? Or is "str" here just a human-readable alias for the message?

2) Whatever the answer to 1), "str" could be the same word in two different messages, because a text file contains many duplicate words. If so, how does Storm distinguish between different messages? And what does this parameter mean?

3) In some code, I see spouts use the following code to set the message ID in the spout's emit method:

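public class RandomIntegerSpout extends BaseRichSpout {
    private long msgId = 0; // one counter per spout instance
    ...
    public void nextTuple() {
        // the incremented counter is used as the message ID
        collector.emit(new Values(..., ++msgId), msgId);
    }
}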

This is much closer to what I think it should be: the message ID should be unique across messages. But this code raises another question: what happens to the private field "msgId" across different executors? Since each executor has its own msgId initialized to 0, messages in different executors will all be numbered 1, 2, 3, and so on. How does Storm then distinguish these messages?

I am new to Storm, so these questions may be naive. I hope someone can help me figure this out. Thanks!
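
The XOR bookkeeping described in the question can be modeled in a few lines. This is a toy sketch, not Storm's actual acker code: the class and method names are hypothetical, it tracks a single tuple tree, and the tuple IDs are fixed constants for reproducibility (Storm uses random 64-bit IDs). It shows why the "ack val" returns to 0 only after every ID has been XORed in twice, once on emit and once on ack.

```java
/**
 * Toy model of the acker's XOR tracking for one tuple tree.
 * Hypothetical names; not Storm's implementation.
 */
public class AckValModel {
    private long ackVal = 0L; // starts at 0 for a new tuple tree

    // A tuple with this ID was emitted into the tree
    public void emitted(long tupleId) {
        ackVal ^= tupleId;
    }

    // A tuple with this ID was acked
    public void acked(long tupleId) {
        ackVal ^= tupleId;
    }

    // x ^ x == 0, so the value is 0 once every emitted ID was also acked
    public boolean fullyProcessed() {
        return ackVal == 0L;
    }

    public static void main(String[] args) {
        long root = 0x6A09E667F3BCC908L;   // spout tuple (fixed for the demo)
        long child1 = 0x3C6EF372FE94F82BL; // emitted by a bolt, anchored to root
        long child2 = 0x510E527FADE682D1L;

        AckValModel tree = new AckValModel();
        tree.emitted(root);
        tree.emitted(child1);
        tree.emitted(child2);
        tree.acked(root);                          // bolt acks the spout tuple
        System.out.println(tree.fullyProcessed()); // false: children still pending
        tree.acked(child1);
        tree.acked(child2);
        System.out.println(tree.fullyProcessed()); // true
    }
}
```

Because XOR is order-independent, acks may arrive in any order; with random 64-bit IDs, an accidental 0 before completion is extremely unlikely.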

Recommended answer

About message IDs in general: internally it might be a 64-bit value, but this 64-bit value is computed as a hash of the msgID object passed to emit() in the Spout. So you can use any object as a message ID (the probability that two distinct objects hash to the same value is close to zero).
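
Whatever the internal representation, the spout gets the same message ID object back in its ack(Object msgId) and fail(Object msgId) callbacks, so a reliable spout commonly keeps its own map from message ID to payload in order to replay failed tuples. The class below is a hypothetical, Storm-free sketch of that bookkeeping (the class and its track/size methods are illustrative names; only ack/fail mirror the real ISpout callbacks). It also shows where a duplicate ID bites: a second pending payload under an equal key cannot be stored without losing the first.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical pending-message bookkeeping for a reliable spout.
 * Plain Java; no Storm dependency.
 */
public class PendingTracker {
    private final Map<Object, String> pending = new HashMap<>();

    // Remember the payload under the message ID passed to emit().
    // Returns false if an equal ID is already pending.
    public boolean track(Object msgId, String payload) {
        return pending.putIfAbsent(msgId, payload) == null;
    }

    // Mirrors ISpout.ack(Object msgId): the tuple tree completed, forget it.
    public void ack(Object msgId) {
        pending.remove(msgId);
    }

    // Mirrors ISpout.fail(Object msgId): return the payload to re-emit, or null.
    public String fail(Object msgId) {
        return pending.remove(msgId);
    }

    public int size() {
        return pending.size();
    }
}
```

Since HashMap compares keys with equals()/hashCode(), any object with sensible equality works as a message ID here, which matches the answer's point that the ID need not be a number.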

About using str: I think that in this example, str contains a line (not a word), and it is very unlikely that a document contains exactly the same line twice (except for empty lines, of which there may be many).
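
A quick way to check this claim for a given input is to list the lines that occur more than once, since those are exactly the message IDs that would repeat if each line were used as its own ID. A minimal sketch; the class name and sample document are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DuplicateLines {
    // Returns each line that occurs more than once, in order of its first repeat.
    public static List<String> repeatedLines(List<String> lines) {
        Set<String> seen = new HashSet<>();
        Set<String> reported = new HashSet<>();
        List<String> repeats = new ArrayList<>();
        for (String line : lines) {
            // add() returns false if the line was already seen
            if (!seen.add(line) && reported.add(line)) {
                repeats.add(line);
            }
        }
        return repeats;
    }

    public static void main(String[] args) {
        List<String> doc = List.of("the quick fox", "", "jumps over", "", "the lazy dog");
        System.out.println(repeatedLines(doc).size()); // 1 (the empty line)
    }
}
```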

About the counter as message ID: your observation is absolutely right. If multiple spout instances run in parallel, this produces conflicting message IDs and breaks fault tolerance.
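
The conflict is easy to reproduce outside Storm: give each simulated spout instance its own counter, as RandomIntegerSpout does, and collect the IDs they produce into one namespace. This is a hypothetical sketch; NaiveCounter stands in for one spout instance's private msgId field.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class CounterCollision {
    // Stand-in for one spout instance with its own private "msgId" field.
    static class NaiveCounter {
        private long msgId = 0;

        long next() {
            return ++msgId; // same pre-increment as in RandomIntegerSpout
        }
    }

    // Draws tuplesEach IDs from each of `executors` independent counters and
    // returns the IDs that were produced more than once, in ascending order.
    public static List<Long> collidingIds(int executors, int tuplesEach) {
        List<NaiveCounter> counters = new ArrayList<>();
        for (int e = 0; e < executors; e++) {
            counters.add(new NaiveCounter());
        }
        Set<Long> seen = new HashSet<>();
        Set<Long> duplicates = new TreeSet<>();
        for (int i = 0; i < tuplesEach; i++) {
            for (NaiveCounter c : counters) {
                long id = c.next();
                if (!seen.add(id)) {
                    duplicates.add(id);
                }
            }
        }
        return new ArrayList<>(duplicates);
    }

    public static void main(String[] args) {
        System.out.println(collidingIds(2, 3)); // [1, 2, 3]
    }
}
```

With a single instance there are no collisions; with two or more, every ID is produced once per instance.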

If you want to "fix" the counter approach, each counter should be initialized differently (ideally to 1...#SpoutTasks). You can use the task ID for this (it is unique and can be accessed via the TopologyContext provided in Spout.open()). Basically, you get the task IDs of all parallel spout tasks, sort them, and assign each spout task its position in that order. Furthermore, you need to increment by the number of parallel spouts instead of by 1.
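
A minimal sketch of this fix, assuming the list of all spout task IDs and the current task's ID are already known. In a real spout they would come from the TopologyContext passed to open(), e.g. via getComponentTasks(getThisComponentId()) and getThisTaskId(); TopologyContext also appears to offer getThisTaskIndex(), which should return this position directly, but check the Javadoc for your Storm version. Here they are plain parameters, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Strided message-ID counter: task k of N (1-based position in the sorted
 * task-ID list) emits k, k + N, k + 2N, ... so no two tasks share an ID.
 */
public class StridedMsgIdCounter {
    private final long step;
    private long next;

    public StridedMsgIdCounter(List<Integer> allTaskIds, int thisTaskId) {
        List<Integer> sorted = new ArrayList<>(allTaskIds);
        Collections.sort(sorted);
        int index = sorted.indexOf(thisTaskId);
        if (index < 0) {
            throw new IllegalArgumentException("task " + thisTaskId + " not in task list");
        }
        this.step = sorted.size(); // increment by the number of parallel spouts
        this.next = index + 1;     // start values 1..#SpoutTasks
    }

    public long nextId() {
        long id = next;
        next += step;
        return id;
    }

    public static void main(String[] args) {
        List<Integer> tasks = List.of(7, 3, 5); // hypothetical task IDs
        for (int task : tasks) {
            StridedMsgIdCounter c = new StridedMsgIdCounter(tasks, task);
            System.out.println("task " + task + ": " + c.nextId() + ", " + c.nextId() + ", " + c.nextId());
        }
        // task 7: 3, 6, 9
        // task 3: 1, 4, 7
        // task 5: 2, 5, 8
    }
}
```

Each task's IDs fall into a distinct residue class modulo the number of tasks, which is what makes them disjoint; this assumes the spout's parallelism does not change while IDs are still pending.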
