多线程Windows服务以处理Windows消息队列 [英] Multi threaded Windows service to process Windows Message Queue

查看:87
本文介绍了多线程Windows服务以处理Windows消息队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这是我第一次编写Windows服务.

This is my first attempt at writing a Windows service.

此Windows服务必须处理2个Windows消息队列.

This windows service has to process 2 windows message Queues.

每个Message Queue应该都有自己的线程,但是我似乎无法在适当的位置获得体系结构.

Each Message Queue should have there own thread, but I can't seem to get the Architecture in Place.

我遵循了 Windows Service能够持续运行创建一个我正在处理一个队列的线程.

I followed this Windows Service to run constantly which allowed me to create one thread in which I am Processing one Queue.

这是我的服务类别:

    protected override void OnStart(string[] args)
    {
        _thread = new Thread(WorkerThreadFunc) { Name = "Address Calculator Thread", IsBackground = true };
        _thread.Start();
    }

    private void WorkerThreadFunc()
    {
        _addressCalculator = new GACAddressCalculator();

        while (!_shutdownEvent.WaitOne(0))
        {
            _addressCalculator.StartAddressCalculation();
        }
    }



    protected override void OnStop()
    {
        _shutdownEvent.Set();
        if (!_thread.Join(5000))
        { // give the thread 5 seconds to stop
            _thread.Abort();
        }
    }

在我的GACAddressCalculator.StartAddressCalculation()中,我正在创建一个如下所示的队列处理器对象:

In My GACAddressCalculator.StartAddressCalculation() I am creating a Queue Processor Object which looks like this:

    public void StartAddressCalculation()
    {
        try
        {
            var googleQueue = new GISGoogleQueue("VehMonLogGISGoogle", 1, _gacLogger, 1);
            googleQueue.ProccessMessageQueue();

        }
        catch (Exception ex)
        {

        }

    }

这是GISGoogleQueue:

public class GISGoogleQueue : BaseMessageQueue
{


    public GISGoogleQueue(string queueName, int threadCount, GACLogger logger, int messagesPerThread)
        : base(queueName, threadCount, logger, messagesPerThread)
    {
    }

    public override void ProccessMessageQueue()
    {
        if (!MessageQueue.Exists(base.QueueName))
        {
            _logger.LogMessage(MessageType.Information, string.Format("Queue '{0}' doesn't exist", this.QueueName));
            return;
        }

        var messageQueue = new MessageQueue(QueueName);
        var myVehMonLog = new VehMonLog();
        var o = new Object();
        var arrTypes = new Type[2];
        arrTypes[0] = myVehMonLog.GetType();
        arrTypes[1] = o.GetType();
        messageQueue.Formatter = new XmlMessageFormatter(arrTypes);

        using (var pool = new Pool(ThreadCount))
        {

            // Infinite loop to process all messages in Queue
            for (; ; )
            {
                for (var i = 0; i < MessagesPerThread; i++)
                {
                    try
                    {
                        while (pool.TaskCount() >= MessagesPerThread) ; // Stop execution until Tasks in pool have been executed


                        var message = messageQueue.Receive(new TimeSpan(0, 0, 5, 0)); // TimeOut for message reading from Queue, set to 5 minutes, Will throw exception after 5 mins

                        if (message != null) // Check if message is not Null
                        {
                            var monLog = (VehMonLog)message.Body;
                            pool.QueueTask(() => ProcessMessageFromQueue(monLog)); // Add to Tasks list in Pool
                        }

                    }
                    catch (Exception ex)
                    {

                    }

                }
            }
        }
    }

}

现在这对于1个消息队列来说可以正常工作,但是如果我要处理另一个消息队列,则不会发生,因为我在ProccessMessageQueue方法中存在无限循环.

Now this works fine for 1 Message queue, but if I want to process another message Queue it won't happen as I have an infinite loop in the ProccessMessageQueue method.

我想在单独的线程中执行每个队列.

I want to execute each Queue in a separate thread.

我认为我在WorkerThreadFunc()中犯了一个错误,我必须以某种方式从那里或OnStart()中启动两个线程.

I think I am making a mistake in WorkerThreadFunc(), I have to somehow start two threads from there or in the OnStart().

如果您对如何改善这项服务有任何建议,那也很好.

Also if you have any tips on how to improve this service would be great.

顺便说一句,我正在使用此答案中的池类 https://stackoverflow.com/a/436552/1910735表示ProccessMessageQueue

By the way I am using the Pool Class from this Answer https://stackoverflow.com/a/436552/1910735 for the thread Pool inside ProccessMessageQueue

推荐答案

我建议如下更改您的服务类(以下注释):

I would suggest changing your service class as follows (comments below):

protected override void OnStart(string[] args)
{
    _thread = new Thread(WorkerThreadFunc)
              {
                  Name = "Run Constantly Thread",
                  IsBackground = true
              };
    _thread.Start();
}

GISGoogleQueue _googleQueue1;
GISGoogleQueue _googleQueue2;
private void WorkerThreadFunc()
{
    // This thread is exclusively used to keep the service running.
    // As such, there's no real need for a while loop here.  Create
    // the necessary objects, start them, wait for shutdown, and
    // cleanup.
    _googleQueue1 = new GISGoogleQueue(...);
    _googleQueue1.Start();
    _googleQueue2 = new GISGoogleQueue(...);
    _googleQueue2.Start();

    _shutdownEvent.WaitOne();  // infinite wait

    _googleQueue1.Shutdown();
    _googleQueue2.Shutdown();
}

protected override void OnStop()
{
    _shutdownEvent.Set();
    if (!_thread.Join(5000))
    {
        // give the thread 5 seconds to stop
        _thread.Abort();
    }
}

我无视您的GACAddressCalculator.根据您的显示,它似乎是GISGoogleQueue周围的薄包装.显然,如果它确实做了您未显示的操作,则需要重新考虑它.

I'm ignoring your GACAddressCalculator. From what you showed, it appeared to be a thin wrapper around GISGoogleQueue. Obviously, if it actually does something that you didn't show, it'll need to be factored back in.

请注意,在WorkerThreadFunc()中创建了两个GISGoogleQueue对象.因此,让我们接下来看看如何创建这些对象以实现适当的线程模型.

Notice that two GISGoogleQueue objects were created in the WorkerThreadFunc(). So let's next look at how to create those objects to achieve the appropriate threading model.

public class GISGoogleQueue : BaseMessageQueue
{
    System.Threading.Thread _thread;
    System.Threading.ManualResetEvent _shutdownEvent;

    public GISGoogleQueue(string queueName, int threadCount, GACLogger logger, int messagesPerThread)
         : base(queueName, threadCount, logger, messagesPerThread)
    {
        // Let this class wrap a thread object.  Create it here.
        _thread = new Thread(RunMessageQueueFunc()
                  {
                      Name = "Run Message Queue Thread " + Guid.NewGuid().ToString(),
                      IsBackground = true
                  };
        _shutdownEvent = new ManualResetEvent(false);
    }

    public Start()
    {
        _thread.Start();
    }

    public Shutdown()
    {
        _shutdownEvent.Set();
        if (!_thread.Join(5000))
        {
            // give the thread 5 seconds to stop
            _thread.Abort();
        }
    }

    private void RunMessageQueueFunc()
    {
        if (!MessageQueue.Exists(base.QueueName))
        {
            _logger.LogMessage(MessageType.Information, string.Format("Queue '{0}' doesn't exist", this.QueueName));
            return;
        }

        var messageQueue = new MessageQueue(QueueName);
        var myVehMonLog = new VehMonLog();
        var o = new Object();
        var arrTypes = new Type[2];
        arrTypes[0] = myVehMonLog.GetType();
        arrTypes[1] = o.GetType();
        messageQueue.Formatter = new XmlMessageFormatter(arrTypes);

        using (var pool = new Pool(ThreadCount))
        {
            // Here's where we'll wait for the shutdown event to occur.
            while (!_shutdownEvent.WaitOne(0))
            {
                for (var i = 0; i < MessagesPerThread; i++)
                {
                    try
                    {
                        // Stop execution until Tasks in pool have been executed
                        while (pool.TaskCount() >= MessagesPerThread) ;

                        // TimeOut for message reading from Queue, set to 5 minutes, Will throw exception after 5 mins
                        var message = messageQueue.Receive(new TimeSpan(0, 0, 5, 0));

                        if (message != null) // Check if message is not Null
                        {
                            var monLog = (VehMonLog)message.Body;
                            pool.QueueTask(() => ProcessMessageFromQueue(monLog)); // Add to Tasks list in Pool
                        }
                    }
                    catch (Exception ex)
                    {
                    }
                }
            }
        }
    }
}

此方法围绕使用由GISGoogleQueue类包装的Thread对象.对于创建的每个GISGoogleQueue对象,您都会获得一个包装好的线程,一旦在GISGoogleQueue对象上调用Start(),该线程便会完成工作.

This approach centers around using a Thread object wrapped by the GISGoogleQueue class. For each GISGoogleQueue object you create, you get a wrapped thread that will do the work once Start() is called on the GISGoogleQueue object.

几点.在RunMessageQueueFunc()中,您正在检查队列名称是否存在.如果不是,函数将退出.发生 IF 时,线程也退出.关键是您可能希望在此过程的早期进行检查.只是一个想法.

A couple of points. In the RunMessageQueueFunc(), you're checking to see if the name of the queue exists. If it doesn't, the function exits. IF that happens, the thread exits, too. The point is that you may wish to do that check earlier in the process. Just a thought.

第二,请注意,您的无限循环已由对_shutdownEvent对象的检查代替.这样,当服务关闭时,循环将停止.为了及时起见,您需要确保通过循环的整个过程不会花费太长时间.否则,您可能会在关闭后5秒钟终止线程.仅在进行中止操作时才能确保一切正常,但应尽可能避免.

Second, note that your infinite loop has been replaced by a check against the _shutdownEvent object. That way, the loop will stop when the service shuts down. For timeliness, you'll need to make sure that a complete pass through the loop doesn't take too long. Otherwise, you may end up aborting the thread 5 seconds after shutdown. The abort is only there to make sure things are torn down, but should be avoided if possible.

我知道很多人会喜欢使用Task类来做这样的事情.您似乎在RunMessageQueueFunc()内部.但是对于在整个过程中运行的线程,我认为Task类是错误的选择,因为它占用了线程池中的线程.对我来说,Thread类是用于构建的.

I know a lot of people will prefer using the Task class to do things like this. It appears that you are inside RunMessageQueueFunc(). But for threads that run for the duration of the process, I think the Task class is the wrong choice because it ties up threads in the thread pool. To me, that what the Thread class is build for.

HTH

这篇关于多线程Windows服务以处理Windows消息队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆