为什么 Thread.sleep() 会触发对 Flux.interval() 的订阅? [英] Why does Thread.sleep() trigger the subscription to Flux.interval()?

查看:68
本文介绍了为什么 Thread.sleep() 会触发对 Flux.interval() 的订阅?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如果在 main() 方法中,我执行这个

If, in a main() method, I execute this

Flux.just(1,2)
    .log()
    .subscribe();

我在控制台中得到了这个:

I get this in the console:

[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
[ INFO] (main) | onNext(2)
[ INFO] (main) | onComplete()

如果不是 just() 我使用 interval() 方法:

If instead of just() I use the interval() method:

Flux.interval(Duration.ofMillis(100))
    .take(2)
    .log()
    .subscribe();

元素不会被记录,除非我添加 Thread.sleep() 这给了我:

the elements are not logged, unless I add Thread.sleep() which gives me:

[ INFO] (main) onSubscribe(FluxTake.TakeSubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) onNext(0)
[ INFO] (parallel-1) onNext(1)
[ INFO] (parallel-1) onComplete()

问题是:为什么我需要暂停一个线程才能真正触发订阅?

The question is: why do I require to pause a thread to actually trigger the subscription ?

推荐答案

您需要等待主线程并让执行完成.您的主线程在生成下一个元素之前终止.您的第一个元素是在 100 ms 之后生成的,因此您需要等待/阻塞主线程.试试这个:

You need to wait on main thread and let the execution complete. Your main thread terminates before next element is generated. Your first element is generated after 100 ms so you need to wait/block the main thread. Try this:

CountDownLatch latch = new CountDownLatch(1);
Flux.interval(Duration.ofMillis(100))
        .take(2)
        .doOnComplete(latch::countDown)
        .log()
        .subscribe();
latch.await(); // wait for doOnComplete

CountDownLatch:

一种同步辅助,允许一个或多个线程等待其他线程中正在执行的一组操作完成.

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

这篇关于为什么 Thread.sleep() 会触发对 Flux.interval() 的订阅?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆