测试风暴螺栓和喷口 [英] Testing Storm Bolts and Spouts
问题描述
这是关于用 Java 编写的 Storm 拓扑中的单元测试 Bolt 和 Spout 的一般问题.
This is a general question regarding Unit Testing Bolts and Spouts in a Storm Topology written in Java.
单元测试(JUnit?)Bolts 和 Spouts 的推荐做法和指南是什么?
What is the recommended practice and guideline for unit-testing (JUnit?) Bolts and Spouts?
例如,我可以为 Bolt
编写一个 JUnit 测试,但没有完全理解框架(如 Bolt
的生命周期)和序列化的含义,很容易犯基于构造函数创建不可序列化成员变量的错误.在 JUnit 中,这个测试会通过,但在拓扑中,它不起作用.我完全可以想象有很多测试点需要考虑(例如这个带有序列化和生命周期的示例).
For instance, I could write a JUnit test for a Bolt
, but without fully understanding the framework (like the lifecycle of a Bolt
) and the Serialization implications, easily make the mistake of Constructor-based creation of non-serializable member variables. In JUnit, this test would pass, but in a topology, it wouldn't work. I fully imagine there are many test points one needs to consider (such as this example with Serialization & lifecycle).
因此,如果您使用基于 JUnit 的单元测试,是否建议您运行一个小型模拟拓扑(LocalMode
?)并测试 Bolt
的隐含合约(或 Spout
) 在该拓扑下?或者,是否可以使用 JUnit,但其含义是我们必须模拟 Bolt 的生命周期(创建它,调用 prepare()
,模拟 Config
,等)仔细?在这种情况下,被测类(Bolt/Spout)需要考虑哪些通用测试点?
Therefore, is it recommended that if you use JUnit based unit tests, you run a small mock topology (LocalMode
?) and test the implied contract for the Bolt
(or Spout
) under that Topology? Or, is it OK to use JUnit, but the implication being that we have to simulate the lifecycle of a Bolt (creating it, calling prepare()
, mocking a Config
, etc) carefully? In this case, what are some general test points for the class under test (Bolt/Spout) to consider?
其他开发人员在创建适当的单元测试方面做了哪些工作?
我注意到有一个拓扑测试 API(参见:https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java).使用其中的一些 API 并为每个单独的 Bolt
& 建立测试拓扑"会更好吗?Spout
(并验证 Bolt 必须提供的隐式合约,例如 - 它是声明的输出)?
I noticed there is a Topology testing API (See: https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java). Is it better to use some of that API, and stand up "Test Topologies" for each individual Bolt
& Spout
(and verifying the implicit contract that the Bolt has to provide for, eg - it's Declared outputs)?
谢谢
推荐答案
我们的方法是将可序列化工厂的构造函数注入到 spout/bolt 中.然后喷嘴/螺栓以其打开/准备方法咨询工厂.工厂的唯一职责是以可序列化的方式封装获取 spout/bolt 的依赖项.这种设计允许我们的单元测试注入假/测试/模拟工厂,当被咨询时,返回模拟服务.通过这种方式,我们可以使用模拟对 spout/bolts 进行严格的单元测试,例如莫基托.
Our approach is to use constructor-injection of a serializable factory into the spout/bolt. The spout/bolt then consults the factory in its open/prepare method. The factory's single responsibility is to encapsulate obtaining the spout/bolt's dependencies in a serializable fashion. This design allows our unit tests to inject fake/test/mock factories which, when consulted, return mock services. In this way we can narrowly unit test the spout/bolts using mocks e.g. Mockito.
以下是螺栓的通用示例及其测试.我省略了工厂 UserNotificationFactory
的实现,因为它取决于您的应用程序.您可以使用服务定位器来获取服务、序列化配置、HDFS 可访问的配置,或者实际上任何方式来获取正确的服务,只要工厂可以在 serde 周期后完成.您应该介绍该类的序列化.
Below is a generic example of a bolt and a test for it. I have omitted the implementation of the factory UserNotificationFactory
because it depends on your application. You might use service locators to obtain the services, serialized configuration, HDFS-accessible configuration, or really any way at all to get the correct services, so long as the factory can do it after a serde cycle. You should cover serialization of that class.
螺栓
public class NotifyUserBolt extends BaseBasicBolt {
public static final String NAME = "NotifyUser";
private static final String USER_ID_FIELD_NAME = "userId";
private final UserNotifierFactory factory;
transient private UserNotifier notifier;
public NotifyUserBolt(UserNotifierFactory factory) {
checkNotNull(factory);
this.factory = factory;
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
notifier = factory.createUserNotifier();
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
// This check ensures that the time-dependency imposed by Storm has been observed
checkState(notifier != null, "Unable to execute because user notifier is unavailable. Was this bolt successfully prepared?");
long userId = input.getLongByField(PreviousBolt.USER_ID_FIELD_NAME);
notifier.notifyUser(userId);
collector.emit(new Values(userId));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(USER_ID_FIELD_NAME));
}
}
测试
public class NotifyUserBoltTest {
private NotifyUserBolt bolt;
@Mock
private TopologyContext topologyContext;
@Mock
private UserNotifier notifier;
// This test implementation allows us to get the mock to the unit-under-test.
private class TestFactory implements UserNotifierFactory {
private final UserNotifier notifier;
private TestFactory(UserNotifier notifier) {
this.notifier = notifier;
}
@Override
public UserNotifier createUserNotifier() {
return notifier;
}
}
@Before
public void before() {
MockitoAnnotations.initMocks(this);
// The factory will return our mock `notifier`
bolt = new NotifyUserBolt(new TestFactory(notifier));
// Now the bolt is holding on to our mock and is under our control!
bolt.prepare(new Config(), topologyContext);
}
@Test
public void testExecute() {
long userId = 24;
Tuple tuple = mock(Tuple.class);
when(tuple.getLongByField(PreviousBolt.USER_ID_FIELD_NAME)).thenReturn(userId);
BasicOutputCollector collector = mock(BasicOutputCollector.class);
bolt.execute(tuple, collector);
// Here we just verify a call on `notifier`, but we could have stubbed out behavior befor
// the call to execute, too.
verify(notifier).notifyUser(userId);
verify(collector).emit(new Values(userId));
}
}
这篇关于测试风暴螺栓和喷口的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!