Rate this Page

분산 데이터 병렬 처리 시작하기#

저자: Shen Li

감수: Joe Zhu, Chirag Pandya

번역: 조병근

참고

edit 이 튜토리얼의 소스 코드는 GitHub 에서 확인하고 변경해 볼 수 있습니다.

선수과목(Prerequisites):

DistributedDataParallel (DDP) is a powerful module in PyTorch that allows you to parallelize your model across multiple machines, making it perfect for large-scale deep learning applications. To use DDP, you’ll need to spawn multiple processes and create a single instance of DDP per process.

But how does it work? DDP uses collective communications from the torch.distributed package to synchronize gradients and buffers across all processes. This means that each process will have its own copy of the model, but they’ll all work together to train the model as if it were on a single machine.

To make this happen, DDP registers an autograd hook for each parameter in the model. When the backward pass is run, this hook fires and triggers gradient synchronization across all processes. This ensures that each process has the same gradients, which are then used to update the model.

For more information on how DDP works and how to use it effectively, be sure to check out the DDP design note. With DDP, you can train your models faster and more efficiently than ever before!

The recommended way to use DDP is to spawn one process for each model replica. The model replica can span multiple devices. DDP processes can be placed on the same machine or across machines. Note that GPU devices cannot be shared across DDP processes (i.e. one GPU for one DDP process).

In this tutorial, we’ll start with a basic DDP use case and then demonstrate more advanced use cases, including checkpointing models and combining DDP with model parallel.

참고

이 튜토리얼의 코드는 8-GPU 서버에서 실행되지만 다른 환경에서도 쉽게 적용할 수 있습니다.

DataParallelDistributedDataParallel 간의 비교 ———————————————————-===

내용에 들어가기에 앞서 복잡성이 증가했음에도 불구하고 DataParallelDistributedDataParallel 사용을 고려하는 이유를 생각해봅시다.

  • 첫째, DataParallel은 단일 작업, 멀티쓰레드이지만 단일 기기에서만 작동하는 반면, DistributedDataParallel은 다중 작업이며 단일 및 다중 기기 학습을 모두 지원합니다. 쓰레드간 GIL 경합, 복제 모델의 반복 당 생성, 산란 입력 및 수집 출력으로 인한 추가적인 오버헤드로 인해, 단일 기기에서조차 DataParallel은 일반적으로 DistributedDataParallel보다 느립니다.

  • 모델이 너무 커서 단일 GPU에 맞지 않을 경우 model parallel을 사용하여 여러 GPU로 분할해야 한다는 prior tutorial을 떠올려 보세요. DistributedDataParallelmodel parallel에서 실행되지만 DataParallel은 이때 실행되지 않습니다. DDP를 모델 병렬 처리와 결합하면 각 DDP 작업은 모델 병렬 처리를 사용하며 모든 작업은 데이터 병렬 처리를 사용합니다.

기본적인 사용법#

DDP 모듈을 생성하기 전에 반드시 우선 작업 그룹을 올바르게 설정해야 합니다. 자세한 내용은 PYTORCH로 분산 어플리케이션 개발하기에서 확인할 수 있습니다.

import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP

# 윈도우 플랫폼에서 torch.distributed 패키지는
# Gloo backend, FileStore 및 TcpStore 만을 지원합니다.
# FileStore의 경우, init_process_group 에서
# init_method 매개변수를 로컬 파일로 설정합니다.
# 다음 예시:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
#    "gloo",
#    rank=rank,
#    init_method=init_method,
#    world_size=world_size)
# TcpStore의 경우 리눅스와 동일한 방식입니다.

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # 작업 그룹 초기화
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

이제 DDP로 감싸여진 Toy 모듈을 생성하고 더미 입력 데이터를 입력해 보겠습니다. 우선 DDP는 0순위 작업에서부터 DDP 생성자의 다른 모든 작업들에게 모델의 상태를 전달하므로, 다른 모델의 매개 변수 초기값들에서 시작하는 다른 DDP 작업들에 대하여 걱정할 필요가 없습니다.

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # 모델을 생성하고 순위 아이디가 있는 GPU로 전달
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()
    print(f"Finished running basic DDP example on rank {rank}.")


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

보여지는 바와 같이 DDP는 하위 수준의 분산 커뮤니케이션 세부 사항을 포함하고 로컬 모델처럼 깔끔한 API를 제공합니다. 변화도 동기화 통신(gradient synchronization communications)은 역전파 전달(backward pass)간 수행되며 역전파 계산(backward computation)과 겹치게 됩니다. backword()가 반환되면 param.grad에는 동기화된 변화도 텐서(synchronized gradient tensor)가 포함되어 있습니다. 기본적으로 DDP는 작업 그룹을 설정하는데 몇 줄의 코드들이 더 필요하지만, 보다 다양하게 사용하는 경우 주의가 필요합니다.

비대칭 작업 속도#

DDP에서는 생성자, 순전파(forward pass) 및 역전파 전달 호출 지점이 분산 동기화 지점(distribute synchronization point)입니다. 서로 다른 작업이 동일한 수의 동기화를 시작하고 동일한 순서로 이러한 동기화 지점에 도달하여 각 동기화 지점을 거의 동시에 진입을 요구합니다. 그렇지 않으면 빠른 작업이 일찍 도착하고 다른 작업의 대기 시간이 초과될 수 있습니다. 따라서 사용자는 작업 간의 작업량을 균형 있게 분배할 필요가 있습니다. 때때로 비대칭 작업(skewed processing) 속도는 다음과 같은 이유로 인하여 불가피하게 발생합니다. 예를 들어, 네트워크 지연, 리소스 경쟁(resource contentions), 예측하지 못한 작업량 급증 등입니다. 이러한 상황에서 시간 초과를 방지하려면, init_process_group를 호출할 때 충분한 timeout값을 전달해야 합니다.

체크포인트를 저장하고 읽어오기#

학습 중에 torch.savetorch.load 로 모듈의 체크포인트를 만들고 그 체크포인트로부터 복구하는 것이 일반적입니다. 더 자세한 내용은 SAVING AND LOADING MODELS를 참고하세요. DDP를 사용할 때, 최적의 방법은 모델을 한 작업에만 저장하고 그 모델을 모든 작업에 쓰기 과부하(write overhead)를 줄이며 읽어오는 것입니다. 이는 모든 작업이 같은 매개변수로부터 시작되고 변화도는 역전파 전달로 동기화되므로 옵티마이저(optimizer)는 매개변수를 동일한 값으로 계속 설정해야 하기 때문에 정확합니다. (하나의 프로세스에서 저장하고 다른 모든 곳에서 불러오는 등) 이러한 최적화를 사용하는 경우, 저장이 완료되기 전에 불러오는 어떠한 작업도 시작하지 않도록 해야 합니다. 더불어, 모듈을 읽어올 때 작업이 다른 기기에 접근하지 않도록 적절한 map_location 인자를 제공해야합니다. map_location값이 없을 경우, torch.load는 먼저 모듈을 CPU에 읽어온 다음 각 매개변수가 저장된 위치로 복사하여 동일한 장치를 사용하는 동일한 기기에서 모든 작업을 발생시킵니다. 더 추가적인 실패 복구와 엘라스틱(elasticity support)은 TorchElastic을 참고하세요.

def demo_checkpoint(rank, world_size):
    print(f"Running DDP checkpoint example on rank {rank}.")
    setup(rank, world_size)

    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
    if rank == 0:
        # 모든 작업은 같은 매개변수로부터 시작된다고 생각해야 합니다.
        # 무작위의 매개변수와 변화도는 역전파 전달로 동기화됩니다.
        # 그럼으로, 하나의 작업은 모델을 저장하기에 충분합니다.
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

    # 작업 0이 저장한 후 작업 1이 모델을 읽어오도록 barrier()를 사용합니다.
    dist.barrier()
    # configure map_location properly
    map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
    ddp_model.load_state_dict(
        torch.load(CHECKPOINT_PATH, map_location=map_location, weights_only=True))

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)

    loss_fn(outputs, labels).backward()
    optimizer.step()

    # 파일삭제를 보호하기 위해 아래에 dist.barrier()를 사용할 필요는 없습니다.
    # DDP의 역전파 전달 과정에 있는 AllReduce 옵스(ops)가 동기화 기능을 수행했기 때문에

    if rank == 0:
        os.remove(CHECKPOINT_PATH)

    cleanup()
    print(f"Finished running DDP checkpoint example on rank {rank}.")

모델 병렬 처리를 활용한 DDP#

DDP는 다중 GPU 모델에서도 작동합니다. 다중 GPU 모델을 활용한 DDP는 대용량의 데이터를 가진 대용량 모델을 학습시킬 때 특히 유용합니다.

class ToyMpModel(nn.Module):
    def __init__(self, dev0, dev1):
        super(ToyMpModel, self).__init__()
        self.dev0 = dev0
        self.dev1 = dev1
        self.net1 = torch.nn.Linear(10, 10).to(dev0)
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to(dev1)

    def forward(self, x):
        x = x.to(self.dev0)
        x = self.relu(self.net1(x))
        x = x.to(self.dev1)
        return self.net2(x)

다중 GPU 모델을 DDP로 전달할 때, device_idsoutput_device를 설정하지 않아야 합니다. 입력 및 출력 데이터는 어플리케이션 또는 모델 forward()에 의해 적절한 장치에 배치됩니다.

def demo_model_parallel(rank, world_size):
    print(f"Running DDP with model parallel example on rank {rank}.")
    setup(rank, world_size)

    # 작업을 위한 mp_model 및 장치 설정
    dev0 = rank * 2
    dev1 = rank * 2 + 1
    mp_model = ToyMpModel(dev0, dev1)
    ddp_mp_model = DDP(mp_model)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    # 출력값은 dev1에 저장
    outputs = ddp_mp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(dev1)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()
    print(f"Finished running DDP with model parallel example on rank {rank}.")


if __name__ == "__main__":
    n_gpus = torch.cuda.device_count()
    assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
    world_size = n_gpus
    run_demo(demo_basic, world_size)
    run_demo(demo_checkpoint, world_size)
    run_demo(demo_model_parallel, world_size)

Initialize DDP with torch.distributed.run/torchrun#

We can leverage PyTorch Elastic to simplify the DDP code and initialize the job more easily. Let’s still use the Toymodel example and create a file named elastic_ddp.py.

import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim

from torch.nn.parallel import DistributedDataParallel as DDP

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic():
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
    dist.init_process_group("nccl")
    rank = dist.get_rank()
    print(f"Start running basic DDP example on rank {rank}.")

    # create model and move it to GPU with id rank
    device_id = rank % torch.cuda.device_count()
    model = ToyModel().to(device_id)
    ddp_model = DDP(model, device_ids=[device_id])
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(device_id)
    loss_fn(outputs, labels).backward()
    optimizer.step()
    dist.destroy_process_group()
    print(f"Finished running basic DDP example on rank {rank}.")

if __name__ == "__main__":
    demo_basic()

In the example above, we are running the DDP script on two hosts and we run with 8 processes on each host. That is, we are running this job on 16 GPUs. Note that $MASTER_ADDR must be the same across all nodes.

Here torchrun will launch 8 processes and invoke elastic_ddp.py on each process on the node it is launched on, but user also needs to apply cluster management tools like slurm to actually run this command on 2 nodes.

For example, on a SLURM enabled cluster, we can write a script to run the command above and set MASTER_ADDR as:

export MASTER_ADDR=$(scontrol show hostname ${SLURM_NODELIST} | head -n 1)

Then we can just run this script using the SLURM command: srun --nodes=2 ./torchrun_script.sh.

This is just an example; you can choose your own cluster scheduling tools to initiate the torchrun job.

For more information about Elastic run, please see the quick start document.