reactive-streams相关内容
我正在尝试创建一个rx.net运算符,该运算符接受Observable和: 如果第一个元素"a" ,则转发每个元素时保持不变 仅发出完成信号,否则 例如: -a-b-c-d-|- --> -a-b-c-d-|- -b-c-d-|- --> -|- 如何执行此操作? 推荐答案 以下版本完全没有竞争条件: public static I
..
我一直在使用Spring Boot 2.0.0.RC1并使用spring-boot-starter-webflux来构建返回大量文本数据的REST控制器。 @GetMapping(value = "/") public Flux getData(){ return Flux.interval(Duration.ofSeconds(2)) .map(l
..
我试图理解为什么下面的代码片段正在做它正在做的事情.我会认为,因为接收器不能比源生成内容更快地产生需求,那么我会收到丢弃的消息以响应某些报价(溢出策略设置为丢弃缓冲区)以及错误和队列关闭消息在自毁片之后. 片段: 打包游乐场导入 java.time.LocalDateTime导入 java.util.concurrent.atomic.AtomicInteger导入 akka.actor.
..
我在网上尝试澄清这个问题有一段时间没有成功,所以我会尝试在这里提问. 我想找到一些资源或示例,其中展示了如何构建端到端的完全背压式 REST 服务 + 客户端.我的意思是,我希望看到,给定一个实现 Reactive Streams 的 REST 客户端(无论是在 Akka、JS 还是其他中),我将拥有(并且能够“可视化")整个过程中处理的背压构建的 REST 服务器,例如使用 Akka-Ht
..
在 Slick 的文档中,提供了使用反应流的示例仅用于读取数据作为 DatabasePublisher 的一种方式.但是,当您想根据插入率将数据库用作 Sink 和 backpreasure 时会发生什么? 我已经寻找了等效的DatabaseSubscriber,但它不存在.所以问题是,如果我有来源,请说: val 源 = 源(0 到 100) 我怎样才能用 Slick 构造一个
..
2015 年 10 月 30 日更新 基于 Roland Kuhn Awnser: Akka Streams 使用 Actors 之间的异步消息传递来实现流处理阶段.跨数据传递异步边界有一个你在这里看到的开销:你的计算似乎只需要大约 160ns(来自单线程测量),而流解决方案需要每个元素大约 1µs,主要由消息传递决定. 另一个误解是说“流"意味着并行:在您的代码所有计算都在单个
..
我有一个使用 Akka 的系统,该系统当前通过消息队列处理传入的流数据.当一条记录到达时,它被处理,mq 被确认并传递记录以在系统内进一步处理. 现在我想添加对使用数据库作为输入的支持. 输入源能够处理数据库的方法是什么(应该以接收器可以处理的速度流式传输 > 100M 记录 - 所以我假设是反应性/akka-streams?)? 解决方案 Slick Library 流畅的
..
我正在尝试使用 Akka HTTP 2.0-M2 编写用于批量数据上传的工具.但我面临 akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] 错误. 我试图隔离一个问题,这里是同样失败的示例代码: 公共类 TestMaxRe
..
在 Akka 流中,Source[Out, Mat] 或 Sink[In, Mat] 中的 Mat 代表什么.什么时候会实际使用? 解决方案 Mat 类型参数表示该流的物化值的类型. 请记住,在 Akka Source、Flow、Sink(好吧,所有图)只是蓝图——它们不做任何处理他们自己,他们只描述了应该如何构造流.将这些蓝图转换为带有实时数据的工作流的过程称为物化. 实现流
..
我们正在将 Spring Cloud Reactive Streams 与 RabbitMQ 结合使用. Spring Reactive Stream 似乎在将消息从队列中拉出时立即确认该消息.因此,在消息处理期间发生的任何错误未处理异常都需要在应用程序中进行处理(这与非反应流不同,非反应流可以抛出未处理的异常并拒绝消息,从而将其发送到死信队列). 当消息正在传输时,我们应该如何处理应
..
我在 akka 流中实现了一个自定义组件,它将元素作为输入,根据一个键对它们进行分组和合并,然后通过十几个出口之一将它们发送出去.您可以将此组件视为一种 GroupBy 组件,它不会将流划分为子流,而是实际的流.除了对传入元素进行分区之外,它还将它们合并为一个元素,即在组件内部发生了一些缓冲,使得 1 个元素进入并不一定意味着 1 个元素通过出口传出. 以下是所述组件的简化实现. clas
..
我有一个用例,流应该只在累积“总和"大于等于 1 时才发出.等于或超过给定值 n.我们以 n = 5 的六个整数为例. +---+------+-------+|我|发出 |总和 |+---+------+---------+|1 |- |1 ||2 |- |3 ||3 |5 |1 ||4 |5 |0 ||5 |5 |0 ||2 |2 |0(结束)|+---+------+---------+
..
如何将响应从响应式 HTTP 客户端流式传输到控制器,而无需在应用程序内存中随时保留整个响应主体? 几乎所有项目反应器客户端的示例都返回 Mono.据我了解,反应式流是关于流式传输的,而不是全部加载然后发送响应. 是否有可能返回一种Flux,从而可以将大文件从某些外部服务传输到应用程序客户端,而无需使用大量 RAM 内存来存储中间结果? 解决方案 我不认为在你的场景中你需要创建
..
我有以下异步任务: public class AsyncValidationTask {//如果出错则返回 Mono.error(new Exception()),否则返回 Mono.empty()公共 Mono执行(对象 o);} public class AsyncSaveTask {//如果出错则返回 Mono.error(new Exception()),否则返回 Mono 的
..
transform 和有什么区别?transformDeferred 在项目反应堆通量中. 好的例子会有帮助. https://projectreactor.io/docs/core/release/reference/index.html#advanced-mutualizing-operator-usage 解决方案 大多数时候,Flux 是“懒惰的":你声明一个处理管道,
..
我有一个发射一些 Date 的通量.这个 Date 被映射到 1024 个模拟的 HTTP 请求,我在一些 Executer 上运行. 我想做的是在发出下一个 Date 之前等待所有 1024 个 HTTP 请求. 当前运行时,onNext() 被多次调用,然后稳定在某个稳定的速率上. 我该如何改变这种行为? 附言如果需要,我愿意转向架构. private void ru
..
我有一个 Flux 和 Mono,但我不知道如何将它们组合起来,以便在 Flux 的每个项目中都有单声道值. 我正在尝试这种方法,但它不起作用: Monomono1 = Mono.just("x");FluxFlux1 = Flux.just("{1}", "{2}", "{3}", "{4}");Flux.zip(mono1, flux1, (itemMono1, itemFlux1)
..
我正在尝试将 Alpakka Mongo Connector 集成到严重依赖 Akka 库进行流处理的应用程序.该应用程序也使用 Akka HTTP. 我在运行时遇到依赖问题.特别是,当我尝试使用 Mongo 连接器提供的 MongoSink.insertOne 方法时,我得到了一个 NoClassDefFoundError 的某种成功/失败包装器.完整的堆栈跟踪: java.lang.NoC
..
我知道 Publisher 不能同时发布,但是如果我使用 Flux#create(FluxSink),我可以安全地呼叫 FluxSink#next 并发? 换句话说,即使同时调用 FluxSink#next ,Spring是否具有内部魔术来确保事件的正确串行发布? 公共类FluxTest {私有最终Map>下沉=新的ConcurrentHas
..
在什么情况下导致Flux::flatMap同时收听多个来源(0 ...无穷大)? 我在实验中发现,当上游将信号发送到线程thread-upstream-1中的flatMap且有N内部流时,flatMap会监听这些流,并且每个内部流都以不同的方式发送信号/em>线程:thread-inner-stream-i表示1
..