Main Content

parallel.pool.PollableDataQueue

Send and poll data between client and workers

    Description

    A PollableDataQueue object enables synchronous sending and polling for data or messages between workers and client in a parallel pool while a computation is carried out. For example, you can send intermediate values to the client and use the values in another computation.

    To send data from a parallel pool worker back to the client, first create a PollableDataQueue object at the client. Pass this PollableDataQueue object into a parfor-loop or other parallel language construct, such as parfeval. From the workers, call send to send data back to the client. At the client, use poll to retrieve the result of a message or data sent from a worker.

    • You can call send from the worker or client that created the PollableDataQueue, if required.

    • You can create the queue on the workers and send it back to the client to enable communication in the reverse direction.

      Before R2023b: You cannot send a queue from one worker to another. To transfer data between workers, use spmd, spmdSend, or spmdReceive instead.

    • Unlike all other handle objects, PollableDataQueue and DataQueue instances do remain connected when they are sent to workers.

    Creation

    Description

    example

    p = parallel.pool.PollableDataQueue creates an object that can be used to send and poll for messages (or data) from different workers. Create the PollableDataQueue on the worker or client where you want to receive the data.

    Properties

    expand all

    This property is read-only.

    The number of items of data waiting to be removed from the queue, specified as a zero or positive integer. The value is 0 or a positive integer on the worker or client that creates the PollableDataQueue instance. If the client creates the PollableDataQueue instance, the value is 0 on all workers. If a worker creates the PollableDataQueue, the value is 0 on the client and all other workers.

    Object Functions

    poll Retrieve data sent from a worker
    sendSend data from worker to client using a data queue

    Examples

    collapse all

    Construct a PollableDataQueue.

    p = parallel.pool.PollableDataQueue;
    

    Start a parfor-loop, and send a message, such as data with the value 1.

    parfor i = 1
        send(p, i); 
    end
    

    Poll for the result.

    poll(p)
    1
    

    For more details on polling for data using a PollableDataQueue, see poll.

    When you send a message to a PollableDataQueue object, the message waits in the queue. Each message adds 1 to the queue length. When you use poll, one message is collected from the queue. In this example, you use the QueueLength property to find the length of a PollableDataQueue object.

    When a client or worker creates a PollableDataQueue object, any messages that are sent to the queue are held in the memory of that client or worker. If the client creates a DataQueue object, the QueueLength property on all workers is 0. In this example, you create a PollableDataQueue object on the client, and send data from a worker.

    First, create a parallel pool with one worker.

    parpool(1);
    Starting parallel pool (parpool) using the 'local' profile ...
    Connected to the parallel pool (number of workers: 1).
    

    Create a PollableDataQueue.

    pdq = parallel.pool.PollableDataQueue
    pdq = 
      PollableDataQueue with properties:
    
        QueueLength: 0
    
    

    A newly created PollableDataQueue has an empty queue. You can use parfor to find pdq.QueueLength on the worker. Find the queue length on the client, and the queue length on the worker.

    fprintf('On the client: %i\n', pdq.QueueLength)
    On the client: 0
    
    parfor i = 1
        fprintf('On the worker: %i\n', pdq.QueueLength)
    end
    On the worker: 0
    

    As the queue is empty, the QueueLength is 0 for both the client and the worker. Next, send a message to the queue. Then, use the QueueLength property to find the length of the queue.

    % Send a message first
    parfor i = 1
        send(pdq, 'A message');
    end
    
    % Find the length
    fprintf('On the client: %i\n', pdq.QueueLength)
    On the client: 1
    
    parfor i = 1
        fprintf('On the worker: %i\n', pdq.QueueLength)
    end
    On the worker: 0
    

    The QueueLength property is 1 on the client, and 0 on the worker. Use poll to retrieve the message from the queue.

    msg = poll(pdq);
    disp(msg)
    A message
    

    Use the QueueLength property to find the length of the queue.

    fprintf('On the client: %i\n', pdq.QueueLength)
    On the client: 0
    

    QueueLength is 0 because the queue processing is complete.

    This example shows how to setup a data queue on the workers to receive data.

    You can use a data queue to transfer data or messages between the client and the workers.

    This example generates instrument data on the workers and sends the data back to the client. To start and stop the signal generation, the client can send a message to the workers using a data queue. This approach provides a smoother way to stop a parfeval computation on a worker.

    Start a parallel pool with three workers.

    pool = parpool("Processes",3);
    Starting parallel pool (parpool) using the 'Processes' profile ...
    Connected to parallel pool with 3 workers.
    

    Set Up Queue to Receive Data on Client

    Prepare and initialize plots to visualize the instrument data from the workers. The createPlots function is defined at the end of the example.

    [fig,p] = createPlots;

    Create a DataQueue and use afterEach to specify the function to execute each time the queue receives data. The receiveDataOnClient function plots the data received from the workers and is defined at the end of the example.

    clientQueue = parallel.pool.DataQueue;
    afterEach(clientQueue,@(data) receiveDataOnClient(p,data));

    Set Up Queues to Receive Communications on Workers

    Create a helper PollableDataQueue on the client.

    helperClientQueue = parallel.pool.PollableDataQueue;

    Use parfeval to establish data queues on the three workers in the parallel pool. The connectToWorker helper function assigns a unique ID to each worker, creates a PollableDataQueue on each worker, and sends the data queue to the client using the helperClientQueue queue. The workers then wait for instructions from the client to start data generation.

    wkrF(1:3) = parallel.FevalFuture;
    for ID = 1:3
        wkrF(ID) = parfeval(@connectToWorker,0,clientQueue,helperClientQueue,ID);
    end

    At the client, receive the labelled worker queues. You can now use these queues to send data to each worker.

    allWkrQueues = struct('ID',{},'Queue',{});
    for i = 1:3
        wkrQueue = poll(helperClientQueue,inf);
        allWkrQueues(wkrQueue.ID) = wkrQueue;
    end

    Start and Stop Data Generation

    Next, instruct the workers to start generating data.

    for ID = 1:3
        send(allWkrQueues(ID).Queue,"Start generating data");
    end

    This figure shows the instrument data each worker generates and sends to the client.

    fig.Visible="on";

    Generate data for a period of 10 seconds.

    pause(10)

    To stop gathering data on worker 2, send a message to the worker using the queue created on worker 2. You can observe that the line for Instrument 2 stops around 0.9 seconds.

    send(allWkrQueues(2).Queue,"stop");

    Poll the helperClientQueue queue to receive confirmation from worker 2.

    [status, ~] = poll(helperClientQueue,inf);
    disp(status)
    Data generation stopped on worker 2
    

    Wait for the other workers to finish their computation.

    wait(wkrF);

    Helper Functions

    The connectToWorker function creates a PollableDataQueue on the workers, sends them to the client, then polls the wkrQueue queue to wait for instructions from the client.

    When the worker receives a message from the client, the function generates a dummy signal on the workers that mimics continuous data from an instrument. At each time step, the worker sends one point of the signal to the client with the clientQueue queue, then polls the wkrQueue queue to check if the queue has data. If there is data to receive, the worker stops generating data and sends a message to the client to confirm that it has stopped generating data.

    function connectToWorker(clientQueue,helperClientQueue,ID)
    % Assign an ID to this worker.
    wkrQueue.ID = ID;
    % Create a PollableDataQueue on this specific worker.
    wkrQueue.Queue = parallel.pool.PollableDataQueue;
    % Send the queue to the client.
    send(helperClientQueue,wkrQueue);
    
    % Wait for instructions from client.
    [~, OK] = poll(wkrQueue.Queue,inf);
    if OK
        t = 0:0.01:4;
        step = 1;
        while step < numel(t)
            % Generate dummy instrument data.
            data_point = sin(ID*2*pi*t(step));
            % Send data to client using a data queue.
            send(clientQueue,{ID,t(step),data_point});
            % Check if worker queue has data to receive and use a timeout.
            [~, OK] = poll(wkrQueue.Queue,0.1);
            if OK
                send(helperClientQueue,sprintf("Data generation stopped on worker %d",ID));
                return
            else
                step = step + 1;
            end
        end
    else
        return
    end
    end

    Define a function to prepare and initialize plots to visualize the data from the workers. Specify different line properties for each worker.

    function [fig,p] = createPlots
    fig = figure(Name="Signal from Instruments",Visible="off");
    t = tiledlayout(fig,3,1);
    lineColor = ["k","b","g"];
    p = gobjects(1,3);
    for i=1:3
        nexttile(t);
        xlabel("Time (s)");
        ylabel("Amplitude");
        title(sprintf("Instrument %d",i))
        p(i) = animatedline(NaN,NaN,Color=lineColor(i));
    end
    end

    Define a function to update the plots when the workers send data to the client.

    function receiveDataOnClient(p,data)
    addpoints(p(data{1,1}),data{1,2},data{1,3})
    drawnow limitrate;
    end

    Tips

    • You can only manually retrieve data or messages sent using a PollableDataQueue object. To automatically process data after it is received on the client, use a parallel.pool.DataQueue object to send the data instead.

    Extended Capabilities

    Version History

    Introduced in R2017a