
Interactively Import and Process Data in Parallel

Since R2023b

This example shows how to import and process data simultaneously in an interactive parallel pool. To streamline your workflow, you can overlap parfeval and parfor computations on the same parallel pool.

In this example, you import audio signals in the background with parfeval and simultaneously perform some basic signal processing with parfor. The data import and processing approach described in this example is generally applicable and is not reliant on the specific details of the import and process functions.

This image provides a summary of the order of computations.

Set Up

To simulate importing audio from a database, the example generates audio data using the acquireAudio function, which is defined in a supporting file. The acquireAudio function also partitions the audio into frames to facilitate parallelized audio processing. Before you start, specify the number of audio files to import, as well as the duration and sample rate of the audio. Partition the audio into 30 frames.

numAudio = 3;
audioDuration = 300;
sampleRate = 44100;
numFrames = 30;
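
The acquireAudio supporting file is not reproduced here. As a rough illustration only, a stand-in with the same argument order might look like the following sketch. The function name acquireAudioSketch, the synthetic 440 Hz tone, and the noise level are all assumptions made for illustration. The only details taken from the example are the input arguments and the 1-by-numFrames cell array output that the processing loop indexes as audio{1,frame}.

```matlab
function audio = acquireAudioSketch(audioDuration,numFrames,sampleRate)
% Hypothetical stand-in for acquireAudio. The real supporting file may differ.
% Returns a 1-by-numFrames cell array of column vectors.
numSamples = audioDuration*sampleRate;
samplesPerFrame = floor(numSamples/numFrames);
t = (0:numSamples-1)'/sampleRate;
% Assumed synthetic signal: a 440 Hz tone with a small amount of noise.
signal = sin(2*pi*440*t) + 0.1*randn(numSamples,1);
% Partition the signal into equally sized frames. Any leftover samples
% at the end of the signal are discarded.
audio = cell(1,numFrames);
for k = 1:numFrames
    first = (k-1)*samplesPerFrame + 1;
    audio{1,k} = signal(first:first+samplesPerFrame-1);
end
end
```

With the settings above (300 seconds at 44100 Hz, 30 frames), each frame in this sketch would hold 441000 samples.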

Start a parallel pool with six process workers.

pool = parpool("Processes",6);
Starting parallel pool (parpool) using the 'Processes' profile ...
Connected to parallel pool with 6 workers.

The example stores the processed data in the parallel pool ValueStore object. To provide a visual representation of the computation progress, use the waitbar function to create a simple user interface. Set up an update function called handleValueStoreEntry to run each time an entry is added to the ValueStore object. The handleValueStoreEntry function, defined at the end of the example, uses persistent variables to store information about the parfor iterations. To initialize the persistent variables, run the handleValueStoreEntry function with three input arguments.

analysisWaitBar = waitbar(0,"Waiting for data...",Name="Analyzing Audio Data");

store = pool.ValueStore;
store.KeyUpdatedFcn = @(store,key) handleValueStoreEntry(store,key);

handleValueStoreEntry(numAudio,numFrames,analysisWaitBar);
Wait bar counter reset to 0.

Acquire and Process Data

Acquire and process the audio data iteratively. To import the data, schedule the acquireAudio function to run asynchronously with parfeval. To process the data in parallel, call the processAudio function inside a parfor-loop. The processAudio function is defined in a supporting file for this example.

To minimize the waiting time for workers to receive data, stagger the computations. Submit a parfeval computation to acquire the first audio before you start the for-loop.

importFuture = parfeval(@acquireAudio,1,audioDuration,numFrames,sampleRate);
for idx = 1:numAudio
    % Retrieve audio data from the parfeval computation.
    audio = fetchOutputs(importFuture);
    % Schedule the next parfeval computation to run in the background.
    if idx < numAudio
        importFuture = parfeval(@acquireAudio,1,audioDuration,numFrames,sampleRate);
    end
    % Process the audio data.
    parfor frame = 1:numFrames
        store = getCurrentValueStore;
        key = strcat("Audio_",num2str(idx),"_Frame_",num2str(frame));
        % Perform some signal processing.
        inputSignal = audio{1,frame};
        store(key) = processAudio(inputSignal,sampleRate);
    end
end
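
The processAudio supporting file is likewise not shown. The sketch below is one possible stand-in, assuming that "basic signal processing" means removing the DC offset, smoothing with a short moving average, and normalizing the peak amplitude. The function name processAudioSketch and these processing steps are assumptions, not the contents of the actual supporting file. The sketch returns a column vector of the same length as its input, so the results can be concatenated and plotted per frame.

```matlab
function outputSignal = processAudioSketch(inputSignal,sampleRate)
% Hypothetical stand-in for processAudio. The real supporting file may differ.
inputSignal = inputSignal(:);                  % force a column vector
centered = inputSignal - mean(inputSignal);    % remove the DC offset
% Smooth with a moving average about 1 ms long (an assumed window size).
windowLength = max(1,round(sampleRate/1000));
smoothed = movmean(centered,windowLength);
% Normalize to unit peak amplitude, leaving an all-zero signal unchanged.
peak = max(abs(smoothed));
if peak > 0
    outputSignal = smoothed/peak;
else
    outputSignal = smoothed;
end
end
```

Because each call depends only on its own frame, a function of this shape is safe to call from independent parfor iterations.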

Retrieve Processed Data

You can retrieve the processed audio from the ValueStore object for further computations. For example, retrieve and plot the first frame of each audio using the plotFrames function defined at the end of the example.

plotData = [store("Audio_1_Frame_1"),store("Audio_2_Frame_1"), ...
    store("Audio_3_Frame_1")];
t = (0:size(plotData,1)-1)/sampleRate;
plotFrames(t,plotData);

Clean Up

Delete the wait bar and pool after use.

delete(analysisWaitBar);
delete(pool);
Parallel pool using the 'Processes' profile is shutting down.

Helper Functions

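Define a function that updates the wait bar each time an entry is added to the pool ValueStore object. The function uses persistent variables to keep count of the completed frames. Call it with three input arguments to initialize or reset the counters.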

function handleValueStoreEntry(numAudio,numFrames,analysisWaitBar)
persistent count currentAudio nAudio nFrames bar
if nargin == 3
    % Initialize counting variables.
    count = 0;
    currentAudio = 1;
    nAudio = numAudio;
    nFrames = numFrames;
    bar = analysisWaitBar;
    fprintf("Wait bar counter reset to 0.\n")
else
    count = count + 1;
    progress = count/nFrames;
    waitbar(progress,bar, ...
        sprintf("Processing audio %d of %d",currentAudio,nAudio))

    if currentAudio==nAudio && progress==1
        waitbar(progress,bar,"Audio import and processing complete")
    end

    if count == nFrames
        currentAudio = currentAudio + 1;
        count = 0;
    end
end
end

Define a function to plot the frames for each audio file.

function plotFrames(t,plotData)
fig = figure(Name="First Frame of Each Audio");
tl = tiledlayout(fig,3,1);
for i=1:3
    nexttile(tl);
    plot(t,plotData(:,i));
    title(sprintf("Audio %d",i))
end
title(tl,"First Frame of Each Audio")
xlabel(tl,"Time (s)");
ylabel(tl,"Amplitude");
end
