0x00 сводка
Horovod — это простая в использовании высокопроизводительная распределенная платформа обучения, выпущенная Uber в 2017 году и широко используемая в отрасли.
Эта серия поможет вам понять Horovod с помощью анализа исходного кода. Эта статья является шестой в серии, посвященной архитектуре фоновых потоков Horovod.
Эта статья следует непосредственно выше. Из-за ограничения на количество слов, эта статья разделена на две части, пожалуйста, простите меня.
Предыдущие ссылки выглядят следующим образом:
0x04 Общий код
4.1 Фоновые темы
BackgroundThreadLoopЭто фоновый поток в процессе обучения. Он в основном отвечает за связь с другими узлами и обработку требований связи (запросов) от внешнего интерфейса. Он будет опрашивать и вызывать RunLoopOnce и постоянно проверять, есть ли тензоры, которым необходимо общаться. в tensor_queue.Если есть тензоры, которым необходимо взаимодействовать с другими обновлениями узлов, а затем выполнять операцию связи.
Основную логику можно увидеть в функции BackgroundThreadLoop:
- В зависимости от конфигурации компиляции решите, как инициализировать, например, mpi_context.Initialize инициализируется только при компиляции MPI.
- Инициализируйте контроллер и создайте соответствующий контроллер для globalstate в соответствии с загруженной библиотекой коллективных коммуникаций (mpi или gloo);
- Получите различные конфигурации, такие как local_rank;
- установить сходство с фоновым потоком;
- установить поток графического процессора;
- Установить конфигурацию временной шкалы;
- Установите пороговое значение Tensor Fusion, время цикла, емкость кэша ответов, флаг для иерархического allreduce.....;
- Установить автонастройку, размер чанка;
- сброс менеджера операций;
- Введите код ключа RunLoopOnce;
Сокращенная версия кода выглядит следующим образом:
BackgroundThreadLoop(HorovodGlobalState& state) {
......
#if HAVE_MPI
// Initialize mpi context
#if HAVE_DDL
// If DDL is enabled, let DDL ops manage MPI environment.
auto mpi_ctx_manager = DDL_MPIContextManager(ddl_context, gpu_context);
#else
// Otherwise, let MPI ops be in charge.
auto mpi_ctx_manager = MPIContextManager();
#endif
// mpi_context 会根据前端和环境变量传过来的信息,创建 mpi 线程,和一些 mpiOps
mpi_context.Initialize(state.controller->GetRanks(), mpi_ctx_manager);
#endif
......
// 会同步不同 node 的 global_size, local_size, rank, is_coordinator 等信息
// Initialize controller
state.controller->Initialize();
int local_size = state.controller->GetLocalSize();
int local_rank = state.controller->GetLocalRank();
......
// 设置op_manager,这里主要是注册不同的集合通信库的 ops
op_manager.reset(CreateOperationManager(state));
// Signal that initialization is completed.
state.initialization_done = true;
// Iterate until shutdown.
try {
while (RunLoopOnce(state));
} catch (const std::exception& ex) {
LOG(ERROR) << "Horovod background loop uncaught exception: " << ex.what();
}
}
4.2 Где строить кольцо
У вас могут возникнуть вопросы, так как Хоровод это кольцо Allreduce, а где именно установлено кольцо? Давайте выберем несколько реализаций для общего вида. Потому что если изучать подробно, то нужно углубляться в MPI, gloo и т.п., что выходит за рамки данной статьи, поэтому у нас есть только общее понимание.
4.2.1 Вызовы NCCL
Давайте сначала посмотрим на NCCL.
4.2.1.1 NCCL
NCCL — это аббревиатура Nvidia Collective multi-GPU Communication Library.Это коллективная коммуникационная библиотека (все-сборка, сокращение, трансляция), которая реализует несколько GPU.Nvidia провела большую оптимизацию для достижения высокой производительности на PCIe, Nvlink и InfiniBand скорость связи.
4.2.1.2 Horovod
В NCCLAllreduce::Execute мы видим, что вызывается ncclAllReduce, который является API nccl, поэтому мы можем сделать вывод, что его параметры*nccl_op_context_.nccl_comm_
Должен быть ключ.
Status NCCLAllreduce::Execute(std::vector<TensorTableEntry>& entries,
const Response& response) {
// Do allreduce.
auto nccl_result = ncclAllReduce(fused_input_data, buffer_data,
(size_t) num_elements,
GetNCCLDataType(first_entry.tensor), ncclSum,
*nccl_op_context_.nccl_comm_, *gpu_op_context_.stream);
}
nccl_op_context_ — это тип NCCLOpContext, а упрощенная версия NCCLOpContext определяется следующим образом:
class NCCLOpContext {
public:
void InitNCCLComm(const std::vector<TensorTableEntry>& entries,
const std::vector<int32_t>& nccl_device_map);
ncclComm_t* nccl_comm_;
};
Итак, давайте посмотрим на его параметрыnccl_comm_
Как он инициализируется, вы можете видеть, что он вызывает ncclCommInitRank для инициализации.
void NCCLOpContext::InitNCCLComm(const std::vector<TensorTableEntry>& entries,
const std::vector<int32_t>& nccl_device_map) {
// Ensure NCCL communicator is in the map before executing operation.
ncclComm_t& nccl_comm = nccl_context_->nccl_comms[global_state_->current_nccl_stream][nccl_device_map];
if (nccl_comm == nullptr) {
auto& timeline = global_state_->timeline;
timeline.ActivityStartAll(entries, INIT_NCCL);
int nccl_rank, nccl_size;
Communicator nccl_id_bcast_comm;
// 获取rank相关信息
PopulateNCCLCommStrategy(nccl_rank, nccl_size, nccl_id_bcast_comm);
ncclUniqueId nccl_id;
global_state_->controller->Bcast((void*)&nccl_id, sizeof(nccl_id), 0,
nccl_id_bcast_comm);
ncclComm_t new_nccl_comm;
// 这里调用了nccl,传递了rank信息
auto nccl_result = ncclCommInitRank(&new_nccl_comm, nccl_size, nccl_id, nccl_rank);
nccl_context_->ErrorCheck("ncclCommInitRank", nccl_result, nccl_comm);
nccl_comm = new_nccl_comm;
// Barrier helps NCCL to synchronize after initialization and avoid
// deadlock that we've been seeing without it.
global_state_->controller->Barrier(Communicator::GLOBAL);
timeline.ActivityEndAll(entries);
}
nccl_comm_ = &nccl_comm;
}
PopulateNCCLCommStrategy предназначен для получения информации о рангах из глобального состояния.
void NCCLOpContext::PopulateNCCLCommStrategy(int& nccl_rank, int& nccl_size,
Communicator& nccl_id_bcast_comm) {
if (communicator_type_ == Communicator::GLOBAL) {
nccl_rank = global_state_->controller->GetRank();
nccl_size = global_state_->controller->GetSize();
} else if (communicator_type_ == Communicator::LOCAL) {
nccl_rank = global_state_->controller->GetLocalRank();
nccl_size = global_state_->controller->GetLocalSize();
} else {
throw std::logic_error("Communicator type " + std::to_string(communicator_type_) +
" is not supported in NCCL mode.");
}
nccl_id_bcast_comm = communicator_type_;
}
Итак, мы должны заглянуть в исходный код NCCL.
4.2.1.3 In NCCL
можно увидеть в init.cc
NCCL_API(ncclResult_t, ncclCommInitRank, ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank);
ncclResult_t ncclCommInitRank(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank) {
NVTX3_FUNC_RANGE_IN(nccl_domain);
int cudaDev;
CUDACHECK(cudaGetDevice(&cudaDev));
// 这里初始化
NCCLCHECK(ncclCommInitRankDev(newcomm, nranks, commId, myrank, cudaDev));
return ncclSuccess;
}
Продолжая видеть, ncclAsyncInit вызывается для завершения окончательной инициализации, передавая общий номер ранга и myrank самого процесса.
static ncclResult_t ncclCommInitRankDev(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank, int cudaDev) {
ncclResult_t res;
char* env = getenv("NCCL_COMM_ID");
NCCLCHECKGOTO(ncclInit(), res, end);
// Make sure the CUDA runtime is initialized.
CUDACHECKGOTO(cudaFree(NULL), res, end);
NCCLCHECKGOTO(PtrCheck(newcomm, "CommInitRank", "newcomm"), res, end);
if (ncclAsyncMode()) {
// 调用了 ncclAsyncInit 来完成最后初始化,传入了总体rank数目,进程自身的myrank
NCCLCHECKGOTO(ncclAsyncInit(ncclCommInitRankSync, newcomm, nranks, commId, myrank, cudaDev), res, end);
} else {
NCCLCHECKGOTO(ncclCommInitRankSync(newcomm, nranks, commId, myrank, cudaDev), res, end);
}
end:
if (ncclAsyncMode()) return ncclAsyncErrCheck(res);
else return res;
}
ncclComm_t на самом деле является определением типа ncclComm, поэтому давайте посмотрим на определение ncclComm, которое включает в себя общее количество рангов и ранг самого процесса.
struct ncclComm {
struct ncclChannel channels[MAXCHANNELS];
...
// Bitmasks for ncclTransportP2pSetup
int connect;
uint32_t* connectSend;
uint32_t* connectRecv;
int rank; // my rank in the communicator
int nRanks; // number of GPUs in communicator
int cudaDev; // my cuda device index
int64_t busId; // my PCI bus ID in int format
int node;
int nNodes;
int localRanks;
// Intra-process sync
int intraRank;
int intraRanks;
int* intraBarrier;
int intraPhase;
....
};
Таким образом, мы можем примерно понять, что horovod передает информацию о ранге, и NCCL будет формировать кольцо соответственно.
4.2.2 GLOO
Как видно из GlooContext::Initialize, Horovod отправляет информацию о ранге на Rendezvous Server через Rendezvous.
Кольца формируются внутри Gloo.
Среди них cross_rank требуется для иерархического allreduce.
void GlooContext::Initialize(const std::string& gloo_iface) {
attr device_attr;
device_attr.iface = gloo_iface;
device_attr.ai_family = AF_UNSPEC;
auto dev = CreateDevice(device_attr);
auto timeout = GetTimeoutFromEnv();
auto host_env = std::getenv(HOROVOD_HOSTNAME);
std::string hostname = host_env != nullptr ? std::string(host_env) : std::string("localhost");
int rank = GetIntEnvOrDefault(HOROVOD_RANK, 0);
int size = GetIntEnvOrDefault(HOROVOD_SIZE, 1);
int local_rank = GetIntEnvOrDefault(HOROVOD_LOCAL_RANK, 0);
int local_size = GetIntEnvOrDefault(HOROVOD_LOCAL_SIZE, 1);
int cross_rank = GetIntEnvOrDefault(HOROVOD_CROSS_RANK, 0);
int cross_size = GetIntEnvOrDefault(HOROVOD_CROSS_SIZE, 1);
auto rendezvous_addr_env = std::getenv(HOROVOD_GLOO_RENDEZVOUS_ADDR);
auto rendezvous_port = GetIntEnvOrDefault(HOROVOD_GLOO_RENDEZVOUS_PORT, -1);
bool elastic = GetBoolEnvOrDefault(HOROVOD_ELASTIC, false);
if (elastic && reset_) {
std::string server_addr = rendezvous_addr_env;
std::string scope = HOROVOD_GLOO_GET_RANK_AND_SIZE;
HTTPStore init_store(server_addr, rendezvous_port, scope, rank);
auto key = hostname + ":" + std::to_string(local_rank);
std::vector<char> result = init_store.get(key);
std::string s(result.begin(), result.end());
std::stringstream ss(s);
int last_rank = rank;
int last_size = size;
int last_local_rank = local_rank;
int last_local_size = local_size;
int last_cross_rank = cross_rank;
int last_cross_size = cross_size;
rank = ParseNextInt(ss);
size = ParseNextInt(ss);
local_rank = ParseNextInt(ss);
local_size = ParseNextInt(ss);
cross_rank = ParseNextInt(ss);
cross_size = ParseNextInt(ss);
SetEnv(HOROVOD_RANK, std::to_string(rank).c_str());
SetEnv(HOROVOD_SIZE, std::to_string(size).c_str());
SetEnv(HOROVOD_LOCAL_RANK, std::to_string(local_rank).c_str());
SetEnv(HOROVOD_LOCAL_SIZE, std::to_string(local_size).c_str());
SetEnv(HOROVOD_CROSS_RANK, std::to_string(cross_rank).c_str());
SetEnv(HOROVOD_CROSS_SIZE, std::to_string(cross_size).c_str());
}
// 设定了不同的 Rendezvous server
ctx = Rendezvous(HOROVOD_GLOO_GLOBAL_PREFIX,
rendezvous_addr_env, rendezvous_port,
rank, size, dev, timeout);
local_ctx = Rendezvous(HOROVOD_GLOO_LOCAL_PREFIX + hostname,
rendezvous_addr_env, rendezvous_port,
local_rank, local_size, dev, timeout);
cross_ctx = Rendezvous(HOROVOD_GLOO_CROSS_PREFIX + std::to_string(local_rank),
rendezvous_addr_env, rendezvous_port,
cross_rank, cross_size, dev, timeout);
}
4.2.3 MPI
Как вы можете видеть в MPIContext::Initialize, здесь устанавливаются различные ранги.
void MPIContext::Initialize(const std::vector<int>& ranks,
MPIContextManager& ctx_manager) {
auto mpi_threads_disable = std::getenv(HOROVOD_MPI_THREADS_DISABLE);
int required = MPI_THREAD_MULTIPLE;
if (mpi_threads_disable != nullptr &&
std::strtol(mpi_threads_disable, nullptr, 10) > 0) {
required = MPI_THREAD_SINGLE;
}
int is_mpi_initialized = 0;
MPI_Initialized(&is_mpi_initialized);
if (is_mpi_initialized) {
int provided;
MPI_Query_thread(&provided);
} else {
// MPI environment has not been created, using manager to initialize.
ctx_manager.EnvInitialize(required);
should_finalize = true;
}
if (!ranks.empty()) {
MPI_Group world_group;
MPI_Comm_group(MPI_COMM_WORLD, &world_group);
MPI_Group work_group;
MPI_Group_incl(world_group, ranks.size(), ranks.data(), &work_group);
MPI_Comm_create_group(MPI_COMM_WORLD, work_group, 0, &(mpi_comm));
if (mpi_comm == MPI_COMM_NULL) {
mpi_comm = MPI_COMM_WORLD;
}
MPI_Group_free(&world_group);
MPI_Group_free(&work_group);
} else if (!mpi_comm) {
// No ranks were given and no communicator provided to horovod_init() so use
// MPI_COMM_WORLD
MPI_Comm_dup(MPI_COMM_WORLD, &mpi_comm);
}
// Create local comm, Determine local rank by querying the local communicator.
MPI_Comm_split_type(mpi_comm, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL,
&local_comm);
// Get local rank and world rank for cross comm establishment.
int local_rank, world_rank;
MPI_Comm_rank(mpi_comm, &world_rank);
MPI_Comm_rank(local_comm, &local_rank);
// Create cross node communicator.
MPI_Comm_split(mpi_comm, local_rank, world_rank, &cross_comm);
// Create custom MPI float16 data type.
MPI_Type_contiguous(2, MPI_BYTE, &mpi_float16_t);
MPI_Type_commit(&mpi_float16_t);
// Create custom MPI float16 summation op.
MPI_Op_create(&float16_sum, 1, &mpi_float16_sum);
}
0x05 бизнес-логика
Рассмотрим подробнее бизнес-логику.
5.1 Общий бизнес RunLoopOnce
RunLoopOnce отвечает за общую бизнес-логику, и его функции заключаются в следующем:
-
Рассчитать, нужен ли еще сон, то есть проверить, не превысило ли оно время одного цикла с момента последнего цикла;
-
Используйте ComputeResponseList, чтобы согласовать ранг 0 с работником, получить запрос и вычислить ответ;
Ранг 0 будет проходить по response_list и выполнять операции над ответом одну за другой.
response_list обрабатывается рангом 0, а кеш ответов обрабатывается другими рангами.
-
Используйте PerformOperation для выполнения коллективных операций для каждого ответа.
-
Если требуется автонастройка, синхронизируйте параметры;
Мы видим, что рабочий процесс Хоровода примерно такой, как было сказано ранее, и представляет собой модель производителя и потребителя. Контроллер здесь, чтобы выполнять работу по координации: он будет сообщать, какие запросы готовы для каждого ранга, и выполнять коллективные операции для готовых запросов.
Сокращенная версия кода выглядит следующим образом:
bool RunLoopOnce(HorovodGlobalState& state) {
// This delay determines thread frequency and communication message latency
.....
// 让 rank 0 与 worker 协调,获取 Request,计算 response
auto response_list =
state.controller->ComputeResponseList(horovod_global.shut_down, state);
// Get tensor name and size data for autotuning.
.....
// Perform the collective operation. All nodes should end up performing
// the same operation.
// 对于每个response,做collective的操作
int rank = state.controller->GetRank();
for (auto& response : response_list.responses()) {
PerformOperation(response, horovod_global);
}
// 如果需要 auto tune,就同步参数
if (state.parameter_manager.IsAutoTuning()) {
bool should_sync =
state.parameter_manager.Update(tensor_names, total_tensor_size);
if (should_sync) {
state.controller->SynchronizeParameters();
}
}
return !response_list.shutdown();
}
Процесс выглядит следующим образом:
+---------------------------------+
| | +-----------------------------+
| BackgroundThreadLoop | | |
| | | OperationManager |
| +--------------------------+ | | |
| | RunLoopOnce | | | |
| | | | | |
| | | | | |
| | ComputeResponseList | | +----------> ExecuteOperation |
| | + | | | | |
| | | | | | | |
| | | | | | | |
| | | | | | 1 | |
| | v | | | | |
| | | | | | |
| | PerformOperation +----------+ | |
| | | | | |
| +--------------------------+ | | |
| | | |
+---------------------------------+ +-----------------------------+
5.2 ComputeResponseList Расчет ответа
В фоновом потоке наиболее важным вызовом функции являетсяComputeResponseList
. ComputeResponseList реализует процесс координации, то есть согласование ранга 0 с воркером, получение запроса и вычисление ответа.
Horovod также следует дизайну Coordinator, подобно Baidu. Координатор в Baidu и Horovod похож на режим Актера, который в основном координирует работу нескольких процессов. Horovod также вводит новую абстракцию op_manager, когда вычисления фактически выполняются. В какой-то степени мы можем думать о контроллере как об абстракции возможностей управления связью и координацией, тогда как op_manager — это абстракция реальных вычислений.
5.2.1 Общая идея
Функция Controller::ComputeResponseList такова: воркеры отправляют запросы на ранг 0, затем координатор обрабатывает все запросы воркеров, находит готовые, объединяет их и отправляет окончательный результат на другие ранги:
- Используйте PopMessagesFromQueue, чтобы вынуть текущий запрос из тензорной очереди GlobalState своего собственного процесса и обработать его.Конкретная обработка использует кеш, а затем после серии обработок он кэшируется вmessage_queue_tmpсередина;
- Синхронизируйте информацию кеша друг с другом, цель состоит в том, чтобы получить список ответов, хранящийся каждым воркером;
- Определите, требуется ли дальнейшая синхронизация, например, все ли ответы находятся в кэше;
- Если синхронизация не требуется, то
- Это означает, что все сообщения в очереди находятся в кеше, и никакого другого согласования не требуется. Таким образом, кэшированные ответы напрямую объединяются и помещаются в response_list, а следующий раунд временных интервалов будет продолжать обрабатываться;
- Если требуется синхронизация, то
-
Если это ранг 0,
- Поскольку ранг 0 также будет участвовать в обучении машинному обучению, запрос ранга 0 необходимо добавить в таблицу сообщений. Принимать запросы от других рангов и добавлять запросы из других рангов в message_table_. Синхронизация здесь заблокирована.
- Ранг 0 использует RecvReadyTensors для приема запросов от других рангов и добавляет запросы из других рангов в ready_to_reduce. Синхронизация здесь заблокирована. Координатор будет продолжать получать эту информацию до тех пор, пока количество полученных Dones не станет равным global_size.
- Затем пройдите ранг 0+1 ~ ранг n и обработайте ответ каждого ранга один за другим;
- Наконец, в таблице сообщений уже есть все сокращаемые списки.Источником ответов являются следующие три части:
- источник 1, response_cache_ в ранге 0;
- Источник 2, обработайте ready_to_reduce один за другим;
- Источник 3, join_response
- Используйте FuseResponses для слияния тензоров: объедините несколько тензоров в большой тензор, а затем выполните коллективные операции.
- Координатор найдет все тензоры, готовые к редукции, наSendFinalTensors(response_list)Вернуть ответ всем воркерам.Если информация неверна, будет возвращена ошибка, а после завершения передачи будет отправлено Done.
-
Тогда для других рангов:
- Когда рабочий процесс достигает предложения all_reduce во внешнем интерфейсе, он будет использоватьmessage_queue_tmpОрганизовано в message_list черезSendReadyTensorsФункция отправляет запрос главному узлу (координатору, ранг 0), указывающий, что я намерен уменьшить Запрос, а затем итеративно отправляет тензорную информацию для сокращения через message_list, и, наконец, получает запрос Готово, а затем синхронно блокируется.
- Worker использует RecvFinalTensors(response_list) для отслеживания информации об ответах, получает готовый список ответов от ранга 0 и синхронно блокирует. Когда получено сообщение Done, он попытается вызвать снижение производительности.
-
И координатор, и рабочий организуют синхронизированную информацию в массив ответов на последующую операцию PerformOperation.
-
Вот как реализован mpi, то есть координатор и соответствующий воркер будут блокироваться на одну и ту же инструкцию:
- SendReadyTensors и RecvReadyTensors блокируют MPI_Gather;
- ОтправитьFinalTensors и RecvFinalTensors в MPI_Bcast;
Отличить можно так: если координатор отправляет MPI_Bcast, если воркер отправляет MPI_Gather. Коммуникация заключается в том, чтобы сначала синхронизировать размер сообщения, которое необходимо передать, а затем синхронизировать сообщение.
Подробности следующие:
+
|
ComputeResponseList in rank 0 | ComputeResponseList in worker(rank n)
|
|
message_queue_tmp | message_queue_tmp
|
+ | +
| | |
|PopMessagesFromQueue | | PopMessagesFromQueue
| | |
| | |
| CoordinateCacheAndState |
| | |
| <--------------------------------> |
| | |
v | v
|
RecvReadyTensors(ready_to_reduce, ready_list) <-------------> SendReadyTensors(message_list)
+ | +
| | |
| | |
| | |
| | |
v | |
message_table_ | |
+ | |
| | |
| | |
v | |
FuseResponses | |
+ | |
| | |
| | |
v | v
SendFinalTensors(response_list) <----------------> RecvFinalTensors(response_list)
+ | +
| | |
| | |
| | |
v | v
PerformOperation | PerformOperation
|
+
Мобильный телефон показан на рисунке:
5.2.2 Подробный анализ
Ниже приводится более подробный анализ со ссылкой на информацию в Интернете и самостоятельной ее интерпретацией.
ResponseList Controller::ComputeResponseList(std::atomic_bool& shut_down,
HorovodGlobalState& state) {
// Update cache capacity if autotuning is active.
if (parameter_manager_.IsAutoTuning()) {
response_cache_.set_capacity((int)parameter_manager_.CacheEnabled() *
cache_capacity_);
}
// Copy the data structures out from parameters.
// However, don't keep the lock for the rest of the loop, so that
// enqueued stream callbacks can continue.
CacheCoordinator cache_coordinator(response_cache_.num_active_bits());
// 从 Tensor Quene 中把目前的 Request 都取出来,进行处理
// message queue used only in this cycle
std::deque<Request> message_queue_tmp;
tensor_queue_.PopMessagesFromQueue(message_queue_tmp);
for (auto& message : message_queue_tmp) {
if (message.request_type() == Request::JOIN) {
state.joined = true;
// set_uncached_in_queue 记录没有cache的
cache_coordinator.set_uncached_in_queue(true);
continue;
}
// 这里使用了缓存,就是为了缓存本rank已经得到了多少response。
// Keep track of cache hits
if (response_cache_.capacity() > 0) {
// 需要看看这个tensor是否已经得到了对应的response。为啥要缓存呢?不是都 ready 之后,就立刻进行 all reduce 了嘛。
// cached 函数比较复杂,不但要看是否已经缓存,还要看新 tensor 是否和已经缓存的同名 tensor 的各种参数一致,比如device,dtype,shape等等。如果不一致,则标识缓存的是 INVALID。难道深度学习训练中,这些会变更?
auto cache_ = response_cache_.cached(message);
if (cache_ == ResponseCache::CacheState::HIT) {
uint32_t cache_bit = response_cache_.peek_cache_bit(message);
cache_coordinator.record_hit(cache_bit);
// Record initial time cached tensor is encountered in queue.
stall_inspector_.RecordCachedTensorStart(message.tensor_name());
} else {
// 如果没有缓存
if (cache_ == ResponseCache::CacheState::INVALID) {
// 处理无效缓存记录
uint32_t cache_bit = response_cache_.peek_cache_bit(message);
cache_coordinator.record_invalid_bit(cache_bit);
}
// 如果没有缓存,则添加到 set_uncached_in_queue
cache_coordinator.set_uncached_in_queue(true);
// 从stall 移除
// Remove timing entry if uncached or marked invalid.
stall_inspector_.RemoveCachedTensor(message.tensor_name());
}
}
}
if (state.joined && response_cache_.capacity() > 0) {
for (uint32_t bit : response_cache_.list_all_bits()) {
cache_coordinator.record_hit(bit);
}
}
// Flag indicating that the background thread should shut down.
bool should_shut_down = shut_down;
// 处理 stalled
// Check for stalled tensors.
if (stall_inspector_.ShouldPerformCheck()) {
if (is_coordinator_) {
should_shut_down |= stall_inspector_.CheckForStalledTensors(size_);
}
if (response_cache_.capacity() > 0) {
stall_inspector_.InvalidateStalledCachedTensors(cache_coordinator);
}
stall_inspector_.UpdateCheckTime();
}
cache_coordinator.set_should_shut_down(should_shut_down);
if (response_cache_.capacity() > 0) {
// 为什么要彼此同步cache信息?
// Obtain common cache hits and cache invalidations across workers. Also,
// determine if any worker has uncached messages in queue or requests
// a shutdown. This function removes any invalid cache entries, if they
// exist.
// 这里会同步,也会从 response_cache_ 之中移除 invalid 的。
// 目的是得到每个worker 共同存储的 response列表
CoordinateCacheAndState(cache_coordinator);
// Remove uncommon cached tensors from queue and replace to state
// queue for next cycle. Skip adding common cached tensors to
// queue as they are handled separately.
// 此时 cache_coordinator 已经是所有worker 共有的response 列表了。需要移除那些 不在共有response 列表中的 response。
// 为什么有的worker会没有某种response?
// 会从 tensor request messages 之中看看是否已经有cache的了,然后相应更新 tensor_queue_。
std::deque<Request> messages_to_replace;
size_t num_messages = message_queue_tmp.size();
for (size_t i = 0; i < num_messages; ++i) {
auto& message = message_queue_tmp.front();
if (response_cache_.cached(message) == ResponseCache::CacheState::HIT) {
uint32_t cache_bit = response_cache_.peek_cache_bit(message);
if (cache_coordinator.cache_hits().find(cache_bit) ==
cache_coordinator.cache_hits().end()) {
// Try to process again in next cycle.
messages_to_replace.push_back(std::move(message));
} else {
// Remove timing entry for messages being handled this cycle.
stall_inspector_.RemoveCachedTensor(message.tensor_name());
}
} else {
// Remove timing entry for messages being handled this cycle.
stall_inspector_.RemoveCachedTensor(message.tensor_name());
message_queue_tmp.push_back(std::move(message));
}
message_queue_tmp.pop_front();
}
tensor_queue_.PushMessagesToQueue(messages_to_replace);
}
// End of response_cache_.capacity()
ResponseList response_list;
response_list.set_shutdown(cache_coordinator.should_shut_down());
bool need_communication = true;
// 判断是否需要进一步同步,比如response全都在cache之中。
if (response_cache_.capacity() > 0 &&
!cache_coordinator.uncached_in_queue()) {
// if cache is enabled and no uncached new message coming in, no need for
// additional communications
need_communication = false;
// If no messages to send, we can simply return an empty response list;
if (cache_coordinator.cache_hits().empty()) {
return response_list;
}
// otherwise we need to add cached messages to response list.
}
if (!need_communication) {
// 队列中所有消息都在缓存之中,不需要其他的协调。于是直接把缓存的response进行融合,放入response_list
// If all messages in queue have responses in cache, use fast path with
// no additional coordination.
// If group fusion is disabled, fuse tensors in groups separately
if (state.disable_group_fusion && !group_table_.empty()) {
// Note: need group order to be based on position in cache for global consistency
std::vector<int> common_ready_groups;
std::unordered_set<int> processed;
for (auto bit : cache_coordinator.cache_hits()) {
const auto& tensor_name = response_cache_.peek_response(bit).tensor_names()[0];
int group_id = group_table_.GetGroupIDFromTensorName(tensor_name);
if (group_id != NULL_GROUP_ID && processed.find(group_id) == processed.end()) {
common_ready_groups.push_back(group_id);
processed.insert(group_id);
}
}
for (auto id : common_ready_groups) {
std::deque<Response> responses;
for (const auto &tensor_name : group_table_.GetGroupTensorNames(id)) {
auto bit = response_cache_.peek_cache_bit(tensor_name);
responses.push_back(response_cache_.get_response(bit));
// Erase cache hit to avoid processing a second time.
cache_coordinator.erase_hit(bit);
}
FuseResponses(responses, state, response_list);
}
}
std::deque<Response> responses;
// Convert cache hits to responses. Populate so that least
// recently used responses get priority. All workers call the code
// here so we use the get method here to consistently update the cache
// order.
for (auto bit : cache_coordinator.cache_hits()) {
responses.push_back(response_cache_.get_response(bit));
}
// Fuse responses as normal.
FuseResponses(responses, state, response_list);
response_list.set_shutdown(cache_coordinator.should_shut_down());
} else {
// 有没有缓存的消息进入,需要找出来这些是不是可以reduce的。
// There are uncached messages coming in, need communication to figure out
// whether those are ready to be reduced.
// Collect all tensors that are ready to be reduced. Record them in the
// tensor count table (rank zero) or send them to rank zero to be
// recorded (everyone else).
std::vector<std::string> ready_to_reduce;
if (is_coordinator_) {
// 我是 rank 0,对于master进程,记录已经ready的tensor。
// rank 0 也会参与机器学习的训练,所以需要把rank 0的request也加入到message table之中。
while (!message_queue_tmp.empty()) { // 注意此时message_queue_tmp中的request是来自master进程
// Pop the first available message
Request message = message_queue_tmp.front();
message_queue_tmp.pop_front();
if (message.request_type() == Request::JOIN) {
state.joined_size++;
continue;
}
bool reduce = IncrementTensorCount(message, state.joined_size);
stall_inspector_.RecordUncachedTensorStart(
message.tensor_name(), message.request_rank(), size_);
if (reduce) {
ready_to_reduce.push_back(message.tensor_name());
}
}
// 接受其他 rank 的 Request,把其他 rank 的 ready Request 加入到 message_table_ 之中。
// 此处就同步阻塞了
// Receive ready tensors from other ranks
std::vector<RequestList> ready_list;
RecvReadyTensors(ready_to_reduce, ready_list);
// 处理所有 rank 的 Request。
// Process messages.
// 遍历 rank 0+1 ~ rank n,逐一处理每个 rank 的 response
for (int i = 1; i < size_; ++i) { // size_是指有多少个rank
// 每一个 rank 的 response list。
auto received_message_list = ready_list[i];
for (auto& received_message : received_message_list.requests()) {
auto& received_name = received_message.tensor_name();
// Join类型消息是指有新的rank加入,Horovod支持弹性
if (received_message.request_type() == Request::JOIN) {
state.joined_size++; // 增加该tensor已经ready的rank的个数,如果所有rank都ready,则发给其他rank
continue;
}
bool reduce = IncrementTensorCount(received_message, state.joined_size);
stall_inspector_.RecordUncachedTensorStart(
received_message.tensor_name(), received_message.request_rank(),
size_);
// 如果已经达到了最大数值,则可以 reduce 了,加入到 ready_to_reduce。
if (reduce) {
ready_to_reduce.push_back(received_name);
}
}
if (received_message_list.shutdown()) {
// Received SHUTDOWN request from one of the workers.
should_shut_down = true;
}
}
// Check if tensors from previous ticks are ready to reduce after Joins.
// 遍历 message_table_,目的是看看上一轮处理的 response 在本轮是否可以 reduce
if (state.joined_size > 0) {
for (auto& table_iter : message_table_) {
int count = (int)table_iter.second.size();
if (count == (size_ - state.joined_size) &&
std::find(ready_to_reduce.begin(), ready_to_reduce.end(),
table_iter.first) == ready_to_reduce.end()) {
state.timeline.NegotiateEnd(table_iter.first);
ready_to_reduce.push_back(table_iter.first);
}
}
}
// Fuse tensors in groups before processing others.
if (state.disable_group_fusion && !group_table_.empty()) {
// Extract set of common groups from coordinator tensor list and cache hits.
std::vector<int> common_ready_groups;
std::unordered_set<int> processed;
for (const auto& tensor_name : ready_to_reduce) {
int group_id = group_table_.GetGroupIDFromTensorName(tensor_name);
if (group_id != NULL_GROUP_ID && processed.find(group_id) == processed.end()) {
common_ready_groups.push_back(group_id);
processed.insert(group_id);
// Leaving name in list, to be skipped later.
}
}
if (response_cache_.capacity() > 0) {
for (auto bit : cache_coordinator.cache_hits()) {
const auto& tensor_name = response_cache_.peek_response(bit).tensor_names()[0];
int group_id = group_table_.GetGroupIDFromTensorName(tensor_name);
if (group_id != NULL_GROUP_ID && processed.find(group_id) == processed.end()) {
common_ready_groups.push_back(group_id);
processed.insert(group_id);
}
}
}
// For each ready group, form and fuse response lists independently
for (auto id : common_ready_groups) {
std::deque<Response> responses;
for (const auto &tensor_name : group_table_.GetGroupTensorNames(id)) {
if (message_table_.find(tensor_name) != message_table_.end()) {
// Uncached message
Response response = ConstructResponse(tensor_name, state.joined_size);
responses.push_back(std::move(response));
} else {
// Cached message
auto bit = response_cache_.peek_cache_bit(tensor_name);
responses.push_back(response_cache_.get_response(bit));
// Erase cache hit to avoid processing a second time.
cache_coordinator.erase_hit(bit);
}
}
FuseResponses(responses, state, response_list);
}
}
// 此时,message table 之中已经有了所有的可以reduce的列表
// At this point, rank zero should have a fully updated tensor count
// table and should know all the tensors that need to be reduced or
// gathered, and everyone else should have sent all their information
// to rank zero. We can now do reductions and gathers; rank zero will
// choose which ones and in what order, and will notify the other ranks
// before doing each reduction.
std::deque<Response> responses;
// responses 的来源是以下三部分
// 来源1,response_cache_ in rank 0
if (response_cache_.capacity() > 0) {
// Prepopulate response list with cached responses. Populate so that
// least recently used responses get priority. Since only the
// coordinator rank calls this code, use peek instead of get here to
// preserve cache order across workers.
// No need to do this when all ranks did Join.
if (state.joined_size < size_) {
for (auto bit : cache_coordinator.cache_hits()) {
responses.push_back(response_cache_.peek_response(bit));
}
}
}
// 来源2,逐一处理 ready_to_reduce
for (auto& tensor_name : ready_to_reduce) {
// Skip tensors in group that were handled earlier.
if (state.disable_group_fusion &&
!group_table_.empty() &&
group_table_.GetGroupIDFromTensorName(tensor_name) != NULL_GROUP_ID) {
continue;
}
Response response = ConstructResponse(tensor_name, state.joined_size);
responses.push_back(std::move(response));
}
// 来源3,join_response
if (state.joined_size == size_) {
// All ranks did Join(). Send the response, reset joined size.
Response join_response;
join_response.set_response_type(Response::JOIN);
join_response.add_tensor_name(JOIN_TENSOR_NAME);
responses.push_back(std::move(join_response));
state.joined_size = 0;
}
// 进行融合
FuseResponses(responses, state, response_list);
response_list.set_shutdown(should_shut_down);
// Broadcast final results to other ranks.
SendFinalTensors(response_list);
} else {
// 我是其他的 rank,非master,则发送自己已经ready的tensor给master,再接收已经ready的tensor列表
RequestList message_list;
message_list.set_shutdown(should_shut_down);
while (!message_queue_tmp.empty()) {
message_list.add_request(message_queue_tmp.front());
message_queue_tmp.pop_front();
}
// 给 Rank 0 发送 Request,同步阻塞
// Send ready tensors to rank zero
SendReadyTensors(message_list);
// 从 Rank 0 接受 ready response list,同步阻塞
// Receive final tensors to be processed from rank zero
RecvFinalTensors(response_list);
}
}
if (!response_list.responses().empty()) {
std::string tensors_ready;
for (const auto& r : response_list.responses()) {
tensors_ready += r.tensor_names_string() + "; ";
}
}
// If need_communication is false, meaning no uncached message coming in,
// thus no need to update cache.
if (need_communication && response_cache_.capacity() > 0) {
// All workers add supported responses to cache. This updates the cache
// order consistently across workers.
for (auto& response : response_list.responses()) {
if ((response.response_type() == Response::ResponseType::ALLREDUCE ||
response.response_type() == Response::ResponseType::ADASUM ||
response.response_type() == Response::ResponseType::ALLTOALL) &&
(int)response.devices().size() == size_) {
response_cache_.put(response, tensor_queue_, state.joined);
}
}
}
// Reassign cache bits based on current cache order.
response_cache_.update_cache_bits();
return response_list;
}
Далее остановимся на нескольких функциях.
5.2.3 IncrementTensorCount
Цель IncrementTensorCount — вычислить, готовы ли все тензоры.
еслиbool ready_to_reduce = count == (size_ - joined_size) ,
Вы будете знать, что все это можно уменьшить.
bool Controller::IncrementTensorCount(const Request& msg, int joined_size) {
auto& name = msg.tensor_name();
auto table_iter = message_table_.find(name);
if (table_iter == message_table_.end()) {
std::vector<Request> messages = {msg};
messages.reserve(static_cast<unsigned long>(size_));
message_table_.emplace(name, std::move(messages));
table_iter = message_table_.find(name);
} else {
std::vector<Request>& messages = table_iter->second;
messages.push_back(msg);
}
std::vector<Request>& messages = table_iter->second;
int count = (int)messages.size();
bool ready_to_reduce = count == (size_ - joined_size); // 判断是否可以 allreduce
return ready_to_reduce;
}
Конкретный вызов должен отвечать за ранг 0, чтобы увидеть, является ли он allreduce.
То есть, если IncrementTensorCount подсчитан, описание завершено, и Request можно добавить в message_table_.
if (is_coordinator_) {
while (!message_queue_tmp.empty()) {
// Pop the first available message
Request message = message_queue_tmp.front();
message_queue_tmp.pop_front();
if (message.request_type() == Request::JOIN) {
state.joined_size++;
continue;
}
// 这里调用
bool reduce = IncrementTensorCount(message, state.joined_size);
stall_inspector_.RecordUncachedTensorStart(
message.tensor_name(), message.request_rank(), size_);
if (reduce) {
ready_to_reduce.push_back(message.tensor_name());
}
}
5.2.4 RecvReadyTensors
Функция этой функции - собирать Запросы других рангов.
- Используйте MPI_Gather для определения длины сообщения;
- Собирать сообщения с помощью MPI_Gatherv;
- Поскольку ранг 0 уже обработан, ранг 0 здесь не обрабатывается;
void MPIController::RecvReadyTensors(std::vector<std::string>& ready_to_reduce,
std::vector<RequestList>& ready_list) {
// Rank zero has put all its own tensors in the tensor count table.
// Now, it should count all the tensors that are coming from other
// ranks at this tick.
// 1. Get message lengths from every rank.
auto recvcounts = new int[size_];
recvcounts[0] = 0;
MPI_Gather(MPI_IN_PLACE, 1, MPI_INT, recvcounts, 1, MPI_INT, RANK_ZERO,
mpi_ctx_.mpi_comm);
// 2. Compute displacements.
auto displcmnts = new int[size_];
size_t total_size = 0;
for (int i = 0; i < size_; ++i) {
if (i == 0) {
displcmnts[i] = 0;
} else {
displcmnts[i] = recvcounts[i - 1] + displcmnts[i - 1];
}
total_size += recvcounts[i];
}
// 3. Collect messages from every rank.
auto buffer = new uint8_t[total_size];
MPI_Gatherv(nullptr, 0, MPI_BYTE, buffer, recvcounts, displcmnts, MPI_BYTE,
RANK_ZERO, mpi_ctx_.mpi_comm);
// 4. Process messages.
// create a dummy list for rank 0
ready_list.emplace_back();
for (int i = 1; i < size_; ++i) {
auto rank_buffer_ptr = buffer + displcmnts[i];
RequestList received_message_list;
RequestList::ParseFromBytes(received_message_list, rank_buffer_ptr);
ready_list.push_back(std::move(received_message_list));
}
// 5. Free buffers.
delete[] recvcounts;
delete[] displcmnts;
delete[] buffer;
}
5.2.5 SendReadyTensors
Эта функция представляет собой запрос синхронизации другого ранга на ранг 0.
- Используйте MPI_Gather для определения длины сообщения;
- Собирать сообщения с помощью MPI_Gatherv;
void MPIController::SendReadyTensors(RequestList& message_list) {
std::string encoded_message;
RequestList::SerializeToString(message_list, encoded_message);
int encoded_message_length = (int)encoded_message.length() + 1;
int ret_code = MPI_Gather(&encoded_message_length, 1, MPI_INT, nullptr, 1,
MPI_INT, RANK_ZERO, mpi_ctx_.mpi_comm);
ret_code = MPI_Gatherv((void*)encoded_message.c_str(), encoded_message_length,
MPI_BYTE, nullptr, nullptr, nullptr, MPI_BYTE,
RANK_ZERO, mpi_ctx_.mpi_comm);
}
5.2.6 SendFinalTensors
Функция этой функции заключается в том, что ранг 0 отправляет окончательный результат другим рангам;
void MPIController::SendFinalTensors(ResponseList& response_list) {
// Notify all nodes which tensors we'd like to reduce at this step.
std::string encoded_response;
ResponseList::SerializeToString(response_list, encoded_response);
int encoded_response_length = (int)encoded_response.length() + 1;
MPI_Bcast(&encoded_response_length, 1, MPI_INT, RANK_ZERO, mpi_ctx_.mpi_comm);
MPI_Bcast((void*)encoded_response.c_str(), encoded_response_length, MPI_BYTE,
RANK_ZERO, mpi_ctx_.mpi_comm);
}
5.2.7 RecvFinalTensors
Функция этой функции заключается в том, что воркер принимает список готовых ответов от ранга 0 и синхронно блокирует
void MPIController::RecvFinalTensors(ResponseList& response_list) {
int msg_length;
int ret_code =
MPI_Bcast(&msg_length, 1, MPI_INT, RANK_ZERO, mpi_ctx_.mpi_comm);
auto buffer = new uint8_t[msg_length];
ret_code =
MPI_Bcast(buffer, msg_length, MPI_BYTE, RANK_ZERO, mpi_ctx_.mpi_comm);
ResponseList::ParseFromBytes(response_list, buffer);
delete[] buffer;
}
5.3 Выполнение действий на основе ответа
Далее мы рассмотрим другую важную операцию PerformOperation, которая предназначена для выполнения операций на основе ответа.
Последовательность вызова такова:
- BackgroundThreadLoop вызывает RunLoopOnce;
- Если RunLoopOnce имеет ранг 0, обработайте response_list и затем вызовите PerformOperation;
- PerformOperation, а затем вызовите op_manager -> ExecuteOperation------ ExecuteAllreduce;
Мы видим, что ComputeResponseList возвращает response_list, то есть тензор, соответствующий этим ответам, может делать allreduce. Затем он будет проходить каждый ответ и выполнять PerformOperation.
auto response_list =
state.controller->ComputeResponseList(horovod_global.shut_down, state);
int rank = state.controller->GetRank();
for (auto& response : response_list.responses()) {
PerformOperation(response, horovod_global);
}
5.3.1 PerformOperation
Продолжайте выполнять RunLoopOnce из ComputeResponseList, и рабочий узел будет вызывать PerformOperation для каждого опроса ответа, чтобы завершить соответствующую работу по сокращению в соответствии со списком response_list, возвращенным предыдущим ComputeResponseList.
Статус основного вызова = op_manager->ExecuteOperation(entries, response); выглядит следующим образом:
-
PerformOperation извлечет соответствующий TensorEntry из horovod_global.tensor_queue с помощью функции GetTensorEntriesFromResponse;
-
Если буфер не был инициализирован, вызовите horovod_global.fusion_buffer.InitializeBuffer для инициализации;
-
Тогда status = op_manager->ExecuteOperation(entries, response) вызовет разные op->Execute(entries, response) для выполнения операций сокращения;
-
Затем вызовите обратный вызов разных записей, где обратный вызов обычно является внешним интерфейсом для соответствующей операции;
// Process a Response by doing a reduction, a gather, a broadcast, or
// raising an error.
void PerformOperation(Response response, HorovodGlobalState& state) {
std::vector<TensorTableEntry> entries;
auto& timeline = horovod_global.timeline;
if (response.response_type() != Response::JOIN) {
horovod_global.tensor_queue.GetTensorEntriesFromResponse(response, entries,
state.joined);
if (entries.size() > 1) { // 如果多于1个,则可以进行fuse,以提高throughput
auto first_entry = entries[0];
Status status = horovod_global.fusion_buffer.InitializeBuffer(
horovod_global.controller->TensorFusionThresholdBytes(),
first_entry.device, first_entry.context,
horovod_global.current_nccl_stream,
[&]() { timeline.ActivityStartAll(entries, INIT_FUSION_BUFFER); },
[&]() { timeline.ActivityEndAll(entries); });
if (!status.ok()) {
for (auto& e : entries) {
timeline.End(e.tensor_name, nullptr);
// Callback can be null if the rank sent Join request.
if (e.callback != nullptr) {
e.callback(status);
}
}
return;
}
}
// On GPU data readiness is signalled by ready_event.
// 即使tensor可以进行操作了,但需要等待数据同步到显存
std::vector<TensorTableEntry> waiting_tensors;
for (auto& e : entries) {
if (e.ready_event != nullptr) {
timeline.ActivityStart(e.tensor_name, WAIT_FOR_DATA);
waiting_tensors.push_back(e);
}
}
while (!waiting_tensors.empty()) {
for (auto it = waiting_tensors.begin(); it != waiting_tensors.end();) {
if (it->ready_event->Ready()) {
timeline.ActivityEnd(it->tensor_name);
timeline.ActivityStart(it->tensor_name, WAIT_FOR_OTHER_TENSOR_DATA);
it = waiting_tensors.erase(it);
} else {
++it;
}
}
std::this_thread::sleep_for(std::chrono::nanoseconds(100));
}
}
Status status;
try {
// 进行collective的操作
status = op_manager->ExecuteOperation(entries, response);
} catch (const std::exception& ex) {
status = Status::UnknownError(ex.what());
}
... // 调用 callback 函数
}
5.3.2 ExecuteOperation
Тогда status = op_manager->ExecuteOperation(entries, response) вызовет разные op->Execute(entries, response) для выполнения операций сокращения.
А вот и OperationManager.
Status OperationManager::ExecuteOperation(std::vector<TensorTableEntry>& entries,
const Response& response) const {
if (response.response_type() == Response::ALLREDUCE) {
return ExecuteAllreduce(entries, response);
} else if (response.response_type() == Response::ALLGATHER) {
return ExecuteAllgather(entries, response);
} else if (response.response_type() == Response::BROADCAST) {
return ExecuteBroadcast(entries, response);
} else if (response.response_type() == Response::ALLTOALL) {
return ExecuteAlltoall(entries, response);
} else if (response.response_type() == Response::JOIN) {
return ExecuteJoin(entries, response);
} else if (response.response_type() == Response::ADASUM) {
return ExecuteAdasum(entries, response);
} else if (response.response_type() == Response::ERROR) {
return ExecuteError(entries, response);
} else {
throw std::logic_error("No operation found for response type provided");
}
}
5.3.3 ExecuteAllreduce
op->Execute(entries, response); это вызов чего-то вроде MPIAllreduce.Execute.
Status OperationManager::ExecuteAllreduce(std::vector<TensorTableEntry>& entries,
const Response& response) const {
for (auto& op : allreduce_ops_) {
if (op->Enabled(*param_manager_, entries, response)) {
return op->Execute(entries, response);
}
}
}
откуда берется allreduce_ops_? в конструкторе OperationManager.
allreduce_ops_(std::move(allreduce_ops)),
Итак, давайте посмотрим на allreduce_ops.
5.3.4 allreduce_ops
Добавьте allreduce_ops в CreateOperationManager.
Как видите, добавленные типы примерно такие:
- MPI_GPUAllreduce
- NCCLHierarchicalAllreduce
- NCCLAllreduce
- DDLAllreduce
- GlooAllreduce
- CCLAllreduce
- MPIAllreduce
- ......
OperationManager* CreateOperationManager(HorovodGlobalState& state) {
// Order of these operations is very important. Operations will be checked
// sequentially from the first to the last. The first 'Enabled' operation will
// be executed.
std::vector<std::shared_ptr<AllreduceOp>> allreduce_ops;
std::vector<std::shared_ptr<AllgatherOp>> allgather_ops;
std::vector<std::shared_ptr<BroadcastOp>> broadcast_ops;
std::vector<std::shared_ptr<AllreduceOp>> adasum_ops;
std::vector<std::shared_ptr<AlltoallOp>> alltoall_ops;
#if HAVE_MPI && HAVE_GPU // 如果构建了 MPI,就添加对应MPI_GPUAllreduce
if (mpi_context.IsEnabled()) {
#if HOROVOD_GPU_ALLREDUCE == 'M'
allreduce_ops.push_back(std::shared_ptr<AllreduceOp>(
new MPI_GPUAllreduce(&mpi_context, &gpu_context, &state)));
#elif HAVE_NCCL && HOROVOD_GPU_ALLREDUCE == 'N' // 如果编译了NCCL,就添加 AdasumGpuAllreduceOp
adasum_ops.push_back(std::shared_ptr<AllreduceOp>(new AdasumGpuAllreduceOp(&mpi_context, &nccl_context, &gpu_context, &state)));
allreduce_ops.push_back(
std::shared_ptr<AllreduceOp>(new NCCLHierarchicalAllreduce(
&nccl_context, &mpi_context, &gpu_context, &state)));
#elif HAVE_DDL && HOROVOD_GPU_ALLREDUCE == 'D'// 如果编译了DDL,就添加DDLAllreduce
allreduce_ops.push_back(std::shared_ptr<AllreduceOp>(
new DDLAllreduce(&ddl_context, &gpu_context, &state)));
#endif
#if HAVE_NCCL && HOROVOD_GPU_ALLREDUCE == 'N'// 如果编译了NCCL,就添加NCCLAllreduce
allreduce_ops.push_back(std::shared_ptr<AllreduceOp>(
new NCCLAllreduce(&nccl_context, &gpu_context, &state)));
#endif
5.3.5 MPIAllreduce
Поскольку существует множество типов allreduce_ops, мы возьмем MPIAllreduce в качестве примера следующим образом:
class MPIAllreduce : public AllreduceOp {
public:
MPIAllreduce(MPIContext* mpi_context, HorovodGlobalState* global_state);
virtual ~MPIAllreduce() = default;
Status Execute(std::vector<TensorTableEntry>& entries, const Response& response) override;
bool Enabled(const ParameterManager& param_manager,
const std::vector<TensorTableEntry>& entries,
const Response& response) const override;
protected:
MPIContext* mpi_context_;
};
MPIAllreduce::Execute
Здесь используется MPI_Allreduce, а также обрабатывается слияние, например MemcpyOutFusionBuffer.
#include "mpi_operations.h"
Status MPIAllreduce::Execute(std::vector<TensorTableEntry>& entries, const Response& response) {
auto& first_entry = entries[0];
const void* fused_input_data;
void* buffer_data;
size_t buffer_len;
int64_t num_elements = NumElements(entries);
// Copy memory into the fusion buffer.
auto& timeline = global_state_->timeline;
if (entries.size() > 1) {
timeline.ActivityStartAll(entries, MEMCPY_IN_FUSION_BUFFER);
MemcpyInFusionBuffer(entries, fused_input_data, buffer_data, buffer_len);
timeline.ActivityEndAll(entries);
} else {
fused_input_data = first_entry.tensor->data();
buffer_data = (void*) first_entry.output->data();
buffer_len = (size_t) first_entry.output->size();
}
if (response.prescale_factor() != 1.0) {
// Execute prescaling op
ScaleBuffer(response.prescale_factor(), entries, fused_input_data, buffer_data, num_elements);
fused_input_data = buffer_data; // for unfused, scale is done out of place
}
// Do allreduce.
timeline.ActivityStartAll(entries, MPI_ALLREDUCE);
const void* sendbuf = entries.size() > 1 || fused_input_data == buffer_data
? MPI_IN_PLACE : fused_input_data;
int op = MPI_Allreduce(sendbuf, buffer_data,
(int) num_elements,
mpi_context_->GetMPIDataType(first_entry.tensor),
mpi_context_->GetMPISumOp(first_entry.tensor->dtype()),
mpi_context_->GetMPICommunicator(Communicator::GLOBAL));
timeline.ActivityEndAll(entries);
if (response.postscale_factor() != 1.0) {
// Execute postscaling op
ScaleBuffer(response.postscale_factor(), entries, buffer_data, buffer_data, num_elements);
}
// Copy memory out of the fusion buffer.
if (entries.size() > 1) {
timeline.ActivityStartAll(entries, MEMCPY_OUT_FUSION_BUFFER);
MemcpyOutFusionBuffer(buffer_data, entries);
timeline.ActivityEndAll(entries);
}
return Status::OK();
}
Конкретная логика на данный момент такова:
+---------------------------------+
| | +-----------------------+
| BackgroundThreadLoop | | |
| | | OperationManager |
| +--------------------------+ | | |
| | RunLoopOnce | | | |
| | | | | |
| | | | | | +--> GPUAllreduce
| | ComputeResponseList | | +----------> ExecuteOperation | |
| | + | | | | + | |
| | | | | | | | | +--> NCCLHierarchicalAllreduce
| | | | | | | | | |
| | | | | | 1 | | 2 | |
| | v | | | | | | +--> NCCLAllreduce
| | | | | | | | |
| | PerformOperation +----------+ | v | |
| | | | | ExecuteAllreduce | +--> DDLAllreduce
| +--------------------------+ | | + | |
| | | | | |
+---------------------------------+ | | | +--> GlooAllreduce
| | allreduce_ops----------+
| | | | +----------------+
| | | +--> | MPIAllreduce |
+-----------------------+ | |
| | |
+----------------------------------> Execute |
3 | |
+----------------+
Телефон такой:
На этом этапе архитектура фонового потока в основном проясняется, и нам нужно вернуться, чтобы посмотреть, как реализован оптимизатор в следующей статье.
0xEE Личная информация
★★★★★★Думая о жизни и технологиях★★★★★★
Публичный аккаунт WeChat:мысли Росси
Если вы хотите получать своевременные новости о статьях, написанных отдельными лицами, или хотите видеть технические материалы, рекомендованные отдельными лицами, обратите внимание.
ссылка 0xFF
Узнайте о распределенном обучении Pytorch, этого достаточно!
horovod использует обучение распределенной модели с помощью horovod
Новое видение Spark: упрощение использования глубокого обучения
Scaling model training in PyTorch using distributed data parallel
Обучение масштабируемой модели в PyTorch с использованием распределенного параллелизма данных
A developer-friendly guide to mixed precision training with PyTorch
Удобное для разработчиков руководство по обучению PyTorch смешанной точности
На дворе 2020 год, почему глубокое обучение еще не на 100 % в облаке?
Почему в 2020 году нельзя проводить 100% глубокое обучение в облаке?
Познакомить вас с распределенной обучающей структурой популярного жареного цыпленка Horovod.
Распределенное обучение на нескольких GPU с Horovod в режиме Amazon SageMaker Pipeline
kubernetes training_Распределенное обучение глубокому обучению с хороводом в Kubernetes
Horovod — распределенная среда глубокого обучения на основе TensorFlow.
В этой статье объясняются необходимые знания о распределенном обучении Tensorflow.
Анализ исходного кода хоровода (1)