Each PARFOR Worker Writes to the Same File

조회 수: 16 (최근 30일)
Paul Safier
Paul Safier 2022년 8월 3일
댓글: Paul Safier 2023년 1월 30일
I understand that having each worker write to a single file is a no-no. Perhaps as expected, when I run this code, it shows some corrupted values in the final output file; about 10% fails.
I have no interest in the output being in a deterministic order. The workers are spread across multiple Linux machines. The amount of time to complete a run is long compared to the time to write a single line of output.
Can someone recommend an alternative?
% Run a parametric study
var1 = (-60:0.5:60)';
var2 = (-110:0.5:110)';
var3 = (3.5:0.5:18.5)';
% Remove zero entries since their usage prohibited
var1(var1 == 0) = [];
var2(var2 == 0) = [];
var3(var3 == 0) = [];
NS = length(var1)*length(var2)*length(var3); % Number of runs
% Set up the design matrix, desMat
desMat = {var1,var2,var3};
[desMat{:}]=ndgrid(desMat{:});
n=length(desMat);
desMat = reshape(cat(n+1,desMat{:}),[],n);
if exist('./Results.csv', 'file')==2
delete('./Results.csv');
end
parfor kk = 1:NS
var1a = desMat(kk,1); var2a = desMat(kk,2); var3a = desMat(kk,3);
[out1 out2 out3] = Function_Pd(var1a,var2a,var3a);
vec = [var1a var2a var3a out1 out2 out3];
fileID = fopen('Results.csv','a');
fprintf(fileID,'%f %f %f %f %f %f\n',vec);
fclose(fileID);
end
  댓글 수: 2
jessupj
jessupj 2022년 8월 3일
i don't have a solution. YOu can probably just change
fileID = fopen('Results.csv','a');
to
fileID = fopen( sprintf('Results_%02d.csv',kk),'a');
Unrecognized function or variable 'kk'.
to save things in different files and then assemble them afterward.
Also, you might check out the exchange if you're just using parallelization for speed without requiring any interim communincation: https://www.mathworks.com/matlabcentral/fileexchange/13775-multicore-parallel-processing-on-multiple-cores
Paul Safier
Paul Safier 2022년 8월 3일
@jessupj The individual files would work except there would be over 3 million files and I believe file servers can have issues with that many...

댓글을 달려면 로그인하십시오.

채택된 답변

Bruno Luong
Bruno Luong 2022년 8월 3일
편집: Bruno Luong 2022년 8월 3일
May be (I didn't test) you could write in binary file at a deterministic place:
fileID = fopen('Results.bin','wb');
parfor ...
...
fseek(fileID, (kk-1)*length(vec)*8, 'bof'); % 8 is byte size of double, assuming vec is double
fwrite(fileID, vec);
end
fclose(fileID);
  댓글 수: 5
Bruno Luong
Bruno Luong 2022년 8월 4일
@Paul Safier "Second, is it straightforward to then read the binary file and convert to ascii?"
Yes just read the file by chunks depending on your RAM available then write to ascii file.
But if whatever app that needs those data can read binary file, why not leave it alone. It's luch better than ascii file: smaller, faster, no precision lost.
Paul Safier
Paul Safier 2022년 8월 4일
@Walter Roberson thanks for the advice. I will look into how to use memmapfile. It may also be an option to fill a binary file with garbage before I start the parfor loop, then overwrite it as I go because of the nature of fseek you mention. There's still the option that @Jeff Miller brings to light, namely having each worker write to its own file than I can cat them at the end, however that may take some leg work to get running. Thanks for the suggestion about the binary file, @Bruno Luong.

댓글을 달려면 로그인하십시오.

추가 답변 (3개)

Jeff Miller
Jeff Miller 2022년 8월 3일
Maybe have each worker write to its own output file and then assemble those after? This answer shows how to get the id for each worker.
  댓글 수: 3
Jeff Miller
Jeff Miller 2022년 8월 4일
Oh, sorry, I thought that suggestion was to write one file for each iteration of the parfor loop rather than for each separate worker.
Paul Safier
Paul Safier 2022년 8월 4일
@Jeff Miller I think it was to write each iteration to its own file, which would be too much for the file server (>3 million files). Your suggestion about each worker writing to its own file would be a managable amount of files. I will look into the link you sent. Thanks.

댓글을 달려면 로그인하십시오.


Raymond Norris
Raymond Norris 2022년 8월 12일
@Paul Safier since the order of the file doesn't have to be deterministic, use a data queue to write back to the client and have the client write the csv file.
% Run a parametric study
var1 = (-60:0.5:60)';
var2 = (-110:0.5:110)';
var3 = (3.5:0.5:18.5)';
% Remove zero entries since their usage prohibited
var1(var1 == 0) = [];
var2(var2 == 0) = [];
var3(var3 == 0) = [];
NS = length(var1)*length(var2)*length(var3); % Number of runs
% Set up the design matrix, desMat
desMat = {var1,var2,var3};
[desMat{:}]=ndgrid(desMat{:});
n=length(desMat);
desMat = reshape(cat(n+1,desMat{:}),[],n);
if exist('./Results.csv', 'file')==2
delete('./Results.csv');
end
fileID = fopen('Results.csv','a');
D = parallel.pool.DataQueue;
afterEach(D,@(V)logger(fileID,V))
c = onCleanup(@()fclose(fileID));
parfor kk = 1:NS
var1a = desMat(kk,1); var2a = desMat(kk,2); var3a = desMat(kk,3);
[out1 out2 out3] = Function_Pd(var1a,var2a,var3a);
vec = [var1a var2a var3a out1 out2 out3];
send(D,vec)
end
function logger(fileID,vec)
fprintf(fileID,'%f %f %f %f %f %f\n',vec);
end
  댓글 수: 4
Raymond Norris
Raymond Norris 2022년 8월 19일
@Paul Safier somewhere/how, you've already closed your file. I can reproduce your warning here
fileID = fopen('Results.csv','a');
c = onCleanup(@()fclose(fileID));
fprintf(fileID,"%f\n",rand);
fclose(fileID);
>> safier
>> clear
Warning: The following error was caught while executing 'onCleanup' class destructor:
Error using fclose
Invalid file identifier. Use fopen to generate a valid file identifier.
Error in safier>@()fclose(fileID) (line 2)
c = onCleanup(@()fclose(fileID));
Error in onCleanup/delete (line 23)
obj.task();
I wouldn't have gotten the warning if I hadn't called
fclose(fileID);
Paul Safier
Paul Safier 2022년 8월 23일

댓글을 달려면 로그인하십시오.


Alexander Denman
Alexander Denman 2023년 1월 29일
One option you might consider is using a database, for example PostgreSQL. Ensuring that concurrent writes don't interfere with each other is one of the core functions of a relational database management system.
To do this, you would install PostgreSQL on a machine that is reachable from all of your worker nodes, then create a table to hold the results. You can store essentially any matlab variable in a postgres "bytea" colum by using typecast(getByteStreamFromArray(someVariable),'int8') to convert the variable to one long stream of 8 bit integers.
Then, when each worker is ready to save its results, it opens a database connection using the 'database' function, uploads the results using 'sqlwrite' or 'datainsert', and then closes the connection.
When you retrieve the results from the database, you r-convert the data to its original form using getArrayFromByteStream(typecast(binaryDataFromDatabase),'uint8')).
  댓글 수: 1
Paul Safier
Paul Safier 2023년 1월 30일
@Alexander Denman, thanks for this interesting option. I will keep this in mind. I have adopted the approach suggested by @Raymond Norris above and it works perfectly. Thanks for you input.

댓글을 달려면 로그인하십시오.

카테고리

Help CenterFile Exchange에서 Parallel Computing Fundamentals에 대해 자세히 알아보기

제품


릴리스

R2022a

Community Treasure Hunt

Find the treasures in MATLAB Central and discover how the community can help you!

Start Hunting!

Translated by