测试风暴螺栓和喷口 [英] Testing Storm Bolts and Spouts

查看:90
本文介绍了测试风暴螺栓和喷口的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这是一个关于用Java编写的Storm拓扑中的单元测试螺栓和喷口的普遍问题.

This is a general question regarding Unit Testing Bolts and Spouts in a Storm Topology written in Java.

推荐的单元测试实践和指南是什么(JUnit?)螺栓喷口?

What is the recommended practice and guideline for unit-testing (JUnit?) Bolts and Spouts?

例如,我可以为Bolt编写一个JUnit测试,但是在不完全了解框架(如Bolt的生命周期)和序列化含义的情况下,很容易犯下基于构造函数的非可序列化的成员变量.在JUnit中,此测试可以通过,但是在拓扑中,它将无法正常工作.我完全想象有很多测试点需要考虑(例如带有序列化和生命周期的示例).

For instance, I could write a JUnit test for a Bolt, but without fully understanding the framework (like the lifecycle of a Bolt) and the Serialization implications, easily make the mistake of Constructor-based creation of non-serializable member variables. In JUnit, this test would pass, but in a topology, it wouldn't work. I fully imagine there are many test points one needs to consider (such as this example with Serialization & lifecycle).

因此,建议如果使用基于JUnit的单元测试,则运行小的模拟拓扑(LocalMode?)并在该拓扑下测试Bolt(或Spout)的隐式协定?还是可以使用JUnit,但这意味着我们必须仔细模拟Bolt的生命周期(创建它,调用prepare(),模拟Config等)?在这种情况下,要考虑的被测类(螺栓/喷口)有哪些常规测试点?

Therefore, is it recommended that if you use JUnit based unit tests, you run a small mock topology (LocalMode?) and test the implied contract for the Bolt (or Spout) under that Topology? Or, is it OK to use JUnit, but the implication being that we have to simulate the lifecycle of a Bolt (creating it, calling prepare(), mocking a Config, etc) carefully? In this case, what are some general test points for the class under test (Bolt/Spout) to consider?

在创建适当的单元测试方面,其他开发人员做了什么?

我注意到有一个拓扑测试API(请参阅:Spout(并验证Bolt必须提供的隐式合同,例如-它是Declared输出)?

I noticed there is a Topology testing API (See: https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java). Is it better to use some of that API, and stand up "Test Topologies" for each individual Bolt & Spout (and verifying the implicit contract that the Bolt has to provide for, eg - it's Declared outputs)?

谢谢

推荐答案

我们的方法是使用可序列化工厂的构造函数注入到喷口/螺栓中.然后,喷口/螺栓以其打开/准备方法向工厂咨询.工厂的唯一职责是以可序列化的方式封装获得喷口/螺栓的依赖关系.这种设计使我们的单元测试可以注入伪造/测试/模拟工厂,在被咨询时返回模拟服务.通过这种方式,我们可以使用模拟等对单元进行狭窄的单元测试. Mockito.

Our approach is to use constructor-injection of a serializable factory into the spout/bolt. The spout/bolt then consults the factory in its open/prepare method. The factory's single responsibility is to encapsulate obtaining the spout/bolt's dependencies in a serializable fashion. This design allows our unit tests to inject fake/test/mock factories which, when consulted, return mock services. In this way we can narrowly unit test the spout/bolts using mocks e.g. Mockito.

以下是螺栓的一般示例及其测试.我已经省略了工厂UserNotificationFactory的实现,因为它取决于您的应用程序.您可以使用服务定位器来获取服务,序列化配置,可访问HDFS的配置,或者实际上以任何方式获取正确的服务,只要工厂可以在服务周期后进行即可.您应该介绍该类的序列化.

Below is a generic example of a bolt and a test for it. I have omitted the implementation of the factory UserNotificationFactory because it depends on your application. You might use service locators to obtain the services, serialized configuration, HDFS-accessible configuration, or really any way at all to get the correct services, so long as the factory can do it after a serde cycle. You should cover serialization of that class.

螺栓

public class NotifyUserBolt extends BaseBasicBolt {
  public static final String NAME = "NotifyUser";
  private static final String USER_ID_FIELD_NAME = "userId";

  private final UserNotifierFactory factory;
  transient private UserNotifier notifier;

  public NotifyUserBolt(UserNotifierFactory factory) {
    checkNotNull(factory);

    this.factory = factory;
  }

  @Override
  public void prepare(Map stormConf, TopologyContext context) {
    notifier = factory.createUserNotifier();
  }

  @Override
  public void execute(Tuple input, BasicOutputCollector collector) {
    // This check ensures that the time-dependency imposed by Storm has been observed
    checkState(notifier != null, "Unable to execute because user notifier is unavailable.  Was this bolt successfully prepared?");

    long userId = input.getLongByField(PreviousBolt.USER_ID_FIELD_NAME);

    notifier.notifyUser(userId);

    collector.emit(new Values(userId));
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields(USER_ID_FIELD_NAME));
  }
}

测试

public class NotifyUserBoltTest {

  private NotifyUserBolt bolt;

  @Mock
  private TopologyContext topologyContext;

  @Mock
  private UserNotifier notifier;

  // This test implementation allows us to get the mock to the unit-under-test.
  private class TestFactory implements UserNotifierFactory {

    private final UserNotifier notifier;

    private TestFactory(UserNotifier notifier) {
      this.notifier = notifier;
    }

    @Override
    public UserNotifier createUserNotifier() {
      return notifier;
    }
  }

  @Before
  public void before() {
    MockitoAnnotations.initMocks(this);

    // The factory will return our mock `notifier`
    bolt = new NotifyUserBolt(new TestFactory(notifier));
    // Now the bolt is holding on to our mock and is under our control!
    bolt.prepare(new Config(), topologyContext);
  }

  @Test
  public void testExecute() {
    long userId = 24;
    Tuple tuple = mock(Tuple.class);
    when(tuple.getLongByField(PreviousBolt.USER_ID_FIELD_NAME)).thenReturn(userId);
    BasicOutputCollector collector = mock(BasicOutputCollector.class);

    bolt.execute(tuple, collector);

    // Here we just verify a call on `notifier`, but we could have stubbed out behavior befor
    //  the call to execute, too.
    verify(notifier).notifyUser(userId);
    verify(collector).emit(new Values(userId));
  }
}

这篇关于测试风暴螺栓和喷口的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆