project-reactor相关内容

Spring Reactor:Mono.zip 在空 Mono 上失败

我正在使用 Spring Reactor 3.1.0.M3 并且有一个用例,我需要合并来自多个来源的 Mono.我发现如果 Mono 之一是空 Mono,则 zip 会失败而不会出错. 示例: Monom1 = Mono.just("A");单声道m2 = Mono.just("B");单声道m3 = Mono.empty();单声道组合 = Mono.zip(strings -> {St ..
发布时间:2021-06-22 18:33:52 Java开发

如何并行处理 Flux 事件?

我有需要丰富的传入事件流,然后在它们到达时并行处理. 我认为 Project Reactor 是为这项工作定制的,但在我的测试中,所有处理似乎都是连续完成的. 这是一些测试代码: ExecutorService executor = Executors.newFixedThreadPool(10);System.out.println("主线程:" + Thread.currentT ..
发布时间:2021-06-22 18:33:49 其他开发

Spring WebFlux - 如何从数据库获取数据以供下一步使用

我使用 Spring WebFlux (Project Reactor),但面临以下问题:我必须从 db 获取一些数据才能使用它们来调用另一项服务 - 一个流中的所有内容.如何做到这一点? public MonosaveObj(Mono obj) {返回对象.flatMap(ob->单声道.zip(repo1.save(...),回购2.保存全部(...).collectList(),回购3.保 ..

在 Reactor 3 中将 Flux 拆分为多个 Flux 的最有效方法

在 Reactor 3 中,通过模式匹配将异构通量拆分为多个通量的最有效方法是什么?(而且后续对每个flux的操作可能会有很大的不同) 例如 源通量:a->b->c->a->b->c||vv通量:a->a->aB 通量:b->b->bC 通量:c->c->c 我是响应式编程的新手,我想出的唯一解决方案是 share()+filter(),比如 val shared = flux.sha ..
发布时间:2021-06-22 18:33:41 其他开发

在 Spring WebFlux Web 应用程序中缓存来自 WebClient 调用的 Mono 的结果

我希望缓存一个 Mono(仅当它成功时),这是 WebClient 调用的结果. 从阅读项目反应堆插件文档我不觉得CacheMono 非常适合,因为它也缓存了我不想要的错误. 因此,我不使用 CacheMono,而是执行以下操作: Cache>myCaffeineCache =咖啡因.newBuilder().maximumSize(100).expireAfterWrite(Dur ..
发布时间:2021-06-22 18:33:38 其他开发

添加重试WebClient的所有请求

我们有一个服务器来检索 OAUTH 令牌,并且通过 WebClient.filter 方法将 oauth 令牌添加到每个请求中例如 webClient.mutate().filter((request, next) -> tokenProvider.getBearerToken().map(token -> ClientRequest.from(request).headers(httpHead ..
发布时间:2021-06-22 18:33:35 其他开发

是否可以并行启动 Mono 并汇总结果

我知道可以链接 Mono,例如... Mono结果AMono = loadA();单声道resultBMono = resultA.flatMap(resultA -> loadB()); 这将链接并且 resultBMono 将在 resultAMono 返回时运行.... 所以我的问题是,是否可以并行启动 2 个 Mono,并且当两个返回都继续使用另一个 Mono 时? 我认为 ..
发布时间:2021-06-22 18:33:32 其他开发

如何从多个线程调用 Sinks.Many<T>.tryEmitNext?

我对 Flux Sinks 一头雾水,无法理解更高层次的图片.使用 Sinks.Many 时tryEmitNext,该函数告诉我是否存在争用以及失败时我该怎么办,(FailFast/Handler). 但是有没有一个简单的结构可以让我从多个线程安全地发出元素.例如,与其让用户知道存在争用,我应该再试一次,不如将元素添加到队列(mpmc、mpsc 等)中,并且仅在队列已满时才通知. 现在 ..
发布时间:2021-06-22 18:33:29 其他开发

Project Reactor 中的背压是如何工作的?

我一直在使用 Spring Reactor 并且之前进行了一些测试,这让我想知道 Fluxes 默认如何处理背压.我知道 onBackpressureBuffer 等存在,我也读过 RxJava 默认为无界,直到你定义是否缓冲、丢弃等 那么,谁能为我澄清一下:Reactor 3 中 Flux 的默认背压行为是什么? 我尝试寻找答案,但没有找到任何明确的答案,只有背压的定义或上面链接的 ..
发布时间:2021-06-22 18:33:24 其他开发

Spring WebFlux 和 Reactor 的线程模型

目前正在使用 Spring 5.0.0.RC2、Reactor 3.1.0.M2 和 Spring Boot 2.0.0.M2 试验反应式编程. 想知道 WebFlux 和 Reactor 使用并发和线程模型来正确编码应用程序和处理可变状态. Reactor 文档指出该库被认为是并发不可知的,并提到了调度程序抽象.WebFlux 文档未提供信息. 然而,当通过 Spring Bo ..

反应堆中的地图vs平面地图

关于 RxJava ,但我想了解它在Reactor中的工作原理. 我目前的理解非常模糊,我倾向于认为map是同步的,而flatMap是异步的,但是我真的无法解决它. 这里是一个例子: files.flatMap {it->Mono.just(Paths.get(UPLOAD_ROOT,it.filename()).toFile()).map {destFile->destFile. ..
发布时间:2021-05-18 19:53:07 Java开发