在多线程代码中使用F#事件和异步 [英] Using a F# event and asynchronous in multi-threaded code

查看:106
本文介绍了在多线程代码中使用F#事件和异步的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用F#中的异步工作流和代理进行大量工作.当我深入研究事件时,我注意到Event< _>()类型不是线程安全的.

I'm working a lot with asynchronous workflows and agents in F#. While I was going a little bit deeper into events I noticed that the Event<_>() type is not thread-safe.

在这里,我不是在谈论引发事件的常见问题.我实际上是在谈论订阅和从事件中删除/处置.出于测试目的,我编写了这个简短程序:

Here I'm not talking about the common problem of raising an event. I'm actually talking about subscribing and removing/disposing from an event. For testing purposes, I have written this short program:

let event = Event<int>()
let sub   = event.Publish

[<EntryPoint>]
let main argv =
    let subscribe sub x = async {
        let mutable disposables = []
        for i=0 to x do
            let dis = Observable.subscribe (fun x -> printf "%d" x) sub
            disposables <- dis :: disposables
        for dis in disposables do
            dis.Dispose()
    }

    Async.RunSynchronously(async{
        let! x = Async.StartChild (subscribe sub 1000)
        let! y = Async.StartChild (subscribe sub 1000)
        do! x
        do! y
        event.Trigger 1
        do! Async.Sleep 2000
    })
    0

程序很简单.我创建一个事件和一个向事件订阅特定数量事件的函数,然后配置每个处理程序.我使用另一个异步计算来生成带有Async.StartChild的那些函数的两个实例.在两个函数完成之后,我触发事件以查看是否还有一些处理程序.

The program is simple. I create an event and a function that subscribes a specific amount of events to it, and after that dispose every handler. I use another asynchronous computation to spawn two instances of those function with Async.StartChild. After both functions finished I trigger the event to see if there are some handlers still left.

但是当调用event.Trigger(1)时,结果是仍然有一些处理程序注册到该事件.由于某些"1"将被打印到控制台.通常,这意味着订阅和/或处分不是线程安全的.

But when event.Trigger(1) is called the result is that there are still some handlers registered to the event. As some "1" will be printed to the console. That in general means that subscribing and/or Disposing is not thread-safe.

那是我没想到的.如果订阅和处置不是线程安全的,那么一般如何安全地使用事件?

And that is what I didn't expected. If subscribing and disposing is not thread-safe, how can events in general safely be used?

当然,事件也可以在线程外使用,并且触发器不会在并行或不同线程上产生任何函数.但是在某种程度上,事件在Async,基于代理的代码中或通常与线程一起使用对我来说是很正常的.它们通常用作收集Backroundworker线程信息的通信.

Sure events also can be used outside of threads, and a trigger don't spawn any function in parallel or on different threads. But it is somehow normal to me that events are used in Async, agent-based code or in general with threads. They are often used as a communication to gather information of Backroundworker threads.

使用Async.AwaitEvent可以预订事件.如果订阅和处置不是线程安全的,那么在这样的环境中如何使用事件?并具有Async.AwaitEvent的目的是什么?考虑到异步工作流确实可以线程化,因此如果默认情况下订阅/处理事件不是线程安全的,则希望仅使用Async.AwaitEvent就被设计破坏了".

With Async.AwaitEvent it is possible to subscribe to an event. If subscribing and disposing is not thread-safe, how is it possible to use events in such an environment? And which purpose has Async.AwaitEvent? Considering that an asynchronous workflow does thread, hoping just using Async.AwaitEvent is basically "broken by design" if subscribing/disposing to an event is not thread-safe by default.

我面临的一般问题是:订阅和处置不是线程安全的,这是正确的吗?从我的示例看来,它看起来很像,但可能我错过了一些重要的细节.我目前在设计中经常使用事件,并且通常使用MailboxProcessor并使用事件进行通知.所以问题是.如果事件不是线程安全的,那么我当前使用的整个设计根本就不是线程安全的.那么,这种情况的解决方法是什么?创建一个全新的线程安全事件实现?是否已经存在一些面对此问题的实现?还是有其他选择可以在高度线程化的环境中安全地使用事件?

The general question I'm facing is: Is it correct that subscribing and disposing is not thread-safe? From my example it seems to look like it, but probably I missed some important detail. I currently use events a lot in my design, and I usually have MailboxProcessors and use events for notification. So the question is. If events are not thread-safe the whole design I'm currently using is not thread-safe at all. So what is an fix for this situation? Creating a whole new thread-safe event implementation? Do some implementations already exist that face this problem? Or are there other options to use events safely in a highly threaded environment?

推荐答案

FYI;在此处.

有趣的是:

member e.AddHandler(d) =
  x.multicast <- (System.Delegate.Combine(x.multicast, d) :?> Handler<'T>)
member e.RemoveHandler(d) = 
  x.multicast <- (System.Delegate.Remove(x.multicast, d) :?> Handler<'T>)

订阅事件将当前事件处理程序与传递给订阅的事件处理程序组合在一起.这个组合的事件处理程序替换了当前的事件处理程序.

Subscribing to an event combines the current event handler with the event handler passed into subscribe. This combined event handler replaces the current one.

从并发性角度来看的问题是,在这里我们有一个竞争条件,即并发订阅者可能会使用当前事件处理程序与之结合,而最后一个"会写回处理程序获胜(最后是一个困难的概念这些天,但nvm并发.

The problem from a concurrency perspective is that here we have a race-condition in that concurrent subscribers might use the came current event handler to combine with and the "last" one that writes back the handler win (last is a difficult concept in concurrency these days but nvm).

这里可以做的是使用Interlocked.CompareAndExchange引入CAS循环,但是这会增加性能开销,从而伤害非并发用户.不过,可以通过PR来查看它是否受到F#社区的青睐.

What could be done here is to introduce a CAS loop using Interlocked.CompareAndExchange but that adds performance overhead that hurts non-concurrent users. It's something one could make a PR off though and see if it viewed favourably by the F# community.

关于您要做什么的第二个问题,我只能说我会做什么.我会选择创建支持受保护的订阅/取消订阅的FSharpEvent版本.如果您的公司的FOSS政策允许的话,也许以FSharpEvent为基础.如果成功,那么它可以构成将来对F#核心库的PR.

WRT to your second question on what to do about it I can just say what I would do. I would go for the option of creating a version of FSharpEvent that supports protected subscribe/unsubscribe. Perhaps base it of FSharpEvent if your company FOSS policy allows it. If it turns out a success then it could form a future PR to F# core libary.

我不知道您的要求,但是如果您需要的是协程(即异步)而不是线程,那么也有可能重写程序以仅使用1个线程,因此您不会受到此影响比赛条件.

I don't know your requirements but it's also possible that if what you need is coroutines (ie Async) and not threads then it's possible to rewrite the program to use only 1 thread and thus you won't be affected by this race condition.

这篇关于在多线程代码中使用F#事件和异步的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆