使用项目反应器 mergeWith() 运算符以实现“if/elseif/else"分支逻辑 [英] Using project reactor mergeWith() operator in order to achieve "if/elseif/else" branching logic
问题描述
我正在尝试使用 project reactor mergeWith
运算符来实现 if/elseif/else
分支逻辑,如下所述:RxJS,If-Else 运算符在哪里.
I am trying to use project reactor mergeWith
operator in order to achieve a if/elseif/else
branching logic as described here: RxJS, where is the If-Else Operator.
提供的示例是用 RxJS 编写的,但基本思想保持不变.
The provided samples are written in RxJS but the underlying idea remains the same.
基本上的想法是在 3 个 monos/publishers
(因此有 3 个不同的谓词)上使用 filter
运算符并合并 3 个 monos
如下(这里当然是 RxJS Observables
):
Basically the idea is to use the filter
operator on 3 monos/publishers
(therefore with 3 different predicates) and merge the 3 monos
as follows (here they are RxJS Observables
of course):
const somethings$ = source$
.filter(isSomething)
.do(something);
const betterThings$ = source$
.filter(isBetterThings)
.do(betterThings);
const defaultThings$ = source$
.filter((val) => !isSomething(val) && !isBetterThings(val))
.do(defaultThing);
// merge them together
const onlyTheRightThings$ = somethings$
.merge(
betterThings$,
defaultThings$,
)
.do(correctThings);
我已经复制并粘贴了上述文章中的相关示例.
I have copied and pasted the relevant sample from the above article.
考虑到 something$
、betterThings$
和 defaultThings$
是我们的单声道 isSomething
&isBetterThings
是谓词.
Consider that something$
, betterThings$
and defaultThings$
are our monos isSomething
& isBetterThings
are the predicates.
现在是我的 3 个真正的 monos/publishers
(用 Java 编写):
Now here are my 3 real monos/publishers
(written in java):
private Mono<ServerResponse> validateUser(User user) {
return Mono.just(new BeanPropertyBindingResult(user, User.class.getName()))
.doOnNext(err -> userValidator.validate(user, err))
.filter(AbstractBindingResult::hasErrors)
.flatMap(err ->
status(BAD_REQUEST)
.contentType(APPLICATION_JSON)
.body(BodyInserters.fromObject(err.getAllErrors()))
);
}
private Mono<ServerResponse> validateEmailNotExists(User user) {
return userRepository.findByEmail(user.getEmail())
.flatMap(existingUser ->
status(BAD_REQUEST)
.contentType(APPLICATION_JSON)
.body(BodyInserters.fromObject("User already exists."))
);
}
private Mono<ServerResponse> saveUser(User user) {
return userRepository.save(user)
.flatMap(newUser -> status(CREATED)
.contentType(APPLICATION_JSON)
.body(BodyInserters.fromObject(newUser))
);
}
这里是需要合并三个publishers
的顶级方法:
Here is the top level method that needs to merge the three publishers
:
public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
return serverRequest.bodyToMono(User.class)
.mergeWith(...)
}
我不知道如何使用 mergeWith()
运算符...我尝试了 Mono.when()
静态运算符,它需要多个发布者(适用于me)但返回一个 Mono
(对我不利).
I am not sure how to use the mergeWith()
operator... I have tried the Mono.when()
static operator which takes several publishers (good for me) but returns a Mono<void>
(bad for me).
有人可以帮忙吗?
P.S.我相信你会原谅 RxJS (js) 和 Reactor 代码 (java) 之间的混合.我打算利用我在 RxJS 中的知识在我的 Reactor 应用程序中实现类似的目标. :-)
编辑 1:我试过了:
public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
return serverRequest
.bodyToMono(User.class)
.flatMap(user -> validateUser(user).or(validateEmailNotExists(user)).or(saveUser(user))).single();
}
但是我收到这个错误:NoSuchElementException: Source was empty
edit 2:同(注意括号):
public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
return serverRequest
.bodyToMono(User.class)
.flatMap(user -> validateUser(user).or(validateEmailNotExists(user)).or(saveUser(user)).single());
}
编辑 3:与 Mono
相同的错误:
public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
Mono<User> userMono = serverRequest.bodyToMono(User.class);
return validateUser(userMono)
.or(validateEmailNotExists(userMono))
.or(saveUser(userMono))
.single();
}
编辑 4:我可以确认三个单声道中至少有一个会一直发射.当我使用 or()
运算符时,出现了问题......
edit 4: I can confirm that at least one of the three monos will always emit. It is when I use the or()
operator that something goes wrong...
如果我使用它,我所有的测试都会通过:
If I use this, all my tests pass:
public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
return serverRequest.bodyToMono(User.class)
.flatMap(user -> Flux.concat(validateUser(user), validateEmailNotExists(user), saveUser(user)).next().single());
}
我在这里使用了 concat()
运算符来保留操作顺序.
I have used the concat()
operator here to preserve the order of operations.
你知道我对 or()
操作符有什么误解吗?
Do you know what I am getting wrong with the or()
operator?
edit 5:我尝试使用 cache()
运算符如下但无济于事:
edit 5: I have tried with the cache()
operator as follows to no avail:
public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
return serverRequest
.bodyToMono(User.class)
.cache()
.flatMap(user -> validateUser(user)
.or(validateEmailNotExists(user))
.or(saveUser(user))
.single()
);
}
推荐答案
您当前的代码示例意味着您的 3 个返回 Mono
的方法应该采用 Mono
而不是 User
,因此您可能需要在那里更改某些内容.
Your current code sample implies that your 3 methods returning Mono<ServerResponse>
should be taking a Mono<User>
rather than a User
, so you may need to alter something there.
但是,我离题了——这似乎不是这里的主要问题.
However, I digress - that doesn't seem to be the main question here.
根据我对该链接中描述的模式的理解,您正在创建 3 个单独的 Mono
对象,其中只有一个会返回结果 - 而您需要一个 Mono
原始 3 个 Mono
对象中的任何一个返回.
From what I understand of the pattern described in that link, you're creating 3 separate Mono
objects, only one of which will ever return a result - and you need a Mono
of whichever one of your original 3 Mono
objects returns.
在这种情况下,我会推荐以下内容:
In that case, I'd recommend something like the following:
Mono<ServerResult> result = Flux.merge(validateUser(user), validateEmailNotExists(user), saveUser(user)).next().single();
分解:
- 静态
Flux.merge()
方法将你的 3 个Mono
对象合并成一个Flux
; next()
返回第一个可用结果作为Mono
;single()
将确保Mono
发出一个值,而不是什么都没有,否则抛出异常.(可选,但只是一点安全网.)
- The static
Flux.merge()
method takes your 3Mono
objects and merges them into aFlux
; next()
returns the first available result as aMono
;single()
will ensure that theMono
emits a value, as oppose to nothing at all, and throw an exception otherwise. (Optional, but just a bit of a safety net.)
你也可以像这样链接Mono.or()
:
Mono<ServerResult> result = validateUser(user).or(validateEmailNotExists(user)).or(saveUser(user)).single();
这种方法的优点是:
- 在某些情况下,它可以说更具可读性;
- 如果您的链中可能有多个
Mono
返回结果,这允许您设置优先顺序以选择一个(与上面的示例相反)在这里,您将首先获得Mono
发出的值.)
- It's arguably more readable in some cases;
- If it's possible that you'll have more than one
Mono
in your chain return a result, this allows you to set an order of precedence for which one is chosen (as oppose to the above example where you'll just get whateverMono
emitted a value first.)
缺点可能是性能之一.如果saveUser()
在上面的代码中先返回一个值,那么你还是要等另外两个Mono
对象完成后再组合起来Mono代码> 将完成.
The disadvantage is potentially one of performance. If saveUser()
returns a value first in the above code, then you still have to wait for the other two Mono
objects to complete before your combined Mono
will complete.
这篇关于使用项目反应器 mergeWith() 运算符以实现“if/elseif/else"分支逻辑的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!