Running a Trident Topology in a Storm TrackedTopology Unit Test

Problem Description

How can I write a JUnit test for a Trident topology that lets tuples flow through the topology while I verify the output at each stage? I've tried running it within Storm's Testing framework, but that framework doesn't let me verify the output, and the Trident topology doesn't execute consistently under it.

Here's an example topology, with inline comments at the spots where I'm having the most trouble.

import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.testing.MemoryMapState;
import storm.trident.testing.Split;
import backtype.storm.Config;
import backtype.storm.ILocalCluster;
import backtype.storm.Testing;
import backtype.storm.testing.FeederSpout;
import backtype.storm.testing.TestJob;
import backtype.storm.testing.TrackedTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

public class WordCountTopologyTest {

    @Test
    public void testWordCountTopology() throws Exception {
        Testing.withTrackedCluster(new WordCountTestJob());
    }

    public class WordCountTestJob implements TestJob {

        @Override
        public void run(ILocalCluster cluster) throws Exception {

            // Create the test topology to submit
            TridentTopology termCountTopology = new TridentTopology();

            FeederSpout feeder = new FeederSpout(new Fields("text", "author"));

            TridentState tridentState = termCountTopology.newStream("inputSpout", feeder)
                    .each(new Fields("text"), new Split(), new Fields("word"))
                    .groupBy(new Fields("word"))
                    .name("counter-output")
                    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                    .parallelismHint(6);

            TrackedTopology tracked = Testing.mkTrackedTopology(cluster, termCountTopology.build());

            // Feed some sample data into the topology
            feeder.feed(Arrays.asList("Nearly all men can stand adversity, but if you want to test a man's character, give him power.", "Abraham Lincoln"));
            feeder.feed(Arrays.asList("No man has a good enough memory to be a successful liar.", "Abraham Lincoln"));
            feeder.feed(Arrays.asList("Either write something worth reading or do something worth writing.", "Benjamin Franklin"));
            feeder.feed(Arrays.asList("Well done is better than well said.", "Benjamin Franklin"));

            cluster.submitTopology("word-count-testing", new Config(), tracked.getTopology());

            // (!!) Works, but sleeping for a fixed time is fragile: the topology may run faster or slower on other systems
            // Utils.sleep(5000);

            // (!!) Fails with 5000ms Topology timeout
            // Testing.trackedWait(tracked, 3);

            /*
             * (!!) Always 0. Trident creates the streams and bolts internally with
             * different names, so how can we read them to verify?
             */
            List outputTuples = Testing.readTuples(tracked, "counter-output");
            assertEquals(0, outputTuples.size());
        }
    }
}

Beyond this, I've tried writing my own BaseFilter, tacked onto the end of the stream, that stores every tuple passing through it, but it seems like there must be a better way. That approach also doesn't solve the problem of running the topology in a controlled manner. Is this something Trident supports?

Recommended Answer

Use the Trident class FeederBatchSpout instead of FeederSpout. FeederBatchSpout blocks on its own, so there is no need for Testing.trackedWait() or anything similar.

Source: https://groups.google.com/forum/#!topic/storm-user/CrAdQEXo5OU
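
Why no sleep or trackedWait() is needed: as far as we can tell from the Storm source, FeederBatchSpout's feed() method registers a java.util.concurrent.Semaphore for the batch and waits on it until that batch has been processed successfully, so by the time feed() returns, the batch's results are already in the state. In the question's test this would presumably mean constructing the spout as new FeederBatchSpout(Arrays.asList("text", "author")), passing each feed call a list of tuples, and calling feed only after cluster.submitTopology(...). Feeding first, as the original code does, would likely block the test thread before the topology ever starts. The sketch below models only that blocking handshake with plain JDK classes; BlockingBatchFeeder, BlockingFeedDemo and countWords are hypothetical names and are not part of Storm.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

// Hypothetical model of a blocking feeder; this is NOT Storm's FeederBatchSpout API.
class BlockingBatchFeeder<T> {
    // A fed batch paired with the semaphore its producer waits on.
    private static final class Pending<E> {
        final List<E> batch;
        final Semaphore done = new Semaphore(0);

        Pending(List<E> batch) {
            this.batch = batch;
        }
    }

    private final BlockingQueue<Pending<T>> queue = new LinkedBlockingQueue<>();

    // Test thread: hand over a batch, then block until a worker has processed it.
    void feed(List<T> batch) throws InterruptedException {
        Pending<T> pending = new Pending<>(batch);
        queue.put(pending);
        pending.done.acquire();
    }

    // Worker thread: take the next batch, process it, then wake the waiting feeder.
    void processNext(Consumer<List<T>> processor) throws InterruptedException {
        Pending<T> pending = queue.take();
        try {
            processor.accept(pending.batch);
        } finally {
            pending.done.release();
        }
    }
}

class BlockingFeedDemo {
    // Feeds each batch to a background word counter and returns the final counts.
    static Map<String, Long> countWords(List<List<String>> batches) throws InterruptedException {
        BlockingBatchFeeder<String> feeder = new BlockingBatchFeeder<>();
        Map<String, Long> counts = new ConcurrentHashMap<>();
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    feeder.processNext(sentences -> {
                        for (String sentence : sentences) {
                            // Split on single spaces; words are case-sensitive
                            for (String word : sentence.split(" ")) {
                                if (!word.isEmpty()) {
                                    counts.merge(word, 1L, Long::sum);
                                }
                            }
                        }
                    });
                }
            } catch (InterruptedException e) {
                // Interrupted by countWords once every batch has been fed.
            }
        });
        worker.setDaemon(true);
        worker.start();
        for (List<String> batch : batches) {
            feeder.feed(batch); // returns only after the worker has counted this batch
        }
        worker.interrupt();
        return counts;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Long> counts = countWords(Arrays.asList(
                Arrays.asList("No man has a good enough memory to be a successful liar."),
                Arrays.asList("Well done is better than well said.")));
        // No sleep needed: every feed() call has already returned.
        System.out.println(counts.get("a")); // prints 2
    }
}
```

The design relies on a guarantee documented for java.util.concurrent.Semaphore: actions before a release() happen-before actions after a successful acquire() in another thread. The counts written by the worker are therefore visible to the test thread as soon as feed() returns, which is the property that makes fixed sleeps unnecessary.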