project-reactor相关内容
我正在使用 Spring Reactor 3.1.0.M3 并且有一个用例,我需要合并来自多个来源的 Mono.我发现如果 Mono 之一是空 Mono,则 zip 会失败而不会出错. 示例: Monom1 = Mono.just("A");单声道m2 = Mono.just("B");单声道m3 = Mono.empty();单声道组合 = Mono.zip(strings -> {St
..
我有需要丰富的传入事件流,然后在它们到达时并行处理. 我认为 Project Reactor 是为这项工作定制的,但在我的测试中,所有处理似乎都是连续完成的. 这是一些测试代码: ExecutorService executor = Executors.newFixedThreadPool(10);System.out.println("主线程:" + Thread.currentT
..
我想链接 Mono 并发出它们中的第一个非空.我认为 or() 运算符就是为此目的而设计的. 这是我的 Mono 链:第一个是空的,第二个应该发出“hello". @Test无效或测试(){单声道chain = Mono.empty().or(Mono.just("hello"));StepVerifier.create(链).expectNext("你好").verifyComplete
..
我使用 Spring WebFlux (Project Reactor),但面临以下问题:我必须从 db 获取一些数据才能使用它们来调用另一项服务 - 一个流中的所有内容.如何做到这一点? public MonosaveObj(Mono obj) {返回对象.flatMap(ob->单声道.zip(repo1.save(...),回购2.保存全部(...).collectList(),回购3.保
..
在 Reactor 3 中,通过模式匹配将异构通量拆分为多个通量的最有效方法是什么?(而且后续对每个flux的操作可能会有很大的不同) 例如 源通量:a->b->c->a->b->c||vv通量:a->a->aB 通量:b->b->bC 通量:c->c->c 我是响应式编程的新手,我想出的唯一解决方案是 share()+filter(),比如 val shared = flux.sha
..
我希望缓存一个 Mono(仅当它成功时),这是 WebClient 调用的结果. 从阅读项目反应堆插件文档我不觉得CacheMono 非常适合,因为它也缓存了我不想要的错误. 因此,我不使用 CacheMono,而是执行以下操作: Cache>myCaffeineCache =咖啡因.newBuilder().maximumSize(100).expireAfterWrite(Dur
..
我们有一个服务器来检索 OAUTH 令牌,并且通过 WebClient.filter 方法将 oauth 令牌添加到每个请求中例如 webClient.mutate().filter((request, next) -> tokenProvider.getBearerToken().map(token -> ClientRequest.from(request).headers(httpHead
..
我知道可以链接 Mono,例如... Mono结果AMono = loadA();单声道resultBMono = resultA.flatMap(resultA -> loadB()); 这将链接并且 resultBMono 将在 resultAMono 返回时运行.... 所以我的问题是,是否可以并行启动 2 个 Mono,并且当两个返回都继续使用另一个 Mono 时? 我认为
..
我对 Flux Sinks 一头雾水,无法理解更高层次的图片.使用 Sinks.Many 时tryEmitNext,该函数告诉我是否存在争用以及失败时我该怎么办,(FailFast/Handler). 但是有没有一个简单的结构可以让我从多个线程安全地发出元素.例如,与其让用户知道存在争用,我应该再试一次,不如将元素添加到队列(mpmc、mpsc 等)中,并且仅在队列已满时才通知. 现在
..
我正在查看一些响应式 Web 应用程序的示例,我看到它们是这样的 @RequestMapping(value = "/{id}", method = RequestMethod.GET)@ResponseBody公共单声道findById(...) {返回 exampleService.findById(...);}@RequestMapping(方法 = RequestMethod.GET,产
..
我一直在使用 Spring Reactor 并且之前进行了一些测试,这让我想知道 Fluxes 默认如何处理背压.我知道 onBackpressureBuffer 等存在,我也读过 RxJava 默认为无界,直到你定义是否缓冲、丢弃等 那么,谁能为我澄清一下:Reactor 3 中 Flux 的默认背压行为是什么? 我尝试寻找答案,但没有找到任何明确的答案,只有背压的定义或上面链接的
..
我的 Spring Boot 应用程序中有一个类似下面的方法. public Flux搜索(搜索请求请求){Fluxresult = searchService.search(request);//返回 Flux;Mono>listOfData = result.collectList();//doThisAsync()//这里我想传递这个列表并对其运行一些处理//处理应异
..
目前正在使用 Spring 5.0.0.RC2、Reactor 3.1.0.M2 和 Spring Boot 2.0.0.M2 试验反应式编程. 想知道 WebFlux 和 Reactor 使用并发和线程模型来正确编码应用程序和处理可变状态. Reactor 文档指出该库被认为是并发不可知的,并提到了调度程序抽象.WebFlux 文档未提供信息. 然而,当通过 Spring Bo
..
我是反应式编程的新手.我看到可以压缩两个单声道来生成结果: Monoinfo = Mono.just(id).map(this :: getInfo).subscribeOn(Schedulers.parallel());Mono>detail = Mono.just(petitionRequest).map(this.service :: getD
..
我有一个返回 Mono 的方法: 接口处理器{单声道处理(输入输入);} 我想为集合执行此 processor 方法: List输入=//获取输入处理器处理器=//获取处理器List输出= inputs.stream().map(supplier :: supply).collect(toList()); 但我想获取的
..
我希望在响应为5xx时等待10秒后重试该请求3次.但我看不到可以使用的方法.在物体上 WebClient.builder().baseUrl("...").build().post().retrieve().bodyToMono(...) 我可以看到方法: 在有重试次数但没有延迟的条件下重试 .retry(3,{这是WebClientResponseException&& it.
..
我可能丢失了一些东西,但我不知道它是什么. 以下代码什么都不做: webClient.get().uri("/some/path/here").retrieve().bodyToMono(GetLocationsResponse.class).doOnNext(System.out :: println).订阅(); 如果我尝试阻止通话,则效果很好: webClient.get(
..
我有以下代码: import org.springframework.http.MediaType;导入org.springframework.stereotype.Component;导入org.springframework.web.reactive.function.BodyInserters;导入org.springframework.web.reactive.function.ser
..
关于 RxJava ,但我想了解它在Reactor中的工作原理. 我目前的理解非常模糊,我倾向于认为map是同步的,而flatMap是异步的,但是我真的无法解决它. 这里是一个例子: files.flatMap {it->Mono.just(Paths.get(UPLOAD_ROOT,it.filename()).toFile()).map {destFile->destFile.
..
我一直在Spring Security + Webflux中使用ReactiveAuthenticationManager.它是自定义的,以返回 UsernamePasswordAuthenticationToken 的实例,从该实例中可以得知,当我调用 ReactiveSecurityContextHolder.getContext().map(ctx-> ctx.getAuthenticati
..