在Akka流中从Future创建背压 [英] Create backpressure from a Future inside an Akka stream

查看:87
本文介绍了在Akka流中从Future创建背压的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是Akka流和流的新手,所以我可能在概念层面上完全误解了一些东西,但是在将来解决之前,有什么方法可以产生背压?基本上我想做的是这样的:

I'm new to Akka streams and streams in general so I might have completely misunderstood something at a conceptual level, but is there any way I can create backpressure until a future resolves? Essentially what I want to do is like this:

object Parser {
    def parseBytesToSeq(buffer: ByteBuffer): Seq[ExampleObject] = ???
}

val futures = FileIO.fromPath(path)
  .map(st => Parser.parseBytesToSeq(st.toByteBuffer))
  .batch(1000, x => x)(_ ++ _)
  .map(values => doAsyncOp(values))
  .runWith(Sink.seq)

def doAsyncOp(Seq[ExampleObject]) : Future[Any] = ???

从文件中读取字节并将其传输到解析器中,解析器发出 Seq ExampleObject s,然后将这些流传输到异步操作,该操作返回 Future 。我要这样做,以便在 Future 解析之前,其余流将受到反压,然后在Future解析后继续恢复,并传递另一个 Seq [ExampleObject] doAsyncOp ,将恢复背压,依此类推。

Bytes are read from a file and streamed to the parser, which emits Seqs of ExampleObjects, and those are streamed to an async operation that returns a Future. I want to make it so that until the Future resolves, the rest of the stream gets backpressured, then resumes once the Future is resolved, passing another Seq[ExampleObject] to doAsyncOp, which resumes the backpressure and so on.

右现在我可以使用此方法了:

Right now I've got this working with:

Await.result(doAsyncOp(values), 10 seconds)

但是我的理解是,这会锁定整个线程,而且很糟糕。有什么更好的方法吗?

But my understanding is that this locks up the entire thread and is bad. Is there any better way around it?

如果有帮助,总的来说是我正在尝试解析一个非常大的JSON文件(太大而无法容纳在内存中) )与Jawn进行逐块处理,然后将对象解析时将其传递给ElasticSearch进行索引-ElasticSearch拥有50个待处理操作的队列,如果该操作溢出,则会开始拒绝新对象。

If it helps, the big picture is that I'm trying to parse an extremely large JSON file (too big to fit in memory) chunk-by-chunk with Jawn, then pass objects to ElasticSearch to be indexed as they're parsed - ElasticSearch has a queue of 50 pending operations, if that overflows it starts rejecting new objects.

推荐答案

这很容易。您需要使用 mapAync :)

It's quite easy. You need to use mapAync :)

val futures = FileIO.fromPath(path)
  .map(st => Parser.parseBytesToSeq(st.toByteBuffer))
  .batch(1000, x => x)(_ ++ _)
  .mapAsync(4)(values => doAsyncOp(values))
  .runWith(Sink.seq)

其中 4 是并行度。

这篇关于在Akka流中从Future创建背压的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆