如何在具有超时的单独调度程序上运行Akka Streams图? [英] How to run Akka Streams graph on a separate dispatcher with timeout?

查看:56
本文介绍了如何在具有超时的单独调度程序上运行Akka Streams图?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这个问题是基于我所做的一个宠物项目,并且这个 SO 线程。在Akka HTTP路由定义中,我启动了一个长时间运行的过程,很自然地我想这样做而不阻塞用户。我可以通过下面的代码片段实现这一目标:

This question is based on a pet project that I did and this SO thread. Inside a Akka HTTP route definition, I start a long-running process, and naturally I want to do that without blocking the user. I'm able to achieve this with the code snippet below:

blocking-io-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 16
  }
  throughput = 1
}







complete {
  Try(new URL(url)) match {
    case scala.util.Success(u) => {
      val src = Source.fromIterator(() => parseMovies(u).iterator)

      src
        .via(findMovieByTitleAndYear)
        .via(persistMovies)
        .toMat(Sink.fold(Future(0))((acc, elem) => Applicative[Future].map2(acc, elem)(_ + _)))(Keep.right)
        // run the whole graph on a separate dispatcher
        .withAttributes(ActorAttributes.dispatcher("blocking-io-dispatcher"))
        .run.flatten
        .onComplete {
          _ match {
            case scala.util.Success(n) => logger.info(s"Created $n movies")
            case Failure(t) => logger.error(t, "Failed to process movies")
          }
        }

      Accepted
    }
    case Failure(t) => logger.error(t, "Bad URL"); BadRequest -> "Bad URL"
  }
}

如果我已经解决了吗?问题是我不确定如何设置超时时间。图形的执行会创建一个 Future ,并在专用的 blocking-io-dispatcher 上执行直至完成。如果我添加一个 Await 调用,代码将阻塞。有没有办法超时?

What's the problem then if I've already solved it? The problem is that I'm not sure how to set a timeout. The execution of the graph creates a Future that executes until complete on the dedicated blocking-io-dispatcher. If I add a Await call, the code blocks. Is there a way to put a timeout?

推荐答案

completionTimeout 阶段应对此有所帮助。下面的示例:

completionTimeout stage should help here. Example below:

src
    .completionTimeout(5.seconds)
    ...
    .run.flatten
    .onComplete {
        case scala.util.Success(n) => logger.info(s"Created $n movies")
        case Failure(t: TimeoutException) => logger.error(t, "Timed out")
        case Failure(t) => logger.error(t, "Failed to process movies")
    }

文档参考< a href = http://doc.akka.io/docs/akka/2.4/scala/stream/stages-overview.html#completionTimeout rel = nofollow noreferrer>此处。

这篇关于如何在具有超时的单独调度程序上运行Akka Streams图?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆