通过异步接口从工作线程使用WCF服务,我如何确保活动从客户端&QUOT发送;为了" [英] Using WCF service via async interface from worker thread, how do I ensure that events are sent from the client "in order"

查看:88
本文介绍了通过异步接口从工作线程使用WCF服务,我如何确保活动从客户端&QUOT发送;为了"的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我写一个Silverlight类库抽象接口WCF服务。 WCF服务提供了一个集中日志记录服务。 Silverlight的类库提供了日志记录简化log4net的一样的界面(logger.Info,logger.Warn等)。从类库我打算提供的选项,使得记录的消息可以在客户端上被累积并在突发发送到WCF日志服务,而不是像它发生发送的每个消息。通常,这是工作良好。类库确实积累的消息,它并发送消息的集合到WCF日志服务,他们是由底层的日志框架记录。

I am writing a Silverlight class library to abstract the interface to a WCF service. The WCF service provides a centralized logging service. The Silverlight class library provides a simplified log4net-like interface (logger.Info, logger.Warn, etc) for logging. From the class library I plan to provide options such that logged messages can be accumulated on the client and sent in "bursts" to the WCF logging service, rather than sending each message as it occurs. Generally, this is working well. The class library does accumulate messages and it does send collections of messages to the WCF logging service, where they are logged by an underlying logging framework.

我现在的问题是,信息(从单个客户端使用一个线程 - 所有的日志code在按钮点击事件)正在成为日志服务交错。我意识到,这至少一部分是可能是由于实例化(PerCall)或同步的WCF日志服务的。然而,它也似乎我的消息在这样快速连续即,该上留下异步调用消息的脉冲串实际上是在比生成它们以不同的顺序离去客户端发生

My current problem is that the messages (from a single client with a single thread - all logging code is in button click events) are becoming interleaved in the logging service. I realize that the at least part of this is probably due to the instancing (PerCall) or Synchronization of the WCF logging service. However, it also seems that my messages are occurring in such rapid succession that that the "bursts" of messages leaving on the async calls are actually "leaving" the client in a different order than they were generated.

我试图这里与建立一个生产者消费者队列描述轻微的(或者应该说是轻微与空气引号)的变化,工作方法块(WaitOne的),直到异步调用返回(即直到异步回调执行)。我们的想法是,当消息之一突发被发送到的WCF日志服务,队列应该等待,直到该脉冲串已经在发送下一个突发之前被处理。

I have tried to set up a producer consumer queue as described here with a slight (or should that be "slight" with air quotes) change that the Work method blocks (WaitOne) until the async call returns (i.e. until the async callback executes). The idea is that when one burst of messages is sent to the WCF logging service, the queue should wait until that burst has been processed before sending the next burst.

也许什么,我试图做的是不可行的,或者我试图解决错误的问题,(也许我只是不知道我在做什么!)。

Maybe what I am trying to do is not feasible, or maybe I am trying to solve the wrong problem, (or maybe I just don't know what I am doing!).

总之,这里是我的生产者/消费者队列code:

Anyway, here is my producer/consumer queue code:

  internal class ProducerConsumerQueue : IDisposable
  {
    EventWaitHandle wh = new AutoResetEvent(false);
    Thread worker;
    readonly object locker = new object();
    Queue<ObservableCollection<LoggingService.LogEvent>> logEventQueue = new Queue<ObservableCollection<LoggingService.LogEvent>>();

    LoggingService.ILoggingService loggingService;

    internal ProducerConsumerQueue(LoggingService.ILoggingService loggingService)
    {
      this.loggingService = loggingService;
      worker = new Thread(Work);
      worker.Start();
    }

    internal void EnqueueLogEvents(ObservableCollection<LoggingService.LogEvent> logEvents)
    {
      //Queue the next burst of messages
      lock(locker)
      {
        logEventQueue.Enqueue(logEvents);
        //Is this Set conflicting with the WaitOne on the async call in Work?
        wh.Set();        
      }
    }

    private void Work()
    {
      while(true)
      {
        ObservableCollection<LoggingService.LogEvent> events = null;

        lock(locker)
        {
          if (logEventQueue.Count > 0)
          {
            events = logEventQueue.Dequeue();
            if (events == null || events.Count == 0) return;            
          }
        }

        if (events != null && events.Count > 0)
        {
          System.Diagnostics.Debug.WriteLine("1. Work - Sending {0} events", events.Count);

          //
          // This seems to be the key...
          // Send one burst of messages via an async call and wait until the async call completes.
          //
          loggingService.BeginLogEvents(events, ar =>
          {
            try
            {
              loggingService.EndLogEvents(ar);
              System.Diagnostics.Debug.WriteLine("3. Work - Back");
              wh.Set();
            }
            catch (Exception ex)
            {
            }
          }, null);

          System.Diagnostics.Debug.WriteLine("2. Work - Waiting");

          wh.WaitOne();

          System.Diagnostics.Debug.WriteLine("4. Work - Finished");
        }
        else
        {
          wh.WaitOne();
        }
      }
    }

    #region IDisposable Members

    public void Dispose()
    {
      EnqueueLogEvents(null);
      worker.Join();
      wh.Close();
    }

    #endregion
  }

在我的测试中它基本上是这样调用:

In my test it is essentially called like this:

//Inside of LogManager, get the LoggingService and set up the queue.
ILoggingService loggingService = GetTheLoggingService();
ProducerConsumerQueue loggingQueue = new ProducerConsumerQueue(loggingService);

//Inside of client code, get a logger and log with it
ILog logger = LogManager.GetLogger("test");

for (int i = 0; i < 100; i++)
{
  logger.InfoFormat("logging message [{0}]", i);
}

在内部,记录器/日志管理并称组消息到队列前累积日志消息的一定数量(比如25)。事情是这样的:

Internally, logger/LogManager accumulates some number of logging messages (say 25) before adding that group of messages to the queue. Something like this:

internal void AddNewMessage(string message)
{
  lock(logMessages)
  {
    logMessages.Add(message);
    if (logMessages.Count >= 25)
    {
      ObservableCollection<LogMessage> messages = new ObservableCollection<LogMessage>(logMessages);
      logMessages.Clear();
      loggingQueue.EnqueueLogEvents(messages);
    }
  }
}

因此​​,在这种情况下,我会希望有4个突发每25消息。根据我在ProducerConsumerQueue code调试语句(?也许不是调试最好的方法),我希望看到这样的事情:

So, in this case I would expect to have 4 bursts of 25 messages each. Based on the Debug statements in my ProducerConsumerQueue code (maybe not the best way to debug this?), I would expect to see something like this:


  1. 工作 - 发送25个事件

  2. 工作 - 等待

  3. 工作 - 返回

  4. 工作 - 成品

重复4次。

相反,我看到的是这样的:

Instead I am seeing something like this:

* 1。工作 - 发送25个事件

*1. Work - Sending 25 events

* 2。工作 - 等待

*2. Work - Waiting

* 4。工作 - 成品

*4. Work - Finished

* 1。工作 - 发送25个事件

*1. Work - Sending 25 events

* 2。工作 - 等待

*2. Work - Waiting

* 3。工作 - 返回

*3. Work - Back

* 4。工作 - 成品

*4. Work - Finished

* 1。工作 - 发送25个事件

*1. Work - Sending 25 events

* 2。工作 - 等待

*2. Work - Waiting

* 3。工作 - 返回

*3. Work - Back

* 4。工作 - 成品

*4. Work - Finished

* 1。工作 - 发送25个事件

*1. Work - Sending 25 events

* 2。工作 - 等待

*2. Work - Waiting

* 3。工作 - 返回

*3. Work - Back

* 3。工作 - 返回

*3. Work - Back

* 4。工作 - 成品

*4. Work - Finished

(由领先的*,以使线不会受到所以autonumbered)

(Added leading * so that the lines would not be autonumbered by SO)

我想我本来预计,队列将允许消息多次爆发无以复加,但它会完全处理一个突发(等待的acync调用完成)处理一个突发之前。它似乎并没有被这样做。它似乎并没有在异步调用完成可靠地等待着。我确实有设置来电的 EnqueueLogEvents ,这也许就是取消的WaitOne 工作方法?

I guess I would have expected that, the queue would have allowed multiple bursts of messages to be added, but that it would completely process one burst (waiting on the acync call to complete) before processing the next burst. It doesn't seem to be doing this. It does not seem to be reliably waiting on the completion of the async call. I do have a call to Set in the EnqueueLogEvents, maybe that is cancelling the WaitOne from the Work method?

所以,我有几个问题:
1.请问我是什么,我试图完成有意义的解释(是我的解释清楚,不能是一个好主意或没有)?

So, I have a few questions: 1. Does my explanation of what I am trying to accomplish make sense (is my explanation clear, not is it a good idea or not)?


  1. 就是我想?(发送 - 从客户端 - 从单一线程的消息,因为它们发生在同一时间完全处理一组消息的顺序)一个好主意。

  1. Is what I am trying to (transmit - from the client - the messages from a single thread, in the order that they occurred, completely processing one set of messages at a time) a good idea?

我是收?

能不能做到?

如果不能做到?

感谢您的帮助!


更多的调查和感谢布赖恩的建议后,我们能够得到这个工作。我抄修改code。关键是,我们现在使用为瓦等严格处理为ProducerConsumerQueue功能。而不是使用WH等待异步调用来完成,我们现在等待res.AsyncWaitHandle,这是由BeginLogEvents调用返回。

After more investigation and thanks to Brian's suggestion, we were able to get this working. I have copied the modified code. The key is that we are now using the "wh" wait handle strictly for ProducerConsumerQueue functions. Rather than using wh to wait for the async call to complete, we are now waiting on res.AsyncWaitHandle, which is returned by the BeginLogEvents call.

  internal class LoggingQueue : IDisposable
  {
    EventWaitHandle wh = new AutoResetEvent(false);
    Thread worker;
    readonly object locker = new object();
    bool working = false;

    Queue<ObservableCollection<LoggingService.LogEvent>> logEventQueue = new Queue<ObservableCollection<LoggingService.LogEvent>>();

    LoggingService.ILoggingService loggingService;

    internal LoggingQueue(LoggingService.ILoggingService loggingService)
    {
      this.loggingService = loggingService;
      worker = new Thread(Work);
      worker.Start();
    }

    internal void EnqueueLogEvents(ObservableCollection<LoggingService.LogEvent> logEvents)
    {
      lock (locker)
      {
        logEventQueue.Enqueue(logEvents);

        //System.Diagnostics.Debug.WriteLine("EnqueueLogEvents calling Set");

        wh.Set();
      }
    }

    private void Work()
    {
      while (true)
      {
        ObservableCollection<LoggingService.LogEvent> events = null;

        lock (locker)
        {
          if (logEventQueue.Count > 0)
          {
            events = logEventQueue.Dequeue();
            if (events == null || events.Count == 0) return;
          }
        }

        if (events != null && events.Count > 0)
        {
          //System.Diagnostics.Debug.WriteLine("1. Work - Sending {0} events", events.Count);

          IAsyncResult res = loggingService.BeginLogEvents(events, ar =>
          {
            try
            {
              loggingService.EndLogEvents(ar);
              //System.Diagnostics.Debug.WriteLine("3. Work - Back");
            }
            catch (Exception ex)
            {
            }
          }, null);

          //System.Diagnostics.Debug.WriteLine("2. Work - Waiting");

          // Block until async call returns.  We are doing this so that we can be sure that all logging messages
          // are sent FROM the client in the order they were generated.  ALSO, we don't want interleave blocks of logging
          // messages from the same client by sending a new block of messages before the previous block has been
          // completely processed.

          res.AsyncWaitHandle.WaitOne();

          //System.Diagnostics.Debug.WriteLine("4. Work - Finished");
        }
        else
        {
          wh.WaitOne();
        }
      }
    }

    #region IDisposable Members

    public void Dispose()
    {
      EnqueueLogEvents(null);
      worker.Join();
      wh.Close();
    }

    #endregion
  }

正如我在最初的问题,在我的意见,乔恩和Brian提到的,我还是不知道做这一切的工作是个好主意,但至少code做什么,我是想做。这意味着,至少我有做这种方式或其他方式(如还原事实后顺序)的选择,而不是没有选择。

As I mentioned in my initial question and in my comments to Jon and Brian, I still don't know if doing all of this work is a good idea, but at least the code does what I wanted it to do. That means that I at least have the choice of doing it this way or some other way (such as restoring order after the fact) rather than not having the choice.

推荐答案

你所得到的那种测序的原因是因为你试图使用相同的等待句柄的生产者 - 消费者队列使用不同的目的。这会导致各种混乱。在某些时候的事情将每况愈下,并且队列会得到最终活锁定。你真的应该创建一个单独的的WaitHandle 来等待日志服务的完成。或者,如果 BeginLoggingEvents 符合标准模式它会返回一个的IAsyncResult 包含 WaitHandle的,你可以改用创建自己的。

The reason you are getting that kind of sequencing is because you are trying to use the same wait handle that the producer-consumer queue is using for a different purpose. That is going to cause all kinds of chaos. At some point things will go from bad to worse and the queue will get live-locked eventually. You really should create a separate WaitHandle to wait for completion of the logging service. Or if the BeginLoggingEvents fits the standard pattern it will return a IAsyncResult that contains a WaitHandle that you can use instead of creating your own.

作为一个方面说明,我真的不喜欢psented的Albarahi网站上的生产者 - 消费者模式$ P $。的问题是,它不适合多个消费者安全(显然这是不关心你的)。我说,所有应有的尊重,因为我觉得他的网站是多线程编程最好的资源之一。如果 BlockingCollection 是提供给你然后使用这个来代替。

As a side note, I really do not like the producer-consumer pattern presented on the Albarahi website. The problem is that it is not safe for multiple consumers (obviously that is of no concern to you). And I say that with all due respect because I think his website is one of the best resources for multithreaded programming. If BlockingCollection is available to you then use that instead.

这篇关于通过异步接口从工作线程使用WCF服务,我如何确保活动从客户端&QUOT发送;为了&QUOT;的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆