为什么多线程内存分配/释放密集型应用程序不随线程数扩展? [英] Why multithreaded memory allocate/deallocate intensive application does not scale with number of threads?
问题描述
通知:
原始帖子标题
为什么DWScript中的多线程JSON解析器不随线程数扩展?
已更改,因为此问题与使用DWScript处理JSON数据无关.问题出在Delphi XE2到XE7的默认内存管理器中(已经测试过XE2和试用版XE7),但是问题首先出现在此类应用程序中.
我有多线程Win32/Win64 vcl应用程序,该应用程序在Delphi XE2中处理JSON数据.
每个线程使用DWScript中的 TdwsJSONValue.ParseString(sJSON)
解析JSON数据,使用DWScript方法读取值并将结果存储为记录.
出于测试目的,我在每个线程中处理相同的JSON数据.
单次运行需要线程内 N
秒来处理数据.将线程数增加到 M
线性(大约 M * N
)会增加处理同一数据所需的单线程内时间.
结果是没有速度改善.该应用程序的其他部分(JSON数据传递,将结果存储在目标环境中)-可以按预期扩展.
可能是什么原因?任何想法表示赞赏.
补充信息:
-
在Win7/32和Win7/64,Win8/64从2核到12核(不带HT的系统)上进行了测试
-
DWScript被选为最快可用的(经过大量测试,其中包括:Superobject,内置Delphi).SO的行为类似于DWS中的JSON单元.
-
下面是完整的控制台应用程序,阐明了该问题.要运行它,我们需要在此处提供示例json数据: https://www.dropbox.com/s/4iuv87ytpcdugk6/json1.zip?dl=0 此文件包含第一个线程的数据
json1.dat
.对于最多16个线程,只需将json1.dat复制到json2.dat ... json16.dat.程序和数据应该在同一文件夹中.要运行:convert.exe N,其中N是线程数.
程序将执行时间(以毫秒为单位)写入到stout-在线程中花费的时间,解析数据的时间以及释放(销毁)TdwsJSONValue对象的时间.语句
_dwsjvData.Destroy;
无法缩放.
程序转换;{$ APPTYPE控制台}{$ R * .res}用途System.SysUtils,系统诊断系统类dwsJSON.pas中的dwsJSON,dwsStrings.pas中的dwsStrings,dwsUtils.pas中的dwsUtils,dwsXPlatform.pas中的dwsXPlatform;类型TWorkerThread =类(TThread)私人的_iUid:整数;_swWatch:TStopwatch;_lRunning:布尔值;_sFileJSonData:字符串;_fJsonData:TextFile;受保护的构造函数Create(AUid:Integer);程序执行;覆盖已发表属性Running:布尔型读取_lRunning;结尾;TConverter =类(TObject)私人的_swWatch0,_swWatch1,_swWatch2:TStopwatch;_dwsjvData:TdwsJSONValue;受保护的构造函数Create;销毁覆盖函数Calculate(AUid:Integer; AJSonData:String; var AParse,ADestroy:Integer):Integer;结尾;constMAX_THREADS = 16;变种iHowMany:整数;athWorker:指针的数组[1..MAX_THREADS];aiElapsed:整数[1..MAX_THREADS]的数组;aiElapsedParse:整数[1..MAX_THREADS]的数组;aiElapsedDestroy:整数[1..MAX_THREADS]的数组;aiFares:整数[1..MAX_THREADS]的数组;swWatchT,swWatchP:TStopwatch;构造函数TWorkerThread.Create(AUid:Integer);开始继承了Create(True);_iUid:= AUid;_swWatch:= TStopwatch.Create;_sFileJSonData:= ExtractFilePath(ParamStr(0))+'json'+修剪(IntToStr(_iUid))+'.dat';_lRunning:= False;暂停:= False;结尾;过程TWorkerThread.Execute;变种j:整数;sLine:字符串;slLines:TStringList;oS:TConverter;开始_lRunning:= True;oS:= TConverter.Create;slLines:= TStringList.Create;System.AssignFile(_fJsonData,_sFileJSonData);System.Reset(_fJsonData);j:= 0;重复System.Readln(_fJsonData,sLine);slLines.Add(sLine);Inc(j);直到(j = 50);//直到(System.Eof(_fJsonData));System.Close(_fJsonData);睡眠(1000);_swWatch.Reset;_swWatch.Start;aiFares [_iUid]:= 0;aiElapsedParse [_iUid]:= 0;aiElapsedDestroy [_iUid]:= 0;对于j:= 1到slLines.Count做aiFares [_iUid]:= aiFares [_iUid] + oS.Calculate(_iUid,slLines.Strings [j-1],aiElapsedParse [_iUid],aiElapsedDestroy [_iUid]);_swWatch.Stop;slLines.Free;os.销毁;aiElapsed [_iUid]:= _swWatch.ElapsedMilliseconds;_lRunning:= False;结尾;构造函数TConverter.Create;开始继承了Create;_swWatch0:= TStopwatch.Create;_swWatch1:= TStopwatch.Create;_swWatch2:= TStopwatch.Create;结尾;析构函数TConverter.Destroy;开始遗传;结尾;函数TConverter.Calculate(AUid:Integer; AJSonData:String; var AParse,ADestroy:Integer):Integer;变种jFare,jTotalFares,iElapsedParse,iElapsedDestroy,iElapsedTotal:整数;开始_swWatch0.Reset;_swWatch0.Start;_swWatch1.Reset;_swWatch1.Start;_dwsjvData:= TdwsJSONValue.ParseString(AJSonData);_swWatch1.Stop;iElapsedParse:= _swWatch1.ElapsedMilliseconds;如果(_dwsjvData.ValueType = jvtArray)然后开始_swWatch2.Reset;_swWatch2.Start;jTotalFares:= _dwsjvData.ElementCount;对于jFare:= 0至(jTotalFares-1)如果(_dwsjvData.Elements [jFare] .ValueType = jvtObject),则开始_swWatch1.Reset;_swWatch1.Start;_swWatch1.Stop;结尾;结尾;_swWatch1.Reset;_swWatch1.Start;_dwsjvData.Destroy;_swWatch1.Stop;iElapsedDestroy:= _swWatch1.ElapsedMilliseconds;_swWatch0.Stop;iElapsedTotal:= _swWatch0.ElapsedMilliseconds;Inc(AParse,iElapsedParse);Inc(ADestroy,iElapsedDestroy);结果:= jTotalFares;结尾;过程MultithreadStart;变种j:整数;开始对于j:= 1到iHowMany如果(athWorker [j] = nil),则开始athWorker [j]:= TWorkerThread.Create(j);TWorkerThread(athWorker [j]).FreeOnTerminate:= False;TWorkerThread(athWorker [j]).Priority:= tpNormal;结尾;结尾;过程MultithreadStop;变种j:整数;开始对于j:= 1到MAX_THREADS做如果(athWorker [j]<> nil),则开始TWorkerThread(athWorker [j]).Terminate;TWorkerThread(athWorker [j]).WaitFor;TWorkerThread(athWorker [j]).Free;athWorker [j]:= nil;结尾;结尾;程序序言;变种j:整数;开始iHowMany:= StrToInt(ParamStr(1));对于j:= 1到MAX_THREADS做athWorker [j]:= nil;swWatchT:= TStopwatch.Create;swWatchT.Reset;swWatchP:= TStopwatch.Create;swWatchP.Reset;结尾;程序RunConvert;函数__IsRunning:布尔值;变种j:整数;开始结果:= False;对于j:= 1到MAX_THREADS做结果:=结果或(((athWorker [j]<> nil)和TWorkerThread(athWorker [j]).Running);结尾;开始swWatchT.Start;MultithreadStart;睡眠(1000);而(__isRunning)做睡眠(500);MultithreadStop;swWatchT.Stop;Writeln(#13#10,'Total time:',swWatchT.ElapsedMilliseconds);结尾;程序结语;变种j:整数;开始对于j:= 1到iHowManyWriteln(#13#10,'Thread#',j,'tot.time:',aiElapsed [j],'fares:',aiFares [j],'tot.parse:',aiElapsedParse [j],'tot.destroy:',aiElapsedDestroy [j]);Readln;结尾;开始尝试序幕;RunConvert;结语;除了在E上:例外Writeln(E.ClassName,':',E.Message);结尾;结尾.
解决方案是将默认的Delphi XE2或XE7内存管理器与Intel®Threading Building Blocks内存管理器交换.在示例应用程序中,它缩放比例为ca.当应用程序为64位时,线程数最多为16的线性.
更新:假设正在运行的线程数少于内核数
此软件已在2cores/4ht至12cores/24ht的计算机上运行KVM虚拟化Windows 7(具有124GB RAM)的测试
有趣的是,虚拟化Win7.内存分配和释放比本地Win 7快2倍.
结论:如果在多线程(超过4-8个线程)应用程序的线程中对10kB-10MB块执行大量内存分配/释放操作,请仅使用Intel的内存管理器.
@André:感谢您给我指出正确方向的提示!
这里是带有TBB内存管理器进行测试的单元,它必须在主项目文件.dpr中的单元列表上显示为第1个.
单位TBBMem;界面函数ExpandableGetMem(ASize:NativeInt):指针;cdecl;外部'tbbmalloc'名称'scalable_malloc';可扩展FreeMem过程(APtr:Pointer);cdecl;外部'tbbmalloc'名称'scalable_free';函数ExpandableReAlloc(APtr:Pointer; Size:NativeInt):Pointer;cdecl;外部'tbbmalloc'名称'scalable_realloc';执行函数TBBGetMem(ASize:整数):指针;开始结果:=可扩展GetMem(ASize);结尾;函数TBBFreeMem(APtr:指针):整数;开始可扩展FreeMem(APtr);结果:= 0;结尾;函数TBBReAllocMem(APtr:指针; ASize:整数):指针;开始结果:=可伸缩重新分配(APtr,ASize);结尾;constTBBMemoryManager:TMemoryManager =(GetMem:TBBGetmem;FreeMem:TBBFreeMem;ReAllocMem:TBBReAllocMem;);变种oldMemoryManager:TMemoryManager;初始化GetMemoryManager(oldMemoryManager);SetMemoryManager(TBBMemoryManager);定案SetMemoryManager(oldMemoryManager);结尾.
Notice:
Original post title
Why multithreaded JSON parser from DWScript does not scale with number of threads?
was changed because this problem is not related to processing JSON data with DWScript. The problem is in default memory manager in Delphi XE2 to XE7 ( tested were XE2 and trial XE7 ), but problem appeared first in such type of application.
I have multithreaded Win32/Win64 vcl application which process JSON data in Delphi XE2.
Each thread parses JSON data using TdwsJSONValue.ParseString(sJSON)
from DWScript, reads values using DWScript methods and stores result as records.
For testing purposes I process same JSON data in each thread.
Single thead run takes N
seconds within thread to process data. Increasing number of threads to M
lineary (approx. M * N
) increases time within single thread necessary to process same data.
In result there is no speed improvment. Other parts of this applications ( JSON data delivery, storing results in target environment ) - scale as expected.
What could be a reason ? Any ideas appreciated.
Supplemental information:
Tested on Win7/32 and Win7/64, Win8/64 from 2-core to 12-core (w/w-out HT) systems
DWScript was choosen as fastest available (tested a bunch, among them: Superobject, build-in Delphi). SO behaves similar as JSON unit from DWS.
Below is complete console app illustrating the problem. To run it we need sample json data available here: https://www.dropbox.com/s/4iuv87ytpcdugk6/json1.zip?dl=0 This file contains data
json1.dat
for first thread. For threads up to 16 just copy json1.dat to json2.dat...json16.dat.Program and data shoule be in the same folder. To run: convert.exe N, where N is number of threads.
Program writes time of execution in msecs to stout - spent in thread, time of parsing data and time of releasing (Destroy) TdwsJSONValue object. Statement
_dwsjvData.Destroy;
does not scale.
program Convert;
{$APPTYPE CONSOLE}
{$R *.res}
uses
System.SysUtils,
System.Diagnostics,
System.Classes,
dwsJSON in 'dwsJSON.pas',
dwsStrings in 'dwsStrings.pas',
dwsUtils in 'dwsUtils.pas',
dwsXPlatform in 'dwsXPlatform.pas';
type
TWorkerThread = class (TThread)
private
_iUid: Integer;
_swWatch: TStopwatch;
_lRunning: Boolean;
_sFileJSonData: String;
_fJsonData: TextFile;
protected
constructor Create (AUid: Integer);
procedure Execute; override;
published
property Running: Boolean read _lRunning;
end;
TConverter = class (TObject)
private
_swWatch0, _swWatch1, _swWatch2: TStopwatch;
_dwsjvData: TdwsJSONValue;
protected
constructor Create;
destructor Destroy; override;
function Calculate (AUid: Integer; AJSonData: String; var AParse, ADestroy: Integer): Integer;
end;
const
MAX_THREADS = 16;
var
iHowMany: Integer;
athWorker: array [1..MAX_THREADS] of Pointer;
aiElapsed: array [1..MAX_THREADS] of Integer;
aiElapsedParse: array [1..MAX_THREADS] of Integer;
aiElapsedDestroy: array [1..MAX_THREADS] of Integer;
aiFares: array [1..MAX_THREADS] of Integer;
swWatchT, swWatchP: TStopwatch;
constructor TWorkerThread.Create (AUid: Integer);
begin
inherited Create (True);
_iUid := AUid;
_swWatch := TStopwatch.Create;
_sFileJSonData := ExtractFilePath (ParamStr (0)) + 'json' + Trim (IntToStr (_iUid)) + '.dat';
_lRunning := False;
Suspended := False;
end;
procedure TWorkerThread.Execute;
var
j: Integer;
sLine: String;
slLines: TStringList;
oS: TConverter;
begin
_lRunning := True;
oS := TConverter.Create;
slLines := TStringList.Create;
System.AssignFile (_fJsonData, _sFileJSonData);
System.Reset (_fJsonData);
j := 0;
repeat
System.Readln (_fJsonData, sLine);
slLines.Add (sLine);
Inc (j);
until (j = 50);
// until (System.Eof (_fJsonData));
System.Close (_fJsonData);
Sleep (1000);
_swWatch.Reset;
_swWatch.Start;
aiFares [_iUid] := 0;
aiElapsedParse [_iUid] := 0;
aiElapsedDestroy [_iUid] := 0;
for j := 1 to slLines.Count do
aiFares [_iUid] := aiFares [_iUid] + oS.Calculate (_iUid, slLines.Strings [j - 1], aiElapsedParse [_iUid], aiElapsedDestroy [_iUid]);
_swWatch.Stop;
slLines.Free;
os.Destroy;
aiElapsed [_iUid] := _swWatch.ElapsedMilliseconds;
_lRunning := False;
end;
constructor TConverter.Create;
begin
inherited Create;
_swWatch0 := TStopwatch.Create;
_swWatch1 := TStopwatch.Create;
_swWatch2 := TStopwatch.Create;
end;
destructor TConverter.Destroy;
begin
inherited;
end;
function TConverter.Calculate (AUid: Integer; AJSonData: String; var AParse, ADestroy: Integer): Integer;
var
jFare, jTotalFares, iElapsedParse, iElapsedDestroy, iElapsedTotal: Integer;
begin
_swWatch0.Reset;
_swWatch0.Start;
_swWatch1.Reset;
_swWatch1.Start;
_dwsjvData := TdwsJSONValue.ParseString (AJSonData);
_swWatch1.Stop;
iElapsedParse := _swWatch1.ElapsedMilliseconds;
if (_dwsjvData.ValueType = jvtArray) then
begin
_swWatch2.Reset;
_swWatch2.Start;
jTotalFares := _dwsjvData.ElementCount;
for jFare := 0 to (jTotalFares - 1) do
if (_dwsjvData.Elements [jFare].ValueType = jvtObject) then
begin
_swWatch1.Reset;
_swWatch1.Start;
_swWatch1.Stop;
end;
end;
_swWatch1.Reset;
_swWatch1.Start;
_dwsjvData.Destroy;
_swWatch1.Stop;
iElapsedDestroy := _swWatch1.ElapsedMilliseconds;
_swWatch0.Stop;
iElapsedTotal := _swWatch0.ElapsedMilliseconds;
Inc (AParse, iElapsedParse);
Inc (ADestroy, iElapsedDestroy);
result := jTotalFares;
end;
procedure MultithreadStart;
var
j: Integer;
begin
for j := 1 to iHowMany do
if (athWorker [j] = nil) then
begin
athWorker [j] := TWorkerThread.Create (j);
TWorkerThread (athWorker [j]).FreeOnTerminate := False;
TWorkerThread (athWorker [j]).Priority := tpNormal;
end;
end;
procedure MultithreadStop;
var
j: Integer;
begin
for j := 1 to MAX_THREADS do
if (athWorker [j] <> nil) then
begin
TWorkerThread (athWorker [j]).Terminate;
TWorkerThread (athWorker [j]).WaitFor;
TWorkerThread (athWorker [j]).Free;
athWorker [j] := nil;
end;
end;
procedure Prologue;
var
j: Integer;
begin
iHowMany := StrToInt (ParamStr (1));
for j := 1 to MAX_THREADS do
athWorker [j] := nil;
swWatchT := TStopwatch.Create;
swWatchT.Reset;
swWatchP := TStopwatch.Create;
swWatchP.Reset;
end;
procedure RunConvert;
function __IsRunning: Boolean;
var
j: Integer;
begin
result := False;
for j := 1 to MAX_THREADS do
result := result or ((athWorker [j] <> nil) and TWorkerThread (athWorker [j]).Running);
end;
begin
swWatchT.Start;
MultithreadStart;
Sleep (1000);
while (__isRunning) do
Sleep (500);
MultithreadStop;
swWatchT.Stop;
Writeln (#13#10, 'Total time:', swWatchT.ElapsedMilliseconds);
end;
procedure Epilogue;
var
j: Integer;
begin
for j := 1 to iHowMany do
Writeln ( #13#10, 'Thread # ', j, ' tot.time:', aiElapsed [j], ' fares:', aiFares [j], ' tot.parse:', aiElapsedParse [j], ' tot.destroy:', aiElapsedDestroy [j]);
Readln;
end;
begin
try
Prologue;
RunConvert;
Epilogue;
except
on E: Exception do
Writeln (E.ClassName, ': ', E.Message);
end;
end.
The solution is exchange default Delphi XE2 or XE7 memory manager with Intel® Threading Building Blocks memory manager. In example application it scales ca. lineary with number of threads up to 16 when app is 64 bits.
update: with assumption that number of threads running is less than number of cores
This was tested on machines from 2cores/4ht to 12cores/24ht running KVM virtualized Windows 7 with 124GB RAM
Interesting thing is virtualizing Win 7. memory allocation and deallocation is from 2 x faster as in native Win 7.
Conclusion: if you do a lot of memory allocation / deallocation operations of 10kB-10MB blocks in threads of multithreaded ( more than 4-8 threads) application - use only memory manager from Intel.
@André: thanks for tip pointing me to right direction!
Here is unit with TBB memory manager taken for tests, it has to appear as 1st on unit list in main project file .dpr
unit TBBMem;
interface
function ScalableGetMem (ASize: NativeInt): Pointer; cdecl; external 'tbbmalloc' name 'scalable_malloc';
procedure ScalableFreeMem (APtr: Pointer); cdecl; external 'tbbmalloc' name 'scalable_free';
function ScalableReAlloc (APtr: Pointer; Size: NativeInt): Pointer; cdecl; external 'tbbmalloc' name 'scalable_realloc';
implementation
Function TBBGetMem (ASize: Integer): Pointer;
begin
result := ScalableGetMem (ASize);
end;
Function TBBFreeMem (APtr: Pointer): Integer;
begin
ScalableFreeMem (APtr);
result := 0;
end;
Function TBBReAllocMem (APtr: Pointer; ASize: Integer): Pointer;
begin
result := ScalableRealloc (APtr, ASize);
end;
const
TBBMemoryManager: TMemoryManager = ( GetMem: TBBGetmem;
FreeMem: TBBFreeMem;
ReAllocMem: TBBReAllocMem; );
var
oldMemoryManager: TMemoryManager;
initialization
GetMemoryManager (oldMemoryManager);
SetMemoryManager (TBBMemoryManager);
finalization
SetMemoryManager (oldMemoryManager);
end.
这篇关于为什么多线程内存分配/释放密集型应用程序不随线程数扩展?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!