Main Content

이 번역 페이지는 최신 내용을 담고 있지 않습니다. 최신 내용을 영문으로 보려면 여기를 클릭하십시오.

parallel.pool.DataQueue

클라이언트와 워커 간에 데이터 전송 및 수신 대기

    설명

    DataQueue 객체를 사용하면 계산이 수행되는 동안 병렬 풀에서 워커와 클라이언트 간에 전송되는 데이터 또는 메시지의 비동기 처리와 자동 처리가 가능합니다. 예를 들어 중간값을 클라이언트로 전송하고 계산 진행률을 자동으로 계산할 수 있습니다.

    병렬 풀 워커의 데이터를 클라이언트로 다시 보내려면 먼저 클라이언트에서 DataQueue 객체를 만듭니다. 이 DataQueueparfor 루프 또는 다른 병렬 언어 구문(예: spmd)으로 전달합니다. 워커에서 send를 호출하여 클라이언트로 데이터를 다시 보냅니다. 클라이언트에서 afterEach를 사용하여, 수신한 데이터를 자동으로 처리할 함수를 지정합니다.

    • 필요한 경우 DataQueue를 만든 워커 또는 클라이언트에서 send를 호출할 수 있습니다.

    • 워커에 대기열을 만들고 이를 클라이언트로 다시 보내 역방향 통신을 활성화할 수 있습니다.

      R2023b 이전: 한 워커에서 다른 워커로 대기열을 보낼 수는 없습니다. 워커 간에 데이터를 전송하려면 spmd, spmdSend 또는 spmdReceive를 대신 사용합니다.

    • 다른 모든 핸들 객체와 달리 DataQueue 인스턴스와 PollableDataQueue 인스턴스는 워커로 전송될 때 연결된 상태로 유지됩니다.

    생성

    설명

    예제

    q = parallel.pool.DataQueue는 클라이언트와 워커 간에 메시지(또는 데이터)를 전송하거나 수신 대기하는 데 사용할 수 있는 객체를 만듭니다. 데이터를 수신할 워커 또는 클라이언트에 DataQueue를 만드십시오.

    속성

    모두 확장

    이 속성은 읽기 전용입니다.

    대기열에서 제거 대기 중인 데이터 항목의 수로, 0 또는 양의 정수로 지정됩니다. 값은 DataQueue 인스턴스를 만든 워커 또는 클라이언트에서 0 또는 양의 정수입니다. 클라이언트가 DataQueue 인스턴스를 만드는 경우 값은 모든 워커에서 0입니다. 워커가 DataQueue 인스턴스를 만드는 경우 값은 클라이언트와 모든 다른 워커에서 0입니다.

    객체 함수

    afterEachDataQueue에서 새 데이터를 받을 때 호출할 함수를 정의합니다.
    send데이터 대기열을 사용하여 워커에서 클라이언트로 데이터 보내기

    예제

    모두 축소

    DataQueue를 생성하고 afterEach를 호출합니다.

    q = parallel.pool.DataQueue;
    afterEach(q, @disp);
    
    parfor 루프를 시작하고 메시지를 보냅니다. 보류 중인 메시지가 afterEach 함수로 전달되는데 이 예제에서는 @disp로 전달됩니다.

    parfor i = 1:3
        send(q, i); 
    end;
         1
    
         2
    
         3

    DataQueue를 사용하여 데이터를 수신 대기하는 방법에 대한 자세한 내용은 afterEach 항목을 참조하십시오.

    메시지를 DataQueue 객체로 보내면 메시지는 리스너에 의해 처리될 때까지 대기열에서 대기합니다. 각 메시지는 대기열 길이에 1을 추가합니다. 이 예제에서는 QueueLength 속성을 사용하여 DataQueue 객체의 길이를 구합니다.

    클라이언트 또는 워커가 DataQueue 객체를 만들면, 대기열로 전송된 모든 메시지는 해당 클라이언트 또는 워커의 메모리에 유지됩니다. 클라이언트가 DataQueue 객체를 만드는 경우 모든 워커의 QueueLength 속성은 0입니다. 이 예제에서는 클라이언트에서 DataQueue 객체를 만들고 워커에서 데이터를 보냅니다.

    먼저, 1개의 워커가 있는 병렬 풀을 만듭니다.

    parpool(1);
    Starting parallel pool (parpool) using the 'Processes' profile ...
    Connected to parallel pool with 1 workers.
    

    그런 다음, DataQueue를 만듭니다.

    q = parallel.pool.DataQueue
    q = 
      DataQueue with properties:
    
        QueueLength: 0
    
    

    새로 만든 DataQueue에는 빈 대기열이 있습니다. parfor를 사용하여 워커에서 q.QueueLength를 구할 수 있습니다. 클라이언트에서의 대기열 길이와 워커에서의 대기열 길이를 구합니다.

    fprintf('On the client: %i\n', q.QueueLength)
    On the client: 0
    
    parfor i = 1
        fprintf('On the worker: %i\n', q.QueueLength)
    end
    On the worker: 0
    

    대기열이 비어 있으므로 클라이언트와 워커 모두에 대해 QueueLength0입니다. 다음으로, 워커에서 대기열로 메시지를 보냅니다. 그런 다음 QueueLength 속성을 사용하여 대기열 길이를 구합니다.

    % Send a message first
    parfor i = 1
        send(q, 'A message');
    end
    
    % Find the length
    fprintf('On the client: %i\n', q.QueueLength)
    On the client: 1
    
    parfor i = 1
        fprintf('On the worker: %i\n', q.QueueLength)
    end
    On the worker: 0
    

    QueueLength 속성은 클라이언트에서 1이고, 워커에서 0입니다. 데이터를 즉시 표시하여 대기열을 처리할 리스너를 만듭니다.

    el = afterEach(q, @disp);

    대기열이 빌 때까지 기다린 다음 리스너를 삭제합니다.

    while q.QueueLength > 0
        pause(0.1);
    end
    delete(el);

    QueueLength 속성을 사용하여 대기열 길이를 구합니다.

    fprintf('On the client: %i\n', q.QueueLength)
    On the client: 0
    

    대기열 처리가 완료되었으므로 QueueLength0입니다.

    이 예제에서는 DataQueue를 사용하여 parfor 루프 진행률로 대기 표시줄을 업데이트합니다.

    parfor 루프를 만들 때 병렬 풀의 워커에 각 반복을 분담합니다. parfor 루프가 완료된 경우에만 워커에서 정보가 반환됩니다. 각 반복이 끝날 때 DataQueue를 사용하여 대기 표시줄을 업데이트할 수 있습니다.

    parfor 루프 진행률로 대기 표시줄을 업데이트하는 경우 클라이언트는 남은 반복 횟수에 대한 정보를 기록해야 합니다.

    새 병렬 코드를 만들고 코드 진행률을 모니터링하려면 parfeval 워크플로를 사용하는 것이 좋습니다. 자세한 내용은 afterEach 및 afterAll을 사용하여 사용자 인터페이스를 비동기식으로 업데이트하기 항목을 참조하십시오.

    이 예제의 마지막 부분에 정의된 헬퍼 함수 parforWaitbar는 대기 표시줄을 업데이트합니다. 함수는 persistent를 사용하여 남은 반복 횟수에 대한 정보를 저장합니다.

    waitbar를 사용하여 대기 표시줄 w를 만듭니다.

    w = waitbar(0,'Please wait ...');

    DataQueue D를 만듭니다. 그런 다음 메시지가 DataQueue에 전송된 후 afterEach를 사용하여 parforWaitbar를 실행합니다.

    % Create DataQueue and listener
    D = parallel.pool.DataQueue;
    afterEach(D,@parforWaitbar);

    parfor 루프 N에 대한 반복 횟수를 설정합니다. 대기 표시줄 w와 반복 횟수 N을 사용하여 함수 parforWaitbar를 초기화합니다.

    parfor 루프의 각 반복이 끝나면 클라이언트는 parforWaitbar를 실행하고 대기 표시줄을 점진적으로 업데이트합니다.

    N = 100;
    parforWaitbar(w,N)
    

    함수 parforWaitbar는 영속 변수를 사용하여 완료된 반복 횟수를 클라이언트에 저장합니다. 워커의 정보는 필요하지 않습니다.

    N번 반복하여 parfor 루프를 실행합니다. 이 예제에서는 pauserand를 사용하여 일부 작업을 시뮬레이션합니다. 각 반복 후 send를 사용하여 DataQueue에 메시지를 보냅니다. 메시지가 DataQueue로 전송되면 대기 표시줄이 업데이트됩니다. 워커의 정보가 필요하지 않으므로 불필요한 데이터 전송을 방지하기 위해 빈 메시지를 보냅니다.

    parfor 루프가 완료된 후 delete를 사용하여 대기 표시줄을 닫습니다.

    parfor i = 1:N
        pause(rand)
        send(D,[]);
    end
    
    delete(w);
    

    헬퍼 함수 parforWaitbar를 정의합니다. 2개의 입력 인수를 사용하여 parforWaitbar를 실행하면, 이 함수는 3개의 영속 변수(count, h, N)를 초기화합니다. 하나의 입력 인수로 parforWaitbar를 실행하면 대기 표시줄이 업데이트됩니다.

    function parforWaitbar(waitbarHandle,iterations)
        persistent count h N
        
        if nargin == 2
            % Initialize
            
            count = 0;
            h = waitbarHandle;
            N = iterations;
        else
            % Update the waitbar
            
            % Check whether the handle is a reference to a deleted object
            if isvalid(h)
                count = count + 1;
                waitbar(count / N,h);
            end
        end
    end

    Status bar indicating roughly one third completion.

    이 예제에서는 parfeval을 사용하여 병렬 파라미터 스윕을 수행하고 DataQueue 객체를 사용하여 계산 중에 결과를 다시 보내는 방법을 보여줍니다.

    parfeval은 MATLAB을 차단하지 않으므로 계산이 수행되는 동안 계속해서 작업할 수 있습니다.

    이 예제는 로렌츠 연립상미분방정식에서 파라미터 σρ에 대한 파라미터 스윕을 수행하고 이 연립방정식의 혼돈적인 성질을 보여줍니다.

    ddtx=σ(y-z)ddty=x(ρ-z)-yddtz=xy-βx

    파라미터 그리드 만들기

    파라미터 스윕에서 탐색할 파라미터의 범위를 정의합니다.

    gridSize = 40;
    sigma = linspace(5, 45, gridSize);
    rho = linspace(50, 100, gridSize);
    beta = 8/3;

    meshgrid 함수를 사용하여 파라미터의 2차원 그리드를 만듭니다.

    [rho,sigma] = meshgrid(rho,sigma);

    figure 객체를 만들고 'Visible'true로 설정하면 라이브 스크립트 밖의 새 창에서 열립니다. 파라미터 스윕의 결과를 시각화하려면 곡면 플롯을 만드십시오. NaN을 사용하여 곡면의 Z 구성요소를 초기화하면 빈 플롯이 만들어집니다.

    figure('Visible',true);
    surface = surf(rho,sigma,NaN(size(sigma)));
    xlabel('\rho','Interpreter','Tex')
    ylabel('\sigma','Interpreter','Tex')

    병렬 환경 설정하기

    parpool 함수를 사용하여 병렬 워커의 풀을 만듭니다.

    parpool;
    Starting parallel pool (parpool) using the 'Processes' profile ...
    Connected to the parallel pool (number of workers: 6).
    

    워커에서 데이터를 보내려면 DataQueue 객체를 만드십시오. 워커가 afterEach 함수를 사용하여 데이터를 보낼 때마다 곡면 플롯을 업데이트하는 함수를 설정합니다. updatePlot 함수는 예제의 마지막 부분에 정의된 지원 함수입니다.

    Q = parallel.pool.DataQueue;
    afterEach(Q,@(data) updatePlot(surface,data));

    병렬 파라미터 스윕 수행하기

    파라미터를 정의한 후 병렬 파라미터 스윕을 수행할 수 있습니다.

    작업량을 분산하면 parfeval은 더 효율적으로 작동합니다. 작업량을 분산하려면 탐색하고자 하는 파라미터를 파티션 단위로 그룹화하십시오. 이 예제에서는 콜론 연산자(:)를 사용하여 균일한 크기(step)의 파티션으로 분할합니다. 결과로 생성되는 배열 partitions에는 파티션 경계가 담겨 있습니다. 이때 마지막 파티션의 끝점을 추가해야 함을 명심하십시오.

    step = 100;
    partitions = [1:step:numel(sigma), numel(sigma)+1]
    partitions = 1×17
    
               1         101         201         301         401         501         601         701         801         901        1001        1101        1201        1301        1401        1501        1601
    
    

    최상의 성능을 구현하려면 다음과 같은 파티션 크기로 분할하십시오.

    • 파티션을 나누는데 드는 오버헤드에 비해 계산 시간이 더 길 정도로 충분히 큰 크기.

    • 모든 워커에서 계산을 수행할 수 있을 정도로 충분히 작은 크기.

    병렬 워커에서 함수 실행을 표현하고 결과를 유지하려면 Future 객체를 사용하십시오.

    f(1:numel(partitions)-1) = parallel.FevalFuture;

    parfeval 함수를 사용하여 병렬 워커에 계산을 분담할 수 있습니다. parameterSweep은 탐색할 파라미터의 파티션에 대해 로렌츠 연립방정식을 푸는 아래 스크립트의 마지막에 정의된 헬퍼 함수입니다. 출력 인수가 하나이므로 parfeval에서 출력값 개수로 1을 지정해야 합니다.

    for ii = 1:numel(partitions)-1
        f(ii) = parfeval(@parameterSweep,1,partitions(ii),partitions(ii+1),sigma,rho,beta,Q);
    end

    parfeval은 MATLAB을 차단하지 않으므로 계산이 수행되는 동안 계속해서 작업할 수 있습니다. 워커는 병렬로 계산하며 중간 결과가 나오는 대로 DataQueue를 통해 전송합니다.

    parfeval이 완료될 때까지 MATLAB을 차단하려면 Future 객체에 wait 함수를 사용하십시오. wait 함수는 후속 코드가 parfeval 계산의 완료 여부에 종속되는 경우에 유용하게 사용할 수 있습니다.

    wait(f);

    parfeval 계산을 마치면 wait가 완료된 후에 나머지 코드를 실행할 수 있습니다. 예를 들어, 결과 곡면의 등고선을 플로팅해 보겠습니다. fetchOutputs 함수를 사용하여 Future 객체에 저장된 결과를 가져오십시오.

    results = reshape(fetchOutputs(f),gridSize,[]);
    contourf(rho,sigma,results)
    xlabel('\rho','Interpreter','Tex')
    ylabel('\sigma','Interpreter','Tex')

    파라미터 스윕에 계산 리소스가 더 필요하고 클러스터에 액세스할 수 있는 경우 parfeval 계산을 확장할 수 있습니다. 자세한 내용은 Scale Up from Desktop to Cluster 항목을 참조하십시오.

    헬퍼 함수 정의하기

    탐색할 파라미터의 파티션에 대해 로렌츠 연립방정식을 푸는 헬퍼 함수를 정의합니다. DataQueue 객체의 send 함수를 사용하여 중간 결과를 MATLAB 클라이언트로 전송합니다.

    function results = parameterSweep(first,last,sigma,rho,beta,Q)
        results = zeros(last-first,1);
        for ii = first:last-1
            lorenzSystem = @(t,a) [sigma(ii)*(a(2) - a(1)); a(1)*(rho(ii) - a(3)) - a(2); a(1)*a(2) - beta*a(3)];
            [t,a] = ode45(lorenzSystem,[0 100],[1 1 1]);
            result = a(end,3);
            send(Q,[ii,result]);
            results(ii-first+1) = result;
        end
    end

    새 데이터가 도착하면 곡면 플롯을 업데이트하는 또다른 헬퍼 함수를 정의합니다.

    function updatePlot(surface,data)
        surface.ZData(data(1)) = data(2);
        drawnow('limitrate');
    end

    • DataQueue 객체를 사용하여, 전송된 데이터 또는 메시지만 자동으로 처리할 수 있습니다. 클라이언트 또는 워커에서 데이터를 수신한 후 이러한 데이터를 수동으로 가져오려면 대신 parallel.pool.PollableDataQueue 객체를 사용하여 데이터를 전송합니다.

    버전 내역

    R2017a에 개발됨