Scala 中的异步 IO 与期货 [英] Asynchronous IO in Scala with futures

查看:26
本文介绍了Scala 中的异步 IO 与期货的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我从某些 URL 获取要下载的(可能很大)图像列表.我正在使用 Scala,所以我会做的是:

Let's say I'm getting a (potentially big) list of images to download from some URLs. I'm using Scala, so what I would do is :

import scala.actors.Futures._

// Retrieve URLs from somewhere
val urls: List[String] = ...

// Download image (blocking operation)
val fimages: List[Future[...]] = urls.map (url => future { download url })

// Do something (display) when complete
fimages.foreach (_.foreach (display _))

我对 Scala 有点陌生,所以这对我来说仍然有点像魔术:

I'm a bit new to Scala, so this still looks a little like magic to me :

  • 这是正确的做法吗?如果不是,还有其他选择吗?
  • 如果我有 100 张图片要下载,这会一次创建 100 个线程,还是会使用线程池?
  • 最后一条指令(display _)是否会在主线程上执行,如果没有,我如何确定它是?
  • Is this the right way to do it? Any alternatives if it is not?
  • If I have 100 images to download, will this create 100 threads at once, or will it use a thread pool?
  • Will the last instruction (display _) be executed on the main thread, and if not, how can I make sure it is?

感谢您的建议!

推荐答案

在 Scala 2.10 中使用 Futures.他们是 Scala 团队、Akka 团队和 Twitter 之间的联合工作,旨在实现更标准化的未来 API 和跨框架使用的实现.我们刚刚发布了一个指南:http://docs.scala-lang.org/overviews/core/futures.html

Use Futures in Scala 2.10. They were joint work between the Scala team, the Akka team, and Twitter to reach a more standardized future API and implementation for use across frameworks. We just published a guide at: http://docs.scala-lang.org/overviews/core/futures.html

除了完全非阻塞(默认情况下,尽管我们提供了进行托管阻塞操作的能力)和可组合之外,Scala 的 2.10 期货还带有一个隐式线程池来执行您的任务,以及一些用于管理时间的实用程序出局.

Beyond being completely non-blocking (by default, though we provide the ability to do managed blocking operations) and composable, Scala's 2.10 futures come with an implicit thread pool to execute your tasks on, as well as some utilities to manage time outs.

import scala.concurrent.{future, blocking, Future, Await, ExecutionContext.Implicits.global}
import scala.concurrent.duration._

// Retrieve URLs from somewhere
val urls: List[String] = ...

// Download image (blocking operation)
val imagesFuts: List[Future[...]] = urls.map {
  url => future { blocking { download url } }
}

// Do something (display) when complete
val futImages: Future[List[...]] = Future.sequence(imagesFuts)
Await.result(futImages, 10 seconds).foreach(display)

上面,我们先导入一些东西:

Above, we first import a number of things:

  • future:用于创造未来的 API.
  • blocking:用于托管阻塞的 API.
  • Future:Future 伴随对象,其中包含许多用于集合期货的有用方法.
  • Await:用于阻塞未来的单例对象(将其结果传输到当前线程).
  • ExecutionContext.Implicits.global:默认的全局线程池,一个 ForkJoin 池.
  • duration._:用于管理超时持续时间的实用程序.
  • future: API for creating a future.
  • blocking: API for managed blocking.
  • Future: Future companion object which contains a number of useful methods for collections of futures.
  • Await: singleton object used for blocking on a future (transferring its result to the current thread).
  • ExecutionContext.Implicits.global: the default global thread pool, a ForkJoin pool.
  • duration._: utilities for managing durations for time outs.

imagesFuts 与您最初所做的大致相同 - 这里唯一的区别是我们使用托管阻塞 - blocking.它通知线程池您传递给它的代码块包含长时间运行或阻塞的操作.这允许池临时产生新的工作人员,以确保永远不会发生所有工作人员都被阻塞的情况.这样做是为了防止阻塞应用程序中的饥饿(锁定线程池).请注意,线程池也知道托管阻塞块中的代码何时完成 - 因此它会在此时删除空闲的工作线程,这意味着池将收缩回其预期大小.

imagesFuts remains largely the same as what you originally did- the only difference here is that we use managed blocking- blocking. It notifies the thread pool that the block of code you pass to it contains long-running or blocking operations. This allows the pool to temporarily spawn new workers to make sure that it never happens that all of the workers are blocked. This is done to prevent starvation (locking up the thread pool) in blocking applications. Note that the thread pool also knows when the code in a managed blocking block is complete- so it will remove the spare worker thread at that point, which means that the pool will shrink back down to its expected size.

(如果您想绝对防止创建额外的线程,那么您应该使用 AsyncIO 库,例如 Java 的 NIO 库.)

(If you want to absolutely prevent additional threads from ever being created, then you ought to use an AsyncIO library, such as Java's NIO library.)

然后我们使用 Future 伴随对象的集合方法将 imagesFutsList[Future[...]] 转换为 Future[List[...]].

Then we use the collection methods of the Future companion object to convert imagesFuts from List[Future[...]] to a Future[List[...]].

Await 对象是我们确保display 在调用线程上执行的方式——Await.result 只是强制当前线程等待它通过的未来完成.(这在内部使用托管阻塞.)

The Await object is how we can ensure that display is executed on the calling thread-- Await.result simply forces the current thread to wait until the future that it is passed is completed. (This uses managed blocking internally.)

这篇关于Scala 中的异步 IO 与期货的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆