转换CompletableFuture< Stream< T>到发布商< T&gt ;? [英] Is it correct to convert a CompletableFuture<Stream<T>> to a Publisher<T>?

查看:124
本文介绍了转换CompletableFuture< Stream< T>到发布商< T&gt ;?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

为了允许对CompletableFuture<Stream<String>>的结果流进行多次迭代,我正在考虑以下方法之一:

To allow multiple iterations on the resulting stream from a CompletableFuture<Stream<String>> I am considering one of the following approaches:

  1. 通过以下方式将结果转换为CompletableFuture<List<String>>:teams.thenApply(st -> st.collect(toList()))

使用缓存:Flux.fromStream(teams::join).cache();

Flux<T>Publisher<T>在项目反应堆中的实现.

Flux<T> is the implementation of Publisher<T> in project reactor.

用例:

我想从一个数据源获取一个包含英超联赛球队名称(例如Stream<String>)的序列,该数据源为一个League对象提供一个Standing[](基于足球数据RESTful API,例如 http://api.football-data.org/v1/soccerseasons/445/leagueTable ).使用AsyncHttpClientGson,我们可以:

I would like to get a sequence with the premier league teams names (e.g. Stream<String>) from a data source which provides a League object with a Standing[] (based on football-data RESTful API, e.g. http://api.football-data.org/v1/soccerseasons/445/leagueTable). Using AsyncHttpClient and Gson we have:

CompletableFuture<Stream<String>> teams = asyncHttpClient
    .prepareGet("http://api.football-data.org/v1/soccerseasons/445/leagueTable")
    .execute()
    .toCompletableFuture()
    .thenApply(Response::getResponseBody)
    .thenApply(body -> gson.fromJson(body, League.class));
    .thenApply(l -> stream(l.standings).map(s -> s.teamName));

要重用生成的流,我有两个选择:

To re-use the resulting stream I have two options:

1. CompletableFuture<List<String>> res = teams.thenApply(st -> st.collect(toList()))

2. Flux<String> res = Flux.fromStream(teams::join).cache()

Flux<T>不太冗长,可以提供我需要的所有内容.但是,在这种情况下使用它是否正确?

Flux<T> is less verbose and provides all that I need. Yet, is it correct to use it in this scenario?

还是应该改用CompletableFuture<List<String>>?还是有其他更好的选择?

Or should I use CompletableFuture<List<String>> instead? Or is there any other better alternative?

已更新一些想法(2018-03-16):

CompletableFuture<List<String>>:

  • [PROS] List<String>将连续收集,当我们需要进行将来的结果处理时,也许它已经完成了.
  • [CONS]声明详细程度.
  • [CONS]如果我们只想使用一次,那么我们就不需要将这些物品收集在List<T>中.
  • [PROS] The List<String> will be collected in a continuation and when we need to proceed with the result of the future, maybe it is already completed.
  • [CONS] Declaration verbosity.
  • [CONS] If we just want to use it once, then we did not need to collect those items in a List<T>.

Flux<String>:

  • [PROS]声明简洁性
  • [PROS]如果我们只想使用一次,则可以省略.cache()并将其转发到下一层,这可以利用反应性API的优势,例如卷筒磁通无功控制器,例如@GetMapping(produces =MediaType.TEXT_EVENT_STREAM) public Flux<String> getTeams() {…}
  • [CONS]如果要重用Flux<T>,我们必须将其包装在可缓存的Flux<T>(….cache())中,这反过来会增加第一次遍历的开销,因为它必须存储结果项在内部缓存中.
  • [PROS] Declaration conciseness
  • [PROS] If we just want to use it once, then we can omit .cache() and forward it to the next layer, which can take advantage of the reactive API, e.g. web flux reactive controller, e.g. @GetMapping(produces =MediaType.TEXT_EVENT_STREAM) public Flux<String> getTeams() {…}
  • [CONS] If we want to reuse that Flux<T> we have to wrap it in a cacheable Flux<T> (….cache()) which in turn will add overhead on the first traversal, because it has to store the resulting items in an internal cache.

推荐答案

    CompletableFuture<Stream<String>> teams = ...;
    Flux<String> teamsFlux = Mono.fromFuture(teams).flatMapMany(stream -> Flux.fromStream(stream));

Flux.fromStream(teams::join)是一种代码异味,因为它持有一个线程以从运行在另一个线程上的CompletableFuture中获取结果.

Flux.fromStream(teams::join) is a code smell because it's holding a thread to fetch the result from CompletableFuture which is running on another thread.

这篇关于转换CompletableFuture&lt; Stream&lt; T&gt;到发布商&lt; T&gt ;?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆