春季5反应堆中未订购助焊剂 [英] Flux not subscribing in Spring 5 reactor

查看:65
本文介绍了春季5反应堆中未订购助焊剂的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我可能丢失了一些东西,但我不知道它是什么.

I'm probably missing something but I can't figure out what it is.

以下代码什么都不做:

webClient.get().uri("/some/path/here").retrieve()
     .bodyToMono(GetLocationsResponse.class)
     .doOnNext(System.out::println)
     .subscribe();

如果我尝试阻止通话,则效果很好:

If I try to block the call it works fine:

webClient.get().uri("/some/path/here").retrieve()
      .bodyToMono(GetLocationsResponse.class)
      .doOnNext(System.out::println)
      .block();

奇怪的是,如果我手动"创建Flux(即不是来自spring webClient),则效果很好:

The weird thing is that if I create a Flux "manually" (i.e not coming from the spring webClient) this works fine:

Flux.just("1", "2", "3")
    .filter(s -> !s.equals("2"))
    .doOnNext(System.out::println)
    .subscribe();

有人可以解释我做错了什么吗?难道 .subscribe()在第一种情况下不应该像在后一种情况下那样执行操作吗?

Could someone please explain what it is that I am doing wrong? Isn't .subscribe() supposed to execute the operation in the first case, just like it did in the last?

谢谢!

推荐答案

简短回答

subscribe 不会阻止当前线程,这意味着应用程序主线程可以在Flux发出任何元素之前完成.因此,要么使用 block 要么在主线程中使用waiting.

subscribe does not block current thread, that means app main thread can complete earlier than Flux emits any element. So either use block or employ waiting in the main thread.

详细信息

调用无参数subscription()只需对 Flux 进行 request(unbounded),而无需设置任何 Subscriber .它通常在单独线程中触发操作,但不会阻塞当前线程.您的主线程很可能在 WebClient 在该单独的线程和

Call to the no-args subscribe() just makes request(unbounded) on Flux without setting up any Subscriber. It triggers operation generally in a separate thread but does not block the current thread. Most likely, your main thread ends before WebClient received the response in that separate thread and passive side effect doOnNext(...) happened.

为说明/测试操作已开始,请在主线程中等待一段时间.只需在 subscribe()调用之后放置以下行:

To illustrate/test that operation is started, wait for some time in the main thread. Just put the following line right after subscribe() call:

Thread.sleep(1000);

现在,在使用超时值播放之后,您将能够看到打印的结果.

Now, after playing with the timeout value, you'll be able to see result printed.

现在让我们隐式地发送一个自定义的 Scheduler 用于异步操作,并等待其所有任务完成.另外,让我们将 System.out :: println 作为 subscribe(...)参数而不是 doOnNext 传递,以便完整的代码显示为如下:

Let's now implicitly ship a custom Scheduler for async operations and wait for all its tasks to be completed. Also, let's pass the System.out::println as subscribe(...) argument instead of doOnNext, so that complete code appears as follows:

ExecutorService executor = Executors.newSingleThreadExecutor(); 

webClient.get().uri("/some/path/here").retrieve()
    .bodyToMono(GetLocationsResponse.class)
    .publishOn(Schedulers.fromExecutor(executor)) // next operation will go to this executor
    .subscribe(System.out::println); //still non-blocking

executor.awaitTermination(1, TimeUnit.SECONDS); //block current main thread 

此示例使用略有不同的订阅(消费者).最重要的是,它添加了

This example uses slightly different subscribe(Consumer). Most importantly, it adds publishOn(Scheduler) which is backed by ExecutorService. The latter is used then to wait for termination in the main thread.

当然,获得相同结果的简单得多的方法是使用您最初提到的 block():

Surely, the much easier way to achieve the same result is to use block() as you mentioned initially:

webClient.get().uri("/some/path/here").retrieve()
      .bodyToMono(GetLocationsResponse.class)
      .doOnNext(System.out::println)
      .block();

最后,用 Flux.just(...)... subscribe()记录您的第三个示例-似乎它很快就完成了,您的主线程被终止了.这是因为与发出单个 GetLocationsResponse 元素相比,发出几个 String 元素所需的时间更少(这意味着写入请求+读取响应+解析到POJO的时间).但是,如果使此 Flux 延迟元素,则将得到相同的行为:

Finally, note on your third example with Flux.just(...)...subscribe() - seems it just quickly completes before your main thread gets terminated. That's because it requires way less time to emit a few String elements compared to the emission of a single GetLocationsResponse element (implying timings for write request+read response+parse into POJO). However, if you make this Flux to delay elements, you'll get the same behavior reproduced:

Flux.just("1", "2", "3")
    .filter(s -> !s.equals("2"))
    .delayElements(Duration.ofMillis(500)) //this makes it stop printing in main thread
    .doOnNext(System.out::println)
    .subscribe(); 


Flux.just("1", "2", "3")
    .filter(s -> !s.equals("2"))
    .delayElements(Duration.ofMillis(500))
    .doOnNext(System.out::println)
    .blockLast(); //and that makes it printing back again

这篇关于春季5反应堆中未订购助焊剂的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆