在F#中实现CCR交错仲裁器 [英] Implement CCR Interleave Arbiter in F#
问题描述
我想在F#中实现CCR框架端口的概念(因为.Net 4.0并未正式支持CCR).我知道人们可以使用F#中的 MailboxProcessor 类来执行此操作. 这对于简单的 Receive 仲裁器非常有效,但我需要 Interleave 仲裁器的概念,即我想控制哪些消息专门处理,哪些消息同时处理. 到目前为止,我还不知道要在F#中实现这一点,感谢您的帮助.
I want to implement the concept of a Port of the CCR Framework in F# (as CCR is not officially supported for .Net 4.0). I know that one can use the MailboxProcessor class in F# to do this. This works perfectly for simple Receive Arbiters but I need the concept of the Interleave Arbiter, i.e. I want to control which messages are processed exclusively and which are processed concurrently. So far I've got no idea to implement this in F# and I would be grateful for your help.
推荐答案
我对CCR不太熟悉,但我会尽力回答-我的理解是 interleave arbiter 的行为有点像ReaderWriterLock
.也就是说,您可以指定一些可以并行运行的操作(读取)和某些排他的操作(写入).
I'm not very familiar with CCR, but I'll try to answer - my understanding is that interleave arbiter behaves a bit like ReaderWriterLock
. That is, you can specify some operations that can run in parallel (reads) and some operations that are exclusive (writes).
以下代理是实现它的一种方式(未经测试,但类型检查:-)).该代理公开了两个供公众使用的操作.最后一个是内部的:
The following agent is one way to implement it (not tested, but type checks :-)). The agent exposes two operations that are intended for public use. The last one is internal:
type Message<'T> =
| PerformReadOperation of ('T -> Async<unit>)
| PerformWriteOperation of ('T -> Async<'T>)
| ReadOperationCompleted
-
通过发送代理
PerformReadOperation
,您将为其赋予应使用状态运行一次(一次)的操作,并且可能与其他读取操作并行运行.By sending the agent
PerformReadOperation
, you're giving it an operation that should be run (once) using the state and possibly in parallel with other read operations.通过发送代理
PerformWriteOperation
,您为它提供了一个计算新状态的操作,必须在所有读取操作完成后执行该操作. (如果您使用的是不可变状态,那将使事情变得更简单-您不必等到读者完成!但是下面的实现实现了等待).By sending the agent
PerformWriteOperation
, you're giving it an operation that calculates a new state and must be executed after all read operations complete. (If you were working with immutable state, that would make things simpler - you wouldn't have to wait until readers complete! But the implementation below implements the waiting).代理开始时具有一些初始状态:
The agent starts with some initial state:
let initial = // initial state
其余的代理通过两个循环实现:
And the rest of the agent is implemented using two loops:
let interleaver = MailboxProcessor.Start(fun mbox -> // Asynchronously wait until all read operations complete let rec waitUntilReadsComplete reads = if reads = 0 then async { return () } else mbox.Scan(fun msg -> match msg with | ReadOperationCompleted -> Some(waitUntilReadsComplete (reads - 1)) | _ -> None) let rec readingLoop state reads = async { let! msg = mbox.Receive() match msg with | ReadOperationCompleted -> // Some read operation completed - decrement counter return! readingLoop state (reads - 1) | PerformWriteOperation(op) -> do! waitUntilReadsComplete reads let! newState = op state return! readingLoop newState 0 | PerformReadOperation(op) -> // Start the operation in background & increment counter async { do! op state mbox.Post(ReadOperationCompleted) } |> Async.Start return! readingLoop state (reads + 1) } readingLoop initial 0)
这篇关于在F#中实现CCR交错仲裁器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!