[Анализ исходного кода] Распределенный PyTorch (10) ------ Редуктор статической архитектуры DistributedDataParallel

глубокое обучение PyTorch

0x00 сводка

Благодаря приведенному выше анализу мы уже знаем базовую архитектуру DDP и то, как ее инициализировать.В этой статье мы рассмотрим статическую архитектуру ее ядра Reducer. Редуктор обеспечивает базовую реализацию синхронизации градиента при обратном распространении.

Другие статьи из этой серии:

[Анализ исходного кода] Распространение PyTorch (1) ------ история и обзор

[Анализ исходного кода] Как PyTorch использует GPU

[Анализ исходного кода] Распределенный PyTorch (2) ----- DataParallel (включен)

[Анализ исходного кода] Распределенный PyTorch (3) ----- DataParallel (ниже)

[Анализ исходного кода] Распределенный PyTorch (4) ------ Основная концепция распределенного приложения

[Анализ исходного кода] Распределенный PyTorch (5) ------ Обзор DistributedDataParallel и способы его использования

[Анализ исходного кода] Распределенный PyTorch (6) ---DistributedDataParallel -- инициализация и хранение

[Анализ исходного кода] Распределенный PyTorch (7) ----- Группа процессов DistributedDataParallel

[Анализ исходного кода] Распределенный PyTorch (8) -------- Бумага DistributedDataParallel

[Анализ исходного кода] Распределенный PyTorch (9) ----- Инициализация DistributedDataParallel

0x01 Введение

1.1 вызов

Код создания Reducer выглядит следующим образом, в файле _ddp_init_helper.

        # Note: reverse list of buckets because we want to approximate the
        # order in which their gradients are produced, and assume they
        # are used in the forward pass in the order they are defined.
        self.reducer = dist.Reducer(
            parameters, # parameters[0]是张量列表
            list(reversed(bucket_indices)), # 桶信息
            self.process_group,
            expect_sparse_gradient,
            self.bucket_bytes_cap,
            self.find_unused_parameters,
            self.gradient_as_bucket_view,
            param_to_name_mapping,
        )

Параметры, которые нужно вызвать, следующие, parameters[0] — это параметры модели на ранге 0, вы можете видеть, что только элемент [0] имеет смысл, а сам оригинал [0] включает 20 элементов:

parameters = {list: 1} 
0 = {list: 4}           
 0 = {Parameter: 10} Parameter containing:\ntensor([[-4.0381e-02,  3.8828e-02, 1  )   
 1 = {Parameter: 10} Parameter containing:\ntensor([-0.0438, -0.2033,  0.2771,  0.0721,  ) 
 2 = {Parameter: 5} Parameter containing:\ntensor([[-0.0094, -0.1319,  0.0713,  0.3155,  )
 3 = {Parameter: 5} Parameter containing:\ntensor([-0.0008,  0.0582, -0.1245, -0.2538, )
 ...
 20 = {Parameter: 5} Parameter containing:\ntensor([-0.0008,  0.0582, -0.1245, -0.2538, )                                                   
 __len__ = {int} 20
__len__ = {int} 1

Примеры Bucket_indices следующие:

Что касается тензорных индексов, то всем тензорам присваивается индекс, увеличивающийся от 0 до tensors.size(). Если параметры модели имеют всего 20 тензоров, то индекс тензора от 0 до 19 и разделен на 6 бакетов, то среди этих 6 бакетов каждый тензорный индекс уникален и не повторяется.

+-----------------------------------------------------------------------+
|                                                                       |
|  <tensor index 0, tensor index 1, tensor index 2, tensor index 3>     |
|                                                                       |
|                                                                       |
|  <tensor index 4, tensor index 5, tensor 6>                           |
|                                                                       |
|                                                                       |
|  ......                                                               |
|                                                                       |
|                                                                       |
|  <tensor index 16, tensor index 17, tensor index 18, tensor index 19> |
|                                                                       |
+-----------------------------------------------------------------------+

Код Python не имеет смысла, мы можем смотреть только на C++.

class Reducer(__pybind11_builtins.pybind11_object):
    def __init__(self, replicas, *args, **kwargs): 
        """ __init__(self: torch._C._distributed_c10d.Reducer, replicas: List[List[at::Tensor]], bucket_indices: List[List[int]], process_group: c10d::ProcessGroup, expect_sparse_gradients: List[List[bool]] = [], bucket_bytes_cap: int = 26214400, find_unused_parameters: bool = False, gradient_as_bucket_view: bool = False, param_to_name_mapping: Dict[int, str] = {}) -> None """
        pass

Итак, мы подошли к torch/lib/c10d/reducer.h и torch/lib/c10d/reducer.cpp.

0x02 Определение редуктора

Редьюсер обеспечивает основную реализацию синхронизации градиента при обратном распространении, его определение довольно сложное, нам даже нужно удалить некоторые неважные переменные-члены, чтобы показать:

class Reducer {
 public:
  // The constructor takes a list of variables for every model replica.
  // The bucket assignment for this reducer is specified as a list of
  // buckets, each of which is specified as a list of indices into the
  // variables list for **a single replica** (i.e. `variables[0]`).
  explicit Reducer(
      std::vector<std::vector<at::Tensor>> replicas,
      std::vector<std::vector<size_t>> bucket_indices,
      c10::intrusive_ptr<c10d::ProcessGroup> process_group,
      std::vector<std::vector<bool>> expect_sparse_gradients,
      int64_t bucket_bytes_cap,
      bool find_unused_parameters,
      bool gradient_as_bucket_view,
      std::unordered_map<size_t, std::string>
          paramNames);

 protected:
  // Forward declaration.
  struct Bucket;

  void push_rebuilt_params(const VariableIndex& index);

  mutable std::mutex mutex_;
  const std::vector<std::vector<at::Tensor>> replicas_;
  const c10::intrusive_ptr<::c10d::ProcessGroup> process_group_;
  std::vector<std::vector<bool>> expect_sparse_gradients_;

  std::vector<std::vector<std::shared_ptr<torch::autograd::Node>>>
      grad_accumulators_;
  std::unordered_map<torch::autograd::Node*, VariableIndex>
      gradAccToVariableMap_;
  std::vector<std::pair<uintptr_t, std::shared_ptr<torch::autograd::Node>>>
      hooks_;

  bool expect_autograd_hooks_;
  bool require_finalize_;
  size_t next_bucket_;

  bool has_marked_unused_parameters_;
  const bool find_unused_parameters_;
  const bool gradient_as_bucket_view_;
  std::vector<VariableIndex> unused_parameters_; // 如果没有用到,直接设置为就绪,第一次迭代之后久不会改变了
  // Locally used parameter maps indicating if parameters are used locally
  // during the current iteration or no_sync session if no_sync is on. One
  // tensor for each model replica and each tensor is one-dim int32 tensor of
  // number of parameters. These tensors are marked in autograd_hook to indicate
  // the corresponding param has been used, and get allreduced in the end of
  // backward of current iteration or no_sync session for figuring out the
  // globally unused parameters.
  //
  // local_used_maps_:     CPU tensors for bookkeeping locally used params
  // local_used_maps_dev_: dev tensors for reducing globally unused params
  std::vector<at::Tensor> local_used_maps_;
  std::vector<at::Tensor> local_used_maps_dev_;
  // Indicate that reduction is done and D2H copy is done as well.
  bool local_used_maps_reduced_;

  using GradCallback =
      torch::distributed::autograd::DistAutogradContext::GradCallback;

  // A bucket replica represents [1..N] gradients to be reduced,
  // with the same dtype, on the same device.
  //
  // Batching gradients together before reducing them can result in lower
  // overhead and/or faster time to completion. Only gradients of the same type
  // and on the same device can be batched. The tensor that represents the
  // flattened gradient uses the same type and is placed on the same device.
  // Buckets are filled as the gradients they hold are computed (triggered by
  // autograd hooks). Buckets are reduced in a predetermined order that is
  // identical across processes.
  struct BucketReplica {
    // Flattened (1 dimensional) contents of bucket.
    at::Tensor contents;

    // Views into contents for each grad.  Each view will be created with
    // layout (sizes + strides) matching the grad's expected layout
    // ("Gradient Layout Contract" in torch/csrc/autograd/AccumulateGrad.h).
    // `bucket_views_in[i].copy_(grad)` and
    // `grad.copy_(bucket_views_out[i])`
    // provide convenient ways to move grad data in/out of contents.
    // The reason we keep two states for bucket_views is that if DDP
    // communication hook was registered, `bucket_views_out` could be
    // re-initialized with the value of hook's `future_work`. We still need to
    // keep a separate view reference to replica's original contents for
    // `bucket_views_in[i].copy_(grad)` call.
    std::vector<at::Tensor> bucket_views_in;
    std::vector<at::Tensor> bucket_views_out;

    // Variables that contribute to this bucket replica. Use refcounted value
    // here so that we can easily unflatten the bucket contents into the
    // participating variables after reduction has completed.
    std::vector<at::Tensor> variables;

    // Per-variable offset/length into the flat bucket contents tensor and grad
    // bucket.
    std::vector<size_t> offsets;
    std::vector<size_t> lengths;

    // Per-variable sizes into the grad bucekt.
    std::vector<c10::IntArrayRef> sizes_vec;

    // Number of tensors to be added before this bucket is complete.
    // This is reset to `variables.size()` every iteration.
    size_t pending;

    // TODO(@pietern)
    // Memory copies from gradient tensors into the bucket are potentially
    // done on different CUDA streams. We record an event for every copy
    // so that we can synchronize with them prior to kicking off the reduction.
    // std::vector<at::cuda::CUDAEvent> events;
  };
  // A bucket holds N bucket replicas (1 per model replica).
  //
  // If every bucket in this struct is ready, the reduction can be kicked off.
  // One bucket per replica. Reduction is kicked off when every bucket is ready.
  //
  struct Bucket {
    std::vector<BucketReplica> replicas;

    // Global indices of participating variables in the bucket
    std::vector<size_t> variable_indices;

    // Number of replicas to be marked done before this bucket is ready.
    size_t pending;

    // Keep work handle around when this set of buckets is being reduced.
    c10::intrusive_ptr<c10d::ProcessGroup::Work> work;

    // Keep future work handle around if DDP comm hook is registered.
    c10::intrusive_ptr<torch::jit::Future> future_work;

    // If this bucket should expect a single sparse gradient.
    // Implies: replicas[i].variables.size() == 1.
    bool expect_sparse_gradient = false;
  };

  std::vector<Bucket> buckets_;

  // A variable locator locates a particular variable in the bucket
  // structure. The `bucket_index` field points to the bucket in the `buckets_`
  // vector. The `intra_bucket_index` field points to the index of the variable
  // in any of the vector fields in the bucket replica.
  struct VariableLocator {
    // Index into the `buckets_` variable.
    size_t bucket_index;
    // Index of parameter in single bucket replica.
    size_t intra_bucket_index;

    VariableLocator() = default;

    VariableLocator(size_t bucket_index_, size_t intra_bucket_index_) {
      bucket_index = bucket_index_;
      intra_bucket_index = intra_bucket_index_;
    }
  };

  // Map the index of a variable to its location in the bucket structure.
  std::vector<VariableLocator> variable_locators_;

  // track the number of iterations to synchronize grads in training so far.
  long num_iterations_;
  // track the number of buckets that have been ready for
  // communication calls like allReduce or communication hooks.
  int num_buckets_ready_;

  // We collect the relative timestamp of every gradient being ready
  // when executing autograd. This can be used to derive a timeline of
  // the point in time buckets were ready, or ideal bucket assignment/ordering.
  std::vector<std::vector<int64_t>> backward_stats_;

  int ddp_runtime_logging_sample_rate_ = kDDPRuntimeLoggingSampleRate;

  bool is_multi_device_module_ = false;

  // Following variables are to help build dynamic bucket order
  bool has_rebuilt_bucket_;
  std::vector<at::Tensor> rebuilt_params_;
  std::vector<int64_t> rebuilt_param_indices_;
  const int64_t bucket_bytes_cap_;

  struct RpcContext {
    using ContextPtr = torch::distributed::autograd::ContextPtr;
    // The shared_ptr is to hold the context instance.
    ContextPtr context_ptr_holder;
    std::atomic<ContextPtr::element_type*> context_ptr{nullptr};

    void set(ContextPtr&& new_context_ptr);
  };
  RpcContext rpc_context_;

  // A struct containing work handle and tensor for allreduce scheduled in
  // forward pass, if applicable.
  struct ForwardPassAllreduceWork {
    c10::intrusive_ptr<c10d::ProcessGroup::Work> workHandle;
    at::Tensor resultTensor;
    // whether we should divide by the initial world_size or the no. of
    // remaining DDP ranks.
    bool useStaticWorldSize;
  };

  // Handle for the currently scheduled allreduce in the forward pass, if
  // applicable.
  ForwardPassAllreduceWork forwardPassWorkHandle_;

  // Division factor for reduction of gradients.
  int divFactor_;

  bool static_graph_;

  // Key: VariableIndex, Value: the number of times that a variable's autograd_hook()
  // should be triggered before marking this variable's grad as ready for communication.
  // Map will not change after 1st iteration.
  std::unordered_map<VariableIndex, int, c10::hash<VariableIndex>> numGradHooksTriggeredMap_;
  // Key: VariableIndex, Value: the number of times that a variable's autograd_hook()
  // are left to be triggered before marking this variable's grad as ready for communication.
  // Map will change after 1st iteration to track a grad is ready for communication or not.
  std::unordered_map<VariableIndex, int, c10::hash<VariableIndex>> numGradHooksTriggeredMapPerIteration_;

 private:
  // comm_hook_ is used to access the DDP communication hook if registered.
  std::unique_ptr<CommHookInterface> comm_hook_;
  // Current thread local state
  at::ThreadLocalState thread_local_state_;
  // Debug level setting. It is parsed once when Reducer is constructed, and
  // remains the same across a single invocation of DDP training.
  DistributedDebugLevel ddp_debug_level_;
  // Mapping of variable index to fully qualified name of model to notify users
  // about errors when certain parameters do not get gradient.
  std::unordered_map<size_t, std::string> param_names_;
  // Per iteration set of parameter indices that have been marked ready.
  std::unordered_set<size_t> perIterationReadyParams_;
  // Retrieves parameter names that have not been marked as ready as part of
  // previous iteration.
  std::vector<std::string> getUnmarkedParamsForIteration();
  // Retrives parameter indices that have not been marked as ready as part of
  // previous iteration.
  std::vector<size_t> getUnmarkedParamIndicesForIteration();
  // Raises appropriate error if mark_variable_ready is called on the same
  // variable twice, which is unexpected.
  void checkAndRaiseMarkedTwiceError(size_t curVariableIndex);

  friend class Logger;
};

Ключевые переменные-члены Reducer следующие.

  std::vector<std::vector<std::shared_ptr<torch::autograd::Node>>>
      grad_accumulators_; // 对应的 index 存了相应的 grad_accumulator,就是 tensor index对应的grad_accumulator
  std::unordered_map<torch::autograd::Node*, VariableIndex>
      gradAccToVariableMap_; // 存了grad_accumulator & index 的对应关系,这样以后在 autograd graph 寻找 unused parameters 就方便了
  std::vector<std::pair<uintptr_t, std::shared_ptr<torch::autograd::Node>>>
      hooks_;

  std::vector<Bucket> buckets_;

  const std::vector<std::vector<at::Tensor>> replicas_; // 传入的张量
  const c10::intrusive_ptr<::c10d::ProcessGroup> process_group_; // 进程组

Затем мы анализируем эти переменные-члены одну за другой.

0x03 Bucket

3.1 Дизайн

Объединение градиентов в пакеты перед уменьшением градиентов может сократить накладные расходы и/или ускорить время выполнения. Но пакетировать можно только градиенты одного типа на одном устройстве.

Ведро — это набор градиентов, и градиенты одного типа на унифицированном устройстве помещаются в одно и то же ведро. В коде Bucket — это концепция ведра.

При каждом обратном проходе тензоры из всех градиентов параметров копируются в ведра, а средние градиенты копируются обратно в ведра после AllReduce. Для ускорения операций копирования сегменты всегда создаются на том же устройстве, что и параметры. Если модель охватывает несколько устройств, DDP учитывает привязку устройств, чтобы гарантировать, что все параметры в одном сегменте находятся на одном устройстве. Порядок AllReduce также влияет на результат, так как он определяет, насколько обмен данными может пересекаться с вычислением. DDP запускает AllReduce в обратном порядке model.parameters().

3.2 Определения

3.2.1 BucketReplica имеет несколько

Для лучшего объяснения давайте сначала проанализируем, что такое BucketReplica. Начнем с аннотаций.

Во-первых, ведро имеет несколько BucketReplica, и каждая модель соответствует одной BucketReplica.

// A bucket holds N bucket replicas (1 per model replica).

Но используется только один элемент [0], так как однопроцессный режим с несколькими устройствами в настоящее время не поддерживается, поэтому предполагается, что в ведре только одна реплика.

    GradBucket grad_bucket(
        next_bucket_,
        tensors[0],
        // 这里的注释指明了不支持 SPMD
        // Since currently we do not support single-process multiple-device
        // mode, we can assume only one replica in the bucket.
        bucket.replicas[0].offsets,
        bucket.replicas[0].lengths,
        bucket.replicas[0].sizes_vec);
    bucket.future_work = comm_hook_->runHook(grad_bucket);

В сочетании с предыдущим кодом SPMD не будет поддерживаться в будущем. параметры — это набор параметров списка моделей [ToyModel], а параметры [0] — это параметры ToyModel.

    # 下面注释指明了未来也不会支持 SPMD
    # TODO(wayi@): Remove this field since SPMD is no longer supported,
    # and also remove all the relevant unnecessary loops.
    # Module replication within process (single-process multi device)
    
    self._module_copies = [self.module] # 构建一个比如 [ToyModel] 这样的列表
    # Build parameters for reducer.
    parameters, expect_sparse_gradient = self._build_params_for_reducer()

Из вышеизложенного мы знаем, что:

  • Первоначально DDP надеялся поддерживать SPMD, как и DP, поэтомуэтот процессНеобходимо поддерживать параметры нескольких копий модели на нескольких графических процессорах, то есть параметры — это массив, а каждый элемент массива — параметр копии модели.
  • параметры назначаются какReducer.replicas_Reducer.replicas_Используется для назначения Bucket.replicas.
  • Поскольку в будущем Reducer.replicas_ не будет поддерживаться, значение имеют только параметры[0].

Итак, мы заключаем:

  • BucketReplica — это группа параметров градиента, которую необходимо получить для модели. Реплика соответствует информации о параметрах (части) реплики модели на устройстве (GPU), то есть реплика представляет собой [1..N] градиентов, которые необходимо уменьшить, эти градиенты имеют одинаковый dtype и расположены на то же устройство.
  • На самом деле имеет значение только Bucket.replicas[0], что соответствует тензору частичного спроса в [self.module] в приведенном выше коде, то есть параметрам[0] .

3.2.2 Ключ

Резюмируем ключевые моменты Bucket:

  • Переменная-член реплики — это каждая BucketReplica, соответствующая корзине. BucketReplica представляет [1..N] градиентов, которые необходимо уменьшить с помощью того же dtype и на том же устройстве.

    • Только Bucket.replicas[0] имеет значение, что соответствует тензору, соответствующему этому сегменту в группе параметров градиента, которая будет вычисляться в этой модели.
    • Как присвоить значение? Он заключается в использовании Reducer.replicas_ для присвоения значений, а replicas_ — это параметры параметра. Мы представим его ниже.
  • Переменная-член variable_indices используется для записи индексов переменных в этом сегменте.

    Как присвоить значение? Используйте Bucket_indices, описанные ранее для назначения.

    bucket.variable_indices = std::move(bucket_indices[bucket_index]);
    

    как пользоваться? intra_bucket_index — это порядковый номер Bucket.variable_indices, используйте серийный номер, чтобы получить реальный индекс переменной. Код будет объяснен позже.

    size_t variable_index = bucket.variable_indices[intra_bucket_index];
    

3.2.3 Специальные определения

Наконец, Bucket определяется следующим образом:

  // A bucket holds N bucket replicas (1 per model replica).
  //
  // If every bucket in this struct is ready, the reduction can be kicked off.
  // One bucket per replica. Reduction is kicked off when every bucket is ready.
  //
  struct Bucket {
    std::vector<BucketReplica> replicas;// 每个模型副本对应一个桶

    // Global indices of participating variables in the bucket
    std::vector<size_t> variable_indices; // 具体每个桶里面有哪些 variable。

    // Number of replicas to be marked done before this bucket is ready.
    size_t pending; // 计数,

    // Keep work handle around when this set of buckets is being reduced.
    c10::intrusive_ptr<c10d::ProcessGroup::Work> work;

    // Keep future work handle around if DDP comm hook is registered.
    c10::intrusive_ptr<torch::jit::Future> future_work;

    // If this bucket should expect a single sparse gradient.
    // Implies: replicas[i].variables.size() == 1.
    bool expect_sparse_gradient = false;
  };

3.3 Настройки

Переменная-член Buckets_ редуктора является ключом, то есть всеми сегментами редуктора.

std::vector<Bucket> buckets_;

Как инициализировать ведра_ в функции инициализации, ядро ​​такое:

  • Найдите индекс этого сегмента в Bucket_indices.
  • Найдите среди параметров тензор, соответствующий индексу.
  • Настройка этих тензоров в BucketReplica — это тензоры, которые должен уменьшить этот ковш.
void Reducer::initialize_buckets(
    std::vector<std::vector<size_t>> bucket_indices) {
  
  buckets_.reserve(bucket_count);
  
  for (size_t bucket_index = 0; bucket_index < bucket_count; bucket_index++) {
    Bucket bucket;
    
    // Variables that expect sparse gradients must have their own bucket.
    if (bucket_indices[bucket_index].size() == 1) {
      const auto variable_index = bucket_indices[bucket_index].front();
      bucket.expect_sparse_gradient = // 设置 bucket
          expect_sparse_gradients_[0][variable_index];
    }     
    // Iterate over model replicas.
    for (size_t replica_index = 0; replica_index < replica_count;
         replica_index++) {
      
      BucketReplica replica; // 设置replica

      if (bucket.expect_sparse_gradient) {
        const auto variable_index = bucket_indices[bucket_index].front();
        // 找到index对应的tensor
        const auto& variable = replicas_[replica_index][variable_index];
        replica.variables = {variable};
      } else {

        // Iterate over bucket variables.
        for (const auto variable_index : bucket_indices[bucket_index]) {
          // 找到index对应的tensor
          const auto& variable = replicas_[replica_index][variable_index];
          if (!options.has_device()) {
            options = options.device(variable.device());
          } 
          if (!options.has_dtype()) {
            options = options.dtype(variable.dtype());
          } 
          
          const auto length = variable.numel();
          replica.variables.push_back(variable); // 插入张量
          replica.offsets.push_back(offset);
          replica.lengths.push_back(length);
          replica.sizes_vec.push_back(variable.sizes());
          offset += length;
        }

        // Allocate bucket contents tensor.
         initialize_bucket_views(replica, replica.contents);
      }

      // Add bucket replica to enclosing bucket.
      bucket.replicas.push_back(std::move(replica)); // 配置bucket
    }   
    
    bucket.variable_indices = std::move(bucket_indices[bucket_index]);
    buckets_.push_back(std::move(bucket)); //插入桶列表
  }  
}

Легенда выглядит следующим образом: здесь предполагается, что индекс корзины равен 1, то есть второй корзине, поэтому переменные_индексы соответствуют соответствующей части в Bucket_indices. Например, BucketReplica[0] содержит Tensor 4,5,6, а variable_indices — это индекс Tensor 4,5,6 соответственно.

Bucket_indices на изображении ниже — это один из параметров конструктора Reducer.

+--------------------------------+   +------------------------------------+
|Reducer                         |   |                                    |
|                                |   |bucket 0, bucket 1, ...... bucket n |
|      vector<Bucket> buckets_ +---> |    +                               |
|                                |   |    |                               |
+--------------------------------+   +------------------------------------+
                                          |
                          +---------------+              +------------------------------+
                          |                         +--> | Tensor 4, Tensor 5, Tensor 6 |
                          |                         |    +------------------------------+
                          |                         |
                          v                   +-----------------------------------------+
+-------------------------+-----------+       |     |                                   |
| Bucket                              |       | +---+-----------+     +---------------+ |
|                                     |       | | BucketReplica |     | BucketReplica | |
|                                     |       | |               | ... |               | |
|   vector<BucketReplica> replicas +--------> | +---------------+     +---------------+ |
|                                     |       +-----------------------------------------+
|                                     |
|   vector<size_t> variable_indices +------->  <tensor index 4, tensor index 5, tensor 6>
|                                     |
+-------------------------------------+





bucket_indices    +-----------------------------------------------------------------------+
     +            |                                                                       |
     |            |  <tensor index 0, tensor index 1, tensor index 2, tensor index 3>     |
     |            |                                                                       |
     +----------> |                                                                       |
                  |  <tensor index 4, tensor index 5, tensor 6>                           |
                  |                                                                       |
                  |                                                                       |
                  |  ......                                                               |
                  |                                                                       |
                  |                                                                       |
                  |  <tensor index 16, tensor index 17, tensor index 18, tensor index 19> |
                  |                                                                       |
                  +-----------------------------------------------------------------------+


0x03 BucketReplica

Как обсуждалось ранее, BucketReplica представляет [1..N] градиентов, которые необходимо уменьшить, все одного и того же dtype на одном устройстве. Это часть параметров градиента, которые должны быть получены моделью, которые определяются переменными_индексами ведра.

Его ключевые переменные-члены:

  • std::vector<at::Tensor> variablesэто переменная, которая составляет копию этого сегмента. Здесь мы используем значение с подсчетом ссылок, чтобы мы могли легко разложить содержимое корзины на участвующие переменные после выполнения сокращения.
  • at::Tensor contents: Результат выравнивания содержимого ведра, то есть результат после Flattened (1 diversity).
  • std::vector<at::Tensor> bucket_views_in: предоставляет способ просмотра определенных градиентов содержимого с точки зрения ввода.
  • std::vector<at::Tensor> bucket_views_out: предоставляет способ просмотра определенных градиентов содержимого с точки зрения вывода.

Дополнительные сведения см. в следующих примечаниях:

Views serve as entry points to copy_ each grad's data in/out of the flat contents tensor.

3.1 Views

оstd::vector<at::Tensor> bucket_views_inиstd::vector<at::Tensor> bucket_views_outДальнейшее объяснение:

  • В PyTorch представление относится к созданию чего-то, что легко просматривать. Представление разделяет память с исходными данными. Оно просто организует исходные данные и отображает часть содержимого напрямую или меняет его порядок перед отображением.
  • Каждое представление будет создано со следующим макетом (размеры + шаги), который соответствует ожидаемому макету выпускника.
  • Две переменные Bucket_views_in и Bucket_views_out предоставляют методы для управления определенными градиентами в содержимом, или, скорее, они предоставляют представления, которые управляют градиентом каждого тензора в содержимом. Пользователь использует эти две переменные в качестве точек входа для перемещения данных в контент и из него для каждого градиента.
  • мыbucket_Причина, по которой представление сохраняет два состояния: если зарегистрирован коммуникационный крючок DDP,bucket_views_outкрючокfuture_workЗначение повторно инициализируется. Поэтому нам нужноbucket_views_in[i].copy_(grad)Сохраняет отдельную ссылку представления на исходное содержимое реплики.
  • bucket_views_in[i].copy_(grad)иgrad.copy_(bucket_views_out[i])Предоставляет удобные методы для перемещения данных градиента в/из содержимого.

Кроме того, информация о каждом плоском тензоре следующих трех сегментов переменной-члена, например смещения, сохраняет смещение каждого тензора в содержимом плоского сегмента.

// Per-variable offset/length into the flat bucket contents tensor and grad
// bucket.
std::vector<size_t> offsets;
std::vector<size_t> lengths;
// Per-variable sizes into the grad bucekt.
std::vector<c10::IntArrayRef> sizes_vec;

3.2 Определения

BucketReplica конкретно определяется как:

// A bucket replica represents [1..N] gradients to be reduced,
// with the same dtype, on the same device.
//
// Batching gradients together before reducing them can result in lower
// overhead and/or faster time to completion. Only gradients of the same type
// and on the same device can be batched. The tensor that represents the
// flattened gradient uses the same type and is placed on the same device.
// Buckets are filled as the gradients they hold are computed (triggered by
// autograd hooks). Buckets are reduced in a predetermined order that is
// identical across processes.
struct BucketReplica {
  // Flattened (1 dimensional) contents of bucket.
  at::Tensor contents; // 这里打平了

  // Views into contents for each grad.  Each view will be created with
  // layout (sizes + strides) matching the grad's expected layout
  // ("Gradient Layout Contract" in torch/csrc/autograd/AccumulateGrad.h).
  // `bucket_views_in[i].copy_(grad)` and
  // `grad.copy_(bucket_views_out[i])`
  // provide convenient ways to move grad data in/out of contents.
  // The reason we keep two states for bucket_views is that if DDP
  // communication hook was registered, `bucket_views_out` could be
  // re-initialized with the value of hook's `future_work`. We still need to
  // keep a separate view reference to replica's original contents for
  // `bucket_views_in[i].copy_(grad)` call.
  std::vector<at::Tensor> bucket_views_in; // 怎么从contents 之中查找
  std::vector<at::Tensor> bucket_views_out; // 一个输出视图

  // Variables that contribute to this bucket replica. Use refcounted value
  // here so that we can easily unflatten the bucket contents into the
  // participating variables after reduction has completed.
  std::vector<at::Tensor> variables;

  // Per-variable offset/length into the flat bucket contents tensor and grad
  // bucket.
  std::vector<size_t> offsets;
  std::vector<size_t> lengths;

  // Per-variable sizes into the grad bucekt.
  std::vector<c10::IntArrayRef> sizes_vec;

  // Number of tensors to be added before this bucket is complete.
  // This is reset to `variables.size()` every iteration.
  size_t pending;

  // TODO(@pietern)
  // Memory copies from gradient tensors into the bucket are potentially
  // done on different CUDA streams. We record an event for every copy
  // so that we can synchronize with them prior to kicking off the reduction.
  // std::vector<at::cuda::CUDAEvent> events;
};

Пока логика следующая, как уже было сказано, имеет смысл только replicas[0] на ведро.

                                    +-----------------------------------------------------+
+----------------------------+      | +-------+      +----------------------------------+ |
| Reducer                    |      | |Bucket |      |Bucket                            | |
|                            |      | |       |      |                                  | |
|                            |      | |       |      |            Future  future_work   | |
|  vector<Bucket> buckets_ +------> | |       | ...  |                                  | |
|                            |      | |       |      |       ProcessGroup::Work  work   | |
|                            |      | |       |      |                                  | |
|                            |      | |       |      | vector<size_t> variable_indices  | |
|                            |      | |       |      |                                  | |
|                            |      | |       |      |  vector<BucketReplica> replicas  | |
|                            |      | |       |      |                          +       | |
|                            |      | |       |      |                          |       | |
|                            |      | |       |      |                          |       | |
+----------------------------+      | +-------+      +----------------------------------+ |
                                    +-----------------------------------------------------+
                                                                                |
                                                                                |
                                                                                v
                           +--------------------------------------------------------------+
                           | +---------------+       +----------------------------------+ |
                           | |BucketReplica  |       | BucketReplica                    | |
                           | |               |       |                                  | |
                           | |               |       |                                  | |
                           | |               |       |  vector<Tensor> bucket_views_in  | |
                           | |               |  ...  |                                  | |
                           | |               |       |  vector<Tensor> bucket_views_out | |
                           | |               |       |                                  | |
                           | |               |       |  Tensor contents                 | |
                           | |               |       |                                  | |
                           | |               |       |  vector<Tensor> variables        | |
                           | |               |       |                                  | |
                           | |               |       |                                  | |
                           | +---------------+       +----------------------------------+ |
                           +--------------------------------------------------------------+

3.3 Инициализация

Часть кода инициализации находится в Reducer::initialize_buckets.

// Allocate bucket contents tensor. 分配内存
replica.contents = at::empty({static_cast<long>(offset)}, options);

initialize_bucket_views(replica, replica.contents);

Конкретный код initialize_bucket_views выглядит следующим образом: здесь необходимо объяснить несколько функций PyTorch.

  • as_strided: создать представление (тип все еще тензорный) на основе существующего тензора и заданного размера шага, разделить память с исходными данными и не хранить стихи, поэтому оба представления не являются реальным хранилищем, а просто представлениями.
  • узкий : возвращает новый тензор, который является суженной версией исходного тензора.

Основная логика initialize_bucket_views такова:

  • Пройдитесь по тензорам реплики, выполните различную обработку для каждого тензора в зависимости от того, является ли он плотным или разреженным, и, наконец, вставьте его в replica.bucket_views_in.

  • Установите для replica.bucket_views_out значение replica.bucket_views_in, которое обычно должно быть равно.

  • еслиgradient_as_bucket_view_Если установлено значение true, вам нужно обработать два случая:

    • При вызовеrebuild_buckets для перестроения бакета, initialize_bucket_view может быть вызван внутри initialize_bucket.Если градация была определена/вычислена в предыдущей итерации, вам нужно скопировать старую градацию в новый Bucket_View и сделать так, чтобы градация указывала на новую Bucket_View.

    • Initialize_bucket_view также может вызываться внутри initialize_bucket во время сборки. Град не определяется во время сборки,

      В этом случае не позволяйте градиенту указывать на bucket_view, потому что градиент должен оставаться неопределенным для глобально неиспользуемых параметров.

Конкретный код выглядит следующим образом:

// (see Note:  "Gradient Layout Contract" in initialize_buckets).
void Reducer::initialize_bucket_views(
    Reducer::BucketReplica& replica,
    at::Tensor& contents) {
  for (size_t i = 0; i < replica.variables.size(); i++) { // 遍历replica的张量
    auto& v = replica.variables[i];
    const auto offset = replica.offsets[i];
    const auto length = replica.lengths[i];
    if (v.is_non_overlapping_and_dense()) {
      // If the param's memory is dense, match its layout, anticipating
      // the autograd engine (AccumulateGrad) will also create gradients
      // matching its layout.
      replica.bucket_views_in.push_back( // dense类型
          contents.as_strided(v.sizes(), v.strides(), offset));
    } else {
      // Fall back to a C-style contiguous view, again anticipating
      // AccumulateGrad will do the same when stashing grads for non-dense
      // params.
      replica.bucket_views_in.push_back( // sparse类型
          contents.narrow(0, offset, length).view(v.sizes()));
    }
    // By default `bucket_views_out` and `bucket_views_in` are
    // essentially the same thing.
    replica.bucket_views_out = replica.bucket_views_in;

    // If gradient_as_bucket_view_ is set as true, then there are two cases to
    // handle: initialize_bucket_views could be called inside initialize_buckets
    // when rebuild_buckets, if grad has already been defined/calculated in
    // previous iteration, old grad needs to be copied into new bucket_view and
    // let grad point to the new bucket_view, initialize_bucket_views could also
    // be called inside initialize_buckets during construction. Grads are not
    // defined during construction time, in this case, do not let grad point to
    // bucket_view, because grads should be kept as being undefined for globally
    // unused parameters.
    if (gradient_as_bucket_view_) {
      auto& bucket_view = replica.bucket_views_in.back();
      runGradCallbackForVariable(v, [&](auto& grad) {
        if (grad.defined() && !grad.is_alias_of(bucket_view)) {
          bucket_view.copy_(grad);
          grad = bucket_view;
          // 梯度被修改,需要写回去
          // The grad is modefied and needs to be written back.
          return true;
        }
        // 梯度没有被修改,不需要回写
        // The grad is not modified and does not need to be written back.
        return false;
      });
    }
  }
}

Подробности следующие:

+------------------------------------------+
| BucketReplica                            |
|                                          |
|       vector<Tensor> bucket_views_in +--------------------+
|                                          |                |
|                                          |                |
|       vector<Tensor> bucket_views_out +--------------+    |
|                                          |           |    |
|                                          |           |    |
|                                          |           v    v
|                                          |     +-----+----+--------------------------+
|       Tensor contents  +---------------------> |Flattened (Tensor1, Tensor2, Tensor3)|
|                                          |     +-------------------------------------+
|                                          |
|                                          |
|       vector<Tensor> variables  +------------>  [Tensor1,Tensor2,Tensor3]
|                                          |
|                                          |
|                                          |
+------------------------------------------+

Кроме того, mark_variable_ready_sparse, mark_variable_ready_dense и finalize_backward имеют присваивания содержимому.

0x04 Класс запроса

Следующие два класса используются для того, чтобы функция автоградации определяла соответствующее ведро тензора.

4.1 VariableIndex

VariableIndex должен определить положение тензора в ведре. Это полезно для хуков autograd. Для обратного вызова autograd hook процесс, в котором находится функция обратного вызова, знает только свой собственный тензор градиента, но функции обратного вызова необходимо знать, в какой реплике находится этот тензор и где он находится в реплике, чтобы его можно было в дальнейшем уменьшенный.

4.1.1 Переменные-члены

Среди экземпляров классов, таких как Reducer, есть только одна переменная-член VariableIndex, и эта независимая переменная-член:

std::vector<VariableIndex> unused_parameters_

VariableIndex существует скорее как часть или параметр других переменных-членов.Например, в Reducer gradAccToVariableMap_ использует VaribaleIndex.

std::unordered_map<torch::autograd::Node*, VariableIndex>
      gradAccToVariableMap_; // 存了grad_accumulator & index 的对应关系,这样以后在 autograd graph 寻找 unused parameters 就方便了

4.1.2 Определения

VariableIndex определяется следующим образом:

// Locates a specific variable by replica index and variable index.
struct VariableIndex {
  size_t replica_index; // 位于哪个replica
  size_t variable_index; // variable index,注意,不是"位于replica之中哪个位置",而是所有 varibale的index,比如一共有10个参数,variable_index 的取值是从0~9。那么"位于replica之中哪个位置"由什么来确定?由下面的 VariableLocator 确定。

  VariableIndex() = default;

  VariableIndex(size_t replica_index_, size_t variable_index_) {
    replica_index = replica_index_;
    variable_index = variable_index_;
  }

  static size_t hash(const VariableIndex& key) {
    return c10::get_hash(key.replica_index, key.variable_index);
  }
};

В конструкторе Reducer есть следующий код для настройки autogrid_hook, который устанавливает хук для каждого тензора на каждой реплике. Если хук autograd не знает, какой корзине соответствует градиент, он не может сообщить DDP, что корзина готова в целом.

Как найти бочку? Требуется указанный ниже VariableLocator.

auto& variable = replicas_[replica_index][variable_index];
const auto index = VariableIndex(replica_index, variable_index); // 生成了 VariableIndex
        hooks_.emplace_back(
            grad_accumulator->add_post_hook(
                torch::make_unique<torch::autograd::utils::LambdaPostHook>(
                    [=](const torch::autograd::variable_list& outputs,
                        const torch::autograd::variable_list& /* unused */) {
#ifndef _WIN32
                      this->rpc_context_.set(  
                          ThreadLocalDistAutogradContext::getContextPtr());
#endif
                      this->autograd_hook(index); // Hook的参数是 VariableIndex,目的是为了让 hook 可以顺利找到张量
                      return outputs;
                    })),
            grad_accumulator);

4.2 VariableLocator

4.2.1 Определения

VariableLocator используется для идентификации переменной в корзине. Чтобы найти местоположение тензора, нам нужно знать, в каком ведре, где в тензорах ведра.

  • Какое ведро:bucket_indexдаReducer.buckets_положение в списке, указывающееbuckets_ведро сверху.
  • Где копия ведра:intra_bucket_index— индекс переменной векторного поля в Bucket.replica.
// A variable locator locates a particular variable in the bucket
// structure. The `bucket_index` field points to the bucket in the `buckets_`
// vector. The `intra_bucket_index` field points to the index of the variable
// in any of the vector fields in the bucket replica.
struct VariableLocator {
  // Index into the `buckets_` variable.
  size_t bucket_index; // 哪个桶
  // Index of parameter in single bucket replica.
  size_t intra_bucket_index; // 在桶副本的哪个位置

  VariableLocator() = default;

  VariableLocator(size_t bucket_index_, size_t intra_bucket_index_) {
    bucket_index = bucket_index_;
    intra_bucket_index = intra_bucket_index_;
  }
};

4.2.2 Переменные-члены

Переменные-члены Reducer:

// Map the index of a variable to its location in the bucket structure.
std::vector<VariableLocator> variable_locators_;
4.2.2.1 Инициализация

Как инициализировать?

void Reducer::initialize_buckets(
    std::vector<std::vector<size_t>> bucket_indices) {
  // Clear current bucket assignment.
  buckets_.clear();
  variable_locators_.clear();
  // Ensure we have a bucket index for every variable.
  variable_locators_.resize(replicas_[0].size());
  
  // Iterate over buckets.
  const auto bucket_count = bucket_indices.size();
  const auto replica_count = replicas_.size();
  buckets_.reserve(bucket_count);
  
  for (size_t bucket_index = 0; bucket_index < bucket_count; bucket_index++) { // 遍历桶  
    // Map participating variables to this bucket.
    // This is identical across replicas so we only need to do this once.
    size_t intra_bucket_index = 0;
    for (const auto variable_index : bucket_indices[bucket_index]) { // 遍历桶里面的张量,所有桶里每个张量index 都是唯一的
      variable_locators_[variable_index] =
          VariableLocator(bucket_index, intra_bucket_index++); // intra_bucket_index 就是递加
    }
	}
}

Вопрос: переменная_locators_[переменная_индекс] не повторяется между разными сегментами? Нет, потому что VariableLocator(bucket_index, intra_bucket_index++) По определению комбинация Bucket_index и intra_bucket_index уникальна.

Приводим пример. Что касается тензорных индексов, то всем тензорам присваивается индекс, увеличивающийся от 0 до tensors.size(). Если параметры модели имеют всего 12 тензоров, индекс тензора изменяется от 0 до 11. Если его разделить на 6 сегментов, среди этих 6 сегментов каждый тензорный индекс уникален и не повторяется.

+-----------------------------------------------------------------------+
|                                                                       |
|  <tensor index 0, tensor index 1, tensor index 2, tensor index 3>     |
|                                                                       |
|                                                                       |
|  <tensor index 4, tensor index 5, tensor 6>                           |
|                                                                       |
|                                                                       |
|  ......                                                               |
|                                                                       |
|                                                                       |
|  <tensor index 16, tensor index 17, tensor index 18, tensor index 19> |
|                                                                       |
+-----------------------------------------------------------------------+

Таким образом, соответствующие переменные_locators_:

variable_locators_[tensor index 0] =  VariableLocator(bucket 0, 0),即 tensor index 0 属于 bucket 0 的 第一个variable。

variable_locators_[tensor index 1] =  VariableLocator(bucket 0, 1),即 tensor index 1 属于 bucket 0 的 第二个variable。

variable_locators_[tensor index 2] =  VariableLocator(bucket 0, 2),即 tensor index 2 属于 bucket 0 的 第三个variable。

variable_locators_[tensor index 3] =  VariableLocator(bucket 0, 3),即 tensor index 3 属于 bucket 0 的 第四个variable。
4.2.2.2 Использование

как пользоваться? В качестве примера используем следующее.

Когда вызывается хук autograd, используйте индекс VariableIndex для обратного вызова,

this->autograd_hook(index)

Наконец, autograd_hook вызывает mark_variable_ready_dense, где variable_locators_ используется для определения корзины, а затем выполняются последующие операции.

void Reducer::mark_variable_ready_dense(VariableIndex index) {
  const auto replica_index = index.replica_index;
  const auto variable_index = index.variable_index;
  const auto& bucket_index = variable_locators_[variable_index]; // 找到张量对应的桶index
  auto& bucket = buckets_[bucket_index.bucket_index]; // 找到桶
  auto& replica = bucket.replicas[replica_index]; // 再通过桶找到对应的 replica
  auto& variable = replica.variables[bucket_index.intra_bucket_index]; // 找到了张量
  const auto offset = replica.offsets[bucket_index.intra_bucket_index]; // 找到了张量信息
  const auto length = replica.lengths[bucket_index.intra_bucket_index];
  auto& bucket_view = replica.bucket_views_in[bucket_index.intra_bucket_index];

  // 接下来就可以继续处理了
  
  // Copy contents of gradient tensor to bucket tensor.
  // If the gradient is not set, we assume it wasn't computed
  // as part of the current backwards pass, and zero the part
  // of the bucket it would otherwise hold.
  runGradCallbackForVariable(variable, [&](auto& grad) {
    if (grad.defined()) {
      this->check_grad_layout(grad, bucket_view);
      // When gradient_as_bucket_view_ is false, or even when
      // gradient_as_bucket_view_ is true, in rare cases users may set grad to
      // be None after every iteration. In these cases, grad and bucket_view are
      // pointing to different storages and thus need to copy grads to
      // bucket_view. If gradient_as_bucket_view_ is set as true, let grad point
      // to bucket_view. If grad has already been set as views of buckets in
      // previous iterations, no copy is needed.
      if (!grad.is_alias_of(bucket_view)) {
        this->copy_grad_to_bucket(grad, bucket_view);
        if (gradient_as_bucket_view_) {
          // Let grad point to bucket_view buffer.
          grad = bucket_view;
          // The grad is modified and need to be written back.
          return true;
        }
      } else {
        // If grad and bucket view point to the same storage, no need to copy
        if (comm_hook_ == nullptr) {
          bucket_view.div_(divFactor_);
        }
      }
    } else {
      bucket_view.zero_();
    }
    // The grad is not modified and doesn't need to be written back.
    return false;
  });
}

0x05 Накопить связанные классы

Ниже приведены классы, связанные с накоплением градиента.

5.1 grad_accumulators_

grad_accumulators_ можно рассматривать как матрицу, каждый элемент матрицы представляет собой AccumulateGrad (тип Node), который используется для вычисления градиента. На данный момент это, кажется, просто бухгалтерская роль здесь.

std::vector<std::vector<std::shared_ptr<torch::autograd::Node>>>
    grad_accumulators_;

Как показано на рисунке ниже, переменная1 является фактическим тензором, а элемент в grad_accumulators_ указывает на AccumulateGrad переменной1.

                                        variable1 +----+
                                                       |
                                                       |
                                                       v
+-----------------------------------+    +-------------+-----------+
|grad_accumulators_                 |    | Variable                |
|                                   |    |                         |
|                                   |    |   +------------------+  |
| [replica_index][variable_index]+---------->+ AccumulateGrad   |  |
|                                   |    |   |                  |  |
|                                   |    |   |                  |  |
+-----------------------------------+    |   |    post_hooks_+--------> autograd_hook(index)
                                         |   |                  |  |
                                         |   |                  |  |
                                         |   +------------------+  |
                                         |                         |
                                         +-------------------------+

5.1.1 Инициализация

Как инициализировать? Среди функций сборки Reducer:

  {
    const auto replica_count = replicas_.size();

    // 以下两个for循环会遍历所有的张量
    for (size_t replica_index = 0; replica_index < replica_count;
         replica_index++) {

      for (size_t variable_index = 0; variable_index < variable_count;
           variable_index++) {
        
        auto& variable = replicas_[replica_index][variable_index];
        const auto index = VariableIndex(replica_index, variable_index);

        // The gradient accumulator function is lazily initialized once.
        // Therefore we can use its presence in the autograd graph as
        // evidence that the parameter has participated in an iteration.
        
        auto grad_accumulator = // 得到一个张量的grad_accumulator
            torch::autograd::impl::grad_accumulator(variable);

        // Hook to execute after the gradient accumulator has executed.
        hooks_.emplace_back(
            grad_accumulator->add_post_hook(
                torch::make_unique<torch::autograd::utils::LambdaPostHook>(
                    [=](const torch::autograd::variable_list& outputs,
                        const torch::autograd::variable_list& /* unused */) {
#ifndef _WIN32
                      this->rpc_context_.set(
                          ThreadLocalDistAutogradContext::getContextPtr());
#endif
                      this->autograd_hook(index);
                      return outputs;
                    })),
            grad_accumulator);

        // Map raw function pointer to replica index and parameter index.
        // This is used later on when the autograd graph is traversed
        // to check for parameters for which no gradient is computed, if
        // find_unused_parameters=True.
        // Note that the mapping of gradient accumulator to variable should be
        // one to one as we deduplicate shared parameters before constructing
        // Reducer.
        if (find_unused_parameters_) {
          gradAccToVariableMap_[grad_accumulator.get()] = index;
        }

        numGradHooksTriggeredMap_[index] = 0;

        // The gradient accumulator is stored as weak_ptr in the autograd
        // metadata of the variable, so we have to keep it alive here for
        // the raw pointer to be valid.
        grad_accumulators_[replica_index][variable_index] =
            std::move(grad_accumulator); // 把这个张量的 grad_accumulator 复制到 grad_accumulators_
      }
    }
  }

5.1.2 Использование

grad_accumulator возвращает Node, то есть AccumulateGrad, который является типом Node, и мы вынули проверочный код проверки.

std::shared_ptr<Node> grad_accumulator(const Variable& self) {
  auto autograd_meta = get_autograd_meta(self);

  std::lock_guard<std::mutex> lock(autograd_meta->mutex_);

  auto result = autograd_meta->grad_accumulator_.lock();
  if (result)
    return result;

  c10::raw::intrusive_ptr::incref(self.unsafeGetTensorImpl());
  auto intrusive_from_this = c10::intrusive_ptr<at::TensorImpl>::reclaim(self.unsafeGetTensorImpl());
  result = std::make_shared<AccumulateGrad>(Variable(std::move(intrusive_from_this)));
  autograd_meta->grad_accumulator_ = result;
  return result;
}

5.2 gradAccToVariableMap_

gradAccToVariableMap_ определяется следующим образом:

std::unordered_map<torch::autograd::Node*, VariableIndex> gradAccToVariableMap_;

Функция состоит в том, чтобы дать каждому узлу соответствующий VariableIndex, как показано на рисунке ниже, чтобы дать переменной 1 индекс 1:

                                                        +--------------+
                                                        | Variable     |
                                                  +---> |              |
                                                  |     |              |
                                                  |     +--------------+
                                                  |
                                                  |
+-------------------------------------+           |
| gradAccToVariableMap_               |           |
|                                     |           |
|                                     |           +
|         <Node*, VariableIndex> +---------> [variable1 :index1, variable2 : index2]
|                                     |                     +
|                                     |                     |
|                                     |                     |
+-------------------------------------+                     |
                                                            |
                                                            v
                                                  +---------+-----------------------------+
                                                  |VariableIndex                          |
                                                  |                                       |
                                                  |          replica_index of Variable1   |
                                                  |                                       |
                                                  |          variable_index of Variable1  |
                                                  |                                       |
                                                  +---------------------------------------+

5.2.1 Инициализация

Как инициализировать? В конструкторе Reducer есть следующее, то есть VariableIndex для каждой переменной, которую необходимо дифференцировать.

auto& variable = replicas_[replica_index][variable_index];
const auto index = VariableIndex(replica_index, variable_index);
auto grad_accumulator = torch::autograd::impl::grad_accumulator(variable);

if (find_unused_parameters_) {
  gradAccToVariableMap_[grad_accumulator.get()] = index;
}

5.2.2 Использование

Использование gradAccToVariableMap_ выглядит следующим образом, search_unused_parameters предназначен для прохождения поискаgradAccToVariableMap_, если функция-аккумулятор неgradAccToVariableMap_Внутри это означает, что градиент не нужно рассчитывать.

// Traverse the autograd graph starting at the specified output.
// All parameters for which we have a pointer to their gradient accumulation
// functions, but don't show up in the autograd graph will be marked ready for
// for reduction as soon as the first autograd hook is called. This is not
// done immediately because the model output may be ignored, and we only
// want to start performing reductions on `torch.autograd.backward()`.
void Reducer::search_unused_parameters(
    const std::vector<torch::autograd::Variable>& outputs) {
  std::unordered_set<torch::autograd::Node*> seen;
  std::vector<torch::autograd::Node*> queue;

  // Seed queue with the grad functions of all outputs.
  for (const auto& output : outputs) {
    const auto& grad_fn = output.grad_fn();
    if (grad_fn) {
      queue.push_back(grad_fn.get());
    }
  }

  // Traverse the autograd graph starting at the specified output.
  while (!queue.empty()) {
    auto fn = queue.back();
    queue.pop_back();
    for (const auto& edge : fn->next_edges()) {
      if (auto next_ptr = edge.function.get()) {
        const bool was_inserted = seen.insert(next_ptr).second;
        if (was_inserted) {
          queue.push_back(next_ptr);
        }
      }
    }
  }

  // 遍历查找,如果某一个accumulator 函数没有在这图里面,就说明不用计算梯度
  // Find accumulator functions that don't show up in this graph.
  for (const auto& it : gradAccToVariableMap_) {
    // If the accumulator function is present in the graph, we know
    // a gradient will be computed for the corresponding parameter.
    if (seen.count(it.first) == 0) {
      unused_parameters_.push_back(it.second);
    }
  }
}

5.3 numGradHooksTriggeredMap_

Запишите autograd_hook этого тензора до того, как градиент этого тензора будет готовнадо вызывать несколько раз. После первой итерации он не увеличивается, поэтому это значение должно быть 1 или 0. Используется для установки неиспользуемых_параметров_ и настройки numGradHooksTriggeredMapPerIteration_.

// Key: VariableIndex, Value: the number of times that a variable's autograd_hook()
// should be triggered before marking this variable's grad as ready for communication.
// Map will not change after 1st iteration.
std::unordered_map<VariableIndex, int, c10::hash<VariableIndex>> numGradHooksTriggeredMap_;

5.3.1 Инициализация

Как инициализировать? Среди функций сборки:

numGradHooksTriggeredMap_[index] = 0;

После первой итерации последующие вызовы autogrid_hook увеличиваются на единицу.

// The function `autograd_hook` is called after the gradient for a
// model parameter has been accumulated into its gradient tensor.
// This function is only to be called from the autograd thread.
void Reducer::autograd_hook(VariableIndex index) {

  // 省略部分代码
  
  if (static_graph_first_iteration()) {
    numGradHooksTriggeredMap_[index] += 1; // 静态图第一次迭代时候,这里会增加1
    return; // 然后直接返回,注意!
  }

  // If `find_unused_parameters_` is true there may be model parameters that
  // went unused when computing the model output, they won't be part of the
  // autograd graph, and won't receive gradients. These parameters are
  // discovered in the `prepare_for_backward` function and their indexes stored
  // in the `unused_parameters_` vector.
  if (!has_marked_unused_parameters_) {
    has_marked_unused_parameters_ = true;
    for (const auto& unused_index : unused_parameters_) {
      mark_variable_ready(unused_index);
    }
  }

  // If it is static graph, after 1st iteration, check a avariable
  // is ready for communication based on numGradHooksTriggeredMap_.
  if (static_graph_after_first_iteration()) {
    if (--numGradHooksTriggeredMapPerIteration_[index] == 0) {
      // Finally mark variable for which this function was originally called.
      mark_variable_ready(index); // 
    }
  } else {
    // Finally mark variable for which this function was originally called.
    mark_variable_ready(index);
  }
}

5.3.2 Использование

как пользоваться? Он будет сброшен здесь.

void Reducer::reset_bucket_counting() {
  next_bucket_ = 0;
  // Reset num_buckets_ready_ at the beginning of backward computation
  // in each iteration.
  num_buckets_ready_ = 0;

  for (auto& bucket : buckets_) {
    for (auto& replica : bucket.replicas) {
      replica.pending = replica.variables.size();
    }
    bucket.pending = bucket.replicas.size();
  }

  if (static_graph_) {
    numGradHooksTriggeredMapPerIteration_ = numGradHooksTriggeredMap_;
  }
}

Он также будет обработан здесь. Если 0, вставьте неиспользуемые_параметры_.

// Right now delay_all_reduce is only called when static_graph_=true and
// num_iterations_==1.
void Reducer::delay_all_reduce() {

  // 省略部分代码
  
  // copy all gradients to buckets
  for (size_t replica_index = 0; replica_index < replicas_.size();
       replica_index++) {
    for (size_t variable_index = 0; variable_index < replicas_[replica_index].size();
         variable_index++) {
      const auto index = VariableIndex(replica_index, variable_index);
      // set unused_parameters_
      if (numGradHooksTriggeredMap_[index] == 0) { // 如果为0,则插入unused_parameters_
        unused_parameters_.push_back(index);
      }
      require_finalize_ = true;
      set_divide_factor();
      if (expect_sparse_gradients_[replica_index][variable_index]) {
        mark_variable_ready_sparse(index);
      } else {
        mark_variable_ready_dense(index);
      }
    }
  }

  // launch all reduces for all buckets
  for (auto & bucket : buckets_) {
    all_reduce_bucket(bucket);
  }

  finalize_backward();
}

5.4 numGradHooksTriggeredMapPerIteration_

Прежде чем градиент этого тензора будет готов, autograd_hook этого тензорапо-прежнему необходимовызывается несколько раз. Если он равен 0, это означает, что ведро должно быть готово целиком.

Эта переменная-член сбрасывается с помощью numGradHooksTriggeredMap_.

// Key: VariableIndex, Value: the number of times that a variable's autograd_hook()
// are left to be triggered before marking this variable's grad as ready for communication.
// Map will change after 1st iteration to track a grad is ready for communication or not.
std::unordered_map<VariableIndex, int, c10::hash<VariableIndex>> numGradHooksTriggeredMapPerIteration_;

5.4.1 Использование

как пользоваться? В случае статических графов, если это не первая итерация (градиент в это время только что сгенерирован),numGradHooksTriggeredMapPerIteration_[index]Декремент, если он равен 0, означает, что переменная готова, и можно выполнять операцию уменьшения градиента множества.

// The function `autograd_hook` is called after the gradient for a
// model parameter has been accumulated into its gradient tensor.
// This function is only to be called from the autograd thread.
void Reducer::autograd_hook(VariableIndex index) {
  
  // 省略其他代码
  
  // If it is static graph, after 1st iteration, check a avariable
  // is ready for communication based on numGradHooksTriggeredMap_.
  if (static_graph_after_first_iteration()) {
    if (--numGradHooksTriggeredMapPerIteration_[index] == 0) {
      // Finally mark variable for which this function was originally called.
      mark_variable_ready(index);
    }
  } else {
    // Finally mark variable for which this function was originally called.
    mark_variable_ready(index);
  }
}

Когда произойдет новая итерация, это значение будет сброшено, и prepare_for_backward вызовет reset_bucket_counting.

И используйте numGradHooksTriggeredMap_ для сброса.

void Reducer::reset_bucket_counting() {
  next_bucket_ = 0;
  // Reset num_buckets_ready_ at the beginning of backward computation
  // in each iteration.
  num_buckets_ready_ = 0;

  for (auto& bucket : buckets_) {
    for (auto& replica : bucket.replicas) {
      replica.pending = replica.variables.size();
    }
    bucket.pending = bucket.replicas.size();
  }

  if (static_graph_) {
    numGradHooksTriggeredMapPerIteration_ = numGradHooksTriggeredMap_;
  }
}

Покажем конкретную логику:

  • Для тензора 2 он не используется, поэтому метод delay_all_reduce напрямую ставится в неиспользуемый параметр.
  • Для тензора 1:
    • numGradHooksTriggeredMap_ инициализируется 0.
    • Становится 1 после первой итерации.
    • Во время обратного распространения вызовите prepare_for_backward и reset_bucket_counting, поместитеnumGradHooksTriggeredMap_назначить вnumGradHooksTriggeredMapPerIteration_.
    • Он будет декрементирован в autograd_hook, и если он равен 0, установить эту переменную в готовое значение, которое можно уменьшить.
   Variable 2

                                     delay_all_reduce

   numGradHooksTriggeredMap_[2] = 0  +---------------> unused_parameters_.push_back(0)


+----------------------------------------------------------------------------------------+

   Variable 1



    numGradHooksTriggeredMap_[1] = 0

                   +
                   |
                   |  first_iteration
                   |
                   v

    numGradHooksTriggeredMap_[1] = 1

                   +
                   |  prepare_for_backward
                   |
                   |  reset_bucket_counting
                   v

 numGradHooksTriggeredMapPerIteration_ = numGradHooksTriggeredMap_
                   +
                   |
                   |
                   | backward
                   |
                   | autograd_hook
                   v
                                                               YES
 if (++numGradHooksTriggeredMapPerIteration_[index]=== 0)?? +------->  mark_variable_ready(1)
                   +
                   |  NO
                   |
                   v

5.5 perIterationReadyParams_

В каждой итерации perIterationReadyParams_ представляет готовые параметры.

// Per iteration set of parameter indices that have been marked ready.
std::unordered_set<size_t> perIterationReadyParams_;

5.5.1 Настройки

То есть, если переменная готова, она вставляется в perIterationReadyParams_.

void Reducer::mark_variable_ready(VariableIndex index) {

  if (should_rebuild_buckets()) {
    push_rebuilt_params(index);
  }

  const auto replica_index = index.replica_index;
  const auto variable_index = index.variable_index;

  if (replica_index == 0) {
    checkAndRaiseMarkedTwiceError(variable_index);
    perIterationReadyParams_.insert(variable_index);
  }
}

5.5.2 Сброс

Эта переменная сбрасывается перед обратным распространением.

void Reducer::prepare_for_backward(
    const std::vector<torch::autograd::Variable>& outputs) {

  // Reset per iteration marked ready parameters.
  perIterationReadyParams_.clear();

}

5.5.3 Использование

Просто пройдите через perIterationReadyParams_, если не найдено, вернитесь.

В методе reboot_buckets будет вызываться sure_prior_reduction_finished, и эти два метода будут вызываться для проверки.

std::vector<std::string> Reducer::getUnmarkedParamsForIteration() {
  std::vector<std::string> unMarkedParamNames;
  for (const auto& it : param_names_) {
    if (perIterationReadyParams_.find(it.first) ==
        perIterationReadyParams_.end()) {
      unMarkedParamNames.push_back(it.second);
    }
  }
  return unMarkedParamNames;
}

std::vector<size_t> Reducer::getUnmarkedParamIndicesForIteration() {
  std::vector<size_t> unmarked_param_indices;
  const auto variable_count = replicas_[0].size();
  for (size_t variable_index = 0; variable_index < variable_count; variable_index++) {
    if (perIterationReadyParams_.find(variable_index) == perIterationReadyParams_.end()) {
      unmarked_param_indices.push_back(variable_index);
    }
  }
  return unmarked_param_indices;
}

5.6 Используемые параметры

Следующие две переменные используются для записи локально используемых параметров, которые указывают, использовались ли эти параметры локально в текущей итерации или сеансе no_sync, когда синхронизация не включена (no_sync включен).

Каждая копия модели соответствует тензору в карте, и каждый тензор является одномерным тензором int32 по количеству параметров.

Эти тензоры отмечены в autograd_hook, чтобы указать, что был использован соответствующий параметр. Все эти тензоры уменьшаются в конце текущей итерации или при обратном распространении сеанса no_sync для вычисления глобально неиспользуемых параметров.

// Locally used parameter maps indicating if parameters are used locally
// during the current iteration or no_sync session if no_sync is on. One
// tensor for each model replica and each tensor is one-dim int32 tensor of
// number of parameters. These tensors are marked in autograd_hook to indicate
// the corresponding param has been used, and get allreduced in the end of
// backward of current iteration or no_sync session for figuring out the
// globally unused parameters.
//
// local_used_maps_:     CPU tensors for bookkeeping locally used params
// local_used_maps_dev_: dev tensors for reducing globally unused params
std::vector<at::Tensor> local_used_maps_; // autograd_hook中会设置,对应论文中的
std::vector<at::Tensor> local_used_maps_dev_; // GPU

5.6.1 Тезис

Его можно увидеть вместе с бумагой здесь.

Градиенты глобально неиспользуемых параметров должны оставаться постоянными во время прямого и обратного прохода. Для обнаружения неиспользуемых параметров требуется глобальная информация, поскольку в процессе DDP параметр может не существовать в одной операции, но может участвовать в обучении в той же итерации другого процесса. Поэтому DDP сохраняет локально неиспользуемую информацию о параметрах в растровом изображении и запускает дополнительный AllReduce для сбора глобального растрового изображения. Поскольку растровые изображения намного меньше по размеру, чем тензоры, все параметры в модели используют одно и то же растровое изображение вместо создания растровых изображений для каждого сегмента. Битовая карта находится на ЦП, чтобы избежать запуска выделенного ядра CUDA для каждого обновления. Однако некоторые серверные части ProcessGroup могут не запускать AllReduce на тензорах ЦП. Например, ProcessGroupNCCL поддерживает только тензоры CUDA. Кроме того, поскольку DDP должен работать с любой пользовательской серверной частью ProcessGroup, нельзя предполагать, что все серверные части поддерживают тензоры ЦП. Чтобы решить эту проблему, DDP поддерживает другое растровое изображение на том же устройстве в качестве первого параметра модели и вызывает операцию неблокирующего копирования для перемещения растрового изображения ЦП на растровое изображение устройства для коллективной связи.

5.6.2 Инициализация

Функция инициализации выглядит следующим образом:

void Reducer::initialize_local_used_map() {
  const auto replica_count = replicas_.size();
  const auto variable_count = replicas_[0].size();
  local_used_maps_.resize(replica_count);
  local_used_maps_dev_.resize(replica_count);

  for (size_t i = 0; i < replica_count; i++) {
    at::TensorOptions options;
    options = options.dtype(at::kInt);

    // Deliberately don't pin the memory even if local_used_maps_dev_ will
    // be cuda. See Note [local_used_maps_ -> local_used_maps_dev copying]
    local_used_maps_[i] =
        at::zeros({static_cast<long>(variable_count)}, options);

    // This tensor needs to be on the same device as replica because backend
    // such as NCCL may not support CPU tensors, and hence it might not work
    // if we always put it on CPU.
    options = options.device(replicas_[i][0].device());
    local_used_maps_dev_[i] =
        at::empty({static_cast<long>(variable_count)}, options);
  }
}

5.6.3 Сброс

И finalize_bucket_dense, и finalize_backward сбрасываются.

void Reducer::finalize_backward() {
  if (dynamic_graph_find_unused()) {
    // Reset unused parameter accounting.
    // See Note [local_used_maps_ -> local_used_maps_dev copying]
    for (auto& local_used : local_used_maps_) {
      local_used.fill_(0);
    }
    local_used_maps_reduced_ = false;
  }  

5.6.4 Настройки

Если используется в autograd_hook, установите для него значение 1.

void Reducer::autograd_hook(VariableIndex index) {

  // 在这里会记录,已经使用了。	
  // See Note [Skip allreducing local_used_maps_dev]
  if (dynamic_graph_find_unused() || static_graph_first_iteration()) {
    // Since it gets here, this param has been used for this iteration. We want
    // to mark it in local_used_maps_. During no_sync session, the same var can
    // be set multiple times, which is OK as does not affect correctness. As
    // long as it is used once during no_sync session, it is marked as used.
    local_used_maps_[index.replica_index][index.variable_index] = 1;
  }

5.6.5 Использование

All_reduce_local_used_map будет вызываться в момент времени mark_variable_ready Если требуется синхронизация, синхронизация выполняется здесь. Переведем ноты:

  • DDP использует асинхронный H2D, чтобы избежать накладных расходов на блокировку. Асинхронная копия и allreduce будут смотреть на текущий поток и, таким образом, будут упорядочены правильно.

  • Также важен правильный порядок операций на хосте. H2Dcopy_упорядочена в потоковом режиме, а пара хостовlocal_used_maps_Изменения сортируются по хосту.

  • Если существует большое количество незавершенных работ с потоком cuda, которое откладывает операцию copy_ на будущее, и если между настоящим моментом и finalize_backward не происходит блокирующих вызовов, то finalize_backward повторно обнулит локальную карту, используемую на хосте, до того, как поток выполнит копирование, в этом случае copy_ будет читать эти нули, а не значения, которые мы указываем здесь читать.

  • Копирование local_used_maps_[i] в ​​закрепленную временную память (которую распределитель закрепленного кэша должен предоставлять асинхронно) позволяет избежать этого неприятного, редкого состояния гонки.

  • В тех случаях, когда требуются все параметры, DDP сама не будет выполнять никаких блокирующих операций между этим моментом и повторным обнулением, так что эта опасная ситуация реальна.

  • Таким образом, Редюсер работает в обороне, чтобы гарантировать, что local_used_maps_tmp отличается от local_used_maps_[i].

void Reducer::all_reduce_local_used_map() {
  // See Note [Skip allreducing local_used_maps_dev]
    // H2D from local_used_maps_ to local_used_maps_dev_
    for (size_t i = 0; i < local_used_maps_.size(); i++) {
      if (local_used_maps_dev_[i].is_cuda()) {
        // Note [local_used_maps_ -> local_used_maps_dev copying]
        // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        // We do async H2D to avoid the blocking overhead. The async copy and
        // allreduce respect the current stream, so will be sequenced
        // correctly.
        //
        // Correct sequencing with respect to host operations is also
        // essential. The H2D copy_ is stream ordered, while the host's
        // changes to local_used_maps_ are host ordered. If a large backlog of
        // cuda-stream work pushes the copy_ far into the future, and if no
        // blocking calls occur between now and finalize_backward()** such
        // that finalize_backward() re-zeroes local_used_maps_ on the host
        // before the stream executes the copy_, copy_ will read those zeros
        // instead of the values we thought we told it to read here. Copying
        // local_used_maps_[i] to a pinned temporary (which the pinned caching
        // allocator should supply asynchronously) avoids this nasty, rare
        // race condition.
        //
        // ** In the hoped-for case where all params are used, DDP itself
        // won't do any blocking work between now and the re-zeroing, so the
        // danger is real.
        //
        // Defensively ensures local_used_maps_tmp is distinct from
        // local_used_maps_[i]
        auto local_used_maps_tmp = at::native::empty_like(
            local_used_maps_[i],
            optTypeMetaToScalarType(local_used_maps_[i].options().dtype_opt()),
            local_used_maps_[i].options().layout_opt(),
            local_used_maps_[i].options().device_opt(),
            true /* pinned_memory */);
        // Paranoid asserts here because in some workloads, the pinned
        // allocator behaves in a way we don't understand, and may be bugged.
        // See https://github.com/pytorch/pytorch/pull/54474
        TORCH_INTERNAL_ASSERT(local_used_maps_tmp.is_pinned());
        TORCH_INTERNAL_ASSERT(
            local_used_maps_tmp.data_ptr() != local_used_maps_[i].data_ptr());
        local_used_maps_tmp.copy_(local_used_maps_[i]);
        local_used_maps_dev_[i].copy_(local_used_maps_tmp, true);
      } else {
        local_used_maps_dev_[i].copy_(local_used_maps_[i], true);
      }
    }
    local_used_work_ = process_group_->allreduce(local_used_maps_dev_);
}

5.7 Вычисление классов поддержки градиента

Далее мы проанализируем некоторые основные функции и вспомогательные классы, участвующие в вычислении градиентов.

5.7.1 RpcContext

Этот класс используется для инкапсуляции Distributed::autograd::ContextPtr.

struct RpcContext {
  using ContextPtr = torch::distributed::autograd::ContextPtr;
  // The shared_ptr is to hold the context instance.
  ContextPtr context_ptr_holder;
  std::atomic<ContextPtr::element_type*> context_ptr{nullptr};

  void set(ContextPtr&& new_context_ptr);
};
RpcContext rpc_context_;

5.7.2 hooks_

Его функция состоит в том, чтобы поддерживать крюк автограда, а также играть роль бухгалтерии.

std::vector<std::pair<uintptr_t, std::shared_ptr<torch::autograd::Node>>>
    hooks_;

Инициализируйте следующим образом:

        // Hook to execute after the gradient accumulator has executed.
        hooks_.emplace_back(
            grad_accumulator->add_post_hook(
                torch::make_unique<torch::autograd::utils::LambdaPostHook>(
                    [=](const torch::autograd::variable_list& outputs,
                        const torch::autograd::variable_list& /* unused */) {
#ifndef _WIN32
                      this->rpc_context_.set(
                          ThreadLocalDistAutogradContext::getContextPtr());
#endif
                      this->autograd_hook(index);
                      return outputs;
                    })),
            grad_accumulator);

5.7.3 comm_hook_

5.7.3.1 Концепция

Давайте взглянем на концепции через [Коммуникационный крючок DDP].

Перехватчик связи DDP — это усовершенствование, предоставляющее перехватчик, который можно использовать для переопределения DDP для межранговой связи градиента, которую можно использовать в таких алгоритмах, как Gradient Compression/GossipGrad. Может использовать Python APIregister_comm_hookчтобы зарегистрировать функцию ловушки.

Если коммуникационный хук DDP не зарегистрирован, редюсеру нужно только вызвать allreduce, чтобы уменьшить ведро. Если он зарегистрирован, хук будет вызываться и обрабатываться с помощью будущего рабочего дескриптора. Если редьюсер зарегистрирован, он также пропускает шаг «разделить градиент по мировому размеру». Цель этого состоит в том, что коммуникационный хук может полностью переопределить способ, которым мы осуществляем связь, и пользователь имеет полный контроль над тем, как обрабатываются градиенты.

PythonCommHookдаCommHookInterfaceПодкласс , который может регистрировать хук Python. Кроме того, есть несколько встроенных реализаций хуков C++, к которым можно получить доступ, вызвав Python API.register_builtin_comm_hookуказать.

// Note [DDP Communication Hook]
// ~~~~~~~~~~~~~~~~~~~~~~~~~~
// If DDP communication hook is not registered, the reducer reduces the buckets
// by just calling allreduce. If registered, it calls the hook and uses future
// work handle. If registered, reducer also skips dividing grads by world size.
// The reason for this is that the communication hook is expected to completely
// override how we perform communication and the user should have complete
// control over how the grads are handled.
//
// DDP communication hook is an enhancement that provides a hook which can be
// used to override how DDP communicates gradients across ranks, this can be
// used for algorithms like Gradient Compression/GossipGrad. This hook can be
// registered from Python API using `register_comm_hook`. `PythonCommHook`
// enables registering a Python hook and is a subclass of `CommHookInterface`.
// Additionally, there are also some built-in C++ hook implementations that can
// be specified by calling `register_builtin_comm_hook` from Python API.
5.7.3.2 Использование

Давайте посмотрим на torch/distributed/algorithms/ddp_comm_hooks/default_hooks.py.

Следующие хуки выполняют свою собственную специальную обработку до и после all-reduce. Если вы используете этот хук, используйте ddp_model.register_comm_hook(process_group, fp16_compress_hook).

def fp16_compress_hook(
    process_group: dist.ProcessGroup, bucket: dist.GradBucket
) -> torch.futures.Future:
    """
    This DDP communication hook implements a simple gradient compression
    approach that casts ``GradBucket`` tensors to half-precision floating-point format (``torch.float16``)
    and then divides it by the process group size.
    It allreduces those ``float16`` gradient tensors. Once compressed gradient
    tensors are allreduced, the chained callback ``decompress`` casts it back to the input data type (such as ``float32``).

    Example::
        >>> ddp_model.register_comm_hook(process_group, fp16_compress_hook)
    """
    group_to_use = process_group if process_group is not None else dist.group.WORLD
    world_size = group_to_use.size()

    compressed_tensor = bucket.get_tensor().to(torch.float16).div_(world_size)

    fut = dist.all_reduce(
        compressed_tensor, group=group_to_use, async_op=True
    ).get_future()

    def decompress(fut):
        decompressed_tensor = bucket.get_tensor()
        # Decompress in place to reduce the peak memory.
        # See: https://github.com/pytorch/pytorch/issues/45968
        decompressed_tensor.copy_(fut.value()[0])
        return [decompressed_tensor]

    return fut.then(decompress)

5.7.4 runGradCallbackForVariable

Функция mark_variable_ready_dense вызовет runGradCallbackForVariable.

5.7.4.1 Reducer

RunGradCallbackForVariable редуктора выглядит следующим образом: он вызывает для обработки Distributed::autograd::ContextPtr.runGradCallbackForVariable.

void Reducer::runGradCallbackForVariable(
    at::Tensor& variable,
    GradCallback&& cb) {
  // 加载rpc context
  auto context_ptr = rpc_context_.context_ptr.load();
  if (context_ptr == nullptr) {
    cb(variable.mutable_grad());
  } else {
    // Under distributed autograd
#ifndef _WIN32
    // 下面分析
    context_ptr->runGradCallbackForVariable(variable, std::move(cb));
#endif
  }
}
5.7.4.2 DistAutogradContext

мы пойдемDistAutogradContext.

Он найдет градацию градиента, соответствующую тензору в наборе градиентов_, среди накопленных градиентов, затем использует входящую функцию обратного вызова для обработки градации градиента и, наконец, скопирует обработанный градиент обратно в набор накопленных градиентов_. Таким образом, он начинается с градиента, полученного крючком, и заканчивается градиентом после возврата сокращения, завершая замкнутый цикл.

void DistAutogradContext::runGradCallbackForVariable(
    const torch::autograd::Variable& variable,
    GradCallback&& cb) {
  torch::Tensor grad;
  {
    std::lock_guard<std::mutex> guard(lock_);
    auto it = accumulatedGrads_.find(variable); // 找到张量 对应的梯度 grad
    TORCH_INTERNAL_ASSERT(
        it != accumulatedGrads_.end(),
        "The grad for the variable should exist in dist_autograd context.");
    grad = it->value();
  }
  if (cb(grad)) { // 用传入的回调函数来处理梯度grad
    std::lock_guard<std::mutex> guard(lock_);
    auto device = grad.device();
    // Needs to update the grad in the map.
    accumulatedGrads_.insert_or_assign(variable, std::move(grad)); //最后把处理后的梯度拷贝回accumulatedGrads_
    recordGradEvent(device);
  }
}

Накопленный Grads_ DistAutogradContext будет записывать текущий градиент, соответствующий тензору.

// DistAutogradContext which stores information for a single distributed
// autograd pass on a worker.
class TORCH_API DistAutogradContext {
 public:
  // Gradients accumulated in this context so far. The key is the variable on
  // which the gradient needs to be accumulated and the value is the gradient
  // that needs to be accumulated on that variable..
  c10::Dict<torch::Tensor, torch::Tensor> accumulatedGrads_;  
}

До сих пор мы ввели некоторые базовые классы, и мы продолжим вводить их в следующей главе (их слишком много...).

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

ссылка 0xFF

Распределенная серия pytorch 3 — что делает torch.utils.data.distributed.DistributedSampler во время распределенного обучения?

Распределенная серия Pytorch 1 — узнайте переменные среды, связанные с torch.distributed.launch

pytorch распределенная серия 2 - как синхронизируется DistributedDataParallel?

pytorch (распределенные) данные параллельной личной практики - DataParallel/DistributedDataParallel

nn.DataParallel от Pytorch

обсуждение.py torch.org/he/data пар AL…

py torch.org/docs/stable…

Понимать распределенное обучение интерпретации исходного кода PyTorch?

Практическое руководство | Реализация слоя PyTorch AutoGrad C++

PYTORCH автоматическая дифференциация (1)

Как PyTorch ускоряет параллельное обучение данных? Раскрыты распространенные читы

Распределенное обучение pytorch (два init_process_group)

py torch.org/tutorials/i…

py torch.org/docs/master…

py torch.org/tutorials/i…

DP и DDP интерпретации исходного кода PyTorch: параллелизм моделей и анализ распределенного обучения

Параметры и буферы в модели Pytorch