Parallel file processing in Scala
Problem description
Suppose I need to process files in a given folder in parallel. In Java I would create a FolderReader thread to read file names from the folder and a pool of FileProcessor threads. The FolderReader reads file names and submits the file processing function (a Runnable) to the pool executor.
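For comparison, the Java design described above might be sketched in Scala directly on java.util.concurrent roughly as follows. This is a minimal sketch, not the asker's code. `processFile` (here it only counts lines), `processFolder` and the pool size are hypothetical. Results go into a `TrieMap` so that the worker threads can write to it concurrently.

```scala
import java.io.File
import java.nio.file.Files
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.concurrent.TrieMap

object ThreadPoolFolderProcessing {
  // Hypothetical "file processing function": count the lines of one file (UTF-8)
  def processFile(file: File): Int = Files.readAllLines(file.toPath).size()

  def processFolder(folder: File, poolSize: Int): Map[String, Int] = {
    val pool = Executors.newFixedThreadPool(poolSize)
    // Thread-safe map written to by the pool's worker threads
    val results = TrieMap.empty[String, Int]
    try {
      // The "FolderReader" role: list the folder (listFiles returns null on error)
      val files = Option(folder.listFiles()).getOrElse(Array.empty[File]).filter(_.isFile)
      // The "FileProcessor" role: one Runnable per file, run by the pool.
      // If processFile throws, that file simply gets no entry in results.
      files.foreach { file =>
        pool.execute(new Runnable {
          def run(): Unit = results.put(file.getName, processFile(file))
        })
      }
    } finally {
      pool.shutdown()                            // accept no new tasks
      pool.awaitTermination(1, TimeUnit.MINUTES) // wait for submitted tasks to finish
    }
    results.toMap
  }
}
```

Using `execute` with an explicit `Runnable` avoids the overload ambiguity between `Runnable` and `Callable` that a lambda passed to `submit` can cause.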
In Scala I see two options:
- create a pool of FileProcessor actors and schedule a file processing function with Actors.Scheduler;
- create an actor for each file name while reading the file names.
Does it make sense? What is the best option?
Recommended answer
I strongly suggest staying as far away from raw threads as you can. Luckily we have better abstractions that take care of what happens underneath. In your case it seems to me that you do not need actors (although you could use them). You can use a simpler abstraction called Futures instead. They are part of the Akka open-source library, and I think they will also become part of the Scala standard library in the future.
A Future[T] is simply something that will return a T in the future.
All you need to run a future is an implicit ExecutionContext, which you can derive from a Java ExecutorService. Then you can enjoy the elegant API. Because a future is a monad, you can transform collections into collections of futures, collect the results, and so on. I suggest you take a look at http://doc.akka.io/docs/akka/2.0.1/scala/futures.html
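// Akka 2.0's futures became scala.concurrent in Scala 2.10; these imports target that API
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object TestingFutures {
  // A fixed pool of 20 threads runs every Future below (shut it down when done)
  val executorService = Executors.newFixedThreadPool(20)
  implicit val executorContext: ExecutionContext =
    ExecutionContext.fromExecutorService(executorService)

  def testFutures(myList: List[String]): List[String] = {
    // One Future per element; traverse gathers them into a single Future[List[String]]
    val listOfFutures: Future[List[String]] = Future.traverse(myList) { aString =>
      Future {
        aString.reverse
      }
    }
    // Block for at most one minute waiting for all results
    val result: List[String] = Await.result(listOfFutures, 1.minute)
    result
  }
}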
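Applied to the original question, the same pattern might look like the sketch below, with one Future per file in the folder. It assumes a hypothetical `processFile` that just counts lines with `java.nio.file.Files.readAllLines`, which decodes the file as UTF-8. For simplicity, each call creates its own 20-thread pool and shuts it down afterwards.

```scala
import java.io.File
import java.nio.file.Files
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FutureFolderProcessing {
  // Hypothetical "file processing function": count the lines of one file
  def processFile(file: File): Int = Files.readAllLines(file.toPath).size()

  def processFolder(folder: File): Map[String, Int] = {
    val pool = Executors.newFixedThreadPool(20)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // listFiles returns null if the folder cannot be read
      val files = Option(folder.listFiles()).getOrElse(Array.empty[File]).toList.filter(_.isFile)
      // List[File] => Future[List[(String, Int)]], one Future per file
      val all: Future[List[(String, Int)]] =
        Future.traverse(files)(f => Future(f.getName -> processFile(f)))
      // Await.result rethrows the first failure, if any file could not be processed
      Await.result(all, 1.minute).toMap
    } finally pool.shutdown()
  }
}
```

No explicit FolderReader thread or Runnable is needed here. The ExecutionContext schedules the futures on the pool, and traverse collects their results.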
There is a lot going on here:
- I am using Future.traverse. Its first parameter is a collection M[A], such as a List[A]. Its second parameter is a function A => Future[B], or if you prefer a Function1[A, Future[B]]. It returns a Future[M[B]]. In this example, A and B are both String.
- I am using the Future.apply method (the Future { ... } block) to create a Future[B] whose body runs asynchronously on the implicit ExecutionContext.
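The following sketch shows that the element type can change under traverse. It compares `Future.traverse(xs)(f)` with the equivalent `Future.sequence(xs.map(f))`, mapping strings to their lengths. For brevity it uses the standard library's global ExecutionContext instead of a custom pool. `TraverseVsSequence` and `lengths` are illustrative names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TraverseVsSequence {
  // Both forms turn List[String] into Future[List[Int]], keeping the input order
  def lengths(words: List[String]): (List[Int], List[Int]) = {
    val viaTraverse: Future[List[Int]] = Future.traverse(words)(w => Future(w.length))
    val viaSequence: Future[List[Int]] = Future.sequence(words.map(w => Future(w.length)))
    (Await.result(viaTraverse, 5.seconds), Await.result(viaSequence, 5.seconds))
  }
}
```

`TraverseVsSequence.lengths(List("futures", "are", "monads"))` yields `(List(7, 3, 6), List(7, 3, 6))`. traverse simply avoids building the intermediate `List[Future[Int]]`.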
There are many other reasons to look at Akka futures.
- Futures can be mapped because they are monads, i.e. you can chain Future executions: Future { 3 }.map { _ * 2 }.map { _.toString }
- Futures have callbacks: future.onComplete, onSuccess, onFailure, andThen, etc. (In the standard-library Future, onSuccess and onFailure were deprecated in Scala 2.12 in favor of onComplete.)
- Futures support not only traverse but also for-comprehensions.
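The three points above can be sketched with the standard-library Future as follows. The object and method names are illustrative. `describe` completes a Promise from inside `onComplete` only so that the callback's effect can be observed. In `sumOfLengths`, both futures are created before the for-comprehension, so they run concurrently. Creating them inside the `for` would run them one after the other.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object FutureCombinators {
  // Chaining with map: 3 -> 6 -> "6"
  def chained: Future[String] = Future(3).map(_ * 2).map(_.toString)

  // for-comprehension over two futures that were already started
  def sumOfLengths(a: String, b: String): Future[Int] = {
    val fa = Future(a.length)
    val fb = Future(b.length)
    for { x <- fa; y <- fb } yield x + y
  }

  // onComplete callback; the Promise just exposes what the callback saw
  def describe(f: Future[Int]): Future[String] = {
    val p = Promise[String]()
    f.onComplete {
      case Success(v) => p.success(s"ok: $v")
      case Failure(e) => p.success(s"failed: ${e.getMessage}")
    }
    p.future
  }
}
```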