如何在 C# 中复制 F# 区分联合类型? [英] How can I duplicate the F# discriminated union type in C#?

查看:21
本文介绍了如何在 C# 中复制 F# 区分联合类型?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我创建了一个名为 Actor 的新类,用于处理传递给它的消息.我遇到的问题是找出将相关但不同的消息传递给 Actor 的最优雅方式.我的第一个想法是使用继承,但它看起来很臃肿,但它是强类型,这是一个明确的要求.

I've created a new class called Actor which processes messages passed to it. The problem I am running into is figuring out what is the most elegant way to pass related but different messages to the Actor. My first idea is to use inheritance but it seems so bloated but it is strongly types which is a definite requirement.

有什么想法吗?

private abstract class QueueMessage { }

private class ClearMessage : QueueMessage 
{
    public static readonly ClearMessage Instance = new ClearMessage();

    private ClearMessage() { }
}

private class TryDequeueMessage : QueueMessage 
{
    public static readonly TryDequeueMessage Instance = new TryDequeueMessage();

    private TryDequeueMessage() { }
}

private class EnqueueMessage : QueueMessage 
{
    public TValue Item { get; private set; }

    private EnqueueMessage(TValue item)
    {
        Item = item;
    }
}

演员类

/// <summary>Represents a callback method to be executed by an Actor.</summary>
/// <typeparam name="TReply">The type of reply.</typeparam>
/// <param name="reply">The reply made by the actor.</param>
public delegate void ActorReplyCallback<TReply>(TReply reply);

/// <summary>Represents an Actor which receives and processes messages in concurrent applications.</summary>
/// <typeparam name="TMessage">The type of message this actor accepts.</typeparam>
/// <typeparam name="TReply">The type of reply made by this actor.</typeparam>
public abstract class Actor<TMessage, TReply> : IDisposable
{
    /// <summary>The default total number of threads to process messages.</summary>
    private const Int32 DefaultThreadCount = 1;


    /// <summary>Used to serialize access to the message queue.</summary>
    private readonly Locker Locker;

    /// <summary>Stores the messages until they can be processed.</summary>
    private readonly System.Collections.Generic.Queue<Message> MessageQueue;

    /// <summary>Signals the actor thread to process a new message.</summary>
    private readonly ManualResetEvent PostEvent;

    /// <summary>This tells the actor thread to stop reading from the queue.</summary>
    private readonly ManualResetEvent DisposeEvent;

    /// <summary>Processes the messages posted to the actor.</summary>
    private readonly List<Thread> ActorThreads;


    /// <summary>Initializes a new instance of the Genex.Concurrency&lt;TRequest, TResponse&gt; class.</summary>
    public Actor() : this(DefaultThreadCount) { }

    /// <summary>Initializes a new instance of the Genex.Concurrency&lt;TRequest, TResponse&gt; class.</summary>
    /// <param name="thread_count"></param>
    public Actor(Int32 thread_count)
    {
        if (thread_count < 1) throw new ArgumentOutOfRangeException("thread_count", thread_count, "Must be 1 or greater.");

        Locker = new Locker();
        MessageQueue = new System.Collections.Generic.Queue<Message>();
        EnqueueEvent = new ManualResetEvent(true);
        PostEvent = new ManualResetEvent(false);
        DisposeEvent = new ManualResetEvent(true);
        ActorThreads = new List<Thread>();

        for (Int32 i = 0; i < thread_count; i++)
        {
            var thread = new Thread(ProcessMessages);
            thread.IsBackground = true;
            thread.Start();
            ActorThreads.Add(thread);
        }
    }


    /// <summary>Posts a message and waits for the reply.</summary>
    /// <param name="value">The message to post to the actor.</param>
    /// <returns>The reply from the actor.</returns>
    public TReply PostWithReply(TMessage message)
    {
        using (var wrapper = new Message(message))
        {
            lock (Locker) MessageQueue.Enqueue(wrapper);
            PostEvent.Set();
            wrapper.Channel.CompleteEvent.WaitOne();
            return wrapper.Channel.Value;
        }
    }

    /// <summary>Posts a message to the actor and executes the callback when the reply is received.</summary>
    /// <param name="value">The message to post to the actor.</param>
    /// <param name="callback">The callback that will be invoked once the replay is received.</param>
    public void PostWithAsyncReply(TMessage value, ActorReplyCallback<TReply> callback)
    {
        if (callback == null) throw new ArgumentNullException("callback");
        ThreadPool.QueueUserWorkItem(state => callback(PostWithReply(value)));
    }

    /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
    public void Dispose()
    {
        if (DisposeEvent.WaitOne(10))
        {
            DisposeEvent.Reset();
            PostEvent.Set();

            foreach (var thread in ActorThreads)
            {
                thread.Join();
            }

            ((IDisposable)PostEvent).Dispose();
            ((IDisposable)DisposeEvent).Dispose();
        }
    }

    /// <summary>Processes a message posted to the actor.</summary>
    /// <param name="message">The message to be processed.</param>
    protected abstract void ProcessMessage(Message message);

    /// <summary>Dequeues the messages passes them to ProcessMessage.</summary>
    private void ProcessMessages()
    {
        while (PostEvent.WaitOne() && DisposeEvent.WaitOne(10))
        {
            var message = (Message)null;

            while (true)
            {
                lock (Locker)
                {
                    message = MessageQueue.Count > 0 ?
                        MessageQueue.Dequeue() :
                        null;

                    if (message == null)
                    {
                        PostEvent.Reset();
                        break;
                    }
                }

                try
                {
                    ProcessMessage(message);
                }
                catch
                {

                }
            }
        }
    }


    /// <summary>Represents a message that is passed to an actor.</summary>
    protected class Message : IDisposable
    {
        /// <summary>The actual value of this message.</summary>
        public TMessage Value { get; private set; }

        /// <summary>The channel used to give a reply to this message.</summary>
        public Channel Channel { get; private set; }


        /// <summary>Initializes a new instance of Genex.Concurrency.Message class.</summary>
        /// <param name="value">The actual value of the message.</param>
        public Message(TMessage value)
        {
            Value = value;
            Channel = new Channel();
        }


        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            Channel.Dispose();
        }
    }

    /// <summary>Represents a channel used by an actor to reply to a message.</summary>         
    protected class Channel : IDisposable
    {
        /// <summary>The value of the reply.</summary>
        public TReply Value { get; private set; }

        /// <summary>Signifies that the message has been replied to.</summary>
        public ManualResetEvent CompleteEvent { get; private set; }


        /// <summary>Initializes a new instance of Genex.Concurrency.Channel class.</summary>
        public Channel()
        {
            CompleteEvent = new ManualResetEvent(false);
        }

        /// <summary>Reply to the message received.</summary>
        /// <param name="value">The value of the reply.</param>
        public void Reply(TReply value)
        {
            Value = value;
            CompleteEvent.Set();
        }

        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            ((IDisposable)CompleteEvent).Dispose();
        }
    }
}

推荐答案

在您的示例代码中,您根据 PostWithReply 实现了 PostWithAsyncReply.这并不理想,因为这意味着当你调用 PostWithAsyncReply 并且actor 需要一段时间来处理它时,实际上有两个线程被捆绑:一个执行actor,另一个等待它完成.最好让一个线程执行actor,然后在异步情况下调用回调.(显然在同步情况下无法避免两个线程的绑定).

In your example code, you implement PostWithAsyncReply in terms of PostWithReply. That isn't ideal, because it means that when you call PostWithAsyncReply and the actor takes a while to handle it, there are actually two threads tied up: the one executing the actor and the one waiting for it to finish. It would be better to have the one thread executing the actor and then calling the callback in the asynchronous case. (Obviously in the synchronous case there's no avoiding the tying up of two threads).

更新:

关于以上内容的更多信息:您构造一个带有参数的actor,告诉它要运行多少个线程.为简单起见,假设每个actor 都运行一个线程(实际上是一种很好的情况,因为actor 可以拥有内部状态而不对其进行锁定,因为只有一个线程可以直接访问它).

More on the above: you construct an actor with an argument telling it how many threads to run. For simplicity suppose every actor runs with one thread (actually quite a good situation because actors can then have internal state with no locking on it, as only one thread accesses it directly).

Actor A 调用 actor B,等待响应.为了处理请求,actor B 需要调用actor C.所以现在只有 A 和 B 的线程在等待,而 C 是唯一真正让 CPU 做任何工作的线程.多线程就这么多!但如果你一直在等待答案,这就是你得到的.

Actor A calls actor B, waiting for a response. In order to handle the request, actor B needs to call actor C. So now A and B's only threads are waiting, and C's is the only one actually giving the CPU any work to do. So much for multi-threading! But this is what you get if you wait for answers all the time.

好的,您可以增加在每个 actor 中启动的线程数.但是你会启动它们,这样它们就可以无所事事地坐着.堆栈会占用大量内存,并且上下文切换可能很昂贵.

Okay, you could increase the number of threads you start in each actor. But you'd be starting them so they could sit around doing nothing. A stack uses up a lot of memory, and context switching can be expensive.

所以最好异步发送消息,带有回调机制,这样你就可以获取完成的结果.您实现的问题在于您从线程池中获取另一个线程,纯粹是坐等.因此,您基本上应用了增加线程数的解决方法.您为从不运行的任务分配了一个线程.

So it's better to send messages asynchronously, with a callback mechanism so you can pick up the finished result. The problem with your implementation is that you grab another thread from the thread pool, purely to sit around and wait. So you basically apply the workaround of increasing the number of threads. You allocate a thread to the task of never running.

PostWithAsyncReply 方面实现 PostWithReply 会更好,即相反的方式.异步版本是低级的.基于我的基于委托的示例(因为它涉及较少的代码输入!):

It would be better to implement PostWithReply in terms of PostWithAsyncReply, i.e. the opposite way round. The asynchronous version is low-level. Building on my delegate-based example (because it involves less typing of code!):

private bool InsertCoinImpl(int value) 
{
    // only accept dimes/10p/whatever it is in euros
    return (value == 10);
}

public void InsertCoin(int value, Action<bool> accepted)
{
    Submit(() => accepted(InsertCoinImpl(value)));
}

所以私有实现返回一个布尔值.公共异步方法接受一个将接收返回值的动作;私有实现和回调操作都在同一个线程上执行.

So the private implementation returns a bool. The public asynchronous method accepts an action that will receive the return value; both the private implementation and the callback action are executed on the same thread.

希望同步等待的需求将成为少数情况.但是当你需要它时,它可以由一个辅助方法提供,完全通用,与任何特定的参与者或消息类型无关:

Hopefully the need to wait synchronously is going to be the minority case. But when you need it, it could be supplied by a helper method, totally general purpose and not tied to any specific actor or message type:

public static T Wait<T>(Action<Action<T>> activity)
{
    T result = default(T);
    var finished = new EventWaitHandle(false, EventResetMode.AutoReset);

    activity(r =>
        {
            result = r;
            finished.Set();
        });

    finished.WaitOne();
    return result;
}

所以现在在其他演员身上我们可以说:

So now in some other actor we can say:

bool accepted = Helpers.Wait<bool>(r => chocMachine.InsertCoin(5, r));

Wait 的类型参数可能是不必要的,还没有尝试编译任何这些.但是 Wait 基本上是为您魔术化了一个回调,因此您可以将其传递给某个异步方法,而在外部,您只需返回传递给回调的任何内容作为您的返回值.请注意,您传递给 Wait 的 lambda 实际上仍然在调用 Wait 的同一线程上执行.

The type argument to Wait may be unnecessary, haven't tried compiling any of this. But Wait basically magics-up a callback for you, so you can pass it to some asynchronous method, and on the outside you just get back whatever was passed to the callback as your return value. Note that the lambda you pass to Wait still actually executes on the same thread that called Wait.

我们现在让您返回到我们的常规计划......

至于您询问的实际问题,您向演员发送消息以使其执行某事.代表在这里很有帮助.它们让您可以有效地让编译器为您生成一个包含一些数据的类、一个您甚至不必显式调用的构造函数以及一个方法.如果您必须编写一堆小类,请切换到委托.

As for the actual problem you asked about, you send a message to an actor to get it to do something. Delegates are helpful here. They let you effectively get the compiler to generate you a class with some data, a constructor that you don't even have to call explicitly and also a method. If you're having to write a bunch of little classes, switch to delegates.

abstract class Actor
{
    Queue<Action> _messages = new Queue<Action>();

    protected void Submit(Action action)
    {
        // take out a lock of course
        _messages.Enqueue(action);
    }

    // also a "run" that reads and executes the 
    // message delegates on background threads
}

现在一个特定的派生actor遵循这个模式:

Now a specific derived actor follows this pattern:

class ChocolateMachineActor : Actor
{
    private void InsertCoinImpl(int value) 
    {
        // whatever...
    }

    public void InsertCoin(int value)
    {
        Submit(() => InsertCoinImpl(value));
    }
}

所以要向演员发送消息,您只需调用公共方法.私有的 Impl 方法完成了真正的工作.无需手工编写一堆消息类.

So to send a message to the actor, you just call the public methods. The private Impl method does the real work. No need to write a bunch of message classes by hand.

显然我已经省略了关于回复的内容,但这一切都可以通过更多的参数来完成.(见上面的更新).

Obviously I've left out the stuff about replying, but that can all be done with more parameters. (See update above).

这篇关于如何在 C# 中复制 F# 区分联合类型?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆