0x00 сводка
Мы проанализировали прямое распространение Reduer выше, и в этой статье мы продолжим рассматривать, как выполнять обратное распространение.
Другие статьи из этой серии:
[Анализ исходного кода] Распространение PyTorch (1) ------ история и обзор
[Анализ исходного кода] Как PyTorch использует GPU
[Анализ исходного кода] Распределенный PyTorch (2) ----- DataParallel (включен)
[Анализ исходного кода] Распределенный PyTorch (3) ----- DataParallel (ниже)
[Анализ исходного кода] Распределенный PyTorch (7) ----- Группа процессов DistributedDataParallel
[Анализ исходного кода] Распределенный PyTorch (8) -------- Бумага DistributedDataParallel
[Анализ исходного кода] Распределенный PyTorch (9) ----- Инициализация DistributedDataParallel
0x01 Обзор
1.1 Предыдущий обзор
Логику прямого распространения мы приводили в предыдущей статье, после завершения прямого распространения получаем следующее:
- Параметры, необходимые для вычисления градиентов, были разделены на сегменты.
- Ствол восстановлен.
- Прямое распространение завершено.
- Вернитесь к указанному выходу, просмотрите график расчета автоградации, чтобы найти все неиспользуемые параметры, и пометьте их как готовые.
Таким образом, DDP имеет основу для градиентного слияния: он знает, какие параметры могут быть объединены напрямую без работы механизма автоградации (состояние готовности), а какие параметры могут быть переданы и объединены вместе (группирование), а также инициатива в последующем. находится в автограде PyTorch. Механизм — это механизм, который выполняет уменьшение градиента между процессами при выполнении обратного расчета.
1.2 Общая логика
Затем мы даем общую стратегию обратного распространения следующим образом:
Backward Pass:
-
backward()
Обращение непосредственно к проигрышу — это работа автограда, которая находится вне контроля DDP, DDP использует Hook для достижения своей цели.- DDP регистрирует крюк автограда во время строительства.
- Движок Autograd выполняет расчет градиента.
- Когда градиент будет готов, будет активирован соответствующий хук DDP на этом накопителе градиента.
-
Сделайте all-reduce в autograd_hook. Предполагая, что индекс параметра равен param_index, используйте param_index, чтобы получить параметр и пометить его как готовый.Если все градиенты в ведре готовы, ведро готово.
-
Когда градиенты в ведре будут готовы, он будет на ведре
Reducer
запустить асинхронныйallreduce
вычислить средний градиент всех процессов. -
Если все сегменты готовы, дождитесь завершения всех сокращений. Когда все ведра будут готовы,
Reducer
заблокирует ожидание всехallreduce
Операция завершена. После этого запишите средний градиент вparam.grad
Поля для всех параметров. -
Градиенты всех процессов уменьшены, и после обновления веса всех моделей одинаковы. Таким образом, после выполнения обратного распространения поля градации соответствующих одних и тех же параметров в разных процессах DDP должны быть одинаковыми.
-
Нет необходимости передавать параметры после каждой итерации, как это делает DP. Но буферы по-прежнему должны транслироваться процессом ранга 0 другим процессам на каждой итерации.
Далее, давайте посмотрим, как сделать обратное распространение.
0x02 начинается с хука
Изображение ниже взято из статьи Куайшоу (пожалуйста, обратитесь к ссылке 1, которая также должна быть проанализирована в дальнейшем). Верхняя часть рисунка - метод обработки родного движка автоград, а следующая - метод обработки Хоровода и Факела-ДДП. Из этого видно, что слияние градиентов начинается в процессе обратного распространения.
В частности, в дополнение к ковшированию,Reducer
Хуки Autograd также регистрируются при построении, по одному хуку на аргумент. Эти крючки запускаются во время обратного прохода, когда градиент готов, выполняя уменьшение градиента. Если все градиенты в ведре готовы, то ведро готово. Когда градиенты в ведре будут готовы, он будет на ведреReducer
запустить асинхронныйallreduce
вычислить средний градиент всех процессов. Итак, начнем с хука, точки входа обратного распространения.
2.1 Как зарегистрировать хук
Давайте сначала посмотрим, как зарегистрировать хуки, которые включают AutogradMeta и Node.
2.1.1 AutogradMeta
AutoGradMeta
: записыватьVariable
Информация об истории автограда, основная переменная-член.
- grad_: сохраняет градиент текущего экземпляра Variable, который сам является Variable.
- grad_fn: это экземпляр узла,Только нелистовые узлы. Доступ к нему осуществляется через метод grad_fn().На самом деле, в PyTorch, является ли переменная листовой переменной, судят по тому, пуста ли grad_fn.
-
grad_accumulator_: также является экземпляром Node, но есть только у листовых узлов.
- Доступ к переменной grad_accumulator().
- Листовые узлы отвечают за накопление градиентов, а grad_accumulator_ — это функция обработки накопления градиентов.
- Соответствующий градиент хранится в переменной grad_.
- output_nr_: это число. output_nr_ указывает на первый выход узла. Например, если он равен 0, это указывает, что эта переменная является первым выходом узла.
- Подведем итог:
- Для нелистовых узлов grad_fn является операцией расчета градиента, и градиент не будет накапливаться на grad_, а будет передаваться на следующую остановку обратного распространения графа расчета. grad_fn — это просто Node.
- Для конечного узла PyTorch виртуализирует специальную вычислительную операцию для вывода конечного узла, и эта виртуальная вычислительная операция также используется как операция конечного узла.
grad_accumulator_
для накопления его градиента, градиент будет накапливаться на grad_, поэтому конечный узелoutput_nr_
Должно быть 0.grad_accumulator_
Это также узел, который называется AccumulateGrad.
Он определяется следующим образом:
struct TORCH_API AutogradMeta : public c10::AutogradMetaInterface {
std::string name_;
Variable grad_;
std::shared_ptr<Node> grad_fn_;
std::weak_ptr<Node> grad_accumulator_;
// This field is used to store all the forward AD gradients
// associated with this AutogradMeta (and the Tensor it corresponds to)
std::shared_ptr<ForwardGrad> fw_grad_;
std::vector<std::shared_ptr<FunctionPreHook>> hooks_;
std::shared_ptr<hooks_list> cpp_hooks_list_;
// Only meaningful on leaf variables (must be false otherwise)
bool requires_grad_;
// Only meaningful on non-leaf variables (must be false otherwise)
bool retains_grad_;
bool is_view_;
// The "output number" of this variable; e.g., if this variable
// was the second output of a function, then output_nr == 1.
// We use this to make sure we can setup the backwards trace
// correctly when this variable is passed to another function.
uint32_t output_nr_;
mutable std::mutex mutex_;
};
2.1.2 Node
В расчетном графе расчетная операция представлена узлом (Node), отличающимсяNode
Подклассы реализуют различные операции.
И grad_fn_, и grad_accumulator_ из AutogradMeta являются Node.
Основной целевой переменной-членом здесь является post_hooks_, которая представляет собой ловушку, которая будет выполняться после запуска вычисления градиента.
add_post_hook добавит хук в post_hooks_.
struct TORCH_API Node : std::enable_shared_from_this<Node> {
public:
std::vector<std::unique_ptr<FunctionPreHook>> pre_hooks_;
std::vector<std::unique_ptr<FunctionPostHook>> post_hooks_;
uintptr_t add_post_hook(std::unique_ptr<FunctionPostHook>&& post_hook) {
post_hooks_.push_back(std::move(post_hook));
// Use the raw pointer as the unique key to identify this hook. This key
// can then be used in del_post_hook(key) to remove this hook.
return reinterpret_cast<std::uintptr_t>(post_hooks_.back().get());
}
}
2.1.3 AccumulateGrad
AccumulateGrad — это класс, производный от Node.
2.2 Конструктор
Давайте рассмотрим конструктор Reducer, который:
- Каждый тензор получает свою переменную Variable::AutogradMeta grad_accumulator_, аккумулятор градиента, используемый для накопления конечных переменных.
- Для каждого аккумулятора градиента настраивается autograd_hook, этот хук висит на графике автограда и отвечает за синхронизацию градиента в обратном направлении.
- Установите gradAccToVariableMap_ для хранения соответствия между grad_accumulator и index (соответствие между указателем функции и тензором параметра), чтобы было удобно проходить по графу автоградации для поиска неиспользуемых параметров в будущем.
- Эти аккумуляторы градиента хранятся в grad_accumulators_.
Конкретный код выглядит следующим образом:
Reducer::Reducer(
std::vector<std::vector<at::Tensor>> replicas, // 张量
std::vector<std::vector<size_t>> bucket_indices, // 桶信息
......) {
for (size_t replica_index = 0; replica_index < replica_count; // 遍历replica
replica_index++) {
for (size_t variable_index = 0; variable_index < variable_count; // 遍历张量
variable_index++) {
auto& variable = replicas_[replica_index][variable_index]; //得到具体的张量
const auto index = VariableIndex(replica_index, variable_index); //每个张量一个index
// 得到Variable::AutogradMeta的grad_accumulator_,即用于累加叶子 Variable 的梯度累加器
auto grad_accumulator = torch::autograd::impl::grad_accumulator(variable);
hooks_.emplace_back(
// 累加器添加hook,这个 hook 挂在 autograd graph 之上,在 backward 时负责梯度同步。
// grad_accumulator 执行完后,autograd_hook 就会运行
grad_accumulator->add_post_hook(
torch::make_unique<torch::autograd::utils::LambdaPostHook>(
[=](const torch::autograd::variable_list& outputs,
const torch::autograd::variable_list& ) {
#ifndef _WIN32
this->rpc_context_.set(
ThreadLocalDistAutogradContext::getContextPtr());
#endif
this->autograd_hook(index); // 把reducer的autograd_hook函数添加进去
return outputs;
})),
grad_accumulator);
// gradAccToVariableMap_ 存了grad_accumulator & index 的对应关系(函数指针和参数张量的对应关系),这样以后在 autograd graph 遍历寻找 unused parameters 就方便了
if (find_unused_parameters_) {
gradAccToVariableMap_[grad_accumulator.get()] = index;
}
grad_accumulators_[replica_index][variable_index] =
std::move(grad_accumulator);
}
}
}
}
2.2.1 grad_accumulator
Здесь код grad_accumulator выглядит следующим образом, вы можете видеть, что он должен получить тензорautograd_meta->grad_accumulator_
, а затем вернуть для листовых узловgrad_accumulator_
Это АккумулатГрад.
std::shared_ptr<Node> grad_accumulator(const Variable& self) {
auto autograd_meta = get_autograd_meta(self); // 获取 autograd_meta
if (!autograd_meta) {
return nullptr;
}
if (autograd_meta->grad_fn_) {
throw std::logic_error(
"grad_accumulator() should be only called on leaf Variables");
}
if (!autograd_meta->requires_grad_) {
return nullptr;
}
std::lock_guard<std::mutex> lock(autograd_meta->mutex_);
// 获取autograd_meta->grad_accumulator_
auto result = autograd_meta->grad_accumulator_.lock();
if (result)
return result;
c10::raw::intrusive_ptr::incref(self.unsafeGetTensorImpl());
auto intrusive_from_this = c10::intrusive_ptr<at::TensorImpl>::reclaim(self.unsafeGetTensorImpl());
result = std::make_shared<AccumulateGrad>(Variable(std::move(intrusive_from_this)));
autograd_meta->grad_accumulator_ = result; // 获取 autograd_meta->grad_accumulator_
return result;
}
2.2.2 Иллюстрация
Тензор является переменной1, а соответствующий VariableIndex тензора является индексом 1. Конкретная конфигурация выглядит следующим образом: После того, как AccumulateGrad использует apply для вычисления градиента, он вызовет хук в post_hooks.
+-----------------------------------------+
| Reducer |
| |
| |
| +------------------------------------+ | +------------------+ +----------------+
| | grad_accumulators_ | | | variable1 | | AccumulateGrad |
| | | | | | | |
| | | | | | | |
| | [replica_index][variable_index]+------> | autograd_meta_+---> | post_hooks |
| | | | | | | + |
| | | | | | | | |
| +------------------------------------+ | +------------------+ +----------------+
| | |
| +-------------------------------+ | |
| | gradAccToVariableMap_ | | v
| | | |
| | | | +-----------------------+
| | [variable1 : index1] | | | autograd_hook(index1)|
| | | | +-----------------------+
| +-------------------------------+ |
| |
+-----------------------------------------+
+---------------------------------------+
index1 +--> |VariableIndex |
| |
| replica_index of Variable1 |
| |
| variable_index of Variable1 |
| |
+---------------------------------------+
2.3 Функция крюка
Когда градиент будет готов, движок вызовет функцию Hook.Hook — это следующий метод autograd_hook, который должен установить, готова переменная или нет в соответствии с соответствующими условиями. Логика следующая:
-
Если это динамическая карта и найти неиспользуемый тензор или первую итерацию статической карты, установите соответствующую позицию переменной в local_used_maps_ на 1.
- local_used_maps_ записывает локально используемые тензоры ЦП.
- Динамические карты могут быть непоследовательными на каждой итерации, сегменты и переменные могут каждый раз меняться, поэтому local_used_maps_ необходимо обновлять на каждой итерации.
- Статический граф один и тот же на каждой итерации, если он установлен в обратном вызове на первой итерации.
-
Если это первая итерация статической карты, измените соответствующую часть переменной в numGradHooksTriggeredMap_ на 1.
-
Если неиспользуемая переменная не помечена, выполните обход неиспользуемой переменной, пометьте неиспользуемую переменную как готовую и вызовите mark_variable_ready.
-
Если это статическая карта и после второй итерации, если numGradHooksTriggeredMapPerIteration_ равно 0 после соответствующего уменьшения, установите для переменной значение ready и вызовите mark_variable_ready.
-
В противном случае это динамический граф.Динамический граф должен каждый раз устанавливать переменную в состояние готовности и вызывать mark_variable_ready.
// The function `autograd_hook` is called after the gradient for a
// model parameter has been accumulated into its gradient tensor.
// This function is only to be called from the autograd thread.
void Reducer::autograd_hook(VariableIndex index) {
std::lock_guard<std::mutex> lock(this->mutex_);
// Carry over thread local state from main thread. This allows for
// thread-local flags such as profiler enabled to be configure correctly.
at::ThreadLocalStateGuard g(thread_local_state_);
// Ignore if we don't expect to be called.
// This may be the case if the user wants to accumulate gradients
// for number of iterations before reducing them.
if (!expect_autograd_hooks_) {
return;
}
// Note [Skip allreducing local_used_maps_dev]
// ~~~~~~~~~~~~~~~~~~~~~~~~~~
// If find_unused_parameters_ is set to false, there is no need to allreduce
// local_used_maps_dev_, because all parameters will be reduced anyway.
// Therefore, we can avoid allocating memory for local_used_maps and
// local_used_maps_dev_ if find_unused_parameters_ is false.
// See Note [Skip allreducing local_used_maps_dev]
// 动态图&找到未用张量 或者 静态图第一次迭代
if (dynamic_graph_find_unused() || static_graph_first_iteration()) {
// Since it gets here, this param has been used for this iteration. We want
// to mark it in local_used_maps_. During no_sync session, the same var can
// be set multiple times, which is OK as does not affect correctness. As
// long as it is used once during no_sync session, it is marked as used.
// 在 no_sync 的session之中,只要参数被用过一次,就会被标记为用过
// local_used_maps_ 记录本地使用过的CPU张量
// 动态图每次迭代都可能不一致,桶和变量可能每次都不一样,所以local_used_maps_需要每次迭代都更新
// 静态图每次迭代都一样,只要第一次迭代时候,在回调之中设定即可
local_used_maps_[index.replica_index][index.variable_index] = 1;
}
if (static_graph_first_iteration()) { // 静态图第一次迭代
numGradHooksTriggeredMap_[index] += 1;// 只是静态图第一次迭代时候,会增加1
return;
}
// If `find_unused_parameters_` is true there may be model parameters that
// went unused when computing the model output, they won't be part of the
// autograd graph, and won't receive gradients. These parameters are
// discovered in the `prepare_for_backward` function and their indexes stored
// in the `unused_parameters_` vector.
if (!has_marked_unused_parameters_) {
has_marked_unused_parameters_ = true;
for (const auto& unused_index : unused_parameters_) { // 遍历没有用到的variable
mark_variable_ready(unused_index); //未用到的当然就标示为ready了
}
}
// If it is static graph, after 1st iteration, check a avariable
// is ready for communication based on numGradHooksTriggeredMap_.
if (static_graph_after_first_iteration()) {// 第二次迭代之后确实用到了
// 为何从第二次迭代开始处理?因为第一次迭代,当进入到这里时候,梯度还没有准备好(就是没有经过Reducer处理过,只有经过Reducer处理过之后,才算处理好)
// 静态图时,numGradHooksTriggeredMapPerIteration_ = numGradHooksTriggeredMap_;
if (--numGradHooksTriggeredMapPerIteration_[index] == 0) {
// Finally mark variable for which this function was originally called.
mark_variable_ready(index); // 从1变成0,就是就绪了,所以设定variable为就绪
}
} else {
// Finally mark variable for which this function was originally called.
mark_variable_ready(index);// 动态图每次都要设定variable为就绪
}
}
0x03 готов
Если будет обнаружено, что переменная готова в ловушке параметра во время процесса обратного распространения, будет вызвана функция mark_variable_ready(index), и мы продолжим смотреть, как с ней справиться.
3.1 Переменная готова
3.1.1 Настройка готова
mark_variable_ready помечает переменную как готовую, логика следующая.
-
Если корзину необходимо перестроить, вставьте индекс в перестроенный список.
- Перестроение корзины происходит следующим образом: 1) Первая перестройка корзины. 2) Когда статическое изображение имеет значение true или неиспользуемый параметр поиска имеет значение false. 3) Этот обратный процесс требует запуска allreduce.
- Здесь мы просто сбрасываем тензор и его индекс параметра в параметр перестроения и индекс параметра перестроения на основе порядка прибытия градиента, затем в конце finalize_backward() ведро будет перестроено на основе параметра перестроения и индекса параметра перестроения, затем трансляция и инициализировать ведра. Кроме того, нам нужно только сбросить тензоры и индексы параметров для одной копии.
-
Найдите индекс копии, соответствующий этой переменной, и найдите, где эта переменная находится в копии.
-
Эта переменная используется, регистрируется и вставляется в perIterationReadyParams_.
-
Всякий раз, когда переменная помечается как готовая, вызывается finalize.
-
Проверяем, все ли градиенты в ведре готовы, если ожидающих нет, то ведро тоже готово.
-
Количество ожидающих копий этой модели уменьшается на 1, потому что готов еще один тензор.
-
Если ожидающий номер этой реплики равен 0, ожидающий номер этого сегмента уменьшается на 1.
- Потому что, если ожидающее значение этой копии модели равно 0, это означает, что количество ожидающих копий модели, соответствующих корзине, должно быть уменьшено на единицу.
- Если ожидающее ведро равно 0, используйте mark_bucket_ready, чтобы установить ведро в состояние готовности.
-
Если все ведра готовы, то:
-
Вызовите all_reduce_local_used_map.
-
Вызовите Engine::get_default_engine().queue_callback для регистрации обратного вызова, этот обратный вызов будет вызван после того, как движок выполнит все в обратном порядке, а используемые переменные будут сокращены позже, а в нем вызывается finalize_backward.
-
void Reducer::mark_variable_ready(VariableIndex index) {
// Rebuild bucket only if 1) it is the first time to rebuild bucket 2)
// static_graph_ is true or find_unused_parameters_ is false,
// 3) this backward pass needs to run allreduce.
// Here, we just dump tensors and their parameter indices into
// rebuilt_params_ and rebuilt_param_indices_ based on gradient arriving
// order, and then at the end of finalize_backward(), buckets will be
// rebuilt based on rebuilt_params_ and rebuilt_param_indices_, and then
// will be broadcasted and initialized. Also we only need to dump tensors
// and parameter indices of one replica.
if (should_rebuild_buckets()) {
push_rebuilt_params(index); // 如果需要重建,就把index插入到需重建列表之中
}
const auto replica_index = index.replica_index; // 找到副本index
const auto variable_index = index.variable_index; // 找到在副本中哪个位置
if (replica_index == 0) {
checkAndRaiseMarkedTwiceError(variable_index);
perIterationReadyParams_.insert(variable_index); // 这个variable是被使用过的,记录下来
}
backward_stats_[replica_index][variable_index] =
current_time_in_nanos() - cpu_timer_.backward_compute_start_time;
// Any time we mark a variable ready (be it in line due to unused parameters,
// or via an autograd hook), we require a call to the finalize function. If
// this doesn't happen before the next iteration (or call to
// `prepare_for_backwards`), we know something is wrong.
require_finalize_ = true; // 每当某个变量被标记成 ready,都要调用一下 finalize
const auto& bucket_index = variable_locators_[variable_index]; // 找到variable的index信息
auto& bucket = buckets_[bucket_index.bucket_index]; // 找到variable位于哪个桶
auto& replica = bucket.replicas[replica_index]; // 找到副本
set_divide_factor();
if (bucket.expect_sparse_gradient) {
mark_variable_ready_sparse(index); // sparse variable
} else {
mark_variable_ready_dense(index); // dense variable
}
// TODO(@pietern): Make this work for both CPU/CUDA tensors.
// When using CPU tensors we don't need to do this.
// // Record event so that we can wait for all of them.
// auto& event = replica.events[bucket_index.intra_bucket_index];
// event.record();
// Check if this was the final gradient for this bucket.
// 检查桶里的梯度是不是都ready,如果有没有pending,就是桶也ready了
if (--replica.pending == 0) { // 减去本模型副本pending数目,因为又一个张量ready了
// Kick off reduction if all replicas for this bucket are ready.
if (--bucket.pending == 0) {// 如果本模型副本的pending为0,则说明桶对应的模型副本pending数目应该减一
mark_bucket_ready(bucket_index.bucket_index); // 那么就设置桶就绪
}
}
// Run finalizer function and kick off reduction for local_used_maps once the
// final bucket was marked ready.
if (next_bucket_ == buckets_.size()) { // 如果所有桶都ready
if (dynamic_graph_find_unused()) {
all_reduce_local_used_map(); // 对使用过的variable进行规约
}
// The autograd engine uses the default stream when running callbacks, so we
// pass in the current CUDA stream in case it is not the default.
const c10::Stream currentStream = get_current_stream();
// 这里会注册 finalize_backward 到 engine
torch::autograd::Engine::get_default_engine().queue_callback([=] {
std::lock_guard<std::mutex> lock(this->mutex_);
// Run callback with the current stream
c10::OptionalStreamGuard currentStreamGuard{currentStream};
if (should_collect_runtime_stats()) {
record_backward_compute_end_time();
}
// Check that all buckets were completed and had their work kicked off.
TORCH_INTERNAL_ASSERT(next_bucket_ == buckets_.size());
this->finalize_backward();
});
}
}
Логика следующая:
- Редюсер зарегистрирует autograd_hook в post_hooks AccumulateGrad.
- В процессе обратного распространения Autograd Engine, если параметр оказывается готовым, он вызывает autograd_hook.
- Продолжить обработку в autograd_hook.
- Зарегистрирует finalize_backward в движке.
Engine AccumulateGrad Reducer
+ + +
| | |
| | 1 |
| | <-----------------------v
| |
| |
| |
| v 2
| post_hooks +--------> autograd_hook
| +
| |
| | 3
| v
| +------------------+---------------------------+
| | mark_variable_ready |
| | |
| | |
| | All variable in replica are ready? |
| | + |
| | | YES |
| | v |
| | All replica in bucket are ready? |
| | + |
| | | YES |
| | v |
| | mark_bucket_ready |
| | |
| | |
| | |
| | + |
| | | |
| | | |
| | v |
| | All buckets are ready? |
| | + |
| | | YES |
| | v |
| queue_back 4 | all_reduce_local_used_map |
| <----------------------------+ queue_callback(finalize_backward) |
| | |
| | |
v +----------------------------------------------+
3.1.2 Регистрация обратного вызова
В приведенном выше коде torch::autograd::Engine::get_default_engine().queue_callback используется для регистрации функции обратного вызова. Давайте проанализируем это.
В движке есть определение, которое должно вставлять обратные вызовы в final_callbacks_:
void Engine::queue_callback(std::function<void()> callback) {
std::lock_guard<std::mutex> lock(current_graph_task->final_callbacks_lock_);
current_graph_task->final_callbacks_.emplace_back(std::move(callback));
}
Для обработки final_callbacks_ в exec_post_processing обратный вызов будет вызываться, когда механизм завершает работу в обратном направлении.
void GraphTask::exec_post_processing() {
if (!not_ready_.empty()) {
throw std::runtime_error("could not compute gradients for some functions");
}
// set the thread_local current_graph_task_ as more callbacks can be installed
// by existing final callbacks.
GraphTaskGuard guard(shared_from_this());
// Lock mutex during each iteration for accessing final_callbacks.size()
// Unlocking is necessary, because the callback can register
// more callbacks (or they can be registered from other threads
// while it's waiting.
std::unique_lock<std::mutex> cb_lock(final_callbacks_lock_);
// WARNING: Don't use a range-for loop here because more callbacks may be
// added in between callback calls, so iterators may become invalidated.
// NOLINTNEXTLINE(modernize-loop-convert)
for (size_t i = 0; i < final_callbacks_.size(); ++i) {
cb_lock.unlock();
final_callbacks_[i](); // 调用了callback
cb_lock.lock();
}
// Syncs leaf streams with default streams (if necessary)
// See note "Streaming backwards"
for (const auto& leaf_stream : leaf_streams) {
const auto guard = c10::impl::VirtualGuardImpl{c10::DeviceType::CUDA};
const auto default_stream = guard.getDefaultStream(leaf_stream.device());
if (leaf_stream != default_stream) {
auto event = c10::Event{c10::DeviceType::CUDA};
event.record(leaf_stream);
default_stream.wait(event);
}
}
}
Таким образом, логика расширяется следующим образом:
- Редюсер зарегистрирует autograd_hook в post_hooks AccumulateGrad.
- В процессе обратного распространения Autograd Engine, если параметр оказывается готовым, он вызывает autograd_hook.
- Продолжить обработку в autograd_hook.
- Зарегистрирует finalize_backward в движке.
- finalize_backward вызывается в GraphTask::exec_post_processing.
Engine AccumulateGrad Reducer
+ + +
| | |
| | 1 |
| | <-----------------------+
| |
| |
| |
| v
| 2
| post_hooks +--------> autograd_hook
| +
| |
| | 3
| v
| +------------------+---------------------------+
| | mark_variable_ready |
| | |
| | |
| | All variable in replica are ready? |
| | + |
| | | YES |
| | v |
| | All replica in bucket are ready? |
| | + |
| | | YES |
| | v |
| | mark_bucket_ready |
| | |
| | |
| | |
| | + |
| | | |
| | | |
| | v |
| | All buckets are ready? |
| | + |
| | | YES |
| | v |
| queue_back 4 | all_reduce_local_used_map |
| <----------------------------+ queue_callback(finalize_backward) |
| | |
| | |
| +-------------------+--------------------------+
v |
|
GraphTask::exec_post_processing |
+ |
| |
| 5 v
+---------------------------------> finalize_backward
| +
| |
| |
v v
3.1.3 mark_variable_ready_sparse
Функция mark_variable_ready_sparse используется для обработки переменных разреженного типа, то есть для копирования градиента в Reducer.
void Reducer::mark_variable_ready_sparse(VariableIndex index) {
const auto replica_index = index.replica_index;
const auto variable_index = index.variable_index;
const auto& bucket_index = variable_locators_[variable_index];
auto& bucket = buckets_[bucket_index.bucket_index]; // 哪个桶
auto& replica = bucket.replicas[replica_index]; // 桶的哪个副本
auto& variable = replica.variables[bucket_index.intra_bucket_index]; // 副本之中哪个variable
runGradCallbackForVariable(variable, [&](auto& grad) {
TORCH_CHECK(grad.defined(), "Expected sparse gradient to be defined.");
TORCH_CHECK(
grad.options().layout() == c10::kSparse,
"Expected variable to have sparse gradient.");
// Sparse tensors cannot be grouped together with other sparse tensors
// in a single reduction operation like we can for dense tensors.
// Therefore, the `offsets` and `lengths` vectors in the bucket replica
// struct are empty, and there is no pre-existing accumulation tensor.
// Directly assign the sparse tensor to the `contents` field.
replica.contents = grad; //直接拷贝
// See Note [DDP Communication Hook]
if (comm_hook_ == nullptr) {
replica.contents.div_(divFactor_);
}
// The grad is modified in place and needs to be written back.
return true;
});
}
3.1.4 mark_variable_ready_dense
mark_variable_ready_dense будет иметь дело с плотными тензорами, которые фактически копируют градиент в Reducer.
Сначала мы смотрим на переменную-член: gradient_as_bucket_view_, которая:
-
Если false, после allreduce Bucket ведро нужно скопировать обратно в грады.
-
Если установлено значение «True», градиенты будут представлениями, указывающими на разные смещения «allreduce». Это снижает пиковое использование памяти, где размер сохраненной памяти будет равен общему размеру градиента. Кроме того, это позволяет избежать накладных расходов на дублирование между градиентами и коммуникационными сегментами allreduce. Невозможно вызвать градиент, когда градиент просматривается
detach_()
.
Логика mark_variable_ready_dense такова:
- Найдите, к какому сегменту и какой копии принадлежит эта переменная по индексу, а затем получите тензорную переменную в копии, а затем получите смещение и размер переменной. Наконец, получается Bucket_View, соответствующий тензору.
- Используйте runGradCallbackForVariable для обработки тензоров. runGradCallbackForVariable фактически использует DistAutogradContext для обработки обратного вызова и, наконец, возвращает DistAutogradContext.
- Внутренняя логика выполнения обратного вызова:
- когда градиент_as_bucket_view_ имеет значение false или даже если
gradient_as_bucket_view_
Если задано значение true, в редких случаях пользователь может установить для параметра grad значение None после каждой итерации. - В этих случаях grad и Bucket_View указывают на разные хранилища, поэтому grad необходимо скопировать в Bucket_view.
- Если для gradient_as_bucket_view_ установлено значение true, пусть grad указывает на Bucket_view.
- Если в предыдущей итерации для grad уже было задано значение Bucket_View, копирование не требуется.
- когда градиент_as_bucket_view_ имеет значение false или даже если
void Reducer::mark_variable_ready_dense(VariableIndex index) {
const auto replica_index = index.replica_index;
const auto variable_index = index.variable_index;
const auto& bucket_index = variable_locators_[variable_index];
auto& bucket = buckets_[bucket_index.bucket_index]; // 哪个桶
auto& replica = bucket.replicas[replica_index]; // 桶的哪个副本
auto& variable = replica.variables[bucket_index.intra_bucket_index]; // 得到副本中的variable
const auto offset = replica.offsets[bucket_index.intra_bucket_index]; // variable的offset
const auto length = replica.lengths[bucket_index.intra_bucket_index]; // variable的size
auto& bucket_view = replica.bucket_views_in[bucket_index.intra_bucket_index]; //插入view
// Copy contents of gradient tensor to bucket tensor.
// If the gradient is not set, we assume it wasn't computed
// as part of the current backwards pass, and zero the part
// of the bucket it would otherwise hold.
runGradCallbackForVariable(variable, [&](auto& grad) {
// 拿到张量对应的梯度 grad
if (grad.defined()) {
this->check_grad_layout(grad, bucket_view);
// When gradient_as_bucket_view_ is false, or even when
// gradient_as_bucket_view_ is true, in rare cases users may set grad to
// be None after every iteration. In these cases, grad and bucket_view are
// pointing to different storages and thus need to copy grads to
// bucket_view. If gradient_as_bucket_view_ is set as true, let grad point
// to bucket_view. If grad has already been set as views of buckets in
// previous iterations, no copy is needed.
if (!grad.is_alias_of(bucket_view)) {
this->copy_grad_to_bucket(grad, bucket_view); // 把梯度拷贝进入contents
if (gradient_as_bucket_view_) {
// Let grad point to bucket_view buffer.
grad = bucket_view; // 为了省内存,grad指向了bucket_view
// The grad is modified and need to be written back.
return true;
}
} else {
// If grad and bucket view point to the same storage, no need to copy
if (comm_hook_ == nullptr) {
bucket_view.div_(divFactor_);
}
}
} else {
bucket_view.zero_(); // 设置为0
}
// The grad is not modified and doesn't need to be written back.
return false;
});
}
Роль copy_grad_to_bucket заключается в копировании градиента в содержимое.
void Reducer::copy_grad_to_bucket(
const at::Tensor& grad,
at::Tensor& bucket_view) {
// See Note [DDP Communication Hook]
if (comm_hook_ == nullptr) {
auto wrapped = at::native::wrapped_scalar_tensor(double(1.) / divFactor_);
// Divides while copying into the bucket view.
at::mul_out(bucket_view, grad, wrapped);
} else {
bucket_view.copy_(grad); // 通过bucket_view把梯度拷贝到 桶副本的contents
}
}
3.2 Ковш готов
В предыдущем коде проверяем готовы ли градиенты в ведре.Если ожидающих нет, то ведро тоже готово.В это время вызывается mark_bucket_ready.
В mark_bucket_ready будут пройдены бакеты, а готовые бакеты будут уменьшены.
// Called when the bucket at the specified index is ready to be reduced.
void Reducer::mark_bucket_ready(size_t bucket_index) {
TORCH_INTERNAL_ASSERT(bucket_index >= next_bucket_);
// Buckets are reduced in sequence. Ignore this bucket if
// it's not its turn to be reduced.
if (bucket_index > next_bucket_) {
return;
}
// Keep going, until we either:
// - have kicked off reduction for all buckets, or
// - found a bucket that's not yet ready for reduction.
//
// 遍历桶,直到遇到下面两种情况:
// - 已经发起了对所有桶的规约
// - 发现一个桶其实没有就绪
for (; next_bucket_ < buckets_.size() && buckets_[next_bucket_].pending == 0;
next_bucket_++) {
num_buckets_ready_++; // 增加
if (num_buckets_ready_ == 1 && should_collect_runtime_stats()) {
record_backward_comm_start_time();
}
auto& bucket = buckets_[next_bucket_];
all_reduce_bucket(bucket); // 对于就绪的桶,进行规约
}
}
3.2.1 all_reduce_bucket
all_reduce_bucket предназначен для синхронизации содержимого.
- Итерируйте реплики ведра, вставляя тензоры реплик в тензоры.
- Если comm_hook не зарегистрирован, все эти тензоры сводятся напрямую.
- После регистрации comm_hook используйте хук для allreduce.Следует отметить, что этот comm_hook является только базовым хуком для обработки связи.Если вы хотите выполнить отсечение градиента перед уменьшением, вам все равно нужно повесить хуки в autograph.
void Reducer::all_reduce_bucket(Bucket& bucket) {
std::vector<at::Tensor> tensors;
tensors.reserve(bucket.replicas.size());
for (const auto& replica : bucket.replicas) {
// TODO(@pietern): Ensure proper synchronization with the CUDA events
// that recorded copies into this contents tensor. If these copies are
// executed on non-default streams, the current stream for the device
// that holds the contents tensor must wait on these events.
//
// As long as autograd uses the default stream for every device,
// these operations are implicitly sequenced, and we don't need to
// do any extra synchronization here.
//
// CUDA default stream 都按时序排好了
tensors.push_back(replica.contents);
}
// See Note [DDP Communication Hook]
if (comm_hook_ == nullptr) {
// 如果没注册 comm_hook,直接 allreduce
bucket.work = process_group_->allreduce(tensors);
} else {
// 注册了 comm_hook 那就使用 hook 进行allreduce
// 需要注意的是,这个comm_hook 只是处理通信的底层hook,如果想在 reduce 前分别进行梯度裁剪,还是需要在 autograph 挂 hook
GradBucket grad_bucket(
next_bucket_,
tensors[0], // 从下面注解可以知道,一个桶只有一个replica
// Since currently we do not support single-process multiple-device
// mode, we can assume only one replica in the bucket.
bucket.replicas[0].offsets,
bucket.replicas[0].lengths,
bucket.replicas[0].sizes_vec);
bucket.future_work = comm_hook_->runHook(grad_bucket);
}
}
Логическое расширение выглядит следующим образом:
- Редюсер зарегистрирует autograd_hook в post_hooks AccumulateGrad.
- В процессе обратного распространения Autograd Engine, если параметр оказывается готовым, он вызывает autograd_hook.
- Продолжить обработку в autograd_hook.
- Вызовите all_reduce_bucket для синхронизации градиентов.
- Зарегистрирует finalize_backward в движке.
- finalize_backward вызывается в GraphTask::exec_post_processing.
+
Worker 1 | Worker 2
|
Engine AccumulateGrad Reducer | Reducer
|
+ + + | +
| | | | |
| | 1 | | |
| | <-----------------------+ | |
| | | |
| | | |
| v | |
| 2 | |
| post_hooks +--------> autograd_hook | |
| + | |
| | | |
| | 3 | |
| v | |
| +------------------+---------------------------+ | |
| | mark_variable_ready | | |
| | | | |
| | | | |
| | All variable in replica are ready? | | |
| | + | | |
| | | YES | | |
| | v | | |
| | All replica in bucket are ready? | | |
| | + + + |
| | | YES |
| | v 4 all_reduce_bucket |
| | mark_bucket_ready <--------------+---+-----> |
| | | | |
| | | | |
| | | | |
| | + | | |
| | | | | |
| | | | | |
| | v | | |
| | All buckets are ready? | | |
| | + | | |
| | | YES | | |
| | v | | |
| queue_back 5 | all_reduce_local_used_map | | |
| <------------------------+ queue_callback(finalize_backward) | | |
| | | | |
| | | | |
| +-------------------+--------------------------+ | |
v | | |
| | |
GraphTask::exec_post_processing | | |
+ | | |
| | | |
| v | |
+-----------------------------> finalize_backward | |
| 6 + | |
| | | |
| | | |
v v + v
3.2.2 PythonCommHook
PythonCommHook используется для реализации особых потребностей пользователей, как мы упоминали ранее, вот еще два примера.
Пример PythonCommHook
c10::intrusive_ptr<c10::ivalue::Future> PythonCommHook::runHook(
GradBucket& bucket) {
py::gil_scoped_acquire acquire;
py::object py_fut = hook_(state_, bucket);
try {
return py_fut.cast<std::shared_ptr<torch::jit::PythonFutureWrapper>>()->fut;
} catch (const py::cast_error& e) {
auto type = py_fut.get_type();
auto errMsg = c10::str(
e.what(),
". DDP communication hook's callback must return a "
"torch.futures.Future or torch._C.Future object, but got ",
type.attr("__module__").cast<std::string>(),
".",
type.attr("__qualname__").cast<std::string>());
throw std::runtime_error(errMsg);
}
}
или
c10::intrusive_ptr<c10::ivalue::Future> AllReduceCommHook::runHook(
GradBucket& bucket) {
std::vector<at::Tensor> tensors = {bucket.getTensorRef()};
auto allreduce_work = state_->allreduce(tensors);
// FIXME Access the result through the Future passed as argument, instead of
// capturing the Work.
auto div_by_process_group_size = [allreduce_work,
this](c10::ivalue::Future& /* unused */) {
auto tensor = allreduce_work->result()[0] / state_->getSize();
return c10::IValue(tensor);
};
auto fut = allreduce_work->getFuture();
return fut->then(div_by_process_group_size, fut->elementType());
}
3.2.3 GradBucket
GradBucket — это класс для копирования информации.
// This class passes bucket contents tensor to DDP communication hook.
class GradBucket {
public:
explicit GradBucket(
size_t index,
const at::Tensor& tensor,
const std::vector<size_t>& offsets,
const std::vector<size_t>& lengths,
const std::vector<c10::IntArrayRef>& sizes_vec)
: index_(index),
tensor_(tensor),
offsets_(offsets),
lengths_(lengths),
sizes_vec_(sizes_vec) {}
// Returns the index of the bucket, which is unique across all the buckets.
size_t getIndex() const {
return index_;
}
const at::Tensor& getTensor() const {
return tensor_;
}
// Returns a mutable tensor compared with the above method.
at::Tensor& getTensorRef() {
return tensor_;
}
// Overwrites tensors at a specific index.
void setTensor(at::Tensor& tensor) {
tensor_ = tensor;
}
// Each tensor in the list that getPerParameterTensors corresponds to a
// parameter.
std::vector<at::Tensor> getPerParameterTensors() const;
// Returns whther this bucket is the last bucket to allreduce in an iteration.
bool isTheLastBucketToAllreduce() const {
return index_ == 0;
}
private:
size_t index_;
at::Tensor tensor_;
// Per-variable info in tensors_[0].
std::vector<size_t> offsets_;
std::vector<size_t> lengths_;
std::vector<c10::IntArrayRef> sizes_vec_;
};
3.3 all_reduce_local_used_map
Обратите внимание, что это должно уменьшить переменную local_used_maps_ использования тензора, а не градиент тензора.
3.3.1 Определения
Напомним определение.
Следующие две переменные используются для записи локально используемых параметров, которые указывают, использовались ли эти параметры локально в текущей итерации или сеансе no_sync, когда синхронизация не включена (no_sync включен).
Каждая копия модели соответствует тензору в карте, и каждый тензор является одномерным тензором int32 по количеству параметров.
Эти тензоры отмечены в autograd_hook, чтобы указать, что был использован соответствующий параметр. Все эти тензоры уменьшаются в конце текущей итерации или при обратном распространении сеанса no_sync для вычисления глобально неиспользуемых параметров.
// Locally used parameter maps indicating if parameters are used locally
// during the current iteration or no_sync session if no_sync is on. One
// tensor for each model replica and each tensor is one-dim int32 tensor of
// number of parameters. These tensors are marked in autograd_hook to indicate
// the corresponding param has been used, and get allreduced in the end of
// backward of current iteration or no_sync session for figuring out the
// globally unused parameters.
//
// local_used_maps_: CPU tensors for bookkeeping locally used params
// local_used_maps_dev_: dev tensors for reducing globally unused params
std::vector<at::Tensor> local_used_maps_; // autograd_hook中会设置,对应论文中的
std::vector<at::Tensor> local_used_maps_dev_; // GPU
3.3.2 Синхронизация
all_reduce_local_used_map использует здесь асинхронный H2D, чтобы избежать накладных расходов на блокировку. То есть скопируйте local_used_maps_ вlocal_used_maps_dev_
, тогда правильноlocal_used_maps_dev_
Сделать устав.
void Reducer::all_reduce_local_used_map() {
// See Note [Skip allreducing local_used_maps_dev]
// H2D from local_used_maps_ to local_used_maps_dev_
for (size_t i = 0; i < local_used_maps_.size(); i++) {
if (local_used_maps_dev_[i].is_cuda()) {
// Note [local_used_maps_ -> local_used_maps_dev copying]
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// We do async H2D to avoid the blocking overhead. The async copy and
// allreduce respect the current stream, so will be sequenced
// correctly.
//
// Correct sequencing with respect to host operations is also
// essential. The H2D copy_ is stream ordered, while the host's
// changes to local_used_maps_ are host ordered. If a large backlog of
// cuda-stream work pushes the copy_ far into the future, and if no
// blocking calls occur between now and finalize_backward()** such
// that finalize_backward() re-zeroes local_used_maps_ on the host
// before the stream executes the copy_, copy_ will read those zeros
// instead of the values we thought we told it to read here. Copying
// local_used_maps_[i] to a pinned temporary (which the pinned caching
// allocator should supply asynchronously) avoids this nasty, rare
// race condition.
//
// ** In the hoped-for case where all params are used, DDP itself
// won't do any blocking work between now and the re-zeroing, so the
// danger is real.
//
// Defensively ensures local_used_maps_tmp is distinct from
// local_used_maps_[i]
auto local_used_maps_tmp = at::native::empty_like(
local_used_maps_[i],
optTypeMetaToScalarType(local_used_maps_[i].options().dtype_opt()),
local_used_maps_[i].options().layout_opt(),
local_used_maps_[i].options().device_opt(),
true /* pinned_memory */);
// Paranoid asserts here because in some workloads, the pinned
// allocator behaves in a way we don't understand, and may be bugged.
// See https://github.com/pytorch/pytorch/pull/54474
TORCH_INTERNAL_ASSERT(local_used_maps_tmp.is_pinned());
TORCH_INTERNAL_ASSERT(
local_used_maps_tmp.data_ptr() != local_used_maps_[i].data_ptr());
local_used_maps_tmp.copy_(local_used_maps_[i]);
local_used_maps_dev_[i].copy_(local_used_maps_tmp, true);
} else {
local_used_maps_dev_[i].copy_(local_used_maps_[i], true);
}
}
local_used_work_ = process_group_->allreduce(local_used_maps_dev_);
}
Развернуть следующим образом:
- Редюсер зарегистрирует autograd_hook в post_hooks AccumulateGrad.
- В процессе обратного распространения Autograd Engine, если параметр оказывается готовым, он вызывает autograd_hook.
- Продолжить обработку в autograd_hook.
- Вызовите all_reduce_bucket для синхронизации градиентов.
- Вызовите allreduce, чтобы уменьшить переменную local_used_maps_.
- Зарегистрирует finalize_backward в движке.
- finalize_backward вызывается в GraphTask::exec_post_processing.
+
Worker 1 | Worker 2
|
Engine AccumulateGrad Reducer | Reducer
|
+ + + | +
| | | | |
| | 1 | | |
| | <-----------------------+ | |
| | | |
| | | |
| | | |
| | | |
| v | |
| 2 | |
| post_hooks +--------> autograd_hook | |
| + | |
| | | |
| | 3 | |
| v | |
| +------------------+---------------------------+ | |
| | mark_variable_ready | | |
| | | | |
| | | | |
| | All variable in replica are ready? | | |
| | + | | |
| | | YES | | |
| | v | | |
| | All replica in bucket are ready? | | |
| | + + + |
| | | YES 4 all_reduce_bucket |
| | v |
| | mark_bucket_ready <--------------+---+-----> |
| | | | |
| | | | |
| | | | |
| | + | | |
| | | | | |
| | | | | |
| | v | | |
| | All buckets are ready? | | |
| | + | | |
| | | YES + + |
| | v 5 allreduce |
| 6 queue_back | all_reduce_local_used_map <--------+---+-----> |
| <------------------------+ queue_callback(finalize_backward) | | |
| | | | |
| | | | |
| +-------------------+--------------------------+ | |
v | | |
| | |
GraphTask::exec_post_processing | | |
+ | | |
| | | |
| v | |
+-----------------------------> finalize_backward | |
| 7 + | |
| | | |
| | | |
v v + v
3.4 finalize_backward
finalize_backward завершает отделочные работы, логика такова:
- Перебрать ведра для каждого ведра:
- Дождитесь завершения синхронизации тензора.
- Скопируйте обратно содержимое из будущего результата.
- Дождитесь завершения синхронизации local_used_maps_dev.
void Reducer::finalize_backward() {
// No longer expect autograd hooks to fire after this function returns.
expect_autograd_hooks_ = false;
// No longer require call to finalize after this function returns.
require_finalize_ = false;
// Unset allreduce division factor, as it may change in next backwards pass
// when running with DDP join mode.
divFactor_ = kUnsetDivFactor;
// Wait for asynchronous reduction to complete and unflatten contents.
for (auto& bucket : buckets_) { // 遍历桶
// See Note [DDP Communication Hook]
if (comm_hook_ == nullptr) {
bucket.work->wait(); // 等待同步完成
} else {
bucket.future_work->wait(); // 等待同步完成
auto future_result =
comm_hook_->parseHookResult(bucket.future_work->value());
for (size_t i = 0; i < future_result.size(); i++) { //
auto& replica = bucket.replicas[i];
if (bucket.expect_sparse_gradient) {
replica.contents.copy_(future_result[i]); // 从future结果拷贝回contents
} else {
// Reinitialize only `bucket_views_out` with the future_result by
// following the same logic in `initialize_buckets`.
// 把 future_result[i] 解析到 bucket_views_out 之中
populate_bucket_views_out(replica, future_result[i]);
}
}
}
if (!bucket.expect_sparse_gradient) {
// We don't need to finalize the sparse bucket since the sparse grad and
// the bucket essentially point to the same storage. As a result, once
// the allreduce is done, the sparse grads are automatically updated.
finalize_bucket_dense(bucket); //
}
}
// See Note [Skip allreducing local_used_maps_dev]
if (dynamic_graph_find_unused() || static_graph_first_iteration()) {
// Due to the lazy wait, it is possible that reduction of the current
// iteration is still going when the one for next iteration gets kicked off.
// For such case, we want to wait explicitly to make sure the reduction does
// complete before kicking off next one. Otherwise the previous one may
// interfere, write to the device-side memory and clobber the content of
// local_unused_maps_dev_.
if (!local_used_maps_reduced_) {
local_used_work_->wait(); // 等待 local_used_maps_dev 同步完成
}
}
if (dynamic_graph_find_unused()) {
// Reset unused parameter accounting.
// See Note [local_used_maps_ -> local_used_maps_dev copying]
for (auto& local_used : local_used_maps_) {
local_used.fill_(0);
}
local_used_maps_reduced_ = false;
}
if (should_collect_runtime_stats()) {
record_backward_comm_end_time();
}
}
Этот процесс использует следующие функции.
4.6.1 populate_bucket_views_out
populate_bucket_views_out строит выходное представление из содержимого
// (see Note: "Gradient Layout Contract" in initialize_buckets).
void Reducer::populate_bucket_views_out(
Reducer::BucketReplica& replica,
at::Tensor& tensor) { // 把tensor解析到 bucket_views_out 之中
replica.bucket_views_out.clear(); // 清空
for (size_t i = 0; i < replica.variables.size(); i++) { // 重新初始化 bucket_views_out
const auto& v = replica.variables[i]; // 遍历副本的张量
const auto offset = replica.offsets[i];
const auto length = replica.lengths[i];
if (v.is_non_overlapping_and_dense()) {
// If the param's memory is dense, match its layout, anticipating
// the autograd engine (AccumulateGrad) will also create gradients
// matching its layout.
replica.bucket_views_out.push_back( // 把tensor解析到 bucket_views_out 之中
tensor.as_strided(v.sizes(), v.strides(), offset));
} else {
// Fall back to a C-style contiguous view, again anticipating
// AccumulateGrad will do the same when stashing grads for non-dense
// params.
replica.bucket_views_out.push_back( // 把tensor解析到 bucket_views_out 之中
tensor.narrow(0, offset, length).view(v.sizes()));
}
}
}
4.6.1 finalize_bucket_dense
Функция finalize_bucket_dense заключается в вызове runGradCallbackForVariable или copy_bucket_to_grad для копирования уменьшенного градиента в движок.
// A bucket with one or more dense tensors needs to be unflattened.
void Reducer::finalize_bucket_dense(Bucket& bucket) {
for (size_t replica_index = 0; replica_index < bucket.replicas.size();
replica_index++) {
auto& replica = bucket.replicas[replica_index];
for (size_t intra_bucket_index = 0;
intra_bucket_index < replica.variables.size();
intra_bucket_index++) {
auto& variable = replica.variables[intra_bucket_index];
const auto offset = replica.offsets[intra_bucket_index];
const auto length = replica.lengths[intra_bucket_index];
bool global_unused = false;
// See Note [Skip allreducing local_used_maps_dev]
if (static_graph_ || find_unused_parameters_) {
// Determine if this param has been used globally or not.
//
// If the variable was used locally, it is also used globally and then
// we don't need to wait for the reduction. Otherwise we lazily wait for
// the reduction to complete, only when we see a variable that was
// unused locally. Then we end up delaying the synchronization point
// that local_used_work_->wait() implies. If we don't have any unused
// parameters at all, we can skip waiting for the work to complete
// altogether, and cause negligible performance overhead for models
// where all parameters are used. Such lazily waiting means minimizing
// performance impact for the big majority of models where all
// parameters are always used. Then we only pay the overhead cost if
// there is indeed a parameter that is locally unused, because we need
// to check if it's also globally unused.
size_t variable_index = bucket.variable_indices[intra_bucket_index];
// Note: global_unused might not be global yet. As we lazily wait for
// the reduction to complete, it becomes really global only if we get to
// the point as below where we wait for the reduction work, make D2H
// copy, and update global_unused with the real global consensus, i.e.
// local_used_maps_reduced_ is true.
global_unused =
local_used_maps_[replica_index][variable_index].item<int>() == 0;
if (global_unused && !local_used_maps_reduced_) {
// Wait for local_used_maps reduction to complete.
local_used_work_->wait();
// D2H from local_used_maps_dev_ to local_used_maps_
for (size_t i = 0; i < local_used_maps_.size(); i++) {
// Blocking copy, if local_used_maps_dev_ is cuda
local_used_maps_[i].copy_(local_used_maps_dev_[i]);
}
global_unused =
local_used_maps_[replica_index][variable_index].item<int>() == 0;
local_used_maps_reduced_ = true;
}
}
if (!gradient_as_bucket_view_) {
copy_bucket_to_grad( // 拷贝回 dist.context 去
variable, replica, intra_bucket_index, global_unused);
} else {
const auto& bucket_view_out =
replica.bucket_views_out[intra_bucket_index];
auto& bucket_view_in = replica.bucket_views_in[intra_bucket_index];
// If communication_hook is registered, bucket_view_out stores
// allreduced results in a newly allocated tensor, copy bucket_view_out
// back to bucket_view_in that referring to replica.content tensor and
// grad.
if (!bucket_view_in.is_alias_of(bucket_view_out)) {
bucket_view_in.copy_(bucket_view_out); // 从out拷贝回in view
}
runGradCallbackForVariable(variable, [&](auto& grad) {
// If a parameter is globally unused, we keep its grad untouched.
if (!global_unused) {
// If grad is globally used but locally unused, let grad point to
// bucket_view_in
if (!grad.defined()) {
grad = bucket_view_in;
} else {
if (!grad.is_alias_of(bucket_view_in)) {
TORCH_CHECK(
false,
"Detected at least one parameter gradient is not the "
"expected DDP bucket view with gradient_as_bucket_view=True. "
"This may happen (for example) if multiple allreduce hooks "
"were registered onto the same parameter. If you hit this error, "
"please file an issue with a minimal repro.");
}
}
// The grad is modified and needs to be written back.
return true;
}
// The grad is not modified.
return false;
});
}
}
}
}
4.6.3 copy_bucket_to_grad
Вот соответствующий градиент, скопированный из ведра обратно в автоград.
void Reducer::copy_bucket_to_grad(
at::Tensor& variable,
Reducer::BucketReplica& replica,
size_t intra_bucket_index,
bool global_unused) {
const auto& bucket_view = replica.bucket_views_out[intra_bucket_index]; // 拿到输出view
runGradCallbackForVariable(variable, [&](auto& grad) {
// If a parameter is globally unused, we keep its grad untouched.
if (!global_unused) {
if (!grad.defined()) {
// Creates grad according to the "Gradient Layout Contract"
// (see torch/csrc/grad/AccumulateGrad.h)
grad =
torch::autograd::utils::clone_obey_contract(bucket_view, variable);
} else {
grad.copy_(bucket_view); // 从桶拷贝回梯度
}
// The grad is modified and needs to be written back.
return true;
}
// The grad is not modified.
return false;
});
}
На данный момент мы расширились следующим образом:
- Редюсер зарегистрирует autograd_hook в post_hooks AccumulateGrad.
- В процессе обратного распространения Autograd Engine, если параметр оказывается готовым, он вызывает autograd_hook.
- Продолжить обработку в autograd_hook.
- Вызовите all_reduce_bucket для синхронизации градиентов.
- вызовите всеуменьшить на
local_used_maps_
переменная для уменьшения,local_used_maps_
Цель состоит в том, чтобы выяснить глобально неиспользуемые параметры. - Затем DDP зарегистрирует finalize_backward в движке.
- finalize_backward вызывается в GraphTask::exec_post_processing.
- Вызовы DDP ожидают синхронизации с другими рабочими процессами.
- DDP вызывает copy_bucket_to_grad, чтобы скопировать градиент из корзины обратно в механизм автоградации.
Таким образом, мы знаем полный процесс того, как движок autograd взаимодействует с DDP в процессе обратного распространения и как использовать DDP для слияния градиентов при обратном вычислении.
+
Worker 1 | Worker 2
|
Engine AccumulateGrad Reducer | Reducer
|
+ + + | +
| | | | |
| | 1 | | |
| | <----------------------+ | |
| | | |
| | | |
| v | |
| 2 | |
| post_hooks +--------> autograd_hook | |
| + | |
| | | |
| | 3 | |
| v | |
| +------------------+---------------------------+ | |
| | mark_variable_ready | | |
| | | | |
| | | | |
| | All variable in replica are ready? | | |
| | + | | |
| | | YES | | |
| | v | | |
| | All replica in bucket are ready? | | |
| | + + + |
| | | YES 4 all_reduce_bucket |
| | v |
| | mark_bucket_ready <--------------+---+-----> |
| | | | |
| | | | |
| | | | |
| | + | | |
| | | | | |
| | | | | |
| | v | | |
| | All buckets are ready? | | |
| | + | | |
| | | YES + + |
| | v 5 allreduce |
| 6 queue_back | all_reduce_local_used_map <--------+---+-----> |
| <------------------------+ queue_callback(finalize_backward) | | |
| | | | |
| | | | |
| +-------------------+--------------------------+ | |
v | | |
| | |
GraphTask::exec_post_processing | | |
+ | | |
| | | |
| 7 v | |
+-----------------------------> finalize_backward | |
| + 8 wait | |
| | <---------------------------------> |
| <-------------------------------------+ | | |
v copy_bucket_to_grad 9 v + v
На этом анализ обратного распространения завершен, и весь анализ DDP завершен, далее мы анализируем распределенный автоград.
0xEE Личная информация
★★★★★★Думая о жизни и технологиях★★★★★★
Публичный аккаунт WeChat:мысли Росси
ссылка 0xFF
BAGUA: Scaling up Distributed Learning with System Relaxations
Распределенная серия Pytorch 1 — узнайте переменные среды, связанные с torch.distributed.launch
pytorch распределенная серия 2 - как синхронизируется DistributedDataParallel?
pytorch (распределенные) данные параллельной личной практики - DataParallel/DistributedDataParallel
обсуждение.py torch.org/he/data пар AL…
Понимать распределенное обучение интерпретации исходного кода PyTorch?
Практическое руководство | Реализация слоя PyTorch AutoGrad C++
PYTORCH автоматическая дифференциация (1)
Как PyTorch ускоряет параллельное обучение данных? Раскрыты распространенные читы
Распределенное обучение pytorch (два init_process_group)
DP и DDP интерпретации исходного кода PyTorch: параллелизм моделей и анализ распределенного обучения