HorovodРаспределенная платформа обучения с открытым исходным кодом для Uber, поддерживающая основные платформы машинного обучения (Tensorflow, PyTorch и MxNet). Эта статья в основном основана на версииv0.21.1
Представляет основную реализацию Horovod и ее интеграцию с различными платформами.
Рабочий процесс Horovod относительно прост, есть очередь сообщений для полученияAllReduce
, AllGather
а такжеBroadcast
Эти триop
запрос, существует фоновый поток, который через равные промежутки времени будет опрашивать очередь сообщений и получать пакетop
После этого будетop
серединаtensor
Выполняется слияние, а затем выполняются соответствующие операции. еслиtensor
в видеопамяти, то он будет использоватьNCCL
библиотечное исполнение. тогда как если он находится в памяти, он будет использоватьMPI
илиGloo
воплощать в жизнь.
Основной код Horovod находится вhorovod/common
в каталоге.operations.cc
Файл эквивалентен записи Хоровода, он содержитBackgroundThreadLoop
,RunLoopOnce
и другие важные функции. Глядя на эти функции, вы можете получить представление.
Сначала оцените функциюRunLoopOnce
, здесь опущен некоторый оптимизированный код, например использование кеша ответов, автонастройка и т.д.:
bool RunLoopOnce(HorovodGlobalState& state) {
// 检查从上一个cycle开始到现在,是否已经超过一个cycle时间(CycleTimeMs)
auto start_time = std::chrono::steady_clock::now();
auto sleep_duration = state.last_cycle_start +
std::chrono::microseconds(long(
state.parameter_manager.CycleTimeMs() * 1000.)) -
start_time;
if (sleep_duration > std::chrono::steady_clock::duration::zero()) {
std::this_thread::sleep_for(sleep_duration);
}
state.last_cycle_start = std::chrono::steady_clock::now();
// 在Timeline中记录,用户拿到Timeline结果后,可以在chrome中查看
if (state.mark_cycles_in_timeline) {
// Mark start of the new cycle.
state.timeline.MarkCycleStart();
}
auto response_list =
state.controller->ComputeResponseList(horovod_global.shut_down, state);
state.mark_cycles_in_timeline =
state.controller->MarkCyclesInTimelinePending();
// 对于每个response,做collective的操作
for (auto& response : response_list.responses()) {
PerformOperation(response, horovod_global);
}
return !response_list.shutdown();
}
отHorovodRunOnce
В функции мы видим, что рабочий процесс Хоровода примерно такой, как было сказано ранее, который представляет собой модель производителя и потребителя.controller
Вот работа координации: она будет сообщать, какие запросы каждого ранга готовы, и выполнять коллективные операции для готовых запросов.
Далее давайте посмотрим наComputeResponseList
эта функция. Эта функция очень длинная, до строк 380. Чтобы легче понять, что делает эта функция, сначала удалите кеш и проверьте код остановки:
ResponseList Controller::ComputeResponseList(std::atomic_bool& shut_down,
HorovodGlobalState& state) {
CacheCoordinator cache_coordinator(response_cache_.num_active_bits());
// message queue used only in this cycle
std::deque<Request> message_queue_tmp;
tensor_queue_.PopMessagesFromQueue(message_queue_tmp);
for (auto& message : message_queue_tmp) {
if (message.request_type() == Request::JOIN) {
state.joined = true;
cache_coordinator.set_uncached_in_queue(true);
continue;
}
}
// Flag indicating that the background thread should shut down.
bool should_shut_down = shut_down;
cache_coordinator.set_should_shut_down(should_shut_down);
ResponseList response_list;
response_list.set_shutdown(cache_coordinator.should_shut_down());
{
// Collect all tensors that are ready to be reduced. Record them in the
// tensor count table (rank zero) or send them to rank zero to be
// recorded (everyone else).
std::vector<std::string> ready_to_reduce;
if (is_coordinator_) {
// 对于master进程,记录已经ready的tensor。注意此时message_queue_tmp中的request是来自
// master进程
while (!message_queue_tmp.empty()) {
// Pop the first available message
Request message = message_queue_tmp.front();
message_queue_tmp.pop_front();
if (message.request_type() == Request::JOIN) {
state.joined_size++;
continue;
}
bool reduce = IncrementTensorCount(message, state.joined_size);
if (reduce) {
ready_to_reduce.push_back(message.tensor_name());
}
}
// 接收其他rank的ready的tensor
std::vector<RequestList> ready_list;
RecvReadyTensors(ready_to_reduce, ready_list);
// 处理来自其他rank的request。size_是指有多少个rank
for (int i = 1; i < size_; ++i) {
auto received_message_list = ready_list[i];
for (auto& received_message : received_message_list.requests()) {
auto& received_name = received_message.tensor_name();
// Join类型消息是指有新的rank加入,Horovod支持弹性
if (received_message.request_type() == Request::JOIN) {
state.joined_size++;
continue;
}
// 增加该tensor已经ready的rank的个数,如果所有rank都ready,则发给其他rank
bool reduce = IncrementTensorCount(received_message, state.joined_size);
if (reduce) {
ready_to_reduce.push_back(received_name);
}
}
if (received_message_list.shutdown()) {
// Received SHUTDOWN request from one of the workers.
should_shut_down = true;
}
}
// Check if tensors from previous ticks are ready to reduce after Joins.
if (state.joined_size > 0) {
for (auto& table_iter : message_table_) {
int count = (int)table_iter.second.size();
if (count == (size_ - state.joined_size) &&
std::find(ready_to_reduce.begin(), ready_to_reduce.end(),
table_iter.first) == ready_to_reduce.end()) {
state.timeline.NegotiateEnd(table_iter.first);
ready_to_reduce.push_back(table_iter.first);
}
}
}
// 这个条件有点让人费解,看字面意思是如果禁止group fusion,并且group_table_非空,则fuse?
if (state.disable_group_fusion && !group_table_.empty()) {
// Extract set of common groups from coordinator tensor list and cache hits.
std::vector<int> common_ready_groups;
std::unordered_set<int> processed;
for (const auto& tensor_name : ready_to_reduce) {
int group_id = group_table_.GetGroupIDFromTensorName(tensor_name);
if (group_id != NULL_GROUP_ID && processed.find(group_id) == processed.end()) {
common_ready_groups.push_back(group_id);
processed.insert(group_id);
// Leaving name in list, to be skipped later.
}
}
// For each ready group, form and fuse response lists independently
for (auto id : common_ready_groups) {
std::deque<Response> responses;
for (const auto &tensor_name : group_table_.GetGroupTensorNames(id)) {
if (message_table_.find(tensor_name) != message_table_.end()) {
// Uncached message
Response response = ConstructResponse(tensor_name, state.joined_size);
responses.push_back(std::move(response));
}
}
FuseResponses(responses, state, response_list);
}
}
// At this point, rank zero should have a fully updated tensor count
// table and should know all the tensors that need to be reduced or
// gathered, and everyone else should have sent all their information
// to rank zero. We can now do reductions and gathers; rank zero will
// choose which ones and in what order, and will notify the other ranks
// before doing each reduction.
std::deque<Response> responses;
for (auto& tensor_name : ready_to_reduce) {
// Skip tensors in group that were handled earlier.
if (state.disable_group_fusion &&
!group_table_.empty() &&
group_table_.GetGroupIDFromTensorName(tensor_name) != NULL_GROUP_ID) {
continue;
}
Response response = ConstructResponse(tensor_name, state.joined_size);
responses.push_back(std::move(response));
}
if (state.joined_size == size_) {
// All ranks did Join(). Send the response, reset joined size.
Response join_response;
join_response.set_response_type(Response::JOIN);
join_response.add_tensor_name(JOIN_TENSOR_NAME);
responses.push_back(std::move(join_response));
state.joined_size = 0;
}
FuseResponses(responses, state, response_list);
response_list.set_shutdown(should_shut_down);
// Broadcast final results to other ranks.
SendFinalTensors(response_list);
} else {
// 非master,则发送自己已经ready的tensor给master,再接收已经ready的tensor列表
RequestList message_list;
message_list.set_shutdown(should_shut_down);
while (!message_queue_tmp.empty()) {
message_list.add_request(message_queue_tmp.front());
message_queue_tmp.pop_front();
}
// Send ready tensors to rank zero
SendReadyTensors(message_list);
// Receive final tensors to be processed from rank zero
RecvFinalTensors(response_list);
}
}
if (!response_list.responses().empty()) {
std::string tensors_ready;
for (const auto& r : response_list.responses()) {
tensors_ready += r.tensor_names_string() + "; ";
}
}
// Reassign cache bits based on current cache order.
response_cache_.update_cache_bits();
return response_list;
}
В Хороводе каждая карта соответствует тренировочному процессу, называемомуrank
. Например, 4 карты, соответствующие каждому процессуrank
тогда[0,1,2,3]
.rank
Процесс с 0 является ведущим, а остальные — рабочими. рабочий будет вComputeResponseList
Отправьте готовый тензор мастеру. Если тензор во всехrank
Чжунду готов, мастер уведомит другихrank
, вы можете выполнять коллективные операции над этим тензором.
Продолжайте смотреть наHorovodRunOnce
Еще одна важная функция, которая появляется вPerformOperation
. Эта функция относительно понятна, в основном для выполнения трех задач:
- Сделайте слияние на тензорах: объедините несколько тензоров в большой тензор, а затем выполните коллективные операции
- ждать поступления данных
- Выполнять коллективные операции
void PerformOperation(Response response, HorovodGlobalState& state) {
std::vector<TensorTableEntry> entries;
auto& timeline = horovod_global.timeline;
if (response.response_type() != Response::JOIN) {
// 这里有点奇怪,直接用了horovod_global这个变量,而拿joined的时候,又是从state里拿的
horovod_global.tensor_queue.GetTensorEntriesFromResponse(response, entries,
state.joined);
for (auto& e : entries) {
timeline.Start(e.tensor_name, response.response_type());
}
if (entries.size() > 1) {
// 如果多于1个,则可以进行fuse,以提高throughput
auto first_entry = entries[0];
// Note: it is OK for different entries to come from different frameworks
// since buffer allocated here is guaranteed to survive at least till the
// end of this operation.
Status status = horovod_global.fusion_buffer.InitializeBuffer(
horovod_global.controller->TensorFusionThresholdBytes(),
first_entry.device, first_entry.context,
horovod_global.current_nccl_stream,
[&]() { timeline.ActivityStartAll(entries, INIT_FUSION_BUFFER); },
[&]() { timeline.ActivityEndAll(entries); });
if (!status.ok()) {
LOG(DEBUG, horovod_global.controller->GetRank()) << "InitializeBuffer Failed";
for (auto& e : entries) {
timeline.End(e.tensor_name, nullptr);
// Callback can be null if the rank sent Join request.
if (e.callback != nullptr) {
e.callback(status);
}
}
return;
}
}
// On GPU data readiness is signalled by ready_event.
// 即使tensor可以进行操作了,但需要等待数据同步到显存
std::vector<TensorTableEntry> waiting_tensors;
for (auto& e : entries) {
if (e.ready_event != nullptr) {
timeline.ActivityStart(e.tensor_name, WAIT_FOR_DATA);
waiting_tensors.push_back(e);
}
}
while (!waiting_tensors.empty()) {
for (auto it = waiting_tensors.begin(); it != waiting_tensors.end();) {
if (it->ready_event->Ready()) {
timeline.ActivityEnd(it->tensor_name);
timeline.ActivityStart(it->tensor_name, WAIT_FOR_OTHER_TENSOR_DATA);
it = waiting_tensors.erase(it);
} else {
++it;
}
}
std::this_thread::sleep_for(std::chrono::nanoseconds(100));
}
for (auto& e : entries) {
if (e.ready_event != nullptr) {
timeline.ActivityEnd(e.tensor_name);
}
}
}
// 终于可以进行collective的操作了
Status status;
try {
status = op_manager->ExecuteOperation(entries, response);
} catch (const std::exception& ex) {
LOG(DEBUG, horovod_global.controller->GetRank()) << "ExecuteOperation Failed";
status = Status::UnknownError(ex.what());
}
if (!status.in_progress()) {
for (auto& e : entries) {
timeline.End(e.tensor_name, status.ok() ? e.output : nullptr);
// Callback can be null if the rank sent Join request.
if (e.callback != nullptr) {
e.callback(status);
}
}
}
}
Пока представлен основной рабочий процесс Хоровода.