如何限制 Akka Stream 每秒仅执行和发送一条消息一次? [英] How to limit an Akka Stream to execute and send down one message only once per second?

查看:24
本文介绍了如何限制 Akka Stream 每秒仅执行和发送一条消息一次?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个 Akka Stream,我希望该流大约每秒向下游发送消息.

I have an Akka Stream and I want the stream to send messages down stream approximately every second.

我尝试了两种方法来解决这个问题,第一种方法是让流开始的生产者只在有 Continue 消息进入这个 actor 时每秒发送一次消息.

I tried two ways to solve this problem, the first way was to make the producer at the start of the stream only send messages once every second when a Continue messages comes into this actor.

<代码>//当在 ActorPublisher 中收到 Continue 消息时//做工作然后...如果(总需求 > 0){导入 scala.concurrent.duration._context.system.scheduler.scheduleOnce(1 秒,自我,继续)}

这工作了一小会儿,然后大量的 Continue 消息出现在 ActorPublisher 演员中,我假设(猜测但不确定)来自下游通过背压请求消息,因为下游可以快速消耗,但上游不生产速度快.所以这个方法失败了.

This works for a short while then a flood of Continue messages appear in the ActorPublisher actor, I assume (guess but not sure) from downstream via back-pressure requesting messages as the downstream can consume fast but the upstream is not producing at a fast rate. So this method failed.

我尝试的另一种方法是通过背压控制,我在流末尾的 ActorSubscriber 上使用了 MaxInFlightRequestStrategy 以将消息数量限制为每秒 1 个.这有效,但传入的消息大约一次传入三个左右,而不是一次传入一个.似乎背压控制并没有立即改变传入消息的速率,或者消息已经在流中排队等待处理.

The other way I tried was via backpressure control, I used a MaxInFlightRequestStrategy on the ActorSubscriber at the end of the stream to limit the number of messages to 1 per second. This works but messages coming in come in at approximately three or so at a time, not just one at a time. It seems the backpressure control doesn't immediately change the rate of messages coming in OR messages were already queued in the stream and waiting to be processed.

那么问题是,我如何才能拥有一个每秒只能处理一条消息的 Akka Stream?

So the problem is, how can I have an Akka Stream which can process one message only per second?

我发现 MaxInFlightRequestStrategy 是一种有效的方法,但我应该将批处理大小设置为 1,它的批处理大小默认为 5,这导致了我发现的问题.现在我正在这里查看提交的答案,这也是解决问题的一种过于复杂的方法.

I discovered that MaxInFlightRequestStrategy is a valid way to do it but I should set the batch size to 1, its batch size is default 5, which was causing the problem I found. Also its an over-complicated way to solve the problem now that I am looking at the submitted answer here.

推荐答案

你可以让你的元素通过节流流,这会反压一个快速的源,或者你可以使用 tick 的组合和 zip.

You can either put your elements through the throttling flow, which will back pressure a fast source, or you can use combination of tick and zip.

第一个解决方案是这样的:

The first solution would be like this:

val veryFastSource =
  Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000))

val throttlingFlow = Flow[Long].throttle(
  // how many elements do you allow
  elements = 1,
  // in what unit of time
  per = 1.second,
  maximumBurst = 0,
  // you can also set this to Enforcing, but then your
  // stream will collapse if exceeding the number of elements / s
  mode = ThrottleMode.Shaping
)

veryFastSource.via(throttlingFlow).runWith(Sink.foreach(println))

第二种解决方案是这样的:

The second solution would be like this:

val veryFastSource =
  Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000))

val tickingSource = Source.tick(1.second, 1.second, 0) 

veryFastSource.zip(tickingSource).map(_._1).runWith(Sink.foreach(println))

这篇关于如何限制 Akka Stream 每秒仅执行和发送一条消息一次?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆