Akka Stream Source.queue 的背压策略不起作用 [英] Backpressure strategies for Akka Stream Source.queue not working

查看:26
本文介绍了Akka Stream Source.queue 的背压策略不起作用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图理解为什么下面的代码片段正在做它正在做的事情.我会认为,因为接收器不能比源生成内容更快地产生需求,那么我会收到丢弃的消息以响应某些报价(溢出策略设置为丢弃缓冲区)以及错误和队列关闭消息在自毁片之后.

I'm trying to understand why the below code snippet is doing what it's doing. I would have thought that because the Sink cannot produce demand faster than the Source is producing content, then I would be getting dropped messages in response to some of the offers (overflow strategy is set to Drop Buffer) and also an error and queue closed message after the self destruct piece.

片段:

package playground

import java.time.LocalDateTime
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed}
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._

case object MessageToSink

object Playground extends App {

  implicit val actorSystem = ActorSystem("Playground")
  implicit val execCntxt = actorSystem.dispatcher

  val sinkActor = actorSystem.actorOf(Props[Actor2SinkFwder])
  actorSystem.scheduler.schedule(1 millisecond, 50 milliseconds, sinkActor, MessageToSink)

  println(s"Playground has started... ${LocalDateTime.now()}")
}

class Actor2SinkFwder extends Actor with ActorLogging {

  implicit val materializer = ActorMaterializer()
  implicit val execCtxt = context.dispatcher

  val flow = Source.queue[Int](bufferSize = 1, overflowStrategy = OverflowStrategy.dropBuffer)
    .to(Sink.foreach[Int] {
      i =>
        println(s"$i Sinking starts at ${LocalDateTime.now()}")
        Thread.sleep(150)
        if (i == 5) throw new RuntimeException("KaBoom!")
        println(s"$i Sinking completes at ${LocalDateTime.now()}")
    }).run()

  val i: AtomicInteger = new AtomicInteger(0)

  override def receive: Receive = {
    case MessageToSink =>
      val num = i.incrementAndGet()
      println(s"$num Sink Command received at ${LocalDateTime.now()}")
      flow.offer(num).collect {
        case Enqueued => println(s"$num Enqueued ${LocalDateTime.now}")
        case Dropped => println(s"$num Dropped ${LocalDateTime.now}")
        case Failure(err) => println(s"$num Failed ${LocalDateTime.now} $err")
        case QueueClosed => println(s"$num Failed ${LocalDateTime.now} QueueClosed")
      }
   }
}

输出:

Playground has started... 2016-12-27T18:35:29.574
1 Sink Command received at 2016-12-27T18:35:29.640
2 Sink Command received at 2016-12-27T18:35:29.642
3 Sink Command received at 2016-12-27T18:35:29.642
1 Sinking starts at 2016-12-27T18:35:29.649
1 Enqueued 2016-12-27T18:35:29.650
4 Sink Command received at 2016-12-27T18:35:29.688
5 Sink Command received at 2016-12-27T18:35:29.738
6 Sink Command received at 2016-12-27T18:35:29.788
1 Sinking completes at 2016-12-27T18:35:29.799
2 Sinking starts at 2016-12-27T18:35:29.800
2 Enqueued 2016-12-27T18:35:29.800
7 Sink Command received at 2016-12-27T18:35:29.838
8 Sink Command received at 2016-12-27T18:35:29.888
9 Sink Command received at 2016-12-27T18:35:29.938
2 Sinking completes at 2016-12-27T18:35:29.950
3 Sinking starts at 2016-12-27T18:35:29.951
3 Enqueued 2016-12-27T18:35:29.951
10 Sink Command received at 2016-12-27T18:35:29.988
11 Sink Command received at 2016-12-27T18:35:30.038
12 Sink Command received at 2016-12-27T18:35:30.088
3 Sinking completes at 2016-12-27T18:35:30.101
4 Sinking starts at 2016-12-27T18:35:30.101
4 Enqueued 2016-12-27T18:35:30.101
13 Sink Command received at 2016-12-27T18:35:30.138
14 Sink Command received at 2016-12-27T18:35:30.189
15 Sink Command received at 2016-12-27T18:35:30.238
4 Sinking completes at 2016-12-27T18:35:30.251
5 Sinking starts at 2016-12-27T18:35:30.251
5 Enqueued 2016-12-27T18:35:30.252
16 Sink Command received at 2016-12-27T18:35:30.288
17 Sink Command received at 2016-12-27T18:35:30.338
18 Sink Command received at 2016-12-27T18:35:30.388
19 Sink Command received at 2016-12-27T18:35:30.438
20 Sink Command received at 2016-12-27T18:35:30.488
21 Sink Command received at 2016-12-27T18:35:30.538
22 Sink Command received at 2016-12-27T18:35:30.588
23 Sink Command received at 2016-12-27T18:35:30.638
24 Sink Command received at 2016-12-27T18:35:30.688
25 Sink Command received at 2016-12-27T18:35:30.738
26 Sink Command received at 2016-12-27T18:35:30.788
etc...

我认为我的误解是关于使用 getAsyncCallback 在 QueueSource 类中.即使 QueueSource 中的 offer 调用使用正确的 Offer 详细信息调用 stageLogic,在前一个元素完成处理之前,不会调用阶段逻辑中此代码的实际处理程序,因此没有用于检查缓冲区大小或应用溢出的逻辑策略正在得到应用...... :-/

I think my miss-understanding is around the use of getAsyncCallback in the QueueSource class. Even though the offer call in the QueueSource invokes the stageLogic with the correct Offer details, the actual handler for this code in the stage logic doesnt get invoked until the previous element has finished processing, so none of the logic for checking buffer sizes or applying Overflow Strategies is getting applied... :-/

推荐答案

要查看您期望的结果,您应该在 Source 和你的 接收器.这是一种告诉 Akka 使用两个不同的 Actor 运行两个阶段的方法——通过强制在两者之间建立异步边界.

To see the result you're expecting, you should add an async stage between your Source and your Sink. This is a way to tell Akka to run the two stages using two distinct Actors - by forcing an asynchronous boundary between the two.

如果没有 async,Akka 将通过在一个 Actor 中粉碎所有内容来优化执行,这将使处理顺序化.在您的示例中,正如您所注意到的,在前一条消息的 Thread.sleep(150) 完成之前,一条消息被提供到队列中.可以找到有关该主题的更多信息 这里.

Without the async, Akka will optimize the execution by smashing everything in one actor, which will sequentialise the processing. In your example, as you noticed, a message is offered to the queue until the Thread.sleep(150) of the previous message have been completed. More info on the topic can be found here.

  val flow = Source.queue[Int](bufferSize = 1, overflowStrategy = OverflowStrategy.dropBuffer)
    .async
    .to(Sink.foreach[Int] {...}).run()

此外,您应该在匹配 .offer 结果时再添加一种情况.这是FutureFailure,当队列下游出现故障时,Future 完成.这适用于在前 5 个

Also, you should add one more case when matching the .offer result. This is a Failure of the Future, which the Future gets completed with when the queue downstream has been failed. This applies to all messages offered after the first 5

  override def receive: Receive = {
    case MessageToSink =>
      val num = i.incrementAndGet()
      println(s"$num Sink Command received at ${LocalDateTime.now()}")
      flow.offer(num).onComplete {
        case Success(Enqueued) => println(s"$num Enqueued ${LocalDateTime.now}")
        case Success(Dropped) => println(s"$num Dropped ${LocalDateTime.now}")
        case Success(Failure(err)) => println(s"$num Failed ${LocalDateTime.now} $err")
        case Success(QueueClosed) => println(s"$num Failed ${LocalDateTime.now} QueueClosed")
        case util.Failure(err) => println(s"$num Failed ${LocalDateTime.now} with exception $err")
      }
  }

请注意,即使执行上述所有操作,您也不会看到任何 QueueOfferResult.Dropped 结果.那是因为您选择了 DropBuffer 策略.每个传入的消息都将排队(因此产生一个 Enqueued 消息),踢出现有的缓冲区.如果您将策略更改为 DropNew,您应该会开始看到一些 Dropped 消息.

Note that, even by doing all the above, you will not see any QueueOfferResult.Dropped results. That is because you chose DropBuffer strategy. Every incoming message will be queued (therefore producing an Enqueued message), kicking out the existing buffer. If you change the strategy to DropNew, you should start seeing some Dropped messages.

这篇关于Akka Stream Source.queue 的背压策略不起作用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆