[Анализ исходного кода] PyTorch распространяет Автоград (4) ---- как врезаться в движок

машинное обучение PyTorch

0x00 сводка

Выше мы видели ряд базовых классов, таких как AutogradMetadata, DistAutogradContainer и DistAutogradContext. Мы знаем, как доставляется распределенный автоград на основе RPC, как взаимодействовать между узлами и как узлы поддерживают эти сеансы. Эта статья продолжает анализ, основная цель — увидеть, как обратное распространение врезается в движок.

Другие статьи о распространении PyTorch:

[Анализ исходного кода] Распространение PyTorch (1) ------ история и обзор

[Анализ исходного кода] Как PyTorch использует GPU

[Анализ исходного кода] Распределенный PyTorch (2) ----- DataParallel (включен)

[Анализ исходного кода] Распределенный PyTorch (3) ----- DataParallel (ниже)

[Анализ исходного кода] Распределенный PyTorch (4) ------ Основная концепция распределенного приложения

[Анализ исходного кода] Распределенный PyTorch (5) ------ Обзор DistributedDataParallel и способы его использования

[Анализ исходного кода] Распределенный PyTorch (6) ---DistributedDataParallel -- инициализация и хранение

[Анализ исходного кода] Распределенный PyTorch (7) ----- Группа процессов DistributedDataParallel

[Анализ исходного кода] Распределенный PyTorch (8) -------- Бумага DistributedDataParallel

[Анализ исходного кода] Распределенный PyTorch (9) ----- Инициализация DistributedDataParallel

[Анализ исходного кода] Распределенный PyTorch (10) ------ Редуктор статической архитектуры DistributedDataParallel

[Анализ исходного кода] Распределенный PyTorch (11) ----- DistributedDataParallel для создания операций Reducer и Join

[Анализ исходного кода] Распределенный PyTorch (12) ----- Прямое распространение до DistributedDataParallel

[Анализ исходного кода] Распределенный PyTorch (13) ----- Обратное распространение DistributedDataParallel

[Анализ исходного кода] PyTorch, распространяемый Autograd (1) ---- дизайн

[Анализ исходного кода] PyTorch, распространяемый Autograd (2) ---- Фонд RPC

[Анализ исходного кода] PyTorch, распространяемый Autograd (3) ---- контекстно-зависимый

Для лучшего объяснения код в этой статье будет соответственно упрощен в соответствии с конкретной ситуацией.

0x01 Предыдущая память

Напомним содержание предыдущих статей.

во-первых, для распределенного автограда нам необходимо отслеживать все RPC во время прямого распространения, чтобы обеспечить правильное обратное распространение. Для этого при выполнении RPC ставимsendиrecvфункции привязаны к графу автограда.

  • ДолженsendФункция присоединена к исходному узлу RPC, а ее выходное ребро указывает на функцию автоградации входного тензора RPC. При обратном распространенииsendВход в функцию поступает от цели и является соответствующимrecvвывод функции.
  • ДолженrecvФункция присоединяется к принимающему целевому узлу RPC, и ее входные данные получаются от определенных операторов, которые выполняются на принимающем целевом RPC с использованием входных тензоров. При обратном распространенииrecvВыходной градиент функции будет отправлен через исходный узел и использован какsendввод метода.
  • Каждыйsend-recvприсваивается глобально уникальныйautograd_message_idчтобы однозначно идентифицироватьsend-recvправильно. Это полезно для поиска соответствующих функций на удаленных узлах во время обратного распространения.
  • Для RRef всякий раз, когда мы вызываемtorch.distributed.rpc.RRef.to_here(), мы все добавляем соответствующий тензор к задействованным тензорамsend-recvправильно.

Второй, в конкретном коде прямого распространения мы храним в контексте каждый распространяемый автоградsendиrecvфункция. Это гарантирует, что мы сохраним ссылку на соответствующий узел в графе autograd, чтобы поддерживать его в рабочем состоянии. Среди прочего, это также упрощает поиск соответствующихsendиrecvфункция.

снова, следующее является частью определения сообщения в torch/csrc/distributed/rpc/message.h:

// Messages with autograd info
FORWARD_AUTOGRAD_REQ = 0x0f | MessageTypeFlags::REQUEST_TYPE,
FORWARD_AUTOGRAD_RESP = 0x10 | MessageTypeFlags::RESPONSE_TYPE,

// Messages to propagate gradients on the backward pass.
BACKWARD_AUTOGRAD_REQ = 0x11 | MessageTypeFlags::REQUEST_TYPE,
BACKWARD_AUTOGRAD_RESP = 0x12 | MessageTypeFlags::RESPONSE_TYPE,

В предыдущей статье мы видели, как FORWARD_AUTOGRAD_REQ вызывается при прямом распространении.Предположим следующий код: rpc.rpc_sync("worker1", torch.add, args=(t1, t2)), последовательность вызова:

  • rpc_sync вызывает _invoke_rpc.
  • _invoke_rpc вызывает _invoke_rpc_builtin.
  • Затем вызовите pyRpcBuiltin, который, в свою очередь, вызывает sendMessageWithAutograd.
  • sendMessageWithAutograd внутренне создает сообщение FORWARD_AUTOGRAD_REQ и отправляет его с помощью RPC.

Пока у нас есть несколько вопросов по поводу всего процесса:

  • В начальной позиции графа обратного вычисления, как инициировать обратное распространение и как передать его следующему звену обратного распространения?
  • Во внутренней части обратного распространения, когда вызывается BACKWARD_AUTOGRAD_REQ? Когда вызывается операция recv? В контексте, где установлен recvAutogradFunctions_?
  • Как две вышеуказанные ссылки входят в распределенный движок автограда?

Далее мы разберем эти вопросы, а основное — как войти в движок dist.autograd.

0x02 Вычислительный график

Начнем с нескольких примеров из вычислительного графа.

2.1 Общий пример

Давайте сначала посмотрим на обычные расчеты, это местная версия официальной легенды dist.auto. Видно, что граф расчета состоит из AddBackward0, AccumulateGrad и MulBackward0.

t1 = torch.rand((3, 3), requires_grad=True)
t2 = torch.rand((3, 3), requires_grad=True)
t3 = t1 + t2
t4 = torch.rand((3, 3), requires_grad=True)
t5 = torch.mul(t3, t4)
next_functions = t5.grad_fn.next_functions

Конкретная переписка выглядит следующим образом:

2.2 Распределенный пример

Далее, давайте посмотрим на распределенный пример.Этот пример представляет собой код, примерно соответствующий легенде в официальном дизайне.Мы назвали torch.mul(t3, t4) как t5 и добавили потери.

def worker0():
    # On worker 0:

    # Setup the autograd context. Computations that take
    # part in the distributed backward pass must be within
    # the distributed autograd context manager.
    with dist_autograd.context() as context_id:
      t1 = torch.rand((3, 3), requires_grad=True)
      t2 = torch.rand((3, 3), requires_grad=True)

      # Perform some computation remotely.
      t3 = rpc.rpc_sync("worker1", torch.add, args=(t1, t2))

      # Perform some computation locally based on remote result.
      t4 = torch.rand((3, 3), requires_grad=True)
      t5 = torch.mul(t3, t4)

      # Compute some loss.
      loss = t5.sum()

      # Run the backward pass.
      dist_autograd.backward(context_id, [loss])

      # Retrieve the gradients from the context.
      dist_autograd.get_gradients(context_id)

      print(loss)

При распределении t3 запускается вне сайта.

  • t5 соответствует mul, а t5.grad_fn — это .
  • t3.grad_fn — это , то есть recv соответствует CppFunction.
  • потеря - тензор (5,5680, grad_fn =).
  • Остальные Нет.

Давайте снова покажем легенду проекта.Приведенный выше пример кода — рабочий процесс 0 в левой части рисунка ниже, а t3 фактически выполняется на рабочем потоке 1. Вы можете увидеть некоторые функции в распределенном контексте.

2.3 Распределенная версия аннотаций

Для лучшей иллюстрации мы печатаем некоторые журналы в виде комментариев.

def _verify_send(send_function):
    print(send_function.name())
    next_funcs = send_function.next_functions
    print(next_funcs[0][0].name())
    print(next_funcs[1][0].name())

def _verify_recv(recv_function):
    print(recv_function.name())
    next_funcs = recv_function.next_functions
    print(len(next_funcs))

def worker0():
    # On worker 0:

    # Setup the autograd context. Computations that take
    # part in the distributed backward pass must be within
    # the distributed autograd context manager.
    with dist_autograd.context() as context_id:
      t1 = torch.rand((3, 3), requires_grad=True)
      t2 = torch.rand((3, 3), requires_grad=True)

      # Perform some computation remotely.
      #t3 = rpc.rpc_sync("worker1", my_add, args=(t1, t2))
      t3 = rpc.rpc_sync("worker1", torch.add, args=(t1, t2))

      # Perform some computation locally based on remote result.
      t4 = torch.rand((3, 3), requires_grad=True)
      t5 = torch.mul(t3, t4)

      # Compute some loss.
      loss = t5.sum()

      print("--- send ---")
      ctx = dist_autograd._retrieve_context(context_id)
      send_functions = ctx._send_functions()
      _verify_send(list(send_functions.values())[0])

      print("--- loss ---")
      print(loss)
      mul_func = loss.grad_fn.next_functions[0][0]
      print(mul_func.name())
      next_funcs = mul_func.next_functions
      print(next_funcs[0][0].name())
      print(next_funcs[1][0].name())

      print("---- recv ----")
      recv_functions = ctx._recv_functions()
      _verify_recv(list(recv_functions.values())[0])

      # Run the backward pass.
      dist_autograd.backward(context_id, [loss])

      # Retrieve the gradients from the context.
      dist_autograd.get_gradients(context_id)

Результат печати:

--- send ---
torch::distributed::autograd::SendRpcBackward
torch::autograd::AccumulateGrad
torch::autograd::AccumulateGrad
        
--- loss ---
tensor(3.5197, grad_fn=<SumBackward0>)
MulBackward0
torch::distributed::autograd::RecvRpcBackward
torch::autograd::AccumulateGrad
        
---- recv ----
torch::distributed::autograd::RecvRpcBackward

После добавления оператора распределенной корреляции диаграмма выглядит следующим образом:

0x03 Обратное распространение

Далее мы увидим, как ввести движок dist autograd в сочетании с нашей легендой, то есть:

  • Как рабочий 0 активно инициирует обратное распространение, а затем входит в распределенный движок?
  • Как woker 0 инициирует внутренний запрос обратного распространения к worker 1?
  • Как рабочий процесс 1 пассивно принимает сообщения обратного распространения, а затем входит в распределенный механизм?

3.1 Инициировать обратное распространение

Давайте узнаем, как инициировать обратное распространение снизу вверх. Здесь тоже два:

  • Один активно инициируется, например, активный вызов обратного метода при потере работника 0 на приведенном выше рисунке.
  • Один неявно инициируется внутри, например, как t3 в рабочем потоке 0 на рисунке выше сообщает рабочему процессу 1 через recv, что вы должны начать обратное распространение.

3.1.1 Внешняя инициатива

3.1.1.1 Пример

Посмотрим, как активно вызывается обратно распределённого автограда сверху вниз, например вызов будет отображаться на примере.

def worker0():
    # On worker 0:

    with dist_autograd.context() as context_id:
      t1 = torch.rand((3, 3), requires_grad=True)
      t2 = torch.rand((3, 3), requires_grad=True)

      # Perform some computation remotely.
      t3 = rpc.rpc_sync("worker1", torch.add, args=(t1, t2))

      # Perform some computation locally based on remote result.
      t4 = torch.rand((3, 3), requires_grad=True)
      t5 = torch.mul(t3, t4)

      # Compute some loss.
      loss = t5.sum()

      # Run the backward pass.
      dist_autograd.backward(context_id, [loss]) // 这里会调用
3.1.1.2 Мир C++

существуетtorch/_C/_distributed_autograd.pyiСреди них мы можем увидеть следующие аннотации:

# This module is defined in torch/csrc/distributed/autograd/init.cpp

Итак, давайте взглянем на файл torch/csrc/distributed/autograd/init.cpp.

Часть кода опущена, здесь видно, что генерируется контекст, определяются back, get_gradients и т.д.

PyObject* dist_autograd_init(PyObject* _unused, PyObject* noargs) {
  auto autograd_module =
      THPObjectPtr(PyImport_ImportModule("torch.distributed.autograd"));
  auto torch_C_module = THPObjectPtr(PyImport_ImportModule("torch._C"));
  auto torch_C_m = py::handle(torch_C_module).cast<py::module>();
  auto m = torch_C_m.def_submodule("_distributed_autograd", "distributed autograd bindings");
  auto module = py::handle(m).cast<py::module>();

  auto distAutogradContext =
      shared_ptr_class_<DistAutogradContext>(module, "DistAutogradContext")
          .def(
              "_context_id",
              &DistAutogradContext::contextId,
              py::call_guard<py::gil_scoped_release>())
          .def(
              "_recv_functions",
              [](const DistAutogradContext& ctx) {
                std::map<int64_t, py::object> funcs;
                for (const auto& map_entry : ctx.recvFunctions()) {
                  funcs.emplace(
                      map_entry.first,
                      py::reinterpret_steal<py::object>(
                          torch::autograd::functionToPyObject(
                              map_entry.second)));
                }
                return funcs;
              })
          .def(
              "_send_functions",
              [](const ContextPtr& ctx) {
                std::map<int64_t, py::object> funcs;
                for (const auto& map_entry : ctx->sendFunctions()) {
                  funcs.emplace(
                      map_entry.first,
                      py::reinterpret_steal<py::object>(
                          torch::autograd::functionToPyObject(
                              map_entry.second)));
                }
                return funcs;
              })
          .def("_known_worker_ids", &DistAutogradContext::getKnownWorkerIds);

  module.def(
      "_new_context",
      []() -> const ContextPtr {
        return DistAutogradContainer::getInstance().newContext();
      },
      py::return_value_policy::reference);

  py::options options;
  options.disable_function_signatures();

  module.def(
      "backward",
      backward,
      py::arg("contextId"),
      py::arg("roots"),
      py::arg("retain_graph") = false,
      py::call_guard<py::gil_scoped_release>());

  module.def(
      "get_gradients",
      [](int64_t contextId) -> py::dict {
        const auto& autogradContext =
            DistAutogradContainer::getInstance().retrieveContext(contextId);
        return torch::jit::toPyObject(IValue(autogradContext->getGradients()));
      },
      py::arg("context_id"));

  Py_RETURN_TRUE;
}
} // namespace

Конкретное обратное определение находится в torch/csrc/distributed/autograd/autograd.cpp.

void backward(
    int64_t context_id,
    const variable_list& roots,
    bool retain_graph) {
  RECORD_FUNCTION(
      kDistAutogradBackwardProfilingKey, std::vector<c10::IValue>());
  try {
    DistEngine::getInstance().execute(context_id, roots, retain_graph);
  } catch (std::exception& e) {
    // FIXME: crashes if exception type is not RuntimeError
    throw std::runtime_error(e.what());
  }
}

Как видите, DistEngine::getInstance().execute(context_id,roots,retain_graph) в конечном итоге будет вызываться для завершения обратного распространения. Это входит в двигатель.

3.1.2 Внутреннее неявное инициирование

Поскольку он инициируется неявно, код относительно скрыт.На этот раз мы используем восходящий подход для удаления кокона. Мы знаем, что если между узлами требуется обратное распространение, будет отправлено BACKWARD_AUTOGRAD_REQ, поэтому начинаем поиск с BACKWARD_AUTOGRAD_REQ.

3.1.2.1 BACKWARD_AUTOGRAD_REQ

В torch/csrc/distributed/autograd/rpc_messages/propagate_gradients_req.cpp PropagateGradientsReq::toMessageImpl вызовет BACKWARD_AUTOGRAD_REQ.

Message PropagateGradientsReq::toMessageImpl() && {
  std::vector<at::IValue> ivalues;
  // Add all the grad tensors.
  for (const auto& grad : grads_) {
    ivalues.emplace_back(grad);
  }

  // Now add autograd metadata.
  ivalues.emplace_back(autogradMetadata_.autogradContextId);
  ivalues.emplace_back(autogradMetadata_.autogradMessageId);

  // Add retain graph.
  ivalues.emplace_back(retainGraph_);

  // Now pickle using JIT pickler.
  std::vector<torch::Tensor> tensorTable;
  std::vector<char> payload =
      jit::pickle(c10::ivalue::Tuple::create(std::move(ivalues)), &tensorTable);

  return Message(
      std::move(payload),
      std::move(tensorTable),
      MessageType::BACKWARD_AUTOGRAD_REQ); // 这里会用到
}
3.1.2.2 PropagateGradientsReq

Продолжайте выяснять, кто отправил BACKWARD_AUTOGRAD_REQ, кто вызвал toMessageImpl? Оказалось, что PropagateGradientsReq был встроен в torch/csrc/distributed/autograd/functions/recvrpc_backward.cpp, а toMessage использовался для построения сообщения. То есть вызов RecvRpcBackward отправляет BACKWARD_AUTOGRAD_REQ.

variable_list RecvRpcBackward::apply(variable_list&& grads) { // 调用Node
  std::vector<Variable> outputGrads;
  for (size_t i = 0; i < grads.size(); i++) {
    const auto& grad = grads[i];
    if (grad.defined()) {
      outputGrads.emplace_back(grad);
    } else {
      // Put in zeros for a tensor with no grad.
      outputGrads.emplace_back(input_metadata(i).zeros_like());
    }
  }

  auto sharedContext = autogradContext_.lock();
  // Send the gradients over the wire and record the future in the autograd
  // context.
  PropagateGradientsReq gradCall( // 这里构建了 PropagateGradientsReq
      autogradMetadata_,
      outputGrads,
      sharedContext->retrieveGraphTask()->keep_graph_);

  // Send the gradients over to the appropriate node.
  auto rpcAgent = rpc::RpcAgent::getCurrentRpcAgent();
  auto jitFuture = rpcAgent->send( // 发送出去,就是给后向传播过程的下一个节点
      rpcAgent->getWorkerInfo(fromWorkerId_),
      std::move(gradCall).toMessage(), // 这里调用了PropagateGradientsReq::toMessageImpl
      rpc::kUnsetRpcTimeout,
      deviceMap_);

  // Record the future in the context.
  sharedContext->addOutstandingRpc(jitFuture);

  // 'recv' function sends the gradients over the wire using RPC, it doesn't
  // need to return anything for any downstream autograd function.
  return variable_list();
}

Итак, мы знаем, что при выполнении RecvRpcBackward BACKWARD_AUTOGRAD_REQ будет отправлен на следующий узел.Где именно вызывается RecvRpcBackward? Мы представим его в следующем DistEngine.

Подробности следующие:Соответственно, t3 воркера 0 отправляет сообщение BACKWARD_AUTOGRAD_REQ воркеру 1..

                                                                +
                                                       worker 0 | worker 1
                                                                |
                                                                |
 RecvRpcBackward                         PropagateGradientsReq  |
       +                                          +             |
       |                                          |             |
       |                                          |             |
       |                                          |             |
       v                                          |             |
                                                  |             |
     apply()                                      |             |
       +                                          |             |
       |                                          v             |
       |                                                        |
       | +------------------------------>  toMessageImpl        |
       |                                          +             |
       |                                          |             |
       |   Message(BACKWARD_AUTOGRAD_REQ)         |             |
       | <----------------------------------------+             |
       |                                                        |
       |                                                        |
       v                                                        |
                                                                |
rpcAgent+>send(Message)  +-------------------------------------------->
       +                             BACKWARD_AUTOGRAD_REQ      |
       |                                                        |
       |                                                        |
       v                                                        |
                                                                +

Соответствующая примерная диаграмма:

3.2 Принять обратное распространение

Давайте посмотрим, как получатель обрабатывает обратное распространение. Давайте снова вернемся к рабочему процессу 1, как узел-отправитель на графе принимает сообщения обратного распространения.

3.2.1 Получение сообщения

При создании TensorPipeAgent настройте RequestCallbackImpl как функцию обратного вызова. Это унифицированная функция ответа агента. Когда мы упоминали логику получения прокси ранее, она войдет в функцию RequestCallbackNoPython::processRpc. Видно, что есть логика обработки для BACKWARD_AUTOGRAD_REQ.

Это нормальный поток RPC.

void RequestCallbackNoPython::processRpc(
    RpcCommandBase& rpc,
    const MessageType& messageType,
    const int64_t messageId,
    const c10::intrusive_ptr<JitFuture>& responseFuture,
    std::shared_ptr<LazyStreamContext> ctx) const {

  switch (messageType) {

    case MessageType::BACKWARD_AUTOGRAD_REQ: { 
      processBackwardAutogradReq(rpc, messageId, responseFuture); // 这里调用
      return;
    };

3.2.2 Обработка сообщений

В процессеBackwardAutogradReq будет:

  • Получите DistAutogradContainer.
  • Получите контекст, который был ранее установлен в процессе прямого распространения Как видно из предыдущего раздела, на этом рисунке каждое распространение autograd в worker 0 и worker 1 использует один и тот же идентификатор контекста контекста.
  • Получите соответствующий SendRpcBackward из контекста через идентификатор контекста отправителя. Здесь мы видим, как используется контекст.
  • С sendFunction в качестве параметра для обработки ядра вызывается executeSendFunctionAsync.
void RequestCallbackNoPython::processBackwardAutogradReq(
    RpcCommandBase& rpc,
    const int64_t messageId,
    const c10::intrusive_ptr<JitFuture>& responseFuture) const {
  auto& gradientsCall = static_cast<PropagateGradientsReq&>(rpc);
  const auto& autogradMetadata = gradientsCall.getAutogradMetadata();

  // Retrieve the appropriate autograd context.
  auto autogradContext = DistAutogradContainer::getInstance().retrieveContext(
      autogradMetadata.autogradContextId); // 得到发送者的context id

  // Lookup the appropriate 'send' function to enqueue.
  std::shared_ptr<SendRpcBackward> sendFunction = // 依据发送者context id和消息id得到sendFunction
      autogradContext->retrieveSendFunction(autogradMetadata.autogradMessageId);

  // Attach the gradients to the send function.
  sendFunction->setGrads(gradientsCall.getGrads()); // 设置梯度

  // Now execute the autograd graph using the "distributed engine."
  auto execFuture = DistEngine::getInstance().executeSendFunctionAsync( // 调用引擎
      autogradContext, sendFunction, gradientsCall.retainGraph());

  // Our response is satisfied when the rpcs come back.
  execFuture->addCallback([responseFuture, messageId](JitFuture& execFuture) {
    if (!execFuture.hasError()) {
      Message m = std::move(PropagateGradientsResp()).toMessage();
      m.setId(messageId);
      responseFuture->markCompleted(
          IValue(c10::make_intrusive<Message>(std::move(m))));
    } else {
      responseFuture->setError(execFuture.exception_ptr());
    }
  });
}

существуетworker 1DistEngine::executeSendFunctionAsync выполнит отбрасывание и обработку и, наконец, отправит BACKWARD_AUTOGRAD_REQ в нисходящий поток своего обратного распространения, поэтому мы продолжаем изменять и расширять образец графа, добавляя BACKWARD_AUTOGRAD_REQ.

3.3 Резюме

Мы видим, что есть два пути к движку dist autograd для запуска обратного распространения:

  • Во-первых, пример кода явно и активно вызывает обратный вызов, а затем вызывает DistEngine::getInstance().execute, который является рабочим 0.
  • Один из них заключается в пассивном вызове DistEngine::getInstance().executeSendFunctionAsync, который является рабочим процессом 1 (конечно, отправка рабочего процесса 0 также соответствует пассивному вызову).

Теперь поиск источника обратного распространения как сверху вниз, так и снизу вверх сводится к DistEngine, поэтому мы представим DistEngine в следующей статье.

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

ссылка 0xFF

[Анализ исходного кода] Как Pytorch реализует обратное распространение (3) ---- Динамическая логика движка

[Анализ исходного кода] Как Pytorch реализует обратное распространение (2) ---- Статическая структура движка

[Анализ исходного кода] Как PyTorch реализует обратное распространение (4) ---- конкретный алгоритм

[Анализ исходного кода] Как Pytorch реализует обратное распространение (1) ---- вызов движка