groupBy的子流能否取决于它们从中生成的键? [英] Can the subflows of groupBy depend on the keys they were generated from ?

查看:101
本文介绍了groupBy的子流能否取决于它们从中生成的键?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个与用户相关的数据流。我还为每个用户提供了一个状态,可以从DB异步获取。

I have a flow with data associated to users. I also have a state for each user, that I can get asynchronously from DB.

我想将每个用户的流与一个子流分开,并为每个用户加载状态用户在实现子流时可以相对于这种状态对待子流的元素。

I want to separate my flow with one subflow per user, and load the state for each user when materializing the subflow, so that the elements of the subflow can be treated with respect to this state.

如果我不想在下游合并子流,则可以可以使用 groupBy Sink.lazyInit 来做某事:

If I don't want to merge the subflows downstream, I can do something with groupBy and Sink.lazyInit :

def getState(userId: UserId): Future[UserState] = ...
def getUserId(element: Element): UserId = ...
def treatUser(state: UserState): Sink[Element, _] = ...

val treatByUser: Sink[Element] = Flow[Element].groupBy(
  Int.MaxValue, 
  getUserId
).to(
  Sink.lazyInit(
    elt => getState(getUserId(elt)).map(treatUser),
    ??? // this is never called, since the subflow is created when an element comes
  )
)

但是,如果 treatUser 成为 Flow ,因为 Sink.lazyInit

However, this does not work if treatUser becomes a Flow, since there is no equivalent for Sink.lazyInit.

由于 groupBy 的子流仅在推送新元素时才会实现,因此应该可以使用此元素来实现子流,但是我无法为groupBy改编源代码,以使其始终如一地工作。同样, Sink.lazyInit 似乎不容易翻译成 Flow 的情况。

Since subflows of groupBy are materialized only when a new element is pushed, it should be possible to use this element to materialize the subflow, but I wasn't able to adapt the source code for groupBy so that this work consistently. Likewise, Sink.lazyInitdoesn't seem to be easily translatable to the Flow case.

关于如何解决此问题的任何想法?

Any idea on how to solve this issue ?

推荐答案

您必须解决的相关Akka问题看是#20129:添加Sink.dynamic和Flow.dynamic

The relevant Akka issue you have to look at is #20129: add Sink.dynamic and Flow.dynamic.

在关联的PR中#20579 他们实际上实现了 LazySink 东西。

In the associated PR #20579 they actually implemented LazySink stuffs.

他们计划进行 LazyFlow 接下来:


下一个将执行具有相似签名的lazyFlow。

Will do next lazyFlow with similar signature.

不幸的是,您必须等待该功能在Akka中实现或自行编写(然后考虑对Akka进行PR)。

Unfortunately you have to wait for that functionality to be implemented in Akka or write it yourself (then consider a PR to Akka).

这篇关于groupBy的子流能否取决于它们从中生成的键?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆