是否有任何Java API可以知道何时拓扑已准备好从Spout读取第一条消息 [英] Is there any Java API to know when topology is ready for reading first message from Spout

查看:115
本文介绍了是否有任何Java API可以知道何时拓扑已准备好从Spout读取第一条消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们的Apache Storm拓扑使用KafkaSpout侦听来自Kafka的消息,并在进行大量映射/缩减/富集/聚合等之后,最终将数据插入Cassandra.还有另一个kafka输入,如果拓扑找到响应,我们将在其中接收用户对数据的查询,然后将其发送到第三个kafka主题.现在,我们要使用Junit编写E2E测试,在该测试中,我们可以以编程方式直接将数据插入拓扑,然后通过插入用户查询消息,可以在第三点断言查询中收到的响应是正确的.

Our Apache Storm topology listens messages from Kafka using KafkaSpout and after doing lot of mapping/reducing/enrichment/aggregation etc. etc finally inserts data into Cassandra. There is another kafka input where we receive user queries for data if topology finds a response then it sends that onto a third kafka topic. Now we want to write E2E test using Junit in which we can directly programmatically insert data into topology and then by inserting user query message, we can assert on third point that response received on our query is correct.

为此,我们考虑了启动EmbeddedKafka和CassandraUnit,然后用它们替换实际的Kafka和Cassandra,然后可以在此Junit测试的上下文中启动拓扑.

To achieve this, we thought of starting EmbeddedKafka and CassandraUnit and then replacing actual Kafka and Cassandra with them and then we can start topology in the context of this single Junit test.

在开始实际测试之前,我们创建拓扑并将其提交到LocalCluster.它在不同的线程上启动拓扑,并从Before出来并开始执行我们的测试.到那时为止,拓扑还没有准备好,因为需要花费一些时间来准备进行处理.是否有任何Java API可以告诉我们何时拓扑已准备好进行处理(意味着已准备好从Spout读取第一条消息)?

Before, we start our actual test, we create topology and submit it into LocalCluster. It starts topology on a different thread and comes out from Before and starts executing our test. Till that time, topology is not ready because it takes some time to be ready for processing. Is there any java API which can tell us when topology is ready for processing (means ready to read first message from Spout)?

推荐答案

这取决于您说准备处理"时的意思.

This depends on what you mean when you say "ready for processing".

如果为LocalCluster启用了时间模拟,则可以使用Time.advanceClusterTime逐步扩展时间.如果在提交拓扑后调用此方法,则只有在群集大部分处于空闲状态时,它才会返回.参见例如 https://github.com/apache/storm/blob/8f49e06998abb4dfc50f51d78b6784ebd04844fb/storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java#L233 .

If you enable time simulation for your LocalCluster, you can use Time.advanceClusterTime to advance time in steps. If you call this method after submitting a topology, it will only return once the cluster is mostly idling. See e.g. https://github.com/apache/storm/blob/8f49e06998abb4dfc50f51d78b6784ebd04844fb/storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java#L233.

如果您愿意用存根替换喷口(例如FixedTupleSpout),则可以使用Testing.completeTopology等到拓扑处理完将存根设置为发出的所有元组之后.

If you're willing to replace your spouts with stubs (e.g. FixedTupleSpout), you can use Testing.completeTopologyto wait until the topology has finished processing all the tuples you set up the stub to emit.

等待拓扑处理一些元组的另一种方法是,将一些消息放入Kafka,启动拓扑,然后让测试线程轮询Cassandra,以查看您期望的消息是否通过.这样,您可以在测试线程中设置超时,如果在数秒内未满足条件,则测试将失败.您可以为此 https://github.com/awaitility/awaitility 使用类似Awaitility的实用程序,或者只需编写自己的轮询逻辑即可.

Another method to wait for the topology to have processed some tuples would be that you put some messages in Kafka, start your topology, and then have your testing thread poll Cassandra to see if the messages you expect have made it through. This way, you can set a timeout in your testing thread, and have the test fail if the condition is not met in some number of seconds. You could use a utility like Awaitility for this https://github.com/awaitility/awaitility, or just write your own polling logic.

如果您通过准备处理"来表示其他含义,请更详细地说明您的意思.

If you mean something else by "ready for processing", please describe in more detail what you mean.

这篇关于是否有任何Java API可以知道何时拓扑已准备好从Spout读取第一条消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆