reactive-streams相关内容

Akka Stream Source.queue 的背压策略不起作用

我试图理解为什么下面的代码片段正在做它正在做的事情.我会认为,因为接收器不能比源生成内容更快地产生需求,那么我会收到丢弃的消息以响应某些报价(溢出策略设置为丢弃缓冲区)以及错误和队列关闭消息在自毁片之后. 片段: 打包游乐场导入 java.time.LocalDateTime导入 java.util.concurrent.atomic.AtomicInteger导入 akka.actor. ..
发布时间:2021-11-26 22:51:37 其他开发

端到端反应式流 RESTful 服务(又名 Back-Pressure over HTTP)

我在网上尝试澄清这个问题有一段时间没有成功,所以我会尝试在这里提问. 我想找到一些资源或示例,其中展示了如何构建端到端的完全背压式 REST 服务 + 客户端.我的意思是,我希望看到,给定一个实现 Reactive Streams 的 REST 客户端(无论是在 Akka、JS 还是其他中),我将拥有(并且能够“可视化")整个过程中处理的背压构建的 REST 服务器,例如使用 Akka-Ht ..
发布时间:2021-11-26 22:50:28 其他开发

Slick 中如何使用反应流来插入数据

在 Slick 的文档中,提供了使用反应流的示例仅用于读取数据作为 DatabasePublisher 的一种方式.但是,当您想根据插入率将数据库用作 Sink 和 backpreasure 时会发生什么? 我已经寻找了等效的DatabaseSubscriber,但它不存在.所以问题是,如果我有来源,请说: val 源 = 源(0 到 100) 我怎样才能用 Slick 构造一个 ..
发布时间:2021-11-26 22:48:36 数据库

Akka-Stream 实现比单线程实现慢

2015 年 10 月 30 日更新 基于 Roland Kuhn Awnser: Akka Streams 使用 Actors 之间的异步消息传递来实现流处理阶段.跨数据传递异步边界有一个你在这里看到的开销:你的计算似乎只需要大约 160ns(来自单线程测量),而流解决方案需要每个元素大约 1µs,主要由消息传递决定. 另一个误解是说“流"意味着并行:在您的代码所有计算都在单个 ..
发布时间:2021-11-26 22:48:04 其他开发

使用 Akka Stream 从数据库流式传输记录

我有一个使用 Akka 的系统,该系统当前通过消息队列处理传入的流数据.当一条记录到达时,它被处理,mq 被确认并传递记录以在系统内进一步处理. 现在我想添加对使用数据库作为输入的支持. 输入源能够处理数据库的方法是什么(应该以接收器可以处理的速度流式传输 > 100M 记录 - 所以我假设是反应性/akka-streams?)? 解决方案 Slick Library 流畅的 ..
发布时间:2021-11-26 22:38:35 其他开发

Akka Streams:Mat 在 Source[out, Mat] 中代表什么

在 Akka 流中,Source[Out, Mat] 或 Sink[In, Mat] 中的 Mat 代表什么.什么时候会实际使用? 解决方案 Mat 类型参数表示该流的物化值的类型. 请记住,在 Akka Source、Flow、Sink(好吧,所有图)只是蓝图——它们不做任何处理他们自己,他们只描述了应该如何构造流.将这些蓝图转换为带有实时数据的工作流的过程称为物化. 实现流 ..
发布时间:2021-11-26 22:37:02 其他开发

Spring Reactive Stream - 意外关闭

我们正在将 Spring Cloud Reactive Streams 与 RabbitMQ 结合使用. Spring Reactive Stream 似乎在将消息从队列中拉出时立即确认该消息.因此,在消息处理期间发生的任何错误未处理异常都需要在应用程序中进行处理(这与非反应流不同,非反应流可以抛出未处理的异常并拒绝消息,从而将其发送到死信队列). 当消息正在传输时,我们应该如何处理应 ..

Akka Stream 连接到多个接收器

我在 akka 流中实现了一个自定义组件,它将元素作为输入,根据一个键对它们进行分组和合并,然后通过十几个出口之一将它们发送出去.您可以将此组件视为一种 GroupBy 组件,它不会将流划分为子流,而是实际的流.除了对传入元素进行分区之外,它还将它们合并为一个元素,即在组件内部发生了一些缓冲,使得 1 个元素进入并不一定意味着 1 个元素通过出口传出. 以下是所述组件的简化实现. clas ..
发布时间:2021-07-15 20:50:18 其他开发

来自带有 Spring/Project 反应器的 HTTP 客户端的流响应

如何将响应从响应式 HTTP 客户端流式传输到控制器,而无需在应用程序内存中随时保留整个响应主体? 几乎所有项目反应器客户端的示例都返回 Mono.据我了解,反应式流是关于流式传输的,而不是全部加载然后发送响应. 是否有可能返回一种Flux,从而可以将大文件从某些外部服务传输到应用程序客户端,而无需使用大量 RAM 内存来存储中间结果? 解决方案 我不认为在你的场景中你需要创建 ..
发布时间:2021-06-22 18:37:06 其他开发

Project Reactor:如何控制通量发射

我有一个发射一些 Date 的通量.这个 Date 被映射到 1024 个模拟的 HTTP 请求,我在一些 Executer 上运行. 我想做的是在发出下一个 Date 之前等待所有 1024 个 HTTP 请求. 当前运行时,onNext() 被多次调用,然后稳定在某个稳定的速率上. 我该如何改变这种行为? 附言如果需要,我愿意转向架构. private void ru ..

MongoDB Reactive Streams 运行时依赖错误与 Alpakka Mongo 连接器 ClassNotFoundException

我正在尝试将 Alpakka Mongo Connector 集成到严重依赖 Akka 库进行流处理的应用程序.该应用程序也使用 Akka HTTP. 我在运行时遇到依赖问题.特别是,当我尝试使用 Mongo 连接器提供的 MongoSink.insertOne 方法时,我得到了一个 NoClassDefFoundError 的某种成功/失败包装器.完整的堆栈跟踪: java.lang.NoC ..
发布时间:2021-06-03 19:31:04 其他开发