[Анализ исходного кода] Распределенный PyTorch (12) ----- Прямое распространение до DistributedDataParallel

машинное обучение PyTorch

0x00 сводка

В предыдущей статье было представлено, как устроен Редьюсер, и несколько важных сценариев, а в этой статье будет проанализировано, как Редюсер реализует прямое распространение.

Другие статьи из этой серии:

Другие статьи из этой серии:

[Анализ исходного кода] Распространение PyTorch (1) ------ история и обзор

[Анализ исходного кода] Как PyTorch использует GPU

[Анализ исходного кода] Распределенный PyTorch (2) ----- DataParallel (включен)

[Анализ исходного кода] Распределенный PyTorch (3) ----- DataParallel (ниже)

[Анализ исходного кода] Распределенный PyTorch (4) ------ Основная концепция распределенного приложения

[Анализ исходного кода] Распределенный PyTorch (5) ------ Обзор DistributedDataParallel и способы его использования

[Анализ исходного кода] Распределенный PyTorch (6) ---DistributedDataParallel -- инициализация и хранение

[Анализ исходного кода] Распределенный PyTorch (7) ----- Группа процессов DistributedDataParallel

[Анализ исходного кода] Распределенный PyTorch (8) -------- Бумага DistributedDataParallel

[Анализ исходного кода] Распределенный PyTorch (9) ----- Инициализация DistributedDataParallel

[Анализ исходного кода] Распределенный PyTorch (10) ------ Редуктор статической архитектуры DistributedDataParallel

[Анализ исходного кода] Распределенный PyTorch (11) ----- DistributedDataParallel для создания операций Reducer и Join

0x01 Общая логика

Нам все еще нужно прибегнуть к магическому оружию и посмотреть на общую логику DDP в статье:

Тогда общая стратегия для прямого распространения дается следующим образом:

Forward Pass:

  • Каждый процесс считывает свои обучающие данные, а DistributedSampler гарантирует, что каждый процесс считывает разные данные.
  • DDP принимает ввод и передает его локальной модели.
  • Модель выполняет опережающее вычисление, и результат выводится. Теперь вычисления выполняются для каждого процесса (устройство CUDA).
  • еслиfind_unused_parametersУстановить какTrue, DDP проанализирует выходные данные локальной модели, пройдет расчетный график из out и пометит неиспользуемые параметры как готовые, потому что расчетный график будет меняться каждый раз, поэтому его нужно каждый раз проходить.
    • Этот режим (Mode) позволяет работать в обратном направлении на подграфе модели, а DDP помечает все неиспользуемые параметры как готовые путем обхода графа автоградации из выходных данных модели, чтобы уменьшить параметры, участвующие в обратном проходе.
    • Во время обратного распространения Редуктор уменьшает все сегменты, во время этого процессаReducerБудет ждать неподготовленных параметров. Пометка градиентов параметров как готовых не помогает DDP пропускать сегменты, но предотвращает вечное ожидание DDP несуществующих градиентов во время обратных проходов.
    • Обратите внимание, что обход графа autograd приводит к дополнительным затратам, поэтому приложение устанавливает его только при необходимости.find_unused_parametersзаTrue.
  • Просто вернись. Это отличается от DP тем, что выходные данные сети модели DDP не нужно собирать в процесс ранга 0.

0x02 мир Python

Начнем с кода Python, который находится по адресу: torch/nn/parallel/distributed.py.

Мы опускаем соединение, связанное с этим, и сосредоточимся только на основной части.Логика прямого метода такова:

  • Сохранить локальное состояние потока.
  • Если настроено, вызовите reducer.prepare_for_forward, чтобы подготовиться к пересылке.
  • Если настроен ddp_join_enabled, выполните соответствующую обработку.
  • Используйте _rebuild_buckets для сброса сегментов перед распространением вперед.
    • В функции _rebuild_buckets новые сегменты могут быть выделены до того, как старые будут освобождены.
    • Если вы хотите сэкономить на пиковом использовании памяти, вызовите до того, как пиковое использование памяти увеличится во время прямого вычисления._rebuild_bucket.
  • Если требуется синхронизация, вызовите _sync_params, чтобы перенаправить параметры прохода в параметры прямого прохода.
  • прямое распространение.
  • Prepare_for_backward вызывается, если требуется синхронное обратное распространение градиентов.
    • Когда параметр DDP find_unused_parameter имеет значение true, он запускает возврат в конце переадресации, помечает все неиспользуемые параметры и устанавливает их в состояние готовности заранее, чтобы можно было выполнить перенаправление на подграфе, но таким образом. Некоторое время будет принесено в жертву.

Конкретный код выглядит следующим образом:

    def forward(self, *inputs, **kwargs):
        with torch.autograd.profiler.record_function("DistributedDataParallel.forward"):
        
        		# 保存线程本地状态
            self.reducer.save_thread_local_state()
          
            # 如果做配置,则调用 reducer 为forward做准备
            if torch.is_grad_enabled() and self.require_backward_grad_sync:
                self.logger.set_runtime_stats_and_log()
                self.num_iterations += 1
                self.reducer.prepare_for_forward()
                
            # 如果配置ddp_join_enabled,做相应处理    
            if self.ddp_uneven_inputs_config.ddp_join_enabled:
                ones = torch.ones(1, device=self.device)
                work = dist.all_reduce(ones, group=self.process_group, async_op=True)
                if self.ddp_uneven_inputs_config.ddp_join_throw_on_early_termination:
                    # Active ranks schedule an allreduce with zeros, inactive
                    # ranks schedule them with 1. If the result != 0 it
                    # indicates at least one rank has terminated and we should
                    # throw.
                    zeros = torch.zeros(1, device=self.device)
                    dist.all_reduce(zeros, group=self.process_group)
                    should_throw_stop_iteration = zeros.item()
                    if should_throw_stop_iteration:
                        raise RuntimeError(
                            "Detected at least one rank that exhausted inputs. Throwing across all ranks."
                        )
                else:
                    self.reducer._set_forward_pass_work_handle( # 是join这里用到
                        work,
                        self.ddp_uneven_inputs_config.ddp_join_divide_by_initial_world_size,
                    )

            # Calling _rebuild_buckets before forward compuation,
            # It may allocate new buckets before deallocating old buckets
            # inside _rebuild_buckets. To save peak memory usage,
            # call _rebuild_buckets before the peak memory usage increases
            # during forward computation.
            # This should be called only once during whole training period.
            
            # 在前向传播之前使用 _rebuild_buckets 来重置桶
            # 在此函数内,也许在释放旧bucket之前分配新bucket。
            # 如果要节省峰值内存使用量,请在正向计算期间峰值内存使用量增加之前调用_rebuild_bucket。
            # 在整个训练期间,这只能调用一次。
            if torch.is_grad_enabled() and self.reducer._rebuild_buckets():
                logging.info("Reducer buckets have been rebuilt in this iteration.")

            # 如果需要同步前向传播参数,则进行同步    
            if self.require_forward_param_sync:
                self._sync_params()

            if self.ddp_uneven_inputs_config.ddp_join_enabled:
                # Notify joined ranks whether they should sync in backwards pass or not.
                self._check_global_requires_backward_grad_sync(is_joined_rank=False)

            # 进行前向传播    
            if self.device_ids:
			        	# 多卡情况
                inputs, kwargs = self.to_kwargs(inputs, kwargs, self.device_ids[0])
                output = self.module(*inputs[0], **kwargs[0])
            else:
                output = self.module(*inputs, **kwargs)

            # 如果需要同步后向传播梯度,则调用prepare_for_backward  
            if torch.is_grad_enabled() and self.require_backward_grad_sync:
			        	# 当DDP参数 find_unused_parameter 为 true 时,其会在 forward 结束时,启动一个回溯,标记出所有没被用到的 parameter,提前把这些设定为 ready,这样 backward 就可以在一个 subgraph 进行,但这样会牺牲一部分时间。

                self.require_forward_param_sync = True
                # We'll return the output object verbatim since it is a freeform
                # object. We need to find any tensors in this object, though,
                # because we need to figure out which parameters were used during
                # this forward pass, to ensure we short circuit reduction for any
                # unused parameters. Only if `find_unused_parameters` is set.
                if self.find_unused_parameters and not self.static_graph:
                    # Do not need to populate this for static graph.
                    self.reducer.prepare_for_backward(list(_find_tensors(output)))
                else:
                    self.reducer.prepare_for_backward([])
            else:
                self.require_forward_param_sync = False

        # TODO. Right now we add this sink for static_graph training only. once
        # this feature is stable, we will add this sink for all cases. E.g.
        # This sink can help capture more accuracte backward start time as well.
        if self.static_graph and self.num_iterations == 1:
            # Need to grab list of tensors from user output in order to pass
            # to custom autograd function.
            output_tensor_list, treespec = tree_flatten(output)
            passthrough_tensor_list = _DDPSink.apply(
                self.reducer,
                *output_tensor_list
            )
            # Reconstruct output data structure.
            output = tree_unflatten(passthrough_tensor_list, treespec)
        return output

Среди них используйте _sync_params для синхронизации параметров модели, которая выполняется с помощью _distributed_broadcast_coalesced.

def _sync_params(self):
    with torch.no_grad():
        # module buffer sync
        if self.will_sync_module_buffers():
            # Synchronize buffers across processes.
            # If we are running DDP with the join manager, we have to agree
            # upon a rank to sync module buffers from, since rank 0 may
            # already have been joined and have stale module buffers.
            if self.ddp_uneven_inputs_config.ddp_join_enabled:
                authoritative_rank = self._find_common_rank(
                    self._distributed_rank, True
                )
            else:
                # The process with rank 0 is considered the authoritative copy.
                authoritative_rank = 0
            self._distributed_broadcast_coalesced(
                self.modules_buffers[0],
                self.broadcast_bucket_size,
                authoritative_rank,
            )

0x03 мир С++

Теперь давайте перейдем в мир C++ и посмотрим, как здесь поддерживается прямое распространение. В частности, он делится на: подготовку к прямому распространению, перестроение сегментов и подготовку к обратному распространению.

3.1 Подготовка к прямому распространению

Здесь num_iterations_ увеличивается и записывается время.

void Reducer::prepare_for_forward() {
  std::lock_guard<std::mutex> lock(mutex_);
  num_iterations_++; // 这里会递增
  if (should_collect_runtime_stats()) {
    record_forward_compute_start_time();
  }
}

4.2 Восстановить ведро

Далее перестройте ведро, которое разделено на:

  • Настройте различные ограничения размера.
  • Рассчитайте размеры ковша.
  • Синхронизируйте индексы сегментов.
  • Инициализируйте ведро.
bool Reducer::rebuild_buckets() {
  // Ensure reduction for previous backwards pass is finished. If user's model
  // has unused parameters for example, this will raise an error recommending to
  // run with find_unused_parameters=True, instead of the size mismatch
  // exception below.
  std::lock_guard<std::mutex> lock(mutex_);
  ensure_prior_reduction_finished();
  if (!should_rebuild_buckets() || rebuilt_params_.empty()) {
    return false;
  }

  std::vector<std::vector<size_t>> rebuilt_bucket_indices;
  // 配置各种尺寸限制
  std::vector<size_t> bucket_size_limits;
  bucket_size_limits.push_back(kDefaultFirstBucketBytes);
  bucket_size_limits.push_back(bucket_bytes_cap_);
  // 计算桶的尺寸
  rebuilt_bucket_indices = compute_bucket_assignment_by_size(
      rebuilt_params_,
      bucket_size_limits,
      expect_sparse_gradients_[0],
      rebuilt_param_indices_);

  // For rebuilt bucket indices, it needs to be synced across all ranks.
  // Broadcast the newly rebuilt bucket indices from rank 0 in default.
  // After syncing up rebuilt bucket indices, initialize buckets for reducer.
  // 同步桶indices
  sync_bucket_indices(rebuilt_bucket_indices);

  has_rebuilt_bucket_ = true;
  rebuilt_params_.clear();
  rebuilt_param_indices_.clear();

  // 初始化桶
  initialize_buckets(std::move(rebuilt_bucket_indices));
  return true;
}

Давайте посмотрим, как восстановить его дальше.

3.2.1 Расчет размера ковша

Сначала нам нужно взглянуть на структуру ключей в calculate_bucket_assignment_by_size следующим образом: BucketAccumulator можно рассматривать как фактическое ведро.

struct BucketAccumulator {
    std::vector<size_t> indices; // 桶内容,是张量列表
    size_t size = 0; // 桶大小,比如若干mb
  }; // 桶的逻辑内容

  // Keep vector of indices and size accumulator by tensor type and device.
std::unordered_map<BucketKey, BucketAccumulator, c10::hash<BucketKey>>
      buckets; // 所有桶的列表,每一个实际桶可以认为是 BucketAccumulator

Во-вторых, давайте взглянем на конкретную логику calculate_bucket_assignment_by_size:

  • Создайте результат вычисления и используйте размер тензоров параметров, чтобы зарезервировать место для результата.

  • Создайте ведро, которое представляет собой список всех ведер, каждое фактическое ведро можно считать BucketAccumulator.

  • Перебрать все переданные тензоры для каждого тензора:

    • Если есть индекс, получить индекс тензора.
    • Если он настроен на ожидание разреженного градиента, поместите этот тензор в корзину отдельно, потому что его нельзя объединить с другими тензорами.
    • Используйте информацию о тензоре, чтобы построить ключ корзины.
    • Используйте ключ, чтобы найти соответствующее ведро и получить BucketAccumulator.
    • Вставьте индекс нового тензора в индексы списка тензоров корзины, где индексы — это список индексов тензора.
    • Увеличьте соответствующий размер ковша.
    • При необходимости установите начальное значение ограничения размера.
    • Если размер ведра больше минимального предела, это означает, что размер текущего ведра достиг максимального предела ведра, и говорится, что его нужно перенести в новое ведро (на самом деле это переносится в новый логический бакет, но на самом деле все еще выполняется в существующем бакете. , поскольку тип и устройство все те же, они должны продолжать накапливаться в исходном бакете, но был перенесен индекс исходного бакета к результату, который эквивалентен опорожнению).
      • Вставьте содержимое корзины в возвращаемый результат, то есть, если размер корзины слишком велик, сначала вставьте его в результат.
      • Используйте BucketAccumulator() для повторного создания сегмента BucketAccumulator(). Bucket является ссылкой, поэтому прямое присваивание эквивалентно очистке исходного сегмента, то есть исходный сегмент продолжает использоваться, но исходные индексы сегмента были перенесены в результат.
  • Вставьте оставшиеся индексы корзины в результат возвращаемого значения. Некоторые из них были непосредственно вставлены в результат ранее.

  • Сортировать результат:

    • Если tensor_indices не пусто, это означает, что порядок тензоров уже соответствует порядку, в котором готовятся градиенты, и нет необходимости их переупорядочивать.
    • Если tensor_indices пусто, порядок основан на наименьшем индексе тензора, предполагая, что порядок тензоров соответствует порядку их использования (или обратному порядку генерации их градиента). Этот порядок гарантирует, что ведра будут подготовлены в последовательном порядке.
    • Обратите внимание, что это положительный порядок.Когда редуктор создается, он передается в обратном порядке: list(reversed(bucket_indices))

Еще одна вещь, которую следует отметить: поскольку тензоры — это параметры parameters[0] в коде Python, а параметры[0] основаны на возвращаемом результате parameters(), поэтому DDP, наконец, запускает AllReduce в порядке, обратном model.parameters( ).

std::vector<std::vector<size_t>> compute_bucket_assignment_by_size(
    const std::vector<at::Tensor>& tensors,
    const std::vector<size_t>& bucket_size_limits, // 桶大小限制
    const std::vector<bool>& expect_sparse_gradient,
    const std::vector<int64_t>& tensor_indices) { //实际上,初始化时候没有传入 tensor_indices
  // Either expect_sparse_gradient is not specified or it has as many elements
  // as the vector with tensors.
  TORCH_INTERNAL_ASSERT(
      expect_sparse_gradient.empty() ||
      (tensors.size() == expect_sparse_gradient.size()));
  TORCH_INTERNAL_ASSERT(tensors.size() > 0);

  std::vector<std::vector<size_t>> result;
  result.reserve(tensors.size()); // 预留大小

  // Keep iterator into the size_limit vector by tensor type and device.
  // This is done so that we can use the consecutive bucket limits per type.
  std::unordered_map<
      BucketKey,
      std::vector<size_t>::const_iterator,
      c10::hash<BucketKey>>
      bucket_size_limit_iterators;

  // Local accumulator type for a single bucket.
  struct BucketAccumulator {
    std::vector<size_t> indices; // 桶内容,是张量列表
    size_t size = 0; // 桶大小,比如若干mb
  }; // 桶的逻辑内容

  // Keep vector of indices and size accumulator by tensor type and device.
  std::unordered_map<BucketKey, BucketAccumulator, c10::hash<BucketKey>>
      buckets; // 所有桶的列表,每一个实际桶可以认为是 BucketAccumulator

  for (size_t i = 0; i < tensors.size(); i++) { // 遍历传入的所有张量
    const auto& tensor = tensors[i]; //拿到张量
    TORCH_CHECK(!tensor.is_sparse(), "No support for sparse tensors.");

    // when tensor_indices is empty, the index of tensors[i] assigned to
    // bucket is i, otherwise the tensor index is tensor_indices[i].
    auto tensor_index = i; // 就是给所有的tensor一个index,从0开始递增,一直到 tensors.size()
    if (!tensor_indices.empty()) {
      tensor_index = tensor_indices[i]; // 如果有index,就拿到张量的index
    }
    // If we expect a sparse gradient to be produced for this tensor, it cannot
    // be grouped together with other gradients and gets its own bucket.
    // 如果配置了期待sparse gradient,则把这个张量自己放入一个桶,因为没法和其他张量放在一起
    if (!expect_sparse_gradient.empty() &&
        expect_sparse_gradient[tensor_index]) {
      result.push_back({tensor_index});
      continue;
    }

    auto key = BucketKey(tensor.scalar_type(), tensor.device()); //使用张量信息构建桶的key
    auto& bucket = buckets[key]; // 找到对应的桶, 拿到BucketAccumulator
    bucket.indices.push_back(tensor_index); // 该桶的张量列表里面插入新张量的index,indices 是 tensor index list
    bucket.size += tensor.numel() * tensor.element_size();// 增加对应桶大小

    // Initialize bucket size limit iterator if necessary.
    // 如果需要,就设定成大小限制的初始值
    if (bucket_size_limit_iterators.count(key) == 0) {
      bucket_size_limit_iterators[key] = bucket_size_limits.begin();
    }

    // bucket_size_limit_iterator 就是桶大小的范围, 即 [_DEFAULT_FIRST_BUCKET_BYTES, int(bucket_cap_mb * 1024 * 1024)]
    auto& bucket_size_limit_iterator = bucket_size_limit_iterators[key];
    const auto bucket_size_limit = *bucket_size_limit_iterator; // 当前最小值限制
    if (bucket.size >= bucket_size_limit) { 
      // 如果桶的尺寸大于最小值限制,就是说目前桶的尺寸已经达到了桶的最大限制,按说需要转移到新桶了(实际上确实转移到了逻辑的新桶,但是实际还是在现有桶内执行,因为 type, device 还是同样的,还是应该在原有桶内继续累积,不过原有桶的indice已经转移到了result之中,就相当于清空了)
      result.emplace_back(std::move(bucket.indices)); // 把桶内容插入到返回result,就是说,当桶尺寸过大的时候,就先插入到result之中。
      bucket = BucketAccumulator(); // 重新生成桶,bucket是个引用,所以直接赋值,就相当于清空原有的桶,就是原来桶继续用,但是桶内原有的indices已经转移到了result之中。

      // Advance to the next bucket size limit for this type/device.
      // 前进到下一个尺寸限制
      auto next = bucket_size_limit_iterator + 1;
      if (next != bucket_size_limits.end()) {
        bucket_size_limit_iterator = next;
      }
    }
  }

  // Add remaining buckets. 把剩余的桶内indices插入到返回值,因为之前已经有些直接插入到了result之中
  for (auto& it : buckets) {
    auto& bucket = it.second;
    if (!bucket.indices.empty()) {
      result.emplace_back(std::move(bucket.indices));
    }
  }

  // If tensor_indices is not empty, the order of the tensors is in the gradient
  // ready order, so no need to sort.
  // If tensor_indices is empty, sort resulting buckets by the minimum tensor
  // index they include. We assume that the order of the tensors is the order in
  // which they are used (or the reverse order in which their gradients are
  // produced). This sorting step ensures that the buckets are ready in
  // consecutive order.
  // 如果 tensor_indices 非空,说明张量的顺序已经是梯度准备好的顺序,不需要再排序了
  // 如果 tensor_indices 是空的,依据最小张量index来排序,这里假定张量的顺序是他们使用的顺序(或者说是他们梯度产生次序的反序)。这种排序可保证桶是按照连续不断的顺序准备好。
  // 注意,这里就是正序排列,等到创建Reducer的时候,才反序传入:list(reversed(bucket_indices))
  if (tensor_indices.empty()) {
    std::sort(
        result.begin(),
        result.end(),
        [](const std::vector<size_t>& a, const std::vector<size_t>& b) {
          // 对于任意两个vector,排序的依据是:用这两个vector之中最小index来排序
          const auto amin = std::min_element(a.begin(), a.end()); // a中的最小index
          const auto bmin = std::min_element(b.begin(), b.end()); // b中的最小index
          return *amin < *bmin;
        });
  }

  return result;
}

Конечный результат таков: каждый вектор в нем соответствует корзине, которая является индексом тензора, здесь он отсортирован от меньшего к большему. Параметры модели начинаются с (примерно)Model.parameters()Распределите по корзинам в порядке, обратном данной модели. Причина использования обратного порядка заключается в том, что DDP ожидает, что градиенты будут готовы примерно в этом порядке во время обратного прохода.

+-----------------------------------------------------------------------+
|                                                                       |
|  <tensor index 1, tensor index 2, tensor index 3, tensor index 4>     |
|                                                                       |
|                                                                       |
|  <tensor index 5, tensor index 6, tensor 7>                           |
|                                                                       |
|                                                                       |
|  ......                                                               |
|                                                                       |
|                                                                       |
|  <tensor index 8, tensor index 9, tensor index 10, tensor index 11>   |
|                                                                       |
+-----------------------------------------------------------------------+

3.2.2 Индексы корзины синхронизации

После того, как размер сгенерирован, sync_bucket_indices используется для синхронизации индексов корзины, логика следующая:

  • Перейдите ведро и запишите размер ведра в bucket_sizes.
  • Настройте TensorOptions.
  • Поместите индексы и количество сегментов, соответствующих сегментам, в index_tensor. Здесь тензор читается и записывается через средство доступа PyTorch. Средство доступа похоже на тензор, но жестко кодирует размерность и тип тензора в качестве параметров шаблона. , который может быть эффективным элементом доступа.
  • Поскольку группы процессов, такие как NCCL, поддерживают операции только между устройствами, скопируйте index_tensor в index_tensor_device.
  • Широковещательные индексы_tensor_device.
  • Точно так же транслируется размер ведра.
  • После завершения трансляции пройдитесь по блокам и обновите входящий параметр Bucket_indices с помощью num_buckets, Bucket_sizes_tensor и index_tensor, полученных с ранга 0.
void Reducer::sync_bucket_indices(
    std::vector<std::vector<size_t>>& bucket_indices) {
  
  auto num_buckets = bucket_indices.size();
  std::vector<size_t> bucket_sizes;
  bucket_sizes.reserve(num_buckets);
  int64_t total_size = 0;
  
  //遍历桶,把桶的大小都记录到bucket_sizes
  for (size_t i = 0; i < num_buckets; i++) {
    auto bucket_size = bucket_indices.at(i).size();
    bucket_sizes.push_back(bucket_size);
    total_size += bucket_size;
  }

  // 配置TensorOptions
  at::TensorOptions options;
  options = options.dtype(at::kInt);
  options = options.device(replicas_[0][0].device());

  // Group indices and num_bucket together into indices_tensor
  // Broadcast this tensor first, as its size is equal among all processes
  // 把桶对应的indices和桶数目放入indices_tensor,这里是通过 PyTorch accessor来对张量进行读写,accessor就像是一个张量,但它将张量的维度和 dtype 硬编码为了模板参数,可以高效的访问元素
  auto indices_tensor = at::empty({total_size + 1}, at::kInt);
  auto indices_accessor = indices_tensor.accessor<int, 1>();
  auto indices_accessor_Index = 0;
  for (size_t i = 0; i < num_buckets; i++) {
    const auto& bucket_size = bucket_indices.at(i).size();
    for (size_t j = 0; j < bucket_size; j++) {
      indices_accessor[indices_accessor_Index++] = bucket_indices[i][j];
    }
  }
  indices_accessor[indices_accessor_Index] = num_buckets;

  // Copy CPU tensor to device tensor, as the process_group_ could be NCCL and
  // it can only broadcast device tensors.
  auto indices_tensor_device = at::empty({total_size + 1}, options);
  // 因为 NCCL这样的 ProcessGroup 只支持device之间的操作,所以把indices_tensor拷贝到indices_tensor_device
  indices_tensor_device.copy_(indices_tensor, /*non_blocking=*/true);
  std::vector<at::Tensor> indices_tensor_list = {indices_tensor_device};
  // 对 indices_tensor_device 进行广播
  process_group_->broadcast(indices_tensor_list)->wait();
  indices_tensor.copy_(indices_tensor_list.front(), /*non_blocking=*/false);

  // Update num_buckets after receiving it from rank 0
  num_buckets = indices_accessor[indices_accessor_Index];

  // Broadcast bucket_sizes
  // 类似,对桶尺寸进行广播
  auto bucket_sizes_tensor = at::empty({(int64_t)num_buckets}, at::kInt);
  auto bucket_sizes_accessor = bucket_sizes_tensor.accessor<int, 1>();
  for (size_t i = 0; i < num_buckets; i++) {
    // For rank != 0, it is possible that local num buckets bucket_sizes.size()
    // is smaller than broadcasted num_buckets
    bucket_sizes_accessor[i] =
        bucket_sizes.at(std::min(i, (bucket_sizes.size() - 1)));
  }
  auto bucket_sizes_tensor_device = at::empty({(int64_t)num_buckets}, options);
  bucket_sizes_tensor_device.copy_(bucket_sizes_tensor, /*non_blocking=*/true);
  std::vector<at::Tensor> bucket_sizes_tensor_list = {
      bucket_sizes_tensor_device};
  process_group_->broadcast(bucket_sizes_tensor_list)->wait();
  bucket_sizes_tensor.copy_(
      bucket_sizes_tensor_list.front(), /*non_blocking=*/false);

  // Clear bucket_indices first, and then update bucket_indices using received
  // num_buckets, bucket_sizes_tensor and indices_tensor from rank 0
  bucket_indices.clear();
  bucket_indices.reserve(num_buckets);
  indices_accessor_Index = 0;
  // 遍历桶,使用从rank 0收到的num_buckets, bucket_sizes_tensor 和 indices_tensor 更新传进来的参数bucket_indices
  for (size_t i = 0; i < num_buckets; i++) {
    const auto& bucket_size = bucket_sizes_accessor[i];
    std::vector<size_t> bucket;
    bucket.reserve(bucket_size);
    for (size_t j = 0; j < bucket_size; j++) {
      bucket.push_back(indices_accessor[indices_accessor_Index++]);
    }
    bucket_indices.emplace_back(std::move(bucket));
  }
}

3.2.3 Инициализация корзины

После синхронизации бакет инициализируется, эта часть кода была проанализирована в предыдущем разделе, поэтому она опущена.

3.3 Подготовка к обратному распространению

После завершения прямого прохода вызовите prepare_for_backward, чтобы завершить подготовку к обратному проходу.

Он примерно разделен на два шага: сброс и поиск неиспользуемых параметров.

void Reducer::prepare_for_backward(
    const std::vector<torch::autograd::Variable>& outputs) {
  std::lock_guard<std::mutex> lock(mutex_);

  // 记录开始时间
  cpu_timer_.backward_compute_start_time = current_time_in_nanos();
  if (should_collect_runtime_stats()) {
    record_backward_compute_start_time();
  }

  // Reset accounting.
  expect_autograd_hooks_ = true;
  reset_bucket_counting();
  // Reset unused parameter accounting.
  has_marked_unused_parameters_ = false;
  // Reset per iteration marked ready parameters.
  perIterationReadyParams_.clear(); // 重置每次迭代的marked ready parameters

  // If static graph is not set, search graph to detect unused parameters.
  // When static graph is set, unused_parameters_ will be detected and will
  // not change after 1st iteration.
  // If static_graph_ = false and find_unused_parameters_ is false,
  // we assume that autograd hooks for ALL variables will be called,
  // and we don't have to search the autograd graph for presence of these hooks.
  if (dynamic_graph_find_unused()) {
    unused_parameters_.clear();
    search_unused_parameters(outputs); // 查找没有使用的参数
  }
}

3.3.1 Сброс

Здесь происходит обход сегментов.Для каждого сегмента сбрасывается состояние ожидания его реплики.Состояние ожидания реплики модели определяется количеством переменных в соответствующем сегменте реплики модели.

Если это статическая карта, сбросьте numGradHooksTriggeredMapPerIteration_.

void Reducer::reset_bucket_counting() {
  next_bucket_ = 0;
  // Reset num_buckets_ready_ at the beginning of backward computation
  // in each iteration.
  num_buckets_ready_ = 0;

  for (auto& bucket : buckets_) { // 遍历桶
    for (auto& replica : bucket.replicas) {
      replica.pending = replica.variables.size(); //对于每个桶,重置其副本的pending状态,某一个模型副本pending,是由这个模型副本中,本桶的变量数目决定
    }
    bucket.pending = bucket.replicas.size(); // 重置桶的pending状态,桶pending是由多少个模型副本决定
  }

  if (static_graph_) {
    // 重置numGradHooksTriggeredMapPerIteration_
    numGradHooksTriggeredMapPerIteration_ = numGradHooksTriggeredMap_;
  }
}

3.3.2 Поиск неиспользуемых параметров

search_unused_parameters завершает функциональность «найти неиспользуемые параметры».

Сначала мы рассмотрим переменную-член find_unused_parameters_ редуктора. Если для find_unused_parameters_ задано значение true, DDP будет возвращаться к указанному выходу в конце прямого распространения, проходить граф расчета автоградации, чтобы найти все неиспользуемые параметры, и помечать их как готовые.

Для всех параметров в DDP есть указатель на их функцию накопления градиента, но для тех параметров, которых нет в графе автоградации, они будут помечены как готовые при первом вызове хука автоградации.

Поскольку вывод модели может быть проигнорирован, эта операция не выполняется немедленно, как и вtorch.autograd.backward()Здесь начинается работа протокола.

Как видите, это будет очень дорого, зачем это делать? Это связано с тем, что вычислительный динамический граф изменяется.

  • Во время обучения в итерации может использоваться только один подграф модели, а поскольку PyTorch вычисляется динамически, подграф будет изменяться во время итерации, то есть некоторые параметры могут быть пропущены в следующей итерации обучения.
  • В то же время, поскольку все параметры были разделены на ведра в начале, и хук предусматривает, что будет сообщено только все ведро готово (т.е. pending == 0), поэтому, если мы не пометим неиспользуемые параметры как готовые , Весь процесс связи не сможет продолжаться.
// Traverse the autograd graph starting at the specified output.
// All parameters for which we have a pointer to their gradient accumulation
// functions, but don't show up in the autograd graph will be marked ready for
// for reduction as soon as the first autograd hook is called. This is not
// done immediately because the model output may be ignored, and we only
// want to start performing reductions on `torch.autograd.backward()`.
void Reducer::search_unused_parameters(
    const std::vector<torch::autograd::Variable>& outputs) {
  std::unordered_set<torch::autograd::Node*> seen;
  std::vector<torch::autograd::Node*> queue;

  RECORD_FUNCTION(
      "torch.distributed.ddp.reducer::search_unused_parameters",
      std::vector<c10::IValue>());

  // Seed queue with the grad functions of all outputs.
  for (const auto& output : outputs) {
    const auto& grad_fn = output.grad_fn();
    if (grad_fn) {
      queue.push_back(grad_fn.get()); // 把所有输出节点的梯度函数插入到queue
    }
  }

  // Traverse the autograd graph starting at the specified output.
  // 遍历这个queue中的元素,对于每一个函数,找到其后向图之中的后续边,然后把后续边指向的节点再插入queue,然后继续循环,最终 seen 里面是所有从output出发,所有节点的梯度函数
  while (!queue.empty()) {
    auto fn = queue.back();
    queue.pop_back();
    for (const auto& edge : fn->next_edges()) {
      if (auto next_ptr = edge.function.get()) {
        const bool was_inserted = seen.insert(next_ptr).second;
        if (was_inserted) {
          queue.push_back(next_ptr);
        }
      }
    }
  }

  // Find accumulator functions that don't show up in this graph.
  // gradAccToVariableMap_ 里面是所有需要被规约的variable
  // 遍历gradAccToVariableMap_,如果 seen 之中没有,就说明这个参数没有被使用,插入到unused_parameters_
  for (const auto& it : gradAccToVariableMap_) {
    // If the accumulator function is present in the graph, we know
    // a gradient will be computed for the corresponding parameter.
    if (seen.count(it.first) == 0) {
      unused_parameters_.push_back(it.second);
    }
  }

  // Warn user about unnecessary perf hit if all parameters were used in
  // forward.
  if (unused_parameters_.empty()) {
    TORCH_WARN_ONCE(
        "find_unused_parameters=True was specified in DDP constructor, "
        "but did not find any unused parameters in the forward pass. This flag "
        "results in an extra traversal of the autograd graph every iteration, "
        " which can adversely affect performance. If your model indeed never "
        "has any unused parameters in the forward pass, consider turning this "
        "flag off. Note that this warning may be a false positive if your model "
        "has flow control causing later iterations to have unused parameters.");
  }
}

На этом прямое распространение закончено, и мы получаем следующее:

  • Параметры, необходимые для вычисления градиентов, были разделены на сегменты.
  • Ствол восстановлен.
  • Прямое распространение завершено.
  • Вернитесь к указанному выходу, просмотрите график расчета автоградации, чтобы найти все неиспользуемые параметры, и пометьте их как готовые.

Мы проанализируем обратное распространение в следующей статье.

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

ссылка 0xFF

Распределенная серия pytorch 3 — что делает torch.utils.data.distributed.DistributedSampler во время распределенного обучения?

Распределенная серия Pytorch 1 — узнайте переменные среды, связанные с torch.distributed.launch

pytorch распределенная серия 2 - как синхронизируется DistributedDataParallel?

pytorch (распределенные) данные параллельной личной практики - DataParallel/DistributedDataParallel

nn.DataParallel от Pytorch

обсуждение.py torch.org/he/data пар AL…

py torch.org/docs/stable…

Понимать распределенное обучение интерпретации исходного кода PyTorch?

Практическое руководство | Реализация слоя PyTorch AutoGrad C++

PYTORCH автоматическая дифференциация (1)

Как PyTorch ускоряет параллельное обучение данных? Раскрыты распространенные читы

Распределенное обучение pytorch (два init_process_group)

py torch.org/tutorials/i…

py torch.org/docs/master…

py torch.org/tutorials/i…

DP и DDP интерпретации исходного кода PyTorch: параллелизм моделей и анализ распределенного обучения

Параметры и буферы в модели Pytorch