Akka streams: Reading multiple files


Question

I have a list of files. I want:

  1. To read from all of them as a single Source.
  2. Files should be read sequentially, in-order. (no round-robin)
  3. At no point should any file be required to be entirely in memory.
  4. An error reading from a file should collapse the stream.

It felt like this should work: (Scala, akka-streams v2.4.7)

val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
    .map(bs => bs.utf8String)
  )
val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines

But that results in a compile error since FileIO has a materialized value associated with it, and Source.combine doesn't support that.

Mapping the materialized value away does compile, although it makes me wonder how file-read errors get handled:

val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
    .map(bs => bs.utf8String)
    .mapMaterializedValue(f => NotUsed.getInstance())
  )
val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _))  // counting lines

However, it throws an IllegalArgumentException at runtime:

java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out]
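For comparison, here is a minimal sketch of the combine-based approach that should both compile and run, assuming Akka Streams 2.4.x (it is my own variant, not the approach of the recommended answer). It drops each file's materialized value, then makes a single Source.combine call over all sources with the Concat fan-in, which, unlike MergePreferred, emits every element of one input before moving on to the next. The names CombinedWithConcat, fileLines and allLines are hypothetical.

```scala
import java.nio.file.Path
import akka.NotUsed
import akka.stream.scaladsl.{Concat, FileIO, Framing, Source}
import akka.util.ByteString

object CombinedWithConcat {
  // Lines of one file; the Future[IOResult] materialized value is discarded,
  // but a read failure still fails the stream itself.
  def fileLines(path: Path): Source[String, NotUsed] =
    FileIO.fromPath(path)
      .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
      .map(_.utf8String)
      .mapMaterializedValue(_ => NotUsed)

  // One Source over all files: a single combine call with Concat, which drains
  // its first input completely, then the second, and so on (ordered, no interleaving).
  def allLines(paths: List[Path]): Source[String, NotUsed] =
    paths.map(fileLines) match {
      case Nil                     => Source.empty[String]
      case single :: Nil           => single
      case first :: second :: rest => Source.combine(first, second, rest: _*)(Concat[String](_))
    }
}
```

One trade-off, as I understand it: Concat materializes all of its inputs together, so every file is opened when the stream starts, even though each one is only read when its turn comes. Dropping the materialized value does not hide read errors; a file that cannot be read still fails the whole stream, and only its IOResult is lost.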


Answer

The code below is not as terse as it could be, in order to clearly modularize the different concerns.

import java.nio.file.{Path, Paths}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, Sink, Source}
import akka.util.ByteString

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
import system.dispatcher // ExecutionContext needed by Future.foreach below

// Given a stream of ByteStrings delimited by the system line separator, we get the lines as Strings
val lines = Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true).map(bs => bs.utf8String)

// Given a stream of Paths, read those files one after another (flatMapConcat) and count the lines
val lineCounter = Flow[Path].flatMapConcat(path => FileIO.fromPath(path).via(lines))
  .fold(0L)((count, _) => count + 1).toMat(Sink.head)(Keep.right)

// Here's our test data source (replace paths with real paths)
val testFiles = Source(List("somePathToFile1", "somePathToFile2").map(Paths.get(_)))

// Runs the line counter over the test files; the returned Future holds the line count, printed when it completes
testFiles runWith lineCounter foreach println
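The question asked for a single Source of lines rather than a line count, so here is a minimal sketch that keeps the flatMapConcat idea from the answer but exposes the lines themselves, assuming Akka Streams 2.4.x (where ActorMaterializer is the materializer to use). The names MultiFileLines, linesOf, readAll and tempFile are hypothetical; tempFile only exists so the example runs without real input files.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

object MultiFileLines {
  // A single Source of lines over all files. flatMapConcat materializes the next
  // file's Source only after the previous one completes, so files are read one
  // at a time, in list order, and only bounded chunks are held in memory.
  def linesOf(paths: List[Path]): Source[String, NotUsed] =
    Source(paths).flatMapConcat { path =>
      FileIO.fromPath(path)
        .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
        .map(_.utf8String)
    }

  // Collects all lines; an error reading any file fails the stream, so Await rethrows it.
  def readAll(paths: List[Path])(implicit mat: Materializer): Seq[String] =
    Await.result(linesOf(paths).runWith(Sink.seq), 10.seconds)

  // Writes lines to a fresh temp file, each terminated by the system line separator.
  def tempFile(lines: String*): Path = {
    val p = Files.createTempFile("lines", ".txt")
    Files.write(p, lines.map(_ + System.lineSeparator).mkString.getBytes(StandardCharsets.UTF_8))
    p
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("multi-file-lines")
    implicit val mat: Materializer = ActorMaterializer()
    try {
      val files = List(tempFile("a1", "a2"), tempFile("b1"))
      println(readAll(files)) // lines of the first file, then lines of the second
    } finally system.terminate()
  }
}
```

Counting lines then becomes linesOf(paths).runWith(Sink.fold(0L)((n, _) => n + 1)). If one of the paths does not exist, the whole stream fails with the underlying exception instead of silently skipping the file.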
