TThreadedQueue 不能有多个消费者? [英] TThreadedQueue not capable of multiple consumers?

查看:19
本文介绍了TThreadedQueue 不能有多个消费者?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

尝试在单生产者多消费者方案中使用 TThreadedQueue (Generics.Collections).(德尔福-XE).这个想法是将对象推入队列并让多个工作线程排空队列.

Trying to use the TThreadedQueue (Generics.Collections) in a single producer multiple consumer scheme. (Delphi-XE). The idea is to push objects into a queue and let several worker threads draining the queue.

不过,它没有按预期工作.当两个或多个工作线程调用 PopItem 时,会从 TThreadedQueue 中抛出访问冲突.

It does not work as expected, though. When two or more worker threads are calling PopItem, access violations are thrown from the TThreadedQueue.

如果对 PopItem 的调用使用临界区序列化,则一切正常.

If the call to PopItem is serialized with a critical section, all is fine.

当然,TThreadedQueue 应该能够处理多个消费者,所以我是否遗漏了什么或者这是 TThreadedQueue 中的一个纯粹的错误?

Surely the TThreadedQueue should be able to handle multiple consumers, so am I missing something or is this a pure bug in TThreadedQueue ?

这是一个产生错误的简单示例.

Here is a simple example to produce the error.

program TestThreadedQueue;

{$APPTYPE CONSOLE}

uses
//  FastMM4 in '......FastMM4FastMM4.pas',
  Windows,
  Messages,
  Classes,
  SysUtils,
  SyncObjs,
  Generics.Collections;

type TThreadTaskMsg =
       class(TObject)
         private
           threadID  : integer;
           threadMsg : string;
         public
           Constructor Create( ID : integer; const msg : string);
       end;

type TThreadReader =
       class(TThread)
         private
           fPopQueue   : TThreadedQueue<TObject>;
           fSync       : TCriticalSection;
           fMsg        : TThreadTaskMsg;
           fException  : Exception;
           procedure DoSync;
           procedure DoHandleException;
         public
           Constructor Create( popQueue : TThreadedQueue<TObject>;
                               sync     : TCriticalSection);
           procedure Execute; override;
       end;

Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>;
                                  sync     : TCriticalSection);
begin
  fPopQueue:=            popQueue;
  fMsg:=                 nil;
  fSync:=                sync;
  Self.FreeOnTerminate:= FALSE;
  fException:=           nil;

  Inherited Create( FALSE);
end;

procedure TThreadReader.DoSync ;
begin
  WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId));
end;

procedure TThreadReader.DoHandleException;
begin
  WriteLn('Exception ->' + fException.Message);
end;

procedure TThreadReader.Execute;
var signal : TWaitResult;
begin
  NameThreadForDebugging('QueuePop worker');
  while not Terminated do
  begin
    try
      {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. }
      Sleep(20);
      {- Serializing calls to PopItem works }
      if Assigned(fSync) then fSync.Enter;
      try
        signal:= fPopQueue.PopItem( TObject(fMsg));
      finally
        if Assigned(fSync) then fSync.Release;
      end;
      if (signal = wrSignaled) then
      begin
        try
          if Assigned(fMsg) then
          begin
            fMsg.threadMsg:= '<Thread id :' +IntToStr( Self.threadId) + '>';
            fMsg.Free; // We are just dumping the message in this test
            //Synchronize( Self.DoSync);
            //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0);
          end;
        except
          on E:Exception do begin
          end;
        end;
      end;
      except
       FException:= Exception(ExceptObject);
      try
        if not (FException is EAbort) then
        begin
          {Synchronize(} DoHandleException; //);
        end;
      finally
        FException:= nil;
      end;
   end;
  end;
end;

Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string);
begin
  Inherited Create;

  threadID:= ID;
  threadMsg:= msg;
end;

var
    fSync : TCriticalSection;
    fThreadQueue : TThreadedQueue<TObject>;
    fReaderArr : array[1..4] of TThreadReader;
    i : integer;

begin
  try
    IsMultiThread:= TRUE;

    fSync:=        TCriticalSection.Create;
    fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100);
    try
      {- Calling without fSync throws exceptions when two or more threads calls PopItem
         at the same time }
      WriteLn('Creating worker threads ...');
      for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil);
      {- Calling with fSync works ! }
      //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync);
       WriteLn('Init done. Pushing items ...');

      for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

      ReadLn;

    finally
      for i:= 1 to 4 do fReaderArr[i].Free;
      fThreadQueue.Free;
      fSync.Free;
    end;

  except
    on E: Exception do
      begin
        Writeln(E.ClassName, ': ', E.Message);
        ReadLn;
      end;
  end;
end.

更新:在 Delphi XE2 中修复了导致 TThreadedQueue 崩溃的 TMonitor 中的错误.

Update : The error in TMonitor that caused TThreadedQueue to crash is fixed in Delphi XE2.

更新 2 :上述测试强调队列处于空状态.Darian Miller 发现在满状态下强调队列,仍然可以重现 XE2 中的错误.错误再次出现在 TMonitor 中.有关更多信息,请参阅下面的回答.还有一个链接到 QC101114.

Update 2 : The above test stressed the queue in the empty state. Darian Miller found that stressing the queue at full state, still could reproduce the error in XE2. The error once again is in the TMonitor. See his answer below for more information. And also a link to the QC101114.

更新 3:在 Delphi-XE2 更新 4 中,发布了一个针对 TMonitor 的修复程序,可以解决 TThreadedQueue 中的问题.到目前为止,我的测试无法再重现 TThreadedQueue 中的任何错误.当队列为空和满时测试单生产者/多消费者线程.还测试了多个生产者/多个消费者.我将读者线程和作者线程从 1 更改为 100,没有任何故障.但是知道历史,我敢让别人破解TMonitor.

Update 3 : With Delphi-XE2 update 4 there was an announced fix for TMonitor that would cure the problems in TThreadedQueue. My tests so far are not able to reproduce any errors in TThreadedQueue anymore. Tested single producer/multiple consumer threads when queue is empty and full. Also tested multiple producers/multiple consumers. I varied the reader threads and writer threads from 1 to 100 without any glitch. But knowing the history, I dare others to break TMonitor.

推荐答案

好吧,如果不进行大量测试就很难确定,但看起来这确实是一个错误,无论是在 TThreadedQueue 中还是在 TMonitor 中.无论哪种方式,它都在 RTL 中,而不是您的代码中.您应该将此作为 QC 报告提交,并使用上面的示例作为如何重现"代码.

Well, it's hard to be sure without a lot of testing, but it certainly looks like this is a bug, either in TThreadedQueue or in TMonitor. Either way it's in the RTL and not your code. You ought to file this as a QC report and use your example above as the "how to reproduce" code.

这篇关于TThreadedQueue 不能有多个消费者?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆