用超时对`Future`s进行排序 [英] Sequencing `Future`s with timeout

查看:153
本文介绍了用超时对`Future`s进行排序的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我利用了 Scala期货-内置超时中引入的TimeoutScheduler?

但是,现在如果没有TimeoutScheduler,我的程序将不会像以前那样终止.

However, now my program does not terminate as before without TimeoutScheduler.

我有两个Future:res1res2.两者的超时时间均为15秒.最后,为了在onComplete回调中正确关闭HTTP执行程序,我对两个Future进行了排序.如果不使用withTimeout,程序将在http.shutdown之后立即终止.但是使用withTimeout则不是.为什么?肯定还有一些未来...

I have two Futures: res1 and res2. Both with a timeout of 15 seconds. In the end I sequence both Futures in order to shutdown the HTTP executor properly in the onComplete callback. Without using withTimeout the program terminates right after http.shutdown. But with using withTimeout is doesn't. Why? There must be some further futures...

import java.net.URI
import scala.util.{ Try, Failure, Success }
import dispatch._
import org.json4s._
import org.json4s.native.JsonMethods._
import com.typesafe.scalalogging.slf4j.Logging

object Main extends App with Logging {
  import scala.concurrent.ExecutionContext.Implicits.global

  val http   = Http
  val github = host("api.github.com").secure 

  import timeout._
  import scala.concurrent.duration._
  import scala.language.postfixOps
  import scala.collection.JavaConverters._

  val req: dispatch.Req =  github / "users" / "defunkt"
  val res1 = http(req > dispatch.as.Response(_.getHeaders().get("Server").asScala)) withTimeout (15 seconds) recover { case x => 
    logger.debug("Error1: " + x.toString) 
    Nil
  }
  val res2 = http(req > dispatch.as.Response(_.getHeaders().get("Vary").asScala)) withTimeout (15 seconds) recover { case x => 
    logger.debug("Error2: " + x.toString) 
    Nil
  }

  Future.sequence(res1 :: res2 :: Nil) onComplete { case _ => 
    http.shutdown()               // without using `withTimeout` the program terminated after `http.shutdow`
    TimeoutScheduler.timer.stop() // thanks to @cmbaxter
  }
}

object timeout {
  import java.util.concurrent.TimeUnit
  import scala.concurrent.Promise
  import scala.concurrent.duration.Duration
  import org.jboss.netty.util.Timeout
  import org.jboss.netty.util.TimerTask
  import org.jboss.netty.util.HashedWheelTimer
  import org.jboss.netty.handler.timeout.TimeoutException

   // cf. https://stackoverflow.com/questions/16304471/scala-futures-built-in-timeout
  object TimeoutScheduler {
    val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
    def scheduleTimeout(promise: Promise[_], after: Duration) = {
      timer.newTimeout(new TimerTask{
        def run(timeout: Timeout){              
          promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))        
        }
      }, after.toNanos, TimeUnit.NANOSECONDS)
    }
  }
  implicit class FutureWithTimeout[T](f: Future[T]) {
    import scala.concurrent.ExecutionContext

    def withTimeout(after: Duration)(implicit ec: ExecutionContext) = {
      val prom = Promise[T]()
      val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
      val combinedFut = Future.firstCompletedOf(List(f, prom.future))
      f onComplete { case result => timeout.cancel() }
      combinedFut
    }
  } 
}

欢迎任何建议,最好,/nm

Any suggestions are welcome, Best, /nm

推荐答案

如果完全按照描述使用我的代码,那么我猜想位于TimeoutScheduler对象下的Netty HashedWheelTimer不会终止.像这样调用http.shutdown之后,您可以尝试显式调用stop:

If you used my code exactly as described, then my guess is that the Netty HashedWheelTimer that sits under the TimeoutScheduler object is not being terminated. You could try explicitly calling stop on it, after calling http.shutdown like so:

TimeoutScheduler.timer.stop

现在,如果您希望Netty HashedWheelTimer使用守护程序线程,则可以在其上使用一个接受ThreadFactory的构造函数(在3.6.6-Final中看到它们),然后使用一个自定义的ThreadFactory,它将守护程序标志设置为true.

Now if you wanted the Netty HashedWheelTimer to use a daemon thread, then you can use one of the constructors on it (I'm seeing them in 3.6.6-Final) that accepts a ThreadFactory and then use a custom ThreadFactory that sets the daemon flag to true.

这篇关于用超时对`Future`s进行排序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆