Анализ исходного кода хоровода (1)

iOS распределенный

HorovodРаспределенная платформа обучения с открытым исходным кодом для Uber, поддерживающая основные платформы машинного обучения (Tensorflow, PyTorch и MxNet). Эта статья в основном основана на версииv0.21.1Представляет основную реализацию Horovod и ее интеграцию с различными платформами.

Рабочий процесс Horovod относительно прост, есть очередь сообщений для полученияAllReduce, AllGatherа такжеBroadcastЭти триopзапрос, существует фоновый поток, который через равные промежутки времени будет опрашивать очередь сообщений и получать пакетopПосле этого будетopсерединаtensorВыполняется слияние, а затем выполняются соответствующие операции. еслиtensorв видеопамяти, то он будет использоватьNCCLбиблиотечное исполнение. тогда как если он находится в памяти, он будет использоватьMPIилиGlooвоплощать в жизнь.

Основной код Horovod находится вhorovod/commonв каталоге.operations.ccФайл эквивалентен записи Хоровода, он содержитBackgroundThreadLoop,RunLoopOnceи другие важные функции. Глядя на эти функции, вы можете получить представление.

Сначала оцените функциюRunLoopOnce, здесь опущен некоторый оптимизированный код, например использование кеша ответов, автонастройка и т.д.:

bool RunLoopOnce(HorovodGlobalState& state) {
  // 检查从上一个cycle开始到现在,是否已经超过一个cycle时间(CycleTimeMs)
  auto start_time = std::chrono::steady_clock::now();
  auto sleep_duration = state.last_cycle_start +
                        std::chrono::microseconds(long(
                            state.parameter_manager.CycleTimeMs() * 1000.)) -
                        start_time;
  if (sleep_duration > std::chrono::steady_clock::duration::zero()) {
    std::this_thread::sleep_for(sleep_duration);
  }
  state.last_cycle_start = std::chrono::steady_clock::now();

  // 在Timeline中记录,用户拿到Timeline结果后,可以在chrome中查看
  if (state.mark_cycles_in_timeline) {
    // Mark start of the new cycle.
    state.timeline.MarkCycleStart();
  }

  auto response_list =
      state.controller->ComputeResponseList(horovod_global.shut_down, state);

  state.mark_cycles_in_timeline =
      state.controller->MarkCyclesInTimelinePending();

  // 对于每个response,做collective的操作
  for (auto& response : response_list.responses()) {
    PerformOperation(response, horovod_global);
  }

  return !response_list.shutdown();
}

отHorovodRunOnceВ функции мы видим, что рабочий процесс Хоровода примерно такой, как было сказано ранее, который представляет собой модель производителя и потребителя.controllerВот работа координации: она будет сообщать, какие запросы каждого ранга готовы, и выполнять коллективные операции для готовых запросов.

Далее давайте посмотрим наComputeResponseListэта функция. Эта функция очень длинная, до строк 380. Чтобы легче понять, что делает эта функция, сначала удалите кеш и проверьте код остановки:

ResponseList Controller::ComputeResponseList(std::atomic_bool& shut_down,
                                             HorovodGlobalState& state) {
  CacheCoordinator cache_coordinator(response_cache_.num_active_bits());

  // message queue used only in this cycle
  std::deque<Request> message_queue_tmp;
  tensor_queue_.PopMessagesFromQueue(message_queue_tmp);
  for (auto& message : message_queue_tmp) {
    if (message.request_type() == Request::JOIN) {
      state.joined = true;
      cache_coordinator.set_uncached_in_queue(true);
      continue;
    }
  }

  // Flag indicating that the background thread should shut down.
  bool should_shut_down = shut_down;
  cache_coordinator.set_should_shut_down(should_shut_down);

  ResponseList response_list;
  response_list.set_shutdown(cache_coordinator.should_shut_down());

  {
    // Collect all tensors that are ready to be reduced. Record them in the
    // tensor count table (rank zero) or send them to rank zero to be
    // recorded (everyone else).
    std::vector<std::string> ready_to_reduce;

    if (is_coordinator_) {
      // 对于master进程,记录已经ready的tensor。注意此时message_queue_tmp中的request是来自
      // master进程
      while (!message_queue_tmp.empty()) {
        // Pop the first available message
        Request message = message_queue_tmp.front();
        message_queue_tmp.pop_front();

        if (message.request_type() == Request::JOIN) {
          state.joined_size++;
          continue;
        }

        bool reduce = IncrementTensorCount(message, state.joined_size);
        if (reduce) {
          ready_to_reduce.push_back(message.tensor_name());
        }
      }

      // 接收其他rank的ready的tensor
      std::vector<RequestList> ready_list;
      RecvReadyTensors(ready_to_reduce, ready_list);

      // 处理来自其他rank的request。size_是指有多少个rank
      for (int i = 1; i < size_; ++i) {
        auto received_message_list = ready_list[i];
        for (auto& received_message : received_message_list.requests()) {
          auto& received_name = received_message.tensor_name();

          // Join类型消息是指有新的rank加入,Horovod支持弹性
          if (received_message.request_type() == Request::JOIN) {
            state.joined_size++;
            continue;
          }

          // 增加该tensor已经ready的rank的个数,如果所有rank都ready,则发给其他rank
          bool reduce = IncrementTensorCount(received_message, state.joined_size);
          if (reduce) {
            ready_to_reduce.push_back(received_name);
          }
        }
        if (received_message_list.shutdown()) {
          // Received SHUTDOWN request from one of the workers.
          should_shut_down = true;
        }
      }

      // Check if tensors from previous ticks are ready to reduce after Joins.
      if (state.joined_size > 0) {
        for (auto& table_iter : message_table_) {
          int count = (int)table_iter.second.size();
          if (count == (size_ - state.joined_size) &&
              std::find(ready_to_reduce.begin(), ready_to_reduce.end(),
                        table_iter.first) == ready_to_reduce.end()) {
            state.timeline.NegotiateEnd(table_iter.first);
            ready_to_reduce.push_back(table_iter.first);
          }
        }
      }

      // 这个条件有点让人费解,看字面意思是如果禁止group fusion,并且group_table_非空,则fuse?
      if (state.disable_group_fusion && !group_table_.empty()) {

        // Extract set of common groups from coordinator tensor list and cache hits.
        std::vector<int> common_ready_groups;
        std::unordered_set<int> processed;

        for (const auto& tensor_name : ready_to_reduce) {
          int group_id = group_table_.GetGroupIDFromTensorName(tensor_name);
          if (group_id != NULL_GROUP_ID && processed.find(group_id) == processed.end()) {
            common_ready_groups.push_back(group_id);
            processed.insert(group_id);
            // Leaving name in list, to be skipped later.
          }
        }

        // For each ready group, form and fuse response lists independently
        for (auto id : common_ready_groups) {
          std::deque<Response> responses;
          for (const auto &tensor_name : group_table_.GetGroupTensorNames(id)) {
            if (message_table_.find(tensor_name) != message_table_.end()) {
              // Uncached message
              Response response = ConstructResponse(tensor_name, state.joined_size);
              responses.push_back(std::move(response));

            }
          }

          FuseResponses(responses, state, response_list);
        }
      }


      // At this point, rank zero should have a fully updated tensor count
      // table and should know all the tensors that need to be reduced or
      // gathered, and everyone else should have sent all their information
      // to rank zero. We can now do reductions and gathers; rank zero will
      // choose which ones and in what order, and will notify the other ranks
      // before doing each reduction.
      std::deque<Response> responses;

      for (auto& tensor_name : ready_to_reduce) {
        // Skip tensors in group that were handled earlier.
        if (state.disable_group_fusion &&
            !group_table_.empty() &&
            group_table_.GetGroupIDFromTensorName(tensor_name) != NULL_GROUP_ID) {
          continue;
        }

        Response response = ConstructResponse(tensor_name, state.joined_size);
        responses.push_back(std::move(response));
      }
      if (state.joined_size == size_) {
        // All ranks did Join(). Send the response, reset joined size.
        Response join_response;
        join_response.set_response_type(Response::JOIN);
        join_response.add_tensor_name(JOIN_TENSOR_NAME);
        responses.push_back(std::move(join_response));
        state.joined_size = 0;
      }
      FuseResponses(responses, state, response_list);
      response_list.set_shutdown(should_shut_down);

      // Broadcast final results to other ranks.
      SendFinalTensors(response_list);

    } else {
      // 非master,则发送自己已经ready的tensor给master,再接收已经ready的tensor列表
      RequestList message_list;
      message_list.set_shutdown(should_shut_down);
      while (!message_queue_tmp.empty()) {
        message_list.add_request(message_queue_tmp.front());
        message_queue_tmp.pop_front();
      }

      // Send ready tensors to rank zero
      SendReadyTensors(message_list);

      // Receive final tensors to be processed from rank zero
      RecvFinalTensors(response_list);
    }
  }

  if (!response_list.responses().empty()) {
    std::string tensors_ready;
    for (const auto& r : response_list.responses()) {
      tensors_ready += r.tensor_names_string() + "; ";
    }
  }

  // Reassign cache bits based on current cache order.
  response_cache_.update_cache_bits();

  return response_list;
}

В Хороводе каждая карта соответствует тренировочному процессу, называемомуrank. Например, 4 карты, соответствующие каждому процессуrankтогда[0,1,2,3].rankПроцесс с 0 является ведущим, а остальные — рабочими. рабочий будет вComputeResponseListОтправьте готовый тензор мастеру. Если тензор во всехrankЧжунду готов, мастер уведомит другихrank, вы можете выполнять коллективные операции над этим тензором.

Продолжайте смотреть наHorovodRunOnceЕще одна важная функция, которая появляется вPerformOperation. Эта функция относительно понятна, в основном для выполнения трех задач:

  • Сделайте слияние на тензорах: объедините несколько тензоров в большой тензор, а затем выполните коллективные операции
  • ждать поступления данных
  • Выполнять коллективные операции
void PerformOperation(Response response, HorovodGlobalState& state) {
  std::vector<TensorTableEntry> entries;
  auto& timeline = horovod_global.timeline;
  if (response.response_type() != Response::JOIN) {
    // 这里有点奇怪,直接用了horovod_global这个变量,而拿joined的时候,又是从state里拿的
    horovod_global.tensor_queue.GetTensorEntriesFromResponse(response, entries,
                                                             state.joined);

    for (auto& e : entries) {
      timeline.Start(e.tensor_name, response.response_type());
    }

    if (entries.size() > 1) {
      // 如果多于1个,则可以进行fuse,以提高throughput
      auto first_entry = entries[0];
      // Note: it is OK for different entries to come from different frameworks
      // since buffer allocated here is guaranteed to survive at least till the
      // end of this operation.
      Status status = horovod_global.fusion_buffer.InitializeBuffer(
          horovod_global.controller->TensorFusionThresholdBytes(),
          first_entry.device, first_entry.context,
          horovod_global.current_nccl_stream,
          [&]() { timeline.ActivityStartAll(entries, INIT_FUSION_BUFFER); },
          [&]() { timeline.ActivityEndAll(entries); });
      if (!status.ok()) {
        LOG(DEBUG, horovod_global.controller->GetRank()) << "InitializeBuffer Failed";
        for (auto& e : entries) {
          timeline.End(e.tensor_name, nullptr);
          // Callback can be null if the rank sent Join request.
          if (e.callback != nullptr) {
            e.callback(status);
          }
        }
        return;
      }
    }

    // On GPU data readiness is signalled by ready_event.
    // 即使tensor可以进行操作了,但需要等待数据同步到显存
    std::vector<TensorTableEntry> waiting_tensors;
    for (auto& e : entries) {
      if (e.ready_event != nullptr) {
        timeline.ActivityStart(e.tensor_name, WAIT_FOR_DATA);
        waiting_tensors.push_back(e);
      }
    }
    while (!waiting_tensors.empty()) {
      for (auto it = waiting_tensors.begin(); it != waiting_tensors.end();) {
        if (it->ready_event->Ready()) {
          timeline.ActivityEnd(it->tensor_name);
          timeline.ActivityStart(it->tensor_name, WAIT_FOR_OTHER_TENSOR_DATA);
          it = waiting_tensors.erase(it);
        } else {
          ++it;
        }
      }
      std::this_thread::sleep_for(std::chrono::nanoseconds(100));
    }
    for (auto& e : entries) {
      if (e.ready_event != nullptr) {
        timeline.ActivityEnd(e.tensor_name);
      }
    }
  }

  // 终于可以进行collective的操作了
  Status status;
  try {
    status = op_manager->ExecuteOperation(entries, response);
  } catch (const std::exception& ex) {
    LOG(DEBUG, horovod_global.controller->GetRank()) << "ExecuteOperation Failed";
    status = Status::UnknownError(ex.what());
  }

  if (!status.in_progress()) {
    for (auto& e : entries) {
      timeline.End(e.tensor_name, status.ok() ? e.output : nullptr);
      // Callback can be null if the rank sent Join request.
      if (e.callback != nullptr) {
        e.callback(status);
      }
    }
  }
}

Пока представлен основной рабочий процесс Хоровода.