How do I "split" a stream in fs2?
Question
I want to do something like this:
def splitStream[F[_], A, B](stream: fs2.Stream[F, A], split: A => B): (fs2.Stream[F, A], fs2.Stream[F, B]) =
  (stream, stream.map(split))
But this does not work, as it "pulls" from the source twice: once each when I drain both stream and stream.map(split). How do I prevent this? Somehow switch to a "push" based model by maintaining my own internal buffer so I don't pull twice?
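The double pull described above can be made visible with a small counter. This is only an illustrative sketch: the object name `DoublePullDemo` and the counter are made up for the demonstration, and `SyncIO` is assumed as the effect type simply because it needs no extra runtime setup.

```scala
import cats.effect.SyncIO
import fs2.Stream

object DoublePullDemo {
  // Returns both outputs plus the number of times the source effect ran.
  def run(): (List[Int], List[Int], Int) = {
    // A plain var is enough here because everything runs on one thread.
    var pulls = 0

    // Every element pulled from the source bumps the counter.
    val source: Stream[SyncIO, Int] =
      Stream(1, 2, 3).covary[SyncIO].evalTap(_ => SyncIO { pulls += 1 })

    // A stream is a description, so this re-describes the source
    // rather than sharing its output.
    val mapped: Stream[SyncIO, Int] = source.map(_ * 2)

    val as = source.compile.toList.unsafeRunSync()
    val bs = mapped.compile.toList.unsafeRunSync()
    (as, bs, pulls)
  }
}
```

Draining both streams runs the source effect once per element per drain, which is exactly the behaviour the question wants to avoid.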
Recommended answer
Somehow switch to a "push" based model by maintaining my own internal buffer so I don't pull twice?
Yes. E.g., you can use a queue from fs2:
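import cats.effect.Concurrent
import cats.syntax.functor._
import fs2.Stream
import fs2.concurrent.Queue
def splitStream[F[_]: Concurrent, A, B](stream: Stream[F, A], split: A => B): F[(Stream[F, A], Stream[F, B])] =
  for {
    q <- Queue.noneTerminated[F, A] // unbounded queue; enqueueing None ends q.dequeue
  } yield (stream.evalTap(a => q.enqueue1(Some(a))).onFinalize(q.enqueue1(None)), q.dequeue.map(split))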
Of course, here the problem is that if a caller ignores either stream, the other one will deadlock and never emit anything. This is generally the issue you run into when trying to make a stream into several ones, and have a value guaranteed to appear in each substream irrespective of when it's subscribed to.
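One way to make sure neither half is ignored is to run the two streams together. The sketch below assumes fs2 2.x with cats-effect 2.x (the versions where `Queue.noneTerminated` is available); `QueueSplit` and `runBoth` are hypothetical names, and `splitStream` is repeated only so the block compiles on its own.

```scala
import cats.effect.Concurrent
import cats.syntax.functor._
import fs2.Stream
import fs2.concurrent.Queue

object QueueSplit {
  // Same idea as the queue-based split: the first stream feeds the queue,
  // the second stream reads from it.
  def splitStream[F[_]: Concurrent, A, B](
      stream: Stream[F, A],
      split: A => B
  ): F[(Stream[F, A], Stream[F, B])] =
    Queue.noneTerminated[F, A].map { q =>
      (
        // Copy each element into the queue; signal the end with None.
        stream.evalTap(a => q.enqueue1(Some(a))).onFinalize(q.enqueue1(None)),
        q.dequeue.map(split)
      )
    }

  // Hypothetical helper: drives the feeding stream in the background so the
  // dequeuing stream always has a producer and can terminate.
  def runBoth[F[_]: Concurrent, A, B](
      source: Stream[F, A],
      split: A => B
  ): Stream[F, B] =
    Stream.eval(splitStream(source, split)).flatMap { case (as, bs) =>
      bs.concurrently(as)
    }
}
```

Here the source is pulled once, by the background stream, and the foreground stream ends when the `None` marker arrives. The output of the first stream is discarded in this sketch; a real caller would do something with it before handing it to `concurrently`.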
The solution I usually go for is to combine larger actions and use operators like broadcast or parJoin:
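import cats.effect.Concurrent
import fs2.Stream
def splitAndRun[F[_]: Concurrent, A](
  base: Stream[F, A],
  runSeveralThings: List[Stream[F, A] => Stream[F, Unit]]
): F[Unit] =
  // Each consumer is handed every element of base; base itself is run only once.
  base.broadcastTo(runSeveralThings: _*).compile.drain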
Here, you know how many consumers you are going to have, so there will not be an ignored stream in the first place.
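Applied to the original question, the same idea might look like the sketch below. It is a variation rather than the code above: it uses `broadcastThrough` instead of `broadcastTo` so the two consumers can emit values, and tags them with `Either` so they fit in one output stream. `BroadcastSplit` and `splitTagged` are hypothetical names, and fs2 2.x with cats-effect 2.x is assumed.

```scala
import cats.effect.Concurrent
import fs2.{Pipe, Stream}

object BroadcastSplit {
  // Hypothetical helper: every source element comes out twice, once untouched
  // (Left) and once transformed by `split` (Right). The source runs only once.
  def splitTagged[F[_]: Concurrent, A, B](
      base: Stream[F, A],
      split: A => B
  ): Stream[F, Either[A, B]] = {
    val passThrough: Pipe[F, A, Either[A, B]] =
      _.map(a => Left(a): Either[A, B])
    val transformed: Pipe[F, A, Either[A, B]] =
      _.map(a => Right(split(a)): Either[A, B])

    // Both pipes are fixed up front, so neither branch can be left unconsumed.
    base.broadcastThrough[F, Either[A, B]](passThrough, transformed)
  }
}
```

Elements from the two branches may interleave in any order, but each branch keeps the order of the source.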