Reading a large file using Akka Streams


Question


I'm trying out Akka Streams and here is a short snippet that I have:

  override def main(args: Array[String]) {
    val filePath = "/Users/joe/Softwares/data/FoodFacts.csv"//args(0)

    val file = new File(filePath)
    println(file.getAbsolutePath)
    // read 1MB of file as a stream
    val fileSource = SynchronousFileSource(file, 1 * 1024 * 1024)
    val shaFlow = fileSource.map(chunk => {
      println(s"the string obtained is ${chunk.toString}")
    })
    shaFlow.to(Sink.foreach(println(_))).run // fails with a null pointer

    def sha256(s: String) = {
      val messageDigest = MessageDigest.getInstance("SHA-256")
      messageDigest.digest(s.getBytes("UTF-8"))
    }
  }


When I run this snippet, I get:

Exception in thread "main" java.lang.NullPointerException
    at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:365)
    at com.test.api.consumer.DataScienceBoot$.main(DataScienceBoot.scala:30)
    at com.test.api.consumer.DataScienceBoot.main(DataScienceBoot.scala)


It seems to me that the fileSource is just empty? Why is this? Any ideas? The FoodFacts.csv is 40MB in size, and all I'm trying to do is create a 1MB stream of data!


Even using the defaultChunkSize of 8192 did not work!

Answer


Well, 1.0 is deprecated. If you can, use 2.x.


When I tried with the 2.0.1 version, using FileIO.fromFile(file) instead of SynchronousFileSource, it also failed with a null pointer. This was simply because it didn't have an ActorMaterializer in scope. Including it makes it work:

object TImpl extends App {
  import java.io.File

  import scala.concurrent.Future

  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl.{FileIO, Source}

  implicit val system = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()

  val file = new File("somefile.csv")
  // Read the file in 1MB chunks.
  val fileSource = FileIO.fromFile(file, 1 * 1024 * 1024)
  val shaFlow: Source[String, Future[Long]] = fileSource.map { chunk =>
    s"the string obtained is ${chunk.toString()}"
  }

  shaFlow.runForeach(println(_))
}


This works for files of any size. For more information on configuring the dispatcher, refer here.
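The question's sha256 helper is never actually wired into the stream. One way to do that without ever holding the whole file in memory is to fold the ByteString chunks through a single incrementally-updated MessageDigest. A minimal sketch against the same FileIO.fromFile API shown above (fromPath in later Akka releases); the object name ShaOfFile and the file name somefile.csv are placeholders, not part of the original answer:

```scala
import java.io.File
import java.security.MessageDigest

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Sink}
import akka.util.ByteString

object ShaOfFile extends App {
  implicit val system = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  val file = new File("somefile.csv")

  // Fold every 1MB chunk into one digest; only the digest state,
  // not the file contents, is kept in memory.
  val digestSink = Sink.fold[MessageDigest, ByteString](
    MessageDigest.getInstance("SHA-256")) { (md, chunk) =>
    md.update(chunk.toArray)
    md
  }

  FileIO.fromFile(file, 1 * 1024 * 1024)
    .runWith(digestSink)
    .map(md => md.digest().map("%02x".format(_)).mkString)
    .foreach { hex =>
      println(s"SHA-256: $hex")
      system.terminate()
    }
}
```

Sink.fold materializes a Future of the final digest, so the hex string only becomes available once the whole file has been streamed through.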

