[Анализ исходного кода] Распределенный оптимизатор PyTorch (3) ---- параллелизм модели

машинное обучение искусственный интеллект PyTorch

0x00 сводка

В этой серии статей представлен распределенный оптимизатор, который разделен на три статьи, а именно краеугольную статью, оптимизатор параллельных данных в DP/DDP/Horovod и распределенный оптимизатор PyTorch, которые постепенно расширяются. В этой статье представлены распределенный оптимизатор PyTorch и оптимизатор в PipeDream, в основном задействующие модельный параллелизм (конвейерный параллелизм).

Другие статьи о распространении PyTorch:

[Анализ исходного кода] Распространение PyTorch (1) ------ история и обзор

[Анализ исходного кода] Как PyTorch использует GPU

[Анализ исходного кода] Распределенный PyTorch (2) ----- DataParallel (включен)

[Анализ исходного кода] Распределенный PyTorch (3) ----- DataParallel (ниже)

[Анализ исходного кода] Распределенный PyTorch (4) ------ Основная концепция распределенного приложения

[Анализ исходного кода] Распределенный PyTorch (5) ------ Обзор DistributedDataParallel и способы его использования

[Анализ исходного кода] Распределенный PyTorch (6) ---DistributedDataParallel -- инициализация и хранение

[Анализ исходного кода] Распределенный PyTorch (7) ----- Группа процессов DistributedDataParallel

[Анализ исходного кода] Распределенный PyTorch (8) -------- Бумага DistributedDataParallel

[Анализ исходного кода] Распределенный PyTorch (9) ----- Инициализация DistributedDataParallel

[Анализ исходного кода] Распределенный PyTorch (10) ------ Редуктор статической архитектуры DistributedDataParallel

[Анализ исходного кода] Распределенный PyTorch (11) ----- DistributedDataParallel для создания операций Reducer и Join

[Анализ исходного кода] Распределенный PyTorch (12) ----- Прямое распространение до DistributedDataParallel

[Анализ исходного кода] Распределенный PyTorch (13) ----- Обратное распространение DistributedDataParallel

[Анализ исходного кода] PyTorch, распространяемый Autograd (1) ---- дизайн

[Анализ исходного кода] PyTorch, распространяемый Autograd (2) ---- Фонд RPC

[Анализ исходного кода] PyTorch, распространяемый Autograd (3) ---- контекстно-зависимый

[Анализ исходного кода] PyTorch распространяет Автоград (4) ---- как врезаться в движок

[Анализ исходного кода] PyTorch, распространяемый Autograd (5) ---- движок (включен)

[Анализ исходного кода] PyTorch, распространяемый Autograd (6) ---- Engine (ниже)

[Анализ исходного кода] Распределенный оптимизатор PyTorch (1) ---- краеугольный камень

[Анализ исходного кода] Распределенный оптимизатор PyTorch (2) ---- оптимизатор параллельных данных

Для лучшего объяснения код в этой статье будет соответственно упрощен в соответствии с конкретной ситуацией.

0x01 предыдущий обзор

Раньше, будь то DP, DDP или Horovod, это был по существу параллелизм обработки данных, такой как DDP, реплицирующий одну и ту же модель на все графические процессоры, где каждый графический процессор использует отдельный раздел входных данных. Хотя это может значительно ускорить процесс обучения, оно не подходит для определенных случаев использования, когда модель слишком велика для одного графического процессора. Вот люди и ввели модельный параллелизм (model parallel).

Соответственно, оптимизатору также необходимо вносить различные модификации для удовлетворения потребностей параллелизма модели. Для лучшего анализа в этой статье сначала представлен параллелизм модели с одной машиной, а затем представлен распределенный оптимизатор PyTorch.

0x02 Автономная модель

Текст ниже переведен сpy torch.org/tutorials/i…

Модельный параллелизм широко используется для распределенного обучения. иDataParallelНапротив, параллелизм моделей разделяет одну модель на разные графические процессоры, а не дублирует всю модель на каждом графическом процессоре (в частности, если предположить, что модельmСодержит 10 слоев, при использованииDataParallel, каждый GPU будет иметь полную копию этих 10 слоев, в то время как каждый GPU может содержать 5 слоев при использовании параллелизма моделей на двух GPU).

Высокоуровневая идея параллелизма модели заключается в размещении разных подсетей модели на разных устройствах и реализацииforwardметод перемещения промежуточных выходов между устройствами. Поскольку на одном устройстве работает только часть модели, группа устройств может совместно обслуживать более крупную модель.

В этом посте мы не будем пытаться создавать огромные модели и втискивать их в ограниченное количество графических процессоров. Вместо этого этот пост фокусируется на демонстрации идеи параллелизма моделей. Читатели могут применить эти идеи на практике.

2.1 Основное использование

Начнем с игрушечной модели с двумя линейными слоями. Чтобы запустить эту модель на двух графических процессорах, просто поместите каждый линейный слой на другой графический процессор и соответствующим образом переместите входные и промежуточные выходные данные, чтобы они соответствовали устройству слоя.

import torch
import torch.nn as nn
import torch.optim as optim


class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = torch.nn.Linear(10, 10).to('cuda:0')
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to('cuda:1')

    def forward(self, x):
        x = self.relu(self.net1(x.to('cuda:0')))
        return self.net2(x.to('cuda:1'))

ToyModelКод выглядит очень похоже на то, как он реализован на одном GPU. Модифицируются только две части: часть построения сети и прямая часть.

  • __init__метод использует дваto(device)Оператор используется для размещения линейного слоя на соответствующем устройстве, что разделяет всю сеть на две части, которые затем можно запускать на отдельных графических процессорах.
  • Прямой метод использует дваto(device)Оператор используется для размещения тензоров на соответствующем устройстве, чтобы выходные данные одного уровня можно было скопировать на графический процессор другого уровня с помощью семантики tensor.to.

Это единственное место в модели, которое нужно изменить.backward()иtorch.optimС этим справятся, они автоматически берут на себя градиенты, как если бы модель была на GPU. При вызове функции потерь нужно просто убедиться, что метки находятся на том же устройстве, что и выход сети.

model = ToyModel()
loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)

optimizer.zero_grad()
outputs = model(torch.randn(20, 10))
labels = torch.randn(20, 5).to('cuda:1')
loss_fn(outputs, labels).backward()
optimizer.step()

Самое важное здесь — это labels = torch.randn(20, 5).to('cuda:1') , что гарантирует, что метки будут в cuda:1'.

Вспомним предыдущий код переадресации: self.net2(x.to('cuda:1')). Эти две строки кода гарантируют, что метка находится на том же устройстве cuda:1', что и вывод.

После инициализации это так:

+--------------------+                       +------------------------+
| cuda:0             |                       | cuda:1                 |
|                    |                       |                        |
|                    |                       |                        |
|                    |                       |                        |
|       net1(x)      |                       |        net2(x)         |
|                    |                       |                        |
|                    |                       |                        |
|                    |                       |                        |
+--------------------+                       +------------------------+

После прямой операции и установки метки следующим образом вывод и метка теперь находятся на GPU 1:

               +--------------------+                       +------------------------+
               | cuda:0             |                       | cuda:1                 |
               |                    |                       |                        |
               |                    |                       |                        |
               |                    |                       |                        |
x.to('cuda:0')-------> net1(x)  +-------> x.to('cuda:1') +-------->  net2(x)         |
               |                    |                       |                        |
               |                    |                       |   labels.to('cuda:1')  |
               |                    |                       |                        |
               +--------------------+                       +------------------------+

2.2 Параллельное применение моделей к существующим модулям

Также можно преобразовать существующий модуль с одним GPU для работы на нескольких GPU, изменив несколько строк кода. В приведенном ниже коде показано, как разложитьtorchvision.models.resnet50()на два графических процессора. Основная идея заключается в наследовании существующегоResNetмодуль и разделить слои на два графических процессора в процессе сборки. Затем перезагрузитеforwardметод, чтобы сшить две подсети вместе,forwardВ частности, это делается путем соответствующего перемещения промежуточных выходов.

from torchvision.models.resnet import ResNet, Bottleneck

num_classes = 1000

class ModelParallelResNet50(ResNet):
    def __init__(self, *args, **kwargs):
        super(ModelParallelResNet50, self).__init__(
            Bottleneck, [3, 4, 6, 3], num_classes=num_classes, *args, **kwargs)

        self.seq1 = nn.Sequential(
            self.conv1,
            self.bn1,
            self.relu,
            self.maxpool,

            self.layer1,
            self.layer2
        ).to('cuda:0')

        self.seq2 = nn.Sequential(
            self.layer3,
            self.layer4,
            self.avgpool,
        ).to('cuda:1')

        self.fc.to('cuda:1')

    def forward(self, x):
        x = self.seq2(self.seq1(x).to('cuda:1'))
        return self.fc(x.view(x.size(0), -1))

Вышеупомянутая реализация решает проблему в случаях, когда модель слишком велика, чтобы поместиться на одном графическом процессоре. Однако вы, возможно, заметили, что даже если ваша модель подходит для этой ситуации, она может работать медленнее, чем работа на одном графическом процессоре. Это связано с тем, что в любой момент времени работает только один из двух графических процессоров, а другой ничего не делает. существуетlayer2иlayer3который должен преобразовать промежуточный вывод изcuda:0скопировать вcuda:1, что еще больше приведет к снижению производительности.

Давайте проведем эксперимент, чтобы получить более количественную оценку времени выполнения. В этом эксперименте мы тренируемся, запуская случайные входные данные и метки.ModelParallelResNet50и существующиеtorchvision.models.resnet50(). После обучения модель не выдает никаких полезных предсказаний, но мы можем получить разумное представление о времени выполнения.

import torchvision.models as models

num_batches = 3
batch_size = 120
image_w = 128
image_h = 128


def train(model):
    model.train(True)
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001)

    one_hot_indices = torch.LongTensor(batch_size) \
                           .random_(0, num_classes) \
                           .view(batch_size, 1)

    for _ in range(num_batches):
        # generate random inputs and labels
        inputs = torch.randn(batch_size, 3, image_w, image_h)
        labels = torch.zeros(batch_size, num_classes) \
                      .scatter_(1, one_hot_indices, 1)

        # run forward pass
        optimizer.zero_grad()
        outputs = model(inputs.to('cuda:0'))

        # run backward pass
        labels = labels.to(outputs.device)
        loss_fn(outputs, labels).backward()
        optimizer.step()

вышесказанноеtrain(model)использование методаnn.MSELossв качестве функции потерь используйтеoptim.SGDкак оптимизатор. это имитирует128 X 128Обучение на изображениях, организованных в 3 группы по 120 изображений в каждой. Затем мы используемtimeitбежатьtrain(model)10 раз и постройте время выполнения со стандартным отклонением.

import matplotlib.pyplot as plt
plt.switch_backend('Agg')
import numpy as np
import timeit

num_repeat = 10

stmt = "train(model)"

setup = "model = ModelParallelResNet50()"
mp_run_times = timeit.repeat(
    stmt, setup, number=1, repeat=num_repeat, globals=globals())
mp_mean, mp_std = np.mean(mp_run_times), np.std(mp_run_times)

setup = "import torchvision.models as models;" + \
        "model = models.resnet50(num_classes=num_classes).to('cuda:0')"
rn_run_times = timeit.repeat(
    stmt, setup, number=1, repeat=num_repeat, globals=globals())
rn_mean, rn_std = np.mean(rn_run_times), np.std(rn_run_times)


def plot(means, stds, labels, fig_name):
    fig, ax = plt.subplots()
    ax.bar(np.arange(len(means)), means, yerr=stds,
           align='center', alpha=0.5, ecolor='red', capsize=10, width=0.6)
    ax.set_ylabel('ResNet50 Execution Time (Second)')
    ax.set_xticks(np.arange(len(means)))
    ax.set_xticklabels(labels)
    ax.yaxis.grid(True)
    plt.tight_layout()
    plt.savefig(fig_name)
    plt.close(fig)


plot([mp_mean, rn_mean],
     [mp_std, rn_std],
     ['Model Parallel', 'Single GPU'],
     'mp_vs_rn.png')

img

Получается, что модельный параллелизм требует большего времени выполнения, чем реализация на GPU.4.02/3.75-1=7%. Таким образом, мы можем сделать вывод, что копирование тензоров между графическими процессорами требует около 7% накладных расходов.

2.3 Проблемы и решения

2.3.1 Текущее состояние

Подытожим текущую ситуацию:

  • Несмотря на наличие нескольких графических процессоров, в каждый момент всего процесса выполнения только один графический процессор выполняет вычисления, а остальные графические процессоры простаивают.
  • Также происходит копирование промежуточных результатов вычислений между GPU, что также снижает производительность.

Таким образом, мы должны целенаправленно решать эти две проблемы:

  • Заставьте все графические процессоры работать.
  • Сокращение времени передачи копии.

2.3.2 Решения

Два решения проблемы заключаются в следующем:

Заставьте все графические процессоры двигатьсяОдним из вариантов является добавление механизма конвейера: каждый пакет далее делится на разделенный конвейер, так что, когда одно разделение достигает второй подсети, следующее разделение может быть передано в первую подсеть. Таким образом, два последовательных разделения могут выполняться одновременно на обоих графических процессорах.

Почему это можно сделать? Это связано с логикой асинхронного параллельного выполнения CUDA.

  • Некоторые операции CUDA являются асинхронными, например: запуск ядра, копирование данных между устройствами, копирование небольших блоков памяти между хостом и устройством и т.д.
  • Почти все устройства CUDA с вычислительной мощностью 1.1 и выше поддерживают параллельную репликацию и выполнение ядра, то есть копирование данных и числовые вычисления могут выполняться параллельно.
  • Некоторые устройства с вычислительными возможностями 2.x могут одновременно выполнять несколько ядер.
  • На некоторых устройствах с вычислительной мощностью 2.x копии в обоих направлениях могут быть распараллелены (GPU-CPU, CPU-GPU).

какСокращение времени передачи копии? Это может использовать некоторую комбинацию аппаратного и программного обеспечения для увеличения пропускной способности и уменьшения задержки, например:

  • Аппаратный уровень включает в себя: PCIe, NVlink, NVSwitch внутри одной машины, сеть RDMA (IB или RoCE) между несколькими машинами.
  • Стек программного обеспечения включает в себя: ряд технологий GPUDirect: P2P (Peer-to-Peer), RDMA, Async, Storage и др.

PyTorch использует библиотеку NCCL (на основе вычислений CUDA).

2.4 Ускорение за счет конвейерного ввода

В следующих экспериментах мы далее делим каждую «партию из 120 изображений» на «20 частей изображения». Поскольку PyTorch запускает операции CUDA асинхронно, реализации не нужно создавать несколько потоков для достижения параллелизма.

class PipelineParallelResNet50(ModelParallelResNet50):
    def __init__(self, split_size=20, *args, **kwargs):
        super(PipelineParallelResNet50, self).__init__(*args, **kwargs)
        self.split_size = split_size

    def forward(self, x):
        splits = iter(x.split(self.split_size, dim=0))
        s_next = next(splits)
        s_prev = self.seq1(s_next).to('cuda:1')
        ret = []

        for s_next in splits:
            # A. s_prev runs on cuda:1
            s_prev = self.seq2(s_prev)
            ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))

            # B. s_next runs on cuda:0, which can run concurrently with A
            s_prev = self.seq1(s_next).to('cuda:1')

        s_prev = self.seq2(s_prev)
        ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))

        return torch.cat(ret)


setup = "model = PipelineParallelResNet50()"
pp_run_times = timeit.repeat(
    stmt, setup, number=1, repeat=num_repeat, globals=globals())
pp_mean, pp_std = np.mean(pp_run_times), np.std(pp_run_times)

plot([mp_mean, rn_mean, pp_mean],
     [mp_std, rn_std, pp_std],
     ['Model Parallel', 'Single GPU', 'Pipelining Model Parallel'],
     'mp_vs_rn_vs_pp.png')

Обратите внимание, что операции тензорного копирования между устройствами синхронизируются с текущим потоком на исходном и целевом устройствах. Если вы создаете несколько потоков, вы должны убедиться, что операции репликации правильно синхронизированы. Запись в исходный тензор или чтение/запись целевого тензора до завершения операции копирования может привести к неопределенному поведению. Вышеупомянутая реализация использует только поток по умолчанию на исходном и целевом устройствах, поэтому нет необходимости применять дополнительную синхронизацию.

img

Экспериментальные результаты показывают, что после добавления конвейерного ввода в параллелизм модели ResNet50 процесс обучения ускоряется примерно на3.75/2.51-1=49%. Хотя до идеального 100% разгона далеко. Так как мы ввели новый параметр в параллельную реализацию пайплайнаsplit_sizes, поэтому неясно, как этот новый параметр влияет на общее время обучения. Интуитивно используйте небольшиеsplit_sizeприведет к запуску многих крошечных ядер CUDA при использовании большихsplit_sizeРезультатом является относительно длительное время простоя во время первого и последнего разделения. Ни то, ни другое не является оптимальным.split_sizeДля этого конкретного эксперимента может существовать оптимальная конфигурация. Давайте попробуем использовать несколько разныхsplit_sizeзначение, чтобы запустить эксперимент, чтобы найти его.

means = []
stds = []
split_sizes = [1, 3, 5, 8, 10, 12, 20, 40, 60]

for split_size in split_sizes:
    setup = "model = PipelineParallelResNet50(split_size=%d)" % split_size
    pp_run_times = timeit.repeat(
        stmt, setup, number=1, repeat=num_repeat, globals=globals())
    means.append(np.mean(pp_run_times))
    stds.append(np.std(pp_run_times))

fig, ax = plt.subplots()
ax.plot(split_sizes, means)
ax.errorbar(split_sizes, means, yerr=stds, ecolor='red', fmt='ro')
ax.set_ylabel('ResNet50 Execution Time (Second)')
ax.set_xlabel('Pipeline Split Size')
ax.set_xticks(split_sizes)
ax.yaxis.grid(True)
plt.tight_layout()
plt.savefig("split_size_tradeoff.png")
plt.close(fig)

img

Результаты показывают, что установкаsplit_sizeНаибольшая скорость обучения достигается за 12, в результате чего3.75/2.43-1=54%ускорить. У нас еще есть возможность еще больше ускорить тренировочный процесс. Например, в настоящее время всеcuda:0операции над ним помещаются в его поток по умолчанию. Это означает, что вычисление следующего разделения не может перекрывать операцию копирования предыдущего разделения. Однако, поскольку предыдущее и следующее расщепления являются разными тензорами, нет проблем с перекрытием вычисления одного тензора копией другого. Эта реализация требует использования нескольких потоков на обоих графических процессорах, а разные структуры подсети требуют разных стратегий управления потоками. Поскольку не существует универсального многопоточного решения для всех случаев использования с параллельными моделями, мы не будем обсуждать его в этом руководстве.

В этом посте представлены несколько измерений производительности. При запуске одного и того же кода на собственном компьютере вы можете увидеть разные результаты производительности, поскольку результаты зависят от базового оборудования и программного обеспечения. Чтобы получить наилучшую производительность для вашей среды, правильный подход заключается в том, чтобы сначала создать результирующую кривую, определить оптимальный размер разделения на кривой, а затем применить этот размер разделения к входным данным конвейера.

0x03 Распределенные проблемы и решения

Мы узнали о параллелизме моделей на одной машине, а затем нам нужно посмотреть на параллельное обучение распределенных моделей на нескольких серверах.

3.1 Идеи

Давайте сначала представим, как с этим бороться, если вы реализуете распределённый оптимизатор самостоятельно.

Если модель разделена на три части, есть три хоста, которых можно обучить.

+----------------------------------------------------------------+
| Model                                                          |
|                                                                |
| +-----------------+  +------------------+  +-----------------+ |
| | Sub+model 1     |  | Sub+model 2      |  | Sub+model 3     | |
| |                 |  |                  |  |                 | |
| |                 |  |                  |  |                 | |
| +-----------------+  +------------------+  +-----------------+ |
|                                                                |
+----------------------------------------------------------------+

+-------------------+  +------------------+  +-----------------+
| Host 1            |  | Host 2           |  | Host 3          |
|                   |  |                  |  |                 |
|                   |  |                  |  |                 |
|                   |  |                  |  |                 |
|                   |  |                  |  |                 |
|                   |  |                  |  |                 |
+-------------------+  +------------------+  +-----------------+

Мы явно развернем эти три части на три хоста, каждый из которых имеет свой набор обучающих кодов, и у каждого обучающего кода есть свой локальный оптимизатор, отвечающий за оптимизацию локальных подсистем параметров модели.

+---------------------+         +---------------------+         +---------------------+
| Host 1              |         | Host 2              |         | Host 3              |
|                     |         |                     |         |                     |
| +-----------------+ |         | +-----------------+ |         | +-----------------+ |
| | Sub model 1     | |forward  | | Sub model 2     | |forward  | | Sub model 3     | |
| |                 | +-------> | |                 | +-------> | |                 | |
| |_parameters <--+ | |         | |_parameters <--+ | |         | |_parameters <--+ | |
| |               | | | <-------+ |               | | | <-------+ |               | | |
| |               | | | backward| |               | | | backward| |               | | |
| +-----------------+ |         | +-----------------+ |         | +-----------------+ |
|                 |   |         |                 |   |         |                 |   |
|                 |   |         |                 |   |         |                 |   |
| ------------------+ |         | +-----------------+ |         | +-----------------+ |
| |Optimizer 1    | | |         | | Optimizer 2   | | |         | | Optimizer 3   | | |
| |               | | |         | |               | | |         | |               | | |
| |    step() +---+ | |         | |    step() +---+ | |         | |     step()+---+ | |
| |                 | |         | |                 | |         | |                 | |
| +-----------------+ |         | +-----------------+ |         | +-----------------+ |
+---------------------+         +---------------------+         +---------------------+

Но есть несколько проблем, которые нам необходимо решить:

  • Как разделить модель на разные машины? Как разделить код на разные машины?
  • Как соединить прямое распространение и обратное распространение по машинам?
  • Машины работают синхронно или асинхронно?
  • Если это синхронно, как заставить всю систему работать с одинаковыми шагами?
  • Как эти оптимизаторы сочетаются друг с другом? Или оптимизаторы занимаются своими делами и не имеют друг к другу никакого отношения?
  • Как сделать так, чтобы пользователи как можно меньше модифицировали код?
  • Как заставить разработчиков чувствовать, что они разрабатывают код локальной версии?

Подумав об этом, вы обнаружите, что это запутанно и сложно. Если мы сами реализуем его на основе PyTorch, вы обнаружите, что это может оказаться PipeDream. Итак, давайте посмотрим, как с этим справится PyTorch.

3.2 Идея PyTorch

PyTorch использует RPC для решения этих проблем.

3.2.1 Четыре Небесных Короля

Как мы упоминали ранее, распределенная среда PyTorch использует четырех королей:

  • **Удаленный вызов процедуры (RPC)** Запустите функцию на указанном рабочем потоке с заданными параметрами и получите возвращаемое значение или создайте ссылку на возвращаемое значение. Существует три основных API:rpc_sync()(Синхронизировать),rpc_async()(асинхронный) иremote()(асинхронно и возвращает ссылку на удаленное возвращаемое значение).
    • Если пользовательский код не может продолжать работу без возвращаемого значения, используйте синхронный API.
    • В противном случае используйте асинхронный API для получения Future и ждите Future, когда вызывающему объекту потребуется возвращаемое значение.
    • remote()API предназначен для ситуаций, когда что-то должно быть создано удаленно, но никогда не должно быть получено вызывающей стороной.
  • Удаленная ссылка (RRef)Является распределенным общим указателем на локальный или удаленный объект, то есть ссылку на локальную или кросс-машинную переменную.
  • **Распределенный Autograd** объединяет все локальные механизмы autograd, участвующие в рабочих процессах прямого прохода, и автоматически связывается с ними для вычисления градиентов во время обратного прохода. Это особенно полезно при выполнении прямых проходов, если вам нужно охватить несколько компьютеров, например, при параллельном обучении распределенной модели, обучении сервера параметров и т. д. Благодаря этой функции пользовательскому коду больше не нужно беспокоиться о том, как отправлять градиенты через границы RPC и в каком порядке следует запускать локальный механизм автоградации, что может стать очень сложным, если в прямом проходе есть вложенные и взаимозависимые вызовы RPC.
  • оптимизатор распределенияДля строительства требуетсяOptimizer()(Например,SGD(),Adagrad()и т.д.) и список аргументов RRefs. т. е. создать по одному поверх каждого отдельного владельца RefOptimizer()экземпляр, затем запуститеstep()Обновите параметры соответствующим образом. Когда пользователи выполняют распределенное прямое и обратное распространение, параметры и градиенты будут распределены между несколькими рабочими процессами, поэтому каждый связанный рабочий процесс необходимо оптимизировать. Distributed Optimizer объединяет все эти локальные оптимизаторы в один и предоставляет краткие конструкторы иstep()API.

3.2.2 Логические отношения

Мы используем официальную диаграмму, чтобы увидеть внутреннюю архитектуру и логическую взаимосвязь распределенного пакета PyTorch. Распределенный оптимизатор основан на трех других.

img

Позже мы объясним, как использовать его в сочетании с кодом.

0x04 Распределенный оптимизатор PyTorch

Прежде всего, ради четкого анализа мы будем игнорировать все части, связанные со скриптами в будущем.

4.1 Пример

DistributedOptimizerиспользуется следующим образом:

  1. Получить список удаленных параметров для оптимизации (RRef). Это также могут быть локальные параметры, заключенные в локальный RRef.
  2. будетOptimizerКласс действует как локальный оптимизатор для запуска всех владельцев RRef.
  3. Распределенный оптимизатор создает экземпляры своих локальных оптимизаторов на каждом рабочем узле и хранит RRef этих локальных оптимизаторов.
  4. при звонкеtorch.distributed.optim.DistributedOptimizer.step(), распределенный оптимизатор выполняет все локальные оптимизаторы удаленно на соответствующем удаленном рабочем месте с помощью RPC.torch.distributed.optim.DistributedOptimizer.stepнеобходимо получить распределенный автоградcontext_idВ качестве входных данных локальный оптимизатор сохранит градиент в соответствующем контексте.
  5. Если несколько параллельных распределенных оптимизаторов одновременно обновляют один и тот же параметр рабочего процесса, эти обновления будут сериализованы с помощью блокировок.

Это кажется немного абстрактным, нам нужно анализировать шаг за шагом.

4.2 Простой сквозной пример

С учетом всего сказанного, вот простой сквозной пример с использованием распределенного автограда и распределенного оптимизатора. Если вы поместите свой код в файл с именем «dist_autograd_simple.py», вы можете использовать командуMASTER_ADDR="localhost" MASTER_PORT=29500 python dist_autograd_simple.pyЗапустите этот код:

import multiprocessing as mp
import torch
import torch.distributed.autograd as dist_autograd
from torch.distributed import rpc
from torch import optim
from torch.distributed.optim import DistributedOptimizer

def random_tensor():
    return torch.rand((3, 3), requires_grad=True)

def _run_process(rank, dst_rank, world_size):
    name = "worker{}".format(rank)
    dst_name = "worker{}".format(dst_rank)

    # Initialize RPC.
    rpc.init_rpc(
        name=name,
        rank=rank,
        world_size=world_size
    )

    # Use a distributed autograd context.
    with dist_autograd.context() as context_id: # 本地优化器将把梯度保存在相关的context之中
        # Forward pass (create references on remote nodes).
        rref1 = rpc.remote(dst_name, random_tensor) # 在远端创建一个 random_tensor
        rref2 = rpc.remote(dst_name, random_tensor) # 在远端创建一个 random_tensor
        loss = rref1.to_here() + rref2.to_here() # 获取要优化的远程参数列表 (`RRef`)

        # Backward pass (run distributed autograd).
        dist_autograd.backward([loss.sum()])

        # Build DistributedOptimizer.
        dist_optim = DistributedOptimizer( # 分布式优化器在每个 worker 节点上创建其本地Optimizer的实例,并将持有这些本地优化器的 RRef。
        optim.SGD,
        [rref1, rref2],
        lr=0.05,
        )

        # Run the distributed optimizer step.
        dist_optim.step()

def run_process(rank, dst_rank, world_size):
    _run_process(rank, dst_rank, world_size)
    rpc.shutdown()

processes = []

# Run world_size workers.
world_size = 2
for i in range(world_size):
    p = mp.Process(target=run_process, args=(i, (i + 1) % 2, world_size))
    p.start()
    processes.append(p)

for p in processes:
    p.join()

4.3 Определения

DistributedOptimizer получает удаленные ссылки на параметры, разбросанные по рабочим процессам, а затем локально запускает оптимизатор для этих параметров.

Для одного воркера, если он получает сообщения от одних и тех же или разных клиентов~torch.distributed.optim.DistributedOptimizer.stepпараллельные вызовы, эти вызовы будут выполняться последовательно на этом рабочем потоке, поскольку оптимизатор каждого рабочего процесса может обрабатывать только один набор градиентов за раз.

Определение DistributedOptimizer на самом деле ничего не может видеть, это из-за особенностей языка Python, мы не можем видеть переменные-члены класса в едином месте, но есть заслуживающая нашего внимания функциональная_optim_map. Здесь каждый встроенный оптимизатор настраивается с соответствующим новым оптимизатором. Например, optim.Adagrad соответствует _FunctionalAdagrad. Давайте выберем новый оптимизатор для просмотра.

class DistributedOptimizer:
    """
    DistributedOptimizer takes remote references to parameters scattered
    across workers and applies the given optimizer locally for each parameter.

    This class uses :meth:`~torch.distributed.autograd.get_gradients` in order
    to retrieve the gradients for specific parameters.

    Concurrent calls to
    :meth:`~torch.distributed.optim.DistributedOptimizer.step`,
    either from the same or different clients, will
    be serialized on each worker -- as each worker's optimizer can only work
    on one set of gradients at a time. However, there is no guarantee that
    the full forward-backward-optimizer sequence will execute for one client
    at a time. This means that the gradients being applied may not correspond
    to the latest forward pass executed on a given worker. Also, there is no
    guaranteed ordering across workers.

    `DistributedOptimizer` creates the local optimizer with TorchScript enabled
    by default, so that optimizer updates are not blocked by the Python Global
    Interpreter Lock (GIL) in the case of multithreaded training (e.g. Distributed
    Model Parallel). This feature is currently enabled for most optimizers. You
    can also follow `the recipe`__ in PyTorch tutorials to enable TorchScript support
    for your own custom optimizers.

    Args:
        optimizer_class (optim.Optimizer): the class of optimizer to
            instantiate on each worker.
        params_rref (list[RRef]): list of RRefs to local or remote parameters
            to optimize.
        args: arguments to pass to the optimizer constructor on each worker.
        kwargs: arguments to pass to the optimizer constructor on each worker.
        
    """
    
    # dict to map a user passed in optimizer_class to a functional
    # optimizer class if we have already defined inside the
    # distributed.optim package, this is so that we hide the
    # functional optimizer to user and still provide the same API.
    functional_optim_map = {
        optim.Adagrad: _FunctionalAdagrad,
        optim.Adam: _FunctionalAdam,
        optim.AdamW: _FunctionalAdamW,
        optim.SGD: _FunctionalSGD,
        optim.Adadelta: _FunctionalAdadelta,
        optim.RMSprop: _FunctionalRMSprop,
        optim.Rprop: _FunctionalRprop,
        optim.Adamax: _FunctionalAdamax,
    }        

4.3.1_FunctionalSGD

optim.SGD соответствует _FunctionalSGD. Его код находится в torch/distributed/optim/functional_sgd.py. В частности, определите функциональный оптимизатор SGD, совместимый с TorchScript, и PyTorch будет использовать эти оптимизаторы функциональным образом. При обновлении параметров PyTorch не использует param.grad, а явно разрешает распределенному оптимизатору передавать градиенты в функцию шага. Примечание. Этот оптимизатор должен использоваться только внутри распределенного оптимизатора, а не для пользователей.

import torch.optim._functional as F

# Define a TorchScript compatible Functional SGD Optimizer
# where we use these optimizer in a functional way.
# Instead of using the `param.grad` when updating parameters,
# we explicitly allow the distributed optimizer pass gradients to
# the `step` function. In this way, we could separate the gradients
# and parameters and allow multithreaded trainer to update the
# parameters without data traces on accumulating to the same .grad.
# NOTE: This should be only used by distributed optimizer internals
# and not meant to expose to the user.
@torch.jit.script
class _FunctionalSGD(object):
    def __init__(
        self,
        params: List[Tensor],
        lr: float = 1e-2,
        momentum: float = 0.0,
        dampening: float = 0.0,
        weight_decay: float = 0.0,
        nesterov: bool = False
    ):
        self.defaults = {
            "lr": lr,
            "momentum": momentum,
            "dampening": dampening,
            "weight_decay": weight_decay,
        }
        self.nesterov = nesterov
        self.state = torch.jit.annotate(Dict[torch.Tensor, Dict[str, torch.Tensor]], {})

        # NOTE: we only have one param_group and don't allow user to add additional
        # param group as it's not a common use case.
        self.param_group = {"params": params}

    def step(self, gradients: List[Optional[Tensor]]):
        params = self.param_group['params']
        grads = []
        momentum_buffer_list: List[Optional[Tensor]] = []
        lr = self.defaults['lr']
        weight_decay = self.defaults['weight_decay']
        momentum = self.defaults['momentum']
        dampening = self.defaults['dampening']

        for param, gradient in zip(params, gradients):
            if gradient is not None:
                grads.append(gradient)

                if param not in self.state:
                    self.state[param] = {}

                state = self.state[param]
                if 'momentum_buffer' not in state:
                    momentum_buffer_list.append(None)
                else:
                    momentum_buffer_list.append(state['momentum_buffer'])

        with torch.no_grad():
            F.sgd(params,
                  grads,
                  momentum_buffer_list,
                  weight_decay=weight_decay,
                  momentum=momentum,
                  lr=lr,
                  dampening=dampening,
                  nesterov=self.nesterov)

        # update momentum_buffers in state
        for i, p in enumerate(params):
            state = self.state[p]
            momentum_buffer = momentum_buffer_list[i]
            if momentum_buffer is not None:
                state['momentum_buffer'] = momentum_buffer

4.4 Инициализация

4.4.1 Инициализация

Эта часть кода в основном соответствует следующему: распределенный оптимизатор создает экземпляр своего локального оптимизатора на каждом рабочем узле и будет содержать RRef этих локальных оптимизаторов. В частности, исходя из нашего предыдущего примера кода, params_rref — это список параметров, которые необходимо оптимизировать, каждый из которых соответствует оптимизатору, то есть DistributedOptimizer генерирует оптимизаторы на всех узлах и сохраняет их в self.remote_optimizers в виде rpc. RRef(_LocalOptimizer) средний.

def __init__(self, optimizer_class, params_rref, *args, **kwargs):
    per_worker_params_rref = defaultdict(list)
    for param in params_rref: # 
        per_worker_params_rref[param.owner()].append(param) # [owner] = param

    # 拿到对应的本地优化器类    
    if optimizer_class in DistributedOptimizer.functional_optim_map and jit._state._enabled:
        optim_ctor = DistributedOptimizer.functional_optim_map.get(optimizer_class)
    else:
        optim_ctor = optimizer_class
    self.is_functional_optim = (optim_ctor != optimizer_class)

    if self.is_functional_optim:
        optimizer_new_func = _new_script_local_optimizer
    else:
        optimizer_new_func = _new_local_optimizer # 下面会介绍

    remote_optim_futs = []
    for worker, param_rrefs in per_worker_params_rref.items():
        remote_optim_rref_fut = rpc.rpc_async(
            worker, # 在 worker 之上生成其本地优化器
            optimizer_new_func, # rpc_async 会调用
            args=(optim_ctor, param_rrefs) + args,
            kwargs=kwargs,
        )
        remote_optim_futs.append(remote_optim_rref_fut)

    self.remote_optimizers = _wait_for_all(remote_optim_futs) # 本地保存的远端各个节点上优化器

4.4.2 Создание оптимизатора _LocalOptimizer

_new_local_optimizerгенерируется_LocalOptimizer.

def _new_local_optimizer(optim_cls, local_params_rref, *args, **kwargs):
    return rpc.RRef(
        _LocalOptimizer(optim_cls, local_params_rref, *args, **kwargs))

_LocalOptimizer — это локальный оптимизатор, работающий на удаленных рабочих узлах, а у мастера есть прокси для этих оптимизаторов.

class _LocalOptimizer(object):
    # Ideally we would only need to share a lock for instances of
    # _LocalOptimizer that deal with the same parameters. We are
    # making a simplifying assumption here that if there is more
    # than one instance of _LocalOptimizer per worker, they will
    # be optimizing the same parameters (e.g. each data parallel
    # trainer will create its own instance of _LocalOptimizer but
    # they will all optimize the same parameters on each worker)
    global_lock = Lock()

    def __init__(self, optim_cls, local_params_rref, *args, **kwargs):
        self._local_params = [rref.local_value() for rref in local_params_rref]
        self.optim = optim_cls( # 优化器还是普通的优化器,因为优化器代码还是之前的,只是优化的参数对象变成了异地节点参数
            self._local_params, # 用参数代理初始化
            *args,
            **kwargs)

    def step(self, autograd_ctx_id):
        # 获取到分布上下文里面计算好的梯度
        all_local_grads = dist_autograd.get_gradients(autograd_ctx_id)

        with _LocalOptimizer.global_lock:
            for param, grad in all_local_grads.items():
                param.grad = grad
            self.optim.step() # 参数优化

4.4.3 Дождитесь завершения

Используйте _wait_for_all для ожидания асинхронного завершения.

def _wait_for_all(rpc_futs):
    # TODO: improve error propagation
    exception = None
    results = []
    for fut in rpc_futs:
        try:
            results.append(fut.wait())
        except Exception as e:
            results.append(e)
            exception = e
    if exception is not None:
        raise exception
    return results

Соответствующая логика выглядит следующим образом:

  • ref1, ref2 — параметры, которые нужно оптимизировать на удаленном конце, оба — torch.rand((3, 3)).
  • optim_rref1, optim_rref2 — это ссылки локального оптимизатора на узле 2 и узле 3 соответственно.
                                                      +----------------------------------+
+--------------------------------------------+        | Node 2                   worker 1|
| Node 1                              master |        |                                  |
|                                            |        |    +--------------------------+  |
|                                            |        |    | _LocalOptimizer          |  |
|  +---------------------------------+       |        |    |                          |  |
|  | DistributedOptimizer            |       |        |    |                          |  |
|  |                                 |       |        |    |   optim = _FunctionalSGD |  |
|  |                                 |       |        |    |                          |  |
|  |     remote_optimizers = [       |       |        |    |   _local_params = rref1  |  |
|  |                optim_rref1 +------------------------> |                     +    |  |
|  |                ,                |       |        |    |                     |    |  |
|  |                optim_rref2 +-------+    |        |    +--------------------------+  |
|  |                ]                |  |    |        |                          |       |
|  |                                 |  |    |        |                          v       |
|  |                                 |  |    |   +-------------->   torch.rand((3, 3))   |
|  |                                 |  |    |   |    |                                  |
|  +---------------------------------+  |    |   |    +----------------------------------+
|                                       |    |   |
|                                       |    |   |    +-----------------------------------+
|                                       |    |   |    | Node 3                   worker 2 |
|                                       |    |   |    |                                   |
|                                       |    |   |    |     +--------------------------+  |
|                                       |    |   |    |     | _LocalOptimizer          |  |
|                                       |    |   |    |     |                          |  |
|                                       +-----------------> |                          |  |
|                                            |   |    |     |   optim = _FunctionalSGD |  |
|                                            |   |    |     |                          |  |
|                             rref1 +------------+    |     |   _local_params = rref2  |  |
|                                            |        |     |                     +    |  |
|                                            |        |     |                     |    |  |
|                             rref2 +------------+    |     +--------------------------+  |
|                                            |   |    |                           |       |
|                                            |   |    |                           |       |
|                                            |   |    |                           v       |
|                                            |   +--------------->   torch.rand((3, 3))   |
|                                            |        |                                   |
+--------------------------------------------+        +-----------------------------------+

4.5 Параметры обновления

Когда DistributedOptimizer оптимизируется, он просматривает сохраненные оптимизаторы и вызывает _local_optimizer_step один за другим.

Почему эти удаленные оптимизаторы могут вызываться одинаково на узле 1? Поскольку следующий раунд прямого распространения может быть вызван только после того, как все параметры будут обновлены в конце, его можно вызвать единообразно, а затем дождаться его завершения.

def step(self, context_id):
    """
    Performs a single optimization step.

    This will call :meth:`torch.optim.Optimizer.step` on each worker
    containing parameters to be optimized, and will block until all workers
    return. The provided ``context_id`` will be used to retrieve the
    corresponding :class:`~torch.distributed.autograd.context` that
    contains the gradients that should be applied to the parameters.

    Args:
        context_id: the autograd context id for which we should run the
            optimizer step.
    """
    dist_autograd._is_valid_context(context_id)

    if self.is_functional_optim:
        optimizer_step_func = _script_local_optimizer_step
    else:
        optimizer_step_func = _local_optimizer_step # 

    rpc_futs = []
    for optimizer in self.remote_optimizers: # 遍历 _LocalOptimizer
        rpc_futs.append(rpc.rpc_async( # 异步异地调用
            optimizer.owner(),
            optimizer_step_func, # 逐一调用
            args=(optimizer, context_id),
        ))
    _wait_for_all(rpc_futs)

4.5.1 Локальная оптимизация

_local_optimizer_step должен получить _LocalOptimizer и вызвать его шаг.

def _local_optimizer_step(local_optim_rref, autograd_ctx_id):
    local_optim = local_optim_rref.local_value()
    local_optim.step(autograd_ctx_id)

Шаг _LocalOptimizer сначала получает распределенный градиент, а затем использует этот градиент для оптимизации параметров.

class _LocalOptimizer(object):

    def step(self, autograd_ctx_id):
        # 获取到分布上下文里面计算好的梯度
        all_local_grads = dist_autograd.get_gradients(autograd_ctx_id)

        with _LocalOptimizer.global_lock:
            for param, grad in all_local_grads.items():
                param.grad = grad
            self.optim.step() # 参数优化

4.5.2 Получение распределенных градиентов

Код Python для get_gradients на самом деле не имеет смысла.

def get_gradients(context_id): # real signature unknown; restored from __doc__
    """
    get_gradients(context_id: int) -> Dict[Tensor, Tensor]
    
    Retrieves a map from Tensor to the appropriate gradient for that Tensor
    accumulated in the provided context corresponding to the given ``context_id``
    as part of the distributed autograd backward pass.
    
    Arguments:
        context_id(int): The autograd context id for which we should retrieve the
                         gradients.
    
    Returns:
        A map where the key is the Tensor and the value is the associated gradient
        for that Tensor.
    
    Example::
        >>> import torch.distributed.autograd as dist_autograd
        >>> with dist_autograd.context() as context_id:
        >>>     t1 = torch.rand((3, 3), requires_grad=True)
        >>>     t2 = torch.rand((3, 3), requires_grad=True)
        >>>     loss = t1 + t2
        >>>     dist_autograd.backward(context_id, [loss.sum()])
        >>>     grads = dist_autograd.get_gradients(context_id)
        >>>     print(grads[t1])
        >>>     print(grads[t2])
    """
    return {}

Соответствующий ему C++ находится в torch/csrc/jit/runtime/register_distributed_ops.cpp. это функция, вызвавшая контекст.

// Implementations located in
// torch/csrc/jit/runtime/register_distributed_ops.cpp
TORCH_LIBRARY_IMPL(aten, CatchAll, m) {
  m.impl("get_gradients", [](int64_t context_id) {
    const auto& autogradContext =
        dist_autograd::DistAutogradContainer::getInstance().retrieveContext(
            context_id);
    return autogradContext->getGradients(); // 上下文
  });
}

Код для getGradients в мире C++ выглядит следующим образом:

const c10::Dict<torch::Tensor, torch::Tensor> DistAutogradContext::
    getGradients() const {
  std::lock_guard<std::mutex> guard(lock_);
  // block current streams before accessing gradients to make sure that
  // gradient computations are finished before use.
  for (auto& entry : gradReadyEvents_) {
    auto& event = entry.second;
    event.block(impl_.getStream(event.device()));
  }
  return accumulatedGrads_; // 分布式梯度累积在这里
}

В torch/csrc/distributed/autograd/context/context.h есть:

// DistAutogradContext which stores information for a single distributed
// autograd pass on a worker.
class TORCH_API DistAutogradContext {
  // Gradients accumulated in this context so far. The key is the variable on
  // which the gradient needs to be accumulated and the value is the gradient
  // that needs to be accumulated on that variable..
  c10::Dict<torch::Tensor, torch::Tensor> accumulatedGrads_;

Итак, мы логически расширяем следующим образом:

  1. DistributedOptimizer вызывает пошаговый метод optim_rref1 и optim_rref2 для запуска и оптимизации на удаленном работнике.
  2. Над рабочим 1 и рабочим 2_LocalOptimizerдля получения местного_local_params_оптимизировать.
  3. Результат оптимизации в_Node DistAutogradContextсредиaccumulatedGrads_накопление.

Таким образом, каждая подмодель всей модели обучается/оптимизируется на едином этапе поверх каждого узла.

                                                   +--------------------------------------+
                                                   | Node 2                      worker 1 |
                                                   |                                      |
                                                   |    +--------------------------+      |
                                                   |    | DistAutogradContext      |      |
                                                   |    |                          |  3   |
                                                   |    |     accumulatedGrads_ <------+  |
+-----------------------------------------+        |    |                          |   |  |
| Node 1                           master |        |    +--------------------------+   |  |
|                                         |        |    +--------------------------+   |  |
| +-------------------------------+       |  +--------> | _LocalOptimizer          |   |  |
| | DistributedOptimizer          |       |  |     |    |                          |   |  |
| |                               |       |  |     |    |   optim = _FunctionalSGD |   |  |
| |                               |       |  |     |    |                          |   |  |
| |   remote_optimizers = [       |       |  |     |    |   _local_params = rref1  |   |  |
| |              optim_rref1 +---------------+     |    |                     +    |   |  |
| |              ,                |       |     +---------> step() +-------------------+  |
| |              optim_rref2 +-------+    |     |  |    |                     |    |      |
| |                               |  |    |     |  |    +--------------------------+      |
| |              ]           +----------------->+  |                        2 |           |
| |                          |    |  |    |        |                          v           |
| |                          |    |  |    |   +----------------> torch.rand((3, 3))       |
| |                        1 |    |  |    |   |    |                                      |
| |   step() {               |    |  |    |   |    +--------------------------------------+
| |                          |    |  |    |   |
| |     optim_rref1.step()+--+    |  |    |   |    +--------------------------------------+
| |                               |  |    |   |    | Node 3                      worker 2 |
| |     optim_rref2.step()+--+    |  |    |   |    |                                      |
| |                          |    |  |    |   |    |     +--------------------------+     |
| |   }                      |    |  |    |   |    |     | _LocalOptimizer          |     |
| |                          |    |  |    |   |    |     |                          |     |
| +-------------------------------+  +-----------------> |                          |     |
|                            |            |   |    |     |   optim = _FunctionalSGD |     |
|                            |            |   |    |     |                          |     |
|                          1 |            |   |    |     |   _local_params = rref2  |     |
|                            |            |   |    |     |                     +    |  3  |
|                            +-----------------------------> step() +------------------v  |
|                                         |   |    |     |                     |    |  |  |
|                         rref1 +-------------+    |     +--------------------------+  |  |
|                                         |        |                        2  |       |  |
|                                         |        |                           v       |  |
|                         rref2 +-------------------------------> torch.rand((3, 3))   |  |
|                                         |        |                                   |  |
+-----------------------------------------+        |     +--------------------------+  |  |
                                                   |     | DistAutogradContext      |  |  |
                                                   |     |                          |  |  |
                                                   |     |     accumulatedGrads_ <-----+  |
                                                   |     |                          |     |
                                                   |     +--------------------------+     |
                                                   +--------------------------------------+

0x05 Оптимизатор PipeDream

Наконец, давайте взглянем на PipeDream и посмотрим, как он реализует распределенный оптимизатор.Мы ищем следующие идеи:

  • Поскольку PipeDream запускает весь код поверх каждого рабочего процесса, как каждый локальный оптимизатор определяет, какие параметры оптимизировать?
  • Как обновить параметры при оптимизации?

5.1 Как определить параметры оптимизации

Скажем заранее:

  • Модуль каждого узла различен, поэтому оптимизируемыми параметрами для каждого оптимизатора являются параметры локального модуля.
  • Каждый узел оптимизирует ту часть модуля, за которую он отвечает.

Нам нужно начать сначала.

5.1.1 основной метод

Перейдите в runtime/translation/main_with_runtime.py. Здесь сначала создайте StageRuntime, а затем используйте параметры StageRuntime для построения оптимизатора.

def main():
    r = runtime.StageRuntime(
        model=model, distributed_backend=args.distributed_backend,
        fp16=args.fp16, loss_scale=args.loss_scale,
        training_tensor_shapes=training_tensor_shapes,
        eval_tensor_shapes=eval_tensor_shapes,
        training_tensor_dtypes=dtypes,
        inputs_module_destinations=inputs_module_destinations,
        target_tensor_names=target_tensor_names,
        configuration_maps=configuration_maps,
        master_addr=args.master_addr,
        rank=args.rank, local_rank=args.local_rank,
        num_ranks_in_server=args.num_ranks_in_server,
        verbose_freq=args.verbose_frequency,
        model_type=runtime.TRANSLATION,
        enable_recompute=args.recompute)
    
    if use_adam_optimizer:
        optimizer = adam.AdamWithWeightStashing(
            modules=r.modules(), master_parameters=r.master_parameters,
            model_parameters=r.model_parameters, loss_scale=args.loss_scale,
            num_versions=num_versions, lr=args.lr, betas=(0.9,0.999),
            weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency,
            macrobatch=args.macrobatch)
    else:
        optimizer = sgd.SGDWithWeightStashing(
            modules=r.modules(), master_parameters=r.master_parameters,
            model_parameters=r.model_parameters, loss_scale=args.loss_scale,
            num_versions=num_versions, lr=args.lr, momentum=args.momentum,
            weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency)    

5.1.2 Среда выполнения сборки

Функция инициализации StageRuntime построит модуль, а здесь она построит свои собственные модули через этап этого узла.

Мы выдержки из предыдущей статьи.

stage_to_module_map — установить отношения между этапами и модулями, чтобы получить модули, соответствующие этому этапу.

Эта стадия (значение 3) соответствует двум модулям с индексами 3 и 4, которые являются следующими 3 и 3.

module_to_stage_map = {list: 5} [0, 1, 2, 3, 3]

Конкретный код:

def initialize(self, model, inputs_module_destinations,
               configuration_maps, master_addr, rank,
               local_rank, num_ranks_in_server):
  
        if module_to_stage_map is None:
            self.modules_with_dependencies = ModulesWithDependencies(model)
        else:
            # 依据本stage来找到自己的modules。
            modules = stage_to_module_map[self.stage]
            self.modules_with_dependencies = ModulesWithDependencies(
                [model[module] for module in modules])
        
        # 确定哪些模型layers
        modules = self.modules_with_dependencies.modules()            

        # 拿到 master_parameters 和 model_parameters
        if self.fp16:
            self.master_parameters = []
            self.model_parameters = []
            for i in range(len(modules)):
                import apex.fp16_utils as fp16_utils
                module_parameters, module_master_parameters = \
                    fp16_utils.prep_param_lists(modules[i])
                self.master_parameters.extend(module_master_parameters)
                self.model_parameters.extend(module_parameters)
        else:
            self.master_parameters = list(self.parameters())
            self.model_parameters = None     
            
            

Например, модель распределена на два узла, каждый узел имеет два слоя, где узел 2 имеет параллелизм данных DDP.

Параметры модели каждого узла различны: оптимизируемые параметры узла 1 — это параметры уровней 1 и 2, параметры узла 2 — параметры уровней 3 и 4.

                                              Node 2
                                              +----------------------------------------+
                                              | Stage 2                   StageRuntime |
                                              |                                        |
Node 1                                        |           CommunicationHandler         |
+---------------------------------------+     |                                        |
| Stage 1        StageRuntime           |     |      +----------------------------+    |
|                                       |     |      | +------------------------+ |    |
|                                       |     |      | |Rank 2                  | |    |
|         CommunicationHandler          |     |      | |                        | |    |
|                                       |     |      | |                        | |    |
|      +-----------------------+        |     |      | |  Layer 3 +---> Layer 4 | |    |
|      |Rank 1                 |        |     |      | |                        | |    |
|      |                       |        |     | DDP  | |                        | |    |
|      | Layer 1 +---> Layer 2 |        +----------->+ +------------------------+ |    |
|      |                       |        |     |      | +------------------------+ |    |
|      |                       |        |     |      | |Rank 3                  | |    |
|      +-----------------------+        |     |      | |                        | |    |
|                                       |     |      | |                        | |    |
|   master_parameters = Parameters(     |     |      | |  Layer 3 +---> Layer 4 | |    |
|                   Layer 1, Layer 2)   |     |      | |                        | |    |
|                                       |     |      | |                        | |    |
|   model_parameters                    |     |      | +------------------------+ |    |
|                                       |     |      +----------------------------+    |
+---------------------------------------+     |                                        |
                                              |                                        |
                                              |  master_parameters = Parameters(       |
                                              |                      Layer 3, Layer 4) |
                                              |                                        |
                                              |                                        |
                                              |  model_parameters                      |
                                              |                                        |
                                              +----------------------------------------+

5.1.3 SGDWithWeightStashing

Затем создайте локальный оптимизатор SGDWithWeightStashing с параметрами master_parameters и model_parameters среды выполнения.

OptimizerWithWeightStashing — это базовый класс для SGDWithWeightStashing.

class SGDWithWeightStashing(OptimizerWithWeightStashing): # 基类
    """
    SGD optimizer with weight stashing.
    """
    def __init__(self, modules, master_parameters, model_parameters,
                 loss_scale, num_versions, lr=required, momentum=0,
                 dampening=0, weight_decay=0, nesterov=False, verbose_freq=0,
                 macrobatch=False):
        super(SGDWithWeightStashing, self).__init__(
            optim_name='SGD',
            modules=modules, master_parameters=master_parameters,
            model_parameters=model_parameters, loss_scale=loss_scale,
            num_versions=num_versions, lr=lr, momentum=momentum,
            dampening=dampening, weight_decay=weight_decay,
            nesterov=nesterov, verbose_freq=verbose_freq,
            macrobatch=macrobatch,
        )

Базовый класс OptimizerWithWeightStashing создаст собственный оптимизатор, который назначается base_optimizer.

class OptimizerWithWeightStashing(torch.optim.Optimizer):
    """Wrapper class that adds weight stashing to a vanilla torch.optim.Optimizer.

    Arguments:
        - optim_name: the name of optimizer, required to create the corresponding
                      base_optimizer (torch.optim.{optim_name}).
        - optimizer_args: the keyword arguments passed to base_optimizer.
    """

    def __init__(self, optim_name, modules, master_parameters, model_parameters,
                 loss_scale, num_versions, verbose_freq=0, macrobatch=False,
                 **optimizer_args):
        self.modules = modules
        self.master_parameters = master_parameters
        self.model_parameters = model_parameters  # model_parameters is None if not fp16.
        self.loss_scale = loss_scale

        # Only need at most 2 versions if using macrobatching.
        if macrobatch:
            num_versions = min(2, num_versions)
        self.num_versions = num_versions
        
        # 生成一个原生优化器
        self.base_optimizer = getattr(torch.optim, optim_name)(
            master_parameters, **optimizer_args)
        self.latest_version = Version()
        self.current_version = Version()
        self.initialize_queue()
        self.verbose_freq = verbose_freq
        self.batch_counter = 0

        # If macrobatching, push and pop versions at the right rate.
        if macrobatch:
            self.update_interval = self.num_versions
        else:
            self.update_interval = 1

Логика расширяется следующим образом, каждый оптимизатор использует свои собственные параметры Node для оптимизации.

                                              +----------------------------------------+
                                              | Stage 2                   StageRuntime |
                                              |                                        |
                                              |           CommunicationHandler         |
+---------------------------------------+     |                                        |
| Stage 1        StageRuntime           |     |      +----------------------------+    |
|                                       |     |      | +------------------------+ |    |
|                                       |     |      | |Rank 2                  | |    |
|         CommunicationHandler          |     |      | |                        | |    |
|                                       |     |      | |                        | |    |
|      +-----------------------+        |     |      | |  Layer 3 +---> Layer 4 | |    |
|      |Rank 1                 |        |     |      | |                        | |    |
|      |                       |        |     | DDP  | |                        | |    |
|      | Layer 1 +---> Layer 2 |        +----------->+ +------------------------+ |    |
|      |                       |        |     |      | +------------------------+ |    |
|      |                       |        |     |      | |Rank 3                  | |    |
|      +-----------------------+        |     |      | |                        | |    |
|                                       |     |      | |                        | |    |
|   master_parameters = Parameters(     |     |      | |  Layer 3 +---> Layer 4 | |    |
|                   Layer 1, Layer 2)   |     |      | |                        | |    |
|                             +         |     |      | |                        | |    |
|   model_parameters          |         |     |      | +------------------------+ |    |
|                             |         |     |      +----------------------------+    |
|  +---------------------------------+  |     |                                        |
|  |SGDWithWeightStashing     |      |  |     |                                        |
|  |                          |      |  |     |  master_parameters = Parameters(       |
|  |   base_optimizer = SGB(  v      |  |     |                      Layer 3, Layer 4) |
|  |              master_parameters) |  |     |                               +        |
|  |                                 |  |     |  model_parameters             |        |
|  +---------------------------------+  |     |                               |        |
|                                       |     |  +----------------------------------+  |
+---------------------------------------+     |  |SGDWithWeightStashing       |     |  |
                                              |  |                            |     |  |
                                              |  |      base_optimizer = SGB( v     |  |
                                              |  |               master_parameters) |  |
                                              |  +----------------------------------+  |
                                              |                                        |
                                              +----------------------------------------+

5.2 Оптимизация

5.2.2 Общая оптимизация

Все это асинхронная работа, то есть асинхронная оптимизация.

def train(train_loader, r, optimizer, epoch):

  	# 省略其他
    
    # start num_warmup_minibatches forward passes
    for i in range(num_warmup_minibatches):
        r.run_forward()

    for i in range(n - num_warmup_minibatches):
        # perform forward pass
        r.run_forward()

        # perform backward pass
        if args.fp16:
            r.zero_grad()
        else:
            optimizer.zero_grad()
        optimizer.load_old_params()

        r.run_backward()
        optimizer.load_new_params()
        optimizer.step()

    # finish remaining backward passes
    for i in range(num_warmup_minibatches):
        optimizer.zero_grad()
        optimizer.load_old_params()
        r.run_backward()
        optimizer.load_new_params()
        optimizer.step()

    # wait for all helper threads to complete
    r.wait()

5.2.2 Оптимизация оптимизатора

Оптимизирован для прямого использования пошагового метода SGDWithWeightStashing. Его последний также является пошаговым методом класса OptimizerWithWeightStashing(torch.optim.Optimizer) .

def step(self, closure=None):
    """Performs a single optimization step.

    Arguments:
        closure (callable, optional): A closure that reevaluates the model
                                      and returns the loss.
    """
    # Update the gradient every `update_interval` steps.
    if self.batch_counter % self.update_interval != self.update_interval - 1:
        self.batch_counter += 1
        return None

    if self.model_parameters is not None:
        import apex.fp16_utils as fp16_utils
        fp16_utils.model_grads_to_master_grads(self.model_parameters,
                                               self.master_parameters)
        if self.loss_scale != 1.0:
            # 处理梯度
            for parameter in self.master_parameters:
                parameter.grad.data = parameter.grad.data / self.loss_scale

    for p in self.param_groups[0]['params']:
        if p.grad is not None: # 继续处理累积的梯度
            p.grad.div_(self.update_interval)

    loss = self.base_optimizer.step() # 进行优化
    if self.model_parameters is not None:
        import apex.fp16_utils as fp16_utils
        fp16_utils.master_params_to_model_params(self.model_parameters,
                                                 self.master_parameters)
    self.latest_version = self.latest_version.incr()
    if self.num_versions > 1:
        self.buffered_state_dicts = self.queue[0][0]
        self.queue.append(self.get_params(clone=False))

    self.batch_counter += 1
    return loss

детали следующим образом:

                                               Node 2
                                               +-----------------------------------------+
                                               | Stage 2                    StageRuntime |
                                               |                                         |
Node 1                                         |           CommunicationHandler          |
+-----------------------------------------+    |                                         |
| Stage 1                    StageRuntime |    |      +----------------------------+     |
|                                         |    |      | +------------------------+ |     |
|                                         |    |      | |Rank 2                  | |     |
|          CommunicationHandler           |    |      | |                        | |     |
|                                         |    |      | |                        | |     |
|       +-----------------------+         |    |      | |  Layer 3 +---> Layer 4 | |     |
|       |Rank 1                 |         |    |      | |                        | |     |
|       |                       |         |    | DDP  | |                        | |     |
|       | Layer 1 +---> Layer 2 |         +---------->+ +------------------------+ |     |
|       |                       |         |    |      | +------------------------+ |     |
|       |                       |         |    |      | |Rank 3                  | |     |
|       +-----------------------+         |    |      | |                        | |     |
|                                         |    |      | |                        | |     |
|  master_parameters = Parameters(        |    |      | |  Layer 3 +---> Layer 4 | |     |
|                  Layer 1, Layer 2)      |    |      | |                        | |     |
|                                +        |    |      | |                        | |     |
|  model_parameters              |        |    |      | +------------------------+ |     |
|                                |        |    |      +----------------------------+     |
|  step()                        |        |    |                                         |
|   +                            |        |    |                                         |
|   |                            |        |    |  master_parameters = Parameters(        |
|   |  +-------------------------------+  |    |                      Layer 3, Layer 4)  |
|   |  |SGDWithWeightStashing    |     |  |    |                                   +     |
|   |  |                         |     |  |    |  model_parameters                 |     |
|   |  |   base_optimizer = SGB( v     |  |    |                                   |     |
|   |  |            master_parameters) |  |    |  step()                           |     |
|   |  |                               |  |    |   +                               |     |
|   +----> base_optimizer.step()       |  |    |   |                               |     |
|      |                               |  |    |   |  +-------------------------------+  |
|      +-------------------------------+  |    |   |  |SGDWithWeightStashing       |  |  |
|                                         |    |   |  |                            |  |  |
+-----------------------------------------+    |   |  |      base_optimizer = SGB( v  |  |
                                               |   |  |            master_parameters) |  |
                                               |   |  |                               |  |
                                               |   +------>  base_optimizer.step()    |  |
                                               |      |                               |  |
                                               |      +-------------------------------+  |
                                               |                                         |
                                               +-----------------------------------------+

На этом серия распределенных оптимизаторов завершена.В последующем анализе ZeRO мы также представим PyTorch ZeroRedundancyOptimizer, который, по оценкам, будет ждать несколько недель. В следующей статье мы представим несколько официальных примеров приложений PyTorch для распространения, чтобы соединить всю логику PyTorch, распространяемую последовательно, чтобы увидеть, как ее следует применять на практике, так что следите за обновлениями.

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

ссылка 0xFF

Чтение исходного кода torch.optim.optimizer и гибкое использование

чтение исходного кода pytorch (2) принцип оптимизатора

оптимизатор pytorch (optim) работа с различными группами параметров и различными настройками скорости обучения

Питорч - импульс импульс

Сводка и сравнение различных методов оптимизации (sgd/momentum/Nesterov/adagrad/adadelta)

[Оптимизатор] Алгоритм оптимизатора и реализация PyTorch (1): нестираемый SGD

Возьмите optim.SGD в качестве примера, чтобы представить оптимизатор pytorch.

Примечания к исследованию Pytorch 08 ---- Подробное объяснение алгоритма оптимизатора Optimizer (SGD, Adam)

Использование torch.optim в pytorch для оптимизации нейронной сети и выбора оптимизатора - pytorch

Подробный оптимизатор pytorch: SGD

Поговорим о связи с графическим процессором

developer.nvidia.com/gpudirect

woohoo.NVIDIA.capability/дата-центр…

woohoo.NVIDIA.capability/дата-центр…