Difference between map and mapAsync

Question

Can anyone please explain the difference between map and mapAsync with respect to Akka Streams? The documentation says:


Stream transformations and side effects involving external non-stream based services can be performed with mapAsync or mapAsyncUnordered

Why can't we simply use map here? I assume that Flow, Source and Sink are all monadic in nature, so map should work fine despite the delay inherent in these external calls?

Recommended answer

Signatures

The difference is best highlighted in the signatures: Flow.map takes in a function that returns a type T while Flow.mapAsync takes in a function that returns a type Future[T].
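To make the signature difference concrete, here is a small type-level sketch. The object name and the parallelism value of 4 are illustrative assumptions, and the element type Int merely stands in for any type:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import scala.concurrent.Future

object SignatureSketch {
  // map with a plain function: the flow emits Int.
  val plain: Flow[Int, Int, NotUsed] =
    Flow[Int].map(_ + 1)

  // map with a Future-returning function: the flow emits the Futures themselves.
  val nested: Flow[Int, Future[Int], NotUsed] =
    Flow[Int].map(i => Future.successful(i + 1))

  // mapAsync with the same kind of function: the flow emits the completed Int.
  // The first argument (4, chosen arbitrarily) caps the number of Futures in flight.
  val flattened: Flow[Int, Int, NotUsed] =
    Flow[Int].mapAsync(4)(i => Future.successful(i + 1))
}
```

The type annotations carry the point: with map the Future leaks into the element type, while mapAsync unwraps it before passing the element downstream.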

Practical example

As an example, suppose that we have a function which queries a database for a user's full name based on a user id:

type UserID   = String
type FullName = String

val databaseLookup : UserID => FullName = ???  //implementation unimportant

Given an Akka Streams Source of UserID values, we could use Flow.map within a stream to query the database and print the full names to the console:

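import akka.stream.scaladsl.{Flow, Sink, Source}

val userIDSource : Source[UserID, _] = ???

// run() needs an implicit Materializer (or, in Akka 2.6+, an implicit ActorSystem) in scope
val stream = 
  userIDSource.via(Flow[UserID].map(databaseLookup))
              .to(Sink.foreach[FullName](println))
              .run()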

One limitation of this approach is that the stream will only make one database query at a time. This serial querying will be a bottleneck and will likely prevent the stream from reaching its maximum throughput.

We could try to improve performance through concurrent queries using a Future:

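import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global  // any implicit ExecutionContext will do

def concurrentDBLookup(userID : UserID) : Future[FullName] = 
  Future { databaseLookup(userID) }

// the Sink now receives Future[FullName] elements and prints each one when it completes
val concurrentStream = 
  userIDSource.via(Flow[UserID].map(concurrentDBLookup))
              .to(Sink.foreach[Future[FullName]](_ foreach println))
              .run()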

The problem with this simplistic addendum is that we have effectively eliminated backpressure.

The Sink merely receives each Future and attaches a foreach println callback to it, which is fast compared to a database query. The stream therefore keeps propagating demand to the Source and spawning more Futures inside Flow.map, so there is no limit on the number of databaseLookup calls running concurrently. Unfettered parallel querying could eventually overload the database.

Flow.mapAsync to the rescue: we can have concurrent database access while capping the number of simultaneous lookups:

val maxLookupCount = 10

val maxLookupConcurrentStream = 
  userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
              .to(Sink.foreach[FullName](println))
              .run()

Also notice that the Sink.foreach got simpler: it no longer takes a Future[FullName], just a FullName.
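The cap can be observed with a self-contained sketch. Everything here is an assumption made for illustration: the object name MapAsyncDemo, the in-memory lookup that sleeps instead of querying a database, and the use of Akka 2.6+, where an implicit ActorSystem supplies the materializer. A counter records the highest number of lookups that were running at once:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapAsyncDemo {
  // Returns the emitted names and the peak number of concurrent lookups.
  def run(parallelism: Int): (Seq[String], Int) = {
    implicit val system: ActorSystem = ActorSystem("map-async-demo")
    import system.dispatcher

    val inFlight = new AtomicInteger(0)
    val peak     = new AtomicInteger(0)

    // Hypothetical stand-in for the database lookup. Sleeping on the default
    // dispatcher is acceptable only because this is a tiny demo.
    def lookup(id: String): Future[String] = Future {
      val now = inFlight.incrementAndGet()
      peak.updateAndGet(m => math.max(m, now))
      Thread.sleep(20)
      inFlight.decrementAndGet()
      s"name-of-$id"
    }

    try {
      val names = Source(1 to 20)
        .map(i => s"u$i")
        .mapAsync(parallelism)(lookup) // at most `parallelism` lookups in flight
        .runWith(Sink.seq)
      (Await.result(names, 10.seconds), peak.get)
    } finally system.terminate()
  }
}
```

Calling MapAsyncDemo.run(3) should return the names in their original order together with a peak of at most 3, since mapAsync stops pulling from upstream once that many Futures are outstanding.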

Unordered async map

If maintaining the order of UserIDs in the resulting FullNames is unnecessary, then you can use Flow.mapAsyncUnordered. For example, you might just need to print all of the names to the console without caring about the order in which they are printed.
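A rough comparison of the two operators, again under illustrative assumptions: the object name UnorderedDemo, a fake lookup in which the first ID is deliberately the slowest, and Akka 2.6+ for the implicit materializer:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object UnorderedDemo {
  // Returns (results from mapAsync, results from mapAsyncUnordered).
  def run(): (Seq[String], Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("unordered-demo")
    import system.dispatcher

    // Hypothetical lookup: ID 1 takes much longer than the others.
    def lookup(id: Int): Future[String] = Future {
      Thread.sleep(if (id == 1) 300 else 10)
      s"name-$id"
    }

    try {
      // mapAsync holds back faster results until earlier elements complete.
      val ordered = Source(1 to 4).mapAsync(4)(lookup).runWith(Sink.seq)
      // mapAsyncUnordered emits each result as soon as its Future completes.
      val unordered = Source(1 to 4).mapAsyncUnordered(4)(lookup).runWith(Sink.seq)
      (Await.result(ordered, 10.seconds), Await.result(unordered, 10.seconds))
    } finally system.terminate()
  }
}
```

The first sequence always follows the input order. The second contains the same names but will typically place name-1 last, although that exact order depends on scheduling and is not guaranteed.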
