使用 StepVerifier 对 Flux.take(Duration duration) 进行单元测试 [英] Unit-testing a Flux.take(Duration duration) with StepVerifier

查看:77
本文介绍了使用 StepVerifier 对 Flux.take(Duration duration) 进行单元测试的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用的是 Spring Reactor Core 3.0.6 并且我有一个返回 Flux 的方法:

I am using Spring Reactor Core 3.0.6 and I have a method returning a Flux:

public Flux<Foo> createFlux(){
    return Flux.<List<Foo>,String>generate(/* generator omitted for clarity's sake */ )
        .take(Duration.ofSeconds(10)
        .flatMap(Flux::fromIterable);
}

生成器函数调用分页的 REST api 来获取结果,如果 API 继续返回数据,我希望 Flux 只运行 10 秒.

The generator function calls a paginated REST api to get the results, and if the API continue to return data, I want the Flux to just run for 10 seconds.

它工作正常,但我想创建一些单元测试,但在创建测试以验证 Flux 最多仅运行 10 秒时遇到了麻烦.

It works fine, but I'd like to create some unit-testing and I have trouble in creating a test to verify that the Flux run only for 10 seconds at maximum.

我模拟了其余服务,以便它始终返回数据并写下:

I mocked the rest service so that it always return data and wrote this:

StepVerifier.withVirtualTime(() -> createFlux())
    .thenAwait(Duration.ofSeconds(10))
    .verifyComplete();

但它失败了:

java.lang.AssertionError: expectation "expectComplete" failed (expected: onComplete(); actual: onNext([my toString() Foo bean]))

我想我应该以某种方式使用生成的项目,但我无法找到正确的 StepVerifier 方法来这样做.

I guess that I should consume the generated items in some way, but I am unable to find the right StepVerifier method to do so.

编辑

我尝试使用 thenConsumeWhile 跳过每个项目:

I try to skip every items with thenConsumeWhile:

StepVerifier.withVirtualTime(() -> createFlux())
    .thenAwait(Duration.ofSeconds(10))
    .thenConsumeWhile(t -> true)
    .verifyComplete();

但现在测试只是无限期地运行,永远不会结束.

but now the test just runs indefinitely and never ends.

推荐答案

生成器实际上可能非常重要... StepVerifier 受到无限序列的限制,在使用虚拟时间时更是如此.问题是生成器和 thenAwait 都运行在主线程中,所以生成器是无限的,这会阻止 stepverifier 提前时间,从而阻止序列超时.

The generator might in fact be very important... StepVerifier is limited with infinite sequences, even more so when using virtual time. The problem is that both the generator and the thenAwait run in the main thread, so the generator being infinite prevents the stepverifier from advancing time, which in turn prevents the sequence to time out.

由于您想测试 take 的持续时间,我认为虚拟时间不合适(您正在测试模拟时间).我会让 createFlux 方法使用 take 持续时间进行参数化,并执行 StepVerifier.create(),持续时间要短得多.

Since you want to test the duration of the take, I don't think virtual time is right (you're testing a mock of time). I'd make the createFlux method parameterizable with the take duration, and do a StepVerifier.create(), for a much shorter duration.

如果你真的想使用某种形式的虚拟时间,我发现让它工作的最低要求是

If you really want to use some form of virtual time, I found that the minimal requirement to make it work is to

  1. 通过在测试开始时实例化 Scheduler 来隔离 非虚拟 线程上的生成器循环,然后使用 subscribeOn(scheduler)在 StepVerifier 的 Supplier 中.
  2. 首先调用 .expectNextCount(1),确保在尝试提前时间之前订阅所有内容并且数据开始流动.
  1. Isolate the generator loop on a non virtual thread by instantiating the Scheduler at the beginning of the test, then use subscribeOn(scheduler) in the StepVerifier's Supplier.
  2. Ensure everything is subscribed and data starts to flow before attempting to advance time, by calling .expectNextCount(1) first.

像这样:

public Flux<Integer> createFlux() {
    return Flux.<List<Integer>>generate(sink -> {
        sink.next(Arrays.asList(1, 2, 3));
    })
            .take(Duration.ofSeconds(10))
            .flatMap(Flux::fromIterable);
}

@Test
public void so44657525() throws InterruptedException {
    Scheduler scheduler = Schedulers.newSingle("test");
    AtomicInteger adder = new AtomicInteger();

    StepVerifier.withVirtualTime(() -> createFlux()
            .subscribeOn(scheduler)
            .doOnNext(v -> adder.incrementAndGet())
    )
                .expectNextCount(1)
                .thenAwait(Duration.ofSeconds(10))
                .thenConsumeWhile(t -> true)
                .verifyComplete();

    System.out.println("Total number of values in generated lists: " + adder.get());
}

expectNextCount(1) 修改为 expectNextCount(100_000),我运行了打印 生成列表中的值总数:102405 花了 40 毫秒.

Modifying the expectNextCount(1) to expectNextCount(100_000), I had a run that printed Total number of values in generated lists: 102405 and took 40ms.

这篇关于使用 StepVerifier 对 Flux.take(Duration duration) 进行单元测试的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆