使用项目反应器 mergeWith() 运算符以实现“if/elseif/else"分支逻辑 [英] Using project reactor mergeWith() operator in order to achieve "if/elseif/else" branching logic

查看:75
本文介绍了使用项目反应器 mergeWith() 运算符以实现“if/elseif/else"分支逻辑的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 project reactor mergeWith 运算符来实现 if/elseif/else 分支逻辑,如下所述:RxJS,If-Else 运算符在哪里.

I am trying to use project reactor mergeWith operator in order to achieve a if/elseif/else branching logic as described here: RxJS, where is the If-Else Operator.

提供的示例是用 RxJS 编写的,但基本思想保持不变.

The provided samples are written in RxJS but the underlying idea remains the same.

基本上的想法是在 3 个 monos/publishers(因此有 3 个不同的谓词)上使用 filter 运算符并合并 3 个 monos如下(这里当然是 RxJS Observables):

Basically the idea is to use the filter operator on 3 monos/publishers (therefore with 3 different predicates) and merge the 3 monos as follows (here they are RxJS Observables of course):

const somethings$ = source$
  .filter(isSomething)
  .do(something);

const betterThings$ = source$
  .filter(isBetterThings)
  .do(betterThings);

const defaultThings$ = source$
  .filter((val) => !isSomething(val) && !isBetterThings(val))
  .do(defaultThing);

// merge them together
const onlyTheRightThings$ = somethings$
  .merge(
    betterThings$,
    defaultThings$,
  )
  .do(correctThings);

我已经复制并粘贴了上述文章中的相关示例.

I have copied and pasted the relevant sample from the above article.

考虑到 something$betterThings$defaultThings$ 是我们的单声道 isSomething &isBetterThings 是谓词.

Consider that something$, betterThings$ and defaultThings$ are our monos isSomething & isBetterThings are the predicates.

现在是我的 3 个真正的 monos/publishers(用 Java 编写):

Now here are my 3 real monos/publishers (written in java):

private Mono<ServerResponse> validateUser(User user) {
    return Mono.just(new BeanPropertyBindingResult(user, User.class.getName()))
        .doOnNext(err -> userValidator.validate(user, err))
        .filter(AbstractBindingResult::hasErrors)
        .flatMap(err ->
            status(BAD_REQUEST)
                .contentType(APPLICATION_JSON)
                .body(BodyInserters.fromObject(err.getAllErrors()))
        );
}

private Mono<ServerResponse> validateEmailNotExists(User user) {
    return userRepository.findByEmail(user.getEmail())
        .flatMap(existingUser ->
            status(BAD_REQUEST)
                .contentType(APPLICATION_JSON)
                .body(BodyInserters.fromObject("User already exists."))
        );
}

private Mono<ServerResponse> saveUser(User user) {
    return userRepository.save(user)
        .flatMap(newUser -> status(CREATED)
            .contentType(APPLICATION_JSON)
            .body(BodyInserters.fromObject(newUser))
        );
}

这里是需要合并三个publishers的顶级方法:

Here is the top level method that needs to merge the three publishers:

public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
    return serverRequest.bodyToMono(User.class)
        .mergeWith(...)

}

我不知道如何使用 mergeWith() 运算符...我尝试了 Mono.when() 静态运算符,它需要多个发布者(适用于me)但返回一个 Mono(对我不利).

I am not sure how to use the mergeWith() operator... I have tried the Mono.when() static operator which takes several publishers (good for me) but returns a Mono<void> (bad for me).

有人可以帮忙吗?

P.S.我相信你会原谅 RxJS (js) 和 Reactor 代码 (java) 之间的混合.我打算利用我在 RxJS 中的知识在我的 Reactor 应用程序中实现类似的目标. :-)

编辑 1:我试过了:

public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
    return serverRequest
        .bodyToMono(User.class)
        .flatMap(user -> validateUser(user).or(validateEmailNotExists(user)).or(saveUser(user))).single();
}

但是我收到这个错误:NoSuchElementException: Source was empty

edit 2:同(注意括号):

public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
    return serverRequest
        .bodyToMono(User.class)
        .flatMap(user -> validateUser(user).or(validateEmailNotExists(user)).or(saveUser(user)).single());
}

编辑 3:与 Mono 相同的错误:

public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
    Mono<User> userMono = serverRequest.bodyToMono(User.class);
    return validateUser(userMono)
        .or(validateEmailNotExists(userMono))
        .or(saveUser(userMono))
        .single();
}

编辑 4:我可以确认三个单声道中至少有一个会一直发射.当我使用 or() 运算符时,出现了问题......

edit 4: I can confirm that at least one of the three monos will always emit. It is when I use the or() operator that something goes wrong...

如果我使用它,我所有的测试都会通过:

If I use this, all my tests pass:

public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
    return serverRequest.bodyToMono(User.class)
        .flatMap(user -> Flux.concat(validateUser(user), validateEmailNotExists(user), saveUser(user)).next().single());
}

我在这里使用了 concat() 运算符来保留操作顺序.

I have used the concat() operator here to preserve the order of operations.

你知道我对 or() 操作符有什么误解吗?

Do you know what I am getting wrong with the or() operator?

edit 5:我尝试使用 cache() 运算符如下但无济于事:

edit 5: I have tried with the cache() operator as follows to no avail:

public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
    return serverRequest
        .bodyToMono(User.class)
        .cache()
        .flatMap(user -> validateUser(user)
            .or(validateEmailNotExists(user))
            .or(saveUser(user))
            .single()
        );
}

推荐答案

您当前的代码示例意味着您的 3 个返回 Mono 的方法应该采用 Mono 而不是 User,因此您可能需要在那里更改某些内容.

Your current code sample implies that your 3 methods returning Mono<ServerResponse> should be taking a Mono<User> rather than a User, so you may need to alter something there.

但是,我离题了——这似乎不是这里的主要问题.

However, I digress - that doesn't seem to be the main question here.

根据我对该链接中描述的模式的理解,您正在创建 3 个单独的 Mono 对象,其中只有一个会返回结果 - 而您需要一个 Mono 原始 3 个 Mono 对象中的任何一个返回.

From what I understand of the pattern described in that link, you're creating 3 separate Mono objects, only one of which will ever return a result - and you need a Mono of whichever one of your original 3 Mono objects returns.

在这种情况下,我会推荐以下内容:

In that case, I'd recommend something like the following:

Mono<ServerResult> result = Flux.merge(validateUser(user), validateEmailNotExists(user), saveUser(user)).next().single();

分解:

  • 静态Flux.merge() 方法将你的 3 个 Mono 对象合并成一个 Flux;
  • next() 返回第一个可用结果作为 Mono;
  • single() 将确保 Mono 发出一个值,而不是什么都没有,否则抛出异常.(可选,但只是一点安全网.)
  • The static Flux.merge() method takes your 3 Mono objects and merges them into a Flux;
  • next() returns the first available result as a Mono;
  • single() will ensure that the Mono emits a value, as oppose to nothing at all, and throw an exception otherwise. (Optional, but just a bit of a safety net.)

你也可以像这样链接Mono.or():

Mono<ServerResult> result = validateUser(user).or(validateEmailNotExists(user)).or(saveUser(user)).single();

这种方法的优点是:

  • 在某些情况下,它可以说更具可读性;
  • 如果您的链中可能有多个 Mono 返回结果,这允许您设置优先顺序以选择一个(与上面的示例相反)在这里,您将首先获得 Mono 发出的值.)
  • It's arguably more readable in some cases;
  • If it's possible that you'll have more than one Mono in your chain return a result, this allows you to set an order of precedence for which one is chosen (as oppose to the above example where you'll just get whatever Mono emitted a value first.)

缺点可能是性能之一.如果saveUser()在上面的代码中先返回一个值,那么你还是要等另外两个Mono对象完成后再组合起来Mono 将完成.

The disadvantage is potentially one of performance. If saveUser() returns a value first in the above code, then you still have to wait for the other two Mono objects to complete before your combined Mono will complete.

这篇关于使用项目反应器 mergeWith() 运算符以实现“if/elseif/else"分支逻辑的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆