0x00 сводка
Эта статья является седьмой в серии распределенных PyTorch, в которой представлена концепция групп процессов, на которую опирается DistributedDataParallel.
Другие статьи из этой серии:
Другие связанные статьи:
Автоматическая дифференциация инструментов глубокого обучения (1)
Автоматическая дифференциация инструментов глубокого обучения (2)
Автоматическая дифференциация оружия глубокого обучения (3) --- Пример интерпретации
[Анализ исходного кода] Как PyTorch реализует прямое распространение (1) --- Базовый класс (1)
[Анализ исходного кода] Как PyTorch реализует прямое распространение (2) --- Базовый класс (ниже)
[Анализ исходного кода] Как PyTorch реализует прямое распространение (3) --- конкретная реализация
[Анализ исходного кода] Как Pytorch реализует обратное распространение (1) ---- вызов движка
[Анализ исходного кода] Как PyTorch реализует обратное распространение (4) ---- конкретный алгоритм
0x01 Обзор
1.1 Основные понятия
Что касается распределенной связи, PyTorch предоставляет несколько концепций: группа процессов, серверная часть, инициализация, хранилище.
- группа процессов: DDP — это настоящее распределенное обучение, которое может использовать несколько машин для формирования задачи параллельной работы. Чтобы обеспечить связь между различными рабочими DDP, PyTorch устанавливает концепцию групп процессов.
- задняя часть: Концепция бэкенда — логичная концепция. По сути, серверная часть представляет собой механизм связи IPC. Для пользователя это способ общения с коллекцией.С точки зрения кода, это то, какой процесс (серия процессов) идти, и использует ли бэкенд ProcessGroupMPI или ProcessGroupGloo .....
- инициализация: Хотя у нас есть концепция бэкенда и группы процессов, как сделать так, чтобы рабочие обнаруживали друг друга перед созданием группы процессов? Для этого требуется метод инициализации, сообщающий всем о передаче сообщения: как связаться с процессом на других машинах?
-
Store: можно рассматривать как распределенное хранилище «ключ-значение», которое обменивается информацией между процессами в группе и инициализирует распределенные пакеты (путем явного создания хранилища как
init_method
заменять).
1.2 Инициализировать группу процессов
Перед вызовом любого другого метода DDP вам необходимо использоватьtorch.distributed.init_process_group()
Инициализировать группу процессов.
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.distributed as dist
import os
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# initialize the process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
Этот метод инициализирует распределенную группу процессов по умолчанию и распределенный пакет. Этот метод блокируется, пока все процессы не присоединятся.Функция определяется следующим образом:
init_process_group ( backend ,
init_method = None ,
timeout = default_pg_timeout ,
world_size =- 1 ,
rank =- 1 ,
store = None ,
group_name = '' ,
pg_options = None )
Существует два основных способа инициализации группы процессов:
- Явно укажите store, rank и world_size.
- Укажите init_method (строка URL), который указывает, где и как обнаруживать одноранговые узлы.
Если ни один из них не указан,init_method
предполагается "env://".
Итак, вы можете видеть, что store и init_method являются взаимоисключающими.
Параметры следующие:
-
задняя часть- Бэкенд для использования. Допустимые значения включают
mpi
,gloo
,иnccl
. Поле должно быть представлено в виде строчной строки (например,"gloo"
) дано, а также может бытьBackend
свойства (напр.Backend.GLOO
) для доступа . если вnccl
Серверная часть использует несколько процессов на машине, каждый процесс должен иметь эксклюзивный доступ к каждому используемому графическому процессору, поскольку совместное использование графических процессоров между процессами может привести к взаимоблокировкам. -
init_method– URL-адрес, указывающий, как инициализировать группу процессов. если не указано
init_method
илиstore
Если указано, по умолчанию используется "env://". иstore
взаимоисключающий. -
world_size– Количество процессов, участвующих в задании. если
store
указан, требуется world_size. -
rank– уровень текущего процесса (должно быть значение от 0 до
world_size
число между -1). еслиstore
указано, требуется ранг. -
store– Хранилище ключей/значений, доступное для всех работников для обмена информацией о соединении/адресе. и
init_method
взаимоисключающий. -
timeout– Время ожидания операции, выполняемой над группой процессов, истекло. Значение по умолчанию равно 30 минутам. Это относится к
gloo
задняя часть. заnccl
, который находится только в переменной окруженияNCCL_BLOCKING_WAIT
илиNCCL_ASYNC_ERROR_HANDLING
Применяется при установке на 1. - group_name- Название группы.
- pg_options ( Process Group Options , optional) — параметры группы процессов, указывающие, какие дополнительные параметры необходимо передать при построении конкретной группы процессов.
0x02 Концепция и дизайн
2.1 Функция
По умолчанию коллективная коммуникация выполняется в группе по умолчанию (также известной как мир) и требует, чтобы все процессы выполняли вызовы распределенных функций. Тем не менее, некоторые рабочие места могли бы выиграть от более детального общения. Здесь в игру вступают распределенные группы.new_group()
Функцию можно использовать для создания новой распределенной группы, которая представляет собой произвольное подмножество всех процессов.new_group()
Возвращает непрозрачный дескриптор группы, который можно использовать какgroup
Аргументы предоставляются всем агрегатным функциям (агрегатные функции — это распределенные функции, используемые для обмена информацией в определенных шаблонах программирования).
2.2 Суть
Отложите концепцию, посмотрите на ее суть из кода. Группа процессов должна установить коммуникационную нить для каждого процесса обучения. Основной поток (вычислительный поток) обучается на переднем плане, а этот коммуникационный поток обменивается данными в фоновом режиме. Возьмем в качестве примера ProcessGroupMPI, который добавляет еще одну очередь в коммуникационный поток для буферной и асинхронной обработки. Таким образом, все процессы в группе процессов могут образовывать коллектив для выполнения коллективных коммуникационных операций в фоновом режиме.
Например, в рабочем потоке слева есть два потока: вычислительный поток отвечает за вычисление градиента, а затем требует, чтобы коммуникационный поток обменивался градиентами с другими рабочими процессами.
+---------------------------------------------------------------+ +--------------+
| Worker Process | | Other Worker |
| | | |
| +----------------------+ +-----------------------+ | | +----------+ |
| | Computation thread | | Communication thread | | | | Comm | |
| | | | | | | | thread | |
| | | | | | | | | |
| | Main Thread | | workerThread_ | | | | | |
| | | | | | | | | |
| | | | | | | | | |
| | Gradient computation | | | | | | | |
| | + | | | | | | | |
| | | | | | + | + | | | |
| | | | | | /| | |\ | | | |
| | v | /|_|\ | | / +-+----+ \ | | | |
| | Does All+Reduce |/ grad\| Does communication |/ Gradient \| | | |
| | |\ _ /| |\ /| | | |
| | | \| |/ | | \ +-+----+ / | | | |
| | | | | \| | |/ | | | |
| | | | | + | + | | | |
| | | | | | | | | |
| | | | | | | | | |
| +----------------------+ +-----------------------+ | | +----------+ |
| | | |
+---------------------------------------------------------------+ +--------------+
0x03 использовать
Теперь, когда мы знаем природу групп процессов, давайте посмотрим, как их использовать.
Сначала в _ddp_init_helper будет сгенерирован dist.Reducer, а группа процессов будет передана как один из параметров Reducer.
def _ddp_init_helper(self, parameters, expect_sparse_gradient, param_to_name_mapping):
"""
Initialization helper function that does the following:
(1) bucketing the parameters for reductions
(2) resetting the bucketing states
(3) registering the grad hooks
(4) Logging constructin-time DDP logging data
(5) passing a handle of DDP to SyncBatchNorm Layer
"""
self.num_iterations = 0
# The bucket size limit is specified in the constructor.
# Additionally, we allow for a single small bucket for parameters
# that are defined first, such that their gradients don't spill into
# a much larger bucket, adding unnecessary latency after gradient
# computation finishes. Experiments showed 1MB is a reasonable value.
bucket_indices = dist._compute_bucket_assignment_by_size(
parameters[0],
[dist._DEFAULT_FIRST_BUCKET_BYTES, self.bucket_bytes_cap],
expect_sparse_gradient[0],
)
# Note: reverse list of buckets because we want to approximate the
# order in which their gradients are produced, and assume they
# are used in the forward pass in the order they are defined.
self.reducer = dist.Reducer(
parameters,
list(reversed(bucket_indices)),
self.process_group, # 这里使用了
expect_sparse_gradient,
self.bucket_bytes_cap,
self.find_unused_parameters,
self.gradient_as_bucket_view,
param_to_name_mapping,
)
Во-вторых, в функции построения Reducer группа процессов будет настроена на переменную-член Reducer process_group_ выше.
Reducer::Reducer(
std::vector<std::vector<at::Tensor>> replicas,
std::vector<std::vector<size_t>> bucket_indices,
c10::intrusive_ptr<c10d::ProcessGroup> process_group,
std::vector<std::vector<bool>> expect_sparse_gradients,
int64_t bucket_bytes_cap,
bool find_unused_parameters,
bool gradient_as_bucket_view,
std::unordered_map<size_t, std::string> paramNames)
: replicas_(std::move(replicas)),
process_group_(std::move(process_group)), // 在这里
Наконец, когда необходимо выполнить all-reduce для градиента, для обработки будет вызван процесс process_group_->allreduce(tensors).
Теперь мы знаем, как использовать группы процессов.
void Reducer::all_reduce_bucket(Bucket& bucket) {
std::vector<at::Tensor> tensors;
tensors.reserve(bucket.replicas.size());
for (const auto& replica : bucket.replicas) {
tensors.push_back(replica.contents);
}
if (comm_hook_ == nullptr) {
bucket.work = process_group_->allreduce(tensors); // 这里会进行调用
} else {
GradBucket grad_bucket(
next_bucket_,
tensors[0],
// Since currently we do not support single-process multiple-device
// mode, we can assume only one replica in the bucket.
bucket.replicas[0].offsets,
bucket.replicas[0].lengths,
bucket.replicas[0].sizes_vec);
bucket.future_work = comm_hook_->runHook(grad_bucket);
}
}
сборка 0x04
4.1 Мир Python
4.1.1 rendezvous
Из исходного кода init_process_group несколько реализаций сборки отличаются в деталях, мы просто смотрим на gloo и mpi.
-
gloo использует рандеву для установки главного адреса.
-
MPI не требует рандеву, а начинается с mpirun.
Оба метода создают ProcessGroup, назначенную default_pg, а затем используют default_pg для установки GroupMember.WORLD.
def _update_default_pg(pg):
GroupMember.WORLD = group.WORLD = pg
Конкретный код init_process_group выглядит следующим образом:
def init_process_group(backend,
init_method=None,
timeout=default_pg_timeout,
world_size=-1,
rank=-1,
store=None,
group_name='',
pg_options=None):
"""
Initializes the default distributed process group, and this will also
initialize the distributed package.
There are 2 main ways to initialize a process group:
1. Specify ``store``, ``rank``, and ``world_size`` explicitly.
2. Specify ``init_method`` (a URL string) which indicates where/how
to discover peers. Optionally specify ``rank`` and ``world_size``,
or encode all required parameters in the URL and omit them.
If neither is specified, ``init_method`` is assumed to be "env://".
"""
global _pg_group_ranks
global _backend
global _default_pg_init_method
if store is not None:
assert world_size > 0, 'world_size must be positive if using store'
assert rank >= 0, 'rank must be non-negative if using store'
elif init_method is None:
init_method = "env://"
backend = Backend(backend)
if backend == Backend.MPI:
default_pg = _new_process_group_helper( # 生成了一个 ProcessGroup 赋值给 default_pg
-1,
-1,
[],
Backend.MPI,
None,
group_name=group_name,
timeout=timeout)
_update_default_pg(default_pg) # 用 default_pg 设置 GroupMember.WORLD
else:
# backward compatible API
if store is None:
rendezvous_iterator = rendezvous( # 先生成一个store
init_method, rank, world_size, timeout=timeout
)
store, rank, world_size = next(rendezvous_iterator)
store.set_timeout(timeout)
default_pg = _new_process_group_helper( # 再进行构建 ProcessGroup
world_size,
rank,
[],
backend,
store,
pg_options=pg_options,
group_name=group_name,
timeout=timeout)
_update_default_pg(default_pg) # 用 default_pg 设置 GroupMember.WORLD
_pg_group_ranks[GroupMember.WORLD] = {i: i for i in range(GroupMember.WORLD.size())} # type: ignore[attr-defined, index]
_backend = _pg_map[GroupMember.WORLD][0] # type: ignore[index]
_default_pg_init_method = init_method
# barrier at the end to ensure that once we return from this method, all
# process groups including global variables are updated correctly on all
# ranks.
if backend == Backend.MPI:
# MPI backend doesn't use store.
barrier()
else:
# Use store based barrier here since barrier() used a bunch of
# default devices and messes up NCCL internal state.
_store_based_barrier(rank, store, timeout)
# Set sequence numbers for gloo and nccl process groups.
if get_backend(default_pg) in [Backend.GLOO, Backend.NCCL]:
default_pg._set_sequence_number_for_group()
4.1.2 _new_process_group_helper
Будут использоваться различные бэкенды_new_process_group_helper
конкретная конструкция,_new_process_group_helper
На самом деле вызываются разные реализации C++, такие как ProcessGroupGloo, ProcessGroupMPI, ProcessGroupNCCL.
def _new_process_group_helper(world_size,
rank,
group_ranks,
backend,
store,
pg_options=None,
group_name=None,
timeout=default_pg_timeout):
"""
Create a new distributed process group.
This function must be called by ALL processes in the global group, even if
the calling process is not part of the newly created group. In that case,
this function returns GroupMember.NON_GROUP_MEMBER.
This function is called with ``group_ranks == []`` for the default group.
"""
global _pg_map
global _group_count
global _pg_names
if not group_name:
group_name = str(_group_count)
_group_count += 1
# The list of group ranks is empty if we're creating the default group.
is_default_group = (len(group_ranks) == 0)
backend = Backend(backend)
pg: Union[ProcessGroupGloo, ProcessGroupMPI, ProcessGroupNCCL]
if backend == Backend.MPI:
pg = ProcessGroupMPI.create(group_ranks) # 构建了 ProcessGroupMPI
if not pg:
return GroupMember.NON_GROUP_MEMBER
_pg_map[pg] = (Backend.MPI, None)
_pg_names[pg] = group_name
else:
# If this is a subgroup (which means group_ranks is specified),
# we check if the current process is a member of the new group.
if not is_default_group:
global_rank = _get_default_group().rank()
if global_rank not in group_ranks:
return GroupMember.NON_GROUP_MEMBER
# Use the group name as prefix in the default store, such that
# a single store can be reused by multiple groups.
prefix_store = PrefixStore(group_name, store)
if backend == Backend.GLOO:
pg = ProcessGroupGloo( # 构建了 ProcessGroupGloo
prefix_store,
rank,
world_size,
timeout=timeout)
_pg_map[pg] = (Backend.GLOO, store)
_pg_names[pg] = group_name
elif backend == Backend.NCCL:
if pg_options is not None:
assert isinstance(pg_options, ProcessGroupNCCL.Options), \
"Expected pg_options argument to be of type ProcessGroupNCCL.Options"
else:
# default pg_options for NCCL
pg_options = ProcessGroupNCCL.Options()
pg_options.is_high_priority_stream = False
pg_options._timeout = timeout
pg = ProcessGroupNCCL( # 构建了 ProcessGroupNCCL
prefix_store,
rank,
world_size,
pg_options)
_pg_map[pg] = (Backend.NCCL, store)
_pg_names[pg] = group_name
else:
pg = getattr(Backend, backend.upper())(
prefix_store,
rank,
world_size,
timeout)
_pg_map[pg] = (backend, store)
_pg_names[pg] = group_name
return pg
Текущий процесс выглядит следующим образом:
+
|
|
v
init_process_group
+
|
|
+------------+-------------+
| |
| |
v v
Backend.MPI Backend.GLOO & Backend.NCCL
+ +
| |
| |
| v
| store = rendezvous()
| +
| |
| |
+------------+-------------+
|
|
v
_new_process_group_helper
+
|
|
|
+------------------------------------------------------+
| | |
| | |
v v v
ProcessGroupMPI ProcessGroupGloo(store) ProcessGroupNCCL(store)
4.1.3
Возьмем в качестве примера ProcessGroupMPI. Мы видим, что базовым классом ProcessGroupMPI является ProcessGroup.
class ProcessGroupMPI(ProcessGroup):
def __init__(
self,
rank: int,
size: int,
pgComm: int,
): ...
@staticmethod
def create(ranks: List[int]) -> ProcessGroupMPI: ...
ProcessGroup определяет несколько функций связи с коллекциями, но ни одна из них не реализована, но из его комментариев мы можем видеть, что производные классы будут иметь несколько перегруженных реализаций.
class ProcessGroup(__pybind11_builtins.pybind11_object):
# no doc
def allgather(self, *args, **kwargs): # real signature unknown; restored from __doc__
"""
allgather(*args, **kwargs)
Overloaded function.
1. allgather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: List[List[at::Tensor]], input_tensors: List[at::Tensor], opts: torch._C._distributed_c10d.AllgatherOptions = <torch._C._distributed_c10d.AllgatherOptions object at 0x000001A9460233F0>) -> c10d::ProcessGroup::Work
2. allgather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: List[at::Tensor], input_tensor: at::Tensor) -> c10d::ProcessGroup::Work
"""
pass
def allgather_coalesced(self, output_lists, *args, **kwargs): # real signature unknown; NOTE: unreliably restored from __doc__
""" allgather_coalesced(self: torch._C._distributed_c10d.ProcessGroup, output_lists: List[List[at::Tensor]], input_list: List[at::Tensor], opts: torch._C._distributed_c10d.AllgatherOptions = <torch._C._distributed_c10d.AllgatherOptions object at 0x000001A946023370>) -> c10d::ProcessGroup::Work """
pass
def allreduce(self, *args, **kwargs): # real signature unknown; restored from __doc__
"""
allreduce(*args, **kwargs)
Overloaded function.
1. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: List[at::Tensor], opts: torch._C._distributed_c10d.AllreduceOptions = <torch._C._distributed_c10d.AllreduceOptions object at 0x000001A946023570>) -> c10d::ProcessGroup::Work
2. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: List[at::Tensor], op: torch._C._distributed_c10d.ReduceOp = <ReduceOp.SUM: 0>) -> c10d::ProcessGroup::Work
3. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensor: at::Tensor, op: torch._C._distributed_c10d.ReduceOp = <ReduceOp.SUM: 0>) -> c10d::ProcessGroup::Work
"""
pass
И независимо от того, какой класс ProcessGroup является производным, он указывает на мир C++, например, в torch/csrc/distributed/c10d/init.cpp есть следующий код:
// Define static create function instead of a constructor, because
// this function may return null. This happens if this process is not
// part of a sub group that is to be created.
processGroupMPI.def_static(
"create",
[](std::vector<int> ranks) {
return ::c10d::ProcessGroupMPI::createProcessGroupMPI(ranks);
},
py::call_guard<py::gil_scoped_release>());
Таким образом, видно, что последним вызовом является createProcessGroupMPI, поэтому мы переходим непосредственно к миру C++, чтобы увидеть.
4.2 Мир С++
4.2.1 Определение ProcessGroupMPI
Определение ProcessGroupMPI находится в torch/lib/c10d/ProcessGroupMPI.cpp. Это эквивалентно выполнению рабочей очереди и асинхронных операций. Вот несколько замечаний:
-
Все функции класса ProcessGroupMPI должны вызываться в одном и том же порядке среди процессов в группе. Только так мы можем гарантировать совпадение одних и тех же вызовов между процессами.
-
Все функции MPI, предоставляемые классом ProcessGroupMPI, асинхронно отправляются в рабочие потоки. Следовательно, ProcessGroupMPI полагается на реализацию MPI, которая используется для обеспечения минимального значения поддержки потока MPI_THREAD_SERIALIZED. То есть процесс может быть многопоточным, и несколько потоков могут выполнять вызовы MPI, но только по одному: вызовы MPI не выполняются одновременно из двух разных потоков (все вызовы MPI сериализуются). Однако ProcessGroupMPI будет поддерживать только одну группу процессов, если используется MPI_THREAD_SERIALIZED. Другими словами, глобально не может быть создано более 1 группы процессов.
-
Если вы хотите использовать несколько ProcessGroupMPI, требуется, чтобы значение поддержки потока реализации MPI было кратным MPI\u thread\u, то есть несколько потоков могут вызывать MPI без каких-либо ограничений.
-
Также обратите внимание, что ProcessGroupMPI поддерживает только одиночные тензорные операции. Другими словами, размер входного тензорного вектора всегда должен быть равен 1.
-
Если используемый MPI является MPI с поддержкой CUDA, то тензор CUDA может поддерживаться, и ProcessGroupMPI автоматически обнаружит эту поддержку.
// ProcessGroupMPI implements MPI bindings for c10d.
//
// All functions on this class are expected to be called in the same
// order across processes in the group. This is the only way that we
// can guarantee to match up the same calls across processes.
//
// All MPI functions provided by this class is asynchronously scheduled on a
// Worker thread. Therefore, ProcessGroupMPI requires the MPI implementation
// that is used to have a minimum thread support value of MPI_THREAD_SERIALIZED.
// That is, The process may be multi-threaded, and multiple threads may make
// MPI calls, but only one at a time: MPI calls are not made concurrently from
// two distinct threads (all MPI calls are serialized). However, with
// MPI_THREAD_SERIALIZED, ProcessGroupMPI will only support a singe process
// group. In other words, no more than 1 process group can be created globally.
//
// If you would like to use multiple ProcessGroupMPI, it requres your MPI
// implemenation to have a thread support value of MPI_THREAD_MULTIPLE, that is,
// multiple threads may call MPI, with no restriction.
//
// Also note that ProcessGroupMPI only supports a single Tensor operation. In
// other words, the size of the input Tensor vector should always be 1.
//
// CUDA tensor can be supported if the MPI used is CUDA-aware MPI, and
// ProcessGroupMPI will automatically detect this support.
class ProcessGroupMPI : public ProcessGroup {
public:
class WorkMPI : public ProcessGroup::Work {
public:
explicit WorkMPI(
std::vector<at::Tensor> outputTensors,
const char* profilingTitle = nullptr,
const c10::optional<std::vector<at::Tensor>>& inputTensors =
c10::nullopt)
: ProcessGroup::Work(-1, OpType::UNKNOWN, profilingTitle, inputTensors),
outputTensors_(std::move(outputTensors)),
future_(c10::make_intrusive<at::ivalue::Future>(
c10::ListType::create(c10::TensorType::get()))) {}
std::vector<at::Tensor> result() override;
c10::intrusive_ptr<c10::ivalue::Future> getFuture() override;
protected:
friend class ProcessGroupMPI;
private:
void finishWorkMPI();
void finishWorkMPIError(std::exception_ptr eptr);
std::vector<at::Tensor> outputTensors_;
c10::intrusive_ptr<at::ivalue::Future> future_;
};
class AsyncWork : public ProcessGroup::Work {
public:
AsyncWork(
MPI_Request request,
std::vector<at::Tensor> outputTensors,
const char* profilingTitle = nullptr,
const c10::optional<std::vector<at::Tensor>>& inputTensors =
c10::nullopt);
virtual ~AsyncWork();
bool isCompleted() override;
bool isSuccess() const override;
int sourceRank() const override;
bool wait(std::chrono::milliseconds timeout = kUnsetTimeout) override;
void abort() override;
std::vector<at::Tensor> result() override;
protected:
void populateException();
private:
const std::vector<at::Tensor> outputTensors_;
MPI_Request request_;
MPI_Status status_;
};
// Constructor will spawn up the worker thread loop
explicit ProcessGroupMPI(int rank, int size, MPI_Comm pgComm);
virtual ~ProcessGroupMPI();
protected:
using WorkType =
std::tuple<std::unique_ptr<WorkEntry>, c10::intrusive_ptr<WorkMPI>>;
// Worker thread loop
void runLoop();
// Helper function that is called by the destructor
void destroy();
c10::intrusive_ptr<ProcessGroup::Work> enqueue(
std::unique_ptr<WorkEntry> entry,
const char* profilingTitle = nullptr,
const c10::optional<std::vector<at::Tensor>>& inputTensors = c10::nullopt);
bool stop_;
std::mutex pgMutex_;
std::thread workerThread_;
std::deque<WorkType> queue_;
std::condition_variable queueProduceCV_;
std::condition_variable queueConsumeCV_;
// Global states
static void initMPIOnce();
static void mpiExit();
static std::once_flag onceFlagInitMPI;
static std::mutex pgGlobalMutex_;
static int mpiThreadSupport_;
MPI_Comm pgComm_;
};
4.2.2 Инициализация
Метод createProcessGroupMPI завершает инициализацию группы процессов, которая в основном вызывает общие подпрограммы программирования MPI, такие как initMPIOnce, MPI_Comm_create, MPI_Barrier и подобные.
c10::intrusive_ptr<ProcessGroupMPI> ProcessGroupMPI::createProcessGroupMPI(
std::vector<int> ranks) {
// Once initialization
initMPIOnce();
MPI_Comm groupComm = MPI_COMM_WORLD;
int rank = -1;
int size = -1;
{
std::lock_guard<std::mutex> globalLock(pgGlobalMutex_);
// If no ranks are specified, assume we're creating the root group
if (!ranks.empty()) {
MPI_Group worldGroup;
MPI_Group ranksGroup;
MPI_CHECK(MPI_Comm_group(MPI_COMM_WORLD, &worldGroup));
MPI_CHECK(
MPI_Group_incl(worldGroup, ranks.size(), ranks.data(), &ranksGroup));
constexpr int kMaxNumRetries = 3;
bool groupComm_updated = false;
MPI_Barrier(MPI_COMM_WORLD);
for (int i = 0; i < kMaxNumRetries; ++i) {
if (MPI_Comm_create(MPI_COMM_WORLD, ranksGroup, &groupComm)) {
groupComm_updated = true;
break;
}
}
MPI_CHECK(groupComm_updated);
MPI_CHECK(MPI_Group_free(&worldGroup));
MPI_CHECK(MPI_Group_free(&ranksGroup));
}
// Fetch rank and world size for this group (MPI_COMM_WORLD or new)
if (groupComm != MPI_COMM_NULL) {
MPI_CHECK(MPI_Comm_rank(groupComm, &rank));
MPI_CHECK(MPI_Comm_size(groupComm, &size));
}
}
// If this process is not part of the group, we don't construct a
// process group instance. This is in line with the semantics of the
// other process group types.
if (groupComm == MPI_COMM_NULL) {
return c10::intrusive_ptr<ProcessGroupMPI>(); // 生成
}
return c10::make_intrusive<ProcessGroupMPI>(rank, size, groupComm); // 生成
}
4.2.2.1 initMPIOnce
API MPI_Init_thread вызывается для инициализации среды исполнения MPI.
void ProcessGroupMPI::initMPIOnce() {
// Initialize MPI environment
std::call_once(onceFlagInitMPI, []() {
MPI_CHECK(MPI_Init_thread(
nullptr, nullptr, MPI_THREAD_SERIALIZED, &mpiThreadSupport_));
if (mpiThreadSupport_ < MPI_THREAD_SERIALIZED) {
throw std::runtime_error(
"Used MPI implementation doesn't have the "
"minimum level of threading support: "
"MPI_THREAD_SERIALIZED. This is required by "
"c10d package");
}
if (std::atexit(ProcessGroupMPI::mpiExit)) {
throw std::runtime_error("Fail to register the MPI exit handler");
}
});
4.2.2.2 ProcessGroupMPI
WorkerThread_ генерируется в методе сборки ProcessGroupMPI, который запускает runLoop.
ProcessGroupMPI::ProcessGroupMPI(int rank, int size, MPI_Comm pgComm)
: ProcessGroup(rank, size), stop_(false), pgComm_(pgComm) {
if (pgComm_ == MPI_COMM_NULL) {
throw std::runtime_error("pgComm_ must not be MPI_COMM_NULL");
}
// Start the worker thread accepting MPI calls
workerThread_ = std::thread(&ProcessGroupMPI::runLoop, this);
}
4.2.3 Бег
4.2.3.1 Выполнение упаковки
Здесь есть две инкапсуляции: WorkEntry инкапсулирует выполнение вычисления, а WorkMPI инкапсулирует результат выполнения вычисления (поскольку вычисление является асинхронным). детали следующим образом:
WorkEntry — это инкапсуляция метода выполнения, или каждый раз, когда необходимо выполнить коллективную коммуникационную операцию, она должна быть инкапсулирована здесь.
struct WorkEntry {
explicit WorkEntry(
std::vector<at::Tensor>* srcPtr,
std::vector<at::Tensor>* dstPtr,
std::function<void(std::unique_ptr<WorkEntry>&)> run)
: dst(dstPtr ? *dstPtr : std::vector<at::Tensor>()),
run(std::move(run)) {
if (srcPtr) {
src = *srcPtr;
}
}
// Not copyable
WorkEntry(const WorkEntry&) = delete;
// Not copy assignable
WorkEntry& operator=(const WorkEntry&) = delete;
// For input and output tensors (in-place), we will always use src
std::vector<at::Tensor> src;
// Copy of user provided outputs.
const std::vector<at::Tensor> dst;
// src rank returned, for recv only
int* srcRank = nullptr;
std::function<void(std::unique_ptr<WorkEntry>&)> run;
};
WorkMPI — это инкапсуляция результата выполнения.
class WorkMPI : public ProcessGroup::Work {
public:
explicit WorkMPI(
std::vector<at::Tensor> outputTensors,
const char* profilingTitle = nullptr,
const c10::optional<std::vector<at::Tensor>>& inputTensors =
c10::nullopt)
: ProcessGroup::Work(-1, OpType::UNKNOWN, profilingTitle, inputTensors),
outputTensors_(std::move(outputTensors)),
future_(c10::make_intrusive<at::ivalue::Future>(
c10::ListType::create(c10::TensorType::get()))) {}
std::vector<at::Tensor> result() override;
c10::intrusive_ptr<c10::ivalue::Future> getFuture() override;
protected:
friend class ProcessGroupMPI;
private:
void finishWorkMPI();
void finishWorkMPIError(std::exception_ptr eptr);
std::vector<at::Tensor> outputTensors_;
c10::intrusive_ptr<at::ivalue::Future> future_;
};
При вставке в рабочую очередь фактически вставляется двойка (WorkEntry, WorkMPI), и мы объясним, как ее использовать позже.
4.2.3.2 allreduce
Возьмите allreduce в качестве примера, чтобы увидеть, как это обрабатывается. Это инкапсулировать MPI_Allreduce в WorkEntry и вставить его в очередь.
В последующем цикле выполнения WorkEntry удаляется, а затем запускается MPI_Allreduce.
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupMPI::allreduce(
std::vector<at::Tensor>& tensors,
const AllreduceOptions& opts) {
checkSingleTensor(tensors);
std::function<void(std::unique_ptr<WorkEntry>&)> runFunc =
[opts, this](std::unique_ptr<WorkEntry>& entry) {
auto data = (entry->src)[0];
c10::DeviceGuard guard(data.device());
std::unique_lock<std::mutex> globalLock(pgGlobalMutex_);
MPI_CHECK(MPI_Allreduce( // 封装了此函数
MPI_IN_PLACE,
data.data_ptr(),
data.numel(),
mpiDatatype.at(data.scalar_type()),
mpiOp.at(opts.reduceOp),
pgComm_));
};
auto entry = std::make_unique<WorkEntry>(&tensors, &tensors, std::move(runFunc));
return enqueue(
std::move(entry),
"mpi:all_reduce",
c10::optional<std::vector<at::Tensor>>(tensors));
}
4.2.3.3 enqueue
Метод enqueue заключается в том, чтобы вставить в очередь двухкортежность (WorkEntry, WorkMPI), а запись->dst в ней — сохранить результат расчета в WorkMPI.
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupMPI::enqueue(
std::unique_ptr<WorkEntry> entry,
const char* profilingTitle,
const c10::optional<std::vector<at::Tensor>>& inputTensors) {
// 生成 WorkMPI,把 entry->dst 就是 计算结果存放到 WorkMPI 之中
auto work = c10::make_intrusive<WorkMPI>(entry->dst, profilingTitle, inputTensors);
std::unique_lock<std::mutex> lock(pgMutex_);
// 插入二元组
queue_.push_back(std::make_tuple(std::move(entry), work));
lock.unlock();
queueProduceCV_.notify_one();
return work;
}
4.2.3.4 runLoop
Метод runLoop основного цикла заключается в непрерывном извлечении записи для обработки.
void ProcessGroupMPI::runLoop() {
std::unique_lock<std::mutex> lock(pgMutex_);
while (!stop_) {
if (queue_.empty()) {
queueProduceCV_.wait(lock);
continue;
}
auto workTuple = std::move(queue_.front());
queue_.pop_front();
auto& workEntry = std::get<0>(workTuple); // 进行计算
auto& work = std::get<1>(workTuple); // 拿到WorkMPI
lock.unlock();
queueConsumeCV_.notify_one();
try {
workEntry->run(workEntry);
work->finishWorkMPI(); // 会等待WorkMPI的计算结果
} catch (...) {
work->finishWorkMPIError(std::current_exception());
}
lock.lock();
}
}
FinishWorkMPI будет помечен и уведомлен.
void ProcessGroupMPI::WorkMPI::finishWorkMPI() {
future_->markCompleted(at::IValue(outputTensors_));
finish();
}
Код базового класса выглядит следующим образом:
void ProcessGroup::Work::finish(std::exception_ptr exception) {
std::unique_lock<std::mutex> lock(mutex_);
completed_ = true;
exception_ = exception;
if (recordFunctionEndCallback_) {
recordFunctionEndCallback_();
recordFunctionEndCallback_ = nullptr;
}
lock.unlock();
cv_.notify_all();
}
Подробности следующие:
+
Worker 1 | Worker 2
|
|
|
+-----------------+ +--------------------------------------+ | +------------------------------------+ +---------------+
| Main Thread | | ProcessGroupMPI | | | ProcessGroupMPI | | Main Thread |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | +--------------------------------+ | | | +------------------------------+ | | |
| | | | runLoop workerThread_ | | | | | runloop workerThread_ | | | |
| | | | | | | | | | | | |
| | | | | | | | | | | | |
| +---------+ | | | +-------------------------+ | | | | | +-----------------------+ | | | |
| | | | allreduce | | | queue_ | | | | | | | queue_ | | | allreduce | +---------+ |
| | Reducer | +-------------------> | | | | | | | | | <-------------------+ | | |
| | | | | | | | | | | | | | | | | | | Reducer | |
| +---------+ | | | | +-------------------+ | | | | | | | +-----------------+ | | | | | | |
| | | | | |WorkEntry | | | | | | | | | WorkEntry | | | | | +---------+ |
| | | | | | | | | | | | | | | | | | | | |
| | | | | | MPI_Allreduce <-----------------------------> MPI_Allreduce| | | | | |
| | | | | | | | | | | | | | | | | | | | |
| | | | | +-------------------+ | | | | | | | +-----------------+ | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | +-------------------------+ | | | | | +-----------------------+ | | | |
| | | | | | | | | | | | |
| | | +--------------------------------+ | | | +------------------------------+ | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
+-----------------+ +--------------------------------------+ | +------------------------------------+ +---------------+
|
|
+
Телефон такой:
4.4 Упаковка
PyTorch инкапсулирует различные группы процессов, так что пользователи могут вызывать GroupMember.WORLD для выполнения различных операций, но пользователям это безразлично.
def _get_default_group():
"""
Getting the default process group created by init_process_group
"""
if not is_initialized():
raise RuntimeError("Default process group has not been initialized, "
"please make sure to call init_process_group.")
return GroupMember.WORLD
Другой пример: в torch/distributed/distributed_c10d.py вы можете увидеть такие функции, как all_to_all и all_gather, в следующих методах Комментарии очень подробные (здесь опущены из-за нехватки места), и вы можете изучить их самостоятельно, если вы интересно.
def all_to_all(output_tensor_list,
input_tensor_list,
group=None,
async_op=False):
"""
Each process scatters list of input tensors to all processes in a group and
return gathered list of tensors in output list.
Args:
output_tensor_list (list[Tensor]): List of tensors to be gathered one
per rank.
input_tensor_list (list[Tensor]): List of tensors to scatter one per rank.
group (ProcessGroup, optional): The process group to work on. If None,
the default process group will be used.
async_op (bool, optional): Whether this op should be an async op.
Returns:
Async work handle, if async_op is set to True.
None, if not async_op or if not part of the group.
"""
if _rank_not_in_group(group):
return
opts = AllToAllOptions()
_check_tensor_list(output_tensor_list, "output_tensor_list")
_check_tensor_list(input_tensor_list, "input_tensor_list")
if group is None:
default_pg = _get_default_group()
work = default_pg.alltoall(output_tensor_list, input_tensor_list, opts)
else:
work = group.alltoall(output_tensor_list, input_tensor_list, opts)
if async_op:
return work
else:
work.wait()
Код all_gather выглядит следующим образом:
def all_gather(tensor_list,
tensor,
group=None,
async_op=False):
"""
Gathers tensors from the whole group in a list.
Complex tensors are supported.
Args:
tensor_list (list[Tensor]): Output list. It should contain
correctly-sized tensors to be used for output of the collective.
tensor (Tensor): Tensor to be broadcast from current process.
group (ProcessGroup, optional): The process group to work on. If None,
the default process group will be used.
async_op (bool, optional): Whether this op should be an async op
Returns:
Async work handle, if async_op is set to True.
None, if not async_op or if not part of the group
"""
_check_tensor_list(tensor_list, "tensor_list")
_check_single_tensor(tensor, "tensor")
if _rank_not_in_group(group):
return
tensor_list = [t if not t.is_complex() else torch.view_as_real(t) for t in tensor_list]
tensor = tensor if not tensor.is_complex() else torch.view_as_real(tensor)
if group is None:
default_pg = _get_default_group()
work = default_pg.allgather([tensor_list], [tensor])
else:
work = group.allgather([tensor_list], [tensor])
if async_op:
return work
else:
work.wait()
На данный момент введение группы процессов завершено. Ждите следующего документа по нашему анализу DDP.
0xEE Личная информация
★★★★★★Думая о жизни и технологиях★★★★★★
Публичный аккаунт WeChat:мысли Росси
ссылка 0xFF
pytorch (распределенные) данные параллельной личной практики - DataParallel/DistributedDataParallel
у-у-у-у. tresens.co/2019/04/04/…
DISTRIBUTED TRAINING WITH UNEVEN INPUTS USING THE JOIN CONTEXT MANAGER