测试风暴螺栓和喷口 [英] Testing Storm Bolts and Spouts
问题描述
这是一个关于用Java编写的Storm拓扑中的单元测试螺栓和喷口的普遍问题.
This is a general question regarding Unit Testing Bolts and Spouts in a Storm Topology written in Java.
推荐的单元测试实践和指南是什么(JUnit?)螺栓和喷口?
What is the recommended practice and guideline for unit-testing (JUnit?) Bolts and Spouts?
例如,我可以为Bolt
编写一个JUnit测试,但是在不完全了解框架(如Bolt
的生命周期)和序列化含义的情况下,很容易犯下基于构造函数的非可序列化的成员变量.在JUnit中,此测试可以通过,但是在拓扑中,它将无法正常工作.我完全想象有很多测试点需要考虑(例如带有序列化和生命周期的示例).
For instance, I could write a JUnit test for a Bolt
, but without fully understanding the framework (like the lifecycle of a Bolt
) and the Serialization implications, easily make the mistake of Constructor-based creation of non-serializable member variables. In JUnit, this test would pass, but in a topology, it wouldn't work. I fully imagine there are many test points one needs to consider (such as this example with Serialization & lifecycle).
因此,建议如果使用基于JUnit的单元测试,则运行小的模拟拓扑(LocalMode
?)并在该拓扑下测试Bolt
(或Spout
)的隐式协定?还是可以使用JUnit,但这意味着我们必须仔细模拟Bolt的生命周期(创建它,调用prepare()
,模拟Config
等)?在这种情况下,要考虑的被测类(螺栓/喷口)有哪些常规测试点?
Therefore, is it recommended that if you use JUnit based unit tests, you run a small mock topology (LocalMode
?) and test the implied contract for the Bolt
(or Spout
) under that Topology? Or, is it OK to use JUnit, but the implication being that we have to simulate the lifecycle of a Bolt (creating it, calling prepare()
, mocking a Config
, etc) carefully? In this case, what are some general test points for the class under test (Bolt/Spout) to consider?
在创建适当的单元测试方面,其他开发人员做了什么?
我注意到有一个拓扑测试API(请参阅:Spout(并验证Bolt必须提供的隐式合同,例如-它是Declared输出)?
I noticed there is a Topology testing API (See: https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java). Is it better to use some of that API, and stand up "Test Topologies" for each individual Bolt
& Spout
(and verifying the implicit contract that the Bolt has to provide for, eg - it's Declared outputs)?
谢谢
推荐答案
我们的方法是使用可序列化工厂的构造函数注入到喷口/螺栓中.然后,喷口/螺栓以其打开/准备方法向工厂咨询.工厂的唯一职责是以可序列化的方式封装获得喷口/螺栓的依赖关系.这种设计使我们的单元测试可以注入伪造/测试/模拟工厂,在被咨询时返回模拟服务.通过这种方式,我们可以使用模拟等对单元进行狭窄的单元测试. Mockito.
Our approach is to use constructor-injection of a serializable factory into the spout/bolt. The spout/bolt then consults the factory in its open/prepare method. The factory's single responsibility is to encapsulate obtaining the spout/bolt's dependencies in a serializable fashion. This design allows our unit tests to inject fake/test/mock factories which, when consulted, return mock services. In this way we can narrowly unit test the spout/bolts using mocks e.g. Mockito.
以下是螺栓的一般示例及其测试.我已经省略了工厂UserNotificationFactory
的实现,因为它取决于您的应用程序.您可以使用服务定位器来获取服务,序列化配置,可访问HDFS的配置,或者实际上以任何方式获取正确的服务,只要工厂可以在服务周期后进行即可.您应该介绍该类的序列化.
Below is a generic example of a bolt and a test for it. I have omitted the implementation of the factory UserNotificationFactory
because it depends on your application. You might use service locators to obtain the services, serialized configuration, HDFS-accessible configuration, or really any way at all to get the correct services, so long as the factory can do it after a serde cycle. You should cover serialization of that class.
螺栓
public class NotifyUserBolt extends BaseBasicBolt {
public static final String NAME = "NotifyUser";
private static final String USER_ID_FIELD_NAME = "userId";
private final UserNotifierFactory factory;
transient private UserNotifier notifier;
public NotifyUserBolt(UserNotifierFactory factory) {
checkNotNull(factory);
this.factory = factory;
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
notifier = factory.createUserNotifier();
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
// This check ensures that the time-dependency imposed by Storm has been observed
checkState(notifier != null, "Unable to execute because user notifier is unavailable. Was this bolt successfully prepared?");
long userId = input.getLongByField(PreviousBolt.USER_ID_FIELD_NAME);
notifier.notifyUser(userId);
collector.emit(new Values(userId));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(USER_ID_FIELD_NAME));
}
}
测试
public class NotifyUserBoltTest {
private NotifyUserBolt bolt;
@Mock
private TopologyContext topologyContext;
@Mock
private UserNotifier notifier;
// This test implementation allows us to get the mock to the unit-under-test.
private class TestFactory implements UserNotifierFactory {
private final UserNotifier notifier;
private TestFactory(UserNotifier notifier) {
this.notifier = notifier;
}
@Override
public UserNotifier createUserNotifier() {
return notifier;
}
}
@Before
public void before() {
MockitoAnnotations.initMocks(this);
// The factory will return our mock `notifier`
bolt = new NotifyUserBolt(new TestFactory(notifier));
// Now the bolt is holding on to our mock and is under our control!
bolt.prepare(new Config(), topologyContext);
}
@Test
public void testExecute() {
long userId = 24;
Tuple tuple = mock(Tuple.class);
when(tuple.getLongByField(PreviousBolt.USER_ID_FIELD_NAME)).thenReturn(userId);
BasicOutputCollector collector = mock(BasicOutputCollector.class);
bolt.execute(tuple, collector);
// Here we just verify a call on `notifier`, but we could have stubbed out behavior befor
// the call to execute, too.
verify(notifier).notifyUser(userId);
verify(collector).emit(new Values(userId));
}
}
这篇关于测试风暴螺栓和喷口的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!