project-reactor相关内容

如何将数据向下传递到反应链

每当我需要将数据向下传递到反应链时,我最终都会做这样的事情: public MonodoFooAndPassDtoAsMono(Dto dto) {返回 Mono.just(dto).flatMap(dtoMono -> {单声道result =//远程调用返回一个 Monoreturn Mono.zip(Mono.just(dtoMono), result);}).flatMap(tup2 - ..
发布时间:2021-06-22 18:37:03 其他开发

Flux 的方法 publishOn 没有按预期工作

我正在尝试在 Reactor Aluminium-SR1 中将阻塞消费者集成为 Flux 订阅者.我想使用并行调度程序,并发执行阻塞操作. 我已经实现了一个主类来描述我的意图: package etienne.peiniau;导入 org.reactivestreams.Subscriber;导入 org.reactivestreams.Subscription;进口reactor.cor ..
发布时间:2021-06-22 18:36:55 Java开发

Reactor Flux replay(int history) 方法未按预期工作

我正在尝试使用具有以下特征的 Project Reactor 制作 Flux 的示例: 单个热 observable,每秒发出一个项目. 两个订阅者,每个订阅者都使用发布者的一个单独线程. 调用 replay() 时的有限历史记录,因此如果其中一个订阅者太慢,某些项目将被错过. 然后我编码了这个示例: import java.time.Duration;进口reactor.cor ..
发布时间:2021-06-22 18:36:52 其他开发

反序列化异常后继续消费reactor kafka中的后续记录

我正在使用 reactor kafka 并有一个自定义的 AvroDeserializer 类用于反序列化消息. 现在我有一个案例,对于某些有效负载,反序列化类会抛出异常.我的 Kafka 听众在尝试读取此类记录时立即死亡.我尝试使用 onErrorReturn 并使用 (doOnError 和 onErrorContinue) 的组合来处理此异常,但是,它有助于记录异常,但未能使用后续记录 ..
发布时间:2021-06-22 18:36:34 Java开发

在 Mono 对象上执行 block() 时出现异常我从 ReactiveMongoRepository 对象返回

我有一个服务将数据流式传输到第二个服务,该服务接收对象流并将它们保存到我的 MongoDB.在我从流服务获得的 Flux 对象上的订阅函数中,我使用了 ReactiveMongoRepository 接口中的 save 方法.当我尝试使用块功能并获取数据时,出现以下错误: 2019-10-11 13:30:38.559 INFO 19584 --- [localhost:27017] org.m ..

如何为流数据创建 Flux/Publisher

我正在使用轮询方法定期获取数据.新数据可能随时到达.我想向我的客户公开一个反应式接口.所以,我想创建一个发布者(Flux?),它会在新数据可用时发布新数据并通知订阅者.我怎么做?我看到的所有 Flux 示例都是针对数据已知/可用的情况.实际上,我想要类似基于队列的 Flux 之类的东西,并且我的轮询线程可以在找到新数据时继续填充队列. 解决方案 对于简单的事情,您可能需要使用 Direct ..
发布时间:2021-06-22 18:36:23 Java开发

Spring Flux 和 Async 注释

我有一个 Spring Flux 应用程序,有时我需要在后台执行一些繁重的任务,调用者(HTTP 请求)不需要等到该任务完成. 如果没有反应器,我可能只会使用 Async 注释,在不同的线程上执行该方法.使用 reactor,我不确定我是否应该继续使用这种方法,或者是否已经有一个内置机制可以让我实现这一点. 例如,给定一个接受 Resource 对象的 Controller: @Po ..
发布时间:2021-06-22 18:36:20 Java开发

处理 Reactor 中的并行通量

我从 iterable 创建了一个并行通量.在每个可迭代对象上,我都必须进行一次休息调用.但是在执行时,即使任何请求失败,所有剩余的请求也会失败.我希望无论失败或成功,都执行所有请求. 我目前正在使用 Flux.fromIterable 并使用 runOn 运算符 Flux.fromIterable(actions).平行().runOn(Schedulars.elastic()).fla ..
发布时间:2021-06-22 18:36:14 其他开发

如何减缓排放形成 Flux.interval?

这需要背压还是有更简单的方法? 例如在下面的代码中,我希望每 2 秒调用一次自旋函数.有时“旋转"可能需要比 2 秒间隔更长的时间来计算,在这种情况下,我不希望任何间隔排放排队.但在下面的代码中,他们确实排队. 在下面的代码中,前 4 个自旋函数调用需要 10 秒,其余的需要 1 秒.因此,一旦函数变得更快, Flux.interval 排放就会“赶上".但是,我不希望发生任何“追赶" ..
发布时间:2021-06-22 18:36:11 其他开发

使用 Hooks 和 Lift 将 Context 推送到 ThreadLocal

我问自己是否有办法在订阅者收到 onNext 信号之前将反应性上下文推送到 ThreadLocal 变量中.在研究 reactor-core 时,我发现了 Hooks 类和 Lift BiFunction. 我创建了一个具有以下实现的类.该类由一个 ThreadLocal 变量组成,该变量将保存 Context 并实现必要的 BiFunction 接口.它会将所有调用委托给实际订阅者,并且还 ..
发布时间:2021-06-22 18:36:06 其他开发