Parallel file processing in Scala

Question

Suppose I need to process files in a given folder in parallel. In Java I would create a FolderReader thread to read file names from the folder and a pool of FileProcessor threads. FolderReader reads file names and submits the file processing function (Runnable) to the pool executor.
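For comparison, the Java design just described might be sketched in Scala directly on top of java.util.concurrent as follows. This is a minimal sketch, not the asker's code: the object name JavaStyleFolderProcessing and the processFile placeholder (which only records the file name) are hypothetical, and the pool size and one-minute termination timeout are arbitrary choices.

```scala
import java.io.File
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}

object JavaStyleFolderProcessing {
  // Hypothetical per-file work: here it only records the file name
  def processFile(file: File, results: ConcurrentLinkedQueue[String]): Unit =
    results.add(file.getName)

  def processFolder(folder: File, poolSize: Int = 4): List[String] = {
    val results = new ConcurrentLinkedQueue[String]()
    // Pool of "FileProcessor" threads
    val pool = Executors.newFixedThreadPool(poolSize)
    // "FolderReader" thread: lists the folder and submits one Runnable per file
    val reader = new Thread(new Runnable {
      def run(): Unit = {
        val files = Option(folder.listFiles()).getOrElse(Array.empty[File])
        for (f <- files if f.isFile)
          pool.execute(new Runnable { def run(): Unit = processFile(f, results) })
      }
    })
    reader.start()
    reader.join()   // every task has been submitted once the reader finishes
    pool.shutdown() // accept no new tasks, but let the submitted ones complete
    pool.awaitTermination(1, TimeUnit.MINUTES)
    results.toArray().map(_.toString).toList.sorted
  }
}
```

All the coordination (reader thread, pool, shutdown, waiting for termination) is done by hand here, which is exactly the bookkeeping the answer below suggests avoiding.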

In Scala I see two options:


  • create a pool of FileProcessor actors and schedule a file processing function with Actors.Scheduler.
  • create an actor for each file name while reading the file names.

Does it make sense? What is the best option?

Recommended answer

I strongly suggest staying as far away from raw threads as you can. Luckily we have better abstractions that take care of what happens underneath, and in your case it appears to me that you do not need actors (although you could use them): a simpler abstraction called Futures is enough. They are part of the Akka open-source library, and I think they will become part of the Scala standard library as well (they did, as scala.concurrent.Future, starting with Scala 2.10).

A Future[T] is simply something that will return a T in the future.
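A minimal illustration, assuming the standard library's scala.concurrent.Future rather than Akka's, and the default global ExecutionContext; FutureBasics is a hypothetical name:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureBasics {
  // The block starts running asynchronously on the global ExecutionContext
  val answer: Future[Int] = Future { 21 * 2 }

  // Blocking here only to read the value; real code would keep composing the future
  def result: Int = Await.result(answer, 5.seconds)
}
```

The caller gets a Future[Int] back immediately; the Int itself becomes available once the block has run.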

All you need to run a future is an implicit ExecutionContext, which you can derive from a Java ExecutorService. Then you can enjoy the elegant API and the fact that a future is a monad: you can transform collections into collections of futures, collect the results, and so on. I suggest you take a look at http://doc.akka.io/docs/akka/2.0.1/scala/futures.html

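```scala
// Uses the Scala 2.10+ standard library; Akka 2.0 had equivalent classes in akka.dispatch
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object TestingFutures {
  // Fixed pool of 20 threads; call executorService.shutdown() when done so the JVM can exit
  val executorService = Executors.newFixedThreadPool(20)
  // The implicit ExecutionContext that every Future below runs on
  implicit val executionContext: ExecutionContext =
    ExecutionContext.fromExecutorService(executorService)

  def testFutures(myList: List[String]): List[String] = {
    // One Future per element, gathered into a single Future[List[String]]
    val listOfFutures: Future[List[String]] = Future.traverse(myList) { aString =>
      Future {
        aString.reverse
      }
    }
    // Block for at most one minute until every element has been processed
    Await.result(listOfFutures, 1.minute)
  }
}
```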

A lot is going on here:


  • I am using Future.traverse, which takes as its first parameter an M[T] <: Traversable[T] and as its second parameter a T => Future[U] (or, if you prefer, a Function1[T, Future[U]]), and returns a Future[M[U]]; here both T and U are String
  • I am using the Future.apply method (the Future { ... } block) to create a value of type Future[T] whose body runs on the implicit ExecutionContext
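Applied to the original question, the same traverse pattern could process all the files in a folder in parallel. The sketch below assumes scala.concurrent (Scala 2.10+) instead of Akka 2.0; ParallelFolderProcessing and processFile (which merely counts lines) are hypothetical placeholders for the real per-file work, and the pool size and one-minute timeout are arbitrary.

```scala
import java.io.File
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.io.Source

object ParallelFolderProcessing {
  // Hypothetical per-file work: count the lines in a file
  def processFile(file: File): Int = {
    val source = Source.fromFile(file)
    try source.getLines().size finally source.close()
  }

  def processFolder(folder: File, poolSize: Int = 4): Map[String, Int] = {
    val pool = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val files = Option(folder.listFiles()).map(_.toList).getOrElse(Nil).filter(_.isFile)
      // One Future per file; traverse gathers them into a single Future[List[...]]
      val all: Future[List[(String, Int)]] =
        Future.traverse(files)(f => Future(f.getName -> processFile(f)))
      Await.result(all, 1.minute).toMap
    } finally pool.shutdown() // let the pool's threads exit once the work is done
  }
}
```

There is no separate reader thread and no manual task submission: listing the folder happens once, and traverse takes care of starting one future per file and collecting the results. Because the pool is created and shut down inside processFolder, each call owns its threads; a long-running application would more likely share one ExecutionContext.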

There are many other reasons to look at Akka futures.


  • Futures can be mapped because they are monads, i.e. you can chain future executions:

```scala
Future { 3 }.map { _ * 2 }.map { _.toString }
```

  • Futures have callbacks: future.onComplete, onSuccess, onFailure, andThen, etc.

  • Futures support not only traverse but also for-comprehensions.
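These three points can be sketched together as follows, again assuming the standard-library scala.concurrent API with the global ExecutionContext rather than Akka; FutureCombinators, parse, double and describe are hypothetical names. The sketch uses onComplete rather than onSuccess/onFailure, which were deprecated in Scala 2.12.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object FutureCombinators {
  // Chaining with map: each step runs once the previous future has completed
  val chained: Future[String] = Future { 3 }.map { _ * 2 }.map { _.toString }

  // Hypothetical asynchronous steps
  def parse(s: String): Future[Int] = Future { s.trim.toInt }
  def double(n: Int): Future[Int] = Future { n * 2 }

  // for-comprehension: desugars to flatMap/map; a failure in any step fails the result
  def parseAndDouble(s: String): Future[Int] =
    for {
      n <- parse(s)
      d <- double(n)
    } yield d

  // Callback: onComplete receives a Try once the future finishes
  def describe(f: Future[Int]): String = {
    val latch = new CountDownLatch(1)
    var msg = ""
    f.onComplete { result =>
      msg = result match {
        case Success(v) => s"ok $v"
        case Failure(e) => s"failed: ${e.getClass.getSimpleName}"
      }
      latch.countDown() // makes the write to msg visible after await returns
    }
    latch.await(5, TimeUnit.SECONDS)
    msg
  }
}
```

For example, Await.result(FutureCombinators.parseAndDouble(" 21 "), 5.seconds) yields 42, while parseAndDouble("abc") completes with a NumberFormatException that the for-comprehension propagates without any explicit error handling.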
