如何限制Akka Stream每秒仅执行和发送一条消息一次? [英] How to limit an Akka Stream to execute and send down one message only once per second?

查看:105
本文介绍了如何限制Akka Stream每秒仅执行和发送一条消息一次?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个Akka流,我希望流大约每秒发送一次消息。

I have an Akka Stream and I want the stream to send messages down stream approximately every second.

我尝试了两种方法来解决此问题,第一种方法是使制作者在流的开头仅在继续消息进入此参与者时每秒发送一次消息。

I tried two ways to solve this problem, the first way was to make the producer at the start of the stream only send messages once every second when a Continue messages comes into this actor.


//当在ActorPublisher
中接收到Continue消息// //然后工作...
if(totalDemand> 0){
import scala.concurrent.duration._
context.system.scheduler.scheduleOnce(1秒,自我,继续)
}

不久之后,在ActorPublisher actor中出现了Continue消息泛滥的时候,我假设下游用户可以通过背压请求消息(猜测但不确定),因为下游可能很快消耗掉,但上游却没有快速产生。因此,此方法失败了。

This works for a short while then a flood of Continue messages appear in the ActorPublisher actor, I assume (guess but not sure) from downstream via back-pressure requesting messages as the downstream can consume fast but the upstream is not producing at a fast rate. So this method failed.

我尝试的另一种方法是通过反压控制,我在<< c $ c> MaxInFlightRequestStrategy code> ActorSubscriber 在流的末尾,以将消息数限制为每秒1条。这种方法有效,但是每次传入的消息大约只有三个左右,而不仅仅是一次。似乎反压控件不会立即更改进入的消息的速率,或者消息已经在流中排队等待处理。

The other way I tried was via backpressure control, I used a MaxInFlightRequestStrategy on the ActorSubscriber at the end of the stream to limit the number of messages to 1 per second. This works but messages coming in come in at approximately three or so at a time, not just one at a time. It seems the backpressure control doesn't immediately change the rate of messages coming in OR messages were already queued in the stream and waiting to be processed.

问题是,我怎么有一个只能每秒处理一条消息的Akka流?

So the problem is, how can I have an Akka Stream which can process one message only per second?

我发现 MaxInFlightRequestStrategy 是一种有效的方法,但是我应该将批处理大小设置为1,其批处理大小默认为5,这导致了我发现的问题。现在,我正在这里查看提交的答案,这也是解决问题的一种过于复杂的方法。

I discovered that MaxInFlightRequestStrategy is a valid way to do it but I should set the batch size to 1, its batch size is default 5, which was causing the problem I found. Also its an over-complicated way to solve the problem now that I am looking at the submitted answer here.

推荐答案

将元素置于节流的作用下,这将给快速来源带来压力,或者您可以使用勾号 zip 。

You can either put your elements through the throttling flow, which will back pressure a fast source, or you can use combination of tick and zip.

第一个解决方案是这样的:

The first solution would be like this:

val veryFastSource =
  Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000))

val throttlingFlow = Flow[Long].throttle(
  // how many elements do you allow
  elements = 1,
  // in what unit of time
  per = 1.second,
  maximumBurst = 0,
  // you can also set this to Enforcing, but then your
  // stream will collapse if exceeding the number of elements / s
  mode = ThrottleMode.Shaping
)

veryFastSource.via(throttlingFlow).runWith(Sink.foreach(println))

第二个解决方案将是这样的:

The second solution would be like this:

val veryFastSource =
  Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000))

val tickingSource = Source.tick(1.second, 1.second, 0) 

veryFastSource.zip(tickingSource).map(_._1).runWith(Sink.foreach(println))

这篇关于如何限制Akka Stream每秒仅执行和发送一条消息一次?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆