C#事件:如何以并行方式处理事件 [英] C# Events: How to process event in a parallel manner

查看:179
本文介绍了C#事件:如何以并行方式处理事件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个事件,希望以并行方式处理。我的想法是把每个回调都添加到ThreadPool,实际上就是让每个注册了该事件的方法都由ThreadPool来处理。



我的试用代码看起来像以下:

  Delegate [] delegates = myEvent.GetInvocationList(); 
IAsyncResult [] results = new IAsyncResult [delegates.Count< Delegate>()]; (int i = 0; i< delegates.Count< Delegate>(); i ++)
{
IAsyncResult result =((TestDelegate)delegates [i])

.BeginInvoke(BeginInvoke / EndInvoke,null,null);
results [i] = result;


for(int i = 0; i< delegates.Length; i ++)
{
((TestDelegate)delegates [i]).EndInvoke结果[i]);
}

这只是为了玩,因为我很好奇如何做。我相信有更好的方法来做到这一点。 我不喜欢有一个创建一个持有lambda的WaitCallback的Func。此外,与直接调用委托相比,DynamicInvoke相当慢。我怀疑这种处理方式比只是按顺序执行的更快。



我的问题是:如何以并行方式处理事件,最好使用ThreadPool?



由于我通常使用Mono,.NET 4.0或任务并行库都不是一个选项。



谢谢!



EDITS:
- 更正了例子,谢谢Earwickers的回答。
- 更新的试用代码

解决方案

我会去使用DynamicMethod(LCG)和一个状态对象,它携带参数,并保持跟踪的呼叫(以便您可以等待他们完成)。



代码:
这样的东西应该可以(不过尚未经过彻底测试,因此在某些情况下可能会抛出一些讨厌的异常):

  //< summary> 
///用于动态并发调用MulticastDelegate的类。
///(C)2009Arsènevon Wyss,avw@gmx.ch
///不保证任何形式的使用,您自己承担风险。重新使用时,版权声明必须保存在资料来源中。
///< / summary>
public static class ParallelInvoke {
private class ParallelInvokeContext< TDelegate>其中TDelegate:class {
private static readonly DynamicMethod invoker;
private static readonly Type [] parameterTypes;

static ParallelInvokeContext(){
if(!typeof(Delegate).IsAssignableFrom(typeof(TDelegate))){
throw new InvalidOperationException(TDelegate类型必须是委托);
}
Debug.Assert(monitor_enter!= null,找不到方法Monitor.Enter());
Debug.Assert(monitor_pulse!= null,找不到方法Monitor.Pulse());
Debug.Assert(monitor_exit!= null,找不到方法Monitor.Exit());
FieldInfo parallelInvokeContext_activeCalls = typeof(ParallelInvokeContext< TDelegate>)。GetField(activeCalls,BindingFlags.Instance | BindingFlags.NonPublic);
Debug.Assert(parallelInvokeContext_activeCalls!= null,找不到私有字段ParallelInvokeContext.activeCalls);
FieldInfo parallelInvokeContext_arguments = typeof(ParallelInvokeContext< TDelegate>)。GetField(arguments,BindingFlags.Instance | BindingFlags.NonPublic);
Debug.Assert(parallelInvokeContext_arguments!= null,找不到私有字段ParallelInvokeContext.arguments);
MethodInfo delegate_invoke = typeof(TDelegate).GetMethod(Invoke,BindingFlags.Instance | BindingFlags.Public);
Debug.Assert(delegate_invoke!= null,string.Format(找不到方法{0} .Invoke(),typeof(TDelegate).FullName));
if(delegate_invoke.ReturnType!= typeof(void)){
throw new InvalidOperationException(TDelegate delegate不能有返回值);
}
ParameterInfo [] parameters = delegate_invoke.GetParameters();
parameterTypes = new Type [parameters.Length];
invoker = new DynamicMethod(string.Format(Invoker< {0}>,typeof(TDelegate).FullName),typeof(void),new [] {typeof(ParallelInvokeContext< TDelegate>),typeof object)},
typeof(ParallelInvokeContext< TDelegate>),true);
ILGenerator il = invoker.GetILGenerator();
LocalBuilder args =(parameters.Length> 2)? il.DeclareLocal(typeof(object [])):null;
bool skipLoad = false;
il.BeginExceptionBlock();
il.Emit(OpCodes.Ldarg_1); //代理我们要调用
if(args!= null){
Debug.Assert(args.LocalIndex == 0);
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldfld,parallelInvokeContext_arguments);
il.Emit(OpCodes.Dup);
il.Emit(OpCodes.Stloc_0);
skipLoad = true;
}
foreach(参数中的ParameterInfo参数){
if(parameter.ParameterType.IsByRef){
throw new InvalidOperationException(TDelegate delegate must note have out or ref parameters );
}
parameterTypes [parameter.Position] = parameter.ParameterType;
if(args == null){
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldfld,parallelInvokeContext_arguments);
} else if(skipLoad){
skipLoad = false;
} else {
il.Emit(OpCodes.Ldloc_0);
}
il.Emit(OpCodes.Ldc_I4,parameter.Position);
il.Emit(OpCodes.Ldelem_Ref);
if(parameter.ParameterType.IsValueType){
il.Emit(OpCodes.Unbox_Any,parameter.ParameterType);
}
}
il.Emit(OpCodes.Callvirt,delegate_invoke);
il.BeginFinallyBlock();
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Call,monitor_enter);
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Dup);
il.Emit(OpCodes.Ldfld,parallelInvokeContext_activeCalls);
il.Emit(OpCodes.Ldc_I4_1);
il.Emit(OpCodes.Sub);
il.Emit(OpCodes.Dup);
Label noPulse = il.DefineLabel();
il.Emit(OpCodes.Brtrue,noPulse);
il.Emit(OpCodes.Stfld,parallelInvokeContext_activeCalls);
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Call,monitor_pulse);
标签exit = il.DefineLabel();
il.Emit(OpCodes.Br,exit);
il.MarkLabel(noPulse);
il.Emit(OpCodes.Stfld,parallelInvokeContext_activeCalls);
il.MarkLabel(exit);
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Call,monitor_exit);
il.EndExceptionBlock()
il.Emit(OpCodes.Ret);
}

[Conditional(DEBUG)]
private static void VerifyArgumentsDebug(object [] args){
for(int i = 0; i&参数Types.Length; i ++){
if(args [i] == null){
if(parameterTypes [i] .IsValueType){
throw new ArgumentException(string.Format参数{0}不能为null,因为它是一个值类型,i));
}
} else if(!parameterTypes [i] .IsAssignableFrom(args [i] .GetType())){
throw new ArgumentException(string.Format(参数{0}不兼容,i));
}
}
}

private readonly object [] arguments;
私有readonly WaitCallback invokeCallback;
private int activeCalls;

public ParallelInvokeContext(object [] args){
if(parameterTypes.Length> 0){
if(args == null){
throw new ArgumentNullException (args);
}
if(args.Length!= parameterTypes.Length){
throw new ArgumentException(参数计数不匹配);
}
VerifyArgumentsDebug(args);
arguments = args;
} else if((args!= null)&&(args.Length> 0)){
throw new ArgumentException(此委托不期望任何参数);
}
invokeCallback =(WaitCallback)invoker.CreateDelegate(typeof(WaitCallback),this);
}

public void QueueInvoke(Delegate @delegate){
Debug.Assert(@delegate is TDelegate);
activeCalls ++;
ThreadPool.QueueUserWorkItem(invokeCallback,@delegate);
}
}

private static readonly MethodInfo monitor_enter;
private static readonly MethodInfo monitor_exit;
private static readonly MethodInfo monitor_pulse;

static ParallelInvoke(){
monitor_enter = typeof(Monitor).GetMethod(Enter,BindingFlags.Static | BindingFlags.Public,null,new [] {typeof(object)},空值);
monitor_pulse = typeof(Monitor).GetMethod(Pulse,BindingFlags.Static | BindingFlags.Public,null,new [] {typeof(object)},null);
monitor_exit = typeof(Monitor).GetMethod(Exit,BindingFlags.Static | BindingFlags.Public,null,new [] {typeof(object)},null);
}

public static void Invoke< TDelegate>(TDelegate @delegate)其中TDelegate:class {
Invoke(@delegate,null);
}

public static void Invoke< TDelegate>(TDelegate @delegate,params object [] args)其中TDelegate:class {
if(@delegate == null){
抛出新的ArgumentNullException(delegate);
}
ParallelInvokeContext< TDelegate> context = new ParallelInvokeContext< TDelegate>(args);
lock(context){
foreach(Delegate invocationDelegate in((Delegate)(object)@delegate).GetInvocationList()){
context.QueueInvoke(invocationDelegate);
}
Monitor.Wait(context);
}
}
}

用法:

  ParallelInvoke.Invoke(yourDelegate,arguments); 

注意:




  • 不处理事件处理程序中的异常(但是IL代码最终会递减计数器,以便方法可以正确结束),这可能会导致麻烦。可能会捕获并转移IL代码中的异常。


  • 除继承之外的隐式转换(如int到double)不会执行,并将抛出异常。


  • 使用的同步技术不分配OS等待句柄,这通常对性能有好处。可以在 Joseph Albahari的页面找到监视器工作的描述。


  • 经过一些性能测试,似乎这种方法比使用代理(至少在MS CLR)中的本地BeginInvoke / EndInvoke调用的方法更好。



I have an event which I would like to have processed in a parallel manner. My idea is to add each callback to the ThreadPool, effectively having each method that registered for the event handled by the ThreadPool.

My try-out code looks something like the following:

// Snapshot the handlers currently subscribed to the event.
Delegate[] delegates = myEvent.GetInvocationList();
// Delegate[] is an array: use Length instead of the LINQ Count<T>() extension,
// which is consistent with the second loop below.
IAsyncResult[] results = new IAsyncResult[ delegates.Length ];

// Start an asynchronous invocation of every handler and remember its handle.
for ( int i = 0; i < delegates.Length; i++ )
{
    results[ i ] = ( ( TestDelegate )delegates[ i ] ).BeginInvoke( "BeginInvoke/EndInvoke", null, null );
}

// Complete every invocation; EndInvoke is paired with the handle at the same index.
for ( int i = 0; i < delegates.Length; i++ )
{
    ( ( TestDelegate )delegates[ i ] ).EndInvoke( results[ i ] );
}

This is just for playing around since I am curious how to do it. I am sure there is a better way to do it. I don't like having a Func which creates a WaitCallback that holds a lambda. Also, DynamicInvoke is pretty slow compared to calling a delegate directly. I doubt this way of processing the event is any faster than just doing it sequentially.

My question is: How can I process an event in a parallel manner, preferably by using ThreadPool?

Since I usually work with Mono, .NET 4.0 or the Task Parallel Library are both not an option.

Thank you!

EDITS: - Corrected example thanks to Earwicker's answer. - Updated try-out code

解决方案

I'd go for an approach using DynamicMethod (LCG) and a state object which carries the arguments and does keep track of the calls (so that you can wait for them to complete).

Code: Something like this should do (not thoroughly tested yet though, so it may throw some nasty exceptions in some situations):

/// <summary>
/// Class for dynamic parallel invoking of a MulticastDelegate.
/// (C) 2009 Arsène von Wyss, avw@gmx.ch
/// No warranties of any kind, use at your own risk. Copyright notice must be kept in the source when re-used.
/// </summary>
/// <remarks>
/// Every entry of the delegate's invocation list is queued on the <see cref="ThreadPool"/>;
/// <c>Invoke</c> blocks the caller until all of them have returned. The per-handler call is
/// made through a <see cref="DynamicMethod"/> generated once per delegate type, so no
/// <c>DynamicInvoke</c> is involved. Exceptions thrown by a handler are not caught here.
/// </remarks>
public static class ParallelInvoke {
    /// <summary>
    /// State shared by all handlers of one <c>Invoke</c> call: the boxed arguments and the
    /// number of handlers still outstanding. The instance also serves as the monitor that
    /// the caller waits on and the workers pulse.
    /// </summary>
    private class ParallelInvokeContext<TDelegate> where TDelegate: class {
        // Generated once per TDelegate: void Invoker(ParallelInvokeContext<TDelegate> context, object @delegate)
        private static readonly DynamicMethod invoker;
        // Parameter types of TDelegate.Invoke, indexed by parameter position.
        private static readonly Type[] parameterTypes;

        /// <summary>
        /// Validates TDelegate (must be a delegate type returning void without ref/out
        /// parameters) and emits the invoker method.
        /// </summary>
        /// <remarks>
        /// NOTE: exceptions thrown from a static constructor reach the caller wrapped in a
        /// <see cref="TypeInitializationException"/>.
        /// </remarks>
        static ParallelInvokeContext() {
            if (!typeof(Delegate).IsAssignableFrom(typeof(TDelegate))) {
                throw new InvalidOperationException("The TDelegate type must be a delegate");
            }
            Debug.Assert(monitor_enter != null, "Could not find the method Monitor.Enter()");
            Debug.Assert(monitor_pulse != null, "Could not find the method Monitor.Pulse()");
            Debug.Assert(monitor_exit != null, "Could not find the method Monitor.Exit()");
            // The generated IL accesses these private instance fields directly, so they are looked up by name.
            FieldInfo parallelInvokeContext_activeCalls = typeof(ParallelInvokeContext<TDelegate>).GetField("activeCalls", BindingFlags.Instance|BindingFlags.NonPublic);
            Debug.Assert(parallelInvokeContext_activeCalls != null, "Could not find the private field ParallelInvokeContext.activeCalls");
            FieldInfo parallelInvokeContext_arguments = typeof(ParallelInvokeContext<TDelegate>).GetField("arguments", BindingFlags.Instance|BindingFlags.NonPublic);
            Debug.Assert(parallelInvokeContext_arguments != null, "Could not find the private field ParallelInvokeContext.arguments");
            MethodInfo delegate_invoke = typeof(TDelegate).GetMethod("Invoke", BindingFlags.Instance|BindingFlags.Public);
            Debug.Assert(delegate_invoke != null, string.Format("Could not find the method {0}.Invoke()", typeof(TDelegate).FullName));
            if (delegate_invoke.ReturnType != typeof(void)) {
                throw new InvalidOperationException("The TDelegate delegate must not have a return value");
            }
            ParameterInfo[] parameters = delegate_invoke.GetParameters();
            parameterTypes = new Type[parameters.Length];
            // Owner type + skipVisibility=true let the IL touch the private fields of the context.
            invoker = new DynamicMethod(string.Format("Invoker<{0}>", typeof(TDelegate).FullName), typeof(void), new[] {typeof(ParallelInvokeContext<TDelegate>), typeof(object)},
                                        typeof(ParallelInvokeContext<TDelegate>), true);
            ILGenerator il = invoker.GetILGenerator();
            // With more than two parameters the arguments array is cached in local 0
            // instead of being re-read from the context field for every parameter.
            LocalBuilder args = (parameters.Length > 2) ? il.DeclareLocal(typeof(object[])) : null;
            bool skipLoad = false;

            // try { ((TDelegate)@delegate).Invoke((T0)arguments[0], (T1)arguments[1], ...); }
            il.BeginExceptionBlock();
            il.Emit(OpCodes.Ldarg_1); // the delegate we are going to invoke
            // The work item state arrives typed as object; cast it so a wrong delegate type
            // fails with InvalidCastException instead of being called unchecked.
            il.Emit(OpCodes.Castclass, typeof(TDelegate));
            if (args != null) {
                Debug.Assert(args.LocalIndex == 0);
                il.Emit(OpCodes.Ldarg_0);
                il.Emit(OpCodes.Ldfld, parallelInvokeContext_arguments);
                il.Emit(OpCodes.Dup); // one copy goes to local 0, the other stays for the first parameter
                il.Emit(OpCodes.Stloc_0);
                skipLoad = true;
            }
            foreach (ParameterInfo parameter in parameters) {
                Type parameterType = parameter.ParameterType;
                if (parameterType.IsByRef) {
                    throw new InvalidOperationException("The TDelegate delegate must not have out or ref parameters");
                }
                parameterTypes[parameter.Position] = parameterType;
                if (args == null) {
                    il.Emit(OpCodes.Ldarg_0);
                    il.Emit(OpCodes.Ldfld, parallelInvokeContext_arguments);
                } else if (skipLoad) {
                    // the array reference left on the stack by the Dup above is consumed here
                    skipLoad = false;
                } else {
                    il.Emit(OpCodes.Ldloc_0);
                }
                il.Emit(OpCodes.Ldc_I4, parameter.Position);
                il.Emit(OpCodes.Ldelem_Ref);
                if (parameterType.IsValueType) {
                    il.Emit(OpCodes.Unbox_Any, parameterType);
                } else if ((parameterType != typeof(object)) && !parameterType.IsPointer) {
                    // Argument verification is compiled out of release builds, so enforce the
                    // reference type here; a null reference passes castclass unchanged.
                    il.Emit(OpCodes.Castclass, parameterType);
                }
            }
            il.Emit(OpCodes.Callvirt, delegate_invoke);

            // finally { lock (context) { if (--activeCalls == 0) Monitor.Pulse(context); } }
            il.BeginFinallyBlock();
            il.Emit(OpCodes.Ldarg_0);
            il.Emit(OpCodes.Call, monitor_enter);
            il.Emit(OpCodes.Ldarg_0);                                  // stack: ctx
            il.Emit(OpCodes.Dup);                                      // stack: ctx, ctx
            il.Emit(OpCodes.Ldfld, parallelInvokeContext_activeCalls); // stack: ctx, n
            il.Emit(OpCodes.Ldc_I4_1);
            il.Emit(OpCodes.Sub);                                      // stack: ctx, n-1
            il.Emit(OpCodes.Dup);                                      // stack: ctx, n-1, n-1
            Label noPulse = il.DefineLabel();
            il.Emit(OpCodes.Brtrue, noPulse);                          // calls still outstanding -> no pulse
            il.Emit(OpCodes.Stfld, parallelInvokeContext_activeCalls); // last call: store 0 ...
            il.Emit(OpCodes.Ldarg_0);
            il.Emit(OpCodes.Call, monitor_pulse);                      // ... and wake the waiting caller
            Label exit = il.DefineLabel();
            il.Emit(OpCodes.Br, exit);
            il.MarkLabel(noPulse);
            il.Emit(OpCodes.Stfld, parallelInvokeContext_activeCalls);
            il.MarkLabel(exit);
            il.Emit(OpCodes.Ldarg_0);
            il.Emit(OpCodes.Call, monitor_exit);
            il.EndExceptionBlock();
            il.Emit(OpCodes.Ret);
        }

        /// <summary>
        /// Debug-only check that every argument is compatible with the matching delegate parameter.
        /// </summary>
        /// <param name="args">Arguments to verify; the caller guarantees the length matches.</param>
        /// <exception cref="ArgumentException">An argument is null for a non-nullable value type, or has an incompatible type.</exception>
        [Conditional("DEBUG")]
        private static void VerifyArgumentsDebug(object[] args) {
            for (int i = 0; i < parameterTypes.Length; i++) {
                if (args[i] == null) {
                    // null is a legal value for Nullable<T> parameters (unbox.any yields an empty nullable)
                    if (parameterTypes[i].IsValueType && (Nullable.GetUnderlyingType(parameterTypes[i]) == null)) {
                        throw new ArgumentException(string.Format("The parameter {0} cannot be null, because it is a value type", i));
                    }
                } else if (!parameterTypes[i].IsAssignableFrom(args[i].GetType())) {
                    throw new ArgumentException(string.Format("The parameter {0} is not compatible", i));
                }
            }
        }

        // NOTE: the names of "arguments" and "activeCalls" are referenced via reflection in the static constructor.
        private readonly object[] arguments;
        private readonly WaitCallback invokeCallback;
        private int activeCalls;

        /// <summary>
        /// Creates the shared state for one parallel invocation.
        /// </summary>
        /// <param name="args">Boxed arguments passed to every handler; may be null or empty for parameterless delegates.</param>
        /// <exception cref="ArgumentNullException">The delegate expects parameters but <paramref name="args"/> is null.</exception>
        /// <exception cref="ArgumentException">The argument count does not match the delegate signature.</exception>
        public ParallelInvokeContext(object[] args) {
            if (parameterTypes.Length > 0) {
                if (args == null) {
                    throw new ArgumentNullException("args");
                }
                if (args.Length != parameterTypes.Length) {
                    throw new ArgumentException("The parameter count does not match");
                }
                VerifyArgumentsDebug(args);
                arguments = args;
            } else if ((args != null) && (args.Length > 0)) {
                throw new ArgumentException("This delegate does not expect any parameters");
            }
            // Bind the generated method to this instance so that it matches WaitCallback(object state).
            invokeCallback = (WaitCallback)invoker.CreateDelegate(typeof(WaitCallback), this);
        }

        /// <summary>
        /// Registers one more outstanding call and queues it on the thread pool.
        /// Invoke calls this while holding the lock on this context, which is what protects the counter.
        /// </summary>
        /// <param name="delegate">A single handler of type TDelegate.</param>
        public void QueueInvoke(Delegate @delegate) {
            Debug.Assert(@delegate is TDelegate);
            activeCalls++;
            ThreadPool.QueueUserWorkItem(invokeCallback, @delegate);
        }

        /// <summary>
        /// Blocks until every queued call has finished. Must be called while holding the lock on
        /// this context. The counter is re-checked after each wake-up so that only the
        /// "all calls done" condition lets the caller continue.
        /// </summary>
        public void WaitForCompletion() {
            while (activeCalls > 0) {
                Monitor.Wait(this);
            }
        }
    }

    // Monitor methods called from the generated IL; resolved once for all delegate types.
    private static readonly MethodInfo monitor_enter;
    private static readonly MethodInfo monitor_exit;
    private static readonly MethodInfo monitor_pulse;

    static ParallelInvoke() {
        monitor_enter = typeof(Monitor).GetMethod("Enter", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
        monitor_pulse = typeof(Monitor).GetMethod("Pulse", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
        monitor_exit = typeof(Monitor).GetMethod("Exit", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
    }

    /// <summary>
    /// Invokes all handlers of a parameterless delegate in parallel and waits for them to finish.
    /// </summary>
    /// <param name="delegate">The (multicast) delegate to invoke.</param>
    /// <exception cref="ArgumentNullException"><paramref name="delegate"/> is null.</exception>
    public static void Invoke<TDelegate>(TDelegate @delegate) where TDelegate: class {
        Invoke(@delegate, null);
    }

    /// <summary>
    /// Invokes all handlers of a delegate in parallel on the thread pool, passing the same
    /// arguments to each, and blocks until every handler has returned.
    /// </summary>
    /// <param name="delegate">The (multicast) delegate to invoke; must return void and have no ref/out parameters.</param>
    /// <param name="args">Arguments for the handlers; types must match exactly (no implicit conversions such as int to double).</param>
    /// <exception cref="ArgumentNullException"><paramref name="delegate"/> is null, or <paramref name="args"/> is null although parameters are expected.</exception>
    /// <exception cref="ArgumentException">The argument count does not match the delegate signature.</exception>
    public static void Invoke<TDelegate>(TDelegate @delegate, params object[] args) where TDelegate: class {
        if (@delegate == null) {
            throw new ArgumentNullException("delegate");
        }
        ParallelInvokeContext<TDelegate> context = new ParallelInvokeContext<TDelegate>(args);
        // The lock is held while queueing: workers need the same monitor to decrement the
        // counter, so none of them can signal completion before this thread starts waiting.
        lock (context) {
            foreach (Delegate invocationDelegate in ((Delegate)(object)@delegate).GetInvocationList()) {
                context.QueueInvoke(invocationDelegate);
            }
            context.WaitForCompletion();
        }
    }
}

Usage:

// Queues every handler of yourDelegate on the ThreadPool with the given arguments
// and blocks until all of them have returned.
ParallelInvoke.Invoke(yourDelegate, arguments);

Notes:

  • Exceptions in the event handlers are not handled (but the IL code has a finally to decrement the counter, so that the method should end correctly) and this could cause trouble. It would be possible to catch and transfer the exceptions in the IL code as well.

  • Implicit conversions other than inheritance (such as int to double) are not performed and will throw an exception.

  • The synchronization technique used does not allocate OS wait handles, which is usually good for performance. A description of the Monitor workings can be found on Joseph Albahari's page.

  • After some performance testing, it seems that this approach scales much better than any approach using the "native" BeginInvoke/EndInvoke calls on delegates (at least in the MS CLR).

这篇关于C#事件:如何以并行方式处理事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆