backpressure相关内容

RxJs:有损形式的 zip 运算符

考虑使用 zip 运算符将两个无限的 Observable 压缩在一起,其中之一它发出项目的频率是另一个的两倍. 当前的实现是无损的,即如果我让这些 Observable 发射一个小时,然后我在它们的发射率之间切换,第一个 Observable 最终会赶上另一个. 随着缓冲区越来越大,这将在某些时候导致内存爆炸. 如果第一个 observable 将发射项目数小时,而第二个将在最后发射一个项 ..
发布时间:2022-01-01 22:33:10 前端开发

为 Cassandra Writes 获得背压的最佳方法是什么?

我有一个服务,它以我控制的速度从队列中消耗消息.我做了一些处理,然后尝试通过 Datastax Java 客户端写入 Cassandra 集群.我已经使用 maxRequestsPerConnection 和 maxConnectionsPerHost 设置了我的 Cassandra 集群.但是,在测试中,我发现当我达到 maxConnectionsPerHost 和 maxRequestsPer ..
发布时间:2021-12-31 17:34:50 Java开发

卡夫卡的背压

我在 Kafka 中遇到过这样一种情况,即生产者以比消费者消费率高得多的速度发布消息.我必须在kafka中实现背压实现,以便进一步消费和处理. 请让我知道如何在 spark 和普通的 java api 中实现. 解决方案 Kafka 在这里充当监管者.你可以以任何你想要的速度在 Kafka 中生成,扩展代理以适应摄取率.然后你可以随心所欲地消费;Kafka 会保留数据并跟踪消费者在处 ..
发布时间:2021-11-12 01:56:11 其他开发

基于 TCP 的 Node.JS 无界并发/流背压

据我所知,Node 事件 IO 模型的后果之一是无法告诉(例如)通过 TCP 套接字接收数据的 Node 进程在您连接接收事件后进行阻塞处理程序(或以其他方式开始侦听数据). 如果接收方不能足够快地处理传入的数据,则会导致“无界并发",即后台节点继续尽可能快地从套接字读取数据,并在事件上安排新的数据事件在套接字上循环而不是阻塞,直到进程最终耗尽内存并死亡. 接收方不能告诉节点放慢它的读 ..
发布时间:2021-08-30 20:17:26 其他开发

如何一次处理 RxJS 流 n 个项目,一旦项目完成,又自动填充回 n 个项目?

我有一个事件流,我想调用一个函数来为每个事件返回一个承诺,问题是这个函数非常昂贵,所以我想一次最多处理 n 个事件. 这个卵石图可能有误,但这是我想要的: ---x--x--xxxxxxx-------------x------------->//事件---p--p--pppp------p-p-p-----p------------->//进行中-------d--d--------d- ..
发布时间:2021-07-14 19:11:17 前端开发

为什么`Publishers.Map`会急切地消耗上游值?

假设我有一个自定义订阅者,该订阅者在订阅时请求一个值,然后在收到前一个值三秒钟后再请求一个附加值: class MySubscriber:订阅者{typealias输入=整数typealias失败=永不私人var订阅:订阅?func receive(订阅:订阅){打印(“已订阅")self.subscription =订阅subscription.request(.max(1))}func r ..
发布时间:2021-04-23 19:13:58 移动开发

卡夫卡的背压

我在卡夫卡(Kafka)遇到一种情况,生产者以比消费者消费率高得多的速度发布消息.我必须在kafka中实施反压实施,以进一步消耗和处理. 请让我知道如何在spark和普通的Java API中实现. 解决方案 Kafka在这里充当监管者.您可以按照想要的任何速率将其生成到Kafka中,从而将经纪人向外扩展以适应摄取速率.然后,您可以按照自己的意愿进行消费;Kafka保留数据并跟踪消费者 ..
发布时间:2021-04-08 18:43:53 其他开发

处理FixedThreadPool中的背压

如何使用线程池处理Java中的背压? 如何拒绝新任务,以便提交的任务不超过 N 个. N -是提交队列中允许的最大任务数,包括新的,正在运行的,暂停的(未完成)任务. 用例 用户提交运行一段时间的计算任务.有时,有太多用户同时提交任务.如果已经提交了 N 个任务,如何拒绝新任务. 换句话说,已提交的总数(未完成,已开始或未开始的任务)不能大于 N . 示例代码 ..

是否可以让事件处理程序等到异步/基于Promise的代码完成?

我正在nodejs模式下使用出色的Papa Parse库,将超过一百万行的大型(500 MB)CSV文件流式传输到慢速持久性API中,一次只能处理一个请求。持久性API基于 Promise s,但是从Papa Parse,我在 synchronous 事件中收到每个已解析的CSV行,如下所示: parseStream.on(“ data”,row => {...} 我面临的挑战是Papa P ..
发布时间:2020-10-30 21:32:50 其他开发

Spark结构化流如何处理背压?

我正在分析Spark结构化流上的背压功能.有人知道细节吗?是否可以通过代码调整处理传入记录? 谢谢 解决方案 如果要在结构化流中动态更改每个内部批处理的大小,请否.结构化流中没有基于接收器的源,因此完全没有必要.从另一个角度来看,结构化流无法进行真正的背压,因为例如Spark无法告知其他应用程序减慢将数据推入Kafka的速度. 通常,结构化流默认会尝试尽快处理数据.每个源中都有允许控 ..

当缓慢的使用者在流处理中产生反压(火花,aws)时,避免数据丢失

我是分布式流处理(Spark)的新手.我已经阅读了一些教程/示例,这些教程/示例涵盖了背压如何导致生产者因过载的消费者而减慢速度的情况.给出的经典示例是摄取和分析推文.当流量出现意外增长而使用户无法承受负载时,他们会施加背压,生产者会通过将速率降低一些来做出响应. 我没有真正看到的是实践中使用什么方法来处理由于整个管道容量较低而无法立即处理的大量传入实时数据? 我想这的答案取决于业务领 ..

如何使用Akka BoundedMailBox限制生产者

我有两个参与者,一个正在制作消息,另一个正在以固定费率使用消息。 是否有可能生产者是否受到消费者BoundedMailBox的限制? (背压) 我的生产者当前是定期排定的(向其发送打勾消息),有没有办法根据消费者邮箱中的可用性来排定它? / p> 由于我不需要响应,因此我正在使用即弃即忘样式(Consumer.tell())。我应该使用其他邮件发送方法吗? 解决方案 仅指 ..
发布时间:2020-06-03 18:46:00 其他开发

RxJs:zip运算符的有损形式

考虑使用 zip 运算符将两个无限的Observable压缩在一起发射物品的次数是另一次发射的两倍。 目前的实施方案是无损耗的,即如果我让这些Observables发射一小时,然后我在它们的发射率之间切换,第一个Observable最终会赶上另一个。 随着缓冲区变得越来越大,这将导致内存爆炸。 如果第一个observable将发出几个小时的项目,那么同样会发生第二个将在最后发出一 ..
发布时间:2019-05-24 19:49:28 前端开发

为Cassandra写入获得背压的最佳方法是什么?

我有一个服务,使用我控制的速率消耗队列中的消息。我做一些处理,然后尝试通过Datastax Java客户端写入Cassandra集群。我已经设置我的Cassandra集群与 maxRequestsPerConnection 和 maxConnectionsPerHost 。但是,在测试中,我发现当我达到 maxConnectionsPerHost 和 maxRequestsPerConnectio ..
发布时间:2016-11-13 14:01:04 Java开发