为什么多线程内存分配/释放密集型应用程序不随线程数扩展? [英] Why multithreaded memory allocate/deallocate intensive application does not scale with number of threads?

查看:53
本文介绍了为什么多线程内存分配/释放密集型应用程序不随线程数扩展?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

通知:

原始帖子标题

为什么DWScript中的多线程JSON解析器不随线程数扩展?

已更改,因为此问题与使用DWScript处理JSON数据无关.问题出在Delphi XE2到XE7的默认内存管理器中(已经测试过XE2和试用版XE7),但是问题首先出现在此类应用程序中.


我有多线程Win32/Win64 vcl应用程序,该应用程序在Delphi XE2中处理JSON数据.

每个线程使用DWScript中的 TdwsJSONValue.ParseString(sJSON)解析JSON数据,使用DWScript方法读取值并将结果存储为记录.

出于测试目的,我在每个线程中处理相同的JSON数据.

单次运行需要线程内 N 秒来处理数据.将线程数增加到 M 线性(大约 M * N )会增加处理同一数据所需的单线程内时间.

结果是没有速度改善.该应用程序的其他部分(JSON数据传递,将结果存储在目标环境中)-可以按预期扩展.

可能是什么原因?任何想法表示赞赏.

补充信息:

  1. 在Win7/32和Win7/64,Win8/64从2核到12核(不带HT的系统)上进行了测试

  2. DWScript被选为最快可用的(经过大量测试,其中包括:Superobject,内置Delphi).SO的行为类似于DWS中的JSON单元.

  3. 下面是完整的控制台应用程序,阐明了该问题.要运行它,我们需要在此处提供示例json数据: https://www.dropbox.com/s/4iuv87ytpcdugk6/json1.zip?dl=0 此文件包含第一个线程的数据 json1.dat .对于最多16个线程,只需将json1.dat复制到json2.dat ... json16.dat.

    程序和数据应该在同一文件夹中.要运行:convert.exe N,其中N是线程数.

    程序将执行时间(以毫秒为单位)写入到stout-在线程中花费的时间,解析数据的时间以及释放(销毁)TdwsJSONValue对象的时间.语句 _dwsjvData.Destroy; 无法缩放.


 程序转换;{$ APPTYPE控制台}{$ R * .res}用途System.SysUtils,系统诊断系统类dwsJSON.pas中的dwsJSON,dwsStrings.pas中的dwsStrings,dwsUtils.pas中的dwsUtils,dwsXPlatform.pas中的dwsXPlatform;类型TWorkerThread =类(TThread)私人的_iUid:整数;_swWatch:TStopwatch;_lRunning:布尔值;_sFileJSonData:字符串;_fJsonData:TextFile;受保护的构造函数Create(AUid:Integer);程序执行;覆盖已发表属性Running:布尔型读取_lRunning;结尾;TConverter =类(TObject)私人的_swWatch0,_swWatch1,_swWatch2:TStopwatch;_dwsjvData:TdwsJSONValue;受保护的构造函数Create;销毁覆盖函数Calculate(AUid:Integer; AJSonData:String; var AParse,ADestroy:Integer):Integer;结尾;constMAX_THREADS = 16;变种iHowMany:整数;athWorker:指针的数组[1..MAX_THREADS];aiElapsed:整数[1..MAX_THREADS]的数组;aiElapsedParse:整数[1..MAX_THREADS]的数组;aiElapsedDestroy:整数[1..MAX_THREADS]的数组;aiFares:整数[1..MAX_THREADS]的数组;swWatchT,swWatchP:TStopwatch;构造函数TWorkerThread.Create(AUid:Integer);开始继承了Create(True);_iUid:= AUid;_swWatch:= TStopwatch.Create;_sFileJSonData:= ExtractFilePath(ParamStr(0))+'json'+修剪(IntToStr(_iUid))+'.dat';_lRunning:= False;暂停:= False;结尾;过程TWorkerThread.Execute;变种j:整数;sLine:字符串;slLines:TStringList;oS:TConverter;开始_lRunning:= True;oS:= TConverter.Create;slLines:= TStringList.Create;System.AssignFile(_fJsonData,_sFileJSonData);System.Reset(_fJsonData);j:= 0;重复System.Readln(_fJsonData,sLine);slLines.Add(sLine);Inc(j);直到(j = 50);//直到(System.Eof(_fJsonData));System.Close(_fJsonData);睡眠(1000);_swWatch.Reset;_swWatch.Start;aiFares [_iUid]:= 0;aiElapsedParse [_iUid]:= 0;aiElapsedDestroy [_iUid]:= 0;对于j:= 1到slLines.Count做aiFares [_iUid]:= aiFares [_iUid] + oS.Calculate(_iUid,slLines.Strings [j-1],aiElapsedParse [_iUid],aiElapsedDestroy [_iUid]);_swWatch.Stop;slLines.Free;os.销毁;aiElapsed [_iUid]:= _swWatch.ElapsedMilliseconds;_lRunning:= False;结尾;构造函数TConverter.Create;开始继承了Create;_swWatch0:= TStopwatch.Create;_swWatch1:= TStopwatch.Create;_swWatch2:= TStopwatch.Create;结尾;析构函数TConverter.Destroy;开始遗传;结尾;函数TConverter.Calculate(AUid:Integer; AJSonData:String; var AParse,ADestroy:Integer):Integer;变种jFare,jTotalFares,iElapsedParse,iElapsedDestroy,iElapsedTotal:整数;开始_swWatch0.Reset;_swWatch0.Start;_swWatch1.Reset;_swWatch1.Start;_dwsjvData:= TdwsJSONValue.ParseString(AJSonData);_swWatch1.Stop;iElapsedParse:= _swWatch1.ElapsedMilliseconds;如果(_dwsjvData.ValueType = jvtArray)然后开始_swWatch2.Reset;_swWatch2.Start;jTotalFares:= _dwsjvData.ElementCount;对于jFare:= 0至(jTotalFares-1)如果(_dwsjvData.Elements [jFare] .ValueType = jvtObject),则开始_swWatch1.Reset;_swWatch1.Start;_swWatch1.Stop;结尾;结尾;_swWatch1.Reset;_swWatch1.Start;_dwsjvData.Destroy;_swWatch1.Stop;iElapsedDestroy:= _swWatch1.ElapsedMilliseconds;_swWatch0.Stop;iElapsedTotal:= _swWatch0.ElapsedMilliseconds;Inc(AParse,iElapsedParse);Inc(ADestroy,iElapsedDestroy);结果:= jTotalFares;结尾;过程MultithreadStart;变种j:整数;开始对于j:= 1到iHowMany如果(athWorker [j] = nil),则开始athWorker [j]:= TWorkerThread.Create(j);TWorkerThread(athWorker [j]).FreeOnTerminate:= False;TWorkerThread(athWorker [j]).Priority:= tpNormal;结尾;结尾;过程MultithreadStop;变种j:整数;开始对于j:= 1到MAX_THREADS做如果(athWorker [j]<> nil),则开始TWorkerThread(athWorker [j]).Terminate;TWorkerThread(athWorker [j]).WaitFor;TWorkerThread(athWorker [j]).Free;athWorker [j]:= nil;结尾;结尾;程序序言;变种j:整数;开始iHowMany:= StrToInt(ParamStr(1));对于j:= 1到MAX_THREADS做athWorker [j]:= nil;swWatchT:= TStopwatch.Create;swWatchT.Reset;swWatchP:= TStopwatch.Create;swWatchP.Reset;结尾;程序RunConvert;函数__IsRunning:布尔值;变种j:整数;开始结果:= False;对于j:= 1到MAX_THREADS做结果:=结果或(((athWorker [j]<> nil)和TWorkerThread(athWorker [j]).Running);结尾;开始swWatchT.Start;MultithreadStart;睡眠(1000);而(__isRunning)做睡眠(500);MultithreadStop;swWatchT.Stop;Writeln(#13#10,'Total time:',swWatchT.ElapsedMilliseconds);结尾;程序结语;变种j:整数;开始对于j:= 1到iHowManyWriteln(#13#10,'Thread#',j,'tot.time:',aiElapsed [j],'fares:',aiFares [j],'tot.parse:',aiElapsedParse [j],'tot.destroy:',aiElapsedDestroy [j]);Readln;结尾;开始尝试序幕;RunConvert;结语;除了在E上:例外Writeln(E.ClassName,':',E.Message);结尾;结尾. 

解决方案

解决方案是将默认的Delphi XE2或XE7内存管理器与Intel®Threading Building Blocks内存管理器交换.在示例应用程序中,它缩放比例为ca.当应用程序为64位时,线程数最多为16的线性.

更新:假设正在运行的线程数少于内核数

此软件已在2cores/4ht至12cores/24ht的计算机上运行KVM虚拟化Windows 7(具有124GB RAM)的测试

有趣的是,虚拟化Win7.内存分配和释放比本地Win 7快2倍.

结论:如果在多线程(超过4-8个线程)应用程序的线程中对10kB-10MB块执行大量内存分配/释放操作,请仅使用Intel的内存管理器.

@André:感谢您给我指出正确方向的提示!

这里是带有TBB内存管理器进行测试的单元,它必须在主项目文件.dpr中的单元列表上显示为第1个.

 单位TBBMem;界面函数ExpandableGetMem(ASize:NativeInt):指针;cdecl;外部'tbbmalloc'名称'scalable_malloc';可扩展FreeMem过程(APtr:Pointer);cdecl;外部'tbbmalloc'名称'scalable_free';函数ExpandableReAlloc(APtr:Pointer; Size:NativeInt):Pointer;cdecl;外部'tbbmalloc'名称'scalable_realloc';执行函数TBBGetMem(ASize:整数):指针;开始结果:=可扩展GetMem(ASize);结尾;函数TBBFreeMem(APtr:指针):整数;开始可扩展FreeMem(APtr);结果:= 0;结尾;函数TBBReAllocMem(APtr:指针; ASize:整数):指针;开始结果:=可伸缩重新分配(APtr,ASize);结尾;constTBBMemoryManager:TMemoryManager =(GetMem:TBBGetmem;FreeMem:TBBFreeMem;ReAllocMem:TBBReAllocMem;);变种oldMemoryManager:TMemoryManager;初始化GetMemoryManager(oldMemoryManager);SetMemoryManager(TBBMemoryManager);定案SetMemoryManager(oldMemoryManager);结尾. 

Notice:

Original post title

Why multithreaded JSON parser from DWScript does not scale with number of threads?

was changed because this problem is not related to processing JSON data with DWScript. The problem is in default memory manager in Delphi XE2 to XE7 ( tested were XE2 and trial XE7 ), but problem appeared first in such type of application.


I have multithreaded Win32/Win64 vcl application which process JSON data in Delphi XE2.

Each thread parses JSON data using TdwsJSONValue.ParseString(sJSON) from DWScript, reads values using DWScript methods and stores result as records.

For testing purposes I process same JSON data in each thread.

Single thead run takes N seconds within thread to process data. Increasing number of threads to M lineary (approx. M * N) increases time within single thread necessary to process same data.

In result there is no speed improvment. Other parts of this applications ( JSON data delivery, storing results in target environment ) - scale as expected.

What could be a reason ? Any ideas appreciated.

Supplemental information:

  1. Tested on Win7/32 and Win7/64, Win8/64 from 2-core to 12-core (w/w-out HT) systems

  2. DWScript was choosen as fastest available (tested a bunch, among them: Superobject, build-in Delphi). SO behaves similar as JSON unit from DWS.

  3. Below is complete console app illustrating the problem. To run it we need sample json data available here: https://www.dropbox.com/s/4iuv87ytpcdugk6/json1.zip?dl=0 This file contains data json1.dat for first thread. For threads up to 16 just copy json1.dat to json2.dat...json16.dat.

    Program and data shoule be in the same folder. To run: convert.exe N, where N is number of threads.

    Program writes time of execution in msecs to stout - spent in thread, time of parsing data and time of releasing (Destroy) TdwsJSONValue object. Statement _dwsjvData.Destroy; does not scale.


program Convert;

{$APPTYPE CONSOLE}

{$R *.res}

uses
  System.SysUtils,
  System.Diagnostics,
  System.Classes,
  dwsJSON in 'dwsJSON.pas',
  dwsStrings in 'dwsStrings.pas',
  dwsUtils in 'dwsUtils.pas',
  dwsXPlatform in 'dwsXPlatform.pas';

type

  TWorkerThread = class (TThread)
  private
    _iUid:  Integer;
    _swWatch:  TStopwatch;
    _lRunning:  Boolean;

    _sFileJSonData:  String;
    _fJsonData:  TextFile;

  protected
    constructor Create (AUid: Integer);
    procedure Execute; override;

  published
    property Running: Boolean read _lRunning;

  end;

  TConverter = class (TObject)
  private
    _swWatch0, _swWatch1, _swWatch2:  TStopwatch;

    _dwsjvData:  TdwsJSONValue;

  protected
    constructor Create;
    destructor Destroy; override;

    function Calculate (AUid: Integer; AJSonData: String; var AParse, ADestroy: Integer): Integer;
  end;

const
  MAX_THREADS = 16;

var
  iHowMany:  Integer;
  athWorker:  array [1..MAX_THREADS] of Pointer;
  aiElapsed:  array [1..MAX_THREADS] of Integer;
  aiElapsedParse:  array [1..MAX_THREADS] of Integer;
  aiElapsedDestroy:  array [1..MAX_THREADS] of Integer;
  aiFares:  array [1..MAX_THREADS] of Integer;
  swWatchT, swWatchP:  TStopwatch;


constructor TWorkerThread.Create (AUid: Integer);
begin
  inherited Create (True);

  _iUid := AUid;
  _swWatch := TStopwatch.Create;
  _sFileJSonData := ExtractFilePath (ParamStr (0)) + 'json' + Trim (IntToStr (_iUid)) + '.dat';

  _lRunning := False;

  Suspended := False;
end;

procedure TWorkerThread.Execute;
var
  j:  Integer;
  sLine:  String;
  slLines:  TStringList;

  oS:  TConverter;
begin
  _lRunning := True;

  oS := TConverter.Create;

  slLines := TStringList.Create;
  System.AssignFile (_fJsonData, _sFileJSonData);
  System.Reset (_fJsonData);
  j := 0;
  repeat
    System.Readln (_fJsonData, sLine);
    slLines.Add (sLine);
    Inc (j);
  until (j = 50);
//  until (System.Eof (_fJsonData));
  System.Close (_fJsonData);

  Sleep (1000);

  _swWatch.Reset;
  _swWatch.Start;

  aiFares [_iUid] := 0;
  aiElapsedParse [_iUid] := 0;
  aiElapsedDestroy [_iUid] := 0;
  for j := 1 to slLines.Count do
    aiFares [_iUid] := aiFares [_iUid] + oS.Calculate (_iUid, slLines.Strings [j - 1], aiElapsedParse [_iUid], aiElapsedDestroy [_iUid]);

  _swWatch.Stop;

  slLines.Free;
  os.Destroy;

  aiElapsed [_iUid] := _swWatch.ElapsedMilliseconds;

  _lRunning := False;
end;

constructor TConverter.Create;
begin
  inherited Create;

  _swWatch0 := TStopwatch.Create;
  _swWatch1 := TStopwatch.Create;
  _swWatch2 := TStopwatch.Create;
end;

destructor TConverter.Destroy;
begin
  inherited;
end;

function TConverter.Calculate (AUid: Integer; AJSonData: String; var AParse, ADestroy: Integer): Integer;
var
  jFare, jTotalFares, iElapsedParse, iElapsedDestroy, iElapsedTotal:  Integer;
begin
  _swWatch0.Reset;
  _swWatch0.Start;

  _swWatch1.Reset;
  _swWatch1.Start;
  _dwsjvData := TdwsJSONValue.ParseString (AJSonData);
  _swWatch1.Stop;
  iElapsedParse := _swWatch1.ElapsedMilliseconds;

  if (_dwsjvData.ValueType = jvtArray) then
  begin
    _swWatch2.Reset;
    _swWatch2.Start;

    jTotalFares := _dwsjvData.ElementCount;
    for jFare := 0 to (jTotalFares - 1) do
      if (_dwsjvData.Elements [jFare].ValueType = jvtObject) then
      begin

        _swWatch1.Reset;
        _swWatch1.Start;

        _swWatch1.Stop;
      end;
  end;

  _swWatch1.Reset;
  _swWatch1.Start;
  _dwsjvData.Destroy;
  _swWatch1.Stop;
  iElapsedDestroy := _swWatch1.ElapsedMilliseconds;

  _swWatch0.Stop;
  iElapsedTotal := _swWatch0.ElapsedMilliseconds;

  Inc (AParse, iElapsedParse);
  Inc (ADestroy, iElapsedDestroy);

  result := jTotalFares;
end;

procedure MultithreadStart;
var
  j:  Integer;
begin
  for j := 1 to iHowMany do
    if (athWorker [j] = nil) then
    begin
      athWorker [j] := TWorkerThread.Create (j);

      TWorkerThread (athWorker [j]).FreeOnTerminate := False;
      TWorkerThread (athWorker [j]).Priority := tpNormal;
    end;
end;

procedure MultithreadStop;
var
  j:  Integer;
begin
  for j := 1 to MAX_THREADS do
    if (athWorker [j] <> nil) then
    begin
      TWorkerThread (athWorker [j]).Terminate;
      TWorkerThread (athWorker [j]).WaitFor;

      TWorkerThread (athWorker [j]).Free;
      athWorker [j] := nil;
    end;
end;

procedure Prologue;
var
  j:  Integer;
begin
  iHowMany := StrToInt (ParamStr (1));

  for j := 1 to MAX_THREADS do
    athWorker [j] := nil;

  swWatchT := TStopwatch.Create;
  swWatchT.Reset;

  swWatchP := TStopwatch.Create;
  swWatchP.Reset;
end;

procedure RunConvert;

  function __IsRunning: Boolean;
  var
    j:  Integer;
  begin
    result := False;
    for j := 1 to MAX_THREADS do
      result := result or ((athWorker [j] <> nil) and TWorkerThread (athWorker [j]).Running);
  end;

begin

  swWatchT.Start;

  MultithreadStart;

  Sleep (1000);
  while (__isRunning) do
    Sleep (500);

  MultithreadStop;

  swWatchT.Stop;
  Writeln (#13#10, 'Total time:', swWatchT.ElapsedMilliseconds);
end;

procedure Epilogue;
var
  j:  Integer;
begin
  for j := 1 to iHowMany do
    Writeln ( #13#10, 'Thread # ', j, '  tot.time:', aiElapsed [j], '  fares:', aiFares [j], '  tot.parse:', aiElapsedParse [j], '  tot.destroy:', aiElapsedDestroy [j]);

  Readln;
end;

begin
  try
    Prologue;
    RunConvert;
    Epilogue;

  except
    on E: Exception do
      Writeln (E.ClassName, ': ', E.Message);
  end;
end.

解决方案

The solution is exchange default Delphi XE2 or XE7 memory manager with Intel® Threading Building Blocks memory manager. In example application it scales ca. lineary with number of threads up to 16 when app is 64 bits.

update: with assumption that number of threads running is less than number of cores

This was tested on machines from 2cores/4ht to 12cores/24ht running KVM virtualized Windows 7 with 124GB RAM

Interesting thing is virtualizing Win 7. memory allocation and deallocation is from 2 x faster as in native Win 7.

Conclusion: if you do a lot of memory allocation / deallocation operations of 10kB-10MB blocks in threads of multithreaded ( more than 4-8 threads) application - use only memory manager from Intel.

@André: thanks for tip pointing me to right direction!

Here is unit with TBB memory manager taken for tests, it has to appear as 1st on unit list in main project file .dpr

unit TBBMem;

interface

function  ScalableGetMem  (ASize: NativeInt): Pointer; cdecl; external 'tbbmalloc' name 'scalable_malloc';
procedure ScalableFreeMem (APtr: Pointer); cdecl; external 'tbbmalloc' name 'scalable_free';
function  ScalableReAlloc (APtr: Pointer; Size: NativeInt): Pointer; cdecl; external 'tbbmalloc' name 'scalable_realloc';

implementation

Function TBBGetMem (ASize: Integer): Pointer;
begin
  result := ScalableGetMem (ASize);
end;

Function TBBFreeMem (APtr: Pointer): Integer;
begin
  ScalableFreeMem (APtr);
  result := 0;
end;

Function TBBReAllocMem (APtr: Pointer; ASize: Integer): Pointer;
begin
  result := ScalableRealloc (APtr, ASize);
end;

const
  TBBMemoryManager:  TMemoryManager = ( GetMem: TBBGetmem;
                                        FreeMem: TBBFreeMem;
                                        ReAllocMem:  TBBReAllocMem; );
var
  oldMemoryManager:  TMemoryManager;

initialization
  GetMemoryManager (oldMemoryManager);
  SetMemoryManager (TBBMemoryManager);

finalization
  SetMemoryManager (oldMemoryManager);

end.

这篇关于为什么多线程内存分配/释放密集型应用程序不随线程数扩展?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆