我如何“拆分"?fs2 中的流? [英] How do I "split" a stream in fs2?

查看:37
本文介绍了我如何“拆分"?fs2 中的流?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想做这样的事情:

def splitStream[F, A](stream: fs2.Stream[F, A], split: A => B): (Stream[F, A], Stream[F, B)) = 
  (stream, stream.map(split)

但这不起作用,因为它从源拉"了两次 - 当我同时排出 streamstream.map(split) 时,每次一次.我如何防止这种情况?以某种方式通过维护我自己的内部缓冲区切换到基于推送"的模型,这样我就不会拉两次?

But this does not work as it "pulls" from the source twice - once each when I drain both stream and stream.map(split). How do I prevent this? Somehow switch to a "push" based model by maintaining my own internal buffer so I don't pull twice?

推荐答案

通过维护我自己的内部缓冲区以某种方式切换到基于推送"的模型,这样我就不会拉两次?

Somehow switch to a "push" based model by maintaining my own internal buffer so I don't pull twice?

是的.例如,您可以使用 fs2 中的队列:

Yes. E.g., you can use a queue from fs2:

def splitStream[F[_], A](stream: Stream[F, A], split: A => B): F[(Stream[F, A], Stream[F, B])] = 
  for {
    q <- Queue.noneTerminated[F, A]
  } yield (stream.evalTap(a => q.enqueue1(Some(a)).onFinalize(q.enqueue1(None)), q.dequeue.map(split))

当然,这里的问题是,如果调用者忽略任何一个流,另一个将死锁并且永远不会发出任何内容.这通常是您在尝试将一个流分成多个流时遇到的问题,并且保证每个子流中都出现一个值,而不管它何时被订阅.

Of course, here the problem is that if a caller ignores either stream, the other one will deadlock and never emit anything. This is generally the issue you run into when trying to make a stream into several ones, and have a value guaranteed to appear in each substream irrespective of when it's subscribed to.

我通常采用的解决方案是组合更大的动作并使用诸如 broadcastparJoin 之类的运算符:

The solution I usually go for is to combine larger actions and use operators like broadcast or parJoin:

def splitAndRun[F[_]: Concurrent, A](
  base: Stream[F, A],
  runSeveralThings: List[Stream[F, A] => Stream[F, Unit]]
): F[Unit] =
  base.broadcastTo(run: _*).compile.drain

在这里,您知道将有多少消费者,因此首先不会有被忽略的流.

Here, you know how many consumers you are going to have, so there will not be an ignored stream in the first place.

这篇关于我如何“拆分"?fs2 中的流?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆