Is there any Java API to know when a topology is ready to read its first message from a Spout?

Question

Our Apache Storm topology listens for messages from Kafka using KafkaSpout and, after a lot of mapping, reducing, enrichment, and aggregation, finally inserts data into Cassandra. There is a second Kafka input on which we receive user queries for data; if the topology finds a response, it sends it to a third Kafka topic. We now want to write an E2E test with JUnit in which we programmatically insert data into the topology and then, by inserting a user query message, assert on the third topic that the response to our query is correct.

To achieve this, we thought of starting EmbeddedKafka and CassandraUnit, substituting them for the real Kafka and Cassandra, and then starting the topology within this single JUnit test.

Before we start the actual test, we create the topology and submit it to a LocalCluster. This starts the topology on a different thread, so the Before method returns and our test starts executing. At that point the topology is not yet ready, because it takes some time before it can process anything. Is there any Java API that can tell us when the topology is ready for processing (meaning ready to read the first message from the Spout)?

Recommended answer

This depends on what you mean when you say "ready for processing".

If you enable time simulation for your LocalCluster, you can use Time.advanceClusterTime to advance time in steps. If you call this method after submitting a topology, it returns only once the cluster is mostly idle. See, e.g., https://github.com/apache/storm/blob/8f49e06998abb4dfc50f51d78b6784ebd04844fb/storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java#L233.

If you're willing to replace your spouts with stubs (e.g. FixedTupleSpout), you can use Testing.completeTopology to wait until the topology has finished processing all the tuples you set up the stub to emit.

Another way to wait until the topology has processed some tuples is to put some messages in Kafka, start your topology, and then have your testing thread poll Cassandra to see whether the messages you expect have made it through. This way, you can set a timeout in your testing thread and have the test fail if the condition is not met within some number of seconds. You could use a utility like Awaitility (https://github.com/awaitility/awaitility) for this, or just write your own polling logic.
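If you go the hand-written route, the polling logic could look roughly like the sketch below. It uses only the JDK; the class name PollingWait and the method awaitCondition are made up for this illustration and are not part of Storm or Awaitility. The condition you pass in would be whatever check fits your test, for example a query against the embedded Cassandra.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration only; not a Storm or Awaitility API.
final class PollingWait {
    private PollingWait() {}

    /**
     * Re-evaluates the condition until it returns true or the timeout elapses.
     * Returns true if the condition was met in time, false on timeout or
     * if the waiting thread was interrupted.
     */
    static boolean awaitCondition(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            // Sleep for the poll interval, but never past the deadline.
            long sleepMillis = Math.min(interval.toMillis(), Math.max(1L, remainingNanos / 1_000_000L));
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In a JUnit test you would then assert on the return value, e.g. assertTrue(PollingWait.awaitCondition(() -> expectedRowExists(), Duration.ofSeconds(30), Duration.ofMillis(500))), where expectedRowExists() stands in for your own Cassandra lookup. The same approach works for checking the third Kafka topic for the query response.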

If you mean something else by "ready for processing", please describe in more detail what you mean.
