A MailboxProcessor that operates with a LIFO logic


Question

I am learning about F# agents (MailboxProcessor).

I am dealing with a rather unconventional problem.


  • I have one agent (dataSource) which is a source of streaming data. The data has to be processed by an array of agents (dataProcessor). We can think of dataProcessor as some sort of tracking device.
  • Data may flow in faster than dataProcessor is able to process its input.
  • It is OK to have some delay. However, I have to ensure that the agent stays on top of its work and does not get buried under obsolete observations.

I am exploring ways to deal with this problem.

The first idea is to implement a stack (LIFO) in dataSource. dataSource would send over the latest observation available when dataProcessor becomes available to receive and process the data. This solution may work, but it may get complicated because dataProcessor may need to be blocked and re-activated, and to communicate its status to dataSource, leading to a two-way communication problem. This problem may boil down to a blocking queue in the consumer-producer problem, but I am not sure.

The second idea is to have dataProcessor take care of message sorting. In this architecture, dataSource will simply post updates to dataProcessor's queue. dataProcessor will use Scan to fetch the latest data available in its queue. This may be the way to go. However, I am not sure whether the current design of MailboxProcessor makes it possible to clear a queue of messages, deleting the older obsolete ones. Furthermore, here, it is written that:


Unfortunately, the TryScan function in the current version of F# is broken in two ways. Firstly, the whole point is to specify a timeout but the implementation does not actually honor it. Specifically, irrelevant messages reset the timer. Secondly, as with the other Scan function, the message queue is examined under a lock that prevents any other threads from posting for the duration of the scan, which can be an arbitrarily long time. Consequently, the TryScan function itself tends to lock-up concurrent systems and can even introduce deadlocks because the caller's code is evaluated inside the lock (e.g. posting from the function argument to Scan or TryScan can deadlock the agent when the code under the lock blocks waiting to acquire the lock it is already under).

Having the latest observation bounced back may be a problem. The author of this post, @Jon Harrop, suggests that


I managed to architect around it and the resulting architecture was actually better. In essence, I eagerly Receive all messages and filter using my own local queue.
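A minimal sketch of what "eagerly Receive all messages" could look like for the latest-only case, assuming obsolete observations can simply be discarded. The names makeLatestOnlyAgent and processLatest are hypothetical and stand in for the real tracking work; this is not taken from the quoted post.

```fsharp
// Sketch: an agent that drains its mailbox and processes only the newest message.
// `processLatest` is a hypothetical placeholder for the real processing step.
let makeLatestOnlyAgent (processLatest: 'T -> Async<unit>) =
    MailboxProcessor<'T>.Start(fun inbox ->
        // Pull everything that is already queued without waiting,
        // remembering only the most recently posted item.
        let rec drain latest = async {
            let! next = inbox.TryReceive(0)
            match next with
            | Some newer -> return! drain newer
            | None -> return latest }
        let rec loop () = async {
            let! first = inbox.Receive()   // wait for at least one message
            let! latest = drain first      // discard older, obsolete messages
            do! processLatest latest
            return! loop () }
        loop ())
```

Because the agent only uses Receive and TryReceive, it avoids Scan and TryScan entirely; the cost is that every skipped message is still dequeued once.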

This idea is surely worth exploring but, before starting to play around with code, I would welcome some input on how I could structure my solution.

Thank you.

Recommended answer

tl;dr I would try this: take the Mailbox implementation from FSharp.Actor or Zach Bray's blog post, replace ConcurrentQueue with ConcurrentStack (plus add some bounded capacity logic) and use this changed agent as a dispatcher to pass messages from dataSource to an army of dataProcessors implemented as ordinary MBPs or Actors.

tl;dr2 If workers are a scarce and slow resource and we need to process the message that is the latest at the moment a worker is ready, then it all boils down to an agent with a stack instead of a queue (with some bounded capacity logic) plus a BlockingQueue of workers. The dispatcher dequeues a ready worker, then pops a message from the stack and sends this message to the worker. After the job is done, the worker enqueues itself to the queue when it becomes ready (e.g. before let! msg = inbox.Receive()). The dispatcher's consumer thread then blocks until any worker is ready, while the producer thread keeps the bounded stack updated. (A bounded stack could be built from an array + offset + size inside a lock; the design described below is more complex than that.)
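The "array + offset + size inside a lock" idea can be sketched roughly as follows. The type name BoundedStack and the choice to overwrite the oldest item when the buffer is full are assumptions made for illustration.

```fsharp
/// Fixed-capacity LIFO buffer: pushing onto a full buffer overwrites the oldest item.
type BoundedStack<'T>(capacity: int) =
    do if capacity < 1 then invalidArg "capacity" "capacity must be positive"
    let items : 'T[] = Array.zeroCreate capacity
    let gate = obj ()
    let mutable start = 0   // index of the oldest stored item
    let mutable count = 0   // number of items currently stored

    /// Adds the newest item, dropping the oldest one when the buffer is full.
    member this.Push(item: 'T) =
        lock gate (fun () ->
            if count = capacity then
                items.[start] <- item
                start <- (start + 1) % capacity
            else
                items.[(start + count) % capacity] <- item
                count <- count + 1)

    /// Removes and returns the newest item, or None when the buffer is empty.
    member this.TryPop() : 'T option =
        lock gate (fun () ->
            if count = 0 then None
            else
                count <- count - 1
                Some (items.[(start + count) % capacity]))
```

With a capacity of 3, pushing 1, 2, 3, 4 and then popping yields 4, 3, 2: the oldest observation (1) was silently discarded, which is the behaviour wanted for stale tracking data.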

Details

MailboxProcessor is designed to have only one consumer. This is even noted in a comment in the source code of MBP here (search for the word 'DRAGONS' :) )

If you post your data to an MBP, then only one thread can take it from the internal queue or stack. In your particular use case I would use ConcurrentStack directly or, better, wrapped in a BlockingCollection:


  • It will allow many concurrent consumers
  • It is very fast and thread safe
  • BlockingCollection has a BoundedCapacity property (set through the constructor) that allows you to limit the size of a collection. Add blocks when the collection is full, but you could use TryAdd, which returns false immediately instead. If A is the main stack and B is a standby, then TryAdd to A, on false Add to B and swap the two with Interlocked.Exchange, then process the needed messages in A, clear it, and make it the new standby - or use three stacks if processing A could take longer than it takes B to become full again; in this way you do not block and do not lose any messages, but can discard unneeded ones in a controlled way.
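A small sketch of the bounded, stack-backed BlockingCollection described in the list above. The capacity of 3 and the integer payload are illustrative assumptions.

```fsharp
open System.Collections.Concurrent

// A bounded LIFO buffer: BlockingCollection wrapped around a ConcurrentStack.
let capacity = 3   // illustrative value
let buffer = new BlockingCollection<int>(ConcurrentStack<int>(), capacity)

// TryAdd returns false immediately once the buffer is full.
let accepted = [ for i in 1 .. 5 -> buffer.TryAdd i ]

// Take returns the most recently added item because the backing store is a stack.
let newest = buffer.Take()
```

Here the fourth and fifth TryAdd calls are rejected, so the caller decides what to do with the overflow, for example routing it to a standby stack as outlined above.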

BlockingCollection has methods like AddToAny/TakeFromAny, which work on arrays of BlockingCollections. This could help, e.g.:


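  • dataSource produces messages into a BlockingCollection backed by a ConcurrentStack (BCCS)
  • Another thread consumes messages from the BCCS and sends them to an array of processing BCCSs. You said there is a lot of data, so you can afford to sacrifice one thread to block and dispatch messages indefinitely
  • Each processing agent has its own BCCS, or is implemented as an Agent/Actor/MBP to which the dispatcher posts messages. In your case you need to send a message to only one processorAgent, so you could store the processing agents in a circular buffer and always dispatch a message to the least recently used processor.

Something like this: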

            (data stream produces 'T)
                |
            [dispatcher's BCCS]
                |
            (a dispatcher thread consumes 'T  and pushes to processors, manages capacity of BCCS and LRU queue)
                 |                               |
            [processor1's BCCS/Actor/MBP] ... [processorN's BCCS/Actor/MBP]
                 |                               |
               (process)                         (process)
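One possible reading of the diagram, using the ready-worker queue from tl;dr2 in place of the LRU buffer. This is a sketch under assumptions: the names messages, readyWorkers, makeWorker and handle are hypothetical, messages are plain integers, and capacity management of the stack is left out.

```fsharp
open System.Collections.Concurrent
open System.Threading

type Worker = MailboxProcessor<int>

// Newest-first message buffer and a FIFO queue of workers that are ready for work.
let messages = new BlockingCollection<int>(ConcurrentStack<int>())
let readyWorkers = new BlockingCollection<Worker>()   // default backing store is a queue

// `handle` is a hypothetical placeholder for the real processing step.
let makeWorker (handle: int -> unit) : Worker =
    MailboxProcessor<int>.Start(fun inbox ->
        let rec loop () = async {
            readyWorkers.Add inbox        // announce readiness before waiting
            let! msg = inbox.Receive()
            handle msg
            return! loop () }
        loop ())

// Dispatcher: wait for a ready worker, then hand it the newest available message.
let dispatcher =
    Thread(ThreadStart(fun () ->
        for worker in readyWorkers.GetConsumingEnumerable() do
            worker.Post(messages.Take())), IsBackground = true)
```

After creating the workers with makeWorker, calling dispatcher.Start() begins dispatching. Each worker holds at most one message at a time, so a slow worker never accumulates a backlog; stale messages stay in the stack, where a bounded structure would trim them.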

Instead of ConcurrentStack, you may want to read about the heap data structure (https://github.com/fsprojects/FSharpx.Collections/blob/master/src/FSharpx.Collections/Heap.fsi). If you need the latest message according to some property of the messages, e.g. a timestamp, rather than the order in which they arrive at the stack (e.g. if there could be delays in transit, so that arrival order <> creation order), you can get the latest message by using a heap.
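To illustrate ordering by timestamp rather than arrival, here is a sketch that swaps in the PriorityQueue type from System.Collections.Generic (available from .NET 6) instead of the FSharpx heap linked above. The Observation record and the add helper are hypothetical, and the queue is not thread safe, so a real agent would need to guard it with a lock.

```fsharp
open System
open System.Collections.Generic

// Hypothetical message type carrying its creation time.
type Observation = { Created: DateTime; Value: float }

// PriorityQueue dequeues the smallest priority first, so the ticks are negated
// to make the most recently created observation come out first.
let pending = PriorityQueue<Observation, int64>()

let add (o: Observation) = pending.Enqueue(o, -o.Created.Ticks)
```

Calling pending.Dequeue() then returns the observation with the latest Created value, regardless of the order in which the observations were added.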

If you still need Agents semantics/API, you could read several sources in addition to Dave's links, and adapt an implementation to multiple concurrent consumers:


  • An interesting article by Zach Bray on an efficient Actors implementation. There you do need to replace (under the comment // Might want to schedule this call on another thread.) the line execute true with the line async { execute true } |> Async.Start or similar, because otherwise the producing thread will also be the consuming thread - not good for a single fast producer. However, for a dispatcher like the one described above this is exactly what is needed.

The FSharp.Actor (aka Fakka) development branch and the F# MBP source code (first link above) could be very useful for implementation details. The FSharp.Actor library has been frozen for several months, but there is some activity in the dev branch.

Do not miss the discussion about Fakka on Google Groups.

I have a somewhat similar use case, and for the last two days I have researched everything I could find on F# Agents/Actors. This answer is a kind of TODO for myself to try these ideas, half of which were born while writing it.
