用Reactor抛出异常的正确方法 [英] Correct way of throwing exceptions with Reactor

查看:185
本文介绍了用Reactor抛出异常的正确方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是项目反应器和反应性编程的新手.

I'm new to project Reactor and reactive programming in general.

我目前正在编写类似于以下代码:

I'm currently working on a piece of code similar to this:

Mono.just(userId)
    .map(repo::findById)
    .map(user-> {
        if(user == null){
            throw new UserNotFoundException();
        }
        return user;
    })
    // ... other mappings

这个例子可能很愚蠢,确实有更好的方法来实现这种情况,但是重点是:

This example is probably silly and there are surely better ways of implementing this case, but the point is:

map块中使用throw new异常是否错误,还是应该将其替换为return Mono.error(new UserNotFoundException())?

Is it wrong to use a throw new exception in a map block or should I replace this with a return Mono.error(new UserNotFoundException())?

这两种方式有什么实际区别吗?

Is there any actual difference in these two ways of doing?

推荐答案

有两种方法可以被视为方便的异常抛出方法:

There are a couple of ways that could be considered as a convenient way of exception throwing:

可以简化元素处理(可能导致错误或空流)的方法之一是运算符handle.

One of the way that could simplify handling of an element which may result in an error or empty stream is operator handle.

以下代码显示了如何使用它来解决我们的问题:

The following code shows how we can use it in order to solve our problem:

Mono.just(userId)
    .map(repo::findById)
    .handle((user, sink) -> {
        if(!isValid(user)){
            sink.error(new InvalidUserException());
        } else if (isSendable(user))
            sink.next(user);
        }
        else {
            //just ignore element
        }
    })

如我们所见,

.handle运算符需要传递BiConsumer<T, SynchronousSink<>才能处理元素.在这里,我们的BiConsumer中有两个参数.第一个元素是上游元素,第二个元素是SynchronousSink,这有助于我们向下游同步提供元素.这种技术扩展了提供元素处理的不同结果的能力.例如,如果元素无效,我们可以向同一SycnchronousSync提供错误,这将取消上游并向下游产生onError信号.反过来,我们可以使用相同的handle运算符过滤".一旦执行了句柄BiConsumer并且未提供任何元素,Reactor会将其视为一种过滤,并将为我们请求一个额外的元素.最后,如果该元素有效,我们可以简单地调用SynchronousSink#next并将其传播到下游或在其上应用一些映射,因此在这里我们将handle作为map运算符.此外,我们可以安全地使用该运算符,而不会影响性能,并提供复杂的元素验证,例如元素验证或向下游发送错误.

as we can see, the .handle operator requires to pass BiConsumer<T, SynchronousSink<> in order to handle an element. Here we have two parameters in our BiConsumer. The first one is an element from the upstream where the second one is SynchronousSink which helps us to supply element to downstream synchronously. Such a technique expands an ability to supply different results of our element's processing. For example, in case the element is invalid, we can supply error to the same SycnchronousSync which will cancel upstream and produce onError signal to downstream. In turn, we can "filter" using the same handle operator. Once the handle BiConsumer is executed and no element has been supplied, Reactor will consider that as kind of filtering and will request for an additional element for us. Finally, in case the element is valid, we can simply call SynchronousSink#next and propagate our element downstream or apply some mapping on it, so we will have handle as the map operator here. Moreover, we can safely use that operator with no-performance impact and provide complex element verification such as validation of element or error sending to downstream.

在映射过程中引发异常的选项之一是将map替换为concatMap.本质上,concatMap的作用几乎与flatMap相同.唯一的区别是concatMap一次仅允许一个子流.这种行为大大简化了内部实现,并且不影响性能.因此,我们可以使用以下代码来以更实用的方式引发异常:

One of the options to throw an exception during mapping is to replace map with concatMap. In its essence, concatMap does almost the same flatMap does. The only difference is that concatMap allows only one substream at a time. Such behavior simplifies internal implementation a lot and does not impact performance. So we can use the following code in order to throw an exception in a more functional way:

Mono.just(userId)
    .map(repo::findById)
    .concatMap(user-> {
        if(!isValid(user)){
            return Mono.error(new InvalidUserException());
        }
        return Mono.just(user);
    })

在上面的示例中,如果用户无效,我们将使用Mono.error返回异常.我们可以使用Flux.error对通量做同样的事情:

In the sample above in case of an invalid user, we return exception using Mono.error. The same we can do for flux using Flux.error:

Flux.just(userId1, userId2, userId3)
    .map(repo::findById)
    .concatMap(user-> {
        if(!isValid(user)){
            return Flux.error(new InvalidUserException());
        }
        return Mono.just(user);
    })

注意,在两种情况下,我们都返回仅包含一个元素的 cold 流.在Reactor中,如果返回的流是冷的 scalar 流,可以进行一些优化以提高性能.因此,建议在需要更复杂的映射时使用Flux/Mono concatMap + .justemptyerror,结果可能以return nullthrow new ...结尾.

Note, in both cases we return cold stream which has only one element. In Reactor, there is a couple of optimizations that improve performance in the case returned stream is a cold scalar stream. Thus, it is recommended to use Flux/Mono concatMap + .just, empty, error as a result when we need more complex mapping, that could end up with return null or throw new ....

注意!永远不要检查传入元素的可空性.由于Reactor Project违反了Reactive Streams规范,因此永远不会为您发送null值(请参见

Attention! Don't ever check incoming element on nullability. The Reactor Project will never send a null value for you since this violates Reactive Streams spec (see Rule 2.13) Thus, in case if repo.findById returns null, Reactor will throw NullPointerException for you.

等等,为什么concatMapflatMap好?

本质上,flatMap设计为合并一次执行的多个子流中的元素.这意味着flatMap在其下应具有异步流,因此它们可能处理多个线程上的数据,也可能是多个网络调用.随后,这种期望对实现产生了很大的影响,因此flatMap应该能够处理来自多个流(Thread)的数据(意味着并发数据结构的使用),如果另一个流消耗了资源,则将元素加入队列(意味着附加每个子流的Queue的内存分配),并且不违反反应式流规范规则(意味着确实很复杂的实现).计算所有这些事实以及我们将普通的map操作(是同步的)替换为使用Flux/Mono.error的更方便的引发异常的方式(不会改变执行的同步性)的事实导致我们这样做不需要这么复杂的运算符,我们可以使用更简单的concatMap,该concatMap一次用于单个流的异步处理,并且进行了一些优化以处理标量冷流.

Wait, Why concatMap is better than flatMap?

In its essence, flatMap is designed to merge elements from the multiple substreams that is executing at a time. It means that flatMap should have asynchronous streams underneath so, they could potentially process data on the multiple threads or that could be a several network calls. Subsequently, such expectations impact implementation a lot so flatMap should be able to handle data from the multiple streams (Threads) (means usage of concurrent data structures), enqueue elements if there is a draining from another stream (means additional memory allocation for Queues for each substream) and do not violate Reactive Streams specification rules (means really complex implementation). Counting all these facts and the fact that we replace a plain map operation (which is synchronous) onto the more convenient way of throwing an exception using Flux/Mono.error (which does not change synchronicity of execution) leads to the fact that we do not need such a complex operator and we can use much simpler concatMap which is designed for asynchronous handling of a single stream at a time and has a couple of optimization in order to handle scalar, cold stream.

因此,当结果为空时,引发异常的另一种方法是switchOnEmpty运算符.以下代码演示了如何使用该方法:

So, another approach to throw an exception when the result is empty is switchOnEmpty operator. The following code demonstrates how we can use that approach :

Mono.just(userId)
    .flatMap(repo::findById)
    .switchIfEmpty(Mono.error(new UserNotFoundExeception()))

如我们所见,在这种情况下,repo::findById应该具有UserMono作为返回类型.因此,如果找不到User实例,则结果流将为空.因此,Reactor将调用替代的Mono,指定为switchIfEmpty参数.

As we can see, in this case repo::findById should have Mono of User as the return type. Therefore, in case a User instance will not be found, the result stream will be empty. Thus, Reactor will call an alternative Mono, specified as switchIfEmpty parameter.

它可能被认为是可读性较差的代码或不好的做法( 我自己的观点 ),但是您可以像使用Project Reactor一样抛出异常.即使这样做在某种程度上可能会违反Reactive Streams规范(从语义的角度来看,在这种情况下会违反,因为在幕后的操作员是Subscriber链中的Subscriber,因此-从语义上讲,在lambda中引发异常可以映射为违反onNext方法中的引发异常. 0.3/README.md#2.13"rel =" noreferrer>规范的规则2.13 ).但是,由于Reactor会为您捕获引发的异常并将其作为onError信号传播给下游,因此不禁止这样做.

It could be counted as a less readable code or bad practice (my own opinion), but you can throw your exception as is with Project Reactor. Even though, in someway doing so can violates Reactive Streams specification (in this context violate from the semantic perspective, because your operator under the hood is a Subscriber in a chain of Subscribers, therefore - semantically, throwing an exception in lambda could be mapped to throwing exception in the onNext method which violates the spec's rule 2.13). However, since Reactor will catch the thrown exception for you and propagate it then as the onError signal to your downstream, it is not prohibited to do that.

  1. 使用.handle运算符以提供复杂的元素处理
  2. 当我们需要在映射过程中引发异常时,请使用concatMap + Mono.error,但是这种技术最适合异步元素处理的情况.
  3. 在已经安装了flatMap的情况下,使用flatMap + Mono.error
  4. 禁止使用
  5. Null作为返回类型,因此在您的下游map中,您将得到onError而不是onError
  6. 如果调用某些特定函数的结果以 empty
  7. 结尾的情况下,则在需要发送错误信号的所有情况下均使用switchIfEmpty
  1. Use .handle operator in order to provide complex element processing
  2. Use concatMap+ Mono.error when we need to throw an exception during mapping but such a technique is most suitable for cases of asynchronous element processing.
  3. Use flatMap + Mono.error when we have already had flatMap in place
  4. Null as a return type is forbidden so instead of null in your downstream map you will get unexpected onError with NullPointerException
  5. Use switchIfEmpty in all cases when you need to send an error signal if the result of calling some specific function finished with the empty stream

这篇关于用Reactor抛出异常的正确方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆