project-reactor相关内容
每当我需要将数据向下传递到反应链时,我最终都会做这样的事情: public MonodoFooAndPassDtoAsMono(Dto dto) {返回 Mono.just(dto).flatMap(dtoMono -> {单声道result =//远程调用返回一个 Monoreturn Mono.zip(Mono.just(dtoMono), result);}).flatMap(tup2 -
..
我正在尝试使用一个使用 RxJava 1.1.5 和 Spring WebFlux(即 Reactor Core 3.1.0.M3)的库,但我无法将 Observable 适配到 Flux. 我认为这相对简单,但我的适配器不工作: import reactor.core.publisher.Flux;进口 rx.Observable;导入 rx.Subscriber;导入 rx.Subsc
..
我正在尝试在 Reactor Aluminium-SR1 中将阻塞消费者集成为 Flux 订阅者.我想使用并行调度程序,并发执行阻塞操作. 我已经实现了一个主类来描述我的意图: package etienne.peiniau;导入 org.reactivestreams.Subscriber;导入 org.reactivestreams.Subscription;进口reactor.cor
..
我正在尝试使用具有以下特征的 Project Reactor 制作 Flux 的示例: 单个热 observable,每秒发出一个项目. 两个订阅者,每个订阅者都使用发布者的一个单独线程. 调用 replay() 时的有限历史记录,因此如果其中一个订阅者太慢,某些项目将被错过. 然后我编码了这个示例: import java.time.Duration;进口reactor.cor
..
使用 Spring 和 Reactor 项目将多个 api 调用压缩为聚合结果.Mono.zip() 和 Mono.empty() 参数可以返回空结果吗? MonodogMono = dogApiClient.getDog();//可以返回 Mono.empty()MonocatMono = catMono = catApiClient.getCat();//可以返回Mono.empt
..
如果在 main() 方法中,我执行这个 Flux.just(1,2).日志().订阅(); 我在控制台中得到了这个: [ INFO] (main) |onSubscribe([同步可熔断] FluxArray.ArraySubscription)[信息](主要)|请求(无界)[信息](主要)|onNext(1)[信息](主要)|onNext(2)[信息](主要)|完成() 如果不是 ju
..
我想知道在我使用 Java 和 Spring Boot 实现的 DDD 项目中实现反应式 Mongo 存储库时遇到的一个问题.假设我们有这样的包结构: /app||------/申请||||------/顺序||||------订单应用服务.java||------/域||||------/顺序||||------订单.java||------OrderRepository.java||----
..
Spring 文档说需要手动为 WebClient 配置 http 客户端以设置超时:https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#webflux-客户端构建器反应器超时.但是由于 WebClient 返回反应式 Mono,因此可以(api-wise)应用 .timeo
..
我有一个 Flux,对于每个对象,我应该对第三方 REST 进行 API 调用(大约 1000 次调用).为了防止每秒出现许多请求,我使用: Flux调用IntervalFlux=Flux.interval(Duration.ofMillis(100)).zipWith(callsFlux, (i, call) -> call);//现在 Calls 每 10ms 发出一次,并且 REST A
..
我正在使用 reactor kafka 并有一个自定义的 AvroDeserializer 类用于反序列化消息. 现在我有一个案例,对于某些有效负载,反序列化类会抛出异常.我的 Kafka 听众在尝试读取此类记录时立即死亡.我尝试使用 onErrorReturn 并使用 (doOnError 和 onErrorContinue) 的组合来处理此异常,但是,它有助于记录异常,但未能使用后续记录
..
https://stackoverflow.com/a/47136941/1776585 的后续问题 在使用 Flux + split() + FluxMessageChannel 时,我无法让我的集成处理程序在并行线程中运行. 考虑以下片段: //....handle(message -> Flux.range(0, 10).doOnNext(i -> LOG.info("> " +
..
我有一个服务将数据流式传输到第二个服务,该服务接收对象流并将它们保存到我的 MongoDB.在我从流服务获得的 Flux 对象上的订阅函数中,我使用了 ReactiveMongoRepository 接口中的 save 方法.当我尝试使用块功能并获取数据时,出现以下错误: 2019-10-11 13:30:38.559 INFO 19584 --- [localhost:27017] org.m
..
我正在使用轮询方法定期获取数据.新数据可能随时到达.我想向我的客户公开一个反应式接口.所以,我想创建一个发布者(Flux?),它会在新数据可用时发布新数据并通知订阅者.我怎么做?我看到的所有 Flux 示例都是针对数据已知/可用的情况.实际上,我想要类似基于队列的 Flux 之类的东西,并且我的轮询线程可以在找到新数据时继续填充队列. 解决方案 对于简单的事情,您可能需要使用 Direct
..
我有一个 Spring Flux 应用程序,有时我需要在后台执行一些繁重的任务,调用者(HTTP 请求)不需要等到该任务完成. 如果没有反应器,我可能只会使用 Async 注释,在不同的线程上执行该方法.使用 reactor,我不确定我是否应该继续使用这种方法,或者是否已经有一个内置机制可以让我实现这一点. 例如,给定一个接受 Resource 对象的 Controller: @Po
..
有以下webclient的实现: public WebClient.ResponseSpec sendRequest(HttpMethod method, String contentType, T body, String baseUrl, String path) {尝试 {WebClient webClient = WebClient.builder().baseUrl(baseUrl)
..
我从 iterable 创建了一个并行通量.在每个可迭代对象上,我都必须进行一次休息调用.但是在执行时,即使任何请求失败,所有剩余的请求也会失败.我希望无论失败或成功,都执行所有请求. 我目前正在使用 Flux.fromIterable 并使用 runOn 运算符 Flux.fromIterable(actions).平行().runOn(Schedulars.elastic()).fla
..
这需要背压还是有更简单的方法? 例如在下面的代码中,我希望每 2 秒调用一次自旋函数.有时“旋转"可能需要比 2 秒间隔更长的时间来计算,在这种情况下,我不希望任何间隔排放排队.但在下面的代码中,他们确实排队. 在下面的代码中,前 4 个自旋函数调用需要 10 秒,其余的需要 1 秒.因此,一旦函数变得更快, Flux.interval 排放就会“赶上".但是,我不希望发生任何“追赶"
..
我有以下异步任务: public class AsyncValidationTask {//如果出错则返回 Mono.error(new Exception()),否则返回 Mono.empty()公共 Mono执行(对象 o);} public class AsyncSaveTask {//如果出错则返回 Mono.error(new Exception()),否则返回 Mono 的
..
我问自己是否有办法在订阅者收到 onNext 信号之前将反应性上下文推送到 ThreadLocal 变量中.在研究 reactor-core 时,我发现了 Hooks 类和 Lift BiFunction. 我创建了一个具有以下实现的类.该类由一个 ThreadLocal 变量组成,该变量将保存 Context 并实现必要的 BiFunction 接口.它会将所有调用委托给实际订阅者,并且还
..
我正在使用 Spring 5 WebClient 从 REST api 重复获取正在运行的进程的某些状态. 在这里的帮助下 我现在想到了这个解决方案: webClient.get().uri(...).retrieve.bodyToMono(State.class).重复().skipUntil(state -> stateFinished()).limitRequest(1).subsc
..