Sequencing `Future`s with timeout
Question
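I took advantage of the `TimeoutScheduler` introduced in "Scala Futures - built-in timeout?" (https://stackoverflow.com/questions/16304471/scala-futures-built-in-timeout). However, my program no longer terminates the way it did before I added `TimeoutScheduler`.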
I have two `Future`s, `res1` and `res2`, each with a timeout of 15 seconds. At the end I sequence both `Future`s so that I can shut down the HTTP executor properly in the `onComplete` callback. Without `withTimeout`, the program terminates right after `http.shutdown`. With `withTimeout`, it doesn't. Why? There must be some further futures...
import java.net.URI
import scala.util.{ Try, Failure, Success }
import dispatch._
import org.json4s._
import org.json4s.native.JsonMethods._
import com.typesafe.scalalogging.slf4j.Logging

object Main extends App with Logging {
  import scala.concurrent.ExecutionContext.Implicits.global

  val http = Http
  val github = host("api.github.com").secure

  import timeout._
  import scala.concurrent.duration._
  import scala.language.postfixOps
  import scala.collection.JavaConverters._

  val req: dispatch.Req = github / "users" / "defunkt"

  val res1 = http(req > dispatch.as.Response(_.getHeaders().get("Server").asScala)) withTimeout (15 seconds) recover { case x =>
    logger.debug("Error1: " + x.toString)
    Nil
  }
  val res2 = http(req > dispatch.as.Response(_.getHeaders().get("Vary").asScala)) withTimeout (15 seconds) recover { case x =>
    logger.debug("Error2: " + x.toString)
    Nil
  }

  Future.sequence(res1 :: res2 :: Nil) onComplete { case _ =>
    http.shutdown() // without using `withTimeout` the program terminated after `http.shutdown`
    TimeoutScheduler.timer.stop() // thanks to @cmbaxter
  }
}

object timeout {
  import java.util.concurrent.TimeUnit
  import scala.concurrent.Promise
  import scala.concurrent.duration.Duration
  import org.jboss.netty.util.Timeout
  import org.jboss.netty.util.TimerTask
  import org.jboss.netty.util.HashedWheelTimer
  import org.jboss.netty.handler.timeout.TimeoutException

  // cf. https://stackoverflow.com/questions/16304471/scala-futures-built-in-timeout
  object TimeoutScheduler {
    val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)

    // Fails the promise with a TimeoutException once `after` has elapsed.
    def scheduleTimeout(promise: Promise[_], after: Duration) = {
      timer.newTimeout(new TimerTask {
        def run(timeout: Timeout): Unit = {
          promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))
        }
      }, after.toNanos, TimeUnit.NANOSECONDS)
    }
  }

  implicit class FutureWithTimeout[T](f: Future[T]) {
    import scala.concurrent.ExecutionContext

    def withTimeout(after: Duration)(implicit ec: ExecutionContext) = {
      val prom = Promise[T]()
      val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
      // Whichever finishes first wins: the real future or the timeout promise.
      val combinedFut = Future.firstCompletedOf(List(f, prom.future))
      // Cancel the pending timeout as soon as the real future completes.
      f onComplete { case result => timeout.cancel() }
      combinedFut
    }
  }
}
Any suggestions are welcome. Best, /nm
Recommended answer
If you used my code exactly as described, then my guess is that the Netty `HashedWheelTimer` that sits under the `TimeoutScheduler` object is not being terminated. You could try explicitly calling `stop` on it after calling `http.shutdown`, like so:
TimeoutScheduler.timer.stop
If you would rather have the Netty `HashedWheelTimer` use a daemon thread, you can use one of its constructors that accepts a `ThreadFactory` (I'm seeing them in 3.6.6-Final) and pass it a custom `ThreadFactory` that sets the daemon flag to true.
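As a rough sketch of that last suggestion, a daemon `ThreadFactory` could look something like the following. The name `DaemonThreads` is made up for illustration and is not part of the original code. The factory itself uses only the JDK. The wiring into `HashedWheelTimer` is shown as a comment and assumes the Netty 3.x constructor that takes a `ThreadFactory`, a tick duration, and a `TimeUnit`.

```scala
import java.util.concurrent.{Executors, ThreadFactory}

// Hypothetical helper (not from the original answer): wraps the JDK default
// thread factory and marks every thread it creates as a daemon thread.
object DaemonThreads {
  val factory: ThreadFactory = new ThreadFactory {
    private val underlying = Executors.defaultThreadFactory()

    def newThread(r: Runnable): Thread = {
      val t = underlying.newThread(r)
      t.setDaemon(true) // daemon threads do not keep the JVM alive
      t
    }
  }
}

// Assumed wiring inside TimeoutScheduler (requires Netty 3.x on the classpath):
//   val timer = new HashedWheelTimer(DaemonThreads.factory, 10, TimeUnit.MILLISECONDS)
```

With a daemon timer thread the JVM can exit once `http.shutdown()` has run, even if `stop` is never called. Calling `stop` explicitly is still the tidier option, since a daemon thread is simply abandoned at exit.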