线程和C#中的异步操作 [英] Threading and asynchronous operations in C#

查看:118
本文介绍了线程和C#中的异步操作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是一个老的狗努力学习新把戏。我非常熟悉的一个名为PowerBuilder的语言,这种语言,当你希望异步做的事情,你在一个新的线程产卵的对象。我会重申:整个对象是在一个单独的线程实例和具有不同的执行上下文的。该对象的任何和所有的方法来执行在独立线程的上下文。

I'm an old dog trying to learn a new trick. I'm extremely familiar with a language called PowerBuilder and in that language, when you want to do things asynchronously, you spawn an object in a new thread. I'll reiterate that: the entire object is instantiated in a separate thread and has a different execution context. Any and all methods on that object execute in the context of that separate thread.

现在好了,我想实现一些异步使用C#和.NET中的线程模型执行的感觉完全不一样对我。它看起来像我在一个线程,但我可以指定(通话按通话的基础上),某些方法在不同的线程执行的实例化对象。

Well now, I'm trying to implement some asynchronous executing using C# and the threading model in .NET feels completely different to me. It looks like I'm instantiating objects in one thread but that I can specify (on a call-by-call basis) that certain methods execute in a different thread.

的差别似乎微妙的,但它的令人沮丧的我。我的老派思说,我有一个名为鲍勃的帮手。鲍勃熄灭,做的东西。新学校的思想,我的理解是正确的,是我的上午的鲍勃。如果我需要,我有时会擦我的肚子,并在同一时间拍拍我的头的

The difference seems subtle, but it's frustrating me. My old-school thinking says, "I have a helper named Bob. Bob goes off and does stuff." The new-school thinking, if I understand it right, is "I am Bob. If I need to, I can sometimes rub my belly and pat my head at the same time."

我的现实世界中的编码问题:我正在写通过TCP接收邮件,将其解析成可用数据的接口引擎,然后把这些数据到数据库中。 解析一个消息需要大约一秒钟。根据所分析的数据,该数据库操作可能需要不到一秒或它可能需要十秒钟。 (由所有时间上澄清的问题。)

My real-world coding problem: I'm writing an interface engine that accepts messages via TCP, parses them into usable data, then puts that data into a database. "Parsing" a message takes approximately one second. Depending on the parsed data, the database operation may take less than a second or it might take ten seconds. (All times made up to clarify the problem.)

我的老派的思维告诉我,我的数据库类应该生活在一个单独的线程,并有一些像的 ConcurrentQueue 。它只会旋转该队列,处理任何可能在那里。解析器,另一方面,将需要推送消息到该队列。这些消息会(代表?)之类的东西创建基于数据的顺序这个对象或更新在这个对象。这可能是值得指出的是,其实我是想在一个严格的,单线程FIFO的顺序来处理的队列中的信息。

My old-school thinking tells me that my database class should live in a separate thread and have something like a ConcurrentQueue. It would simply spin on that queue, processing anything that might be in there. The Parser, on the other hand, would need to push messages into that queue. These messages would be (delegates?) things like "Create an order based on the data in this object" or "Update an order based on the data in this object". It might be worth noting that I actually want to process the "messages" in the "queue" in a strict, single-threaded FIFO order.

基本上,我的数据库连接不能始终保持了我的解析器。我需要一种方法来确保,而我的数据库进程试图赶上我的解析器不会减慢。建议?

Basically, my database connection can't always keep up with my parser. I need a way to make sure my parser doesn't slow down while my database processes try to catch up. Advice?

- 编辑:用code!
每个人,一切都告诉我要使用 BlockingCollection 。所以这里有一个的简短的最终目标和code的解释去用它:

-- edit: with code! Everyone and everything is telling me to use BlockingCollection. So here's a brief explanation of the end goal and code to go with it:

这将是一个Windows服务。在启动时,它会产生多个环境,用含有一种dbworker和一个接口的每个环境。 接口,将有一个解析器和一个听众。

This will be a Windows service. When started, it will spawn multiple "environments", with each "environment" containing one "dbworker" and one "interface". The "interface" will have one "parser" and one "listener".

class cEnvironment {
    private cDBWorker MyDatabase;
    private cInterface MyInterface;

    public void OnStart () {
        MyDatabase = new cDBWorker ();
        MyInterface = new cInterface ();

        MyInterface.OrderReceived += this.InterfaceOrderReceivedEventHandler;

        MyDatabase.OnStart ();
        MyInterface.OnStart ();
    }

    public void OnStop () {
        MyInterface.OnStop ();
        MyDatabase.OnStop ();

        MyInterface.OrderReceived -= this.InterfaceOrderReceivedEventHandler;
    }

    void InterfaceOrderReceivedEventHandler (object sender, OrderReceivedEventArgs e) {
        MyDatabase.OrderQueue.Add (e.Order);
    }
}

class cDBWorker {
    public BlockingCollection<cOrder> OrderQueue = new BlockingCollection<cOrder> ();
    private Task ProcessingTask;

    public void OnStart () {
        ProcessingTask = Task.Factory.StartNew (() => Process (), TaskCreationOptions.LongRunning);
    }

    public void OnStop () {
        OrderQueue.CompleteAdding ();
        ProcessingTask.Wait ();
    }

    public void Process () {
        foreach (cOrder Order in OrderQueue.GetConsumingEnumerable ()) {
            switch (Order.OrderType) {
                case 1:
                    SuperFastMethod (Order);
                    break;

                case 2:
                    ReallySlowMethod (Order);
                    break;
            }
        }
    }

    public void SuperFastMethod (cOrder Order) {
    }

    public void ReallySlowMethod (cOrder Order) {
    }
}

class cInterface {
    protected cListener MyListener;
    protected cParser MyParser;

    public void OnStart () {
        MyListener = new cListener ();
        MyParser = new cParser ();

        MyListener.DataReceived += this.ListenerDataReceivedHandler;
        MyListener.OnStart ();
    }

    public void OnStop () {
        MyListener.OnStop ();
        MyListener.DataReceived -= this.ListenerDataReceivedHandler;
    }

    public event OrderReceivedEventHandler OrderReceived;

    protected virtual void OnOrderReceived (OrderReceivedEventArgs e) {
        if (OrderReceived != null)
            OrderReceived (this, e);
    }

    void ListenerDataReceivedHandler (object sender, DataReceivedEventArgs e) {
        foreach (string Message in MyParser.GetMessages (e.RawData)) {
            OnOrderReceived (new OrderReceivedEventArgs (MyParser.ParseMessage (Message)));
        }
    }

它编译。 (船舶它!)但是,这是否意味着我这样做对吗?

It compiles. (SHIP IT!) But does that mean that I'm doing it right?

推荐答案

BlockingCollection 让把这种东西一起pretty容易:

BlockingCollection makes putting this kind of thing together pretty easy:

// the queue
private BlockingCollection<Message> MessagesQueue = new BlockingCollection<Message>();


// the consumer
private MessageParser()
{
    foreach (var msg in MessagesQueue.GetConsumingEnumerable())
    {
        var parsedMessage = ParseMessage(msg);
        // do something with the parsed message
    }
}

// In your main program
// start the consumer
var consumer = Task.Factory.StartNew(() => MessageParser(),
    TaskCreationOptions.LongRunning);

// the main loop
while (messageAvailable)
{
    var msg = GetMessageFromTcp();
    // add it to the queue
    MessagesQueue.Add(msg);
}

// done receiving messages
// tell the consumer that no more messages will be added
MessagesQueue.CompleteAdding();

// wait for consumer to finish
consumer.Wait();

消费者确实队列非忙等待,所以它不是吃CPU资源时,有什么用。

The consumer does a non-busy wait on the queue, so it's not eating CPU resources when there's nothing available.

这篇关于线程和C#中的异步操作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆