0x00 сводка
Выше мы представили, как движок получает зависимости графа обратного расчета, В этой статье мы рассмотрим, как движок распространяется назад на основе этих зависимостей. Изучив эту статью, вы сможете:
- Чтобы понять, как RecvRpcBackward отправляет RPC-сообщения соответствующим нижестоящим узлам, можно еще раз просмотреть процесс взаимодействия обратного распространения между рабочими процессами.
- Узнайте, как AccumulateGrad накапливает градиенты в контексте.
Другие статьи о распространении PyTorch:
[Анализ исходного кода] Распространение PyTorch (1) ------ история и обзор
[Анализ исходного кода] Как PyTorch использует GPU
[Анализ исходного кода] Распределенный PyTorch (2) ----- DataParallel (включен)
[Анализ исходного кода] Распределенный PyTorch (3) ----- DataParallel (ниже)
[Анализ исходного кода] Распределенный PyTorch (7) ----- Группа процессов DistributedDataParallel
[Анализ исходного кода] Распределенный PyTorch (8) -------- Бумага DistributedDataParallel
[Анализ исходного кода] Распределенный PyTorch (9) ----- Инициализация DistributedDataParallel
[Анализ исходного кода] PyTorch, распространяемый Autograd (1) ---- дизайн
[Анализ исходного кода] PyTorch, распространяемый Autograd (2) ---- Фонд RPC
[Анализ исходного кода] PyTorch, распространяемый Autograd (3) ---- контекстно-зависимый
[Анализ исходного кода] PyTorch распространяет Автоград (4) ---- как врезаться в движок
[Анализ исходного кода] PyTorch, распространяемый Autograd (5) ---- движок (включен)
Для лучшего объяснения код в этой статье будет соответственно упрощен в соответствии с конкретной ситуацией.
0x01 Обзор
Сначала мы рассмотрим алгоритм алгоритма режима FAST следующим образом, в этой статье необходимо обсудить следующие части.
- Начнем с воркеров с обратным распространением корней (все корни должны быть локальными).
- Найти весь текущий распределенный контекст Autograd.
send
функция . - из предоставленного корня и все, что мы получили
send
Функция запускается, и мы вычисляем зависимости локально. - После вычисления зависимостей используйте предоставленный корень для запуска локального механизма автоградации.
- Когда движок автограда выполняет
recv
функция,recv
Функция отправляет входной градиент соответствующему воркеру через RPC. каждыйrecv
Функция знает идентификатор целевого рабочего, поскольку он записывается как часть прямого прохода. пройти черезautograd_context_id
иautograd_message_id
Долженrecv
Функция отправляется на удаленный хост. - Когда удаленный хост получает этот запрос, мы используем
autograd_context_id
иautograd_message_id
найти подходящееsend
функция. - Если это первый раз, когда работник получил
autograd_context_id
запросы, он будет вычислять зависимости локально, как описано в пунктах 1-3 выше. - то получит в точке 6
send
Методы вставляются в очередь для выполнения на локальном механизме автоградации этого рабочего. - Наконец, мы не в Тензоре.
.grad
вместо того, чтобы накапливать градиенты поверх каждого распределенного контекста Autograd отдельно. Градиенты хранятся вDict[Tensor, Tensor]
среди ,Dict[Tensor, Tensor]
По сути, это карта от Tensor к связанным с ними градиентам, и эту карту можно получить с помощью API get_gradients().
Во-вторых, давайте посмотрим на общий код выполнения.Все выполнение выполняется в DistEngine::execute, которое разделено на следующие этапы:
- Используйте contextId для получения прямого контекста.
- Используйте validateRootsAndRetrieveEdges для проверки.
- Создайте GraphRoot и используйте его для управления обратным распространением, которое можно рассматривать как виртуальный корень.
- Используйте ComputeDependencies для вычисления зависимостей.
- Используйте runEngineAndAccumulateGradients для вычислений обратного распространения.
- Используйте clearAndWaitForOutstandingRpcsAsync, чтобы дождаться завершения RPC.
void DistEngine::execute(
int64_t contextId,
const variable_list& roots,
bool retainGraph) {
// Retrieve the context for the given context_id. This will throw if the
// context_id is invalid.
auto autogradContext =
DistAutogradContainer::getInstance().retrieveContext(contextId);
// Perform initial pre-processing.
edge_list rootEdges;
variable_list grads;
validateRootsAndRetrieveEdges(roots, rootEdges, grads);
// 构造一个GraphRoot,用它来驱动后向传播,可以认为是一个虚拟根
std::shared_ptr<Node> graphRoot =
std::make_shared<GraphRoot>(rootEdges, grads);
edge_list outputEdges;
// Compute dependencies locally, starting from all roots and all 'send'
// functions.
{
std::lock_guard<std::mutex> guard(initializedContextIdsLock_);
// Context should not have been initialized already.
TORCH_INTERNAL_ASSERT(
initializedContextIds_.find(autogradContext->contextId()) ==
initializedContextIds_.end());
// 计算依赖
computeDependencies(
autogradContext, rootEdges, grads, graphRoot, outputEdges, retainGraph);
// Mark the autograd context id as initialized.
initializedContextIds_.insert(autogradContext->contextId());
}
BackwardPassCleanupGuard guard(autogradContext);
// This needs to be blocking and as a result we wait for the future to
// complete.
runEngineAndAccumulateGradients(autogradContext, graphRoot, outputEdges)
->waitAndThrow(); // 反向传播计算
// Wait for all of the outstanding rpcs to complete.
autogradContext->clearAndWaitForOutstandingRpcsAsync()->waitAndThrow();
}
Опять же, из предыдущей статьи мы знаем, что зависимости были обработаны в calculateDependencies, и вся информация о функциях, которую необходимо вычислить, находится в GraphTask.exec_info_. Давайте посмотрим, как его вычислить дальше, а именно с помощью двух методов: runEngineAndAccumulateGradients и clearAndWaitForOutstandingRpcsAsync.
0x02 Выполнить GraphTask
Давайте сначала посмотрим, как использовать runEngineAndAccumulateGradients для вычислений обратного распространения, накапливая градиенты.
2.1 runEngineAndAccumulateGradients
Среди движков первым вызывается runEngineAndAccumulateGradients. Главное — инкапсулировать NodeTask, а потом с этим вызвать execute_graph_task_until_ready_queue_empty. где at::launch используется для запуска потока.
c10::intrusive_ptr<c10::ivalue::Future> DistEngine::
runEngineAndAccumulateGradients(
const ContextPtr& autogradContext,
const std::shared_ptr<Node>& graphRoot,
const edge_list& outputEdges,
bool incrementOutstandingTasks) {
// Cleanup previous state for outstanding RPCs. Outstanding RPCs could be
// lingering if we're running backward multiple times and some of the
// passes ran into errors.
autogradContext->clearOutstandingRpcs();
// 得到GraphTask
auto graphTask = autogradContext->retrieveGraphTask();
// 启动了一个线程来运行 execute_graph_task_until_ready_queue_empty
at::launch([this, graphTask, graphRoot, incrementOutstandingTasks]() {
execute_graph_task_until_ready_queue_empty(
/*node_task*/ NodeTask(graphTask, graphRoot, InputBuffer(0)),
/*incrementOutstandingTasks*/ incrementOutstandingTasks);
});
// Use a reference here to avoid refcount bump on futureGrads.
// 处理结果
auto& futureGrads = graphTask->future_result_;
// Build a future that waits for the callbacks to execute (since callbacks
// execute after the original future is completed). This ensures we return a
// future that waits for all gradient accumulation to finish.
auto accumulateGradFuture =
c10::make_intrusive<c10::ivalue::Future>(c10::NoneType::get());
futureGrads->addCallback(
[autogradContext, outputEdges, accumulateGradFuture](c10::ivalue::Future& futureGrads) {
if (futureGrads.hasError()) {
// 省略错误处理部分
return;
}
try {
const variable_list& grads =
futureGrads.constValue().toTensorVector();
// 标识已经结束
accumulateGradFuture->markCompleted(c10::IValue());
} catch (std::exception& e) {
accumulateGradFuture->setErrorIfNeeded(std::current_exception());
}
});
return accumulateGradFuture;
}
at::launch находится в aten/src/ATen/ParallelThreadPoolNative.cpp, где в потоке будет вызываться входящая функция.
void launch(std::function<void()> func) {
internal::launch_no_thread_state(std::bind([](
std::function<void()> f, ThreadLocalState thread_locals) {
ThreadLocalStateGuard guard(std::move(thread_locals));
f();
},
std::move(func),
ThreadLocalState()
));
}
namespace internal {
void launch_no_thread_state(std::function<void()> fn) {
#if AT_EXPERIMENTAL_SINGLE_THREAD_POOL
intraop_launch(std::move(fn));
#else
get_pool().run(std::move(fn));
#endif
}
}
Давайте посмотрим, как эти внутренние методы выполняются один за другим.
2.2 execute_graph_task_until_ready_queue_empty
Эта функция аналогична Engine::thread_main и завершает выполнение этой GraphTask через NodeTask, в которой функцияvaluate_function будет постоянно вставлять новые NodeTasks в cpu_ready_queue.engine_.evaluate_functionМетод будет:
- Во-первых, инициализируйте собственный поток движка.
- Во-вторых, каждый вызов устанавливает очередь cpu_ready_queue, которая используется для перехода graph_task от root_to_execute, что позволяет параллельно выполнять GraphTask с разными потоками, что является очередью, зависящей от процессора.
- Вставьте входящий node_task в cpu_ready_queue.
- Начните с корня по обратному графу вычислений и вычисляйте до конечных узлов.
-
-
Здесь листовыми узлами являются AccumulateGrad или RecvRpcBackward.
-
Если это промежуточный узел, он рассчитывается нормально.
-
Если это RecvRpcBackward, он отправит сообщение RPC соответствующему нижестоящему узлу.
-
Если AccumulateGrad, градиент накапливается в контексте.
-
Конкретный код выглядит следующим образом:
void DistEngine::execute_graph_task_until_ready_queue_empty(
NodeTask&& node_task,
bool incrementOutstandingTasks) {
// 初始化原生引擎线程
engine_.initialize_device_threads_pool();
// Create a ready queue per call to traverse the graph_task from
// root_to_execute This allow concurrent execution of the same GraphTask from
// different threads
// 每个调用建立一个 ready queue,用来从root_to_execute开始遍历graph_task,这允许用不同的线程来对GraphTask并行执行,这是一个CPU相关的queue
std::shared_ptr<ReadyQueue> cpu_ready_queue = std::make_shared<ReadyQueue>();
auto graph_task = node_task.base_.lock();
if (graph_task == nullptr) {
LOG(ERROR) << "GraphTask has expired for NodeTask: "
<< node_task.fn_->name() << ", skipping execution.";
return;
}
cpu_ready_queue->push(std::move(node_task), incrementOutstandingTasks);
torch::autograd::set_device(torch::autograd::CPU_DEVICE);
graph_task->owner_ = torch::autograd::CPU_DEVICE;
while (!cpu_ready_queue->empty()) {
std::shared_ptr<GraphTask> local_graph_task;
{
// Scope this block of execution since NodeTask is not needed after this
// block and can be deallocated (release any references to grad tensors
// as part of inputs_)
NodeTask task = cpu_ready_queue->pop(); // 取出一个NodeTask
if (!(local_graph_task = task.base_.lock())) {
continue;
}
if (task.fn_ && !local_graph_task->has_error_.load()) {
AutoGradMode grad_mode(local_graph_task->grad_mode_);
try {
GraphTaskGuard guard(local_graph_task);
engine_.evaluate_function( // 这里会调用具体Node对应的函数
local_graph_task, task.fn_.get(), task.inputs_, cpu_ready_queue);
} catch (std::exception& e) {
engine_.thread_on_exception(local_graph_task, task.fn_, e);
// break the loop in error so that we immediately stop the execution
// of this GraphTask, mark it completed if necessary and return the
// future with proper ErrorMessage
break;
}
}
}
// Decrement the outstanding task.
--local_graph_task->outstanding_tasks_; // 处理了一个NodeTask
}
// Check if we've completed execution.
if (graph_task->completed()) {
// We don't need to explicitly notify the owner thread, since
// 'mark_as_completed_and_run_post_processing' would mark the Future as
// completed and this would notify the owner thread that the task has been
// completed.
graph_task->mark_as_completed_and_run_post_processing();
}
}
Кроме того, есть три места, где вызывается execute_graph_task_until_ready_queue_empty.
- будет вызываться runEngineAndAccumulateGradients Это тот случай, когда пользователь активно вызывает обратный вызов, что описано в этом разделе.
- будет вызываться executeSendFunctionAsync, что соответствует работе узла после получения градиента от предыдущего узла обратного распространения, который мы представим в следующем разделе.
- globalCpuThread будет вызываться выделенным потоком для работы ЦП, который мы скоро представим.
- В Engine.evaluate_function градиенты накапливаются для AccumulateGrad.
- В Engine.evaluate_function вызывается RecvRpcBackward для отправки сообщений после обратного распространения.
Подытожим несколько процессов вычисления градиентов, соответствующих следующим трем числам.
User Training Script RPC BACKWARD_AUTOGRAD_REQ
+ +
| |
| 1 | 2
v v
backward RequestCallbackNoPython.processRpc
+ +
| |
| |
v v
DistEngine.execute RequestCallbackNoPython.processBackwardAutogradReq
+ +
| |
| |
| v
| +----------+ DistEngine.executeSendFunctionAsync
| | +
| | |
v v |
DistEngine.computeDependencies |
| |
| |
v |
DistEngine.runEngineAndAccumulateGradients | DistEngine.globalCpuThread
+ | +
| +------------------+ |
| | | 3
| | +------------------------+
| | |
| | |
v v v
DistEngine.execute_graph_task_until_ready_queue_empty
+
|
|
v
DistEngine.evaluate_function
+
|
+--------------------------------------------------------------+
| |
| 4 AccumulateGrad | 5 RecvRpcBackward
v v
(*hook)(captured_grad) call_function(graph_task, func, inputs)
2.3 evaluate_function
В приведенном выше коде функция Assessment_function собственного движка фактически вызывается для завершения операции.
Давайте посмотрим, как использоватьexec_info_
, если он не настроен на выполнение, он не будет обработан. Здесь мы можем видеть вышеупомянутоеrecvBackwardEdges
Как работать сexec_info_
взаимодействовать.
Перейдите recvBackwardEdges и для каждого recvBackward установите соответствующий элемент в GraphTask.exec_info_ для выполнения.
Конкретный код выглядит следующим образом, здесь будет:
- Накапливайте градиенты для AccumulateGrad.
- Вызовите RecvRpcBackward, чтобы отправить сообщение после обратного распространения.
void Engine::evaluate_function(
std::shared_ptr<GraphTask>& graph_task,
Node* func,
InputBuffer& inputs,
const std::shared_ptr<ReadyQueue>& cpu_ready_queue) {
// If exec_info_ is not empty, we have to instrument the execution
auto& exec_info_ = graph_task->exec_info_;
if (!exec_info_.empty()) {
auto& fn_info = exec_info_.at(func);
if (auto* capture_vec = fn_info.captures_.get()) {
// Lock mutex for writing to graph_task->captured_vars_.
std::lock_guard<std::mutex> lock(graph_task->mutex_);
for (const auto& capture : *capture_vec) {
auto& captured_grad = graph_task->captured_vars_[capture.output_idx_];
captured_grad = inputs[capture.input_idx_];
for (auto& hook : capture.hooks_) {
captured_grad = (*hook)(captured_grad); //这里调用 hook,就是 DistAccumulateGradCaptureHook 的 operator(),captured_grad 就是累积的梯度
}
}
}
if (!fn_info.needed_) {
// Skip execution if we don't need to execute the function.
return; // 如果没有设置需要执行,则直接返回。recvBackward 会设置需要执行
}
}
// 这里就是调用 recvBackward
auto outputs = call_function(graph_task, func, inputs);
// 后续代码省略
2.4 globalCpuThread
Информацию о globalCpuThread см. в разделе [Продолжения от GPU к CPU] выше. globalCpuThread — это рабочий поток, который извлекает NodeTask из очереди готовности и выполняет его.
Для globalCpuThread его параметр ready_queue равен global_cpu_ready_queue_.
void DistEngine::globalCpuThread(
const std::shared_ptr<ReadyQueue>& ready_queue) {
while (true) {
NodeTask task = ready_queue->pop();
if (task.isShutdownTask_) {
// Need to shutdown this thread.
break;
}
auto graphTask = task.base_.lock();
if (graphTask == nullptr) {
// GraphTask has expired, ignore and continue processing.
continue;
}
// Launch the execution on a JIT thread.
at::launch([this,
graphTask,
graphRoot = task.fn_,
variables =
InputBuffer::variables(std::move(task.inputs_))]() mutable {
InputBuffer inputs(variables.size());
for (size_t i = 0; i < variables.size(); i++) {
inputs.add(i, std::move(variables[i]), c10::nullopt, c10::nullopt);
}
execute_graph_task_until_ready_queue_empty( // 这里会调用
/*node_task*/ NodeTask(graphTask, graphRoot, std::move(inputs)),
/*incrementOutstandingTasks*/ false);
});
}
}
Для распространенных движков также настроена выделенная очередь процессора.
auto graph_task = std::make_shared<GraphTask>(
/* keep_graph */ keep_graph,
/* create_graph */ create_graph,
/* depth */ not_reentrant_backward_call ? 0 : total_depth + 1,
/* cpu_ready_queue */ local_ready_queue);
2.5 Резюме
Для распределенных движков основными отличиями от обычных движков в вычислительной части являются:
-
Если это RecvRpcBackward, он отправит сообщение RPC соответствующему нижестоящему узлу.
-
Если AccumulateGrad, градиент накапливается в контексте.
Итак, давайте посмотрим, как работать с этими двумя частями.
0x03 RPC-вызов
В предыдущей статье мы видели, как получатель обрабатывает вызовы RPC с обратным распространением, давайте посмотрим, как движок инициирует вызовы RPC с обратным распространением, то есть как вызывать метод recv.
Это относится к случаю, когда воркер 0 вызывает recv ниже, а выполнение переходит к воркеру 1. Соответствующий проектный документ выглядит следующим образом.
Когда движок автограда выполняет
recv
функция,recv
Функция отправляет входной градиент соответствующему воркеру через RPC. каждыйrecv
Функция знает идентификатор целевого рабочего, поскольку он записывается как часть прямого прохода. пройти черезautograd_context_id
иautograd_message_id
Долженrecv
Функция отправляется на удаленный хост.
Давайте посмотрим, как выполнить функцию recv.
В частности, в сочетании с распределенным движком, когда движок обнаруживает, что узел находится в состоянии RecvRpcBackward, он вызывает свою функцию применения.
void Engine::evaluate_function(
std::shared_ptr<GraphTask>& graph_task,
Node* func,
InputBuffer& inputs,
const std::shared_ptr<ReadyQueue>& cpu_ready_queue) {
// If exec_info_ is not empty, we have to instrument the execution
auto& exec_info_ = graph_task->exec_info_;
if (!exec_info_.empty()) {
// 省略了梯度累积部分代码,具体可以参见上面章节
if (!fn_info.needed_) {
// Skip execution if we don't need to execute the function.
return; // 如果没有设置需要执行,则直接返回。recvBackward 会设置需要执行
}
}
// 这里就是调用 recvBackward.apply 函数
auto outputs = call_function(graph_task, func, inputs);
// 后续代码省略
3.1 RecvRpcBackward
3.1.1 Определения
RecvRpcBackward определяется следующим образом:
class TORCH_API RecvRpcBackward : public torch::autograd::Node {
public:
explicit RecvRpcBackward(
const AutogradMetadata& autogradMetadata,
std::shared_ptr<DistAutogradContext> autogradContext,
rpc::worker_id_t fromWorkerId,
std::unordered_map<c10::Device, c10::Device> deviceMap);
torch::autograd::variable_list apply(
torch::autograd::variable_list&& grads) override;
private:
const AutogradMetadata autogradMetadata_;
// Hold a weak reference to the autograd context to avoid circular
// dependencies with the context (since it holds a reference to
// RecvRpcBackward).
std::weak_ptr<DistAutogradContext> autogradContext_;
// The worker id from which the RPC was received. During the backward pass,
// we need to propagate the gradients to this workerId.
rpc::worker_id_t fromWorkerId_;
// Device mapping for tensors sent over RPC.
const std::unordered_map<c10::Device, c10::Device> deviceMap_;
};
3.1.2 Сборка
Конструктор выглядит следующим образом.
RecvRpcBackward::RecvRpcBackward(
const AutogradMetadata& autogradMetadata,
ContextPtr autogradContext,
rpc::worker_id_t fromWorkerId,
std::unordered_map<c10::Device, c10::Device> deviceMap)
: autogradMetadata_(autogradMetadata),
autogradContext_(std::move(autogradContext)),
fromWorkerId_(fromWorkerId),
deviceMap_(std::move(deviceMap)) {}
3.1.3 apply
torch/csrc/distributed/autograd/functions/recvrpc_backward.cpp определяет свою функцию применения, которая работает следующим образом:
- Поместите входящие градации градиента в outputGrads, потому что они будут выводиться на следующую ссылку.
- Создайте PropagateGradientsReq, то есть BACKWARD_AUTOGRAD_REQ.
- Отправьте RPC на следующую ссылку.
variable_list RecvRpcBackward::apply(variable_list&& grads) {
std::vector<Variable> outputGrads;
for (size_t i = 0; i < grads.size(); i++) { // 下面就是把传入的梯度 grads 放入outputGrads
const auto& grad = grads[i];
if (grad.defined()) {
outputGrads.emplace_back(grad);
} else {
// Put in zeros for a tensor with no grad.
outputGrads.emplace_back(input_metadata(i).zeros_like());
}
}
auto sharedContext = autogradContext_.lock();
// Send the gradients over the wire and record the future in the autograd
// context.
PropagateGradientsReq gradCall( // 构建 PropagateGradientsReq
autogradMetadata_,
outputGrads,
sharedContext->retrieveGraphTask()->keep_graph_);
// Send the gradients over to the appropriate node.
auto rpcAgent = rpc::RpcAgent::getCurrentRpcAgent();
auto jitFuture = rpcAgent->send( // 发送 RPC
rpcAgent->getWorkerInfo(fromWorkerId_),
std::move(gradCall).toMessage(), // 调用了toMessageImpl
rpc::kUnsetRpcTimeout,
deviceMap_);
// Record the future in the context.
sharedContext->addOutstandingRpc(jitFuture);
// 'recv' function sends the gradients over the wire using RPC, it doesn't
// need to return anything for any downstream autograd function.
return variable_list();
}
Поскольку PropagateGradientsReq отправляется сюда, давайте двигаться дальше.
3.2 PropagateGradientsReq
3.2.1 Определения
PropagateGradientsReq расширяет RpcCommandBase.
// Used to propagate gradients from one node to another during a distributed
// backwards pass. This RPC call is invoked when we hit a `recv` autograd
// function during backward pass execution.
class TORCH_API PropagateGradientsReq : public rpc::RpcCommandBase {
public:
PropagateGradientsReq(
const AutogradMetadata& autogradMetadata,
std::vector<torch::autograd::Variable> grads,
bool retainGraph = false);
const AutogradMetadata& getAutogradMetadata();
const std::vector<torch::autograd::Variable>& getGrads();
// Serialization and deserialization methods.
rpc::Message toMessageImpl() && override;
static std::unique_ptr<PropagateGradientsReq> fromMessage(
const rpc::Message& message);
// Whether or not to retain the autograd graph.
bool retainGraph();
private:
AutogradMetadata autogradMetadata_;
std::vector<torch::autograd::Variable> grads_;
bool retainGraph_;
};
Его toMessageImpl указывает, что это сообщение BACKWARD_AUTOGRAD_REQ.
Message PropagateGradientsReq::toMessageImpl() && {
std::vector<at::IValue> ivalues;
// Add all the grad tensors.
for (const auto& grad : grads_) {
ivalues.emplace_back(grad);
}
// Now add autograd metadata.
ivalues.emplace_back(autogradMetadata_.autogradContextId);
ivalues.emplace_back(autogradMetadata_.autogradMessageId);
// Add retain graph.
ivalues.emplace_back(retainGraph_);
// Now pickle using JIT pickler.
std::vector<torch::Tensor> tensorTable;
std::vector<char> payload =
jit::pickle(c10::ivalue::Tuple::create(std::move(ivalues)), &tensorTable);
return Message(
std::move(payload),
std::move(tensorTable),
MessageType::BACKWARD_AUTOGRAD_REQ); // 这里指明了消息类型。
}
3.3 Получатель
Для полноты картины давайте посмотрим, как получатель обрабатывает обратное распространение.
3.3.1 Получение сообщения
При создании TensorPipeAgent настройте RequestCallbackImpl как функцию обратного вызова. Это унифицированная функция ответа агента. Когда мы упоминали логику получения прокси ранее, она войдет в функцию RequestCallbackNoPython::processRpc. Видно, что есть логика обработки для BACKWARD_AUTOGRAD_REQ.
Это нормальный поток RPC.
void RequestCallbackNoPython::processRpc(
RpcCommandBase& rpc,
const MessageType& messageType,
const int64_t messageId,
const c10::intrusive_ptr<JitFuture>& responseFuture,
std::shared_ptr<LazyStreamContext> ctx) const {
switch (messageType) {
case MessageType::BACKWARD_AUTOGRAD_REQ: {
processBackwardAutogradReq(rpc, messageId, responseFuture); // 这里调用
return;
};
3.3.2 processBackwardAutogradReq
В процессеBackwardAutogradReq будет:
- Получите DistAutogradContainer.
- Получить контекст.
- Вызовите executeSendFunctionAsync для обработки ядра.
Из этого мы видим, что есть два пути в двигатель:
- Во-первых, пример кода явно и активно вызывает обратный вызов, а затем вызывает DistEngine::getInstance().execute, который является рабочим 0.
- Один из них заключается в пассивном вызове DistEngine::getInstance().executeSendFunctionAsync, который является рабочим 1.
void RequestCallbackNoPython::processBackwardAutogradReq(
RpcCommandBase& rpc,
const int64_t messageId,
const c10::intrusive_ptr<JitFuture>& responseFuture) const {
auto& gradientsCall = static_cast<PropagateGradientsReq&>(rpc);
const auto& autogradMetadata = gradientsCall.getAutogradMetadata();
// Retrieve the appropriate autograd context.
auto autogradContext = DistAutogradContainer::getInstance().retrieveContext(
autogradMetadata.autogradContextId); // 得到发送者的context id
// Lookup the appropriate 'send' function to enqueue.
std::shared_ptr<SendRpcBackward> sendFunction = // 依据发送者context id和消息id得到sendFunction
autogradContext->retrieveSendFunction(autogradMetadata.autogradMessageId);
// Attach the gradients to the send function.
sendFunction->setGrads(gradientsCall.getGrads()); // 设置梯度
// Now execute the autograd graph using the "distributed engine."
auto execFuture = DistEngine::getInstance().executeSendFunctionAsync( // 调用引擎
autogradContext, sendFunction, gradientsCall.retainGraph());
// Our response is satisfied when the rpcs come back.
execFuture->addCallback([responseFuture, messageId](JitFuture& execFuture) {
if (!execFuture.hasError()) {
Message m = std::move(PropagateGradientsResp()).toMessage();
m.setId(messageId);
responseFuture->markCompleted(
IValue(c10::make_intrusive<Message>(std::move(m))));
} else {
responseFuture->setError(execFuture.exception_ptr());
}
});
}
3.3.3 executeSendFunctionAsync
Здесь executeSendFunctionAsync начинает входить в движок.Обратите внимание, что получатель также входит в движок и выполняет вычисления в приемнике. executeSendFunctionAsync будет напрямую вызывать execute_graph_task_until_ready_queue_empty или может сначала вычислить зависимости, а затем продолжить выполнение. Здесь вы можете ознакомиться с дизайном:
- 6) Когда удаленный хост получает этот запрос, мы используем
autograd_context_id
иautograd_message_id
найти подходящееsend
функция. - 7) Если работник впервые получил
autograd_context_id
запросы, он будет вычислять зависимости локально, как описано в пунктах 1-3 выше. - 8) Тогда получим в пункте 6
send
Методы вставляются в очередь для выполнения на локальном механизме автоградации этого рабочего.
Конкретный код выглядит следующим образом:
c10::intrusive_ptr<c10::ivalue::Future> DistEngine::executeSendFunctionAsync(
const ContextPtr& autogradContext,
const std::shared_ptr<SendRpcBackward>& sendFunction,
bool retainGraph) {
// Typically the local autograd engine ensures stream synchronizations between
// nodes in the graph. However, for distributed autograd the sendFunction
// inputs might have been retrieved over the wire on a separate stream and the
// sendFunction itself runs on a different stream. As a result, we need to
// manually synchronize those two streams here.
const auto& send_backward_stream = sendFunction->stream(c10::DeviceType::CUDA);
if (send_backward_stream) { // 拿到本次执行对应的Stream
for (const auto& grad : sendFunction->getGrads()) {
const auto guard = c10::impl::VirtualGuardImpl{c10::DeviceType::CUDA};
const auto default_stream = guard.getStream(grad.device());
if (send_backward_stream != default_stream) {
auto event = c10::Event{c10::DeviceType::CUDA};
event.record(default_stream);
send_backward_stream->wait(event); // 需要同步,保证当前操作完成
}
}
}
std::unique_lock<std::mutex> lock(initializedContextIdsLock_);
if (initializedContextIds_.find(autogradContext->contextId()) ==
initializedContextIds_.end()) { // 遍历,查找sendFunction对应的上下文是否在本节点之中已经记录
// 没有找到上下文,需要计算依赖
edge_list outputEdges;
// Pass in a dummy graphRoot since all send functions are the roots.
auto dummyRoot = std::make_shared<GraphRoot>(edge_list(), variable_list());
computeDependencies( // 计算依赖
autogradContext, {}, {}, dummyRoot, outputEdges, retainGraph);
// Mark the autograd context id as initialized and unlock.
initializedContextIds_.insert(autogradContext->contextId());
lock.unlock();
// Enqueue the current send function.
auto graphTask = autogradContext->retrieveGraphTask();
// Run the autograd engine.
auto accumulateGradFuture = runEngineAndAccumulateGradients( // 计算梯度
autogradContext,
sendFunction,
outputEdges,
/*incrementOutstandingTasks=*/false);
// Build the 'uber' future that waits for everything.
auto callbackFuture =
c10::make_intrusive<c10::ivalue::Future>(c10::NoneType::get());
// 注册回调
accumulateGradFuture->addCallback([autogradContext,
callbackFuture](c10::ivalue::Future& accumulateGradFuture) {
try {
if (accumulateGradFuture.hasError()) {
// Perform cleanup at the end of the backward pass (before we mark
// the future as completed).
DistEngine::getInstance().cleanupBackwardPass(autogradContext);
// Skip any further processing on errors.
callbackFuture->setError(accumulateGradFuture.exception_ptr());
return;
}
// Wait for all RPCs after the autograd engine is done.
auto rpcFuture = autogradContext->clearAndWaitForOutstandingRpcsAsync();
rpcFuture->addCallback([callbackFuture, autogradContext](c10::ivalue::Future& rpcFuture) {
try {
// Perform cleanup at the end of the backward pass (before
// we mark the future as completed).
DistEngine::getInstance().cleanupBackwardPass(autogradContext);
} catch (std::exception& e) {
callbackFuture->setErrorIfNeeded(std::current_exception());
return;
}
// Finally mark the 'uber' future as completed.
if (!rpcFuture.hasError()) {
callbackFuture->markCompleted(c10::IValue());
} else {
callbackFuture->setError(rpcFuture.exception_ptr());
}
});
} catch (std::exception& e) {
callbackFuture->setErrorIfNeeded(std::current_exception());
}
});
// Return the future which waits for all async processing to be done.
return callbackFuture;
} else { // 可以在当前Node找到上下文
lock.unlock();
auto graphTask = autogradContext->retrieveGraphTask();
at::launch([this, graphTask, sendFunction]() {
execute_graph_task_until_ready_queue_empty(
/*node_task*/ NodeTask(graphTask, sendFunction, InputBuffer(0)),
/*incrementOutstandingTasks*/ false);
});
auto fut = c10::make_intrusive<c10::ivalue::Future>(c10::NoneType::get());
fut->markCompleted(c10::IValue());
return fut;
}
}
Подробности следующие:
+
worker 0 | worker 1
|
Engine RecvRpcBackward RpcAgent | RequestCallbackNoPython DistEngine
+ + + | + +
| | | | | |
| | | | | |
evaluate_function | | | | |
+ | | | | |
| | | | | |
+ | | | | |
call_function | | | | |
+ | | | | |
| grads v | | | |
+----------------> apply | | | |
| + | | | |
| | | | | |
| + | | | |
| gradCall | | | |
| + | | | |
| | PropagateGradientsReq | | | |
| +------------------------> | | | |
| | | + | |
| | + BACKWARD_AUTOGRAD_REQ | |
| | send +---------+---------> | |
| | + | | |
| | | | + |
| | | | processBackwardAutogradReq |
| | | | + |
| | | | | +
| | | | +------------> executeSendFunctionAsync
| | | | | +
| | | | | |
| | | | | |
v v v + v v
Телефон такой:
0x04 DistAccumulateGradCaptureHook
В настоящее время кажется, что общая логика завершена, но на самом деле отсутствует часть, которая соответствует дизайн-документу:
Наконец, мы не в Тензоре.
.grad
вместо того, чтобы накапливать градиенты поверх каждого распределенного контекста Autograd отдельно. Градиенты хранятся вDict[Tensor, Tensor]
среди ,Dict[Tensor, Tensor]
По сути, это карта от Tensor к связанным с ними градиентам, и эту карту можно получить с помощью API get_gradients().
Он предназначен для накопления удаленного/локального градиента в локальном контексте, поэтому давайте снова проанализируем DistAccumulateGradCaptureHook.
4.1 Определения
DistAccumulateGradCaptureHook имеет три функции:
-
Вызовите пре-хуки оригинального AccumulateGrad, чтобы изменить входной градиент.
-
Накопить градиент в контексте RPC.
-
Вызовите почтовые хуки оригинального AccumulateGrad.
Он определяется следующим образом:
// This hook does 3 things:
// 1. Call pre hooks of the original AccumulateGrad to modify the input grad.
// 2. Accumuate the gard to RPC context.
// 3. Call post hooks of the original AccumulateGrad.
class DistAccumulateGradCaptureHook
: public GraphTask::ExecInfo::Capture::GradCaptureHook {
public:
DistAccumulateGradCaptureHook(
std::shared_ptr<AccumulateGrad> accumulateGrad,
ContextPtr autogradContext)
: accumulateGrad_(std::move(accumulateGrad)),
autogradContext_(std::move(autogradContext)) {}
at::Tensor operator()(const at::Tensor& grad) override {
ThreadLocalDistAutogradContext contextGuard{ContextPtr(autogradContext_)};
variable_list inputGrads = {grad};
// It's intended that pre/post hooks are still called even if the grad is
// undenfined here.
for (const auto& hook : accumulateGrad_->pre_hooks()) {
inputGrads = (*hook)(inputGrads); // 调用 pre-hooks
}
// It is possible that the grad is not defined since a separate
// invocation of the autograd engine on the same node might actually
// compute this gradient.
if (inputGrads[0].defined()) {
// There are 3 internal references to 'inputGrads[0]' at this moment:
// 1. 'inputGrads[0]' in this function.
// 2. 'graph_task->captured_vars_' on the callsite in the local engine.
// 3. 'InputBuffer& inputs' on the callsite as the inputs of the
// function node.
autogradContext_->accumulateGrad( // 累积梯度
accumulateGrad_->variable, inputGrads[0], 3 /* num_expected_refs */);
}
const variable_list kEmptyOuput;
for (const auto& hook : accumulateGrad_->post_hooks()) {
(*hook)(kEmptyOuput, inputGrads); // 调用 post-hooks
}
return inputGrads[0];
}
private:
std::shared_ptr<AccumulateGrad> accumulateGrad_; // 这就是需要累积的目标向量,后续操作在其之上
ContextPtr autogradContext_;
};
4.2 Генерация
Как сгенерировать DistAccumulateGradCaptureHook? DistAccumulateGradCaptureHook генерируется при вычислении зависимостей, но записывается в Capture.hooks_.push_back.
Здесь нужно разобраться с AccumulateGrad.
-
AccumulateGrad должен быть конечным узлом, который не нужно выполнять, но нужно накапливать на нем градиенты, а вот RecvRpcBackward нужно выполнять.
-
AccumulateGrad хранится в DistAccumulateGradCaptureHook.
void DistEngine::computeDependencies(
const ContextPtr& autogradContext,
const edge_list& rootEdges,
const variable_list& grads,
const std::shared_ptr<Node>& graphRoot,
edge_list& outputEdges,
bool retainGraph) {
if (!outputEdges.empty()) {
// Compute 'needed execution' starting from all 'send' functions and the
// original graphRoot.
edge_list edges;
// Create some dummy edges (input_nr not important for init_to_execute).
for (const auto& mapEntry : sendFunctions) {
edges.emplace_back(mapEntry.second, 0);
}
// Add the original graphRoot as an edge.
edges.emplace_back(graphRoot, 0);
// Create a dummy GraphRoot and run init_to_execute with it.
GraphRoot dummyRoot(edges, {});
graphTask->init_to_execute(dummyRoot, outputEdges, /*accumulate_grad=*/false, /*min_topo_nr=*/0);
for (auto& mapEntry : graphTask->exec_info_) {
auto& execInfo = mapEntry.second;
if (!execInfo.captures_) {
continue;
}
auto fn = mapEntry.first;
// There may be nodes other than 'AccumulateGrad', e.g. RecvRPCBackward,
// to be captured.
if (auto accumulateGradFn = dynamic_cast<AccumulateGrad*>(fn)) {
for (auto& capture : *execInfo.captures_) {
capture.hooks_.push_back( // 这里会生成
std::make_unique<DistAccumulateGradCaptureHook>(
std::dynamic_pointer_cast<AccumulateGrad>( // 会保存 AccumulateGrad
accumulateGradFn->shared_from_this()),
autogradContext));
}
}
}
// Mark all 'RecvRPCBackward' as needing execution.
for (const auto& recvBackwardEdge : recvBackwardEdges) {
graphTask->exec_info_[recvBackwardEdge.function.get()].needed_ = true;
}
}
}
4.3 Использование
Код представляет собой сокращенную версию.
Во-первых, execute_graph_task_until_ready_queue_empty вызовет исходный движок engine_.evaluate_function.
void DistEngine::execute_graph_task_until_ready_queue_empty(
NodeTask&& node_task,
bool incrementOutstandingTasks) {
while (!cpu_ready_queue->empty()) {
std::shared_ptr<GraphTask> local_graph_task;
{
NodeTask task = cpu_ready_queue->pop();
if (task.fn_ && !local_graph_task->has_error_.load()) {
AutoGradMode grad_mode(local_graph_task->grad_mode_);
GraphTaskGuard guard(local_graph_task);
engine_.evaluate_function( // 调用原始引擎
local_graph_task, task.fn_.get(), task.inputs_, cpu_ready_queue);
}
}
// Decrement the outstanding task.
--local_graph_task->outstanding_tasks_;
}
}
Во-вторых, в исходном коде движка вызываются хуки.
void Engine::evaluate_function(
std::shared_ptr<GraphTask>& graph_task,
Node* func,
InputBuffer& inputs,
const std::shared_ptr<ReadyQueue>& cpu_ready_queue) {
// If exec_info_ is not empty, we have to instrument the execution
auto& exec_info_ = graph_task->exec_info_;
if (!exec_info_.empty()) {
auto& fn_info = exec_info_.at(func);
if (auto* capture_vec = fn_info.captures_.get()) {
// Lock mutex for writing to graph_task->captured_vars_.
std::lock_guard<std::mutex> lock(graph_task->mutex_);
for (const auto& capture : *capture_vec) {
auto& captured_grad = graph_task->captured_vars_[capture.output_idx_];
captured_grad = inputs[capture.input_idx_];
for (auto& hook : capture.hooks_) {
captured_grad = (*hook)(captured_grad); // 这里调用 hook,就是 DistAccumulateGradCaptureHook 的 operator(),captured_grad 就是累积的梯度
}
}
}
}
// 后续省略
В методе operator() DistAccumulateGradCaptureHook будет вызываться следующее для накопления градиентов.
autogradContext_->accumulateGrad(
accumulateGrad_->variable, inputGrads[0], 3 /* num_expected_refs */);
4.4 Совокупный градиент
4.4.1 Накопление контекста
void DistAutogradContext::accumulateGrad(
const torch::autograd::Variable& variable, // variable就是目标变量
const torch::Tensor& grad, // grad就是梯度,需要累积到variable之上
size_t num_expected_refs) {
std::lock_guard<std::mutex> guard(lock_);
auto it = accumulatedGrads_.find(variable);
at::Tensor old_grad;
if (it != accumulatedGrads_.end()) {
// Accumulate multiple grads on the same variable.
old_grad = it->value();
}
// Gradients are computed using the forward streams. Local autograd
// engine uses AccumulateGrad function to retrieve and apply forward
// stream during the backward computation. In distributed autograd,
// we directly call AccumulateGrad::accumulateGrad, and skip the
// CUDA stream restoration from autograd function. Hence, we manually
// call it here to get the streams correct.
auto forward_stream =
torch::autograd::impl::grad_accumulator(variable)->stream(
grad.device().type());
c10::OptionalStreamGuard stream_guard(forward_stream);
// No higher order gradients supported in distributed autograd.
AutoGradMode grad_mode(false);
at::Tensor new_grad = AccumulateGrad::callHooks(variable, grad); // 计算
AccumulateGrad::accumulateGrad( // 调用算子函数来累积梯度
variable,
old_grad,
new_grad,
// Add +1 here since we can't std::move(grad) when call
// AccumulateGrad::callHooks, since it is a const ref, and that incurs a
// refcount bump for the new_grad.
num_expected_refs + 1,
[this, &variable](at::Tensor&& grad_update) {
auto device = grad_update.device();
accumulatedGrads_.insert(variable, std::move(grad_update));
recordGradEvent(device);
});
}
4.4.2 Накопление оператора
Код находится в torch/csrc/autograd/functions/accumulate_grad.h. AccumulateGrad определяется следующим образом:
struct TORCH_API AccumulateGrad : public Node {
explicit AccumulateGrad(Variable variable_);
variable_list apply(variable_list&& grads) override;
static at::Tensor callHooks(
const Variable& variable,
at::Tensor new_grad) {
for (auto& hook : impl::hooks(variable)) {
new_grad = (*hook)({new_grad})[0];
}
return new_grad;
}
// Given a variable with its current grad as variable_grad, accumulates
// new_grad into variable_grad if in place accumulation is possible.
// Otherwise, uses 'update_grad' to update the grad for the variable.
// "Gradient Layout Contract"
//
// AccumulateGrad tries to stash strided (non-sparse) grads with memory layout
// (strides) such that variables and grads interact efficiently in later
// optimizer kernels, and grads interact efficiently with c10d::Reducer.cpp.
//
// Specifically, AccumulateGrad tries to ensure the following
// (cf torch/csrc/autograd/utils/grad_layout_contract.h):
// (1) if variable.is_non_overlapping_and_dense(), the stashed grad's
// strides match variable.
// (2) else, stashed grad is rowmajor contiguous.
// If variable's grad does not exist (!variable_grad.defined())
// AccumulateGrad steals new_grad if it's stealable and obeys the contract
// already, otherwise it deep copies new_grad into an obedient clone.
//
// If variable's grad already exists (variable_grad.defined()), new_grad must
// be added to variable_grad. If we aren't setting up for double backward
// (!GradMode::is_enabled()), AccumulateGrad performs "variable_grad += new_grad"
// in-place, which keeps variable_grad's layout. We assume (hope) variable_grad
// was created obeying (1) or (2) at some point in the past.
//
// If we are setting up for double backward, AccumulateGrad updates the grad
// out-of-place via "variable_grad + new_grad." TensorIterator operator+ decides
// result's layout. Typically TensorIterator matches strides of the first arg,
// so we once again assume (hope) variable_grad was originally created obeying
// (1) or (2).
//
// AccumulateGrad does not enforce the contract with 100% certainty. Examples:
// - If a user manually permutes a param or its grad, then runs a fwd+bwd,
// variable_grad += new_grad keeps variable_grad's layout without rechecking
// the contract.
// - If TensorIterator changes its corner cases about operator+'s result
// (for example, giving more or less priority to channels_last inputs, see
// https://github.com/pytorch/pytorch/pull/37968) the result may not obey.
//
// Fortunately, if a given grad doesn't satisfy (1) or (2), the penalty is
// degraded performance in Reducer.cpp or optimizer kernels, not death by
// assert or silently bad numerics.
// variable: the variable whose grad we're accumulating.
// variable_grad: the current grad for the variable.
// new_grad: new grad we want to acummulate for the variable.
// num_expected_refs: the number of refs we expect to hold internally
// such that it is safe to avoid cloning the grad
// if use_count() of the grad is less than or equal
// to this value (in addition to post_hooks).
// update_grad: Function that is used to update grad for the variable.
// The argument to the function is a Tensor which
// is used to set a new value for the grad.
template <typename T>
static void accumulateGrad( // 这里会进行具体的累积梯度
const Variable& variable,
at::Tensor& variable_grad,
const at::Tensor& new_grad,
size_t num_expected_refs,
const T& update_grad) {
if (!variable_grad.defined()) {
if (!GradMode::is_enabled() &&
!new_grad.is_sparse() &&
new_grad.use_count() <= num_expected_refs &&
(new_grad.is_mkldnn() || utils::obeys_layout_contract(new_grad, variable))) {
// we aren't setting up for double-backward
// not sparse
// no other user-visible tensor references new_grad
// new_grad obeys the "Gradient Layout Contract", there has a special case,
// For MKLDNN tensor, which is a opaque tensor, assuming it obeys layout_contract.
// Under these conditions, we can steal new_grad without a deep copy.
update_grad(new_grad.detach());
} else if (
!GradMode::is_enabled() && new_grad.is_sparse() &&
new_grad._indices().is_contiguous() &&
new_grad._values().is_contiguous() &&
// Use count for indices and values should always be <=1 since the
// SparseTensor should be the only one holding a reference to these.
new_grad._indices().use_count() <= 1 &&
new_grad._values().use_count() <= 1 &&
new_grad.use_count() <= num_expected_refs) {
// Can't detach sparse tensor (since metadata changes are not allowed
// after detach), so just create a new one for the grad which is a
// shallow copy. We need a shallow copy so that modifying the original
// grad tensor doesn't modify the grad we accumulate.
// We only skip clone if indices and values themselves are contiguous
// for backward compatiblity reasons. Since without this optimization,
// earlier we would clone the entire SparseTensor which cloned indices
// and values.
// For details see https://github.com/pytorch/pytorch/issues/34375.
update_grad(at::_sparse_coo_tensor_unsafe(
new_grad._indices(),
new_grad._values(),
new_grad.sizes(),
new_grad.options()));
} else {
if (new_grad.is_sparse()) {
update_grad(new_grad.clone());
} else {
if (new_grad.is_mkldnn()) {
update_grad(new_grad.clone());
} else {
// Deep copies new_grad according to the "Gradient Layout Contract."
update_grad(utils::clone_obey_contract(new_grad, variable));
}
}
}
} else if (!GradMode::is_enabled()) {
// This case is not strictly necessary, but it makes the first-order only
// case slightly more efficient.
if (variable_grad.is_sparse() && !new_grad.is_sparse()) {
// If `variable_grad` is sparse and `new_grad` is not sparse, their
// sum is not sparse, and we must change the TensorImpl type of
// `variable_grad` for it to store the result. However, changing the
// TensorImpl type of a tensor requires changing the tensor itself, and
// thus in this case we have to change the grad tensor.
auto result = new_grad + variable_grad;
CHECK_RESULT(result, variable);
update_grad(std::move(result));
} else if (!at::inplaceIsVmapCompatible(variable_grad, new_grad)) {
// Ideally we'd perform an in-place operation to avoid changing
// the grad tensor. However, if that's impossible because the grads
// are vmap-incompatible (See NOTE: [vmap-incompatible in-place operations]),
// then we just add them out-of-place.
auto result = variable_grad + new_grad;
CHECK_RESULT(result, variable);
update_grad(std::move(result));
} else {
// In this case we can avoid changing the grad tensor. There are three
// scenarios when we'll hit this case:
//
// 1. `variable_grad` is sparse, and `new_grad` is sparse.
// 2. `variable_grad` is dense, and `new_grad` is sparse.
// 3. `variable_grad` is dense, and `new_grad` is dense.
// 4. `variable_grad` is mkldnn, and `new_grad` is mkldnn.
//
// In all of these four cases, `variable_grad += new_grad` is a
// valid operation which adds `new_grad` to `variable_grad` in
// place. `variable_grad` is thus still referring to the same tensor
// after the operation.
// Also DistributedDataParallel(DDP) package relies on grad being
// mutated in place for saving peak memory usage. DDP will still
// work correctly if it is mutated out of place here, but DDP will
// maintain one extra copy of grad tensors in buffer and thus
// increase peak memory usage.
variable_grad += new_grad;
CHECK_RESULT(variable_grad, variable);
// ^ We could enforce the contract more aggressively here by writing:
// if (variable_grad.is_sparse() || new_grad.is_sparse()) {
// variable_grad += new_grad;
// } else if (obeys_layout_contract(variable_grad, variable)) {
// variable_grad += new_grad;
// } else {
// result = at::empty_strided(variable.sizes(), variable.strides(),
// variable.options().memory_format(c10::nullopt));
// update_grad(at::native::add_out(result, variable_grad, new_grad, 1.0);
// }
// However, that accumulation is sometimes in place and sometimes not,
// which may break user code.
}
} else {
at::Tensor result;
if (variable_grad.is_sparse() && !new_grad.is_sparse()) {
// CPU backend throws an error on sparse + dense, so prefer dense + sparse here.
result = new_grad + variable_grad;
} else {
// Assumes operator+ result typically matches strides of first arg,
// and hopes variable_grad was originally created obeying layout contract.
result = variable_grad + new_grad;
}
CHECK_RESULT(result, variable);
update_grad(std::move(result));
// ^ We could enforce the contract more aggressively here by saying
// if (obeys_layout_contract(new_grad, variable)) {
// update_grad(new_grad + variable_grad);
// } else {
// update_grad(variable_grad + new_grad);
// }
// such that the stashed grad is likely to have the right strides if
// either variable_grad or new_grad already has the right strides.
// We could enforce the contract with certainty by saying
// auto result = variable_grad + new_grad (or vice versa), checking result's
// layout, and copying to an obedient clone if necessary before update_grad.
// The copy would require another gmem pass. We can't create empty result with
// the right layout then add_out into it with a single kernel, because GradMode
// is enabled in this branch, and add_out isn't differentiable.
// Maybe more trouble than it's worth.
}
}
Variable variable;
};
В частности, как показано на следующем рисунке, слева — это структура данных, справа — поток алгоритма, порядковый номер справа указывает, что выполнение выполняется сверху вниз, структура данных слева будет использоваться в алгоритме. процесс выполнения, а вызывающая связь между алгоритмом и структурой данных представлена горизонтальной стрелкой.
- Распределенный движок вызывает execute_graph_task_until_ready_queue_empty для выполнения определенной задачи GraphTask.
- Engine::evaluate_function вызовет ExecInfo в GraphTask.
- Затем будет осуществлен доступ к GradCaptureHook, вызову хука и вызову операторной функции хука autogradContext_->accumulateGrad.
- autogradContext_ выполнит аккумулятор Grad для работы с аккумулированием Grad_, хранящимся в хуке (DistAccumulateGradCaptureHook).
- AccumulateGrad::accumulateGrad выполнит последнюю операцию обновления градиента.
DATA STRUCTURE + ALGORITHM
|
+-----------------------------------------------+ |
| GraphTask | | DistEngine::execute_graph_task_until_ready_queue_empty
| | | + |
| unordered_map<Node*, ExecInfo> exec_info_ | | | |
| + | <----------+ |
| | | | |
+-----------------------------------------------+ | | 1
| | |
| | |
v | |
+---------------------+------------------+ | v
| ExecInfo | <-------------+ Engine::evaluate_function
| | | +
| < vector<Capture> > captures_ | | |
| + | | |
| | | | | 2
+----------------------------------------+ | |
| | v
| |
v | +--+ captured_grad = (*hook)(captured_grad)
+-------------------+--------------------+ | | +
| Capture | | | |
| | | | |
| vector< <GradCaptureHook> > hooks_ <--------------+ | 3
| + | | |
+----------------------------------------+ | v
| |
| | +--+ autogradContext_->accumulateGrad(
v | | accumulateGrad_-> variable, inputGrads[0], 3)
+-------------------+--------------------+ | | +
| DistAccumulateGradCaptureHook | | | |
| | | | |
| ContextPtr autogradContext_ <------------+ | 4
| | | | |
| AccumulateGrad accumulateGrad_ <------------+ v
| + | |
+----------------------------------------+ | +-+ new_grad = AccumulateGrad::callHooks(variable, grad)
| | | +
| | | |
v | | | 5
+-------------------+------+ | | v
| AccumulateGrad | | |
| | | | AccumulateGrad::accumulateGrad(
| Variable variable <------------------+------+ variable, old_grad, new_grad,)
| | |
+--------------------------+ +
Телефон такой:
0x05 дождаться завершения
Наконец, распределенный механизм вызовет функцию clearAndWaitForOutstandingRpcsAsync, чтобы дождаться завершения обработки.
c10::intrusive_ptr<c10::ivalue::Future> DistAutogradContext::
clearAndWaitForOutstandingRpcsAsync() {
std::unique_lock<std::mutex> lock(lock_);
auto outStandingRpcs = std::move(outStandingRpcs_);
lock.unlock();
struct State {
explicit State(int32_t count)
: future(
c10::make_intrusive<c10::ivalue::Future>(c10::NoneType::get())),
remaining(count) {}
c10::intrusive_ptr<c10::ivalue::Future> future;
std::atomic<int32_t> remaining;
std::atomic<bool> alreadySentError{false};
};
auto state = std::make_shared<State>(outStandingRpcs.size());
if (outStandingRpcs.empty()) {
state->future->markCompleted(c10::IValue());
} else {
for (auto& rpc : outStandingRpcs) {
rpc->addCallback([state](rpc::JitFuture& future) {
if (future.hasError()) {
// If there's an error, we want to setError() on the future,
// unless another error has already been sent - use a CAS to
// guard.
//
// Don't decrement num remaining here! (We don't need to, since
// memory handling is separate). If we simply don't decrement on
// errors, reaching 0 means that there were no errors - and hence,
// we can just markCompleted() without any other checking there.
bool expectedAlreadySent = false;
if (state->alreadySentError.compare_exchange_strong(
expectedAlreadySent, true)) {
state->future->setError(future.exception_ptr());
}
return;
}
if (--state->remaining == 0) {
state->future->markCompleted(c10::IValue());
}
});
}
}
return state->future;
}
Поддерживаю, разбор распределенного автограда завершен.Как упоминалось ранее, в распределенной обработке есть четыре основных алмаза.Мы ввели RPC,RRef и проанализировали распределенный движок.Со следующей статьи мы начнем анализировать остальные распределенные оптимизаторы ., эта серия может включать 4~6 статей.
0xEE Личная информация
★★★★★★Думая о жизни и технологиях★★★★★★
Публичный аккаунт WeChat:мысли Росси
ссылка 0xFF
Понимать распределенное обучение интерпретации исходного кода PyTorch?
py torch.Apache can.org/docs/1.7/59…
py torch.org/docs/master… py torch.org/docs/master…
Ууху. Моя школа 3С. Талант /пи факел/Пак Ючон…
PyTorch Распределенный Autograd Design
Getting started with Distributed RPC Framework
Implementing a Parameter Server using Distributed RPC Framework
Combining Distributed DataParallel with Distributed RPC Framework