[Анализ исходного кода] Распределенный PyTorch (9) ----- Инициализация DistributedDataParallel

глубокое обучение PyTorch

0x00 сводка

В предыдущей статье мы представили некоторые вспомогательные модули DDP, что сделало необходимую подготовку для этой статьи.Эта статья начинает знакомство с частью инициализации кода мира Python и мира C++. Основной код мира C++ описан ниже.

Другие статьи из этой серии:

[Анализ исходного кода] Распространение PyTorch (1) ------ история и обзор

[Анализ исходного кода] Как PyTorch использует GPU

[Анализ исходного кода] Распределенный PyTorch (2) ----- DataParallel (включен)

[Анализ исходного кода] Распределенный PyTorch (3) ----- DataParallel (ниже)

[Анализ исходного кода] Распределенный PyTorch (4) ------ Основные концепции распределенных приложений

[Анализ исходного кода] Распределенный PyTorch (5) ------ Обзор DistributedDataParallel и способы его использования

[Анализ исходного кода] Распределенный PyTorch (6) ---DistributedDataParallel -- инициализация и хранение

[Анализ исходного кода] Распределенный PyTorch (7) ----- Группа процессов DistributedDataParallel

[Анализ исходного кода] Распределенный PyTorch (8) -------- Бумага DistributedDataParallel

0x01 Обзор

1.1 Параллелизм данных

DDP — это реализация параллельного обучения данных.Чтобы пробудить у всех память, нам все еще нужно взглянуть на общий процесс параллелизма данных, который исходит из исходного кода github.

1.2 Архитектура DDP

Следующий текст переведен сpy torch.org/docs/master…

Ниже представлены компоненты реализации DDP. Диаграмма стека показывает структуру кода.

Мы следуем этой архитектурной диаграмме сверху вниз.

1.2.1 Распределенный параллелизм данных

Вверху находится параллельный компонент распределенных данных.

  • Распространяется Друзья:
    • Это точка входа Python для DDP. Он реализует шаг инициализации, соответствующийnn.parallel.DistributedDataParallelмодульныйforwardфункция, модуль вызовет библиотеку C++.
    • это_sync_paramФункция такова: когда процесс DDP работает на нескольких устройствах, он выполняет внутрипроцессную синхронизацию параметров, а также транслирует буферы моделей из процесса ранга 0 всем остальным процессам.
    • Синхронизация параметров между процессами вReducer.cppреализовано в.
  • comm.h: реализует объединенную вспомогательную функцию широковещательной рассылки, которая вызывается во время инициализации для широковещательной передачи состояния модели и синхронизации буферов модели перед прямым распространением.
  • reducer.h: обеспечивает базовую реализацию синхронизации градиента при обратном распространении. Он имеет три функции точки входа:
    • Reducer: чей конструктор находится вdistributed.pyназывается,ReducerзарегистрируетReducer::autograd_hook()в аккумулятор градиента.
    • autograd_hook()Движок autograd вызовет эту функцию, когда градиент будет готов.
    • prepare_for_backward()существуетdistributed.pyСреди них, когда прямой проход DDP заканчивается, он называетсяprepare_for_backward(). Если в конструкторе DDP поставитьfind_unused_parametersУстановить какTrue, DDP просматривает граф вычислений autograd, чтобы найти неиспользуемые параметры.

1.2.2 Процесс

Ниже приведены два компонента, связанных с процессом.

  • ProcessGroup.hpp : абстрактный API, содержащий все реализации групп процессов.c10dБиблиотека предоставляет 3 готовые реализации: ProcessGroupGloo, ProcessGroupNCCL и ProcessGroupMPI.DistributedDataParallelиспользоватьProcessGroup::broadcast()Отправлять состояние модели из процесса ранга 0 другим процессам во время инициализации иProcessGroup::allreduce()Суммирование градиента.
  • Store.hpp : служба сбора, которая помогает экземплярам группы процессов находить друг друга.

1.3 Общая реализация DDP

Мы кладем бумагу иpy torch.org/docs/master…В совокупности посмотрите на общую реализацию DDP.

Мы резюмируем шаги в итерации DistributedDataParallel следующим образом (не полностью соответствует приведенному выше рисунку, есть некоторые уточнения):

  • Предпосылки:

    • Опора DDP C10DProcessGroupобщаться. Поэтому приложение должноProcessGroupСоздайте экземпляр перед созданием DDP.
  • Конструктор:

    • Процесс ранга 0 будет ссылаться на локальный модуль, поместив модельstate_dict()Параметры передаются всем процессам, что гарантирует, что все процессы будут обучены с одинаковыми значениями инициализации и копиями модели.
    • Каждый процесс DDP создает локальныйReducer, синхронизация градиента будет выполнена позже во время обратного прохода.
    • Для повышения эффективности коммуникации,ReducerОрганизуйте градиенты параметров в корзины, уменьшая одну корзинку за раз.
      • Инициализируйте сегменты и назначьте параметры сегментам в обратном порядке, что может повысить эффективность связи.
      • Размер корзины можно настроить, установив параметр Bucket_cap_mb в конструкторе DDP.
      • Сопоставление градиентов параметров с сегментами определяется во время сборки на основе ограничений размера сегментов и размеров параметров. Параметры модели начинаются с (примерно)Model.parameters()Распределите по корзинам в порядке, обратном данной модели. Причина использования обратного порядка заключается в том, что DDP ожидает, что градиенты будут готовы примерно в этом порядке во время обратного прохода.
      • На изображении ниже показан пример. Пожалуйста, обрати внимание,grad0иgrad1существуетbucket1, два других градиента находятся вbucket0середина. Конечно, это предположение не всегда может быть верным, и когда это происходит, это может повредить обратной скорости DDP, потому что он не можетReducerНачните общаться заранее.
    • В дополнение к ковшированию,ReducerХуки Autograd также регистрируются при построении, по одному хуку на аргумент. Эти хуки будут запущены во время обратного прохода, когда градиент будет готов. В частности, просмотрите параметры и добавьте grad_accumulator и autograd_hook к каждому параметру.
  • Forward Pass:

    • Каждый процесс считывает свои обучающие данные, а DistributedSampler гарантирует, что каждый процесс считывает разные данные.
    • DDP принимает ввод и передает его локальной модели.
    • Модель выполняет опережающее вычисление, и результат выводится. Теперь вычисления выполняются для каждого процесса (устройство CUDA).
    • еслиfind_unused_parametersУстановить какTrue, DDP проанализирует выходные данные локальной модели, пройдет расчетный график из out и пометит неиспользуемые параметры как готовые, потому что расчетный график будет меняться каждый раз, поэтому его нужно каждый раз проходить.
      • Этот режим (Mode) позволяет работать в обратном направлении по подграфам модели, а DDP уменьшает параметры, участвующие в обратном проходе, путем обхода графа автоградации из вывода модели и пометки всех неиспользуемых параметров как готовых.
      • Во время обратного проходаReducerБудет ждать только неподготовленных параметров, но все равно уменьшит все ведра. Пометка градиентов параметров как готовых не помогает DDP пропускать сегменты, но предотвращает вечное ожидание DDP несуществующих градиентов во время обратных проходов.
      • Обратите внимание, что обход графа autograd приводит к дополнительным накладным расходам, поэтому приложения должны устанавливать это значение только в случае необходимости.find_unused_parametersзаTrue.
    • Вернись. Выход модели сети не нужно собирать в процесс ранга 0, который отличается от DP.
  • Backward Pass:

    • backward()вызывать функцию непосредственно при потереTensor, который находится вне контроля DDP, который использует перехватчики автоградации, зарегистрированные при построении, для запуска синхронизации градиента. Когда градиент будет готов, сработает соответствующий хук DDP на этом накопителе градиента.
    • Сделайте all-reduce в autograd_hook. Предполагая, что индекс параметра равен param_index, используйте param_index, чтобы получить параметр и пометить его как готовый.Если все градиенты в ведре готовы, ведро готово.
    • Когда градиенты в ведре будут готовы, он будет на ведреReducerзапустить асинхронныйallreduceвычислить средний градиент всех процессов.
    • Если все сегменты готовы, дождитесь завершения всех сокращений. Когда все ведра будут готовы,Reducerзаблокирует ожидание всехallreduceОперация завершена. После этого запишите средний градиент вparam.gradПоля для всех параметров.
    • Градиенты всех процессов уменьшены, и после обновления веса всех моделей одинаковы. Таким образом, после выполнения обратного распространения поля градации соответствующих одних и тех же параметров в разных процессах DDP должны быть одинаковыми.
    • Нет необходимости передавать параметры после каждой итерации, как это делает DP. Но буферы по-прежнему должны транслироваться процессом ранга 0 другим процессам на каждой итерации.
  • Optimizer Step:

    • С точки зрения оптимизатора это оптимизация локальной модели.

    • Реплики модели во всех процессах DDP могут синхронизироваться, поскольку все они начинаются с одного и того же состояния и имеют одинаковый средний градиент на каждой итерации.

0x02 инициализация

Поскольку мир Python может устанавливать переменные-члены в классы много раз, мы по-прежнему начинаем с__init__Посмотрите.

2.1 __init__

Его основная логика такова:

  • Установите тип устройства.

  • Установите идентификаторы устройств.

  • Установите self.process_group, по умолчанию — GroupMember.WORLD.

  • Настройте различные переменные-члены класса.

  • Проверить параметры.

  • Установите размер ведра.

  • Параметры сборки.

  • Передайте state_dict() ранга 0 другим воркерам, чтобы гарантировать, что все воркеры имеют одинаковое начальное состояние модели.

  • Построить редукторы.

    Конкретный код выглядит следующим образом:

class DistributedDataParallel(Module):

    def __init__(
        self,
        module,
        device_ids=None,
        output_device=None,
        dim=0,
        broadcast_buffers=True,
        process_group=None,
        bucket_cap_mb=25,
        find_unused_parameters=False,
        check_reduction=False,
        gradient_as_bucket_view=False,
    ):

        super(DistributedDataParallel, self).__init__()

        # 设置设备类型
        self.is_multi_device_module = len({p.device for p in module.parameters()}) > 1
        distinct_device_types = {p.device.type for p in module.parameters()}
        self.device_type = list(distinct_device_types)[0]

        # 设置设备IDs
        if (
            device_ids is None
            or len(device_ids) == 0  # For backward compatibility.
            or self.device_type == "cpu"
            or self.is_multi_device_module
        ):

            self.device_ids = None
            self.output_device = None
        else:
            self.device_ids = [_get_device_index(x, True) for x in device_ids]
            if output_device is None:
                output_device = device_ids[0]
            self.output_device = _get_device_index(output_device, True)

        # 设置process group    
        if process_group is None:
            self.process_group = _get_default_group()
        else:
            self.process_group = process_group

        # 配置各种成员变量    
        self.static_graph = False
        self.dim = dim
        self.module = module
        self.device = list(self.module.parameters())[0].device
        self.broadcast_buffers = broadcast_buffers
        self.find_unused_parameters = find_unused_parameters
        self.require_backward_grad_sync = True
        self.require_forward_param_sync = True
        self.ddp_uneven_inputs_config = _DDPUnevenInputsConfig(
            ddp_join_enabled=False,
            ddp_join_divide_by_initial_world_size=False,
            ddp_join_throw_on_early_termination=False,
        )
        self.gradient_as_bucket_view = gradient_as_bucket_view
        if hasattr(module, "_ddp_params_and_buffers_to_ignore"):
            self.parameters_to_ignore = module._ddp_params_and_buffers_to_ignore
        else:
            self.parameters_to_ignore = []

        # 检查 parameters
        # Check that a module does not have Uninitialized parameters
        for param in module.parameters():
            if isinstance(param, torch.nn.parameter.UninitializedParameter):
                raise RuntimeError(
                    "Modules with uninitialized parameters can't be used with `DistributedDataParallel`. "
                    "Run a dummy forward pass to correctly initialize the modules"
                )
        # used for intra-node param sync and inter-node sync as wel
        self.broadcast_bucket_size = int(250 * 1024 * 1024)

        # reduction bucket size
        self.bucket_bytes_cap = int(bucket_cap_mb * 1024 * 1024)
        # Whether to perform input tensor CPU to GPU copies on a side-stream
        self.use_side_stream_for_tensor_copies = (
            os.environ.get("PYTORCH_DDP_USE_SIDE_STREAM", "1") == "1"
        )

        # 构建参数
        # TODO(wayi@): Remove this field since SPMD is no longer supported,
        # and also remove all the relevant unnecessary loops.
        # Module replication within process (single-process multi device)
        # 这里需要注意,就是以后不支持了
        self._module_copies = [self.module]
        # Build parameters for reducer.
        parameters, expect_sparse_gradient = self._build_params_for_reducer()
        # Verify model equivalence.
        dist._verify_model_across_ranks(self.process_group, parameters)
        
        
        # Sync params and buffers. Ensures all DDP models start off at the same value.
        # 将 rank 0 的state_dict() 广播到其他worker,以保证所有worker的模型初始状态相同;
        self._sync_params_and_buffers(authoritative_rank=0)
        
        # In debug mode, build a mapping of parameter index -> parameter.
        if dist._get_debug_mode() != dist._DistributedDebugLevel.OFF:
            param_to_name_mapping = self._build_param_to_name_mapping(parameters)
        else:
            param_to_name_mapping = {}
            
        # Builds reducer.
        self._ddp_init_helper(parameters, expect_sparse_gradient, param_to_name_mapping)

Затем мы выбираем несколько важных шагов для анализа.

2.2 Параметры сборки

Для DDP первым ключевым шагом является построение параметров.Здесь следует отметить, что если текущая ситуация представляет собой одну машину с несколькими GPU, то есть один процесс с несколькими устройствами (так же, как DP), то модель должна быть быть скопированы в процессе.

Но в будущем он не будет поддерживаться и будет удален. Таким образом, параметры — это набор параметров [ToyModel], а параметры [0] — это параметры ToyModel. BucketReplica будет упомянут позже.

    # TODO(wayi@): Remove this field since SPMD is no longer supported,
    # and also remove all the relevant unnecessary loops.
    # Module replication within process (single-process multi device)
    
    self._module_copies = [self.module] # 构建一个比如 [ToyModel] 这样的列表
    # Build parameters for reducer.
    parameters, expect_sparse_gradient = self._build_params_for_reducer()

Посмотрим, какие важные параметры есть в модели:

  • параметр : параметр, который должен быть обновлен оптимизатором во время обратного распространения. мы можем пройтиmodel.parameters()получить эти параметры.
  • буфер : параметры, которые не нужно обновлять оптимизатору во время обратного распространения. мы можем пройтиmodel.buffers()получить эти параметры.

2.2.1 _build_params_for_reducer

В частности, _build_params_for_reducer строит параметры для редьюсера, логика примерно следующая:

  • Перейдите _module_copies, чтобы получить список (модуль, параметр) modules_and_parameters.Эти параметры должны быть дифференцированы и не могут быть в списке игнорирования.
  • Используйте коллекцию для удаления параметров, которые могут быть общими для нескольких модулей.
  • Создайте список параметров.
  • Проверьте, ожидает ли модуль разреженный градиент, поместите результат в expect_sparse_gradient.
  • Параметры, полученные из модуля, вместе с приведенным ниже буфером используются для синхронизации с другими воркерами.
  • Получить буфер модуля, module_buffers будет использоваться при последующей синхронизации.
  • Возвращает список аргументов и expect_sparse_gradient.
# 之前在初始化过程中,设定了 self._module_copies = [self.module]

def _build_params_for_reducer(self):
        
        # Build tuple of (module, parameter) for all parameters that require grads.
        modules_and_parameters = [
            [
                (module, parameter)
                # 得到module列表
                for module_name, module in replica.named_modules()
                # 得到参数列表,并且参数是需要求导,不在忽略列表之中
                for parameter in [
                    param
                    # Note that we access module.named_parameters instead of
                    # parameters(module). parameters(module) is only needed in the
                    # single-process multi device case, where it accesses replicated
                    # parameters through _former_parameters.
                    for param_name, param in module.named_parameters(recurse=False)
                    if param.requires_grad
                    and f"{module_name}.{param_name}" not in self.parameters_to_ignore
                ]
            ]
            for replica in self._module_copies
        ]

        # Deduplicate any parameters that might be shared across child modules.
        # 用集合去除可能在多个modules中共享的参数
        memo = set()
        modules_and_parameters = [
            # "p not in memo" is the deduplication check.
            # "not memo.add(p)" is always True, and it's only there to cause "add(p)" if needed.
            [(m, p) for m, p in replica_mps if p not in memo and not memo.add(p)]
            for replica_mps in modules_and_parameters
        ]

        # Build list of parameters.
        # 构建一个参数列表
        parameters = [
            list(parameter for _, parameter in replica)
            for replica in modules_and_parameters
        ]

        # Checks if a module will produce a sparse gradient.
        def produces_sparse_gradient(module):
            if isinstance(module, torch.nn.Embedding) or isinstance(
                module, torch.nn.EmbeddingBag
            ):
                return module.sparse
            return False

        # Build list of booleans indicating whether or not to expect sparse
        # gradients for the corresponding parameters.
        # 参数是否期盼sparse gradients
        expect_sparse_gradient = [
            list(produces_sparse_gradient(module) for module, _ in replica)
            for replica in modules_and_parameters
        ]

        # The following modules_params and modules_buffers are used for
        # param/buffer sync in _sync_params.
        # 得到module的参数,与下面的buffer一起,都是用来同步到其他worker的
        self.modules_params = [
            list(self._get_parameters(m)) for m in self._module_copies
        ]
        # Collect buffers for modules, filtering out buffers that should be ignored.
        # 得到module的buffer,module_buffers 在后续同步时候会用到
        named_module_buffers = [
            [(buffer, buffer_name) for buffer_name, buffer in m.named_buffers()]
            for m in self._module_copies
        ]
        self.modules_buffers = [
            [
                buffer
                for (buffer, buffer_name) in module_buffers
                if buffer_name not in self.parameters_to_ignore
            ]
            for module_buffers in named_module_buffers
        ]

        return parameters, expect_sparse_gradient

На данный момент пример параметров выглядит следующим образом, вы можете видеть, что имеет значение только элемент [0], а сам оригинал [0] включает 4 элемента:

parameters = {list: 1} 
0 = {list: 4}           
 0 = {Parameter: 10} Parameter containing:\ntensor([[-4.0381e-02,  3.8828e-02, 1  )   
 1 = {Parameter: 10} Parameter containing:\ntensor([-0.0438, -0.2033,  0.2771,  0.0721,  ) 
 2 = {Parameter: 5} Parameter containing:\ntensor([[-0.0094, -0.1319,  0.0713,  0.3155,  )
 3 = {Parameter: 5} Parameter containing:\ntensor([-0.0008,  0.0582, -0.1245, -0.2538, )
 __len__ = {int} 4
__len__ = {int} 1

2.2.2 modules_buffers

Еще одна вещь, где используется self.modules_buffers? Позже он будет использоваться при трансляции параметров, таких как:

    # When running in join mode, checks and performs sync of module buffers if
    # the models have buffers that should be synchronized in the forward pass.
    def _check_and_sync_module_buffers(self):
        if self.will_sync_module_buffers():
            authoritative_rank = self._find_common_rank(self._distributed_rank, False)
            self._distributed_broadcast_coalesced(
                self.modules_buffers[0], self.broadcast_bucket_size, authoritative_rank
            )

Здесь _find_common_rank используется для получения всех действительных рангов, используемых в настоящее время DDP.

def _find_common_rank(self, input_rank, rank_cond):
    # -1 indicates that this rank is not under consideration to be the
    # common_rank
    rank_to_use = torch.tensor(
        [input_rank if rank_cond else -1],
        device=self.device,
    )
    # 使用MAX操作得到最大数值
    dist.all_reduce(rank_to_use, op=ReduceOp.MAX, group=self.process_group)
    if rank_to_use.item() == -1:
        raise ValueError(
            "BUG! Expected rank_cond to be true for at least one process."
        )
    return rank_to_use.item() # 返回全部ranks

2.3 Модель проверки

Далее следует этап проверки модели.

2.3.1 Предыстория

Поскольку следующий код используется позже, давайте сначала рассмотрим широковещательную рассылку фоновых знаний. У друзей, не знакомых с этой частью, возникнут вопросы: почему трансляция может транслироваться с ранга 0 на другие ранги, очевидно, что все ранги называют одним и тем же кодом трансляции.

process_group->broadcast(vec)->wait(); // 把 rank 0 的 meta 广播到对应的设备

Приходим в torch/lib/c10d/ProcessGroupMPI.cpp. Как видите, он использует MPI_Bcast API MPI для широковещательных операций, где ключом является opts.rootRank.

c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupMPI::broadcast(
    std::vector<at::Tensor>& tensors,
    const BroadcastOptions& opts) {
  checkSingleTensor(tensors);
  std::function<void(std::unique_ptr<WorkEntry>&)> runFunc =
      [opts, this](std::unique_ptr<WorkEntry>& entry) {
        auto data = (entry->src)[0];
        c10::DeviceGuard guard(data.device());
        std::unique_lock<std::mutex> globalLock(pgGlobalMutex_);
        MPI_CHECK(MPI_Bcast( // 调用MPI API
            data.data_ptr(),
            data.numel(),
            mpiDatatype.at(data.scalar_type()),
            opts.rootRank, // 这里是关键,只是从root广播其他rank
            pgComm_));
      };
  auto entry = std::make_unique<WorkEntry>(&tensors, &tensors, std::move(runFunc));
  return enqueue(
      std::move(entry),
      "mpi:broadcast",
      c10::optional<std::vector<at::Tensor>>(tensors));
}

opts является экземпляром BroadcastOptions.

class BroadcastOptions:
    rootRank: int
    rootTensor: int
    timeout: timedelta

В мире C++ это соответствует следующему:

struct BroadcastOptions {
  int rootRank = 0;
  int rootTensor = 0;
  std::chrono::milliseconds timeout = kUnsetTimeout;
};

Как видно из определения, BroadcastOptions автоматически инициализируется C++ равным 0, поэтому все ранговые процессы используют rootRank = 0 для вызова MPI_Bcast, и результатом является широковещательная рассылка с ранга = 0 на другие ранги.

c10::intrusive_ptr<ProcessGroup::Work> broadcast(
    std::vector<at::Tensor>& data,
    const BroadcastOptions& opts = BroadcastOptions()) override;

2.3.2 Специальный код

Давайте посмотрим, как проверить модель дальше.

Роль _verify_model_across_ranks заключается в проверке того, что соответствующие параметры модели (реплика 0) имеют одинаковый размер/шаги для всех процессов после трансляции.

    # Verify model equivalence.
    dist._verify_model_across_ranks(self.process_group, parameters)

Из следующего кода видно, что _verify_model_across_ranks на самом деле вызывает verify_replica0_across_processes.

module.def(
    "_verify_model_across_ranks",
    &::c10d::verify_replica0_across_processes,
    py::arg("process_group"),
    py::arg("replicas"),
    py::call_guard<py::gil_scoped_release>());

В verify_replica0_across_processes параметр model_replicas — это предыдущие параметры, а логика следующая:

  • Во-первых, получите метаданные из model_replicas.
  • Затем клонируйте метаданные в metadata_dev.
  • Затем передайте metadata_dev процесса 0 на соответствующее устройство.
    • Каждый процесс будет запускать один и тот же код, но в process_group->broadcast только 0-й ранг будет установлен в root_rank, поэтому будут транслироваться только данные 0-го ранга.
    • После трансляции метаданные_dev всех процессов одинаковы, то есть данные в процессе 0.
  • Затем скопируйте metadata_dev обратно в элемент управления, сравните элемент управления с model_replicas[0], чтобы убедиться, что он равен оригиналу.
    • Проверьте, имеет ли элемент управления тот же размер, что и model_replicas.
    • Здесь используется аксессор. LibTorch использует аксессор для быстрого доступа к тензору. Если тензор находится на ЦП, используйте аксессор. Если тензор находится на графическом процессоре, используйте для доступа Packed_accessor. Эта часть упоминается в «Всесторонняя интерпретация внутреннего механизма PyTorch от основных разработчиков».

Конкретный код выглядит следующим образом:

// Verifies corresponding params in replica 0 have the same sizes/strides
// across processes.
void verify_replica0_across_processes(
    c10::intrusive_ptr<c10d::ProcessGroup> process_group,
    std::vector<std::vector<at::Tensor>> model_replicas) {
  size_t i = 0;
  for (const auto& t : model_replicas[0]) {
    i += 2 * t.dim();
  }
  at::TensorOptions options;
  options = options.dtype(at::kLong);
  auto metadata = at::empty({static_cast<long>(i)}, options);

  // Technically, process 0 is the broadcast source, so only process 0 needs
  // to populate metadata.  But no harm keeping work aligned across processes.
  auto metadata_accessor = metadata.accessor<int64_t, 1>();
  i = 0;
  // 把model_replicas[0]拷贝到metadata_accessor,其实就是metadata
  for (const auto& t : model_replicas[0]) {
    for (const auto& sz : t.sizes()) {
      metadata_accessor[i++] = sz;
    }
    for (const auto& str : t.strides()) {
      metadata_accessor[i++] = str;
    }
  }

  // 然后把metadata克隆到metadata_dev
  auto metadata_dev = metadata.clone().to(model_replicas[0][0].device());
  std::vector<at::Tensor> vec{metadata_dev};
  //  广播metadata_dev
  process_group->broadcast(vec)->wait(); // 把process 0 的 meta 广播到对应的设备

  // 这之后,metadata_dev 就是所有进程的结果大家都一样了
  // Technically, process 0 doesn't need to double-check metadata, because it
  // was the source.  But no harm keeping work aligned.
  auto control = at::empty({static_cast<long>(i)}, options);
  // 把 metadata_dev 拷贝回 control
  control.copy_(metadata_dev, /*non_blocking=*/false);
  
  // 然后把 control 和 model_replicas[0]比较,看看是否和原来相等
  auto control_accessor = control.accessor<int64_t, 1>();
  i = 0;
  for (size_t p = 0; p < model_replicas[0].size(); p++) {
    const auto& t = model_replicas[0][p];
    // I'd like to include which process we are in the message,
    // but ProcessGroup::getRank is not public!
    for (const auto& sz : t.sizes()) {
      TORCH_CHECK(
          sz == control_accessor[i++],
          "replicas[0][",
          p,
          "] in this process"
          " with sizes ",
          t.sizes(),
          " appears not to match sizes of the same param in process 0.");
    }
    for (const auto& str : t.strides()) {
      TORCH_CHECK(
          str == control_accessor[i++],
          "replicas[0][",
          p,
          "] in this process"
          " with strides ",
          t.strides(),
          " appears not to match strides of the same param in process 0.");
    }
  }
}

2.4 Статус трансляции

Следующим шагом является широковещательная передача состояния, которая транслирует исходные параметры и переменные модели с ранга 0 на другие ранги.

    # Sync params and buffers. Ensures all DDP models start off at the same value.
    # 将 rank 0 的state_dict() 广播到其他worker,以保证所有worker的模型初始状态相同;
    self._sync_params_and_buffers(authoritative_rank=0)

2.4.1 state_dict

Начнем с того, что нужно транслировать.

state_dict от pytorch — это объект словаря, который сопоставляет каждый слой модели с соответствующими параметрами, такими как веса и смещения каждого слоя модели. В state_dict модели будут сохранены только те слои с параметрами, которые можно обучить (такие как сверточные слои, линейные слои и т. д.), а слой пула и слой BN, которые сами не имеют параметров, не будут сохранены в state_dict. Например, для модели ниже.

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

state_dict выглядит следующим образом:

self.module.state_dict() = {OrderedDict: 4} 
 'net1.weight' = {Tensor: 10} tensor([[ 0.2687,  0.0840, -0.1032,  0.3079,  0.0385, -0.0495, -0.3068, -0.1271,\n         -0.1067, -0.1966],\n        [-0.1203,  0.1789,  0.0666,  0.1882,  0.1335,  0.1921, -0.1145, -0.1781,\n          0.0661, -0.2339],\n        [ 0.1865, -0.2076,  0.2071,  0
 'net1.bias' = {Tensor: 10} tensor([ 0.2146, -0.1599,  0.2350, -0.2843, -0.0773, -0.2151,  0.1864, -0.3068,\n        -0.2093,  0.1365])
 'net2.weight' = {Tensor: 5} tensor([[ 0.1922, -0.0148, -0.1884,  0.2124, -0.1361,  0.0172, -0.2371,  0.1946,\n          0.2047, -0.2697],\n        [-0.2690,  0.1372,  0.2269,  0.0436, -0.1353, -0.2054, -0.2418, -0.2300,\n          0.1987,  0.0007],\n        [ 0.0995, -0.2659, -0.2374, -0
 'net2.bias' = {Tensor: 5} tensor([0.1488, 0.0791, 0.1667, 0.1449, 0.0545])

2.4.2 _sync_params_and_buffers

_sync_params_and_buffers собирает обучаемые параметры в соответствии с state_dict модуля, а затем транслирует эти параметры.

Конкретный код:

    def _sync_params_and_buffers(self, authoritative_rank=0):
        module_states = []
        for name, param in self.module.state_dict().items():
            if name not in self.parameters_to_ignore:
                module_states.append(param)

# module_states = {list: 4} [tensor([[ 0.2687,  0.0840, -0.1032,  0.3079,  0.0385, -0.0495, -0.3068, -0.1271,\n         -0.1067, -0.1966],\n        [-0.1203,  0.1789,  0.0666,  0.1882,  0.1335,  0.1921, -0.1145, -0.1781,\n          0.0661, -0.2339],\n        [ 0.1865, -0.2076,  0.2071,  
                
        if len(module_states) > 0:
            self._distributed_broadcast_coalesced(
                module_states, self.broadcast_bucket_size, authoritative_rank
            )

Посмотрим,_distributed_broadcast_coalesced называетсяdist._broadcast_coalesced

import torch.distributed as dist

def _distributed_broadcast_coalesced(
        self, tensors, buffer_size, authoritative_rank=0
    ):
        dist._broadcast_coalesced(
            self.process_group, tensors, buffer_size, authoritative_rank
        )

2.4.3 dist._broadcast_coalesced

Посмотрим по коду, сначала дойдем до torch\distributed_init_.py, куда будет импортирован _broadcast_coalesced.

if is_available():
    from torch._C._distributed_c10d import (
        Store,
        FileStore,
        TCPStore,
        ProcessGroup,
        PrefixStore,
        Reducer,
        Logger,
        BuiltinCommHookType,
        GradBucket,
        _DEFAULT_FIRST_BUCKET_BYTES,
        _register_comm_hook,
        _register_builtin_comm_hook,
        _broadcast_coalesced, # 在这里导入
        _compute_bucket_assignment_by_size,
        _verify_model_across_ranks,
        _test_python_store,
        _DistributedDebugLevel,
        _get_debug_mode
    )
    if sys.platform != 'win32':
        from torch._C._distributed_c10d import (
            HashStore,
            _round_robin_process_groups,
        )

    from .distributed_c10d import *  # noqa: F403
    # Variables prefixed with underscore are not auto imported
    # See the comment in `distributed_c10d.py` above `_backend` on why we expose
    # this.

    from .distributed_c10d import _backend, _all_gather_base

Продолжаем находить torch\csrc\distributed\c10d\init.cpp

  module.def(
      "_broadcast_coalesced",
      // Define a lambda such that the pybind11 prototype can take a std::vector
      // for the tensor list argument, but still pass it to the underlying
      // function as a c10::ArrayRef.
      [](c10::intrusive_ptr<::c10d::ProcessGroup> process_group,
         std::vector<at::Tensor> tensors, // NOLINT
         size_t buffer_size,
         int rank) {
        broadcast_coalesced( // 在这里
            std::move(process_group), tensors, buffer_size, rank);
      },
      py::arg("process_group"),
      py::arg("tensors"),
      py::arg("buffer_size"),
      // The source of truth rank to broadcast the tensors from.
      py::arg("src") = 0,
      py::call_guard<py::gil_scoped_release>());

Наконец добрался до torch/lib/c10d/comm.cpp, где ProcessGroup используется для трансляции тензоров.

// Broadcast many tensors to all processes in the process group.
void broadcast_coalesced(
    c10::intrusive_ptr<c10d::ProcessGroup> process_group,
    at::TensorList tensors,
    size_t buffer_size,
    int rank) {
  // Coalesce tensors into buckets taking into account the maximum buffer size.
  // This routine is multi-device aware, so the tensors can be split across
  // multiple devices and can contain a mix of CPU and CUDA tensors.
  // 首先计算出桶
  const auto buckets =
      compute_bucket_assignment_by_size(tensors.vec(), {buffer_size});

  // Returns tensor at specified index in input tensor list.
  const auto lookup = [&tensors](size_t index) { return tensors[index]; };

  // We maintain a maximum of 2 in flight broadcast operations to avoid
  // allocating too much memory (in case the specified tensors are very large).
  std::deque<BroadcastWork> in_flight; // 建立一个广播work列表
  constexpr auto max_in_flight = 2;
  for (const auto& bucket : buckets) { // 遍历桶
    if (in_flight.size() >= max_in_flight) { // 由注释可以知道,广播维度是2,这样避免内存占用过大
      in_flight.front().finish(); // 广播变量
      in_flight.pop_front();
    }

    in_flight.emplace_back(process_group, c10::fmap(bucket, lookup), rank);
  }

  while (!in_flight.empty()) {
    in_flight.front().finish();
    in_flight.pop_front();
  }
}

Для BroadcastWork добавим, что это использование ProcessGroup для трансляции тензоров.Подробнее о ProcessGroup см. в предыдущей статье.

class BroadcastWork {
 public:
  BroadcastWork(
      const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
      std::vector<at::Tensor> bucket_tensors,
      int root_rank = 0)
      : bucket_tensors_(std::move(bucket_tensors)),
        flat_tensor_({torch::utils::flatten_dense_tensors(bucket_tensors_)}) {
    BroadcastOptions broadcastOptions;
    broadcastOptions.rootRank = root_rank;
    work_ = process_group->broadcast(flat_tensor_, broadcastOptions);
  }

  void finish() {
    work_->wait();

    // Copy the output of the broadcast operation back.
    auto output_tensors = torch::utils::unflatten_dense_tensors(
        flat_tensor_.front(), bucket_tensors_);
    TORCH_INTERNAL_ASSERT(output_tensors.size() == bucket_tensors_.size());
    for (size_t i = 0; i < output_tensors.size(); i++) {
      bucket_tensors_[i].copy_(output_tensors[i], /*non_blocking=*/true);
    }
  }

 protected:
  // The list of tensors to broadcast. They are guaranteed to be
  // placed on the same device and have the same dtype.
  std::vector<at::Tensor> bucket_tensors_;

  // The vector with a single flattened tensor containing the contents
  // of the tensors in bucket_tensors_. It must be stored in a vector
  // because c10d::ProcessGroup::broadcast takes a vector argument.
  std::vector<at::Tensor> flat_tensor_;

 private:
  // The broadcast work that is kicked off upon construction.
  c10::intrusive_ptr<c10d::ProcessGroup::Work> work_;
};

2.5 Функция инициализации

Далее _ddp_init_helper будет call_ddp_init_helper для инициализации бизнес-функций.

2.5.1 _ddp_init_helper

Функция _ddp_init_helper используется для инициализации сервиса, основная логика следующая:

  • Параметры разделены на сегменты, а распределение параметров равномерно распределено по сегментам в соответствии с порядком, обратным прямому распространению (градиент, рассчитанный первым при прямом распространении, будет сначала распространяться обратно), что может улучшить скорость связи и слияние. скорость;
  • сбросить статус ведра;
  • Сгенерируйте Reducer, который регистрирует autograd_hook внутри, который используется для синхронизации градиентов во время обратного распространения;
  • Настроить логирование;
  • Передать дескриптор DDP на уровень SyncBatchNorm;

Конкретный код выглядит следующим образом:

    def _ddp_init_helper(self, parameters, expect_sparse_gradient, param_to_name_mapping):
        """
        Initialization helper function that does the following:
        (1) bucketing the parameters for reductions
        (2) resetting the bucketing states
        (3) registering the grad hooks
        (4) Logging constructin-time DDP logging data
        (5) passing a handle of DDP to SyncBatchNorm Layer
        """
        self.num_iterations = 0
        # The bucket size limit is specified in the constructor.
        # Additionally, we allow for a single small bucket for parameters
        # that are defined first, such that their gradients don't spill into
        # a much larger bucket, adding unnecessary latency after gradient
        # computation finishes. Experiments showed 1MB is a reasonable value.
        bucket_indices = dist._compute_bucket_assignment_by_size(
            parameters[0],
            [dist._DEFAULT_FIRST_BUCKET_BYTES, self.bucket_bytes_cap],
            expect_sparse_gradient[0],
        )

        # Note: reverse list of buckets because we want to approximate the
        # order in which their gradients are produced, and assume they
        # are used in the forward pass in the order they are defined.
        self.reducer = dist.Reducer(
            parameters,
            list(reversed(bucket_indices)), # 利用桶index
            self.process_group,
            expect_sparse_gradient,
            self.bucket_bytes_cap,
            self.find_unused_parameters,
            self.gradient_as_bucket_view,
            param_to_name_mapping,
        )

        self.logger = dist.Logger(self.reducer)

        # Set logging data that can be got during construction time.
        self.logger.set_construction_data_and_log(
            self.module.__class__.__name__,
            [] if self.device_ids is None else self.device_ids,
            -1 if self.output_device is None else self.output_device,
            self.broadcast_buffers,
        )

        # passing a handle to torch.nn.SyncBatchNorm layer
        self._passing_sync_batchnorm_handle(self._module_copies)

2.5.2 Расчет ковшей

Во-первых, _compute_bucket_assignment_by_size завершает функцию группирования. Здесь parameters[0] — соответствующий список тензоров.

_DEFAULT_FIRST_BUCKET_BYTES = 1048576
# reduction bucket size
self.bucket_bytes_cap = int(bucket_cap_mb * 1024 * 1024)
        
bucket_indices = dist._compute_bucket_assignment_by_size(
            parameters[0],
            # 桶的大小限制是一个数组
            [dist._DEFAULT_FIRST_BUCKET_BYTES, self.bucket_bytes_cap],
            expect_sparse_gradient[0],
        )
2.5.2.1 Содержание статьи

Далее проанализируем содержание статьи.

Идея градиентной корзины основана на наблюдении, что коллективная коммуникация более эффективна на больших тензорах.

Эксперименты показывают, что если DDP ожидает и сохраняет несколько градиентов в одну операцию AllReduce за короткое время, он может достичь более высокой пропускной способности и меньшей задержки вместо запуска выделенного AllReduce, как только станет доступным каждое хранилище градиентов. Это особенно полезно для моделей с большим количеством малых параметров. Однако DDP не должен передавать все данные в одном AllReduce, иначе связь не может быть инициирована до конца вычислений.

Сопоставление параметра с сегментом оказывает значительное влияние на скорость DDP. При каждом обратном проходе тензоры из всех градиентов параметров копируются в ведра, а средние градиенты копируются обратно в ведра после AllReduce. Для ускорения операций копирования сегменты всегда создаются на том же устройстве, что и параметры. Если модель охватывает несколько устройств, DDP учитывает привязку устройств, чтобы гарантировать, что все параметры в одном сегменте находятся на одном устройстве. Порядок AllReduce также влияет на результат, так как он определяет, насколько обмен данными может пересекаться с вычислением. DDP запускает AllReduce в обратном порядке model.parameters().

Следовательно, для повышения эффективности связи, DDPReducerГрадиенты параметров организованы в сегменты, которые уменьшаются по одному сегменту за раз. Сопоставление градиентов параметров с сегментами определяется во время сборки на основе ограничений размера сегментов и размеров параметров. Пользователи могут настроить размер корзины, установив Bucket_cap_mb.

Параметры модели начинаются с (примерно)Model.parameters()Распределите по корзинам в порядке, обратном данной модели. Причина использования обратного порядка:

  • Порядок обратного распространения является обратным порядком вычислений прямого распространения.
  • DDP ожидает, что градиенты будут готовы во время обратного прохода примерно в порядке прямого прохода.
2.5.2.2 Сгруппировать по

DDP группируется по типу и устройству как ключи, т.к. тензоры на разных устройствах не должны группироваться вместе, а однотипные тензоры должны группироваться в ведро. Использование типа и устройства в качестве ключа гарантирует, что тензоры одного типа на одном и том же устройстве размещаются в одном сегменте.

// Tensors may be coalesced into buckets. Buckets must contain tensors of
// the same type, on the same device, so a bucket can identified by a
// composite key of a tensor's type identifier and its device.
struct BucketKey {
  BucketKey(c10::ScalarType type, c10::Device device)
      : type(std::move(type)), device(std::move(device)) {}

  const c10::ScalarType type;
  const c10::Device device;

  // See torch/csrc/utils/hash.h for dispatch code.
  static size_t hash(const BucketKey& key) {
    return c10::get_hash(key.type, key.device); // 用类型和设备作为key
  }
};
2.5.2.3 compute_bucket_assignment_by_size

Его ключевая структура выглядит следующим образом: BucketAccumulator можно рассматривать как фактическое ведро.

struct BucketAccumulator {
    std::vector<size_t> indices; // 桶内容,是张量列表
    size_t size = 0; // 桶大小,比如若干mb
  }; // 桶的逻辑内容

  // Keep vector of indices and size accumulator by tensor type and device.
std::unordered_map<BucketKey, BucketAccumulator, c10::hash<BucketKey>>
      buckets; // 所有桶的列表,每一个实际桶可以认为是 BucketAccumulator

Давайте посмотрим на конкретную логику calculate_bucket_assignment_by_size:

  • Определяет список ограничений размера корзины. Bucket_size_limit_iterators.
  • Определяет список всех сегментов сегментов, каждый фактический сегмент может считаться BucketAccumulator.
  • Перебрать все переданные тензоры:
    • Дайте всем тензорам индекс, увеличивающийся от 0 до tensors.size () Если индексы были переданы, получите индекс тензора.
    • Если он настроен на ожидание разреженного градиента, поместите этот тензор в корзину отдельно, потому что его нельзя объединить с другими тензорами.
    • Используйте информацию о тензоре, чтобы создать ключ корзины и найти соответствующую корзину.
      • Получите BucketAccumulator, вставьте индекс нового тензора в список тензоров корзины, а индексы — это список индексов тензора.
      • Увеличьте соответствующий размер ковша.
    • При необходимости установите начальное значение ограничения размера.
    • Получить текущий минимальный лимит.
    • Если размер ведра больше минимального предела, это означает, что размер текущего ведра достиг максимального предела ведра, и говорят, что его необходимо перенести в новое ведро.
      • На самом деле он действительно переносится в новый логический сегмент, но по-прежнему выполняется в существующем сегменте.Поскольку тип и устройство остаются прежними, он должен продолжать накапливаться в исходном сегменте, но индекс исходного сегмента был изменен. переносится в результат. , что эквивалентно очистке.
      • Вставьте содержимое корзины в возвращаемый результат, то есть, если размер корзины слишком велик, сначала вставьте его в результат.
      • Для регенерации ведра ведро является ссылкой, поэтому прямое присвоение значения эквивалентно очистке исходного ведра, то есть исходное ведро продолжает использоваться, но исходные индексы в ведре были перенесены в результат.
      • Перейти к следующему пределу размера.
    • Вставьте оставшиеся индексы корзины в возвращаемое значение, потому что некоторые из них были вставлены непосредственно в результат ранее.
    • Сортировать результаты:
      • Если tensor_indices не пусто, это означает, что порядок тензоров уже соответствует порядку, в котором готовятся градиенты, и нет необходимости их переупорядочивать.
      • Если tensor_indices пусто, порядок основан на наименьшем индексе тензора, предполагая, что порядок тензоров соответствует порядку их использования (или обратному порядку генерации их градиента). Этот порядок гарантирует, что ведра будут подготовлены в последовательном порядке.
      • Обратите внимание, что это положительный порядок, и когда редюсер создается, он передается в обратном порядке: список (обратный (bucket_indices)).
    • Наконец, результат возвращается, и результат следующий: каждый вектор в нем соответствует корзине, и это индекс тензора, здесь он отсортирован от меньшего к большему.
std::vector<std::vector<size_t>> compute_bucket_assignment_by_size(
    const std::vector<at::Tensor>& tensors,
    const std::vector<size_t>& bucket_size_limits, // 桶大小限制
    const std::vector<bool>& expect_sparse_gradient,
    const std::vector<int64_t>& tensor_indices) { //实际上,初始化时候没有传入 tensor_indices
  // Either expect_sparse_gradient is not specified or it has as many elements
  // as the vector with tensors.
  TORCH_INTERNAL_ASSERT(
      expect_sparse_gradient.empty() ||
      (tensors.size() == expect_sparse_gradient.size()));
  TORCH_INTERNAL_ASSERT(tensors.size() > 0);

  std::vector<std::vector<size_t>> result;
  result.reserve(tensors.size()); // 预留大小

  // Keep iterator into the size_limit vector by tensor type and device.
  // This is done so that we can use the consecutive bucket limits per type.
  std::unordered_map<
      BucketKey,
      std::vector<size_t>::const_iterator,
      c10::hash<BucketKey>>
      bucket_size_limit_iterators;

  // Local accumulator type for a single bucket.
  struct BucketAccumulator {
    std::vector<size_t> indices; // 桶内容,是张量列表
    size_t size = 0; // 桶大小,比如若干mb
  }; // 桶的逻辑内容

  // Keep vector of indices and size accumulator by tensor type and device.
  std::unordered_map<BucketKey, BucketAccumulator, c10::hash<BucketKey>>
      buckets; // 所有桶的列表,每一个实际桶可以认为是 BucketAccumulator

  for (size_t i = 0; i < tensors.size(); i++) { // 遍历传入的所有张量
    const auto& tensor = tensors[i]; //拿到张量
    TORCH_CHECK(!tensor.is_sparse(), "No support for sparse tensors.");

    // when tensor_indices is empty, the index of tensors[i] assigned to
    // bucket is i, otherwise the tensor index is tensor_indices[i].
    auto tensor_index = i; // 就是给所有的tensor一个index,从0开始递增,一直到 tensors.size()
    if (!tensor_indices.empty()) {
      tensor_index = tensor_indices[i]; // 如果有index,就拿到张量的index
    }
    // If we expect a sparse gradient to be produced for this tensor, it cannot
    // be grouped together with other gradients and gets its own bucket.
    // 如果配置了期待sparse gradient,则把这个张量自己放入一个桶,因为没法和其他张量放在一起
    if (!expect_sparse_gradient.empty() &&
        expect_sparse_gradient[tensor_index]) {
      result.push_back({tensor_index});
      continue;
    }

    auto key = BucketKey(tensor.scalar_type(), tensor.device()); //使用张量信息构建桶的key
    auto& bucket = buckets[key]; // 找到对应的桶, 拿到BucketAccumulator
    bucket.indices.push_back(tensor_index); // 往该桶的张量列表里面插入新张量的index,indices 是 tensor index list
    bucket.size += tensor.numel() * tensor.element_size();// 增加对应桶大小

    // Initialize bucket size limit iterator if necessary.
    // 如果需要,就设定成大小限制的初始值
    if (bucket_size_limit_iterators.count(key) == 0) {
      bucket_size_limit_iterators[key] = bucket_size_limits.begin();
    }

    // bucket_size_limit_iterator 就是桶大小的范围, 即 [_DEFAULT_FIRST_BUCKET_BYTES, int(bucket_cap_mb * 1024 * 1024)]
    auto& bucket_size_limit_iterator = bucket_size_limit_iterators[key];
    const auto bucket_size_limit = *bucket_size_limit_iterator; // 当前最小值限制
    if (bucket.size >= bucket_size_limit) { 
      // 如果桶的尺寸大于最小值限制,就是说目前桶的尺寸已经达到了桶的最大限制,按说需要转移到新桶了(实际上确实转移到了逻辑上的新桶,但是实际还是在现有桶内执行,因为 type, device 还是同样的,还是应该在原有桶内继续累积,不过原有桶的indice已经转移到了result之中,就相当于清空了)
      result.emplace_back(std::move(bucket.indices)); // 把桶内容插入到返回result,就是说,当桶尺寸过大的时候,就先插入到result之中。
      bucket = BucketAccumulator(); // 重新生成桶,bucket是个引用,所以直接赋值,就相当于清空原有的桶,就是原来桶继续用,但是桶内原有的indices已经转移到了result之中。

      // Advance to the next bucket size limit for this type/device.
      // 前进到下一个尺寸限制
      auto next = bucket_size_limit_iterator + 1;
      if (next != bucket_size_limits.end()) {
        bucket_size_limit_iterator = next;
      }
    }
  }

  // Add remaining buckets. 把剩余的桶内indices插入到返回值,因为之前已经有些直接插入到了result之中
  for (auto& it : buckets) {
    auto& bucket = it.second;
    if (!bucket.indices.empty()) {
      result.emplace_back(std::move(bucket.indices));
    }
  }

  // If tensor_indices is not empty, the order of the tensors is in the gradient
  // ready order, so no need to sort.
  // If tensor_indices is empty, sort resulting buckets by the minimum tensor
  // index they include. We assume that the order of the tensors is the order in
  // which they are used (or the reverse order in which their gradients are
  // produced). This sorting step ensures that the buckets are ready in
  // consecutive order.
  // 如果 tensor_indices 非空,说明张量的顺序已经是梯度准备好的顺序,不需要再排序了
  // 如果 tensor_indices 是空的,依据最小张量index来排序,这里假定张量的顺序是他们使用的顺序(或者说是他们梯度产生次序的反序)。这种排序可保证桶是按照连续不断的顺序准备好。
  // 注意,这里就是正序排列,等到创建Reducer的时候,才反序传入:list(reversed(bucket_indices))
  if (tensor_indices.empty()) {
    std::sort(
        result.begin(),
        result.end(),
        [](const std::vector<size_t>& a, const std::vector<size_t>& b) {
          // 对于任意两个vector,排序的依据是:用这两个vector之中最小index来排序
          const auto amin = std::min_element(a.begin(), a.end()); // a中的最小index
          const auto bmin = std::min_element(b.begin(), b.end()); // b中的最小index
          return *amin < *bmin;
        });
  }

  return result; // result 最终如下,里面每个vector 都对应了一个bucket,里面是都是 tensor 的 index,这里都是从小到大顺序排序。
}

Конечный результат таков: каждый вектор в нем соответствует корзине, которая является индексом тензора, здесь он отсортирован от меньшего к большему.

Обратите внимание: поскольку входящие тензоры параметров являются параметрами [0], а параметры [0] основаны на возвращаемом результате параметров (), то есть параметры модели (приблизительно)Model.parameters()Распределите по корзинам в порядке, обратном данной модели. Причина использования обратного порядка заключается в том, что DDP ожидает, что градиенты будут готовы примерно в этом порядке во время обратного прохода. Последний DDP должен запустить AllReduce в обратном порядке model.parameters().

+-----------------------------------------------------------------------+
|                                                                       |
|  <tensor index 1, tensor index 2, tensor index 3, tensor index 4>     |
|                                                                       |
|                                                                       |
|  <tensor index 5, tensor index 6, tensor 7>                           |
|                                                                       |
|                                                                       |
|  ......                                                               |
|                                                                       |
|                                                                       |
|  <tensor index 8, tensor index 9, tensor index 10, tensor index 11>   |
|                                                                       |
+-----------------------------------------------------------------------+

2.5.3 Reducer

Следующий код предназначен для создания редуктора.

    self.reducer = dist.Reducer(
        parameters,
        list(reversed(bucket_indices)), # 利用桶index
        self.process_group,
        expect_sparse_gradient,
        self.bucket_bytes_cap,
        self.find_unused_parameters,
        self.gradient_as_bucket_view,
        param_to_name_mapping,
    )

Мы подробно рассмотрим редукторы в следующей статье.

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

ссылка 0xFF

Распределенная серия pytorch 3 — что делает torch.utils.data.distributed.DistributedSampler во время распределенного обучения?

Распределенная серия Pytorch 1 — узнайте переменные среды, связанные с torch.distributed.launch

pytorch распределенная серия 2 - как синхронизируется DistributedDataParallel?

pytorch (распределенные) данные параллельной личной практики - DataParallel/DistributedDataParallel

nn.DataParallel от Pytorch

обсуждение.py torch.org/he/data пар AL…

py torch.org/docs/stable…

Понимать распределенное обучение интерпретации исходного кода PyTorch?

Практическое руководство | Реализация слоя PyTorch AutoGrad C++

PYTORCH автоматическая дифференциация (1)

Как PyTorch ускоряет параллельное обучение данных? Раскрыты распространенные читы

Распределенное обучение pytorch (два init_process_group)

py torch.org/tutorials/i…

py torch.org/docs/master…

py torch.org/tutorials/i…

DP и DDP интерпретации исходного кода PyTorch: параллелизм моделей и анализ распределенного обучения

Параметры и буферы в модели Pytorch

py torch.org/docs/master…