将导管与多个输入连接 [英] Fusing conduits with multiple inputs
问题描述
我正在尝试创建一个可以消耗多个输入流的管道。我需要能够按照没有特定顺序(例如,不交替)的方式等待输入流中的一个,使得zip无用。这里没有任何平行或不确定的事情发生:我在一个或另一个流中等待。我希望能够编写类似于以下内容的代码(其中 awaitA
和 awaitB
等待第一个或第二个输入:
do
_< - awaitA
x< - awaitA
y < - awaitB
yield(x,y)
_< - awaitB
_< - awaitB
y'< - awaitB
yield(x, y')
我的最佳解决方案是让内部monad具有另一个导管,例如
foo :: Sink i1(ConduitM i2 om)()
然后允许
awaitA =等待
awaitB =提升等待
这大部分工作。不幸的是,这似乎使得在外部导管完全连接之前融合到内部导管是非常困难的。我试过的第一件事是:
fuseInner :: Monad m =>
Conduit i2'm i2 - >
Sink i1(ConduitM i2 o m)() - >
Sink i1(ConduitM i2'om)()
fuseInner x = transPipe(x = $ =)
但是这不起作用,至少当 x 是有状态的,因为
(x = $ =)
多次运行,每次都有效地重新启动 x
。有没有什么方法可以写fuseInner,没有闯入管道的内部(看起来会非常混乱)?有没有更好的方法来处理多个输入流?我只是远远超出了设计的管道?
谢谢!
关键难点在于(x = $ =)
是一个纯函数,但为了让 transPipe
给出正确的答案,它需要一种有状态的函数类似的事情:
pre $ data $ StatefulMorph mn = StatefulMorph
{stepStatefulMorph :: forall a。 m a - > n(StatefulMorph mn,a)
,finalizeStatefulMorph :: n()}
code> StatefulMorph mn 在 m
中取值,返回 n
,该值和下一个 StatefulMorph
,它们应该用于转换下一个 m
值。最后一个 StatefulMorph
应该被终结(对于有状态(x = $ =)
)完成 x
管道。
Conduit融合可以实现为 StatefulMorph
,使用 pipeL
代码进行细微更改。签名是:
fuseStateful :: Monad m
=> Conduit amb
- > StatefulMorph(ConduitM bcm)(ConduitM acm)
我还需要替换 transPipe
(一个特殊情况 hoist
> StatefulMorph 值而不是函数。
class StatefulHoist t其中
statefulHoist ::(Monad m,Monad n)
=> StatefulMorph mn
- > tmr - > tnr
一个 StatefulHoist 实例用于
稍作修改。 ConduitM io
can使用<$的代码编写c $ c> transPipe
fuseInner
然后很容易
fuseInner :: Monad m
=>导管a m b
- > ConduitM i o(ConduitM b c m)r
- > ConduitM io(ConduitM acm)r
fuseInner left = statefulHoist(fuseStateful left)
I'已经在这里写了更详细的解释并发布了完整的代码这里。如果有人能想出一个更清洁的解决方案,或使用管道公共API的,请发布。
感谢您提供的所有建议和意见!
I am trying to create a conduit that can consume multiple input streams. I need to be able to await on one or the other of the input streams in no particular order (e.g., not alternating) making zip useless. There is nothing parallel or non-deterministic going on here: I await on one stream or the other. I want to be able to write code similar to the following (where awaitA
and awaitB
await on the first or second input stream respectively):
do
_ <- awaitA
x <- awaitA
y <- awaitB
yield (x,y)
_ <- awaitB
_ <- awaitB
y' <- awaitB
yield (x,y')
The best solution I have is to make the inner monad another conduit, e.g.
foo :: Sink i1 (ConduitM i2 o m) ()
Which then allows
awaitA = await
awaitB = lift await
And this mostly works. Unfortunately, this seems to make it very difficult to fuse to the inner conduit before the outer conduit is fully connected. The first thing I tried was:
fuseInner :: Monad m =>
Conduit i2' m i2 ->
Sink i1 (ConduitM i2 o m) () ->
Sink i1 (ConduitM i2' o m) ()
fuseInner x = transPipe (x =$=)
But this doesn't work, at least when x
is stateful since (x =$=)
is run multiple times, effectively restarting x
each time.
Is there any way to write fuseInner, short of breaking into the internals of conduit (which looks like it would be pretty messy)? Is there some better way to handle multiple input streams? Am I just way to far beyond what conduit was designed for?
Thanks!
This can be done by diving into the internals of conduit. I wanted to avoid this because it looked extremely messy. Based on the responses here, it sounds like there is no way around it (but I would really appreciate a cleaner solution).
The key difficulty is that (x =$=)
is a pure function, but to make transPipe
give the correct answer, it needs a kind of stateful, function-like thing:
data StatefulMorph m n = StatefulMorph
{ stepStatefulMorph :: forall a. m a -> n (StatefulMorph m n, a)
, finalizeStatefulMorph :: n () }
Stepping StatefulMorph m n
takes a value in m
and returns, in n
, both that value and the next StatefulMorph
, which should be used to transform the next m
value. The last StatefulMorph
should be finalized (which, in the case of the "stateful (x =$=)
", finalizes the x
conduit.
Conduit fusion can be implemented as a StatefulMorph
, using the code for pipeL
with minor changes. The signature is:
fuseStateful :: Monad m
=> Conduit a m b
-> StatefulMorph (ConduitM b c m) (ConduitM a c m)
I also need a replacement for transPipe
(a special case of hoist
) that uses StatefulMorph
values instead of functions.
class StatefulHoist t where
statefulHoist :: (Monad m, Monad n)
=> StatefulMorph m n
-> t m r -> t n r
A StatefulHoist
instance for ConduitM i o
can be written using the code for transPipe
with some minor changes.
fuseInner
is then easy to implement.
fuseInner :: Monad m
=> Conduit a m b
-> ConduitM i o (ConduitM b c m) r
-> ConduitM i o (ConduitM a c m) r
fuseInner left = statefulHoist (fuseStateful left)
I've written a more detailed explanation here and posted the full code here. If someone can come up with a cleaner solution, or one that uses the conduit public API, please post it.
Thanks for all the suggestions and input!
这篇关于将导管与多个输入连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!