在 Mono 对象上执行 block() 时出现异常我从 ReactiveMongoRepository 对象返回 [英] Getting exception while doing block() on Mono object I got back from ReactiveMongoRepository object

查看:57
本文介绍了在 Mono 对象上执行 block() 时出现异常我从 ReactiveMongoRepository 对象返回的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个服务将数据流式传输到第二个服务,该服务接收对象流并将它们保存到我的 MongoDB.在我从流服务获得的 Flux 对象上的订阅函数中,我使用了 ReactiveMongoRepository 接口中的 save 方法.当我尝试使用块功能并获取数据时,出现以下错误:

I have a service that streams data to a second service that receives stream of objects and saves them to my MongoDB. inside my subscribe function on the Flux object that I get from the streaming service I use the save method from the ReactiveMongoRepository interface. when I try to use the block function and get the data I get the following error :

2019-10-11 13:30:38.559  INFO 19584 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1, serverValue:25}] to localhost:27017
2019-10-11 13:30:38.566  INFO 19584 --- [localhost:27017] org.mongodb.driver.cluster               : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[4, 0, 1]}, minWireVersion=0, maxWireVersion=7, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=6218300}
2019-10-11 13:30:39.158  INFO 19584 --- [ctor-http-nio-4] quote-monitor-service                    : onNext(Quote(id=null, ticker=AAPL, price=164.8, instant=2019-10-11T10:30:38.800Z))
2019-10-11 13:30:39.411  INFO 19584 --- [ctor-http-nio-4] quote-monitor-service                    : cancel()
2019-10-11 13:30:39.429  INFO 19584 --- [ntLoopGroup-2-2] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:3, serverValue:26}] to localhost:27017
2019-10-11 13:30:39.437  WARN 19584 --- [ctor-http-nio-4] io.netty.util.ReferenceCountUtil         : Failed to release a message: DefaultHttpContent(data: PooledSlicedByteBuf(freed), decoderResult: success)

io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
    at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
    at 
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:77) ~[reactor-core-3.2.12.RELEASE.jar:3.2.12.RELEASE]
    at reactor.core.publisher.Mono.block(Mono.java:1494) ~[reactor-core-3.2.12.RELEASE.jar:3.2.12.RELEASE]
    at

我的代码:

stockQuoteClient.getQuoteStream()
                .log("quote-monitor-service")
                .subscribe(quote -> {
                    Mono<Quote> savedQuote = quoteRepository.save(quote);
                    System.out.println("I saved a quote! Id: " +savedQuote.block().getId());
                });

经过一些挖掘,我设法让它工作,但我不明白为什么它现在工作.新代码:

after some digging, I manage to get it to work but I don't understand why it works now. the new code:

stockQuoteClient.getQuoteStream()
                .log("quote-monitor-service")
                .subscribe(quote -> {
                       Mono<Quote> savedQuote = quoteRepository.insert(quote);
                       savedQuote.subscribe(result ->
                                 System.out.println("I saved a quote! Id :: " + result.getId()));
    });

block() 的定义:订阅这个 Mono 并无限期地阻塞,直到接收到下一个信号.

the definition of block(): Subscribe to this Mono and block indefinitely until a next signal is received.

subscribe()的定义:订阅这个Mono,请求无限需求.

the definition of subscribe(): Subscribe to this Mono and request unbounded demand.

有人能帮我理解为什么块不起作用而订阅起作用吗?我在这里错过了什么?

can someone help me understand why the block didn't work and the subscribe worked? what am I missing here?

推荐答案

阻塞是不好的,因为它会占用一个等待响应的线程.在响应式框架中,它非常很糟糕,它只有很少的线程可供使用,并且被设计为 没有 应该被不必要地阻塞.

Blocking is bad, since it ties up a thread waiting for a response. It's very bad in a reactive framework which has few threads at its disposal, and is designed so that none of them should be unnecessarily blocked.

这正是响应式框架旨在避免的事情,因此在这种情况下,它只会阻止您这样做:

This is the very thing that reactive frameworks are designed to avoid, so in this case it simply stops you doing it:

block()/blockFirst()/blockLast() 是阻塞的,线程 reactor-http-nio-4 不支持

block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4

相比之下,您的新代码异步运行.线程不会被阻塞,因为在存储库返回值之前实际上什么也不会发生(然后执行传递给 savedQuote.subscribe() 的 lambda,将结果打印到控制台.)

Your new code, in contrast, works asynchronously. The thread isn't blocked, as nothing actually happens until the repository returns a value (and then the lambda that you passed to savedQuote.subscribe() is executed, printing out you result to the console.)

但是,从响应式流的角度来看,新代码仍然不是最佳/正常的,因为您正在订阅方法中执行所有逻辑.正常的做法是对我们进行一系列 flatMap/map 调用来转换流中的项目,并使用 doOnNext() 处理副作用(例如打印出一个值):

However, the new code still isn't optimal / normal from a reactive streams perspective, as you're doing all your logic in your subscribe method. The normal thing to do is to us a series of flatMap/map calls to transform the items in the stream, and use doOnNext() for side effects (such as printing out a value):

stockQuoteClient.getQuoteStream()
            .log("quote-monitor-service")
            .flatMap(quoteRepository::insert)
            .doOnNext(result -> System.out.println("I saved a quote! Id :: " + result.getId())))
            .subscribe();

如果您正在对反应器/反应流进行大量的工作,那么总体上值得阅读它们.它们对于非阻塞工作非常强大,但与更标准"的 Java 相比,它们确实需要一种不同的思维方式(和编码).

If you're doing any serious amount of work with reactor / reactive streams, it would be worth reading up on them in general. They're very powerful for non-blocking work, but they do require a different way of thinking (and coding) than more "standard" Java.

这篇关于在 Mono 对象上执行 block() 时出现异常我从 ReactiveMongoRepository 对象返回的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆