C# Events: How to process an event in a parallel manner


Problem description

I have an event which I would like to have processed in a parallel manner. My idea is to add each callback to the ThreadPool, effectively having each method that registered for the event handled by the ThreadPool.

My try-out code looks something like the following:

// Snapshot the handlers currently registered on the event.
Delegate[] delegates = myEvent.GetInvocationList();
IAsyncResult[] results = new IAsyncResult[ delegates.Length ];

// Start every handler asynchronously; BeginInvoke runs it on a ThreadPool thread.
for ( int i = 0; i < delegates.Length; i++ )
{
    results[ i ] = ( ( TestDelegate )delegates[ i ] ).BeginInvoke( "BeginInvoke/EndInvoke", null, null );
}

// Wait for each handler to finish; EndInvoke blocks until the call completes.
for ( int i = 0; i < delegates.Length; i++ )
{
    ( ( TestDelegate )delegates[ i ] ).EndInvoke( results[ i ] );
}

This is just for playing around, since I am curious how to do it. I am sure there is a better way. I don't like having a Func which creates a WaitCallback that holds a lambda. Also, DynamicInvoke is pretty slow compared to calling a delegate directly. I doubt this way of processing the event is any faster than just doing it sequentially.

My question is: How can I process an event in a parallel manner, preferably by using the ThreadPool?

Since I usually work with Mono, neither .NET 4.0 nor the Task Parallel Library is an option.

Thank you!

EDITS:
- Corrected example thanks to Earwicker's answer.
- Updated try-out code.

Recommended answer

I'd go for an approach using DynamicMethod (LCG) and a state object which carries the arguments and keeps track of the calls (so that you can wait for them to complete).
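To make the state-object idea concrete, here is a minimal hand-written sketch for one fixed delegate type. It is not the answer's implementation: `MessageHandler`, `ParallelInvokeSketch` and `Context` are hypothetical names introduced for illustration, and it assumes a single string argument. It only uses ThreadPool and Monitor, so it should not need .NET 4.0.

```csharp
using System;
using System.Threading;

// Hypothetical delegate type, chosen only for this illustration.
public delegate void MessageHandler(string message);

public static class ParallelInvokeSketch {
	// State object: carries the argument and counts the calls still running.
	private sealed class Context {
		public readonly string Message;
		public int ActiveCalls;

		public Context(string message) {
			Message = message;
		}

		// Runs on a ThreadPool thread; state is the single handler to call.
		// Note: an exception thrown by the handler is not caught here.
		public void Run(object state) {
			try {
				((MessageHandler)state)(Message);
			} finally {
				lock (this) {
					if (--ActiveCalls == 0) {
						Monitor.Pulse(this); // wake the waiting caller
					}
				}
			}
		}
	}

	// Queues every handler on the ThreadPool and blocks until all have returned.
	public static void Invoke(MessageHandler handlers, string message) {
		if (handlers == null) {
			throw new ArgumentNullException("handlers");
		}
		Context context = new Context(message);
		WaitCallback callback = context.Run;
		lock (context) {
			foreach (Delegate handler in handlers.GetInvocationList()) {
				context.ActiveCalls++;
				ThreadPool.QueueUserWorkItem(callback, handler);
			}
			// Workers cannot take the lock until Wait releases it,
			// so the final pulse cannot be missed.
			while (context.ActiveCalls > 0) {
				Monitor.Wait(context);
			}
		}
	}
}
```

Calling `ParallelInvokeSketch.Invoke(handlers, "hello")` returns once every registered handler has finished. The cost of this version is that it has to be rewritten for each delegate signature, which is what generating the invoker at run time avoids.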

Code: Something like this should do (not thoroughly tested yet though, so it may throw some nasty exceptions in some situations):

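using System;
using System.Diagnostics;
using System.Reflection;
using System.Reflection.Emit;
using System.Threading;

/// <summary>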
/// Class for dynamic parallel invoking of a MulticastDelegate.
/// (C) 2009 Arsène von Wyss, avw@gmx.ch
/// No warranties of any kind, use at your own risk. Copyright notice must be kept in the source when re-used.
/// </summary>
public static class ParallelInvoke {
	private class ParallelInvokeContext<TDelegate> where TDelegate: class {
		private static readonly DynamicMethod invoker;
		private static readonly Type[] parameterTypes;

		static ParallelInvokeContext() {
			if (!typeof(Delegate).IsAssignableFrom(typeof(TDelegate))) {
				throw new InvalidOperationException("The TDelegate type must be a delegate");
			}
			Debug.Assert(monitor_enter != null, "Could not find the method Monitor.Enter()");
			Debug.Assert(monitor_pulse != null, "Could not find the method Monitor.Pulse()");
			Debug.Assert(monitor_exit != null, "Could not find the method Monitor.Exit()");
			FieldInfo parallelInvokeContext_activeCalls = typeof(ParallelInvokeContext<TDelegate>).GetField("activeCalls", BindingFlags.Instance|BindingFlags.NonPublic);
			Debug.Assert(parallelInvokeContext_activeCalls != null, "Could not find the private field ParallelInvokeContext.activeCalls");
			FieldInfo parallelInvokeContext_arguments = typeof(ParallelInvokeContext<TDelegate>).GetField("arguments", BindingFlags.Instance|BindingFlags.NonPublic);
			Debug.Assert(parallelInvokeContext_arguments != null, "Could not find the private field ParallelInvokeContext.arguments");
			MethodInfo delegate_invoke = typeof(TDelegate).GetMethod("Invoke", BindingFlags.Instance|BindingFlags.Public);
			Debug.Assert(delegate_invoke != null, string.Format("Could not find the method {0}.Invoke()", typeof(TDelegate).FullName));
			if (delegate_invoke.ReturnType != typeof(void)) {
				throw new InvalidOperationException("The TDelegate delegate must not have a return value");
			}
			ParameterInfo[] parameters = delegate_invoke.GetParameters();
			parameterTypes = new Type[parameters.Length];
			invoker = new DynamicMethod(string.Format("Invoker<{0}>", typeof(TDelegate).FullName), typeof(void), new[] {typeof(ParallelInvokeContext<TDelegate>), typeof(object)},
			                            typeof(ParallelInvokeContext<TDelegate>), true);
			ILGenerator il = invoker.GetILGenerator();
			LocalBuilder args = (parameters.Length > 2) ? il.DeclareLocal(typeof(object[])) : null;
			bool skipLoad = false;
			il.BeginExceptionBlock();
			il.Emit(OpCodes.Ldarg_1); // the delegate we are going to invoke
			if (args != null) {
				Debug.Assert(args.LocalIndex == 0);
				il.Emit(OpCodes.Ldarg_0);
				il.Emit(OpCodes.Ldfld, parallelInvokeContext_arguments);
				il.Emit(OpCodes.Dup);
				il.Emit(OpCodes.Stloc_0);
				skipLoad = true;
			}
			foreach (ParameterInfo parameter in parameters) {
				if (parameter.ParameterType.IsByRef) {
					throw new InvalidOperationException("The TDelegate delegate must not have out or ref parameters");
				}
				parameterTypes[parameter.Position] = parameter.ParameterType;
				if (args == null) {
					il.Emit(OpCodes.Ldarg_0);
					il.Emit(OpCodes.Ldfld, parallelInvokeContext_arguments);
				} else if (skipLoad) {
					skipLoad = false;
				} else {
					il.Emit(OpCodes.Ldloc_0);
				}
				il.Emit(OpCodes.Ldc_I4, parameter.Position);
				il.Emit(OpCodes.Ldelem_Ref);
				if (parameter.ParameterType.IsValueType) {
					il.Emit(OpCodes.Unbox_Any, parameter.ParameterType);
				}
			}
			il.Emit(OpCodes.Callvirt, delegate_invoke);
			il.BeginFinallyBlock();
			il.Emit(OpCodes.Ldarg_0);
			il.Emit(OpCodes.Call, monitor_enter);
			il.Emit(OpCodes.Ldarg_0);
			il.Emit(OpCodes.Dup);
			il.Emit(OpCodes.Ldfld, parallelInvokeContext_activeCalls);
			il.Emit(OpCodes.Ldc_I4_1);
			il.Emit(OpCodes.Sub);
			il.Emit(OpCodes.Dup);
			Label noPulse = il.DefineLabel();
			il.Emit(OpCodes.Brtrue, noPulse);
			il.Emit(OpCodes.Stfld, parallelInvokeContext_activeCalls);
			il.Emit(OpCodes.Ldarg_0);
			il.Emit(OpCodes.Call, monitor_pulse);
			Label exit = il.DefineLabel();
			il.Emit(OpCodes.Br, exit);
			il.MarkLabel(noPulse);
			il.Emit(OpCodes.Stfld, parallelInvokeContext_activeCalls);
			il.MarkLabel(exit);
			il.Emit(OpCodes.Ldarg_0);
			il.Emit(OpCodes.Call, monitor_exit);
			il.EndExceptionBlock();
			il.Emit(OpCodes.Ret);
		}

		[Conditional("DEBUG")]
		private static void VerifyArgumentsDebug(object[] args) {
			for (int i = 0; i < parameterTypes.Length; i++) {
				if (args[i] == null) {
					if (parameterTypes[i].IsValueType) {
						throw new ArgumentException(string.Format("The parameter {0} cannot be null, because it is a value type", i));
					}
				} else if (!parameterTypes[i].IsAssignableFrom(args[i].GetType())) {
					throw new ArgumentException(string.Format("The parameter {0} is not compatible", i));
				}
			}
		}

		private readonly object[] arguments;
		private readonly WaitCallback invokeCallback;
		private int activeCalls;

		public ParallelInvokeContext(object[] args) {
			if (parameterTypes.Length > 0) {
				if (args == null) {
					throw new ArgumentNullException("args");
				}
				if (args.Length != parameterTypes.Length) {
					throw new ArgumentException("The parameter count does not match");
				}
				VerifyArgumentsDebug(args);
				arguments = args;
			} else if ((args != null) && (args.Length > 0)) {
				throw new ArgumentException("This delegate does not expect any parameters");
			}
			invokeCallback = (WaitCallback)invoker.CreateDelegate(typeof(WaitCallback), this);
		}

		public void QueueInvoke(Delegate @delegate) {
			Debug.Assert(@delegate is TDelegate);
			activeCalls++;
			ThreadPool.QueueUserWorkItem(invokeCallback, @delegate);
		}
	}

	private static readonly MethodInfo monitor_enter;
	private static readonly MethodInfo monitor_exit;
	private static readonly MethodInfo monitor_pulse;

	static ParallelInvoke() {
		monitor_enter = typeof(Monitor).GetMethod("Enter", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
		monitor_pulse = typeof(Monitor).GetMethod("Pulse", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
		monitor_exit = typeof(Monitor).GetMethod("Exit", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
	}

	public static void Invoke<TDelegate>(TDelegate @delegate) where TDelegate: class {
		Invoke(@delegate, null);
	}

	public static void Invoke<TDelegate>(TDelegate @delegate, params object[] args) where TDelegate: class {
		if (@delegate == null) {
			throw new ArgumentNullException("delegate");
		}
		ParallelInvokeContext<TDelegate> context = new ParallelInvokeContext<TDelegate>(args);
		lock (context) {
			foreach (Delegate invocationDelegate in ((Delegate)(object)@delegate).GetInvocationList()) {
				context.QueueInvoke(invocationDelegate);
			}
			Monitor.Wait(context);
		}
	}
}



Usage:

ParallelInvoke.Invoke(yourDelegate, arguments);
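One thing to watch when building the `arguments` array: the values are boxed, and the generated invoker unboxes value types with `Unbox_Any`, which requires the boxed type to match the parameter type exactly. The standalone sketch below shows that CLR behaviour with an ordinary C# cast; `UnboxingDemo` and its method names are hypothetical and not part of the answer's code.

```csharp
using System;

public static class UnboxingDemo {
	// Returns true if the boxed value can be unboxed directly as a double.
	public static bool CanUnboxAsDouble(object boxed) {
		try {
			// A C# cast from object to double compiles to unbox.any,
			// the same instruction the generated invoker emits.
			double value = (double)boxed;
			return !double.IsNaN(value) || true;
		} catch (InvalidCastException) {
			return false; // e.g. a boxed int is not a boxed double
		}
	}

	// Converts a numeric value to a boxed double before it goes into the args array.
	public static object AsDoubleArgument(object value) {
		return Convert.ToDouble(value);
	}
}
```

So for a delegate that takes a double, passing `42` would fail, while `42.0` or `UnboxingDemo.AsDoubleArgument(42)` should be accepted.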

Remarks:

  • Exceptions in the event handlers are not handled, and this could cause trouble. The IL code does have a finally block that decrements the counter, so the method should still end correctly. It would be possible to catch and transfer the exceptions in the IL code as well.

  • Implicit conversions other than inheritance (such as int to double) are not performed and will throw an exception.

  • The synchronization technique used does not allocate OS wait handles, which is usually good for performance. A description of the Monitor workings can be found on Joseph Albahari's page.

  • After some performance testing, it seems that this approach scales much better than any approach using the "native" BeginInvoke/EndInvoke calls on delegates (at least in the MS CLR).
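The remark about catching and transferring exceptions can be illustrated without IL. The following is a sketch under assumptions, not the answer's code: `NotifyHandler` and `CollectingParallelInvoke` are hypothetical names, the delegate takes no parameters, and the exceptions are simply collected in a list and returned to the caller. The wait again uses Monitor.Wait and Monitor.Pulse rather than an OS wait handle.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical parameterless delegate type, used only for this illustration.
public delegate void NotifyHandler();

public static class CollectingParallelInvoke {
	// Runs every handler on the ThreadPool, waits for all of them,
	// and returns the exceptions they threw (an empty list if none).
	public static IList<Exception> Invoke(NotifyHandler handlers) {
		if (handlers == null) {
			throw new ArgumentNullException("handlers");
		}
		Delegate[] targets = handlers.GetInvocationList();
		List<Exception> errors = new List<Exception>();
		object gate = new object();
		int pending = targets.Length;

		foreach (Delegate target in targets) {
			// The handler is passed as state rather than captured by the closure.
			ThreadPool.QueueUserWorkItem(delegate(object state) {
				try {
					((NotifyHandler)state)();
				} catch (Exception ex) {
					lock (gate) {
						errors.Add(ex); // keep it for the caller instead of crashing the pool thread
					}
				} finally {
					lock (gate) {
						if (--pending == 0) {
							Monitor.Pulse(gate);
						}
					}
				}
			}, target);
		}

		lock (gate) {
			// Re-check the counter, so a pulse sent before we got here is not lost.
			while (pending > 0) {
				Monitor.Wait(gate);
			}
		}
		return errors;
	}
}
```

A caller could inspect the returned list and rethrow or log as appropriate; which policy fits depends on how the event is used.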
