project-reactor相关内容

如何转换Mono< List< String>>进入Flux< String>

我正在将用RxJava 1.x编写的小项目转换为Reactor3.x.一切都很好,除了我找不到如何用适当的替代品替换 flatMap(Observable :: from)之外.我有 Mono> ,我需要将其转换为 Flux . 解决方案 在Reactor 3中, from 运算符已根据原始来源(数组,可迭代等)专门化为几个变体....). ..
发布时间:2021-05-18 19:27:26 Java开发

我可以一起使用SpringMvc和webflux吗?

我想在一个项目中使用两种方法(反应性和标准方法). 我尝试将一个REST API端点迁移到反应式Webflux并测试性能,然后再迁移其余的.但这没有用.我为他添加了路由器和处理程序,但是直到我没有从依赖项中删除 spring-boot-starter-web 并禁用 @RestController ,我才获得http 404 始终进行编码.有没有可能?还是应该将所有项目迁移到被动方法? ..

Mono.defer()有什么作用?

我在一些Spring webflux代码中遇到了Mono.defer() 我在文档中查找了该方法,但不理解其中的解释: "创建一个Mono提供程序,该提供程序将提供要订阅的目标Mono对于每个下游订阅者" 请给我一个解释和一个例子.我可能会参考一堆Reactor示例代码(它们的单元测试?)的地方. 谢谢 解决方案 这有点过分简化,但是从概念上讲,Reactor的来源 ..
发布时间:2021-05-18 19:04:16 Java开发

调用bodyToMono AFTER exchange()后,block()/blockFirst()/blockLast()发生阻塞错误

我正在尝试使用Webflux将生成的文件流式传输到另一个位置,但是,如果文件的生成遇到错误,则api返回成功,但是DTO会在生成文件时详细说明错误,而不是使用文件本身.这使用的是非常古老且设计不佳的api,因此请原谅post和api设计的使用. api调用(exchange())的响应是ClientResponse.从这里,我可以使用bodyToMono转换为ByteArrayResourc ..

Java Reactor中嵌套flatMap的一个好习惯是什么?

我继承了使用Spring和相关库(包括Reactor)以Java编写的REST服务的职责.对于诸如REST调用或数据库操作之类的昂贵操作,代码会将结果广泛包装在Reactor Mono中. 代码中需要解决各种问题,但不断出现的是嵌套在 Mono 上的 flatMap s,用于执行一系列昂贵的操作,最终缩进了几个层次,陷入了难以理解的混乱之中.我觉得这特别令人讨厌,因为我来自Scala,在这里 ..
发布时间:2021-05-15 19:33:22 Java开发

如何模拟Mono.create

我正在使用spock框架,需要从Mono.create(..)返回模拟的Mono. 我尝试过: GroovyMock(Mono) 以及 GroovyMock(Mono,global:true)1 * Mono.create(_为MonoSink)>Mono.just(returnedValue) 但是我得到消息,上面的代码断言太少了​​. 这是实际的Mono.creat ..
发布时间:2021-05-13 19:41:45 其他开发

将所有消息发送到Kafka之前,反应式程序退出得较早

这是先前反应式kafka问题(将数据流量发送到反应式kafka 时发出的问题)的后续问题. 我正在尝试使用反应式方法将一些日志记录发送到kafka.这是使用响应式kafka发送消息的响应式代码. 公共类LogProducer {私有最终KafkaSender发件人公共LogProducer(String bootstrapServers){Map ..

Reactor中的自动速率调整

TL; DR; 是否有一种方法可以根据下游运行状况自动调整Project Reactor中元素之间的延迟? 更多详细信息 我有一个应用程序,它从Kafka主题读取记录,为每个记录发送一个HTTP请求,并将结果写入另一个Kafka主题.从Kafka读写数据很容易,但是第三方HTTP服务很容易被淹没,因此我将 delayElements()与属性文件中的值一起使用,这意味着该值不会在 ..

使用onErrorResume处理使用Reactor Kafka发布到Kafka的有问题的有效负载

我正在使用反应堆卡夫卡发送卡夫卡消息并接收和处理它们.在接收kakfa有效负载时,我会进行反序列化,如果有异常,我想只记录该有效负载(通过保存到mongo),然后继续接收其他有效负载. 为此,我正在使用以下方法- @EventListener(ApplicationStartedEvent.class)公共无效kafkaReceiving(){for(Flux ..

编写反应性管道的方面

我正在为返回promise的方法编写方面.请考虑以下方法: 公共MonopublishToKafka(Stream s){//publishToKafka是异步的返回Mono.just(s).flatMap(worker :: publishToKafka);} 如果发布成功或失败,我想缓存.由于这是一个贯穿各领域的问题,因此外观似乎是最好的设计.这是我的观点. @Ar ..
发布时间:2021-04-07 20:29:45 Java开发

从Mono& #s列表中创建助焊剂的正确方法

让我们说我有一个使用CustomObjects列表的API操作.对于这些对象中的每个对象,它调用一个服务方法来创建一个Mono.如何以惯用的,因此无阻塞的方式从那些Mono对象创建助焊剂? 我现在想出的是这个.我更改了方法名称以更好地反映其预期目的. fun myApiMethod(@RequestBody customObjs: List): Flux ..
发布时间:2021-02-15 19:39:20 其他开发

如何从Spring WebFlux中的多个Flux(WebsocketSession :: receive)正确向Sink发射值?

在简化的情况下,我想将WebSocket客户端发送的消息广播给所有其他客户端.该应用程序是使用Spring的反应式Websockets构建的. 我的想法是使用单身 Sink,如果从客户端收到消息,则在此接收器上发出它. WebsocketSession::send只是将此Sink发出的事件转发到连接的客户端. @Component class ReactiveWebSocketHand ..