测试风暴螺栓和喷口 [英] Testing Storm Bolts and Spouts

查看:24
本文介绍了测试风暴螺栓和喷口的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这是关于用 Java 编写的 Storm 拓扑中的单元测试 Bolt 和 Spout 的一般问题.

This is a general question regarding Unit Testing Bolts and Spouts in a Storm Topology written in Java.

单元测试(JUnit?)BoltsSpouts 的推荐做法和指南是什么?

What is the recommended practice and guideline for unit-testing (JUnit?) Bolts and Spouts?

例如,我可以为 Bolt 编写一个 JUnit 测试,但没有完全理解框架(如 Bolt 的生命周期)和序列化的含义,很容易犯基于构造函数创建不可序列化成员变量的错误.在 JUnit 中,这个测试会通过,但在拓扑中,它不起作用.我完全可以想象有很多测试点需要考虑(例如这个带有序列化和生命周期的示例).

For instance, I could write a JUnit test for a Bolt, but without fully understanding the framework (like the lifecycle of a Bolt) and the Serialization implications, easily make the mistake of Constructor-based creation of non-serializable member variables. In JUnit, this test would pass, but in a topology, it wouldn't work. I fully imagine there are many test points one needs to consider (such as this example with Serialization & lifecycle).

因此,如果您使用基于 JUnit 的单元测试,是否建议您运行一个小型模拟拓扑(LocalMode?)并测试 Bolt 的隐含合约(或 Spout) 在该拓扑下?或者,是否可以使用 JUnit,但其含义是我们必须模拟 Bolt 的生命周期(创建它,调用 prepare(),模拟 Config,等)仔细?在这种情况下,被测类(Bolt/Spout)需要考虑哪些通用测试点?

Therefore, is it recommended that if you use JUnit based unit tests, you run a small mock topology (LocalMode?) and test the implied contract for the Bolt (or Spout) under that Topology? Or, is it OK to use JUnit, but the implication being that we have to simulate the lifecycle of a Bolt (creating it, calling prepare(), mocking a Config, etc) carefully? In this case, what are some general test points for the class under test (Bolt/Spout) to consider?

其他开发人员在创建适当的单元测试方面做了哪些工作?

我注意到有一个拓扑测试 API(参见:https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java).使用其中的一些 API 并为每个单独的 Bolt & 建立测试拓扑"会更好吗?Spout(并验证 Bolt 必须提供的隐式合约,例如 - 它是声明的输出)?

I noticed there is a Topology testing API (See: https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java). Is it better to use some of that API, and stand up "Test Topologies" for each individual Bolt & Spout (and verifying the implicit contract that the Bolt has to provide for, eg - it's Declared outputs)?

谢谢

推荐答案

我们的方法是将可序列化工厂的构造函数注入到 spout/bolt 中.然后喷嘴/螺栓以其打开/准备方法咨询工厂.工厂的唯一职责是以可序列化的方式封装获取 spout/bolt 的依赖项.这种设计允许我们的单元测试注入假/测试/模拟工厂,当被咨询时,返回模拟服务.通过这种方式,我们可以使用模拟对 spout/bolts 进行严格的单元测试,例如莫基托.

Our approach is to use constructor-injection of a serializable factory into the spout/bolt. The spout/bolt then consults the factory in its open/prepare method. The factory's single responsibility is to encapsulate obtaining the spout/bolt's dependencies in a serializable fashion. This design allows our unit tests to inject fake/test/mock factories which, when consulted, return mock services. In this way we can narrowly unit test the spout/bolts using mocks e.g. Mockito.

以下是螺栓的通用示例及其测试.我省略了工厂 UserNotificationFactory 的实现,因为它取决于您的应用程序.您可以使用服务定位器来获取服务、序列化配置、HDFS 可访问的配置,或者实际上任何方式来获取正确的服务,只要工厂可以在 serde 周期后完成.您应该介绍该类的序列化.

Below is a generic example of a bolt and a test for it. I have omitted the implementation of the factory UserNotificationFactory because it depends on your application. You might use service locators to obtain the services, serialized configuration, HDFS-accessible configuration, or really any way at all to get the correct services, so long as the factory can do it after a serde cycle. You should cover serialization of that class.

螺栓

public class NotifyUserBolt extends BaseBasicBolt {
  public static final String NAME = "NotifyUser";
  private static final String USER_ID_FIELD_NAME = "userId";

  private final UserNotifierFactory factory;
  transient private UserNotifier notifier;

  public NotifyUserBolt(UserNotifierFactory factory) {
    checkNotNull(factory);

    this.factory = factory;
  }

  @Override
  public void prepare(Map stormConf, TopologyContext context) {
    notifier = factory.createUserNotifier();
  }

  @Override
  public void execute(Tuple input, BasicOutputCollector collector) {
    // This check ensures that the time-dependency imposed by Storm has been observed
    checkState(notifier != null, "Unable to execute because user notifier is unavailable.  Was this bolt successfully prepared?");

    long userId = input.getLongByField(PreviousBolt.USER_ID_FIELD_NAME);

    notifier.notifyUser(userId);

    collector.emit(new Values(userId));
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields(USER_ID_FIELD_NAME));
  }
}

测试

public class NotifyUserBoltTest {

  private NotifyUserBolt bolt;

  @Mock
  private TopologyContext topologyContext;

  @Mock
  private UserNotifier notifier;

  // This test implementation allows us to get the mock to the unit-under-test.
  private class TestFactory implements UserNotifierFactory {

    private final UserNotifier notifier;

    private TestFactory(UserNotifier notifier) {
      this.notifier = notifier;
    }

    @Override
    public UserNotifier createUserNotifier() {
      return notifier;
    }
  }

  @Before
  public void before() {
    MockitoAnnotations.initMocks(this);

    // The factory will return our mock `notifier`
    bolt = new NotifyUserBolt(new TestFactory(notifier));
    // Now the bolt is holding on to our mock and is under our control!
    bolt.prepare(new Config(), topologyContext);
  }

  @Test
  public void testExecute() {
    long userId = 24;
    Tuple tuple = mock(Tuple.class);
    when(tuple.getLongByField(PreviousBolt.USER_ID_FIELD_NAME)).thenReturn(userId);
    BasicOutputCollector collector = mock(BasicOutputCollector.class);

    bolt.execute(tuple, collector);

    // Here we just verify a call on `notifier`, but we could have stubbed out behavior befor
    //  the call to execute, too.
    verify(notifier).notifyUser(userId);
    verify(collector).emit(new Values(userId));
  }
}

这篇关于测试风暴螺栓和喷口的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆