[Анализ исходного кода] Параллельная реализация конвейера PyTorch (2) — как разделить модель

машинное обучение глубокое обучение

0x00 сводка

В предыдущей статье мы представили основы параллелизма конвейера PyTorch, а в этой статье мы представляем его механизм автоматической балансировки и сегментацию модели.

Ссылки на другие статьи о конвейерном параллелизме:

[Анализ исходного кода] Конвейер глубокого обучения, параллельный Gpipe(1) --- Базовая реализация конвейера

[Анализ исходного кода] Конвейер глубокого обучения, параллельный GPipe (2) ----- накопление градиента

[Анализ исходного кода] Конвейер глубокого обучения, параллельный GPipe(3) -- перерасчет

[Анализ исходного кода] PipeDream (1) --- Этап профиля параллельного конвейера глубокого обучения

[Анализ исходного кода] Параллельный конвейер глубокого обучения PipeDream(2) --- Вычислительный раздел

[Анализ исходного кода] Параллельный конвейер глубокого обучения PipeDream (3) --- модель преобразования

[Анализ исходного кода] Параллельный конвейер глубокого обучения PipeDream(4) --- механизм выполнения

[Анализ исходного кода] Параллельный конвейер глубокого обучения PipeDream (5) --- коммуникационный модуль

[Анализ исходного кода] Параллельный конвейер глубокого обучения PipeDream (6) --- Стратегия 1F1B

[Анализ исходного кода] Параллельная реализация конвейера PyTorch (1) -- базовые знания

Изображение в этой статье взято из бумаги и исходного кода github.

0x01 проблема

Первая проблема, с которой сталкивается конвейерный параллелизм:

  • Как разделить большую модель на несколько маленьких моделей? Каков алгоритм сегментации?
  • Как распределить эти маленькие модели на несколько устройств? Каков алгоритм распределения?
  • Как добиться оптимальной или приблизительно оптимальной общей производительности? Что такое показатели?

Например, большая модель с 6 слоями, как ее разделить на три маленькие модели?

+-----------------------------------------------------------------------------------------+
|                                                                                         |
| Layer 1 +--->  Layer 2 +-----> Layer 3 +----->  Layer 4 +-----> Layer 5  +---> Layer 6  |
|                                                                                         |
+------------------------------------------+----------------------------------------------+
                                           |
                                           |
                                           | ? ? ? ? ?
                                           |
                                           |
                                           v
​
  +--------------------+         +---------------------+      +--------------------+
  |Device 1            |         |Device 2             |      |Device 3            |
  |                    |         |                     |      |                    |
  |      Layer 1       |    +---------> Layer 4        |      |                    |
  |         +          |    |    |         +           |  +------->   Layer 6      |
  |         |          |    |    |         |           |  |   |                    |
  |         v          |    |    |         |           |  |   |                    |
  |      Layer 2       |    |    |         |           |  |   |                    |
  |         +          |    |    |         v           |  |   |                    |
  |         |          |    |    |      Layer 5 +---------+   |                    |
  |         v          |    |    |                     |      |                    |
  |      Layer 3  +---------+    |                     |      |                    |
  |                    |         |                     |      |                    |
  +--------------------+         +---------------------+      +--------------------+

Далее посмотрим, как torchgpipe решает эти проблемы.

0x01 Автобаланс

torchgpipeПодмодуль torchgpipe.balance предназначен для расчета разделов, чтобы сделать разницу в ресурсах между попарными разделами как можно меньше. Занятость ресурсов рассчитывается путем профилирования.

1.1 Automatic Balancing

Модель сегментации повлияет на загрузку GPU, например, слой с большим объемом вычислений будет замедлять скорость нисходящего потока, поэтому необходимо найти наилучшую точку баланса модели. Однако определить наилучший баланс для модели сложно, особенно если пользователь все еще разрабатывает модель, а архитектура модели может со временем измениться. В этом случае TorchPipe настоятельно рекомендует использоватьtorchgpipe.balanceавтоматически балансировать. Это не даст пользователю наилучший баланс, но это достаточно хороший баланс.

Обратите внимание, что эта функция созданаtorchgpipeПредоставляется вместо оригинальной бумаги GPipe от Huang et al.

torchgpipeПредусмотрены два инструмента балансировки, оба из которых используются на основе результатов профиля каждого слоя, и пользователь может выбрать инструмент балансировки в соответствии со своими потребностями.

  • ~torchgpipe.balance.balance by_time: отслеживает время работы каждого слоя.
  • ~torchgpipe.balance.balance by_size: определяет использование памяти CUDA для каждого слоя.

Конкретное использование заключается в следующем: пользователю необходимо ввести образец ввода в модель.

   from torchgpipe import GPipe
   from torchgpipe.balance import balance_by_time
​
   partitions = torch.cuda.device_count()
   sample = torch.rand(128, 3, 224, 224) # 用户需要向模型中输入一个样本输入
   balance = balance_by_time(partitions, model, sample)
​
   model = GPipe(model, balance, chunks=8)

1.2 Основные функции/функции

1.2.1 Batch

Пакет — это базовый класс, расположенный в torchgpipe/microbatch.py, Его функция — инкапсулировать тензоры или тензоры для унифицированной обработки. Пакет сохраняет тензоры в собственной переменной-члене значения. При вызове метода call входящий метод применяется к тензору значений.

Например, метод Pipeline.compute, о котором мы поговорим позже, будет иметь следующее, которое должно применить разделение к тензорам в пакете:

def compute(batch: Batch = batch,
                            partition: nn.Sequential = partition,
                            skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
                            ) -> Batch:
  with use_skip_tracker(skip_tracker):
    return batch.call(partition)
​
  task = Task(streams[j], compute=compute, finalize=None)

Конкретная партия определяется следующим образом:

Tensors = Tuple[Tensor, ...]
TensorOrTensors = Union[Tensor, Tensors]
Function = Callable[[TensorOrTensors], TensorOrTensors]

class Batch:
    """An abstraction of an atomic tensor or a tuple of tensors. This
    eliminates every boilerplate code to classify an atomic tensor or a tuple
    of tensors.
    ::

        x = generate_tensor_or_tensors()
        x = Batch(x)

        # in-place update
        x[0] = F.apply(x[0])
        x[:] = F.apply(*x)

        # f(x) if x is a tensor.
        # f(*x) if x is a tuple of tensors.
        # y is also a batch.
        y = x.call(f)

    """

    def __init__(self, value: TensorOrTensors) -> None:
        self.value = value
        self.atomic = torch.is_tensor(value)

    @property
    def tensor(self) -> Tensor:
        """Retrieves the underlying tensor."""
        if not self.atomic:
            raise AttributeError('not atomic batch')
        return cast(Tensor, self.value)

    @property
    def tensors(self) -> Tensors:
        """Retrieves the underlying tensors."""
        if self.atomic:
            raise AttributeError('batch is atomic')
        return cast(Tensors, self.value)

    @property
    def tensor_or_tensors(self) -> TensorOrTensors:
        """Retrieves the underlying tensor or tensors regardless of type."""
        return self.value

    def call(self, function: Function) -> 'Batch': # 这里是关键方法
        """Calls a function by the underlying tensor or tensors. It also wraps
        the output with :class:`Batch`.
        """
        return Batch(function(self.value)) # 调用模型的forward         

1.2.2 layerwise_sandbox

Роль метода layerwise_sandbox состоит в том, чтобы копировать слои модели, не затрагивая исходную модель, что упрощает профилирование.

def layerwise_sandbox(module: nn.Sequential,
                      device: torch.device,
                      ) -> Generator[nn.Module, None, None]:
    """Copies layers for ease to profile. It doesn't modify the given
    module.
    """
    for layer in module:
        layer_copy = copy.deepcopy(layer)
        layer_copy.to(device)
        layer_copy.train()
        yield layer_copy

1.2.3 detach

Роль метода detach состоит в том, чтобы отсоединить некоторые тензоры от графа autograd, чтобы получить новый набор тензоров. Эти тензоры отделены от текущего графа вычислений. Но он по-прежнему указывает на место хранения исходной переменной. detach может отключить обратное распространение некоторых ветвей.

def detach(batch: Batch) -> None:
    """Detaches from autograd graph."""
    for i, x in enumerate(batch):
        batch[i] = x.detach().requires_grad_(x.requires_grad)

существуетtorchgpipeВ коде часто можно увидеть использование detach, что видно из комментариев, это обходной путь, принятый из-за ошибки в PyTorch.

    # A Python autograd function might fail with this error:
    #
    #   RuntimeError: Returning Variables sharing storage with other Variables
    #   that require grad is not supported in Python functions. Please submit a
    #   feature request if you hit this error.
    #
    # It doesn't look like an essential restriction. But it happens on the
    # current PyTorch version. To avoid it, we should detach the tensor before
    # returning by identity autograd functions, such as Wait, Fork, and Join.
    #

1.3 Баланс по времени расчета

Роль метода balance_by_time заключается в балансировке по времени работы, а параметры следующие:

  • разделы : количество разделов
  • модуль: последовательная модель, которая должна быть разделена
  • sample : образец заданного размера партии

По сути, это вызов profile_times, чтобы получить время работы по образцу, а затем партиционирование.

def balance_by_time(partitions: int,
                    module: nn.Sequential,
                    sample: TensorOrTensors,
                    *,
                    timeout: float = 1.0,
                    device: Device = torch.device('cuda'),
                    ) -> List[int]:
    """Naive automatic balancing by elapsed time per layer.
    ::
​
        sample = torch.empty(128, 3, 224, 224)
        balance = balance_by_time(torch.cuda.device_count(), model, sample)
        gpipe = GPipe(model, balance, chunks=8)
​
    Args:
        partitions (int):
            intended number of partitions
        module (torch.nn.Sequential):
            sequential module to be partitioned
        sample (torch.Tensor):
            example input with arbitrary batch size
​
    Keyword Args:
        timeout (float):
            profiling iterates again if the timeout (in second) is not exceeded
            (default: ``1.0``)
        device ('cpu' or 'cuda' device):
            CPU or CUDA device where each layer is profiled (default: the
            current CUDA device)
​
    Returns:
        A list of number of layers in each partition. Use it for the `balance`
        parameter of :class:`~torchgpipe.GPipe`.
​
    .. note::
        `module` and `sample` must be placed on the same device.
​
    """
    times = profile_times(module, sample, timeout, torch.device(device))
    return balance_cost(times, partitions)

Здесь класс Batch предназначен для инкапсуляции тензоров или тензорных массивов, и его методы можно использовать единообразно.

profile_times получает время работы в соответствии с образцом, конкретная логика такова:

  • Переберите слои в модели для каждого слоя:

    • Дождитесь завершения всех ядер во всех потоках на текущем устройстве.
    • Время начала записи
    • выполнить опережающее вычисление на слое
    • Получите тензор, которому нужен градиент, если он существует, выполните обратный расчет
    • Дождитесь завершения всех ядер во всех потоках на текущем устройстве.
    • время окончания записи
  • Наконец, возвращается список сред выполнения для каждого слоя.

def profile_times(module: nn.Sequential,
                  sample: TensorOrTensors,
                  timeout: float,
                  device: torch.device,
                  ) -> List[int]:
    """Profiles elapsed times per layer."""
    if any(p.grad is not None for p in module.parameters()):
        raise ValueError('some parameter already has gradient')

    _batch = Batch(sample)
    for i, x in enumerate(_batch):
        _batch[i] = x.detach().to(device).requires_grad_(x.requires_grad)

    time_bufs: List[List[float]] = [[] for _ in module]
    begun_at = time.time()

    while time.time() - begun_at < timeout:
        batch = _batch

        # 遍历模型中的层
        for i, layer in enumerate(layerwise_sandbox(module, device)):
            detach(batch)

            if device.type == 'cuda':
                torch.cuda.synchronize(device) # 等待当前设备上所有流中的所有kernel完成
            tick = time.time()# 起始运行时间

            # Forward
            batch = batch.call(layer) # 对某层进行前向计算

            # Backward
            # 得到需要梯度的张量
            backward_tensors = tuple(y for y in batch if y.requires_grad)
            # 进行后向计算
            if backward_tensors:
                torch.autograd.backward(backward_tensors, backward_tensors)

            if device.type == 'cuda':
                torch.cuda.synchronize(device) # 等待当前设备上所有流中的所有kernel完成
            tock = time.time() # 终止时间

            time_bufs[i].append(tock - tick)

    us = 1_000_000
    return [sum(int(t*us) for t in buf) for buf in time_bufs]


1.4 Баланс по объему памяти

Роль метода balance_by_size заключается в балансировке в соответствии с объемом оперативной памяти, где параметры следующие:

  • partitions : количество разделов, из примера можно рассматривать как количество устройств.
  • модуль: последовательная модель, которая должна быть разделена
  • sample : образец заданного размера партии

По сути, это вызов profile_sizes по образцу, чтобы получить размер оперативной памяти, а затем партицию.

Во время обучения память, необходимая для параметров, зависит от того, какой оптимизатор используется. Оптимизатор может использовать буфер для каждого параметра, чтобы отслеживать внутри него статистику оптимизации, такую ​​как импульсный буфер в SGD.

Для более надежного баланса на основе размера пользователь должен указать соответствующий «param_scale» для оптимизатора. Значение по умолчанию «param_scale» равно 2, а не 1, потому что накопление градиента требуется для каждого оптимизатора. Некоторые справочные значения также приведены в комментариях ниже.

def balance_by_size(partitions: int,
                    module: nn.Sequential,
                    input: TensorOrTensors,
                    *,
                    chunks: int = 1,
                    param_scale: float = 2.0,
                    device: Device = torch.device('cuda'),
                    ) -> List[int]:
    """Naive automatic balancing by CUDA memory usage per layer.

    During training, required memory for parameters depends on which optimizer
    is used. Optimizers may use buffers for each parameter to track
    optimization statistics internally, such as momentum buffer in SGD.

    To get more reliable size based balance, you should specify `param_scale`
    with regard to your optimizer. The default `param_scale` is 2 instead of 1
    due to gradient accumulation which is necessary for every optimizer.

    Follow this guide to choose correct `param_scale` for typical optimizers:

    =========  =============  =========================================
    Optimizer  `param_scale`  Internal State
    =========  =============  =========================================
    SGD        2--3           (momentum_buffer)
    Adam       4--5           exp_avg, exp_avg_sq, (max_exp_avg_sq)
    Adadelta   4              square_avg, acc_delta
    Adagrad    3              sum
    RMSprop    3--5           square_avg, (momentum_buffer), (grad_avg)
    =========  =============  =========================================

    Here's a simple example with the Adam optimizer::

        balance = balance_by_size(
            torch.cuda.device_count(),
            model,

            # Same size with mini-batch to train
            torch.empty(1024, 3, 224, 224),

            # Number of micro-batches to train with GPipe
            chunks=8,

            # 4 for Adam
            param_scale=4.0,
        )

        gpipe = GPipe(model, balance, chunks=8)
        adam = Adam(gpipe.parameters())

    Args:
        partitions (int):
            intended number of partitions
        module (torch.nn.Sequential):
            sequential module to be partitioned
        input (torch.Tensor):
            example mini-batch with the same size to train

    Keyword Args:
        chunks (int):
            number of micro-batches will be used to train (default: ``1``)
        param_scale (float):
            how many copies of parameters would be allocated for training. It
            depends on optimizer. See the above guide. (default: ``2.0``)
        device ('cuda' device):
            CUDA device where each layer is profiled (default: the current CUDA
            device)

    Returns:
        A list of number of layers in each partition. Use it for the `balance`
        parameter of :class:`~torchgpipe.GPipe`.

    .. note::
        `module` and `input` must be placed on the same CUDA device.

    """
    sizes = profile_sizes(module, input, chunks, param_scale, torch.device(device))
    return balance_cost(sizes, partitions)


Логика profile_sizes следующая:

  • Переберите слои в модели для каждого слоя:

    • Используйте torch.cuda.memory_allocated для расчета памяти, используемой для прямого распространения, которая является значением активации. torch.cuda.memory_allocated(device=None) возвращает текущую память графического процессора, занятую тензорами для данного устройства.

    • использоватьp.storage().size() * p.storage().element_size()Расчет размеров параметров.

      • Хранилище в pytorch относится к непрерывному блоку памяти, а тензор можно рассматривать как представление, сопоставленное с хранилищем.
      • element_size() возвращает байты одного элемента.
    • Добавьте значение активации и параметр вместе, вставив список.

  • Возвращает список размеров памяти.

def profile_sizes(module: nn.Sequential,
                  input: TensorOrTensors,
                  chunks: int,
                  param_scale: float,
                  device: torch.device,
                  ) -> List[int]:
    """Profiles CUDA memory usage per layer."""
    if device.type != 'cuda':
        raise ValueError('size profiler supports only CUDA device')

    batch = Batch(input)
    sizes: List[int] = []

    latent_scale = batch[0].size(0) / chunks
    for i, x in enumerate(batch):
        batch[i] = x[:1].detach().to(device).requires_grad_(x.requires_grad)

    for layer in layerwise_sandbox(module, device):
        detach(batch)

        # Detect memory usage at forward.
        # 计算前向传播用到的显存,就是激活值
        memory_before = torch.cuda.memory_allocated(device)
        batch = batch.call(layer) # 对某层进行前向传播
        memory_after = torch.cuda.memory_allocated(device)
        latent_size = memory_after - memory_before

        # Analyze size of parameters.
        # 计算参数尺寸
        param_size = sum(p.storage().size() * p.storage().element_size()
                         for p in layer.parameters())

        # 把激活值和参数加在一起,插入列表
        # Combine size of parameters and activations with normalize scales.
        size = latent_size*latent_scale + param_size*param_scale
        sizes.append(int(size))

    return sizes # 返回内存大小列表

1.5 Алгоритм сегментации

После того, как будет получено время расчета или объем памяти каждого слоя, конкретная сегментация будет выполняться с помощью следующего кода.

times = profile_times(module, sample, timeout, torch.device(device))
return balance_cost(times, partitions)

Конкретный balance_cost — это просто пакет, а алгоритм по-прежнему blockpartition.solve.

def balance_cost(cost: List[int], partitions: int) -> List[int]:
    partitioned = blockpartition.solve(cost, partitions)
    return [len(p) for p in partitioned]

Судя по комментариям, blockpartition.solve реализует алгоритм этой статьи.

Implements "Block Partitions of Sequences" by Imre Bárány et al.Paper: https://arxiv.org/pdf/1308.2452.pdf

Это математическая статья, и ее псевдокод алгоритма выглядит следующим образом (в основном соответствующий комментариям в последующей реализации).

img

Бумага — это чисто математический аргумент, мы не изучаем его внутренний механизм, а только смотрим на результаты его работы.

Напомним, что поддерживаемая здесь модель является последовательной моделью, поэтому вне зависимости от времени или объема памяти это список. Функция решения состоит в том, чтобы как можно более равномерно распределить список на несколько групп.

Предполагая, что модель имеет 6 слоев, время работы каждого слоя следующее, и его нужно распределить между двумя устройствами, так как же его разделить?

blockpartition.solve([1, 2, 3, 4, 5, 6], partitions=2) # 就是第一层运行时间是1个单位,第二层运行时间是2个单位,依次类推。
​
结果是 [[1, 2, 3, 4], [5, 6]],可以看到,这个6个层被比较均匀的按照运行时间分成了两个partition

Если разделить на три устройства, то:

solve([1, 2, 3, 4, 5, 6], partitions=3)
​
结果是 [[1, 2, 3], [4, 5], [6]],可以看到,这个6个层被比较均匀的按照运行时间分成了三个partition

Затем balance_cost получит конкретные сведения о каждом разделе.слои, окончательный результат получения баланса:

[3,2,1]

Конкретный код алгоритма разделения выглядит следующим образом, и заинтересованные друзья могут внимательно изучить его в сочетании с статьей.

def solve(sequence: List[int], partitions: int = 1) -> List[List[int]]:
    """Splits a sequence into several partitions to minimize variance for each
    partition.

    The result might not be optimal. However, it can be done only in O(kn³),
    where k is the number of partitions and n is the length of the sequence.

    """
    if partitions < 1:
        raise ValueError(f'partitions must be a positive integer ({partitions} < 1)')

    n = len(sequence)
    if n < partitions:
        raise ValueError(f'sequence is shorter than intended partitions ({n} < {partitions})')

    # Normalize the sequence in [0, 1].
    minimum = min(sequence)
    maximum = max(sequence) - minimum

    normal_sequence: List[float]
    if maximum == 0:
        normal_sequence = [0 for _ in sequence]
    else:
        normal_sequence = [(x-minimum)/maximum for x in sequence]

    splits = [n//partitions * (x+1) for x in range(partitions-1)] + [n]

    def block_size(i: int) -> float:
        start = splits[i-1] if i > 0 else 0
        stop = splits[i]
        return sum(normal_sequence[start:stop])

    def leaderboard() -> Iterator[Tuple[float, int]]:
        return ((block_size(i), i) for i in range(partitions))

    while True:
        """
        (1) Fix p ∈ [k] with M(P) = bp. So Bp is a maximal block of P.
        """
        # max_size: M(P)
        max_size, p = max(leaderboard())

        while True:
            """
            (2) If M(P) ≤ m(P) + 1, then stop.
            """
            # min_size: m(P)
            min_size, q = min(leaderboard())

            if max_size <= min_size + 1:
                return [sequence[i:j] for i, j in zip([0]+splits[:-1], splits)]

            """
            (3) If M(P) > m(P) + 1, then let m(P) = bq for the q ∈ [k] which is
            closest to p (ties broken arbitrarily). Thus Bq is a minimal block
            of P. Let Bh be the block next to Bq between Bp and Bq. (Note that
            Bh is a non-empty block: if it were, then m(P) = 0 and we should
            have chosen Bh instead of Bq.)
            """
            if p < q:
                """
                So either p < q and then h = q−1 and we define P ∗ by moving
                the last element from Bh = Bq−1 to Bq,
                """
                h = q - 1
                splits[h] -= 1
            else:
                """
                or q < p, and then h = q + 1 and P ∗ is obtained by moving the
                first element of Bh = Bq+1 to Bq.
                """
                h = q + 1
                splits[q] += 1

            """
            Set P = P ∗ . If p = h, then go to (1), else go to (2).
            """
            if p == h:
                break

0x02 Разделение модели

2.1 вызов

Теперь, когда результаты профиля получены, следует сегментировать различные слои модели. Пример использования можно посмотреть в комментариях ниже, передав баланс в качестве параметра конструктору GPipe.

'''
If your model is still under development, its optimal balance would change
frequently. In this case, we highly recommend 'torchgpipe.balance' for naive
automatic balancing:
​
  from torchgpipe import GPipe
  from torchgpipe.balance import balance_by_time
​
  partitions = torch.cuda.device_count()
  sample = torch.empty(...)
  balance = balance_by_time(partitions, model, sample)
​
  model = GPipe(model, balance, ...)
'''

2.2 Строительство GPipe

Gpipe__init__Как видите, для разделения используется функция split_module:

    def __init__(self,
                 module: nn.Sequential,
                 balance: Optional[Iterable[int]] = None,
                 *,
                 devices: Optional[Devices] = None,
                 chunks: int = chunks,
                 checkpoint: str = checkpoint,
                 spawn_workersdeferred_batch_norm: bool = False,
                 ) -> None:
        super().__init__()
​
        chunks = int(chunks)
        checkpoint = str(checkpoint)
​
        verify_module(module)
        # Verify if the underlying skippable modules satisfy integrity. The
        # integrity can be verified before forward() because it is static.
        verify_skippables(module)
​
        self.chunks = chunks
        self.checkpoint = checkpoint
​
        if deferred_batch_norm:
            module = DeferredBatchNorm.convert_deferred_batch_norm(module, chunks)
​
        if devices is None:
            devices = range(torch.cuda.device_count())
        devices = [torch.device(d) for d in devices]
        devices = cast(List[torch.device], devices)
​
        try:
            # 对模型进行切分
            self.partitions, self.balance, self.devices = split_module(module, balance, devices)
        except BalanceError as exc:
            raise ValueError(recommend_auto_balance(str(exc)))
​
        self._copy_streams: List[List[AbstractStream]] = []
        self._skip_layout = inspect_skip_layout(self.partitions)

Итак, давайте посмотрим на функцию split_module, ее основная логика такова:

  • Обход слоев, содержащихся в модели

    • Добавьте новый слой к слоям массива

    • Если размер массива равен balance[j], достигнуто количество слоев, которое должно содержать устройство j, тогда:

      • Встройте массив разделов в последовательный модуль и получите переменный раздел.
      • Используйте partition.to(device) для размещения раздела на соответствующем устройстве, упомянутом выше,~torchgpipe.GPipeИспользуйте CUDA для обучения. Пользователям не нужно самостоятельно перемещать модули на GPU, т.к.~torchgpipe.GPipeАвтоматически перемещайте каждый раздел на другое устройство.
      • Добавьте этот раздел в массив разделов
      • Затем перейдите к следующему устройству, чтобы увидеть
  • Наконец верните разделы, баланс, устройства.

def split_module(module: nn.Sequential,
                 balance: Iterable[int],
                 devices: List[torch.device],
                 ) -> Tuple[List[nn.Sequential], List[int], List[torch.device]]:
    """Splits a module into multiple partitions.

    Returns:
        A tuple of (partitions, balance, devices).

        Partitions are represented as a :class:`~torch.nn.ModuleList` whose
        item is a partition. All layers in a partition are placed in the
        same device.

    Raises:
        BalanceError:
            wrong balance
        IndexError:
            the number of devices is fewer than the number of partitions.

    """
    balance = list(balance)

    j = 0
    partitions = []
    layers: NamedModules = OrderedDict()

    for name, layer in module.named_children(): # 遍历模型包含的层
        layers[name] = layer # 把新的层加入到数组中

        if len(layers) == balance[j]: # 如果数组大小等于balance[j],就是达到了device j应该包含的层数
            # Group buffered layers as a partition.
            partition = nn.Sequential(layers) # 把层数组组合成一个sequential module

            device = devices[j]
            partition.to(device) # 把层放置到相关设备之上

            partitions.append(partition) # 这个新module加入到分区数组中

            # Prepare for the next partition.
            layers.clear()
            j += 1 # 去下一个device看看

    partitions = cast(List[nn.Sequential], nn.ModuleList(partitions))
    del devices[j:]

    return partitions, balance, devices

В сочетании с приведенным выше примером баланс выглядит следующим образом:

[3,2,1]

Итак, первые три слоя [1, 2, 3] объединены в модуль, два средних слоя [4, 5] объединены в модуль, а последний слой [6] является модулем.

Окончательный секционированный массив:

[ module([1, 2, 3]),  module([4, 5]),  module([6])]

2.3 Пример

Давайте посмотрим на конкретную распечатку.Модель содержит 6 слоев, которые разделены на 3 раздела.Количество слоев в разделах: 3, 2 и 1.

a = nn.Linear(1, 1)
b = nn.Linear(1, 1)
c = nn.Linear(1, 1)
d = nn.Linear(1, 1)
e = nn.Linear(1, 1)
f = nn.Linear(1, 1)
​
balance = [3,2,1] # 分成了3个partition,第一个partition包括3个层,第2个包括2个层,第3个包括1个层。
model = nn.Sequential(a, b, c, d, e, f)
print(model)
model = GPipe(model, balance, devices=['gpu', 'gpu','gpu'])
print(model)

Результаты следующие: вы можете видеть, что исходная модель разделена на 3 раздела, и каждый раздел является последовательным.

Sequential(
  (0): Linear(in_features=1, out_features=1, bias=True)
  (1): Linear(in_features=1, out_features=1, bias=True)
  (2): Linear(in_features=1, out_features=1, bias=True)
  (3): Linear(in_features=1, out_features=1, bias=True)
  (4): Linear(in_features=1, out_features=1, bias=True)
  (5): Linear(in_features=1, out_features=1, bias=True)
)

GPipe(
  (partitions): ModuleList(
    (0): Sequential(
      (0): Linear(in_features=1, out_features=1, bias=True)
      (1): Linear(in_features=1, out_features=1, bias=True)
      (2): Linear(in_features=1, out_features=1, bias=True)
    )
    (1): Sequential(
      (3): Linear(in_features=1, out_features=1, bias=True)
      (4): Linear(in_features=1, out_features=1, bias=True)
    )
    (2): Sequential(
      (5): Linear(in_features=1, out_features=1, bias=True)
    )
  )
)

Переменные времени выполнения следующие:

model = {GPipe: 6} 
 balance = {list: 3} [3, 2, 1]
 checkpoint = {str} 'except_last'
 chunks = {int} 1
 devices = {list: 3} 
  0 = {device} gpu
  1 = {device} gpu
  2 = {device} gpu
 partitions = {ModuleList: 3} 
   _modules = 
   '0' = {Sequential: 3} 
        Sequential( 
        (0): Linear(in_features=1, out_features=1, bias=True)  
        (1): Linear(in_features=1, out_features=1, bias=True) 
        (2): Linear(in_features=1, out_features=1, bias=True))   
   '1' = {Sequential: 2} 
        Sequential(  
        (3): Linear(in_features=1, out_features=1, bias=True)  
        (4): Linear(in_features=1, out_features=1, bias=True))
   '2' = {Sequential: 1} 
        Sequential(
        (5): Linear(in_features=1, out_features=1, bias=True))

Следует отметить одну вещь: переменная-член разделов GPipe имеет тип nn.ModuleList. nn.ModuleList — это контейнер, который хранит разные модули и автоматически добавляет параметры каждого модуля в сеть. Однако nn.ModuleList не определяет сеть, а просто хранит разные модули вместе, последовательности между этими модулями нет, порядок выполнения сети определяется в соответствии с прямой функцией.

2.4 Резюме

  1. Используйте balance_by_size или balance_by_time, чтобы сначала запустить систему и получить результат профиля.
  2. Затем используйте split_module, чтобы разделить модель.
  3. Наконец, получается относительно сбалансированный результат разделения.
  4. Назначьте эти разделы различным устройствам.

Подробности следующие:

+-----------------------------------------------------------------------------------------+
|                                                                                         |
| Layer 1 +--->  Layer 2 +-----> Layer 3 +----->  Layer 4 +-----> Layer 5  +---> Layer 6  |
|                                                                                         |
+--------------------------+---------------------------+----------------------------------+
                           |                           |
           balance_by_size | 1                       1 |  balance_by_time
                           |                           |
                           v                           v
                [[1, 2, 3], [4, 5], [6]]         [[1, 2, 3, 4], [5, 6]]
                           +                           +
                           |                           |
                           +-----------+      +--------+
                                       |      |
                                       v      v
                                 2  split_module
                                          +
                                          |
                                          |
   3                                      v
 +------------------------------------------------------------------------------------+
 | +--------------------+         +---------------------+      +--------------------+ |
 | |Partition 1         |         |Partition 2          |      |Partition 3         | |
 | |                    |         |                     |      |                    | |
 | |      Layer 1       |    +---------> Layer 4        |      |                    | |
 | |         +          |    |    |         +           |  +------->   Layer 6      | |
 | |         |          |    |    |         |           |  |   |                    | |
 | |         v          |    |    |         |           |  |   |                    | |
 | |      Layer 2       |    |    |         |           |  |   |                    | |
 | |         +          |    |    |         v           |  |   |                    | |
 | |         |          |    |    |      Layer 5 +---------+   |                    | |
 | |         v          |    |    |                     |      |                    | |
 | |      Layer 3  +---------+    |                     |      |                    | |
 | |                    |         |                     |      |                    | |
 | +---------+----------+         +---------+-----------+      +-----------+--------+ |
 |           |                              |                              |          |
 +------------------------------------------------------------------------------------+
             |                              |                              |
           4 |                            4 |                            4 |
             v                              v                              v
   +---------+----------+         +---------+-----------+       +----------+---------+
   |                    |         |                     |       |                    |
   |    Device 1        |         |     Device 2        |       |     Device 3       |
   |                    |         |                     |       |                    |
   +--------------------+         +---------------------+       +--------------------+

До сих пор мы анализировали механизм автоматической балансировки, в следующей статье мы рассмотрим, как нарезать данные и некоторые механизмы времени выполнения.

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

ссылка 0xFF

docs.NVIDIA.com/Bulky/Bulky-Day…

Обучение CUDA: краткое изложение основ

Использование Stream в очерках CUDA

Архитекторы решений NVIDIA глубоко анализируют крупномасштабную параметрическую языковую модель Megatron-BERT

Accelerating Wide & Deep Recommender Inference on GPUs

HugeCTR: High-Performance Click-Through Rate Estimation Training

обсудить.py torch.org/he/how-to-cheat…

GitHub.com/NVIDIA/апекс…

GitHub.com/just и URI St…

py torch.org/tutorials/i…

py torch.org/docs/stable…

Пишите на torch.org/docs/notes/…

zhuanlan.zhihu.com/p/61765561

py torch.Apache N.org/docs/1.7/64…

instructive.com/afraid/217999.contract…