[Анализ исходного кода] Распределенный PyTorch (2) ----- DataParallel (включен)

глубокое обучение PyTorch

0x00 сводка

Начиная с этой статьи, мы вводим параллелизм данных PyTorch, Эта статья является первой, знакомящей с DataPrallel, потому что слов слишком много (более 12 000 слов, поэтому она разбита на две статьи и опубликована).

Другие статьи из этой серии:

Автоматическая дифференциация инструментов глубокого обучения (1)

Автоматическая дифференциация инструментов глубокого обучения (2)

Автоматическая дифференциация оружия глубокого обучения (3) --- Пример интерпретации

[Анализ исходного кода] Как PyTorch реализует прямое распространение (1) --- Базовый класс (1)

[Анализ исходного кода] Как PyTorch реализует прямое распространение (2) --- Базовый класс (ниже)

[Анализ исходного кода] Как PyTorch реализует прямое распространение (3) --- конкретная реализация

[Анализ исходного кода] Как Pytorch реализует обратное распространение (1) ---- вызов движка

[Анализ исходного кода] Как Pytorch реализует обратное распространение (2) ---- Статическая структура движка

[Анализ исходного кода] Как Pytorch реализует обратное распространение (3) ---- Динамическая логика движка

[Анализ исходного кода] Как PyTorch реализует обратное распространение (4) ---- конкретный алгоритм

[Анализ исходного кода] Распространение PyTorch (1) ------ история и обзор

[Анализ исходного кода] Как PyTorch использует GPU

Примечание. Эта статья во многом основана на следующих двух статьях, и я хотел бы выразить им свою глубокую признательность.

Distributed data parallel training using Pytorch on AWS

DP и DDP интерпретации исходного кода PyTorch: параллелизм моделей и анализ распределенного обучения

0x01 Обзор

Давайте начнем с рассмотрения DataParallel со всех сторон.

1.1 С точки зрения процесса

С точки зрения процесса DataParallel работает, загружая все данные мини-пакета в основной поток, а затем распределяя данные суб-мини-пакетов (суб-мини-пакетов) по всей сети GPU.

  1. Перенесите данные мини-пакета из памяти с блокировкой страниц в GPU 0 (главный), главный GPU также содержит модель, а другие GPU имеют устаревшие копии модели.

  2. Распределяйте мини-пакеты данных между графическими процессорами. В частности, данные, вводимые в мини-пакет, делятся на несколько частей и отправляются на соответствующий графический процессор для расчета.

  3. Скопируйте модель между графическими процессорами. Все данные, относящиеся к Модулю, также будут продублированы.

  4. Запустите прямой проход на каждом графическом процессоре, вычислив выходные данные. PyTorch использует многопоточность для параллельного прямого распространения, когда каждый графический процессор будет выполнять прямые вычисления независимо и параллельно со своими собственными входными данными в отдельном потоке.

  5. Выходные данные собираются на главном графическом процессоре и вычисляются потери. То есть значение функции потерь вычисляется путем сравнения выходных данных сети с истинной меткой данных каждого элемента в пакете.

  6. Распределите потери между графическими процессорами, запустите обратное распространение на каждом графическом процессоре и вычислите градиенты параметров.

  7. Слияние градиентов на GPU 0.

  8. Обновите параметры градиента.

    • Выполните градиентный спуск и обновите параметры модели на основном графическом процессоре.
    • Поскольку параметры модели обновляются только на ведущем графическом процессоре, а другие подчиненные графические процессоры в это время не обновляются синхронно, необходимо скопировать обновленные параметры модели на оставшиеся подчиненные графические процессоры для достижения параллелизма.

img

1.2 С точки зрения шаблона

Во-первых, давайте дадим технический обзор с точки зрения шаблона:

  • DP можно рассматривать как приложение, подобное серверу параметров.
  • DDP можно рассматривать как приложение коллективного общения.

Сервер параметров можно условно разделить на главный и рабочий, а DP основан на одной машине с несколькими картами, поэтому соответствующие отношения следующие:

  • worker : все графические процессоры (включая GPU 0) являются рабочими и отвечают за вычисления и обучение сети.
  • master: GPU 0 (не настоящая метка GPU, а первая позиция входного параметра device_ids) также отвечает за интеграцию градиентов и обновление параметров.

Итак, давайте сосредоточимся на GPU 0.

DataParallel по умолчанию поместит сетевую модель на GPU 0, а затем скопирует модель с GPU 0 на другие GPU. Каждый GPU начнет параллельное обучение, а затем GPU 0 будет использоваться в качестве основного для суммирования градиентов и обновления модели. Наконец, задача расчета будет загружена на другие GPU. Это очень похоже на механизм сервера параметров.

Эту же информацию можно увидеть и на официальной карте.

img

1.3 С точки зрения операционной системы

С точки зрения операционной системы DP и DDP различаются следующим образом (заранее спойлерим):

  • DataParallel — это однопроцессный многопоточный метод параллельного обучения, который может работать только на одном компьютере.
  • DistributedDataParallel поддерживает несколько процессов и работает как с обучением на одной, так и на нескольких машинах. Кроме того, DistributedDataParallel копирует модель заранее, а не на каждой итерации, и позволяет избежать глобальных блокировок интерпретатора.

1.4 Неэффективность

ДП имеет следующие дефекты:

  • избыточные копии данных

    • Данные сначала копируются с хоста на основной графический процессор, а затем субмини-пакеты распределяются (разбрасываются) между другими графическими процессорами.
  • Перед прямым распространением требуется репликация модели между графическими процессорами.

    • Поскольку параметры модели обновляются на основном графическом процессоре, модель необходимо повторно синхронизировать в начале каждого прямого прохода.
  • Каждый пакет будет иметь накладные расходы на создание/уничтожение потока.

    • Параллельное прямое распространение реализовано в нескольких потоках (это может быть проблемой PyTorch).
  • Есть возможность снизить градиент конвейера, но она не используется.

    • В реализации Pytorch 1.0.1 с параллельными данными градиентный спуск происходит в конце обратного распространения, которое может быть конвейерным.
  • Излишне собирать выходные данные модели на основном графическом процессоре.

  • Использование графического процессора неравномерно, и нагрузка неравномерна. Память и использование основного графического процессора будут выше, чем у других видеокарт, потому что:

    • Выполните расчет потерь на основном графическом процессоре.
    • Уменьшение градиента и параметры обновления происходят на основном графическом процессоре.

0x02 Обзор

2.1 Пример

Давайте используем пример, чтобы увидеть, что конкретная логика такова:

  • Установите видимый графический процессор для этой программы.

    • Соответствующий код должен использовать args.gpu_id="2,7" и os.environ['CUDA_VISIBLE_DEVICES'] = args.gpu_id для настройки серийного номера GPU, на самом деле цель состоит в том, чтобы установить os.environ['CUDA_VISIBLE_DEVICES' ] = "2,7", Таким образом, device_ids[0] соответствует физической карте №2, а device_ids[1] соответствует физической карте №7.
    • Его также можно указать временно во время выполнения, например: CUDA_VISIBLE_DEVICES='2,7' Python train.py.
  • Поместите параметры модели и буферы в device_ids[0], блок распараллеливания должен иметь свои параметры и буферы в device_ids[0] перед запуском блока DataParallel.

    • Код: model=model.cuda() .
  • Постройте модель DP. Преимущество ДП в том, что им очень удобно пользоваться, нужно только поменять оригинальный однокарточный модуль с ДП на многокарточный.

    • Код модели = torch.nn.DaraParallel (модель).
    • На самом деле DP — это Pytorch nn.Module, поэтому и модель, и оптимизатор должны использовать .module для получения фактической модели и оптимизатора.
  • Загрузите данные в основной графический процессор.

    • data,label= data.cuda(),label.cuda()
  • прямое распространение.

    • Модуль модели DP будет копировать на каждое устройство.
    • DP разделит входные данные на несколько небольших фрагментов и распределит эти небольшие фрагменты данных по разным графическим процессорам для расчета.Каждой модели нужно только обрабатывать свои собственные выделенные данные.
  • Выполняется обратное распространение.

    • DP будет накапливать градиенты, рассчитанные каждым GPU, в GPU 0 для суммирования.

Конкретный код выглядит следующим образом:

args.gpu_id="2,7" ; #指定gpu id
args.cuda = not args.no_cuda and torch.cuda.is_available() #是否使用cpu
# 配置环境  也可以在运行时临时指定,比如:CUDA_VISIBLE_DEVICES='2,7' Python train.py
os.environ['CUDA_VISIBLE_DEVICES'] = args.gpu_id # 赋值必须是字符串
device_ids=range(torch.cuda.device_count())  #torch.cuda.device_count()=2
# device_ids=[0,1] ---- 也可以这么使用。这里的0 就是上述指定 2,是主gpu, 1就是7,模型和数据由主gpu分发
 
if arg.cuda:
    model=model.cuda()  #将模型复制到gpu ,默认是cuda('0'),即转到第一个GPU 2
if len(device_id)>1:
    model=torch.nn.DataParallel(model);#构建DP,前提是model已经.cuda()了
 
optimizer = torch.optim.SGD(model.parameters(), args.lr,
                                momentum=args.momentum,
                                weight_decay=args.weight_decay)
    
#前向传播时,数据也要执行cuda(),即把数据复制到主gpu里
for batch_idx, (data, label) in pbar:   
    if args.cuda:
        data,label= data.cuda(),label.cuda(); # 数据放到了默认GPU
    data_v = Variable(data)
    target_var = Variable(label)
    prediction= model(data_v,target_var,args)
    #这里的prediction 预测结果是由两个gpu合并过的,并行计算只存在于前向传播里
    #前向传播每个gpu计算量为 batch_size/len(device_ids),等前向传播完了将结果归并到主gpu里
    #prediction的长度等于batch_size 
    criterion = nn.CrossEntropyLoss()
    loss = criterion(prediction,target_var) # 在默认GPU之上计算loss
    optimizer.zero_grad()
    loss.backward()  
    optimizer.step()

2.2 Соответствующие знания

Перед каждым запуском сетевого распространения DP передает параметры и буферы на главном узле другим узлам, чтобы поддерживать единство состояния. Эта часть соответствующих знаний в основном касается того, как скопировать модель в GPU и как вызвать функцию ядра GPU.Подробности см. в предыдущей статье.[Анализ исходного кода] Как PyTorch использует GPU.

0x03 Определение

3.1 Определения

Давайте посмотрим на структуру DataParallel через функцию инициализации DataParallel.

__init__Три входных параметра определяются следующим образом:

  • модуль: модель,
  • device_ids : обучающие устройства,
  • output_device : Устройство, на котором сохраняются результаты вывода. По умолчанию это device_ids[0], что является первой картой.

код показывает, как показано ниже:

import operator
import torch
import warnings
from itertools import chain
from ..modules import Module
from .scatter_gather import scatter_kwargs, gather
from .replicate import replicate
from .parallel_apply import parallel_apply
from torch._utils import (
    _get_all_device_indices,
    _get_available_device_type,
    _get_device_index,
    _get_devices_properties
)
​
class DataParallel(Module):
​
    # TODO: update notes/cuda.rst when this class handles 8+ GPUs well
​
    def __init__(self, module, device_ids=None, output_device=None, dim=0):
        super(DataParallel, self).__init__()
​
        # 得到可用的GPU
        device_type = _get_available_device_type()
        if device_type is None:
            self.module = module
            self.device_ids = []
            return
​
        # 没有输入的情况下,使用所有可见的GPU
        if device_ids is None:
            device_ids = _get_all_device_indices()
​
        # 把GPU列表上第一个作为输出,也会作为master
        if output_device is None:
            output_device = device_ids[0]
​
        self.dim = dim
        self.module = module
        self.device_ids = [_get_device_index(x, True) for x in device_ids]
        self.output_device = _get_device_index(output_device, True)
        self.src_device_obj = torch.device(device_type, self.device_ids[0])
​
        # 检查负载均衡
        _check_balance(self.device_ids)
​
        # 单卡就直接使用
        if len(self.device_ids) == 1:
            self.module.to(self.src_device_obj)

3.2 Балансировка нагрузки

Хотя входные данные делятся поровну и распределяются параллельно, потери на выходе будут каждый раз агрегироваться и рассчитываться на первом графическом процессоре, поэтому загрузка памяти и использование первого графического процессора будут больше, чем у других графических карт.

Функция _check_balance проверит, сбалансирована ли нагрузка, и предупредит, если память или процессор max/min > 0,75.

def _check_balance(device_ids):
    imbalance_warn = """
    There is an imbalance between your GPUs. You may want to exclude GPU {} which
    has less than 75% of the memory or cores of GPU {}. You can do so by setting
    the device_ids argument to DataParallel, or by setting the CUDA_VISIBLE_DEVICES
    environment variable."""
    device_ids = [_get_device_index(x, True) for x in device_ids]
    dev_props = _get_devices_properties(device_ids)
​
    def warn_imbalance(get_prop):
        values = [get_prop(props) for props in dev_props]
        min_pos, min_val = min(enumerate(values), key=operator.itemgetter(1))
        max_pos, max_val = max(enumerate(values), key=operator.itemgetter(1))
        if min_val / max_val < 0.75:
            warnings.warn(imbalance_warn.format(device_ids[min_pos], device_ids[max_pos]))
            return True
        return False
​
    if warn_imbalance(lambda props: props.total_memory):
        return
    if warn_imbalance(lambda props: props.multi_processor_count):
        return

0x04 Прямое распространение

DataParallel существует только среди параллельных вычислений процесса распространения вперед.

4.1 Общие

В предыдущем примере функция cuda() использовалась для размещения модели на GPU[0], где GPU[0] уже имеет параметры и буферы модели.

model=model.cuda() 

Поэтому в прямой функции она не используется для этого шага, а начинается с распространения модели и данных.Следует отметить, что модель будет распространяться каждый раз при прямом распространении. Он разделен на несколько шагов.

  • Проверка: просмотрите параметры и буферы модуля, чтобы увидеть, все ли они выше GPU[0], если нет, сообщите об ошибке.
  • Распределить (разбросать) входные данные: разделить входные данные на несколько копий в соответствии с их первым измерением (обычно размером пакета) и передать их на несколько графических процессоров;
  • Реплицировать модель: скопировать модель на несколько графических процессоров соответственно;
  • Параллельное применение (parallel_apply): параллельное прямое распространение по нескольким моделям. Поскольку device_ids[0] графического процессора и базовый параллельный модуль совместно используют хранилище, обновления на месте на устройстве[0] также сохраняются, а другие графические процессоры — нет.
  • Сбор: сбор данных, отправленных обратно с нескольких графических процессоров;

Конкретный код выглядит следующим образом:

    def forward(self, *inputs, **kwargs):
        
        with torch.autograd.profiler.record_function("DataParallel.forward"):
            # 如果机器上没有GPU,则直接用CPU运行
            if not self.device_ids:
                return self.module(*inputs, **kwargs)
        
            # 遍历module的parameters和buffers,看看是否都在GPU[0]之上,如果不在,报错。
            for t in chain(self.module.parameters(), self.module.buffers()):
                if t.device != self.src_device_obj:
                    raise RuntimeError("module must have its parameters and buffers "
                                       "on device {} (device_ids[0]) but found one of "
                                       "them on device: {}".format(self.src_device_obj, t.device))
​
            # 现在GPU[0]上有了模型,开始训练
            
            # 首先分发输入
            inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
            # for forward function without any inputs, empty list and dict will be created
            # so the module can be executed on one device which is the first one in device_ids
            if not inputs and not kwargs:
                inputs = ((),)
                kwargs = ({},)
​
            # 如果只有单卡,直接使用
            if len(self.device_ids) == 1:
                return self.module(*inputs[0], **kwargs[0])
            
            # 分发模型
            replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
            # 并行训练
            outputs = self.parallel_apply(replicas, inputs, kwargs)
            # 把前向传播的结果收集到master
            return self.gather(outputs, self.output_device)

4.2 Распределение (вход)

В приведенном выше коде следующая инструкция завершает операцию распределения данных.

inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)

Нашему графику распространения соответствует:

img

Итак, давайте сначала посмотрим, как его распределить.

Scatter на самом деле представляет собой пакет scatter_kwargs, поэтому давайте посмотрим непосредственно на scatter_kwargs.

    def scatter(self, inputs, kwargs, device_ids):

        return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)

4.2.1 scatter_kwargs

scatter_kwargs вызывает scatter для распределения ввода и kwargs соответственно.

def scatter_kwargs(inputs, kwargs, target_gpus, dim=0):
    r"""Scatter with support for kwargs dictionary"""
    # 分发input
    inputs = scatter(inputs, target_gpus, dim) if inputs else []
    # 分发kwargs
    kwargs = scatter(kwargs, target_gpus, dim) if kwargs else []
    
    # 用空项补齐,这样可以让 inputs 和 kwargs 长度相等
    if len(inputs) < len(kwargs):
        inputs.extend([() for _ in range(len(kwargs) - len(inputs))])
    elif len(kwargs) < len(inputs):
        kwargs.extend([{} for _ in range(len(inputs) - len(kwargs))])
    # 返回 tuple    
    inputs = tuple(inputs)
    kwargs = tuple(kwargs)
    return inputs, kwargs
​

4.2.2 scatter

Как вы можете понять из комментариев, тензор нарезается на примерно равные куски, а затем распределяется между заданными графическими процессорами. Это разделить пакет данных на более мелкие пакеты примерно поровну. Для других типов переменных будут выполняться разные операции в соответствии с разными типами, например вызов scatter_map для выполнения рекурсивной обработки внутри них.

img

def scatter(inputs, target_gpus, dim=0):
    r"""
    Slices tensors into approximately equal chunks and
    distributes them across given GPUs. Duplicates
    references to objects that are not tensors.
    """
    def scatter_map(obj):
        if isinstance(obj, torch.Tensor):
            # 针对张量会调用Scatter.apply处理
            return Scatter.apply(target_gpus, None, dim, obj)
        if is_namedtuple(obj):
            # 调用 scatter_map 对其子模块进行递归处理。
            return [type(obj)(*args) for args in zip(*map(scatter_map, obj))]
        if isinstance(obj, tuple) and len(obj) > 0:
            # 调用 scatter_map 对其子模块进行递归处理。
            return list(zip(*map(scatter_map, obj)))
        if isinstance(obj, list) and len(obj) > 0:
            # 调用 scatter_map 对其子模块进行递归处理。
            return [list(i) for i in zip(*map(scatter_map, obj))]
        if isinstance(obj, dict) and len(obj) > 0:
            # 调用 scatter_map 对其子模块进行递归处理。
            return [type(obj)(i) for i in zip(*map(scatter_map, obj.items()))]
        return [obj for targets in target_gpus]

    # After scatter_map is called, a scatter_map cell will exist. This cell
    # has a reference to the actual function scatter_map, which has references
    # to a closure that has a reference to the scatter_map cell (because the
    # fn is recursive). To avoid this reference cycle, we set the function to
    # None, clearing the cell
    try:
        res = scatter_map(inputs)
    finally:
        scatter_map = None
    return res

4.2.3 Scatter

Как упоминалось ранее, Scatter.apply обрабатывает тензоры, так что давайте посмотрим. Scatter расширяет функцию по следующей логике:

  • Если cuda доступна, получите список потоков, чтобы можно было выполнить копирование ЦП на ГП в фоновом потоке.
  • Позвоните comm.scatter для распространения.
  • Вызовите wait_stream и record_stream, чтобы синхронизировать поток копирования.
class Scatter(Function):
​
    @staticmethod
    def forward(ctx, target_gpus, chunk_sizes, dim, input):
        target_gpus = [_get_device_index(x, True) for x in target_gpus]
        ctx.dim = dim
        ctx.input_device = input.get_device() if input.device.type != "cpu" else -1
        streams = None
        # 对于cuda,进行处理
        if torch.cuda.is_available() and ctx.input_device == -1:
            # Perform CPU to GPU copies in a background stream
            streams = [_get_stream(device) for device in target_gpus]
            
        # 调用C++进行操作
        outputs = comm.scatter(input, target_gpus, chunk_sizes, ctx.dim, streams)
        # Synchronize with the copy stream
        if streams is not None:
            for i, output in enumerate(outputs):
                with torch.cuda.device(target_gpus[i]):
                    main_stream = torch.cuda.current_stream()
                    main_stream.wait_stream(streams[i]) # 同步
                    output.record_stream(main_stream) # 同步
        return outputs
​
    @staticmethod
    def backward(ctx, *grad_output):
        return None, None, None, Gather.apply(ctx.input_device, ctx.dim, *grad_output)
​

4.2.4 comm.scatter

Эта функция в основном называетсяtorch._C._scatter, тем самым входя в мир C++.

​
def scatter(tensor, devices=None, chunk_sizes=None, dim=0, streams=None, *, out=None):
    """Scatters tensor across multiple GPUs. """
    tensor = _handle_complex(tensor)
    if out is None:
        devices = [_get_device_index(d) for d in devices]
        return tuple(torch._C._scatter(tensor, devices, chunk_sizes, dim, streams))
    else:
        return tuple(torch._C._scatter_out(tensor, out, dim, streams))

4.2.5 C++

В преобразованном файле мы видим, что разброс — это цель, которую мы хотим проанализировать.

      .def(
          "_scatter",
          [](at::Tensor& tensor,
             std::vector<int64_t>& devices,
             c10::optional<std::vector<int64_t>> chunk_sizes,
             int64_t dim,
             c10::optional<py::object> py_streams) {
            c10::optional<std::vector<c10::optional<at::cuda::CUDAStream>>> streams;
            if (py_streams) {
              py::handle handle = *py_streams;
              streams = THPUtils_PySequence_to_CUDAStreamList(handle.ptr());
            }
            // Note: We're holding the GIL up to here.
            pybind11::gil_scoped_release no_gil;
            // 实际需要看这里
            return scatter(tensor, devices, chunk_sizes, dim, streams);
          },
          py::arg("tensor"),
          py::arg("devices"),
          py::arg("chunk_sizes"),
          py::arg("dim"),
          py::arg("streams"))

Как вы можете видеть в scatter, scatter заключается в раздаче данных на каждый GPU, логика следующая:

  • Сначала вызовите split_with_sizes или chunk, чтобы разделить тензор на куски.
  • Во-вторых, куски распределяются между каждым GPU, что выполняется через распределение.
std::vector<at::Tensor> scatter(
    const at::Tensor& tensor,
    at::IntArrayRef devices,
    const c10::optional<std::vector<int64_t>>& chunk_sizes,
    int64_t dim,
    const c10::optional<std::vector<c10::optional<at::cuda::CUDAStream>>>&
        streams) {
​
  dim = at::maybe_wrap_dim(dim, tensor);
    
  // 首先把tensor分割成 chunks
  std::vector<at::Tensor> chunks = chunk_sizes
      ? tensor.split_with_sizes(/*split_sizes=*/*chunk_sizes, /*dim=*/dim)
      : tensor.chunk(/*chunks=*/devices.size(), /*dim=*/dim);
  at::cuda::OptionalCUDAStreamGuard cuda_guard;
    
  // 其次把 chunks 分布到各个GPU之上
  for (size_t i = 0; i < chunks.size(); ++i) {
    const auto device_index = static_cast<int16_t>(devices[i]);
    if (device_index != tensor.get_device()) {
      if (i < (streams ? streams->size() : 0U) && (*streams)[i]) {
        cuda_guard.reset_stream(*(*streams)[i]);
      }
      chunks[i] = chunks[i].to( // 拷贝
          {DeviceType::CUDA, device_index},
          /*non_blocking=*/true,
          /*copy=*/false,
          /*memory_format=*/at::MemoryFormat::Preserve);
    }
  }
  return chunks; // 返回结果
}

4.3 Копия (модель)

До сих пор мы использовали функцию Scatter для размещения и копирования данных с устройства [0] на разные карты, и мы будем использовать функцию Replicate для копирования модели с устройства [0] на разные карты.

        # 分发模型
        replicas = self.replicate(self.module, self.device_ids[:len(inputs)])

Нашему графику распространения соответствует:

img

replicate — это всего лишь ретвит, его еще нужно посмотреть.

def replicate(self, module, device_ids):
    return replicate(module, device_ids, not torch.is_grad_enabled())

4.3.1 replicate

Конкретная логика репликации такова:

  • Используйте _replicatable_module, чтобы проверить, можно ли безопасно реплицировать модель.

  • Посмотрите, сколько графических процессоров и сколько копий нужно сделать.

  • Операция копирования.

    • Копировать параметры.

      • Используйте _broadcast_coalesced_reshape для копирования параметров на каждый GPU.
    • Копировать буферы.

      • Сначала подсчитайте буферы.
      • Запишите индекс буфера, который нужно дифференцировать.
      • Запишите буфер буфера, который не нуждается в руководстве.
      • Используйте _broadcast_coalesced_reshape для двух буферов для копирования на каждый GPU.
    • Скопируйте модель.

      • modules() возвращает итератор, содержащий все модули текущей модели. Преобразование в список можно рассматривать как выравнивание модели.
      • Пройдитесь по модулям и добавьте каждый слой модели в каждый module_copies.
      • Наконец, module_copies[j] содержит каждый слой модели, а именноmodule_copies[j][i]является i-м слоем модели.
  • Настройте действия.

    • Это необходимо для настройки модели сети и настройки ссылки данных в графическом процессоре на каждый модуль в массиве модулей, чтобы эти модули были полными моделями.

    • Потому что раньше сеть вложенной модели была разбита и скопирована на GPU: буферы и параметры также были скопированы на GPU. Теперь их нужно переконфигурировать в модель мелкого копирования, которая завершает логику модели.

    • Пройдитесь по каждому подмодулю модели и настройте только некоторые необходимые параметры.

      • справиться со своим сыном_modules_.
      • Обработайте его _parameters.
      • Обработайте его _buffers.
  • В последующих параллельных операциях каждый воркер будет получать каждый модуль в массиве модулей и обучаться на этом модуле.

Конкретный код выглядит следующим образом:

def replicate(network, devices, detach=False):
    if not _replicatable_module(network):
        raise RuntimeError("Cannot replicate network where python modules are "
                           "childrens of ScriptModule")

    if not devices:
        return []

    # 看看有多少个GPU,需要复制多少份
    devices = [_get_device_index(x, True) for x in devices]
    num_replicas = len(devices) # 复制这些份

    # 1)复制操作
    
    # 复制参数 parameters
    params = list(network.parameters())
    param_indices = {param: idx for idx, param in enumerate(params)}
    # 拷贝到各个GPU,我们随后会讲解_broadcast_coalesced_reshape
    param_copies = _broadcast_coalesced_reshape(params, devices, detach)

    # 复制buffers
    # 首先统计一下buffers
    buffers = list(network.buffers())
    buffers_rg = [] # 需要求导的
    buffers_not_rg = [] # 不需要求导的
    for buf in buffers:
        if buf.requires_grad and not detach:
            buffers_rg.append(buf)
        else:
            buffers_not_rg.append(buf)

    # 记录需要求导的 buffer 的 index
    buffer_indices_rg = {buf: idx for idx, buf in enumerate(buffers_rg)}
    # 记录不需要求导的 buffer 的 index
    buffer_indices_not_rg = {buf: idx for idx, buf in enumerate(buffers_not_rg)}

    # 对于两种buffers分别拷贝到各个GPU
    buffer_copies_rg = _broadcast_coalesced_reshape(buffers_rg, devices, detach=detach)
    buffer_copies_not_rg = _broadcast_coalesced_reshape(buffers_not_rg, devices, detach=True)

    # 准备拷贝模型网络
    modules = list(network.modules()) # modules()返回一个包含当前模型所有模块的迭代器。转变成list,可以认为把模型打平了
    module_copies = [[] for device in devices] # 为各个GPU准备好空list
    module_indices = {}

	  # 得到模型的浅拷贝列表
    for i, module in enumerate(modules):  # 遍历模型 list
        module_indices[module] = i
        for j in range(num_replicas):
            replica = module._replicate_for_data_parallel() # 获取浅拷贝
            # This is a temporary fix for DDP. DDP needs to access the
            # replicated model parameters. It used to do so through
            # `mode.parameters()`. The fix added in #33907 for DP stops the
            # `parameters()` API from exposing the replicated parameters.
            # Hence, we add a `_former_parameters` dict here to support DDP.
            replica._former_parameters = OrderedDict()
            module_copies[j].append(replica) # 往每个module_copies里面添加模型的每一层
    # 最终,module_copies[j] 里面包含了模型的每一层,即module_copies[j][i] 就是模型的第 i 层

    # 2)配置操作   
    
    # 这一步的目的是:把GPU中数据的reference赋值到浅拷贝之中,变成完备模型。因为之前是把嵌套的模型网络打散了分别拷贝到GPU,buffers和parameters也分别拷贝到了GPU,现在把他们构建到浅拷贝的模型之中,把模型逻辑补齐。
    
    for i, module in enumerate(modules): # 遍历模型每个子模块,只赋值需要的部分参数
        
        # 处理其子_modules
        for key, child in module._modules.items():
            if child is None:
                for j in range(num_replicas):
                    replica = module_copies[j][i] # module_copies[j]是第j个模型拷贝
                    replica._modules[key] = None
            else:
                module_idx = module_indices[child]
                for j in range(num_replicas):
                    replica = module_copies[j][i] # module_copies[j]是第j个模型拷贝
                    setattr(replica, key, module_copies[j][module_idx]) # 设置第j个模型的对应部分,下同
                    
        # 处理_parameters
        for key, param in module._parameters.items():
            if param is None:
                for j in range(num_replicas):
                    replica = module_copies[j][i]
                    replica._parameters[key] = None
            else:
                param_idx = param_indices[param]
                for j in range(num_replicas):
                    replica = module_copies[j][i]
                    param = param_copies[j][param_idx]
                    # parameters in replicas are no longer leaves,
                    # so setattr them as non-parameter attributes
                    setattr(replica, key, param)
                    # expose the parameter for DDP
                    replica._former_parameters[key] = param
                    
        # 处理 _buffers            
        for key, buf in module._buffers.items():
            if buf is None:
                for j in range(num_replicas):
                    replica = module_copies[j][i]
                    replica._buffers[key] = None
            else:
                if buf.requires_grad and not detach:
                    buffer_copies = buffer_copies_rg
                    buffer_idx = buffer_indices_rg[buf]
                else:
                    buffer_copies = buffer_copies_not_rg
                    buffer_idx = buffer_indices_not_rg[buf]
                for j in range(num_replicas):
                    replica = module_copies[j][i]
                    setattr(replica, key, buffer_copies[j][buffer_idx])

    return [module_copies[j][0] for j in range(num_replicas)]

4.3.2 Проверка копий

_replicatable_module используется для проверки безопасности копирования модели.

​
# Check if we can safely replicate the module.
# there are two types of module:
# 1. python modules
# 2. ScriptModule
#
# currently a module cannot be replicated properly if the descendants of
# any ScriptModule contains python module (type 1 above)
def _replicatable_module(module, memo=None):
​
    # module.modules() contains module itself as the first element
    def descendant_modules(module):
        gen = module.modules()
        next(gen)
        return gen
​
    if not _is_jit_enabled():
        return True
    if memo is None:
        memo = set()
​
    # memoize visited modules
    memo.add(module)
    if _is_script_module(module):
        memo.update(descendant_modules(module))
        return all(_is_script_module(descendant) for
                   descendant in descendant_modules(module))
​
    for child in module.children():
        # since any unreplicatable module will cause the check to return
        # False early, visited modules here can be safely ignored.
        if child in memo:
            continue
        if not _replicatable_module(child, memo):
            return False
​
    return True

4.3.3 Общие копии

В PyTorch есть мелкие копии и глубокие копии.

Предполагая, что модель представляет собой серию матриц параметров, объект модели фактически указывает на каждую матрицу параметров.

  • Поверхностная копия (теневая копия) копирует только самые внешние значения и указатели, а не копирует более глубокие объекты, то есть копирует только родительский объект. model.state_dict() также является мелкой копией.Если param=model.state_dict(), то при изменении param параметры модели также будут изменены соответствующим образом.
  • Соответственно, глубокая копия (deepcopy): копировать значения, указатели и пространство глубокой памяти, на которое указывают указатели, то есть копировать родительский объект и его дочерние объекты.

Например:

import torch
import copy
​
# a引用指向某块内存空间
a = torch.nn.Linear(in_features=5, out_features=1, bias=True)
# 浅拷贝相当于拷贝一个引用,所以他们指向的内存空间是一样的
b = copy.copy(a)
​
# state_dict is shadow copy
p = a.state_dict()
print(id(a.state_dict()) == id(p)) # False,这两个不相等
​
# 通过引用p去修改内存空间
print(a.weight)
p['weight'][0][0] = 8.8888
​
# 可以看到a指向的内存空间也被修改了
print(a.weight)

Результат выглядит следующим образом:

False
Parameter containing:
tensor([[-0.2253,  0.0802,  0.3984, -0.1208,  0.3796]], requires_grad=True)
Parameter containing:
tensor([[ 8.8888,  0.0802,  0.3984, -0.1208,  0.3796]], requires_grad=True)

В частности, возвращаясь к нашему анализу, в классе модуля есть метод _replicate_for_data_parallel, который используется для возврата копии, которая разделяет хранилище с исходной моделью, которая является поверхностной копией.

    def _replicate_for_data_parallel(self):
        replica = self.__new__(type(self))
        replica.__dict__ = self.__dict__.copy()
​
        # replicas do not have parameters themselves, the replicas reference the original
        # module.
        replica._parameters = OrderedDict()
        replica._buffers = replica._buffers.copy() # 浅拷贝
        replica._modules = replica._modules.copy() # 浅拷贝模型内部的子模块
        replica._is_replica = True
​
        return replica

Можно считать, что до операции set копия выглядит следующим образом:

+---------------------------------------------------------------+
|                               +----------------------+        |
| CPU                           | Module               |        |
|                               |                      |        |
|                               |     _parameters      |        |
|                               |                      |        |
|                    +--------------> _buffers  <-------------+ |
|                    |          |                      |      | |
|                    |     +------->  _modules  <----------+  | |
|                    |     |    |                      |   |  | |
|                    |     |    +----------------------+   |  | |
| +---------------------+  |    +----------------------+   |  | |
| | module_copies[0] |  |  |    | module_copies[1]     |   |  | |
| |                  |  |  |    |                      |   |  | |
| |    _parameters   |  |  |    |     _parameters      |   |  | |
| |                  |  |  |    |                      |   |  | |
| |    _buffers +----+  |  |    |     _buffers +--------------+ |
| |                     |  |    |                      |   |    |
| |    _modules  +-------->+    |     _modules  +--------->+    |
| |                     |       |                      |        |
| +---------------------+       +----------------------+        |
+---------------------------------------------------------------+

  +---------------------+       +----------------------+
  | GPU 0               |       | GPU 1                |
  |                     |       |                      |
  |     _parameters     |       |      _parameters     |
  |                     |       |                      |
  |     _buffers        |       |      _buffers        |
  |                     |       |                      |
  |                     |       |                      |
  |                     |       |                      |
  +---------------------+       +----------------------+


После операции set это выглядит следующим образом:

   +-----------------------------------------------------------------+
   | CPU                             +----------------------+        |
   |                                 | Module               |        |
   |                                 |                      |        |
   |                                 |     _parameters      |        |
   |                                 |                      |        |
   |                                 |     _buffers         |        |
   |                                 |                      |        |
   |                                 |     _modules         |        |
   |                                 |                      |        |
   |                                 +----------------------+        |
   |   +---------------------+       +----------------------+        |
   |   | module_copies[0]    |       | module_copies[1]     |        |
   |   |                     |       |                      |        |
+---------+ _parameters      |       |     _parameters +-----------+ |
|  |   |                     |       |                      |      | |
|  |   |    _buffers +------------+  |     _buffers +-----------+  | |
|  |   |                     |    |  |                      |   |  | |
|  |   |    _modules         |    |  |     _modules         |   |  | |
|  |   |                     |    |  |                      |   |  | |
|  |   +---------------------+    |  +----------------------+   |  | |
|  +-----------------------------------------------------------------+
|                                 |                             |  |
|      +---------------------+    |  +----------------------+   |  |
|      | GPU 0               |    |  | GPU 1                |   |  |
|      |                     |    |  |                      |   |  |
+--------->  _parameters     |    |  |      _parameters <----------+
       |                     |    |  |                      |   |
       |     _buffers  <----------+  |      _buffers   <--------+
       |                     |       |                      |
       |                     |       |                      |
       |                     |       |                      |
       +---------------------+       +----------------------+
​

4.3.4 Операция копирования

4.3.4.1 _broadcast_coalesced_reshape

Используются параметры копирования _broadcast_coalesced_reshape.

def _broadcast_coalesced_reshape(tensors, devices, detach=False):
    from ._functions import Broadcast
    if detach:
        # 如果是detach,就直接调用
        return comm.broadcast_coalesced(tensors, devices)
    else:
        # Use the autograd function to broadcast if not detach
        if len(tensors) > 0:
            # 否则先用Broadcast过度一下,最后还是调用broadcast_coalesced
            tensor_copies = Broadcast.apply(devices, *tensors)
            return [tensor_copies[i:i + len(tensors)]
                    for i in range(0, len(tensor_copies), len(tensors))]
        else:
            return []
4.3.4.2 Broadcast

Причин использования Broadcast слишком много: поскольку тензоры не отсоединяются, помимо трансляции нужно еще задать, какие градиенты не нужны в контексте. В некоторых случаях пользовательские функции должны знать об этом.

class Broadcast(Function):
​
    @staticmethod
    def forward(ctx, target_gpus, *inputs):
        assert all(i.device.type != 'cpu' for i in inputs), (
            'Broadcast function not implemented for CPU tensors'
        )
        target_gpus = [_get_device_index(x, True) for x in target_gpus]
        ctx.target_gpus = target_gpus
        if len(inputs) == 0:
            return tuple()
        ctx.num_inputs = len(inputs)
        # input 放在 device[0]
        ctx.input_device = inputs[0].get_device()
        # 和 detach 的情形一样
        outputs = comm.broadcast_coalesced(inputs, ctx.target_gpus)
        non_differentiables = []
        
        # 在上下文中设置哪些不需要梯度
        for idx, input_requires_grad in enumerate(ctx.needs_input_grad[1:]):
            if not input_requires_grad:
                for output in outputs:
                    non_differentiables.append(output[idx])
        ctx.mark_non_differentiable(*non_differentiables)
        return tuple([t for tensors in outputs for t in tensors])
​
    @staticmethod
    def backward(ctx, *grad_outputs):
        return (None,) + ReduceAddCoalesced.apply(ctx.input_device, ctx.num_inputs, *grad_outputs)
​

Среди них mark_non_дифференцируемый определяется в torch/csrc/autograd/custom_function.cpp, где недифференцируемые переменные настраиваются в AutogradContext.

void AutogradContext::mark_non_differentiable(const variable_list &outputs) {
  non_differentiable_.clear();
  non_differentiable_.reserve(outputs.size());
  for(auto& var : outputs) {
    non_differentiable_.insert(var.unsafeGetTensorImpl());
  }
}
4.3.4.3 broadcast_coalesced

Broadcast_coalesced прыгает в мир C++.

def broadcast_coalesced(tensors, devices, buffer_size=10485760):
    """Broadcasts a sequence tensors to the specified GPUs.
    Small tensors are first coalesced into a buffer to reduce the number
    of synchronizations.
​
    Args:
        tensors (sequence): tensors to broadcast. Must be on the same device,
          either CPU or GPU.
        devices (Iterable[torch.device, str or int]): an iterable of GPU
          devices, among which to broadcast.
        buffer_size (int): maximum size of the buffer used for coalescing
​
    Returns:
        A tuple containing copies of :attr:`tensor`, placed on :attr:`devices`.
    """
    devices = [_get_device_index(d) for d in devices]
    tensors = [_handle_complex(t) for t in tensors]
    return torch._C._broadcast_coalesced(tensors, devices, buffer_size)
​
4.3.4.4 C++

Как видно из кода инициализации, это делается в файле broadcast_coalesced.

  auto m = py::cast<py::module>(module);
  m.def(
       "_broadcast_coalesced",
       [](std::vector<at::Tensor>& tensors,
          std::vector<int64_t> devices,
          size_t buffer_size) {
         return broadcast_coalesced(tensors, devices, buffer_size);
       },
       py::arg("tensors"),
       py::arg("devices"),
       py::arg("buffer_size"),
       py::call_guard<py::gil_scoped_release>())
  • Broadcast_coalesced будет распространять переменную на все графические процессоры. В Broadcast_coalesced несколько переменных могут быть объединены в одну большую переменную, транслированы на другие устройства, а затем разделены в соответствии с исходной формой.
  • При разделении операция представления будет транслировать все переменные вместе для совместного использования счетчика версий, поскольку все они являются представлениями больших переменных. Однако эта большая переменная немедленно отбрасывается, и все эти переменные вообще не используют общую память.
  • Например, когда два буфера совместно транслируются в режиме «Параллельные данные», и один из них выполняет операции на месте во время «вперед», а другой используется в режиме «назад», движок автоградации будет жаловаться. Поэтому мы переопределим эти переменные после трансляции и дадим им отдельные счетчики версий.

Конкретный код находится в torch/csrc/cuda/comm.cpp. Изучим его аннотации.

// broadcast_coalesced
// ~~~~~~~~~~~~~~~~~~~
//
// In broadcast_coalesced, multiple variables may be coalesced into a single
// large one, broadcast to other devices, and the get split according to the
// original shapes.
//
// When splitting, the view operations will make all Variables broadcast
// together to share a single version counter, because they are all views of the
// large Variable. However, that large Variable is immediately discarded and all
// these Variables do not share storage at all.
//
// For example, when two buffers are broadcast together in `DataParallel` and
// one of them is modified in-place during `forward` but the other is needed in
// backward, autograd engine will complain.
//
// We thus re-wrap these Variables after broadcasting (i.e., effectively doing
// what is equivalent to .data in Python), and give them individual version
// counters.


Конкретные параметры метода broadcast_coalesced поясняются следующим образом:

  • тензоры должны быть на одном устройстве, CPU или GPU;
  • devices — устройство, на которое нужно скопировать;
  • buffer_size — это самый большой буфер. Буфер используется здесь для объединения небольших тензоров в буфер, чтобы уменьшить количество синхронизаций;
tensor_list2d broadcast_coalesced(
    TensorList tensors,
    IntArrayRef devices,
    size_t buffer_size) {
  TORCH_CHECK(
      std::all_of(
          tensors.begin(),
          tensors.end(),
          [&](const at::Tensor& t) { return t.get_device() == devices[0]; }),
      "All tensors must be on devices[0]: ",
      devices[0]);
#ifdef USE_NCCL
  buffer_size = std::min(torch::cuda::nccl::get_max_count(), buffer_size);
#endif

  tensor_list2d outputs(devices.size());
  outputs[0] = tensors.vec();
  for (auto& o : outputs)
    o.reserve(tensors.size());

  unique_type_checker type_checker;
  at::cuda::CUDAGuard device_guard(devices[0]);
  for (auto& chunk : utils::take_tensors(tensors, buffer_size)) {
    auto type_id = chunk.type_id();
    type_checker.show(type_id);
    std::vector<at::Tensor> results;
    if (chunk.options().is_sparse()) {
      auto flat_tuple = utils::flatten_sparse_tensors(chunk.tensors);
      auto broadcast_indices = broadcast(flat_tuple.first, devices); //这里进行广播
      auto broadcast_values = broadcast(flat_tuple.second, devices); //这里进行广播
      results.reserve(devices.size());
      for (size_t i = 1, num_devices = devices.size(); i < num_devices; ++i) {
        device_guard.set_index(devices[i]);
        auto& device_outputs = outputs[i];
        auto& inds = broadcast_indices[i];
        auto& vals = broadcast_values[i];
        for (auto& t :
             utils::unflatten_sparse_tensors(inds, vals, chunk.tensors)) {
          Variable var = t;
          device_outputs.push_back(make_variable(var.tensor_data(), false));
        }
      }
    } else {
      auto results = // 这里进行广播
          broadcast(utils::flatten_dense_tensors(chunk.tensors), devices);
      for (size_t i = 1, num_devices = devices.size(); i < num_devices; ++i) {
        device_guard.set_index(devices[i]);
        auto& device_outputs = outputs[i];
        for (auto& t :
             utils::unflatten_dense_tensors(results[i], chunk.tensors)) {
          Variable var = t;
          device_outputs.push_back(make_variable(var.tensor_data(), false));
        }
      }
    }
  }

  // If we only saw a single tensor type, then we can skip expensive reordering
  if (!type_checker.unique) {
    for (auto& o : outputs)
      utils::reorder_tensors_like(o, tensors);
  }
  return outputs;
}


Метод трансляции следующий:

std::vector<Tensor> broadcast(const Tensor& tensor, IntArrayRef devices) {
  std::vector<Tensor> diff_device_dst_tensors;
  diff_device_dst_tensors.reserve(devices.size());
  for (auto device : devices) {
    if (device != tensor.get_device()) {
      diff_device_dst_tensors.push_back(at::empty(
          tensor.sizes(),
          tensor.options().device(
              at::Device(DeviceType::CUDA, device)))); // preserve memory format
    }
  }
  // 继续调用操作
  _broadcast_out_impl(tensor, diff_device_dst_tensors);
  std::vector<Tensor> dst_tensors;
  dst_tensors.reserve(devices.size());
  auto it = diff_device_dst_tensors.begin();
  for (auto device : devices) {
    if (device != tensor.get_device()) {
      dst_tensors.push_back(*it++);
    } else {
      dst_tensors.push_back(tensor);
    }
  }
  TORCH_INTERNAL_ASSERT(it == diff_device_dst_tensors.end());
  return dst_tensors;
}

Наконец, _broadcast_out_impl вызывается для трансляции исходного тензора (CPU или CUDA) в список устройств CUDA, который вызывает nccl::broadcast(nccl_list).

static inline std::vector<Tensor>& _broadcast_out_impl(
    const Tensor& tensor,
    std::vector<Tensor>& out_tensors) {
#ifdef USE_NCCL
  std::vector<Tensor> nccl_list;
  nccl_list.reserve(out_tensors.size() + 1);
  nccl_list.push_back(tensor);
  for (auto& out_tensor : out_tensors) {
    nccl_list.push_back(out_tensor);
  }
  if (nccl::is_available(nccl_list)) {
    nccl::broadcast(nccl_list); // 这里调用了 NCCL 操作
  } else {
#else
  {
#endif
    for (auto& out_tensor : out_tensors) {
      out_tensor.copy_(tensor, /*non_blocking=*/true);
    }
  }
  return out_tensors;
}

До сих пор мы распространяли и данные, и модель на другие графические процессоры. Сначала мы строим текущий форвард-граф, и вы можете иметь четкое представление.Репликация вызывает Broadcast.forward и сохраняет input_device и num_inputs в своем контексте. Далее может быть выполнено прямое распространение.

+----------------------------------------------------------------------------------------+
| DataParallel.forward                                                                   |
|                                                                                        |
|                                                                                        |
|              replicate +--------------->   parallel_apply             gather           |
|                                                                                        |
+----------------------------------------------------------------------------------------+
​
     +---------------------------+
     | Broadcast                 |
     |                           |
     |                           |
     |                           |
     |          forward()  +----------->
     |                           |
     |                           |
     |  +---------------------+  |
     |  | ctx                 |  |
     |  |       input_device  |  |
     |  |                     |  |
     |  |       num_inputs    |  |
     |  |                     |  |
     |  +---------------------+  |
     |                           |
     |                           |
     |                           |
     |                           |
     |                           |
     |                           |
     +---------------------------+
​

Из-за нехватки места в следующей статье мы продолжим анализ с параллельной операции (прямое распространение).

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

ссылка 0xFF

torch.optim интерпретации исходного кода PyTorch: подробное объяснение интерфейса алгоритма оптимизации

pytorch (распределенные) данные параллельной личной практики - DataParallel/DistributedDataParallel

nn.DataParallel от Pytorch

Понимать распределенное обучение интерпретации исходного кода PyTorch?

обсуждение.py torch.org/he/data пар AL…

[Оригинал][глубина][PyTorch] Вторая часть серии DDP: принцип реализации и анализ исходного кода

Pytorch-CUDA от входа до отказа (2)

Pytorch наступает на яму: разница между присваиванием, поверхностным копированием и глубоким копированием, а также ямы model.state_dict() и model.load_state_dict()

DP и DDP интерпретации исходного кода PyTorch: параллелизм моделей и анализ распределенного обучения