如何使用反应性流进行NIO二进制处理? [英] How to use Reactive Streams for NIO binary processing?

查看:81
本文介绍了如何使用反应性流进行NIO二进制处理?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

是否存在一些使用Java NIO使用 org.reactivestreams 库处理大型数据流的代码示例(为了获得高性能)?我的目标是分布式处理,因此使用Akka的示例将是最好的,但我可以弄清楚.

在Scala中读取文件的大多数(我希望不是全部)示例似乎仍然使用Source(非二进制)或直接Java NIO(甚至包括Files.readAllBytes之类的东西!)

也许我错过了一个激活器模板? (使用Scala的Akka Streams!可以满足我除二进制文件之外的所有需求/NIO端)

解决方案

请勿使用scala.collection.immutable.Stream消耗此类文件,原因是它执行备忘录功能-也就是说,虽然它很懒,但它会保留整个文件流在内存中缓冲(已存储)!

这绝对不是在考虑流处理文件"时想要的. Scala的Stream之所以这样工作的原因是,在功能设置中它具有完整的意义-例如,由于此,您可以避免一次又一次轻松地计算fibbonachi数,例如,有关更多详细信息,请参见

有关使用带有Akka流的IO 请注意,这是针对Akka的当前版本,因此是2.5.x系列.

希望这会有所帮助!

Are there some code examples of using org.reactivestreams libraries to process large data streams using Java NIO (for high performance)? I'm aiming at distributed processing, so examples using Akka would be best, but I can figure that out.

It still seems to be the case that most (I hope not all) examples of reading files in scala resort to Source (non-binary) or direct Java NIO (and even things like Files.readAllBytes!)

Perhaps there is an activator template I've missed? (Akka Streams with Scala! is close addressing everything I need except the binary/NIO side)

解决方案

Do not use scala.collection.immutable.Stream to consume files like this, the reason being that it performs memoization - that is, while yes it is lazy it will keep the entire stream buffered (memoized) in memory!

This is definitely not what you want when you think about "stream processing a file". The reason Scala's Stream works like this is because in a functional setting it makes complete sense - you can avoid calculating fibbonachi numbers again and again easily thanks to this for example, for more details see the ScalaDoc.

Akka Streams provides Reactive Streams implementations and provides a FileIO class that you could use here (it will properly back-pressure and pull the data out of the file only when needed and the rest of the stream is ready to consume it):

import java.io._
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }

object ExampleApp extends App {


  implicit val sys = ActorSystem()
  implicit val mat = FlowMaterializer()

  FileIO.fromPath(Paths.get("/example/file.txt"))
    .map(c ⇒ { print(c); c })
    .runWith(Sink.onComplete(_ ⇒ { f.close(); sys.shutdown() } ))
}

Here are more docs about working with IO with Akka Streams Note that this is for the current-as-of writing version of Akka, so the 2.5.x series.

Hope this helps!

这篇关于如何使用反应性流进行NIO二进制处理?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆