Spring WebFlux - 如何从数据库获取数据以供下一步使用 [英] Spring WebFlux - how to get data from DB to use in the next step

查看:65
本文介绍了Spring WebFlux - 如何从数据库获取数据以供下一步使用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用 Spring WebFlux (Project Reactor),但面临以下问题:我必须从 db 获取一些数据才能使用它们来调用另一项服务 - 一个流中的所有内容.如何做到这一点?

I use Spring WebFlux (Project Reactor) and I'm facing the following problem: I have to get some data from db to use them to call another service - everything in one stream. How to do that?

public Mono<MyObj> saveObj(Mono<MyObj> obj) {
    return obj
        .flatMap(
            ob->
                Mono.zip(
                        repo1.save(
                          ...),
                        repo2
                            .saveAll(...)
                            .collectList(),
                        repo3
                            .saveAll(...)
                            .collectList())
                    .map(this::createSpecificObject))
        .doOnNext(item-> createObjAndCallAnotherService(item));
  }



private void createObjAndCallAnotherService(Prot prot){
myRepository
        .findById(
            prot.getDomCred().stream()
                .filter(Objects::nonNull)
                .findFirst()
                .map(ConfDomCred::getCredId)
                .orElse(UUID.fromString("00000000-0000-0000-0000-000000000000")))
        .doOnNext( //one value is returned from myRepository -> Flux<MyObjectWithNeededData>
            confCred-> {//from this point the code is unreachable!!! - why????
              Optional<ConfDomCred> confDomCred=
                  prot.getDomCreds().stream().filter(Objects::nonNull).findFirst();

              confDomCred.ifPresent(
                  domCred -> {
                    ProtComDto com=
                        ProtComDto.builder()
                            .userName(confCred.getUsername())
                            .password(confCred.getPassword())                          
                            .build();
                    clientApiToAnotherService.callEndpintInAnotherService(com); //this is a client like Feign that invokes method in another service
                  });
            });
}

更新

当我调用时

    Flux<MyObj> myFlux =  myRepository
            .findById(
                prot.getDomCred().stream()
                    .filter(Objects::nonNull)
                    .findFirst()
                    .map(ConfDomCred::getCredId)
                    .orElse(UUID.fromString("00000000-0000-0000-0000-000000000000")));

myFlux.subscribe(e -> e.getPassword()) 

然后打印值

更新2

总结一下——我认为下面的代码是异步/非阻塞的——我说得对吗?在我的

So as a recap - I think the code below is asynchronous/non-blocking - am I right? In my

保护命令服务

我不得不使用 subscribe() 两次 - 只有这样我才能调用我的其他服务并将它们存储为我的对象:commandControllerApi.createNewCommand

I had to use subscribe() twice - only then I can call my other service and store them my object: commandControllerApi.createNewCommand

public Mono<Protection> saveProtection(Mono<Protection> newProtection) {
    return newProtection.flatMap(
        protection ->
            Mono.zip(
                    protectorRepository.save(//some code),
                    domainCredentialRepository
                        .saveAll(//some code)
                        .collectList(),
                    protectionSetRepository
                        .saveAll(//some code)
                        .collectList())
                .map(this::createNewObjectWrapper)
                .doOnNext(protectionCommandService::createProtectionCommand));
  }

ProtectionCommandService 类:

ProtectionCommandService class:

public class ProtectionCommandService {

  private final ProtectionCommandStrategyFactory protectionCommandFactory;
  private final CommandControllerApi commandControllerApi;

  public Mono<ProtectionObjectsWrapper> createProtectionCommand(
      ProtectionObjectsWrapper protection) {
    ProductType productType = protection.getProtector().getProductType();

    Optional<ProtectionCommandFactory> commandFactory = protectionCommandFactory.get(productType);

    commandFactory
        .get()
        .createCommandFromProtection(protection)
        .subscribe(command -> commandControllerApi.createNewCommand(command).subscribe());
    return Mono.just(protection);
  }
}

还有 2 个工厂之一:

And one of 2 factories:

@Component
@AllArgsConstructor
@Slf4j
public class VmWareProtectionCommandFactory implements ProtectionCommandFactory {

  private static final Map<ProductType, CommandTypeEnum> productTypeToCommandType =
      ImmutableMap.of(...//some values);

  private final ConfigurationCredentialRepository configurationCredentialRepository;

  @Override
  public Mono<CommandDetails> createCommandFromProtection(ProtectionObjectsWrapper protection) {
    Optional<DomainCredential> domainCredential =
        protection.getDomainCredentials().stream().findFirst();

    return configurationCredentialRepository
        .findByOwnerAndId(protection.getOwner(), domainCredential.get().getCredentialId())
        .map(credential -> createCommand(protection, credential, domainCredential.get()));
  }

并且 createCommand 方法返回 Mono 对象作为这个工厂的结果.

and createCommand method returns Mono object as a result of this factory.

private Mono<CommandDetails> createCommand(Protection protection
     //other parameters) {

    CommandDto commandDto =
        buildCommandDto(protection, confCredential, domainCredentials);

    String commands = JsonUtils.toJson(commandDto);
    CommandDetails details = new CommandDetails();
    details.setAgentId(protection.getProtector().getAgentId().toString());
    details.setCommandType(///some value);
    details.setArguments(//some value);
    return Mono.just(details);

更新3

我调用一切的主要方法已经改变了一点:

My main method that calls everything has been changed a little bit:

public Mono<MyObj> saveObj(Mono<MyObj> obj) {
    return obj
        .flatMap(
            ob->
                Mono.zip(
                        repo1.save(
                          ...),
                        repo2
                            .saveAll(...)
                            .collectList(),
                        repo3
                            .saveAll(...)
                            .collectList())
.map(this::wrapIntoAnotherObject)
.flatMap(protectionCommandService::createProtectionCommand)
.map(this::createMyObj));

推荐答案

停止破坏链条

这是一个纯函数,它返回一些东西,无论我们给它什么,它总是返回相同的东西.它没有副作用.

This is a pure function it returns something, and always returns the same something whatever we give it. It has no side effect.

public Mono<Integer> fooBar(int number) {
    return Mono.just(number);
}

我们可以调用它并链接起来,因为它会返回一些东西.

we can call it and chain on, because it returns something.

foobar(5).flatMap(number -> { ... }).subscribe();

这是一个非纯函数,我们无法链接,我们正在打破链.我们无法订阅,在我们订阅之前什么都不会发生.

This is a non pure function, we can't chain on, we are breaking the chain. We can't subscribe, and nothing happens until we subscribe.

public void fooBar(int number) {
    Mono.just(number)
}

fooBar(5).subscribe(); // compiler error

但是我想要一个void函数,我想要,我想要我想要....wuuaaa wuaaaa

我们总是需要返回一些东西,以便我们可以触发链中的下一部分.程序如何知道何时运行下一部分?但是假设我们想忽略返回值而只触发下一部分.那么我们可以返回一个 Mono.

We always need something to be returned so that we can trigger the next part in the chain. How else would the program know when to run the next section? But lets say we want to ignore the return value and just trigger the next part. Well we can then return a Mono<Void>.

public Mono<Void> fooBar(int number) {
    System.out.println("Number: " + number);
    return Mono.empty();
}

foobar(5).subscribe(); // Will work we have not broken the chain

你的例子:

private void createObjAndCallAnotherService(Prot prot){
    myRepository.findById( ... ) // breaking the chain, no return
}

以及其他一些提示:

  • 正确命名您的对象,而不是 MyObjsaveObjmyRepository
  • 避免长名称createObjAndCallAnotherService
  • 遵循单一职责 createObjAndCallAnotherService 这是在做两件事,因此得名.
  • 创建私有函数或辅助函数以使您的代码更具可读性,不要内联所有内容.
  • Name your objects correctly not MyObj and saveObj, myRepository
  • Avoid long names createObjAndCallAnotherService
  • Follow single responsibility createObjAndCallAnotherService this is doing 2 things, hence the name.
  • Create private functions, or helper functions to make your code more readable don't inline everything.

更新

你还在犯同样的错误.

commandFactory // Here you are breaking the chain because you are ignoring the return type
    .get()
    .createCommandFromProtection(protection)
    .subscribe(command -> commandControllerApi.createNewCommand(command)
.subscribe()); // DONT SUBSCRIBE you are not the consumer, the client that initiated the call is the subscriber
return Mono.just(protection);

你想做的是:

return commandFactory.get()
    .createCommandFrom(protection)
    .flatMap(command -> commandControllerApi.createNewCommand(command))
    .thenReturn(protection);

停止破坏链条,除非您的服务是最终消费者或发起呼叫的消费者,否则不要订阅.

Stop breaking the chain, and don't subscribe unless your service is the final consumer, or the one initiating a call.

这篇关于Spring WebFlux - 如何从数据库获取数据以供下一步使用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆