Confusion about the Storm acker and guaranteed message processing

Question

I am currently learning about Storm's "Guaranteeing Message Processing" and am confused by some of the concepts in that part.

To guarantee that a message emitted by a spout is fully processed, Storm uses an acker. Each time a spout emits a tuple, the acker assigns it an "ack val", initialized to 0, that stores the state of the tuple tree. Each time a downstream bolt of this tuple emits a new tuple or acks an "old" one, the tuple ID is XORed into the "ack val". The acker then only needs to check whether the "ack val" is 0 to know whether the tuple has been fully processed. Consider the code below:
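
To make the XOR bookkeeping described above concrete, here is a minimal, self-contained simulation. `AckValDemo` and its methods are hypothetical names, not Storm's real acker class, and the 64-bit tuple IDs are fixed constants chosen only so the run is reproducible. Each ID is XORed into the ack val once when its tuple is emitted and once when it is acked, so the value returns to 0 only after every tuple in the tree has been acked.

```java
/**
 * Minimal simulation of XOR-based tuple-tree tracking, as described in the question.
 * Hypothetical class, not Storm's actual acker implementation.
 */
public class AckValDemo {
    // XOR accumulator ("ack val") for one spout tuple's tree; starts at 0
    private long ackVal = 0L;

    // A tuple with this ID was emitted into the tree
    void onEmit(long tupleId) { ackVal ^= tupleId; }

    // A tuple with this ID was acked
    void onAck(long tupleId) { ackVal ^= tupleId; }

    // Every ID is XORed in exactly twice (emit + ack), so 0 means nothing is pending
    // (barring a very unlikely XOR collision between random 64-bit IDs)
    boolean isFullyProcessed() { return ackVal == 0L; }

    public static void main(String[] args) {
        // Arbitrary fixed 64-bit IDs so the run is reproducible
        long line = 0x5DEECE66DL;          // spout tuple: one line of text
        long word1 = 0x2545F4914F6CDD1DL;  // bolt output: first word of the line
        long word2 = 0x9E3779B97F4A7C15L;  // bolt output: second word of the line

        AckValDemo tree = new AckValDemo();
        tree.onEmit(line);   // spout emits the line
        tree.onEmit(word1);  // split bolt emits two words anchored to the line...
        tree.onEmit(word2);
        tree.onAck(line);    // ...and acks its input
        System.out.println("after split: " + tree.isFullyProcessed());

        tree.onAck(word1);   // count bolt acks both words
        tree.onAck(word2);
        System.out.println("after count: " + tree.isFullyProcessed());
    }
}
```

Running `main` prints `after split: false` (the two word tuples are still pending, so the ack val equals `word1 ^ word2`) followed by `after count: true`.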

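public class WordReader implements IRichSpout {
    ... ...
    public void nextTuple() {
        String str;
        // One tuple per line; the second argument to emit() (the line itself) is the message ID
        while ((str = reader.readLine()) != null) { // readLine() throws IOException; handling omitted
            this.collector.emit(new Values(str), str);
        }
        ... ...
    }
}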

The code above is a spout from the word count program in the "Getting Started with Storm" tutorial. In the emit method, the second parameter, "str", is the messageId. I am confused by this parameter:
1) As I understand it, each time a tuple (i.e., a message) is emitted, whether from a spout or a bolt, Storm is responsible for assigning that message a 64-bit messageId. Is that correct? Or is "str" here just a human-readable alias for the message?
2) Whatever the answer to 1), "str" would be the same word in two different messages, because a text file will contain many duplicate words. If so, how does Storm tell the messages apart? And what does this parameter mean?
3) In some code, I see spouts use the following code to set the message ID in the spout's emit method:

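public class RandomIntegerSpout extends BaseRichSpout {
    private long msgId = 0; // each executor has its own instance, so each counter starts at 0
    ... ...
    public void nextTuple() {
        // The counter value is both a field of the tuple and its message ID
        collector.emit(new Values(..., ++msgId), msgId);
    }
}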

This is much closer to what I expected: message IDs should be distinct across messages. But this code raises another question: what happens to the private field "msgId" across different executors? Since each executor has its own msgId initialized to 0, messages in every executor will be numbered the same way (1, 2, 3, and so on). How, then, does Storm tell these messages apart?

I am new to Storm, so these questions may be naive. I hope someone can help me figure this out. Thanks!

Recommended answer

About message IDs in general: internally the ID might be a 64-bit value, but that 64-bit value is computed as a hash of the msgID object passed to emit() in the spout. So you can pass any object as the message ID (the probability that two objects hash to the same value is close to zero).
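
Since any object can serve as the message ID, one option that does not depend on the tuple's payload at all is a random UUID. This is a minimal sketch under that assumption; `UniqueMsgIdDemo` and `newMessageId()` are hypothetical names. In a spout, the returned object would take the place of `str` as the second argument to `emit()`, and it is the object Storm later hands back to the spout's `ack(Object msgId)` or `fail(Object msgId)`.

```java
import java.util.UUID;

/**
 * Hypothetical helper (not part of Storm): builds a message ID that does not
 * depend on the tuple's payload, so identical lines still get distinct IDs.
 */
public class UniqueMsgIdDemo {
    static Object newMessageId() {
        // Random 128-bit UUID; two calls colliding is practically impossible
        return UUID.randomUUID();
    }

    public static void main(String[] args) {
        String line = "to be or not to be";
        Object firstId = newMessageId();
        Object secondId = newMessageId();
        // Same payload emitted twice, yet the message IDs differ
        System.out.println(line + " -> " + firstId);
        System.out.println(line + " -> " + secondId);
        System.out.println("same ID? " + firstId.equals(secondId));
    }
}
```

The trade-off is that a UUID carries no information about the message, so a spout that wants to replay failed tuples still has to keep its own map from ID to payload.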

About using str: I think in this example str contains a line (not a word), and it is very unlikely that a document contains exactly the same line twice (except for empty lines, of which there might be many).
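
To see why a line is a much better message ID than a word, this hypothetical check reports which candidate IDs would be reused in a tiny, made-up sample text. `DuplicateLineCheck` and the sample lines are illustrative only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical check: which candidate message IDs would be reused? */
public class DuplicateLineCheck {
    // Returns every value that occurs more than once, in order of first repeat
    static Set<String> duplicateIds(List<String> candidates) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new LinkedHashSet<>();
        for (String id : candidates) {
            if (!seen.add(id)) {
                duplicates.add(id);
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        // Tiny made-up document containing two empty lines
        List<String> lines = List.of("to be or not to be", "", "that is the question", "");

        // Word-level candidates, as if each word were emitted with itself as the ID
        List<String> words = new ArrayList<>();
        for (String line : lines) {
            if (!line.isBlank()) {
                words.addAll(Arrays.asList(line.split("\\s+")));
            }
        }

        System.out.println("repeated line IDs: " + duplicateIds(lines).size());
        System.out.println("repeated word IDs: " + duplicateIds(words));
    }
}
```

For this sample the only repeated line ID is the empty line (`repeated line IDs: 1`), whereas the word IDs already repeat after one sentence (`repeated word IDs: [to, be]`).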

About the counter as message ID: you are absolutely right about your observation. If multiple spouts run in parallel, their message IDs would conflict, which would break fault tolerance.
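
The overlap is easy to reproduce outside Storm. In this minimal sketch, `CounterCollisionDemo` and `CountingSpout` are hypothetical stand-ins for two executors of `RandomIntegerSpout`, each holding its own private `msgId` field.

```java
/**
 * Hypothetical stand-in for the RandomIntegerSpout counter: each executor gets
 * its own instance, so each starts numbering from 0.
 */
public class CounterCollisionDemo {
    static class CountingSpout {
        private long msgId = 0; // private per-instance counter, as in the question
        long nextMessageId() { return ++msgId; }
    }

    public static void main(String[] args) {
        // Two parallel executors of the same spout component
        CountingSpout executorA = new CountingSpout();
        CountingSpout executorB = new CountingSpout();
        for (int i = 0; i < 3; i++) {
            long a = executorA.nextMessageId();
            long b = executorB.nextMessageId();
            System.out.println(a + " vs " + b + (a == b ? "  <- same ID" : ""));
        }
    }
}
```

Every line of output pairs identical IDs (`1 vs 1`, `2 vs 2`, `3 vs 3`), because the two counters never coordinate.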

If you want to "fix" the counter approach, each counter should be initialized differently (ideally to a distinct value in 1...#SpoutTasks). You can use the task ID for this (it is unique and can be accessed via the TopologyContext provided in Spout.open()). Basically, you get the task IDs of all parallel spout tasks, sort them, and assign each spout task its position in that order. Furthermore, each counter must be incremented by the number of parallel spouts instead of by 1.
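
Below is a minimal sketch of that fix in plain Java, so it runs without Storm. `StridedMessageIds` is a hypothetical helper; in a real spout you would presumably build it in `open()` from `context.getComponentTasks(context.getThisComponentId())` and `context.getThisTaskId()` on the TopologyContext (check these against your Storm version). Sorting the task IDs gives each task an ordinal k in 1..N, and the task then hands out k, k + N, k + 2N, ..., so no two of the N tasks ever produce the same ID.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of per-task counters that never overlap. */
public class StridedMessageIds {
    private final long stride; // number of parallel spout tasks
    private long next;         // next ID this task will hand out

    StridedMessageIds(List<Integer> allSpoutTaskIds, int thisTaskId) {
        List<Integer> sorted = new ArrayList<>(allSpoutTaskIds);
        Collections.sort(sorted);
        int ordinal = sorted.indexOf(thisTaskId); // 0-based position of this task
        if (ordinal < 0) {
            throw new IllegalArgumentException("unknown task id " + thisTaskId);
        }
        this.stride = sorted.size();
        this.next = ordinal + 1; // first ID lies in 1..#SpoutTasks
    }

    long nextMessageId() {
        long id = next;
        next += stride; // step by the number of parallel spouts, not by 1
        return id;
    }

    public static void main(String[] args) {
        // Hypothetical task IDs of three parallel spout tasks
        List<Integer> taskIds = List.of(7, 3, 5);
        for (int taskId : taskIds) {
            StridedMessageIds ids = new StridedMessageIds(taskIds, taskId);
            System.out.println("task " + taskId + ": " + ids.nextMessageId() + ", "
                    + ids.nextMessageId() + ", " + ids.nextMessageId());
        }
    }
}
```

With these three hypothetical task IDs, task 3 yields 1, 4, 7; task 5 yields 2, 5, 8; and task 7 yields 3, 6, 9. Using the sorted position rather than the raw task ID keeps the IDs dense even when task IDs are not contiguous.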
