Processing records in order in Storm

Problem description

I'm new to Storm and I'm having trouble figuring out how to process records in order.

I have a dataset that contains records with the following fields:

user_id, location_id, time_of_checking

Now, I would like to identify users who have followed a path I specify (for example, users that went from location A to location B to location C).

I'm using a Kafka producer and reading these records from a file to simulate live data. The data is sorted by date.

So, to check whether my pattern is fulfilled, I need to process the records in order. The problem is that, due to parallelization (bolt replication), I don't receive a user's check-ins in order, so the pattern matching doesn't work.
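For reference, once ordered per-user delivery is solved, the path check itself is a small per-user state machine. The sketch below is illustrative only (the class and method names are hypothetical, not the asker's code), and it assumes each user's check-ins arrive in timestamp order, which is exactly the property in question:

```java
import java.util.*;

// Illustrative sketch: per-user state machine that detects a path such as
// A -> B -> C, assuming each user's check-ins are seen in timestamp order.
class PathDetector {
    private final List<String> path;                          // e.g. ["A", "B", "C"]
    private final Map<String, Integer> progress = new HashMap<>(); // next path index per user
    private final Set<String> matched = new HashSet<>();

    PathDetector(List<String> path) { this.path = path; }

    void onCheckIn(String userId, String locationId) {
        int next = progress.getOrDefault(userId, 0);
        if (next < path.size() && path.get(next).equals(locationId)) {
            next++;                                  // user advanced one step along the path
            progress.put(userId, next);
            if (next == path.size()) matched.add(userId); // full path completed
        }
    }

    Set<String> matchedUsers() { return matched; }
}
```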

How can I overcome this problem? How can I process records in order?

Answer

There is no general system support for ordered processing in Storm. Either you use a different system that supports ordered stream processing, like Apache Flink (disclaimer: I am a committer at Flink), or you need to take care of it in your bolt code yourself.

The only support Storm offers is via Trident. You can put the tuples of a certain time period (for example, one minute) into a single batch and thus process all tuples of that minute at once. However, this only works if your use case allows for it, because you cannot relate tuples from different batches to each other. In your case, this would only hold if you know there are points in time at which all users have reached their destination (and no other user has started a new interaction); i.e., you need points in time at which no two users' interactions overlap. (It seems to me that your use case cannot fulfill this requirement.)

For a non-system, i.e., customized user-code-based, solution there are two approaches:

You could, for example, buffer up tuples within a bolt and sort them by timestamp before processing. To make this work properly, you need to inject punctuations/watermarks that guarantee that no tuple with a timestamp larger than the punctuation's arrives after the punctuation. Once you have received a punctuation from each parallel input substream, you can safely trigger sorting and processing.
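A minimal sketch of this buffer-and-sort approach, written as plain Java independent of Storm's actual bolt API (all class and method names here are hypothetical): a min-heap buffers out-of-order tuples, and tuples are released only up to the smallest watermark seen across all input substreams.

```java
import java.util.*;

// A timestamped check-in record; fields mirror the dataset in the question.
class CheckIn {
    final String userId;
    final String locationId;
    final long timestamp;
    CheckIn(String userId, String locationId, long timestamp) {
        this.userId = userId; this.locationId = locationId; this.timestamp = timestamp;
    }
}

class SortingBuffer {
    // Min-heap keyed by timestamp holds out-of-order tuples until it is safe to emit them.
    private final PriorityQueue<CheckIn> buffer =
        new PriorityQueue<>(Comparator.comparingLong(c -> c.timestamp));
    // Latest watermark received per parallel input substream.
    private final Map<Integer, Long> watermarks = new HashMap<>();
    private final int numSubstreams;

    SortingBuffer(int numSubstreams) { this.numSubstreams = numSubstreams; }

    void add(CheckIn c) { buffer.add(c); }

    // A punctuation/watermark promises: no tuple with a smaller timestamp will
    // arrive on this substream. Returns the tuples that are now safe to process,
    // in timestamp order.
    List<CheckIn> onWatermark(int substream, long ts) {
        watermarks.put(substream, ts);
        List<CheckIn> ready = new ArrayList<>();
        if (watermarks.size() < numSubstreams) return ready; // not all substreams reported yet
        long safe = Collections.min(watermarks.values());    // minimum across all substreams
        while (!buffer.isEmpty() && buffer.peek().timestamp <= safe) {
            ready.add(buffer.poll());
        }
        return ready;
    }
}
```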

Another way is to buffer tuples per incoming substream in distinct buffers (within a substream, order is preserved) and merge the tuples from those buffers in order. This has the advantage that sorting is avoided. However, you need to ensure that each operator emits its tuples in order. Furthermore, to avoid blocking (i.e., when no input is available for a substream), punctuations might be needed here too. (I implemented this approach; feel free to use the code or adapt it to your needs: https://github.com/mjsax/aeolus/blob/master/queries/utils/src/main/java/de/hub/cs/dbis/aeolus/utils/TimestampMerger.java)
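The merging idea can be sketched as follows, again as plain Java rather than the linked TimestampMerger code (names are hypothetical): each substream gets its own FIFO buffer, and tuples are emitted by repeatedly taking the smallest head timestamp, but only while every buffer is non-empty, since an empty buffer could later deliver an even smaller timestamp.

```java
import java.util.*;

// Sketch of merging per-substream FIFO buffers that are each already ordered.
// Tuples are emitted in global timestamp order without an explicit sort, by
// always taking the head with the smallest timestamp.
class TimestampMergerSketch {
    // One FIFO buffer per substream; each entry is {timestamp, payloadId}.
    private final List<Deque<long[]>> buffers;

    TimestampMergerSketch(int numSubstreams) {
        buffers = new ArrayList<>();
        for (int i = 0; i < numSubstreams; i++) buffers.add(new ArrayDeque<>());
    }

    void receive(int substream, long timestamp, long payloadId) {
        buffers.get(substream).addLast(new long[]{timestamp, payloadId});
    }

    // Emit for as long as every substream has a head tuple to compare against;
    // an empty buffer blocks the merge (this is where punctuations would help).
    List<long[]> drain() {
        List<long[]> out = new ArrayList<>();
        while (buffers.stream().noneMatch(Deque::isEmpty)) {
            Deque<long[]> min = buffers.get(0);
            for (Deque<long[]> b : buffers) {
                if (b.peekFirst()[0] < min.peekFirst()[0]) min = b;
            }
            out.add(min.pollFirst());
        }
        return out;
    }
}
```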
