replace parallel code with dragonfly submodule

parent 9c216dc0
Pipeline #3303 passed with stages
in 68 minutes and 16 seconds
......@@ -29,3 +29,6 @@
[submodule "preprocessor"]
path = preprocessor
url = ../../Dynare/preprocessor.git
[submodule "matlab/modules/dragonfly"]
path = contrib/dragonfly
url = https://gitlab.com/dragonflytool/dragonfly.git
Subproject commit 633663320ac624f9e1208b176c6100f7b4a89697
......@@ -255,11 +255,16 @@ lines starting with a hashtag (#).
The directory to be used for remote computation. Required for
remote runs on all platforms.
.. option:: DynarePath = PATH
.. option:: ProgramPath = PATH
The path to the matlab subdirectory within the Dynare
installation directory. The default is the empty string.
.. option:: ProgramConfig = PATH_AND_FILE
A MATLAB/Octave script that should be run to setup the MATLAB/Octave
environment (e.g. `dynare_config`).
.. option:: MatlabOctavePath = PATH_AND_FILE
The path to the MATLAB or Octave executable. The default value
......
......@@ -52,7 +52,7 @@ p = {'/distributions/' ; ...
'/ms-sbvar/identification/' ; ...
'/../contrib/ms-sbvar/TZcode/MatlabFiles/' ; ...
'/../contrib/jsonlab/' ; ...
'/parallel/' ; ...
'/../contrib/dragonfly/SrcDragonfly/' ; ...
'/particles/src' ; ...
'/gsa/' ; ...
'/ep/' ; ...
......
This diff is collapsed.
function [TiSt] = CreateTimeString()
% PARALLEL CONTEXT
% In a parallel context, this is a specialized version of clock() function.
%
% INPUTS
% None
%
% OUTPUTS
% o TiSt [] ...
%
% Copyright (C) 2009-2017 Dynare Team
%
% This file is part of Dynare.
%
% Dynare is free software: you can redistribute it and/or modify
% it under the terms of the GNU General Public License as published by
% the Free Software Foundation, either version 3 of the License, or
% (at your option) any later version.
%
% Dynare is distributed in the hope that it will be useful,
% but WITHOUT ANY WARRANTY; without even the implied warranty of
% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
% GNU General Public License for more details.
%
% You should have received a copy of the GNU General Public License
% along with Dynare. If not, see <http://www.gnu.org/licenses/>.
TiSt=[];
T=fix(clock);
S1=num2str(T(1));
S2=num2str(T(2));
S3=num2str(T(3));
S4=num2str(T(4));
S5=num2str(T(5));
S6=num2str(T(6));
TiSt=[S1 '-' S2 '-' S3 '-' S4 'h' S5 'm' S6 's'];
function [nCPU]= GiveCPUnumber (ComputerInformations, Environment)
% PARALLEL CONTEXT
% In a parallel context this function return the CPUs or cores numer avaiable
% on the computer used for run parallel code.
%
% INPUTS
% an array contained several fields that describe the hardaware
% software enviroments of a generic computer.
%
% OUTPUTS
% The CPUs or Cores numbers of computer.
%
% SPECIAL REQUIREMENTS
% none
% Copyright (C) 2010-2017 Dynare Team
%
% This file is part of Dynare.
%
% Dynare is free software: you can redistribute it and/or modify
% it under the terms of the GNU General Public License as published by
% the Free Software Foundation, either version 3 of the License, or
% (at your option) any later version.
%
% Dynare is distributed in the hope that it will be useful,
% but WITHOUT ANY WARRANTY; without even the implied warranty of
% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
% GNU General Public License for more details.
%
% You should have received a copy of the GNU General Public License
% along with Dynare. If not, see <http://www.gnu.org/licenses/>.
nCPU='';
if nargin < 2
% Determine a specific operating system or software version when necessary
% for different command (sintax, name, ...).
Environment=~ispc;
end
switch Environment
case 0 %WINDOWS OPERATING SYSTEM
OffSet=27;
SringPosition=strfind(ComputerInformations, 'Processors:');
nCPU=ComputerInformations(SringPosition+OffSet);
% We check if there are Processors/Cores more than 9.
t0=ComputerInformations(SringPosition+OffSet+1);
t1=str2num(t0);
t1=isempty(t1);
% if t1 is 0 the machine have more than 9 CPU.
if t1==0
nCPU=strcat(nCPU,t0);
end
nCPU=str2num(nCPU);
return
case 1 %LIKE UNIX OPERATING SYSTEM
nCPU=str2num(ComputerInformations);
case 2 %MAC-OS OPERATING SYSTEM
nCPU=str2num(ComputerInformations);
end
function InitializeComputationalEnvironment()
% PARALLEL CONTEXT
% In a parallel context, this function is used to Initialize the computational enviroment according with
% the user request.
%
% INPUTS
% o DataInput [] ...
%
% OUTPUTS
% None
%
% Copyright (C) 2009-2017 Dynare Team
%
% This file is part of Dynare.
%
% Dynare is free software: you can redistribute it and/or modify
% it under the terms of the GNU General Public License as published by
% the Free Software Foundation, either version 3 of the License, or
% (at your option) any later version.
%
% Dynare is distributed in the hope that it will be useful,
% but WITHOUT ANY WARRANTY; without even the implied warranty of
% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
% GNU General Public License for more details.
%
% You should have received a copy of the GNU General Public License
% along with Dynare. If not, see <http://www.gnu.org/licenses/>.
% This is simple and check!
% The default value for the new field MatlabOctavePath now is 'matlab' or
% 'octave'. Then if the field is empty it is necessary to fill it with the
% default value.
% Deactivate some 'Parallel/Warning' message in Octave!
% Comment the line 'warning('off');' in order to view the warning message
% in Octave!
if isoctave
warning('off');
end
global options_
isHybridMatlabOctave = false;
for j=1:length(options_.parallel)
if isempty(options_.parallel(j).MatlabOctavePath)
if isoctave
options_.parallel(j).MatlabOctavePath = 'octave';
else
options_.parallel(j).MatlabOctavePath = 'matlab';
end
end
if options_.parallel(j).Local && isempty(options_.parallel(j).DynarePath)
dynareroot = strrep(which('dynare'),'dynare.m','');
options_.parallel(j).DynarePath=dynareroot;
end
isHybridMatlabOctave = isHybridMatlabOctave || any(regexpi([options_.parallel(j).MatlabOctavePath], 'octave'));
end
isHybridMatlabOctave = isHybridMatlabOctave && ~isoctave;
options_.parallel_info.isHybridMatlabOctave = isHybridMatlabOctave;
if isHybridMatlabOctave
% Reset dynare random generator and seed.
set_dynare_seed('default');
end
% Invoke masterParallel with 8 arguments and the last equal to 1. With this shape
% for input data, masterParallel only create a new directory for remote
% computation. The name of this directory is time depending. For local
% parallel computations with Strategy == 1 delete the traces (if exists) of
% previous computations.
delete(['P_slave_*End.txt']);
masterParallel(options_.parallel,[],[],[],[],[],[],options_.parallel_info,1);
% We sort in the user CPUWeight and most important the Parallel vector
% in accord with this operation.
lP=length(options_.parallel);
for j=1:lP
CPUWeight(j)=str2num(options_.parallel(j).NodeWeight);
end
NewPosition=ones(1,lP)*(-1);
CPUWeightTemp=ones(1,lP)*(-1);
CPUWeightTemp=CPUWeight;
for i=1:lP
[NoNServes, mP]=max(CPUWeightTemp);
NewPosition(i)=mP;
CPUWeightTemp(mP)=-1;
end
CPUWeight=sort(CPUWeight,'descend');
for i=1:lP
ParallelTemp(i)=options_.parallel(NewPosition(i));
end
Parallel=[];
options_.parallel=ParallelTemp;
return
\ No newline at end of file
function closeSlave(Parallel,TmpFolder,partial)
% PARALLEL CONTEXT
% In parallel context, this utility closes all remote matlab instances
% called by masterParallel when strategy (1) is active i.e. always open (which leaves
% open remote matlab instances).
%
% INPUTS
% o Parallel [struct vector] copy of options_.parallel.
% o TmpFolder string if islocal==0, is the name of didectory devoted to remote computation.
% This directory is named using current date
% and is used only one time and then deleted.
% If islocal==1, TmpFolder=''.
%
%
% OUTPUTS
% None
%
% Copyright (C) 2010-2017 Dynare Team
%
% This file is part of Dynare.
%
% Dynare is free software: you can redistribute it and/or modify
% it under the terms of the GNU General Public License as published by
% the Free Software Foundation, either version 3 of the License, or
% (at your option) any later version.
%
% Dynare is distributed in the hope that it will be useful,
% but WITHOUT ANY WARRANTY; without even the implied warranty of
% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
% GNU General Public License for more details.
%
% You should have received a copy of the GNU General Public License
% along with Dynare. If not, see <http://www.gnu.org/licenses/>.
if nargin<3
partial=0;
end
s=warning('off');
if partial==1
save('slaveParallel_break.mat','partial')
for indPC=1:length(Parallel)
if (Parallel(indPC).Local==0)
dynareParallelSendFiles('slaveParallel_break.mat',TmpFolder,Parallel(indPC));
end
end
% delete('slaveParallel_break')
return
end
if partial==-1
delete('slaveParallel_break.mat')
for indPC=1:length(Parallel)
if (Parallel(indPC).Local==0)
dynareParallelDelete( 'slaveParallel_break.mat',TmpFolder,Parallel(indPC));
end
end
% delete('slaveParallel_break')
return
end
for indPC=1:length(Parallel)
if (Parallel(indPC).Local==0)
dynareParallelDelete( 'slaveParallel_input*.mat',TmpFolder,Parallel(indPC));
end
delete( 'slaveParallel_input*.mat');
delete( 'slaveJob*.mat');
pause(1)
delete(['slaveParallel_*.log']);
delete ConcurrentCommand1.bat;
end
while(1)
if isempty(dynareParallelDir(['P_slave_',int2str(j),'End.txt'],TmpFolder,Parallel))
for indPC=1:length(Parallel)
if (Parallel(indPC).Local==0)
dynareParallelRmDir(TmpFolder,Parallel(indPC))
end
end
break
end
end
s=warning('on');
\ No newline at end of file
function [nCPU, totCPU, nBlockPerCPU, totSLAVES] = distributeJobs(Parallel, fBlock, nBlock)
% PARALLEL CONTEXT
% In parallel context this function is used to determine the total number of available CPUs,
% and the number of threads to run on each CPU.
%
% INPUTS
% o Parallel [struct vector] copy of options_.parallel
% o fBlock [int] index number of the first job (e.g. MC iteration or MH block)
% (between 1 and nBlock)
% o nBlock [int] index number of the last job.
%
% OUTPUT
% o nBlockPerCPU [int vector] for each CPU used, indicates the number of
% threads run on that CPU
% o totCPU [int] total number of CPU used (can be lower than
% the number of CPU declared in "Parallel", if
% the number of required threads is lower!)
% o nCPU the number of CPU in user format.
% o totSLAVES the number of cluster's node currently
% involved in parallel computing step.
% It is a number between 1 and length(Parallel).
% Copyright (C) 2010-2017 Dynare Team
%
% This file is part of Dynare.
%
% Dynare is free software: you can redistribute it and/or modify
% it under the terms of the GNU General Public License as published by
% the Free Software Foundation, either version 3 of the License, or
% (at your option) any later version.
%
% Dynare is distributed in the hope that it will be useful,
% but WITHOUT ANY WARRANTY; without even the implied warranty of
% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
% GNU General Public License for more details.
%
% You should have received a copy of the GNU General Public License
% along with Dynare. If not, see <http://www.gnu.org/licenses/>.
% The Parallel vector has already been sorted
% (in accord with the CPUWeight values) in DESCENDING order in
% InitializeComputationalEnvironment!
totCPU=0;
lP=length(Parallel);
CPUWeight=ones(1,length(Parallel))*(-1);
for j=1:lP
if mod(length(Parallel(j).CPUnbr),Parallel(j).NumberOfThreadsPerJob)
skipline()
disp(['PARALLEL_ERROR:: NumberOfThreadsPerJob = ',int2str(Parallel(j).NumberOfThreadsPerJob),' is not an exact divisor of number of CPUs = ',int2str(length(Parallel(j).CPUnbr)),'!'])
disp([' You must re-set properly NumberOfThreadsPerJob of node ' int2str(j) ' ' Parallel(j).ComputerName])
disp([' in your configuration file'])
error(['PARALLEL_ERROR:: NumberOfThreadsPerJob is not an exact divisor of CPUnbr'])
end
nCPU(j)=length(Parallel(j).CPUnbr)/Parallel(j).NumberOfThreadsPerJob;
totCPU=totCPU+nCPU(j);
CPUWeight(j)=str2num(Parallel(j).NodeWeight);
end
% Copy of original nCPU.
nCPUoriginal=nCPU;
nCPU=cumsum(nCPU);
% Number of Nodes in Cluster.
nC=lP;
% Numbers of Jobs.
NumbersOfJobs=nBlock-fBlock+1;
SumOfJobs=0;
JobsForNode=zeros(1,nC);
for j=1:lP
CPUWeight(j)=str2num(Parallel(j).NodeWeight)*nCPUoriginal(j);
end
CPUWeight=CPUWeight./sum(CPUWeight);
% Redistributing the jobs among the cluster nodes according to the
% CPUWeight.
for i=1:nC
JobsForNode(i)=CPUWeight(i)*NumbersOfJobs;
% Many choices are possible:
% JobsForNode(i)=round(JobsForNode(i));
% JobsForNode(i)=floor(JobsForNode(i));
JobsForNode(i)=ceil(JobsForNode(i));
end
% Check if there are more (fewer) jobs.
% This can happen when we use ceil, round, ... functions.
SumOfJobs=sum(JobsForNode);
if SumOfJobs~=NumbersOfJobs
if SumOfJobs>NumbersOfJobs
% Many choices are possible:
% - Remove the excess works at the node that has the greatest
% number of jobs.
% - Remove the excess works at the node slower.
VerySlow=nC;
while SumOfJobs>NumbersOfJobs
if JobsForNode(VerySlow)==0
VerySlow=VerySlow-1;
continue
end
JobsForNode(VerySlow)=JobsForNode(VerySlow)-1;
SumOfJobs=SumOfJobs-1;
end
end
if SumOfJobs<NumbersOfJobs
% Many choices are possible:
% - ... (see above).
[NonServe VeryFast]= min(CPUWeight);
while SumOfJobs<NumbersOfJobs
JobsForNode(VeryFast)=JobsForNode(VeryFast)+1;
SumOfJobs=SumOfJobs+1;
end
end
end
% Redistributing the jobs among the cpu/core nodes.
JobsForCpu=zeros(1,nCPU(nC));
JobAssignedCpu=0;
RelativePosition=1;
for i=1:nC
% Many choices are possible:
% - ... (see above).
JobAssignedCpu=max(1,floor(JobsForNode(i)/nCPUoriginal(i)));
ChekOverFlow=0;
for j=RelativePosition:nCPU(i)
JobsForCpu(j)=JobAssignedCpu;
ChekOverFlow=ChekOverFlow+JobAssignedCpu;
if ChekOverFlow>=JobsForNode(i)
break
end
end
% Check if there are more (fewer) jobs.
% This can happen when we use ceil, round, ... functions.
if ChekOverFlow ~=(JobsForNode(i))
if ChekOverFlow >(JobsForNode(i))
while ChekOverFlow>JobsForNode(i)
JobsForCpu(nCPU(i))=JobsForCpu(nCPU(i))-1;
ChekOverFlow=ChekOverFlow-1;
end
end
if ChekOverFlow <(JobsForNode(i))
while ChekOverFlow<JobsForNode(i)
JobsForCpu(nCPU(i))=JobsForCpu(nCPU(i))+1;
ChekOverFlow=ChekOverFlow+1;
end
end
end
RelativePosition=nCPU(i)+1;
end
% Reshape the variables totCPU,totSLAVES and nBlockPerCPU in accord with
% the syntax rquired by a previous version of parallel package ...
totCPU=0;
totSLAVES=0;
nBlockPerCPU=[];
for i=1:nCPU(nC)
if JobsForCpu(i)~=0
totCPU=totCPU+1;
end
end
for i=1:nC
if JobsForNode(i)~=0
totSLAVES=totSLAVES+1;
end
end
RelativeCounter=1;
for i=1:nCPU(nC)
if JobsForCpu(i)~=0
nBlockPerCPU(RelativeCounter)=JobsForCpu(i);
RelativeCounter=RelativeCounter+1;
end
end
function dynareParallelDelete(fname,pname,Parallel)
% PARALLEL CONTEXT
% In a parallel context, this is a specialized version of delete() function.
%
% INPUTS
% o fname [] ...
% o pname [] ...
% o Parallel [] ...
%
% OUTPUTS
% None
%
%
% Copyright (C) 2009-2020 Dynare Team
%
% This file is part of Dynare.
%
% Dynare is free software: you can redistribute it and/or modify
% it under the terms of the GNU General Public License as published by
% the Free Software Foundation, either version 3 of the License, or
% (at your option) any later version.
%
% Dynare is distributed in the hope that it will be useful,
% but WITHOUT ANY WARRANTY; without even the implied warranty of
% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
% GNU General Public License for more details.
%
% You should have received a copy of the GNU General Public License
% along with Dynare. If not, see <http://www.gnu.org/licenses/>.
if nargin ~= 3
disp('dynareParallelDelete(fname,pname,Parallel)')
return
end
if ~isempty(pname)
pname=[pname,filesep];
end
for indPC=1:length(Parallel)
if ~ispc || strcmpi('unix',Parallel(indPC).OperatingSystem)
if ~isempty(Parallel(indPC).Port)
ssh_token = ['-p ',Parallel(indPC).Port ' '];
else
ssh_token = ' ';
end
username = Parallel(indPC).UserName;
if ~isempty(username)
username = [username '@'];
end
directory = Parallel(indPC).RemoteDirectory;
if ~isempty(directory)
directory = [directory '/'];
end
[~, ~] = system(['ssh ',ssh_token,username,Parallel(indPC).ComputerName,' ''/bin/bash --norc -c "rm -f ',directory,pname,fname,'"''']);
else
fname_temp=['\\',Parallel(indPC).ComputerName,'\',Parallel(indPC).RemoteDrive,'$\',Parallel(indPC).RemoteDirectory,'\',pname,fname];
if exist(fname_temp,'file')
delete(fname_temp);
end
end
end
function dynareParallelDeleteNewFiles(PRCDir,Parallel,PRCDirSnapshot,varargin)
% PARALLEL CONTEXT
% In a parallel context, this is a specialized function able to ...
%
%
% INPUTS
%
% o PRCDir [] ...
% o Parallel [] ...
% o PRCDirSnapshot [] ...
%
%
% OUTPUTS
% o PRCDirSnapshot [] ...
%
%
%
% Copyright (C) 2009-2017 Dynare Team
%
% This file is part of Dynare.
%
% Dynare is free software: you can redistribute it and/or modify
% it under the terms of the GNU General Public License as published by
% the Free Software Foundation, either version 3 of the License, or
% (at your option) any later version.
%
% Dynare is distributed in the hope that it will be useful,
% but WITHOUT ANY WARRANTY; without even the implied warranty of
% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
% GNU General Public License for more details.
%
% You should have received a copy of the GNU General Public License
% along with Dynare. If not, see <http://www.gnu.org/licenses/>.
NewFilesFromSlaves={};
% try
for indPC=1:length(Parallel)
if Parallel(indPC).Local==0
[NewFilesFromSlaves, PRCDirSnapshot{indPC}]=dynareParallelFindNewFiles(PRCDirSnapshot{indPC},Parallel(indPC), PRCDir);
if ~ispc || strcmpi('unix',Parallel(indPC).OperatingSystem)
fS='/';
else
fS='\';
end
if ~isempty(NewFilesFromSlaves)
for i=1:length(NewFilesFromSlaves)
SlashNumberAndPosition=[];
PRCDirPosition=findstr(NewFilesFromSlaves{i}, ([PRCDir]));
sT=NewFilesFromSlaves{i};
sT(1:(PRCDirPosition+length([PRCDir]))-2)=[];
sT(1)='.';
SlashNumberAndPosition=findstr(sT,fS);
fileaddress={sT(1:SlashNumberAndPosition(end)),sT(SlashNumberAndPosition(end)+1:end)};
exception_flag=0;
for indexc=1:length(varargin)
exception_flag=exception_flag+(~isempty(strfind(fileaddress{2},varargin{indexc})));
end
if exception_flag==0
dynareParallelDelete(fileaddress{2},[PRCDir,fS,fileaddress{1}],Parallel(indPC));