How can an infinitely running worker be gracefully terminated?

조회 수: 4 (최근 30일)
Quant
Quant 2021년 11월 16일
편집: Quant 2021년 11월 18일
The code is adapted from that thread too, so it may be advisable to read that thread first.
I have a similar problem: I need to write and read in parallel to/from a hardware device (under Linux) for the purpose of testing the device and it must happen infinitely until instructed to stop. The device must be then properly closed.
[write_queue, write_thread]=startInterruptibleThread(@writeXillybus); %start writing thread
[read_queue, read_thread]=startInterruptibleThread(@readXillybus);
pause %let the workers run indefinitely until abutton is pressed
send(write_queue, 0);%signal the worker to stop
send(read_queue, 0);
% adapted from https://www.mathworks.com/matlabcentral/answers/424145-how-can-i-send-data-on-the-fly-to-a-worker-when-using-parfeval
function [workerQueueClient, thread]=startInterruptibleThread(func)
% Get the worker to construct a data queue on which it can receive
% messages from the client
workerQueueConstant = parallel.pool.Constant(@parallel.pool.PollableDataQueue);
% Get the worker to send the queue object back to the client
workerQueueClient = fetchOutputs(parfeval(@(x) x.Value, 1, workerQueueConstant));
% Get the worker to start waiting for messages
thread=parfeval(func, 1, workerQueueConstant);
end
function out=writeXillybus(qConst)
test_fid=fopen('/dev/xillybus_testdata', 'w');%open device file
pattern=int16([ 0x0102, 0x0304, 0x0506, 0x0708; %test pattern is chosen so individual bytes could be identified
0x090a, 0x0b0c, 0x0d0e, 0x0f10;
0x1112, 0x1314, 0x1516, 0x1718;
0x191a, 0x1b1c, 0x1d1e, 0x1f20]);
testdata=repmat(pattern, 1, 8192*32); %replicate for 32768 test data points
cont=true;
while(cont)
out=fwrite(test_fid, testdata,'int16');
[~, OK]=poll(qConst.Value);% check for signal
cont=~OK; %set flag
end
fclose(test_fid); %close device
end
function out=readXillybus(qConst)
data_fid=fopen('/dev/xillybus_data', 'r');
cont=true;
while(cont)
out=fread(data_fid, [4, 32768*32], '*int16');
[~, OK]=poll(qConst.Value);
cont=~OK;
end
fclose(data_fid);
end
The Problem: the code doesn't work. writeXillybus is never executed, despite that the worker seems to start. and the script gets stuck at the 1st line of the second call to startInterruptibleThread. I don't understand, what I should improve.
Another note: the device is realised on an FPGA and I can observe its state over a debug interface. Also the function has already been verified with sequential writes/reads. But for the highest performance parallel write/read is required.
Addendum:
following code does work
write_thread=parfeval(@writeXillybus, 0);
read_thread=parfeval(@readXillybus,0);
pause %let the workers run indefinitely
cancel(write_thread);%terminate threads
cancel(read_thread);
fclose('all');% apparently doesn't help
function out=writeXillybus
test_fid=fopen('/dev/xillybus_testdata', 'w');%open device file
pattern=int16([ 0x0102, 0x0304, 0x0506, 0x0708; %test pattern is chosen so individual bytes could be identified
0x090a, 0x0b0c, 0x0d0e, 0x0f10;
0x1112, 0x1314, 0x1516, 0x1718;
0x191a, 0x1b1c, 0x1d1e, 0x1f20]);
testdata=repmat(pattern, 1, 8192*32); %replicate for 32768 test data points
cont=true;
while(cont)
out=fwrite(test_fid, testdata,'int16');
end
fclose(test_fid);
end
function out=readXillybus
data_fid=fopen('/dev/xillybus_data', 'r');
cont=true;
while(cont)
out=fread(data_fid, [4, 32768*32], '*int16');
end
fclose(data_fid);
end
However it runs only once, probabyl because the device files aren't properly closed. It is necessary to close the parallel pool (so OS performs the clean up), before running it again. No question, this a bad programming practice, but demonstrates how it can work in principle.

채택된 답변

Edric Ellis
Edric Ellis 2021년 11월 17일
The simplest way to fix this is to use onCleanup to ensure that your fclose statement is executed. This is good practice anyway, and will make your code more robust in all circumstances. For example,
function out=readXillybus
data_fid=fopen('/dev/xillybus_data', 'r');
% when 'cleanup' goes out of scope by any means - error, cancellation
% etc., `fclose(data_fid)` will be executed.
cleanup = onCleanup(@() fclose(data_fid));
cont=true;
while(cont)
out=fread(data_fid, [4, 32768*32], '*int16');
end
% No need for `fclose(data_fid)` here.
end
  댓글 수: 3
Edric Ellis
Edric Ellis 2021년 11월 18일
A cancelled Future cannot return data. To do that sort of thing, you need to send the data back to the client as you're going along using a parallel.pool.DataQueue. You could even use onCleanup to trigger the send, like this:
% Create a DataQueue to receive the result
q = parallel.pool.DataQueue();
% Set up a function to be called with the result.
% (Or, use a PollableDataQueue and use `poll(q)` to pick
% up the result)
afterEach(q, @(val) fprintf('Got value: %d\n', val));
fut = parfeval(@doStuff, 0, q);
pause(2);
% Cancel the future. The `afterEach` function will be triggered.
cancel(fut);
function doStuff(q)
% When this function terminates, call nested function
% nSend with the supplied DataQueue
x = onCleanup(@() nSend(q));
% This is going to be the result. We need to use
% a nested function to access the latest value
val = 1;
function nSend(q)
send(q, val);
end
% Compute 'for ever' - until cancellation.
while true
pause(0.1);
val = 1 + val;
end
end
Quant
Quant 2021년 11월 18일
편집: Quant 2021년 11월 18일
Got it, thanks
please have a look at another problem I have with parfeval

댓글을 달려면 로그인하십시오.

추가 답변 (0개)

카테고리

Help CenterFile Exchange에서 Asynchronous Parallel Programming에 대해 자세히 알아보기

제품


릴리스

R2020b

Community Treasure Hunt

Find the treasures in MATLAB Central and discover how the community can help you!

Start Hunting!

Translated by