project-reactor相关内容
我正在将用RxJava 1.x编写的小项目转换为Reactor3.x.一切都很好,除了我找不到如何用适当的替代品替换 flatMap(Observable :: from)之外.我有 Mono> ,我需要将其转换为 Flux . 解决方案 在Reactor 3中, from 运算符已根据原始来源(数组,可迭代等)专门化为几个变体....).
..
我想在一个项目中使用两种方法(反应性和标准方法). 我尝试将一个REST API端点迁移到反应式Webflux并测试性能,然后再迁移其余的.但这没有用.我为他添加了路由器和处理程序,但是直到我没有从依赖项中删除 spring-boot-starter-web 并禁用 @RestController ,我才获得http 404 始终进行编码.有没有可能?还是应该将所有项目迁移到被动方法?
..
我在一些Spring webflux代码中遇到了Mono.defer() 我在文档中查找了该方法,但不理解其中的解释: "创建一个Mono提供程序,该提供程序将提供要订阅的目标Mono对于每个下游订阅者" 请给我一个解释和一个例子.我可能会参考一堆Reactor示例代码(它们的单元测试?)的地方. 谢谢 解决方案 这有点过分简化,但是从概念上讲,Reactor的来源
..
我正在尝试使用Webflux将生成的文件流式传输到另一个位置,但是,如果文件的生成遇到错误,则api返回成功,但是DTO会在生成文件时详细说明错误,而不是使用文件本身.这使用的是非常古老且设计不佳的api,因此请原谅post和api设计的使用. api调用(exchange())的响应是ClientResponse.从这里,我可以使用bodyToMono转换为ByteArrayResourc
..
即使我的信息流不为空,也将始终创建后备信息流?这样做的目的是什么?这是极不习惯的. 另一方面,. onErrorResume 的计算是延迟的. 有人可以向我解释为什么. switchIsEmpty 受到热切评价吗? 这是代码: public static void main(String [] args){Monom = Mono.just(1);m.fl
..
我继承了使用Spring和相关库(包括Reactor)以Java编写的REST服务的职责.对于诸如REST调用或数据库操作之类的昂贵操作,代码会将结果广泛包装在Reactor Mono中. 代码中需要解决各种问题,但不断出现的是嵌套在 Mono 上的 flatMap s,用于执行一系列昂贵的操作,最终缩进了几个层次,陷入了难以理解的混乱之中.我觉得这特别令人讨厌,因为我来自Scala,在这里
..
我正在使用spock框架,需要从Mono.create(..)返回模拟的Mono. 我尝试过: GroovyMock(Mono) 以及 GroovyMock(Mono,global:true)1 * Mono.create(_为MonoSink)>Mono.just(returnedValue) 但是我得到消息,上面的代码断言太少了. 这是实际的Mono.creat
..
这是先前反应式kafka问题(将数据流量发送到反应式kafka 时发出的问题)的后续问题. 我正在尝试使用反应式方法将一些日志记录发送到kafka.这是使用响应式kafka发送消息的响应式代码. 公共类LogProducer {私有最终KafkaSender发件人公共LogProducer(String bootstrapServers){Map
..
在Java中: Mono响应= mon.just()Mono对象= mono.just()返回response.block() 响应和对象彼此不依赖.有没有办法同时并行运行两个Monos? 解决方案 有不止一种方法.一种简单的解决方案是使用 subscribeOn 运算符: Mono响应= Mono.just(...
..
我有一个数据源服务,该服务将观察者作为参数. void subscription(Consumer onEventConsumer); 我想使用flux作为RSocket的响应流.我怎样才能做到这一点?我现在看到的应该是 FluxcontrollerMethod(RequestMessage mgs){var flux = Flux.empty();dataSource.sub
..
我知道 Publisher 不能同时发布,但是如果我使用 Flux#create(FluxSink),我可以安全地呼叫 FluxSink#next 并发? 换句话说,即使同时调用 FluxSink#next ,Spring是否具有内部魔术来确保事件的正确串行发布? 公共类FluxTest {私有最终Map>下沉=新的ConcurrentHas
..
TL; DR; 是否有一种方法可以根据下游运行状况自动调整Project Reactor中元素之间的延迟? 更多详细信息 我有一个应用程序,它从Kafka主题读取记录,为每个记录发送一个HTTP请求,并将结果写入另一个Kafka主题.从Kafka读写数据很容易,但是第三方HTTP服务很容易被淹没,因此我将 delayElements()与属性文件中的值一起使用,这意味着该值不会在
..
最初通过api调用触发 1.服务A向主题1产生m1(非事务发送)2.服务B消耗topic1并进行一些处理(开始发送)3.服务B向主题2产生m2(提交TX)4.服务A消耗了topic2(开始发送) 这是我的生产者配置: 最终映射道具= Maps.newConcurrentMap();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CON
..
我正在使用反应堆卡夫卡发送卡夫卡消息并接收和处理它们.在接收kakfa有效负载时,我会进行反序列化,如果有异常,我想只记录该有效负载(通过保存到mongo),然后继续接收其他有效负载. 为此,我正在使用以下方法- @EventListener(ApplicationStartedEvent.class)公共无效kafkaReceiving(){for(Flux
..
我读过许多文章,其中有许多不同的配置可以实现一次处理. 这是我的生产者配置: 最终映射道具= Maps.newConcurrentMap();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CON
..
我正在为返回promise的方法编写方面.请考虑以下方法: 公共MonopublishToKafka(Stream s){//publishToKafka是异步的返回Mono.just(s).flatMap(worker :: publishToKafka);} 如果发布成功或失败,我想缓存.由于这是一个贯穿各领域的问题,因此外观似乎是最好的设计.这是我的观点. @Ar
..
让我们说我有一个使用CustomObjects列表的API操作.对于这些对象中的每个对象,它调用一个服务方法来创建一个Mono.如何以惯用的,因此无阻塞的方式从那些Mono对象创建助焊剂? 我现在想出的是这个.我更改了方法名称以更好地反映其预期目的. fun myApiMethod(@RequestBody customObjs: List): Flux
..
在简化的情况下,我想将WebSocket客户端发送的消息广播给所有其他客户端.该应用程序是使用Spring的反应式Websockets构建的. 我的想法是使用单身 Sink,如果从客户端收到消息,则在此接收器上发出它. WebsocketSession::send只是将此Sink发出的事件转发到连接的客户端. @Component class ReactiveWebSocketHand
..
我想知道从资源读取,解析和提供文件的正确方法是什么. 目前,我正在执行以下操作: fun getFile(request: ServerRequest): Mono { val parsedJson = objectMapper.readValue(readFile("fileName.json"), JsonModel::cla
..
我有以下JSON响应: { "Count": 1, "Products": [ { "ProductID": 3423 }, { "ProductID": 4321 } ] } 我希望能够使用WebClient从Products数组中返回“产品"列表,而
..