TThreadedQueue不能多个消费者? [英] TThreadedQueue not capable of multiple consumers?

查看:232
本文介绍了TThreadedQueue不能多个消费者?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

尝试在单个生产者多用户方案中使用TThreadedQueue(Genericics.Collections)。 (Delphi-XE)。
想法是将对象推入一个队列,并让几个工作线程排出队列。

Trying to use the TThreadedQueue (Generics.Collections) in a single producer multiple consumer scheme. (Delphi-XE). The idea is to push objects into a queue and let several worker threads draining the queue.

尽管如此,它不会按预期工作。
当两个或更多工作线程正在调用PopItem时,从TThreadedQueue抛出访问冲突。

It does not work as expected, though. When two or more worker threads are calling PopItem, access violations are thrown from the TThreadedQueue.

如果对PopItem的调用序列化为关键部分,则所有很好,

If the call to PopItem is serialized with a critical section, all is fine.

当然,TThreadedQueue应该能够处理多个消费者,所以我缺少一些东西,还是TThreadedQueue中的一个纯粹的错误?

Surely the TThreadedQueue should be able to handle multiple consumers, so am I missing something or is this a pure bug in TThreadedQueue ?

这是一个简单的例子来产生错误。

Here is a simple example to produce the error.

program TestThreadedQueue;

{$APPTYPE CONSOLE}

uses
//  FastMM4 in '..\..\..\FastMM4\FastMM4.pas',
  Windows,
  Messages,
  Classes,
  SysUtils,
  SyncObjs,
  Generics.Collections;

type TThreadTaskMsg =
       class(TObject)
         private
           threadID  : integer;
           threadMsg : string;
         public
           Constructor Create( ID : integer; const msg : string);
       end;

type TThreadReader =
       class(TThread)
         private
           fPopQueue   : TThreadedQueue<TObject>;
           fSync       : TCriticalSection;
           fMsg        : TThreadTaskMsg;
           fException  : Exception;
           procedure DoSync;
           procedure DoHandleException;
         public
           Constructor Create( popQueue : TThreadedQueue<TObject>;
                               sync     : TCriticalSection);
           procedure Execute; override;
       end;

Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>;
                                  sync     : TCriticalSection);
begin
  fPopQueue:=            popQueue;
  fMsg:=                 nil;
  fSync:=                sync;
  Self.FreeOnTerminate:= FALSE;
  fException:=           nil;

  Inherited Create( FALSE);
end;

procedure TThreadReader.DoSync ;
begin
  WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId));
end;

procedure TThreadReader.DoHandleException;
begin
  WriteLn('Exception ->' + fException.Message);
end;

procedure TThreadReader.Execute;
var signal : TWaitResult;
begin
  NameThreadForDebugging('QueuePop worker');
  while not Terminated do
  begin
    try
      {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. }
      Sleep(20);
      {- Serializing calls to PopItem works }
      if Assigned(fSync) then fSync.Enter;
      try
        signal:= fPopQueue.PopItem( TObject(fMsg));
      finally
        if Assigned(fSync) then fSync.Release;
      end;
      if (signal = wrSignaled) then
      begin
        try
          if Assigned(fMsg) then
          begin
            fMsg.threadMsg:= '<Thread id :' +IntToStr( Self.threadId) + '>';
            fMsg.Free; // We are just dumping the message in this test
            //Synchronize( Self.DoSync);
            //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0);
          end;
        except
          on E:Exception do begin
          end;
        end;
      end;
      except
       FException:= Exception(ExceptObject);
      try
        if not (FException is EAbort) then
        begin
          {Synchronize(} DoHandleException; //);
        end;
      finally
        FException:= nil;
      end;
   end;
  end;
end;

Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string);
begin
  Inherited Create;

  threadID:= ID;
  threadMsg:= msg;
end;

var
    fSync : TCriticalSection;
    fThreadQueue : TThreadedQueue<TObject>;
    fReaderArr : array[1..4] of TThreadReader;
    i : integer;

begin
  try
    IsMultiThread:= TRUE;

    fSync:=        TCriticalSection.Create;
    fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100);
    try
      {- Calling without fSync throws exceptions when two or more threads calls PopItem
         at the same time }
      WriteLn('Creating worker threads ...');
      for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil);
      {- Calling with fSync works ! }
      //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync);
       WriteLn('Init done. Pushing items ...');

      for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

      ReadLn;

    finally
      for i:= 1 to 4 do fReaderArr[i].Free;
      fThreadQueue.Free;
      fSync.Free;
    end;

  except
    on E: Exception do
      begin
        Writeln(E.ClassName, ': ', E.Message);
        ReadLn;
      end;
  end;
end.

更新:导致TThreadedQueue崩溃的TMonitor中的错误已修复Delphi XE2。

Update : The error in TMonitor that caused TThreadedQueue to crash is fixed in Delphi XE2.

更新2 :上述测试强调队列处于空状态。 Darian Miller发现,在完全状态下强调队列,仍然可以重现XE2中的错误。 TMonitor再次出现错误。有关更多信息,请参阅下面的答案。还有一个链接到QC101114。

Update 2 : The above test stressed the queue in the empty state. Darian Miller found that stressing the queue at full state, still could reproduce the error in XE2. The error once again is in the TMonitor. See his answer below for more information. And also a link to the QC101114.

更新3
使用Delphi-XE2更新4, code> TMonitor 可以解决 TThreadedQueue 中的问题。到目前为止,我的测试无法再现 TThreadedQueue 中的任何错误。
当队列为空且已满时,测试单个生产者/多个消费者线程。
还测试了多个生产者/多个消费者。我将读取器线程和写入程序线程从1改为100,没有任何故障。但是,了解历史,我敢于别人打破 TMonitor

Update 3 : With Delphi-XE2 update 4 there was an announced fix for TMonitor that would cure the problems in TThreadedQueue. My tests so far are not able to reproduce any errors in TThreadedQueue anymore. Tested single producer/multiple consumer threads when queue is empty and full. Also tested multiple producers/multiple consumers. I varied the reader threads and writer threads from 1 to 100 without any glitch. But knowing the history, I dare others to break TMonitor.

推荐答案

嗯,很难确定没有很多测试,但它肯定看起来像是一个错误, TThreadedQueue或TMonitor。无论哪种方式都在RTL中,而不是您的代码。您应该将其作为QC报告提交,并将上面的示例用作如何重现代码。

Well, it's hard to be sure without a lot of testing, but it certainly looks like this is a bug, either in TThreadedQueue or in TMonitor. Either way it's in the RTL and not your code. You ought to file this as a QC report and use your example above as the "how to reproduce" code.

这篇关于TThreadedQueue不能多个消费者?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆