具有下游状态的惯性双向管道没有损失 [英] Idiomatic bidirectional Pipes with downstream state without loss

查看:81
本文介绍了具有下游状态的惯性双向管道没有损失的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我有简单的生产者/消费者模式,消费者希望将某些国家传递给生产者。例如,让下游流动的对象成为我们想要写入文件的对象,而上游对象是表示对象写入文件的位置的某个标记(例如偏移量)。

这两个过程可能如下所示(使用 pipes-4.0 ),

  { - #LANGUAGE GeneralizedNewtypeDivingiving# - } 

导入管道
导入Pipes.Core
导入Control.Monad.Trans .State
import Control.Monad

newtype Object = Obj Int
deriving(Show)

newtype ObjectId = ObjId Int
deriving( Show,Num)

writeObjects :: Proxy ObjectId Object()X IO r
writeObjects = evalStateT(forever go)(ObjId 0)
where go = do i < - get
obj< - lift $ request i
lift $ lift $ putStrLn $Wrote++ show obj
modify(+1)

produceObjects :: [对象] - > Proxy X()ObjectId Object IO()
produceObjects = go
where go [] = return()
go(obj:rest)= do
lift $ putStrLn $生产++ show obj
objId< - 响应obj
lift $ putStrLn $Object++ show obj ++has ID++ show objId
go rest

objects = [Obj i |我< - [0..10]]

一些关于如何构成它们的难点推理。理想情况下,我们需要基于推送的控制流,如下所示:


  1. writeObjects 首先在请求上阻塞,发送了初始 ObjId 0 上游。

  2. produceObjects 发送第一个对象, Obj 0 ,下游

  3. writeObjects 写入对象并递增其状态,并等待 request ,这次发送 ObjId 1 上游

  4. 响应 in produceObjects 在步骤(2)中继续使用 ObjId 0

  5. produceObjects 第二个对象, Obj 1

我最初的尝试是使用基于推送的构图如下所示:

  main = void $ run $ produceObjects对象>>〜const writeObjects 

请注意使用 const 来解决否则不兼容的问题pes(这可能是问题所在)。然而,在这种情况下,我们发现 ObjId 0 被吃掉,

 生成对象0 
写入对象0
对象对象0有ID对象1
生成对象1
...

一种基于拉的方法,

  main = void $ run $ const(produceObjects objects)+>> writeObjects 

存在类似的问题,这次下降 Obj 0

如何以合适的方式创作这些作品?

选择使用哪种组合取决于哪​​个组件应该启动整个过程。如果您希望下游管道启动进程,那么您希望使用基于拉的组合(即(> +>) / (+ >>)),但是如果您希望上游管道启动进程,那么您应该使用基于推送的组合(即(>>〜) / (大于〜>))。你得到的类型错误实际上是警告你在你的代码中存在一个逻辑错误:你没有清楚地确定哪个组件首先启动了这个过程。

从你的描述很显然,您希望控制流从 produceObjects 开始,因此您希望使用基于推送的组合。一旦使用基于推送的组合,组合操作符的类型将告诉您需要了解的有关如何修复代码的所有信息。我将采取它的类型,并将其专门化到您的合成链中:

   - 这里我使用`Server`和客户端类型同义词简化类型
(>>〜):: Server ObjectId对象IO()
- > (Object - > Client ObjectId Object IO())
- >效果IO()

正如你已经注意到的,当你试图使用(>>〜)告诉您,您的类型的参数缺少 Object writeObjects 函数。这会静态地强制您在接收第一个 Object (通过初始参数)之前,无法在 writeObjects 中运行任何代码。



解决方法是像这样重写 writeObjects 函数:

  writeObjects :: Object  - > Proxy ObjectId Object()X IO r 
writeObjects obj0 = evalStateT(go obj0)(ObjId 0)
where obj = do i < - get
lift $ lift $ putStrLn $写入++ show obj
modify(+1)
obj'< - lift $ request i
go obj'

然后给出正确的行为:

 >>>运行$ produceObjects对象>>〜writeObjects 
生成对象0
写入对象0
对象对象0有ID对象0
生成对象1
写入对象1
Object Obj 1有ID ObjId 1
产生Obj 2
产生Obj 2
Object Obj 2产生ID ObjId 2
产生Obj 3
产生Obj 3
Object Obj 3有ID ObjId 3
产生Obj 4
产生Obj 4
Object Obj 4产生ID ObjId 4
产生Obj 5
产生Obj 5
Object Obj 5有ID ObjId 5
产生Obj 6
产生Obj 6
Object Obj 6产生ID ObjId 6
产生Obj 7
产生Obj 7
Object Obj 7有ID ObjId 7
产生Obj 8
产生Obj 8
Object Obj 8产生ID ObjId 8
产生Obj 9
产生Obj 9
Object Obj 9有ID ObjId 9
产生Obj 10
写入Obj 10
Object Obj 10有ID ObjId 10

您可能想知道为什么这个需求是两个管道中的一个初始参数是合理的,除了抽象的ju这是类别法律所要求的。简单的英语解释是,替代方法是您需要缓冲 writeObjects Object $ c>达到了它的第一个请求语句。这种方法会产生很多有问题的行为和错误的角落案例,但最重要的问题可能是管道组合不再是联想性的,并且效果的顺序会根据您编写事物的顺序而改变。



双向管道组成运算符的好处是类型能够发挥作用,以便始终可以推导组件是活动(即启动控制)还是被动(即等待输入)纯粹通过研究类型。如果组合表示某个管道(如 writeObjects )必须有一个参数,那么它是被动的。如果它没有参数(例如 produceObjects ),那么它是活动的并启动控制。所以组合会迫使你在你的管道中最多只有一个活动管道(不接受初始参数的管道),这就是开始控制的管道。


Say I have simple producer/consumer model where the consumer wants to pass back some state to the producer. For instance, let the downstream-flowing objects be objects we want to write to a file and the upstream objects be some token representing where the object was written in the file (e.g. an offset).

These two processes might look something like this (with pipes-4.0),

{-# LANGUAGE GeneralizedNewtypeDeriving #-}

import Pipes
import Pipes.Core
import Control.Monad.Trans.State       
import Control.Monad

newtype Object = Obj Int
               deriving (Show)

newtype ObjectId = ObjId Int
                 deriving (Show, Num)

writeObjects :: Proxy ObjectId Object () X IO r
writeObjects = evalStateT (forever go) (ObjId 0)
  where go = do i <- get
                obj <- lift $ request i
                lift $ lift $ putStrLn $ "Wrote "++show obj
                modify (+1)

produceObjects :: [Object] -> Proxy X () ObjectId Object IO ()
produceObjects = go
  where go [] = return ()
        go (obj:rest) = do
            lift $ putStrLn $ "Producing "++show obj
            objId <- respond obj
            lift $ putStrLn $ "Object "++show obj++" has ID "++show objId
            go rest

objects = [ Obj i | i <- [0..10] ]

Simple as this might be, I've had a fair bit of difficulty reasoning about how to compose them. Ideally, we'd want a push-based flow of control like the following,

  1. writeObjects starts by blocking on request, having sent the initial ObjId 0 upstream.
  2. produceObjects sends the first object, Obj 0, downstream
  3. writeObjects writes the object and increments its state, and waits on request, this time sending ObjId 1 upstream
  4. respond in produceObjects returns with ObjId 0
  5. produceObjects continues at Step (2) with the second object, Obj 1

My initial attempt was with push-based composition as follows,

main = void $ run $ produceObjects objects >>~ const writeObjects

Note the use of const to work around the otherwise incompatible types (this is likely where the problem lies). In this case, however, we find that ObjId 0 gets eaten,

Producing Obj 0
Wrote Obj 0
Object Obj 0 has ID ObjId 1
Producing Obj 1
...

A pull-based approach,

main = void $ run $ const (produceObjects objects) +>> writeObjects

suffers a similar issue, this time dropping Obj 0.

How might one go about composing these pieces in the desired manner?

解决方案

The choice of which composition to use depends on which component should initiate the entire process. If you want the downstream pipe to initiate the process then you want to use pull-based composition (i.e. (>+>)/(+>>)) but if you want the upstream pipe to initiate the process then you should use push-based composition (i.e. (>>~)/(>~>)). The type errors you got were actually warning you that there is a logical error in your code: you haven't clearly established which component initiates the process first.

From your description, it's obvious that you want control flow to begin from produceObjects so you want to use push-based composition. Once you use push-based composition, the type of the composition operator will tell you everything you need to know about how to fix your code. I'll take its type and specialize it to your composition chain:

-- Here I'm using the `Server` and `Client` type synonyms to simplify the types
(>>~) :: Server ObjectId Object IO ()
      -> (Object -> Client ObjectId Object IO ())
      -> Effect IO ()

As you already noticed, the type error you got when you tried to use (>>~) told you that you were missing an argument of type Object to your writeObjects function. This statically enforces that you cannot run any code in writeObjects before receiving your first Object (through the initial argument).

The solution is to rewrite your writeObjects function like this:

writeObjects :: Object -> Proxy ObjectId Object () X IO r
writeObjects obj0 = evalStateT (go obj0) (ObjId 0)
  where go obj = do i <- get
                    lift $ lift $ putStrLn $ "Wrote "++ show obj
                    modify (+1)
                    obj' <- lift $ request i
                    go obj'

This then gives the correct behavior:

>>> run $ produceObjects objects >>~ writeObjects
Producing Obj 0
Wrote Obj 0
Object Obj 0 has ID ObjId 0
Producing Obj 1
Wrote Obj 1
Object Obj 1 has ID ObjId 1
Producing Obj 2
Wrote Obj 2
Object Obj 2 has ID ObjId 2
Producing Obj 3
Wrote Obj 3
Object Obj 3 has ID ObjId 3
Producing Obj 4
Wrote Obj 4
Object Obj 4 has ID ObjId 4
Producing Obj 5
Wrote Obj 5
Object Obj 5 has ID ObjId 5
Producing Obj 6
Wrote Obj 6
Object Obj 6 has ID ObjId 6
Producing Obj 7
Wrote Obj 7
Object Obj 7 has ID ObjId 7
Producing Obj 8
Wrote Obj 8
Object Obj 8 has ID ObjId 8
Producing Obj 9
Wrote Obj 9
Object Obj 9 has ID ObjId 9
Producing Obj 10
Wrote Obj 10
Object Obj 10 has ID ObjId 10

You might wonder why this requirement that one of the two pipes takes an initial argument makes sense, other than the abstract justification that this is what the category laws require. The plain English explanation is that the alternative is that you would need buffer the first transmitted Object "in between" the two pipes before writeObjects reached its first request statement. This approach produces a lot of problematic behavior and buggy corner cases, but probably the most significant problem is that pipe composition would no longer be associative and the order of effects would change based on the order in which you composed things.

The nice thing about the bidirectional pipe composition operators is that the types work out so that you can always deduce whether or not a component is "active" (i.e. initiates control) or "passive" (i.e. waits for input) purely by studying the type. If composition says that a certain pipe (like writeObjects) must take an argument, then it's passive. If it takes no argument (like produceObjects), then it's active and initiates control. So composition forces you to have at most one active pipe within your pipeline (the pipe that doesn't take an initial argument) and that's the pipe that begins control.

这篇关于具有下游状态的惯性双向管道没有损失的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆