Scala中的带有期货的异步IO [英] Asynchronous IO in Scala with futures

查看:66
本文介绍了Scala中的带有期货的异步IO的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

比方说,我从一些URL中下载了一个(可能很大)图像列表.我正在使用Scala,所以我要做的是:

Let's say I'm getting a (potentially big) list of images to download from some URLs. I'm using Scala, so what I would do is :

import scala.actors.Futures._

// Retrieve URLs from somewhere
val urls: List[String] = ...

// Download image (blocking operation)
val fimages: List[Future[...]] = urls.map (url => future { download url })

// Do something (display) when complete
fimages.foreach (_.foreach (display _))

我对Scala有点陌生,所以对我来说,这仍然有点像魔术:

I'm a bit new to Scala, so this still looks a little like magic to me :

  • 这是正确的方法吗?如果不是,还有其他选择吗?
  • 如果我要下载100张图像,这会一次创建100个线程,还是会使用线程池?
  • 最后一条指令(display _)是否会在主线程上执行?如果没有,我如何确定它是?
  • Is this the right way to do it? Any alternatives if it is not?
  • If I have 100 images to download, will this create 100 threads at once, or will it use a thread pool?
  • Will the last instruction (display _) be executed on the main thread, and if not, how can I make sure it is?

感谢您的建议!

推荐答案

在Scala 2.10中使用期货.他们是Scala团队,Akka团队和Twitter之间的共同工作,以期实现更加标准化的未来API和实现,以供跨框架使用.我们刚刚在以下位置发布了指南: http://docs.scala-lang.org/overviews/core/futures .html

Use Futures in Scala 2.10. They were joint work between the Scala team, the Akka team, and Twitter to reach a more standardized future API and implementation for use across frameworks. We just published a guide at: http://docs.scala-lang.org/overviews/core/futures.html

Scala的2.10期货除了完全非阻塞(默认情况下,尽管我们提供了托管阻塞操作的能力)和可组合性之外,还带有隐式线程池来执行任务,以及一些实用程序来管理时间出局.

Beyond being completely non-blocking (by default, though we provide the ability to do managed blocking operations) and composable, Scala's 2.10 futures come with an implicit thread pool to execute your tasks on, as well as some utilities to manage time outs.

import scala.concurrent.{future, blocking, Future, Await, ExecutionContext.Implicits.global}
import scala.concurrent.duration._

// Retrieve URLs from somewhere
val urls: List[String] = ...

// Download image (blocking operation)
val imagesFuts: List[Future[...]] = urls.map {
  url => future { blocking { download url } }
}

// Do something (display) when complete
val futImages: Future[List[...]] = Future.sequence(imagesFuts)
Await.result(futImages, 10 seconds).foreach(display)

上面,我们首先导入一些东西:

Above, we first import a number of things:

  • future:用于创建未来的API.
  • blocking:用于托管阻止的API.
  • Future:期货伴随对象,其中包含许多用于期货收集的有用方法.
  • Await:用于将来阻止(将其结果传输到当前线程)的单例对象.
  • ExecutionContext.Implicits.global:默认的全局线程池,一个ForkJoin池.
  • duration._:用于管理超时时间的实用程序.
  • future: API for creating a future.
  • blocking: API for managed blocking.
  • Future: Future companion object which contains a number of useful methods for collections of futures.
  • Await: singleton object used for blocking on a future (transferring its result to the current thread).
  • ExecutionContext.Implicits.global: the default global thread pool, a ForkJoin pool.
  • duration._: utilities for managing durations for time outs.

imagesFuts基本上与您最初所做的相同-唯一的区别是我们使用托管阻止-blocking.它通知线程池您传递给它的代码块包含长时间运行或阻塞的操作.这使池可以临时产生新的工作程序,以确保所有工作程序都不会被阻塞.这样做是为了防止阻塞应用程序中的饥饿(锁定线程池).请注意,线程池还知道托管阻塞块中的代码何时完成-因此它将在该点删除备用工作线程,这意味着该池将缩小到其预期大小.

imagesFuts remains largely the same as what you originally did- the only difference here is that we use managed blocking- blocking. It notifies the thread pool that the block of code you pass to it contains long-running or blocking operations. This allows the pool to temporarily spawn new workers to make sure that it never happens that all of the workers are blocked. This is done to prevent starvation (locking up the thread pool) in blocking applications. Note that the thread pool also knows when the code in a managed blocking block is complete- so it will remove the spare worker thread at that point, which means that the pool will shrink back down to its expected size.

(如果要绝对阻止创建其他线程,则应使用AsyncIO库,例如Java的NIO库.)

(If you want to absolutely prevent additional threads from ever being created, then you ought to use an AsyncIO library, such as Java's NIO library.)

然后,我们使用Future随行对象的收集方法将imagesFutsList[Future[...]]转换为Future[List[...]].

Then we use the collection methods of the Future companion object to convert imagesFuts from List[Future[...]] to a Future[List[...]].

Await对象是我们如何确保在调用线程上执行display的方法-Await.result只是强制当前线程等待,直到将来通过它为止. (这在内部使用托管阻止.)

The Await object is how we can ensure that display is executed on the calling thread-- Await.result simply forces the current thread to wait until the future that it is passed is completed. (This uses managed blocking internally.)

这篇关于Scala中的带有期货的异步IO的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆