将导管与多个输入连接 [英] Fusing conduits with multiple inputs

查看:85
本文介绍了将导管与多个输入连接的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试创建一个可以消耗多个输入流的管道。我需要能够按照没有特定顺序(例如,不交替)的方式等待输入流中的一个,使得zip无用。这里没有任何平行或不确定的事情发生:我在一个或另一个流中等待。我希望能够编写类似于以下内容的代码(其中 awaitA awaitB 等待第一个或第二个输入:

  do 
_< - awaitA
x< - awaitA
y < - awaitB
yield(x,y)
_< - awaitB
_< - awaitB
y'< - awaitB
yield(x, y')

我的最佳解决方案是让内部monad具有另一个导管,例如

  foo :: Sink i1(ConduitM i2 om)()

然后允许

  awaitA =等待
awaitB =提升等待

这大部分工作。不幸的是,这似乎使得在外部导管完全连接之前融合到内部导管是非常困难的。我试过的第一件事是:

  fuseInner :: Monad m => 
Conduit i2'm i2 - >
Sink i1(ConduitM i2 o m)() - >
Sink i1(ConduitM i2'om)()
fuseInner x = transPipe(x = $ =)

但是这不起作用,至少当 x 是有状态的,因为(x = $ =)多次运行,每次都有效地重新启动 x 。有没有什么方法可以写fuseInner,没有闯入管道的内部(看起来会非常混乱)?有没有更好的方法来处理多个输入流?我只是远远超出了设计的管道?



谢谢!

解决方案

这个可以通过潜入管道的内部来完成。我想避免这个,因为它看起来非常混乱。根据这里的回答,这听起来像没有办法解决它(但我真的很感谢一个更清洁的解决方案)。



关键难点在于(x = $ =)是一个纯函数,但为了让 transPipe 给出正确的答案,它需要一种有状态的函数类似的事情:

pre $ data $ StatefulMorph mn = StatefulMorph
{stepStatefulMorph :: forall a。 m a - > n(StatefulMorph mn,a)
,finalizeStatefulMorph :: n()}

code> StatefulMorph mn 在 m 中取值,返回 n ,该值和下一个 StatefulMorph ,它们应该用于转换下一个 m 值。最后一个 StatefulMorph 应该被终结(对于有状态(x = $ =))完成 x 管道。



Conduit融合可以实现为 StatefulMorph ,使用 pipeL 代码进行细微更改。签名是:

  fuseStateful :: Monad m 
=> Conduit amb
- > StatefulMorph(ConduitM bcm)(ConduitM acm)

我还需要替换 transPipe (一个特殊情况 hoist > StatefulMorph 值而不是函数。

  class StatefulHoist t其中
statefulHoist ::(Monad m,Monad n)
=> StatefulMorph mn
- > tmr - > tnr

一个 StatefulHoist 实例用于 ConduitM io can使用<$的代码编写c $ c> transPipe 稍作修改。

fuseInner 然后很容易

  fuseInner :: Monad m 
=>导管a m b
- > ConduitM i o(ConduitM b c m)r
- > ConduitM io(ConduitM acm)r
fuseInner left = statefulHoist(fuseStateful left)



I'已经在这里写了更详细的解释并发布了完整的代码这里。如果有人能想出一个更清洁的解决方案,或使用管道公共API的,请发布。



感谢您提供的所有建议和意见!


I am trying to create a conduit that can consume multiple input streams. I need to be able to await on one or the other of the input streams in no particular order (e.g., not alternating) making zip useless. There is nothing parallel or non-deterministic going on here: I await on one stream or the other. I want to be able to write code similar to the following (where awaitA and awaitB await on the first or second input stream respectively):

do
  _ <- awaitA
  x <- awaitA
  y <- awaitB
  yield (x,y)
  _ <- awaitB
  _ <- awaitB
  y' <- awaitB
  yield (x,y')

The best solution I have is to make the inner monad another conduit, e.g.

foo :: Sink i1 (ConduitM i2 o m) ()

Which then allows

awaitA = await
awaitB = lift await

And this mostly works. Unfortunately, this seems to make it very difficult to fuse to the inner conduit before the outer conduit is fully connected. The first thing I tried was:

fuseInner :: Monad m =>
                Conduit i2' m i2 -> 
                Sink i1 (ConduitM i2 o m) () -> 
                Sink i1 (ConduitM i2' o m) ()
fuseInner x = transPipe (x =$=)

But this doesn't work, at least when x is stateful since (x =$=) is run multiple times, effectively restarting x each time.

Is there any way to write fuseInner, short of breaking into the internals of conduit (which looks like it would be pretty messy)? Is there some better way to handle multiple input streams? Am I just way to far beyond what conduit was designed for?

Thanks!

解决方案

This can be done by diving into the internals of conduit. I wanted to avoid this because it looked extremely messy. Based on the responses here, it sounds like there is no way around it (but I would really appreciate a cleaner solution).

The key difficulty is that (x =$=) is a pure function, but to make transPipe give the correct answer, it needs a kind of stateful, function-like thing:

data StatefulMorph m n = StatefulMorph
    { stepStatefulMorph :: forall a. m a -> n (StatefulMorph m n, a)
    , finalizeStatefulMorph :: n () }

Stepping StatefulMorph m n takes a value in m and returns, in n, both that value and the next StatefulMorph, which should be used to transform the next m value. The last StatefulMorph should be finalized (which, in the case of the "stateful (x =$=)", finalizes the x conduit.

Conduit fusion can be implemented as a StatefulMorph, using the code for pipeL with minor changes. The signature is:

fuseStateful :: Monad m
             => Conduit a m b
             -> StatefulMorph (ConduitM b c m) (ConduitM a c m)

I also need a replacement for transPipe (a special case of hoist) that uses StatefulMorph values instead of functions.

class StatefulHoist t where
    statefulHoist :: (Monad m, Monad n)
                  => StatefulMorph m n
                  -> t m r -> t n r

A StatefulHoist instance for ConduitM i o can be written using the code for transPipe with some minor changes.

fuseInner is then easy to implement.

fuseInner :: Monad m
          => Conduit a m b
          -> ConduitM i o (ConduitM b c m) r
          -> ConduitM i o (ConduitM a c m) r
fuseInner left = statefulHoist (fuseStateful left)

I've written a more detailed explanation here and posted the full code here. If someone can come up with a cleaner solution, or one that uses the conduit public API, please post it.

Thanks for all the suggestions and input!

这篇关于将导管与多个输入连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆