0x00 сводка
Выше мы видели ряд базовых классов, таких как AutogradMetadata, DistAutogradContainer и DistAutogradContext. Мы знаем, как доставляется распределенный автоград на основе RPC, как взаимодействовать между узлами и как узлы поддерживают эти сеансы. Эта статья продолжает анализ, основная цель — увидеть, как обратное распространение врезается в движок.
Другие статьи о распространении PyTorch:
[Анализ исходного кода] Распространение PyTorch (1) ------ история и обзор
[Анализ исходного кода] Как PyTorch использует GPU
[Анализ исходного кода] Распределенный PyTorch (2) ----- DataParallel (включен)
[Анализ исходного кода] Распределенный PyTorch (3) ----- DataParallel (ниже)
[Анализ исходного кода] Распределенный PyTorch (7) ----- Группа процессов DistributedDataParallel
[Анализ исходного кода] Распределенный PyTorch (8) -------- Бумага DistributedDataParallel
[Анализ исходного кода] Распределенный PyTorch (9) ----- Инициализация DistributedDataParallel
[Анализ исходного кода] PyTorch, распространяемый Autograd (1) ---- дизайн
[Анализ исходного кода] PyTorch, распространяемый Autograd (2) ---- Фонд RPC
[Анализ исходного кода] PyTorch, распространяемый Autograd (3) ---- контекстно-зависимый
Для лучшего объяснения код в этой статье будет соответственно упрощен в соответствии с конкретной ситуацией.
0x01 Предыдущая память
Напомним содержание предыдущих статей.
во-первых, для распределенного автограда нам необходимо отслеживать все RPC во время прямого распространения, чтобы обеспечить правильное обратное распространение. Для этого при выполнении RPC ставимsend
иrecv
функции привязаны к графу автограда.
- Должен
send
Функция присоединена к исходному узлу RPC, а ее выходное ребро указывает на функцию автоградации входного тензора RPC. При обратном распространенииsend
Вход в функцию поступает от цели и является соответствующимrecv
вывод функции. - Должен
recv
Функция присоединяется к принимающему целевому узлу RPC, и ее входные данные получаются от определенных операторов, которые выполняются на принимающем целевом RPC с использованием входных тензоров. При обратном распространенииrecv
Выходной градиент функции будет отправлен через исходный узел и использован какsend
ввод метода. - Каждый
send-recv
присваивается глобально уникальныйautograd_message_id
чтобы однозначно идентифицироватьsend-recv
правильно. Это полезно для поиска соответствующих функций на удаленных узлах во время обратного распространения. - Для RRef всякий раз, когда мы вызываем
torch.distributed.rpc.RRef.to_here()
, мы все добавляем соответствующий тензор к задействованным тензорамsend-recv
правильно.
Второй, в конкретном коде прямого распространения мы храним в контексте каждый распространяемый автоградsend
иrecv
функция. Это гарантирует, что мы сохраним ссылку на соответствующий узел в графе autograd, чтобы поддерживать его в рабочем состоянии. Среди прочего, это также упрощает поиск соответствующихsend
иrecv
функция.
снова, следующее является частью определения сообщения в torch/csrc/distributed/rpc/message.h:
// Messages with autograd info
FORWARD_AUTOGRAD_REQ = 0x0f | MessageTypeFlags::REQUEST_TYPE,
FORWARD_AUTOGRAD_RESP = 0x10 | MessageTypeFlags::RESPONSE_TYPE,
// Messages to propagate gradients on the backward pass.
BACKWARD_AUTOGRAD_REQ = 0x11 | MessageTypeFlags::REQUEST_TYPE,
BACKWARD_AUTOGRAD_RESP = 0x12 | MessageTypeFlags::RESPONSE_TYPE,
В предыдущей статье мы видели, как FORWARD_AUTOGRAD_REQ вызывается при прямом распространении.Предположим следующий код: rpc.rpc_sync("worker1", torch.add, args=(t1, t2)), последовательность вызова:
- rpc_sync вызывает _invoke_rpc.
- _invoke_rpc вызывает _invoke_rpc_builtin.
- Затем вызовите pyRpcBuiltin, который, в свою очередь, вызывает sendMessageWithAutograd.
- sendMessageWithAutograd внутренне создает сообщение FORWARD_AUTOGRAD_REQ и отправляет его с помощью RPC.
Пока у нас есть несколько вопросов по поводу всего процесса:
- В начальной позиции графа обратного вычисления, как инициировать обратное распространение и как передать его следующему звену обратного распространения?
- Во внутренней части обратного распространения, когда вызывается BACKWARD_AUTOGRAD_REQ? Когда вызывается операция recv? В контексте, где установлен recvAutogradFunctions_?
- Как две вышеуказанные ссылки входят в распределенный движок автограда?
Далее мы разберем эти вопросы, а основное — как войти в движок dist.autograd.
0x02 Вычислительный график
Начнем с нескольких примеров из вычислительного графа.
2.1 Общий пример
Давайте сначала посмотрим на обычные расчеты, это местная версия официальной легенды dist.auto. Видно, что граф расчета состоит из AddBackward0, AccumulateGrad и MulBackward0.
t1 = torch.rand((3, 3), requires_grad=True)
t2 = torch.rand((3, 3), requires_grad=True)
t3 = t1 + t2
t4 = torch.rand((3, 3), requires_grad=True)
t5 = torch.mul(t3, t4)
next_functions = t5.grad_fn.next_functions
Конкретная переписка выглядит следующим образом:
2.2 Распределенный пример
Далее, давайте посмотрим на распределенный пример.Этот пример представляет собой код, примерно соответствующий легенде в официальном дизайне.Мы назвали torch.mul(t3, t4) как t5 и добавили потери.
def worker0():
# On worker 0:
# Setup the autograd context. Computations that take
# part in the distributed backward pass must be within
# the distributed autograd context manager.
with dist_autograd.context() as context_id:
t1 = torch.rand((3, 3), requires_grad=True)
t2 = torch.rand((3, 3), requires_grad=True)
# Perform some computation remotely.
t3 = rpc.rpc_sync("worker1", torch.add, args=(t1, t2))
# Perform some computation locally based on remote result.
t4 = torch.rand((3, 3), requires_grad=True)
t5 = torch.mul(t3, t4)
# Compute some loss.
loss = t5.sum()
# Run the backward pass.
dist_autograd.backward(context_id, [loss])
# Retrieve the gradients from the context.
dist_autograd.get_gradients(context_id)
print(loss)
При распределении t3 запускается вне сайта.
- t5 соответствует mul, а t5.grad_fn — это .
- t3.grad_fn — это , то есть recv соответствует CppFunction.
- потеря - тензор (5,5680, grad_fn =).
- Остальные Нет.
Давайте снова покажем легенду проекта.Приведенный выше пример кода — рабочий процесс 0 в левой части рисунка ниже, а t3 фактически выполняется на рабочем потоке 1. Вы можете увидеть некоторые функции в распределенном контексте.
2.3 Распределенная версия аннотаций
Для лучшей иллюстрации мы печатаем некоторые журналы в виде комментариев.
def _verify_send(send_function):
print(send_function.name())
next_funcs = send_function.next_functions
print(next_funcs[0][0].name())
print(next_funcs[1][0].name())
def _verify_recv(recv_function):
print(recv_function.name())
next_funcs = recv_function.next_functions
print(len(next_funcs))
def worker0():
# On worker 0:
# Setup the autograd context. Computations that take
# part in the distributed backward pass must be within
# the distributed autograd context manager.
with dist_autograd.context() as context_id:
t1 = torch.rand((3, 3), requires_grad=True)
t2 = torch.rand((3, 3), requires_grad=True)
# Perform some computation remotely.
#t3 = rpc.rpc_sync("worker1", my_add, args=(t1, t2))
t3 = rpc.rpc_sync("worker1", torch.add, args=(t1, t2))
# Perform some computation locally based on remote result.
t4 = torch.rand((3, 3), requires_grad=True)
t5 = torch.mul(t3, t4)
# Compute some loss.
loss = t5.sum()
print("--- send ---")
ctx = dist_autograd._retrieve_context(context_id)
send_functions = ctx._send_functions()
_verify_send(list(send_functions.values())[0])
print("--- loss ---")
print(loss)
mul_func = loss.grad_fn.next_functions[0][0]
print(mul_func.name())
next_funcs = mul_func.next_functions
print(next_funcs[0][0].name())
print(next_funcs[1][0].name())
print("---- recv ----")
recv_functions = ctx._recv_functions()
_verify_recv(list(recv_functions.values())[0])
# Run the backward pass.
dist_autograd.backward(context_id, [loss])
# Retrieve the gradients from the context.
dist_autograd.get_gradients(context_id)
Результат печати:
--- send ---
torch::distributed::autograd::SendRpcBackward
torch::autograd::AccumulateGrad
torch::autograd::AccumulateGrad
--- loss ---
tensor(3.5197, grad_fn=<SumBackward0>)
MulBackward0
torch::distributed::autograd::RecvRpcBackward
torch::autograd::AccumulateGrad
---- recv ----
torch::distributed::autograd::RecvRpcBackward
После добавления оператора распределенной корреляции диаграмма выглядит следующим образом:
0x03 Обратное распространение
Далее мы увидим, как ввести движок dist autograd в сочетании с нашей легендой, то есть:
- Как рабочий 0 активно инициирует обратное распространение, а затем входит в распределенный движок?
- Как woker 0 инициирует внутренний запрос обратного распространения к worker 1?
- Как рабочий процесс 1 пассивно принимает сообщения обратного распространения, а затем входит в распределенный механизм?
3.1 Инициировать обратное распространение
Давайте узнаем, как инициировать обратное распространение снизу вверх. Здесь тоже два:
- Один активно инициируется, например, активный вызов обратного метода при потере работника 0 на приведенном выше рисунке.
- Один неявно инициируется внутри, например, как t3 в рабочем потоке 0 на рисунке выше сообщает рабочему процессу 1 через recv, что вы должны начать обратное распространение.
3.1.1 Внешняя инициатива
3.1.1.1 Пример
Посмотрим, как активно вызывается обратно распределённого автограда сверху вниз, например вызов будет отображаться на примере.
def worker0():
# On worker 0:
with dist_autograd.context() as context_id:
t1 = torch.rand((3, 3), requires_grad=True)
t2 = torch.rand((3, 3), requires_grad=True)
# Perform some computation remotely.
t3 = rpc.rpc_sync("worker1", torch.add, args=(t1, t2))
# Perform some computation locally based on remote result.
t4 = torch.rand((3, 3), requires_grad=True)
t5 = torch.mul(t3, t4)
# Compute some loss.
loss = t5.sum()
# Run the backward pass.
dist_autograd.backward(context_id, [loss]) // 这里会调用
3.1.1.2 Мир C++
существуетtorch/_C/_distributed_autograd.pyi
Среди них мы можем увидеть следующие аннотации:
# This module is defined in torch/csrc/distributed/autograd/init.cpp
Итак, давайте взглянем на файл torch/csrc/distributed/autograd/init.cpp.
Часть кода опущена, здесь видно, что генерируется контекст, определяются back, get_gradients и т.д.
PyObject* dist_autograd_init(PyObject* _unused, PyObject* noargs) {
auto autograd_module =
THPObjectPtr(PyImport_ImportModule("torch.distributed.autograd"));
auto torch_C_module = THPObjectPtr(PyImport_ImportModule("torch._C"));
auto torch_C_m = py::handle(torch_C_module).cast<py::module>();
auto m = torch_C_m.def_submodule("_distributed_autograd", "distributed autograd bindings");
auto module = py::handle(m).cast<py::module>();
auto distAutogradContext =
shared_ptr_class_<DistAutogradContext>(module, "DistAutogradContext")
.def(
"_context_id",
&DistAutogradContext::contextId,
py::call_guard<py::gil_scoped_release>())
.def(
"_recv_functions",
[](const DistAutogradContext& ctx) {
std::map<int64_t, py::object> funcs;
for (const auto& map_entry : ctx.recvFunctions()) {
funcs.emplace(
map_entry.first,
py::reinterpret_steal<py::object>(
torch::autograd::functionToPyObject(
map_entry.second)));
}
return funcs;
})
.def(
"_send_functions",
[](const ContextPtr& ctx) {
std::map<int64_t, py::object> funcs;
for (const auto& map_entry : ctx->sendFunctions()) {
funcs.emplace(
map_entry.first,
py::reinterpret_steal<py::object>(
torch::autograd::functionToPyObject(
map_entry.second)));
}
return funcs;
})
.def("_known_worker_ids", &DistAutogradContext::getKnownWorkerIds);
module.def(
"_new_context",
[]() -> const ContextPtr {
return DistAutogradContainer::getInstance().newContext();
},
py::return_value_policy::reference);
py::options options;
options.disable_function_signatures();
module.def(
"backward",
backward,
py::arg("contextId"),
py::arg("roots"),
py::arg("retain_graph") = false,
py::call_guard<py::gil_scoped_release>());
module.def(
"get_gradients",
[](int64_t contextId) -> py::dict {
const auto& autogradContext =
DistAutogradContainer::getInstance().retrieveContext(contextId);
return torch::jit::toPyObject(IValue(autogradContext->getGradients()));
},
py::arg("context_id"));
Py_RETURN_TRUE;
}
} // namespace
Конкретное обратное определение находится в torch/csrc/distributed/autograd/autograd.cpp.
void backward(
int64_t context_id,
const variable_list& roots,
bool retain_graph) {
RECORD_FUNCTION(
kDistAutogradBackwardProfilingKey, std::vector<c10::IValue>());
try {
DistEngine::getInstance().execute(context_id, roots, retain_graph);
} catch (std::exception& e) {
// FIXME: crashes if exception type is not RuntimeError
throw std::runtime_error(e.what());
}
}
Как видите, DistEngine::getInstance().execute(context_id,roots,retain_graph) в конечном итоге будет вызываться для завершения обратного распространения. Это входит в двигатель.
3.1.2 Внутреннее неявное инициирование
Поскольку он инициируется неявно, код относительно скрыт.На этот раз мы используем восходящий подход для удаления кокона. Мы знаем, что если между узлами требуется обратное распространение, будет отправлено BACKWARD_AUTOGRAD_REQ, поэтому начинаем поиск с BACKWARD_AUTOGRAD_REQ.
3.1.2.1 BACKWARD_AUTOGRAD_REQ
В torch/csrc/distributed/autograd/rpc_messages/propagate_gradients_req.cpp PropagateGradientsReq::toMessageImpl вызовет BACKWARD_AUTOGRAD_REQ.
Message PropagateGradientsReq::toMessageImpl() && {
std::vector<at::IValue> ivalues;
// Add all the grad tensors.
for (const auto& grad : grads_) {
ivalues.emplace_back(grad);
}
// Now add autograd metadata.
ivalues.emplace_back(autogradMetadata_.autogradContextId);
ivalues.emplace_back(autogradMetadata_.autogradMessageId);
// Add retain graph.
ivalues.emplace_back(retainGraph_);
// Now pickle using JIT pickler.
std::vector<torch::Tensor> tensorTable;
std::vector<char> payload =
jit::pickle(c10::ivalue::Tuple::create(std::move(ivalues)), &tensorTable);
return Message(
std::move(payload),
std::move(tensorTable),
MessageType::BACKWARD_AUTOGRAD_REQ); // 这里会用到
}
3.1.2.2 PropagateGradientsReq
Продолжайте выяснять, кто отправил BACKWARD_AUTOGRAD_REQ, кто вызвал toMessageImpl? Оказалось, что PropagateGradientsReq был встроен в torch/csrc/distributed/autograd/functions/recvrpc_backward.cpp, а toMessage использовался для построения сообщения. То есть вызов RecvRpcBackward отправляет BACKWARD_AUTOGRAD_REQ.
variable_list RecvRpcBackward::apply(variable_list&& grads) { // 调用Node
std::vector<Variable> outputGrads;
for (size_t i = 0; i < grads.size(); i++) {
const auto& grad = grads[i];
if (grad.defined()) {
outputGrads.emplace_back(grad);
} else {
// Put in zeros for a tensor with no grad.
outputGrads.emplace_back(input_metadata(i).zeros_like());
}
}
auto sharedContext = autogradContext_.lock();
// Send the gradients over the wire and record the future in the autograd
// context.
PropagateGradientsReq gradCall( // 这里构建了 PropagateGradientsReq
autogradMetadata_,
outputGrads,
sharedContext->retrieveGraphTask()->keep_graph_);
// Send the gradients over to the appropriate node.
auto rpcAgent = rpc::RpcAgent::getCurrentRpcAgent();
auto jitFuture = rpcAgent->send( // 发送出去,就是给后向传播过程的下一个节点
rpcAgent->getWorkerInfo(fromWorkerId_),
std::move(gradCall).toMessage(), // 这里调用了PropagateGradientsReq::toMessageImpl
rpc::kUnsetRpcTimeout,
deviceMap_);
// Record the future in the context.
sharedContext->addOutstandingRpc(jitFuture);
// 'recv' function sends the gradients over the wire using RPC, it doesn't
// need to return anything for any downstream autograd function.
return variable_list();
}
Итак, мы знаем, что при выполнении RecvRpcBackward BACKWARD_AUTOGRAD_REQ будет отправлен на следующий узел.Где именно вызывается RecvRpcBackward? Мы представим его в следующем DistEngine.
Подробности следующие:Соответственно, t3 воркера 0 отправляет сообщение BACKWARD_AUTOGRAD_REQ воркеру 1..
+
worker 0 | worker 1
|
|
RecvRpcBackward PropagateGradientsReq |
+ + |
| | |
| | |
| | |
v | |
| |
apply() | |
+ | |
| v |
| |
| +------------------------------> toMessageImpl |
| + |
| | |
| Message(BACKWARD_AUTOGRAD_REQ) | |
| <----------------------------------------+ |
| |
| |
v |
|
rpcAgent+>send(Message) +-------------------------------------------->
+ BACKWARD_AUTOGRAD_REQ |
| |
| |
v |
+
Соответствующая примерная диаграмма:
3.2 Принять обратное распространение
Давайте посмотрим, как получатель обрабатывает обратное распространение. Давайте снова вернемся к рабочему процессу 1, как узел-отправитель на графе принимает сообщения обратного распространения.
3.2.1 Получение сообщения
При создании TensorPipeAgent настройте RequestCallbackImpl как функцию обратного вызова. Это унифицированная функция ответа агента. Когда мы упоминали логику получения прокси ранее, она войдет в функцию RequestCallbackNoPython::processRpc. Видно, что есть логика обработки для BACKWARD_AUTOGRAD_REQ.
Это нормальный поток RPC.
void RequestCallbackNoPython::processRpc(
RpcCommandBase& rpc,
const MessageType& messageType,
const int64_t messageId,
const c10::intrusive_ptr<JitFuture>& responseFuture,
std::shared_ptr<LazyStreamContext> ctx) const {
switch (messageType) {
case MessageType::BACKWARD_AUTOGRAD_REQ: {
processBackwardAutogradReq(rpc, messageId, responseFuture); // 这里调用
return;
};
3.2.2 Обработка сообщений
В процессеBackwardAutogradReq будет:
- Получите DistAutogradContainer.
- Получите контекст, который был ранее установлен в процессе прямого распространения Как видно из предыдущего раздела, на этом рисунке каждое распространение autograd в worker 0 и worker 1 использует один и тот же идентификатор контекста контекста.
- Получите соответствующий SendRpcBackward из контекста через идентификатор контекста отправителя. Здесь мы видим, как используется контекст.
- С sendFunction в качестве параметра для обработки ядра вызывается executeSendFunctionAsync.
void RequestCallbackNoPython::processBackwardAutogradReq(
RpcCommandBase& rpc,
const int64_t messageId,
const c10::intrusive_ptr<JitFuture>& responseFuture) const {
auto& gradientsCall = static_cast<PropagateGradientsReq&>(rpc);
const auto& autogradMetadata = gradientsCall.getAutogradMetadata();
// Retrieve the appropriate autograd context.
auto autogradContext = DistAutogradContainer::getInstance().retrieveContext(
autogradMetadata.autogradContextId); // 得到发送者的context id
// Lookup the appropriate 'send' function to enqueue.
std::shared_ptr<SendRpcBackward> sendFunction = // 依据发送者context id和消息id得到sendFunction
autogradContext->retrieveSendFunction(autogradMetadata.autogradMessageId);
// Attach the gradients to the send function.
sendFunction->setGrads(gradientsCall.getGrads()); // 设置梯度
// Now execute the autograd graph using the "distributed engine."
auto execFuture = DistEngine::getInstance().executeSendFunctionAsync( // 调用引擎
autogradContext, sendFunction, gradientsCall.retainGraph());
// Our response is satisfied when the rpcs come back.
execFuture->addCallback([responseFuture, messageId](JitFuture& execFuture) {
if (!execFuture.hasError()) {
Message m = std::move(PropagateGradientsResp()).toMessage();
m.setId(messageId);
responseFuture->markCompleted(
IValue(c10::make_intrusive<Message>(std::move(m))));
} else {
responseFuture->setError(execFuture.exception_ptr());
}
});
}
существуетworker 1DistEngine::executeSendFunctionAsync выполнит отбрасывание и обработку и, наконец, отправит BACKWARD_AUTOGRAD_REQ в нисходящий поток своего обратного распространения, поэтому мы продолжаем изменять и расширять образец графа, добавляя BACKWARD_AUTOGRAD_REQ.
3.3 Резюме
Мы видим, что есть два пути к движку dist autograd для запуска обратного распространения:
- Во-первых, пример кода явно и активно вызывает обратный вызов, а затем вызывает DistEngine::getInstance().execute, который является рабочим 0.
- Один из них заключается в пассивном вызове DistEngine::getInstance().executeSendFunctionAsync, который является рабочим процессом 1 (конечно, отправка рабочего процесса 0 также соответствует пассивному вызову).
Теперь поиск источника обратного распространения как сверху вниз, так и снизу вверх сводится к DistEngine, поэтому мы представим DistEngine в следующей статье.
0xEE Личная информация
★★★★★★Думая о жизни и технологиях★★★★★★
Публичный аккаунт WeChat:мысли Росси
ссылка 0xFF
[Анализ исходного кода] Как PyTorch реализует обратное распространение (4) ---- конкретный алгоритм
[Анализ исходного кода] Как Pytorch реализует обратное распространение (1) ---- вызов движка